In [1]:
#!/usr/bin/env python3
import os
import glob
import pandas as pd
import datetime
import numpy as np
from timezonefinder import TimezoneFinder
import pytz

# -----------------------------------------------------------
# Step 1. Read metadata and catchment area information
# -----------------------------------------------------------
# Read gauge coordinate metadata (for time zone determination)
attributes_file = '/p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/GRDC_Caravan_extension_csv/attributes/grdc/attributes_other_grdc.csv'
df_attributes = pd.read_csv(attributes_file)
# Create mapping: gauge_id -> (lat, lon)
gauge_coords = {row['gauge_id']: (row['gauge_lat'], row['gauge_lon'])
                for _, row in df_attributes.iterrows()}
# (Adjust the column name for gauge id if needed)
gauge_area = {row['gauge_id']: row['area'] for _, row in df_attributes.iterrows()}

# -----------------------------------------------------------
# Step 2. Determine each gauge's local time zone offset (in hours)
# -----------------------------------------------------------
tf = TimezoneFinder()
gauge_tz_offset = {}  # offset in hours
rep_date = datetime.datetime(2020, 1, 1)  # representative date (adjust if needed)

for gauge_id, (lat, lon) in gauge_coords.items():
    tz_str = tf.timezone_at(lng=lon, lat=lat)
    if tz_str is None:
        print(f"Timezone not found for {gauge_id} (lat={lat}, lon={lon}). Using UTC (offset=0).")
        gauge_tz_offset[gauge_id] = 0
    else:
        tz = pytz.timezone(tz_str)
        localized = tz.localize(rep_date, is_dst=False)
        offset_hours = localized.utcoffset().total_seconds() / 3600
        gauge_tz_offset[gauge_id] = offset_hours
        print(f"Gauge {gauge_id}: Timezone {tz_str}, Offset {offset_hours} hours.")


Gauge GRDC_1159100: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159103: Timezone Africa/Windhoek, Offset 2.0 hours.
Gauge GRDC_1159105: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159110: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159120: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159125: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159130: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159131: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159290: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159300: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159301: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159302: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159304: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159305: Timezone Africa/Johannesburg, Offset 2.0 hours.
Gauge GRDC_1159310: Timezone Africa/Johannesburg, Of

In [None]:
# -----------------------------------------------------------
# Step 3. Process each gauge's CSV file to shift daily streamflow to UTC 
#         and convert units from mm/day to m3/s.
#
#   (a) We load the gauge CSV (with dates and streamflow in mm/day).
#   (b) We reindex the data to a complete daily time series.
#   (c) We detect gaps. For gaps shorter than 31 days, we fill missing days
#       via linear interpolation; for longer gaps, we treat the data as separate segments.
#   (d) For each continuous segment, we apply the weighted average conversion:
#
#       For UTC+X (X>0):
#         Q_UTC(D) = ((24-X) * Q_local(D) + X * Q_local(D+1)) / 24
#
#       For UTC-X (X>0):
#         Q_UTC(D) = (X * Q_local(D-1) + (24-X) * Q_local(D)) / 24
#
#       (The first or last day of each segment is skipped accordingly.)
#
#   (e) Finally, we convert from mm/day to m3/s using:
#         Q_m3s = (Q_UTC * A) / 86.4
#       where A is the catchment area (in km2).
# -----------------------------------------------------------

input_dir = '/p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/GRDC_Caravan_extension_csv/timeseries/csv/grdc'
output_dir = '/p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/converted_csv'
os.makedirs(output_dir, exist_ok=True)

# Process each CSV file in the input directory.
#all_files = glob.glob(os.path.join(input_dir, '*.csv'))
#for file_path in all_files[:10]:

for file_path in glob.glob(os.path.join(input_dir, '*.csv')):
    gauge_id = os.path.basename(file_path).split('.')[0]
    print(f"\nProcessing gauge {gauge_id} ...")
    
    # Load the time series data with date parsing.
    df = pd.read_csv(file_path, parse_dates=['date'])
    df.sort_values('date', inplace=True)
    # Filter to only include data from January 1, 1979 onward.
    df = df[df['date'] >= pd.Timestamp('1979-01-01')]

    df.reset_index(drop=True, inplace=True)
    #df.set_index('date', inplace=True)
    
    # Ensure we have the 'streamflow' column.
    if 'streamflow' not in df.columns:
        print(f"File {file_path} does not have a 'streamflow' column. Skipping.")
        continue
    
    # Set the date as index.
    df.set_index('date', inplace=True)
    
    # Create a complete daily date range spanning the observed period.
    full_index = pd.date_range(start=df.index.min(), end=df.index.max(), freq='D')
    df_full = df.reindex(full_index)
    # Mark which dates are originally observed.
    df_full['observed'] = ~df_full['streamflow'].isna()
    
    # -------------------------------------------------------
    # Identify continuous segments.
    # We want to group days where the gap between consecutive observed dates is less than 7 days.
    # -------------------------------------------------------
    observed_dates = df_full.index[df_full['observed']]
    if len(observed_dates) == 0:
        print(f"No observed dates for {gauge_id}. Skipping file.")
        continue
    gap_count = 0
    segments = []
    seg_start = observed_dates[0]
    prev_date = observed_dates[0]

    for current_date in observed_dates[1:]:
        gap = (current_date - prev_date).days
        if gap < 31: # if there is a gap less than 30 days, we consider it as a continuous segment
            # Continue the current segment (even if not strictly consecutive, we'll fill the small gap)
            prev_date = current_date
        else:
            # Gap too long: print details, then close the current segment and start a new one.
            print(f"Gap detected: {gap} days between {prev_date.strftime('%Y-%m-%d')} and {current_date.strftime('%Y-%m-%d')}")
            gap_count += 1
            segments.append((seg_start, prev_date))
            seg_start = current_date
            prev_date = current_date

    # Append the last segment.
    segments.append((seg_start, prev_date))
    print(f"Total gaps detected in {gauge_id}: {gap_count}")
    
    # For each segment, reindex with daily frequency and fill gaps by linear interpolation.
    converted_segments = []
    for seg_start, seg_end in segments:
        seg_index = pd.date_range(start=seg_start, end=seg_end, freq='D')
        seg_df = df_full.loc[seg_start:seg_end].reindex(seg_index)
        # Only fill gaps if they exist; if the gap is small (<7 days) this will interpolate.
        seg_df['streamflow'] = seg_df['streamflow'].interpolate(method='linear')
        seg_df = seg_df.copy()  # work on a copy
        
        # Apply the time conversion using the gauge's UTC offset.
        offset = gauge_tz_offset.get(gauge_id, 0)
        if offset > 0:
            # For UTC+X, compute Q_UTC for day D using Q_local(D) and Q_local(D+1)
            seg_df['streamflow_next'] = seg_df['streamflow'].shift(-1)
            seg_df['Q_utc'] = ((24 - offset) * seg_df['streamflow'] + offset * seg_df['streamflow_next']) / 24.0
            # Remove the last day of the segment (cannot combine with next day)
            seg_df = seg_df.iloc[:-1]
        elif offset < 0:
            # For UTC-X, compute Q_UTC for day D using Q_local(D-1) and Q_local(D)
            seg_df['streamflow_prev'] = seg_df['streamflow'].shift(1)
            abs_offset = abs(offset)
            seg_df['Q_utc'] = (abs_offset * seg_df['streamflow_prev'] + (24 - abs_offset) * seg_df['streamflow']) / 24.0
            # Remove the first day of the segment
            seg_df = seg_df.iloc[1:]
        else:
            seg_df['Q_utc'] = seg_df['streamflow']
        
        # Keep only the computed Q_utc and the date index.
        converted_segments.append(seg_df[['Q_utc']])
    
    if not converted_segments:
        print(f"No valid segments found for {gauge_id}.")
        continue

    # Combine all segments into one DataFrame (they remain separate in time).
    df_conv = pd.concat(converted_segments).sort_index()
    
    # -------------------------------------------------------
    # Unit conversion: convert Q_utc from mm/day to m3/s.
    # Formula: Q_m3/s = (Q_utc * A) / 86.4, where A is in km².
    # -------------------------------------------------------
    area = gauge_area.get(gauge_id)
    if area is None:
        print(f"Catchment area not found for {gauge_id}. Skipping unit conversion.")
        continue
    df_conv['streamflow'] = (df_conv['Q_utc'] * area) / 86.4
    
    # Prepare the output DataFrame (date and converted streamflow in m3/s).
    df_out = df_conv[['streamflow']].reset_index().rename(columns={'index': 'date'})
    # -------------------------------------------------------
    # Append gauge metadata: latitude, longitude, and catchment area.
    # -------------------------------------------------------
    if gauge_id in gauge_coords:
        lat, lon = gauge_coords[gauge_id]
    else:
        lat, lon = np.nan, np.nan
    df_out['lat'] = lat
    df_out['lon'] = lon
    df_out['area'] = area
    
    # Save to a new CSV file (same gauge id as file name).
    output_file = os.path.join(output_dir, f"{gauge_id}.csv")
    df_out.to_csv(output_file, index=False)
    print(f"Processed {gauge_id}: converted data saved to {output_file}")

print("\nAll files have been processed.")


Processing gauge GRDC_4136500 ...
Total gaps detected in GRDC_4136500: 0
Processed GRDC_4136500: converted data saved to /p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/converted_csv/GRDC_4136500.csv

Processing gauge GRDC_6340625 ...
Total gaps detected in GRDC_6340625: 0
Processed GRDC_6340625: converted data saved to /p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/converted_csv/GRDC_6340625.csv

Processing gauge GRDC_4208918 ...
No observed dates for GRDC_4208918. Skipping file.

Processing gauge GRDC_4119442 ...
Total gaps detected in GRDC_4119442: 0
Processed GRDC_4119442: converted data saved to /p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/converted_csv/GRDC_4119442.csv

Processing gauge GRDC_5302405 ...
Total gaps detected in GRDC_5302405: 0
Processed GRDC_5302405: converted data saved to /p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/converted_csv/GRDC_5302405.csv

Processing gauge GRDC_6731112 ...
Total gaps detected in GRDC_6731112

KeyboardInterrupt: 

In [3]:
# -----------------------------------------------------------
# Step 3. Process each gauge's CSV file to shift daily streamflow to UTC 
#         and convert units from mm/day to m3/s.
#
#   (a) We load the gauge CSV (with dates and streamflow in mm/day).
#   (b) We reindex the data to a complete daily time series.
#   (c) We detect gaps. For gaps shorter than 31 days, we fill missing days
#       via linear interpolation; for longer gaps, we treat the data as separate segments;
#       if the total missing gap days exceed 1500, we skip the CSV file.
#   (d) We remove catchment areas smaller than 500 km².
#   (e) For each continuous segment, we apply the weighted average conversion:
#
#       For UTC+X (X>0):
#         Q_UTC(D) = ((24-X) * Q_local(D) + X * Q_local(D+1)) / 24
#
#       For UTC-X (X>0):
#         Q_UTC(D) = (X * Q_local(D-1) + (24-X) * Q_local(D)) / 24
#
#       (The first or last day of each segment is skipped accordingly.)
#
#   (f) Finally, we convert from mm/day to m3/s using:
#         Q_m3s = (Q_UTC * A) / 86.4
#       where A is the catchment area (in km2).
# -----------------------------------------------------------

input_dir = '/p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/GRDC_Caravan_extension_csv/timeseries/csv/grdc'
output_dir = '/p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/converted_csv'
os.makedirs(output_dir, exist_ok=True)

# Process each CSV file in the input directory.
#all_files = glob.glob(os.path.join(input_dir, '*.csv'))
#for file_path in all_files[:10]:

for file_path in glob.glob(os.path.join(input_dir, '*.csv')):
    gauge_id = os.path.basename(file_path).split('.')[0]
    print(f"\nProcessing gauge {gauge_id} ...")
    
    # Load the time series data with date parsing.
    df = pd.read_csv(file_path, parse_dates=['date'])
    df.sort_values('date', inplace=True)
    # Filter to only include data from January 1, 1979 onward.
    df = df[df['date'] >= pd.Timestamp('1979-01-01')]

    df.reset_index(drop=True, inplace=True)
    #df.set_index('date', inplace=True)
    
    # Ensure we have the 'streamflow' column.
    if 'streamflow' not in df.columns:
        print(f"File {file_path} does not have a 'streamflow' column. Skipping.")
        continue
    
    # Set the date as index.
    df.set_index('date', inplace=True)
    
    # Create a complete daily date range spanning the observed period.
    full_index = pd.date_range(start=df.index.min(), end=df.index.max(), freq='D')
    df_full = df.reindex(full_index)
    # Mark which dates are originally observed.
    df_full['observed'] = ~df_full['streamflow'].isna()
    
    # -------------------------------------------------------
    # Identify continuous segments.
    # We want to group days where the gap between consecutive observed dates is less than 7 days.
    # -------------------------------------------------------
    observed_dates = df_full.index[df_full['observed']]
    if len(observed_dates) == 0:
        print(f"No observed dates for {gauge_id}. Skipping file.")
        continue
    total_gap_days = 0
    gap_count = 0
    segments = []
    seg_start = observed_dates[0]
    prev_date = observed_dates[0]
    
    for current_date in observed_dates[1:]:
        gap = (current_date - prev_date).days
        # Count missing days: if gap > 1 then missing days = gap - 1
        if gap > 1:
            total_gap_days += (gap - 1)
        if gap < 31:  # if gap is less than 31 days, consider it continuous
            prev_date = current_date
        else:
            print(f"Gap detected: {gap} days between {prev_date.strftime('%Y-%m-%d')} and {current_date.strftime('%Y-%m-%d')}")
            gap_count += 1
            segments.append((seg_start, prev_date))
            seg_start = current_date
            prev_date = current_date

    # Append the last segment.
    segments.append((seg_start, prev_date))
    print(f"Total gaps detected in {gauge_id}: {gap_count}")
    print(f"Total missing gap days in {gauge_id}: {total_gap_days} days")
    
    # If total missing gap days exceed 1500, skip this CSV file.
    if total_gap_days > 1500:
        print(f"Skipping file {gauge_id} because total missing gap days ({total_gap_days}) exceed 10% days between 1979 to 2022.")
        continue

    # -------------------------------------------------------
    # Skip file if catchment area is smaller than 500 km².
    # make it comparable with our 5km resolution model output.
    # -------------------------------------------------------
    area = gauge_area.get(gauge_id)
    if area is None:
        print(f"Catchment area not found for {gauge_id}. Skipping unit conversion.")
        continue
    if area < 500:
        print(f"Skipping file {gauge_id} because catchment area ({area} km²) is smaller than 500 km².")
        continue
    
    # For each segment, reindex with daily frequency and fill gaps by linear interpolation.
    converted_segments = []
    for seg_start, seg_end in segments:
        seg_index = pd.date_range(start=seg_start, end=seg_end, freq='D')
        seg_df = df_full.loc[seg_start:seg_end].reindex(seg_index)
        # Only fill gaps if they exist; if the gap is small (<7 days) this will interpolate.
        seg_df['streamflow'] = seg_df['streamflow'].interpolate(method='linear')
        seg_df = seg_df.copy()  # work on a copy
        
        # Apply the time conversion using the gauge's UTC offset.
        offset = gauge_tz_offset.get(gauge_id, 0)
        if offset > 0:
            # For UTC+X, compute Q_UTC for day D using Q_local(D) and Q_local(D+1)
            seg_df['streamflow_next'] = seg_df['streamflow'].shift(-1)
            seg_df['Q_utc'] = ((24 - offset) * seg_df['streamflow'] + offset * seg_df['streamflow_next']) / 24.0
            # Remove the last day of the segment (cannot combine with next day)
            seg_df = seg_df.iloc[:-1]
        elif offset < 0:
            # For UTC-X, compute Q_UTC for day D using Q_local(D-1) and Q_local(D)
            seg_df['streamflow_prev'] = seg_df['streamflow'].shift(1)
            abs_offset = abs(offset)
            seg_df['Q_utc'] = (abs_offset * seg_df['streamflow_prev'] + (24 - abs_offset) * seg_df['streamflow']) / 24.0
            # Remove the first day of the segment
            seg_df = seg_df.iloc[1:]
        else:
            seg_df['Q_utc'] = seg_df['streamflow']
        
        # Keep only the computed Q_utc and the date index.
        converted_segments.append(seg_df[['Q_utc']])
    
    if not converted_segments:
        print(f"No valid segments found for {gauge_id}.")
        continue

    # Combine all segments into one DataFrame (they remain separate in time).
    df_conv = pd.concat(converted_segments).sort_index()
    
    # -------------------------------------------------------
    # Unit conversion: convert Q_utc from mm/day to m3/s.
    # Formula: Q_m3/s = (Q_utc * A) / 86.4, where A is in km².
    # -------------------------------------------------------
    area = gauge_area.get(gauge_id)
    if area is None:
        print(f"Catchment area not found for {gauge_id}. Skipping unit conversion.")
        continue
    df_conv['streamflow'] = (df_conv['Q_utc'] * area) / 86.4
    
    # Prepare the output DataFrame (date and converted streamflow in m3/s).
    df_out = df_conv[['streamflow']].reset_index().rename(columns={'index': 'date'})
    # -------------------------------------------------------
    # Append gauge metadata: latitude, longitude, and catchment area.
    # -------------------------------------------------------
    if gauge_id in gauge_coords:
        lat, lon = gauge_coords[gauge_id]
    else:
        lat, lon = np.nan, np.nan
    df_out['lat'] = lat
    df_out['lon'] = lon
    df_out['area'] = area
    
    # Save to a new CSV file (same gauge id as file name).
    output_file = os.path.join(output_dir, f"{gauge_id}.csv")
    df_out.to_csv(output_file, index=False)
    print(f"Processed {gauge_id}: converted data saved to {output_file}")

print("\nAll files have been processed.")


Processing gauge GRDC_4136500 ...
Total gaps detected in GRDC_4136500: 0
Total missing gap days in GRDC_4136500: 0 days
Processed GRDC_4136500: converted data saved to /p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/converted_csv/GRDC_4136500.csv

Processing gauge GRDC_6340625 ...
Total gaps detected in GRDC_6340625: 0
Total missing gap days in GRDC_6340625: 0 days
Processed GRDC_6340625: converted data saved to /p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/converted_csv/GRDC_6340625.csv

Processing gauge GRDC_4208918 ...
No observed dates for GRDC_4208918. Skipping file.

Processing gauge GRDC_4119442 ...
Total gaps detected in GRDC_4119442: 0
Total missing gap days in GRDC_4119442: 0 days
Processed GRDC_4119442: converted data saved to /p/scratch/cesmtst/zhang36/NeuralFAS_dataset/GRDC_Caravan/converted_csv/GRDC_4119442.csv

Processing gauge GRDC_5302405 ...
Total gaps detected in GRDC_5302405: 0
Total missing gap days in GRDC_5302405: 0 days
Processed GRDC_5302

In [7]:
df

Unnamed: 0_level_0,dewpoint_temperature_2m_max,dewpoint_temperature_2m_mean,dewpoint_temperature_2m_min,potential_evaporation_sum_ERA5_LAND,potential_evaporation_sum_FAO_PENMAN_MONTEITH,snow_depth_water_equivalent_max,snow_depth_water_equivalent_mean,snow_depth_water_equivalent_min,streamflow,surface_net_solar_radiation_max,...,volumetric_soil_water_layer_1_min,volumetric_soil_water_layer_2_max,volumetric_soil_water_layer_2_mean,volumetric_soil_water_layer_2_min,volumetric_soil_water_layer_3_max,volumetric_soil_water_layer_3_mean,volumetric_soil_water_layer_3_min,volumetric_soil_water_layer_4_max,volumetric_soil_water_layer_4_mean,volumetric_soil_water_layer_4_min
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1979-01-01,-23.34,-28.65,-32.89,0.04,0.00,113.72,113.71,113.68,0.37,35.46,...,0.33,0.33,0.33,0.33,0.32,0.32,0.32,0.39,0.39,0.39
1979-01-02,-28.99,-30.64,-33.82,-0.00,0.05,114.06,113.85,113.72,0.36,25.69,...,0.33,0.33,0.33,0.33,0.32,0.32,0.32,0.39,0.39,0.39
1979-01-03,-31.40,-32.87,-33.86,0.01,0.03,114.34,114.22,114.08,0.36,28.15,...,0.33,0.33,0.33,0.33,0.32,0.32,0.32,0.39,0.39,0.39
1979-01-04,-32.63,-33.71,-35.21,-0.04,0.05,114.49,114.44,114.35,0.35,34.87,...,0.33,0.33,0.33,0.33,0.32,0.32,0.32,0.39,0.39,0.39
1979-01-05,-31.48,-34.08,-35.39,-0.04,0.06,114.66,114.55,114.49,0.35,35.86,...,0.33,0.33,0.33,0.33,0.32,0.32,0.32,0.39,0.39,0.39
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2023-05-14,3.46,-0.38,-5.71,0.65,0.68,7.14,6.51,5.46,,141.11,...,0.39,0.41,0.41,0.40,0.42,0.42,0.42,0.39,0.39,0.39
2023-05-15,-1.90,-8.63,-10.25,2.75,1.36,6.32,5.83,5.46,,240.63,...,0.39,0.41,0.41,0.40,0.42,0.42,0.42,0.39,0.39,0.39
2023-05-16,-5.71,-7.77,-10.47,3.13,1.63,7.51,7.10,6.34,,531.05,...,0.38,0.40,0.39,0.39,0.42,0.42,0.42,0.40,0.39,0.39
2023-05-17,-4.13,-6.06,-8.50,2.34,1.54,7.54,6.92,6.15,,421.89,...,0.38,0.39,0.39,0.38,0.42,0.41,0.41,0.40,0.40,0.40
