# Add hourly Canadian data
Apparently, since last checking the Water Survey of Canada now provides longer records of sub-daily data for each gauge. We'll try and add these to CAMELS-SPAT.

In [106]:
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
import lzma
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pathlib import Path
import re
import sys
import xarray as xr

import netCDF4 as nc4
import time
from datetime import timedelta
from datetime import datetime

sys.path.append(str(Path().absolute().parent))
import python_cs_functions as cs

In [2]:
# Data location
cs_main_folder = Path("/scratch/gwf/gwf_cmt/wknoben/camels-spat-upload")

In [3]:
# Destination location
cs_update_folder = Path("/scratch/gwf/gwf_cmt/wknoben/camels-spat-upload-updates")

In [4]:
# Working folder
cs_working_folder = Path("/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip")

In [5]:
# Specify the folder structure
obs_path_part1 = "observations"
obs_path_parts2 = ["headwater", "macro-scale", "meso-scale"]
obs_path_part3 = "obs-hourly"

In [6]:
# Load the meta-data, so we known which basins we have
cs_meta = pd.read_csv(cs_main_folder / "camels-spat-metadata.csv")

In [7]:
# Ensure we have our working dir
cs_working_folder.mkdir(exist_ok=True, parents=True)

## Data download
Data is stored here:
https://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/UnitValueData/Discharge/corrected/ in subfolders numbered `01..11`

In [16]:
# Loop over the subfolders on the webpage and grab the download links
main_url = "https://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/UnitValueData/Discharge/corrected/"
all_urls = []
for folder_num in range(1,12):
    subfolder = f"{folder_num:02}"
    sub_urls = cs.find_files_in_webpage_folder(f"{main_url}/{subfolder}/", extension='xz')
    all_urls.extend(sub_urls)

In [30]:
# Download the URLs for stations we actually have in CAMELS-SPAT
downloaded_new_obs = []
for url in all_urls:
    file_name = Path(url).name
    basin = file_name.split("@")[1].split(".")[0] # assumes: Discharge.Working@01AD003.20110101_corrected.csv.xz
    if (cs_meta['Station_id'] == basin).any():
        downloaded_new_obs.append(basin)
        cs.download_url_into_folder(url,cs_working_folder)

Successfully downloaded https://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/UnitValueData/Discharge/corrected/01/Discharge.Working@01AD003.20110101_corrected.csv.xz
Successfully downloaded https://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/UnitValueData/Discharge/corrected/01/Discharge.Working@01AF007.20110101_corrected.csv.xz
Successfully downloaded https://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/UnitValueData/Discharge/corrected/01/Discharge.Working@01AF009.20110101_corrected.csv.xz
Successfully downloaded https://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/UnitValueData/Discharge/corrected/01/Discharge.Working@01AJ003.20110101_corrected.csv.xz
Successfully downloaded https://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/UnitValueData/Discharge/corrected/01/Discharge.Working@01AJ004.20110101_corrected.csv.xz
Successfully downloaded https://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/UnitValueData/Discharge/corrected/01/Discharge.Working@01AJ010.20110101_cor

In [40]:
# Check for how many stations we downloaded data
num_files = len(list(cs_working_folder.glob("*.xz")))
num_basin = (cs_meta['Country'] == 'CAN').sum()
print(f"Downloaded {num_files} files for an expected {num_basin} basins.")

Downloaded 768 files for an expected 764 basins.


In [44]:
# Figure out what we have extra
new_files = list(cs_working_folder.glob("*.xz"))

In [47]:
download_basins = []
for file in new_files:
    download_basins.append(file.name.split("@")[1].split(".")[0])

In [55]:
# anything snuck in that shouldn't be there?
not_in_meta = [basin for basin in download_basins if basin not in cs_meta['Station_id'].values]
print(not_in_meta)

[]


In [54]:
# any duplicates?
from collections import Counter
duplicates = [basin for basin, count in Counter(download_basins).items() if count > 1]
print(duplicates)

['09AG001', '07FA003', '05MA025', '10MA002']


In [59]:
# what's different for these?
for file in sorted(new_files):
    basin = file.name.split("@")[1].split(".")[0]
    if basin in duplicates:
        print(file)

/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@05MA025.20110101_corrected.csv.xz
/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@05MA025.20120301_corrected.csv.xz
/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@07FA003.20110101_corrected.csv.xz
/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@07FA003.20121002_corrected.csv.xz
/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@09AG001.20110101_corrected.csv.xz
/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@09AG001.20160101_corrected.csv.xz
/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@10MA002.20110101_corrected.csv.xz
/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@10MA002.20160101_corrected.csv.xz


In [70]:
# different dates, apparently. Why?
# 10MA002 just has reduced data coverage (starts at 2016) - the 2016 file has more recent data
# 09AG001 - 2016 file also has more recent data
# 07FA003 - 2012 file has more recent data
# 05MA025 - 2012 file has more recent data

# we'll manually remove the older ones

## Find current Qobs
We need to already look in the `updated` folder, because in the previous step we updated all Qobs files with the more recent ones I had on my laptop that have all times in LST (apart from hourly observations for basin `02ZH002` - but we still have an hourly file for this one in the update location, we just can't run the same comparison for this one as for others).

In [8]:
scales = []
basins = []
files = []
paths = []
for obs_path_part2 in obs_path_parts2:
    search_this = cs_update_folder / obs_path_part1 / obs_path_part2 / obs_path_part3
    old_paths = list(search_this.glob('*.nc'))
    for old_path in old_paths:
        old_file = old_path.name
        scales.append(obs_path_part2)
        basins.append(f"{old_file.split('_')[0]}_{old_file.split('_')[1]}")
        files.append(old_file)
        paths.append(old_path)
old_nc_lookup = pd.DataFrame(data={'basin': basins,
                                   'scale': scales,
                                   'filename': files,
                                   'path': paths})

In [9]:
old_nc_lookup.head(1)

Unnamed: 0,basin,scale,filename,path
0,USA_02143040,headwater,USA_02143040_hourly_flow_observations.nc,/scratch/gwf/gwf_cmt/wknoben/camels-spat-uploa...


## Process into new Qobs
We already have processing code for this, that ultimately results in a netcdf. Here we:
- Convert the new downloads into the same format that our existing code expects;
  - Already remove initial or final parts of the data where we don't have at least 1 observation per hour
- Re-run that code;
- Compare the old netcdf with the new one, to confirm our overlapping dates haven't changed.

In [10]:
num_basin = (cs_meta['Country'] == 'CAN').sum()
new_files = list(cs_working_folder.glob("*.xz"))
print(f"Have {len(new_files)} files for an expected {num_basin} basins.")

Have 764 files for an expected 764 basins.


In [27]:
# CRITICAL for parallel processing: sort the files
new_files = sorted(new_files)

### Example of current netcdf

In [11]:
example_path = cs_update_folder / "observations" / "headwater" / "obs-hourly"
example_file = "CAN_01AK006_hourly_flow_observations.nc"
ds = xr.open_dataset(example_path / example_file)

In [12]:
ds

In [13]:
ds.close() # still lives in memory but releases file handle

### Convert new downloads into hourly netcdfs

In [11]:
def extract_csv_header_metadata(xz_path, header_out_path):
    header_lines = []
    start_line = None
    pattern = re.compile(r"# CSV data starts at line (\d+)\.?")

    with lzma.open(xz_path, "rt") as f:
        for i, line in enumerate(f, start=1):
            header_lines.append(line)
            match = pattern.search(line)
            if match:
                start_line = int(match.group(1))
                break

    if start_line is None:
        raise ValueError("Could not find marker line: '# CSV data starts at line X'")

    # Write metadata header lines to a new file
    with open(header_out_path, "w") as out_f:
        out_f.writelines(header_lines)

    return start_line

In [12]:
def extract_metadata_fields_from_txt(header_txt_path):
    patterns = {
        "time_series_identifier": re.compile(r"#\s*Time-series identifier:\s*(.*)"),
        "location": re.compile(r"#\s*Location:\s*(.*)"),
        "utc_offset": re.compile(r"#\s*UTC offset:\s*(.*)"),
        "value_units": re.compile(r"#\s*Value units:\s*(.*)"),
        "value_parameter": re.compile(r"#\s*Value parameter:\s*(.*)"),
    }

    results = {}

    with open(header_txt_path, "r") as f:
        for line in f:
            for key, pattern in patterns.items():
                match = pattern.match(line)
                if match:
                    value = match.group(1).strip()

                    if key == "time_series_identifier" and value.startswith("Discharge.Working@"):
                        value = value.replace("Discharge.Working@", "")
                    elif key == "utc_offset" and value.startswith("(") and value.endswith(")"):
                        value = value[1:-1]

                    results[key] = value

    return results

In [13]:
def check_header_info(basin,row,header_dict):
    assert header_info['value_parameter'].lower() == 'discharge', f"Header variable not discharge for {basin}"
    assert header_info['value_units'].lower() == 'm^3/s', f"Header units not m^3/s for {basin}"

In [14]:
def extract_offset(value):
    return value[-6:]

In [42]:
def compare_netcdf_dataframe_hourly_values(df, nc_path, basin, log=[]):
    
    ds = xr.open_dataset(str(nc_path), engine='netcdf4')

    # ensure matching dtype
    ds['time'] = pd.to_datetime(ds['time'].values)
    df.index = pd.to_datetime(df.index)

    # merge
    xr_series = ds["q_obs"].to_series()
    aligned_df, aligned_xr = df["Value/Valeur"].align(xr_series, join="inner")

    # Check if df contains all time steps from the NetCDF file
    missing_times = ds['time'].values[~np.isin(ds['time'].values, df.index.values)]
    if len(missing_times) > 0:
        msg = f"Basin {basin}: {len(missing_times)} time steps from NetCDF are missing in the DataFrame."
        log_message(log, msg) if log else print(msg)

    # compare data in absolute and relative terms
    diff_abs = np.abs(aligned_df.values - aligned_xr.values)
    diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values 

    abs_mask = diff_abs > 0.1  # difference > 0.1 m3/s
    rel_mask = diff_rel > 0.01  # difference > 1%

    n_abs = abs_mask.sum()
    n_rel = rel_mask.sum()

    if n_abs > 0:
        msg = f"Basin {basin}: {n_abs}/{len(aligned_df.values)} values more than 0.1 m3/s different."
        log_message(log, msg) if log else print(msg)
        
        if n_abs > 750:
            msg = f"    Absolute difference stats for {n_abs} mismatches:"
            log_message(log, msg) if log else print(msg)
            msg = f"    Max abs diff: {diff_abs[abs_mask].max():.4f}"
            log_message(log, msg) if log else print(msg)
            msg = f"    Mean abs diff: {diff_abs[abs_mask].mean():.4f}"
            log_message(log, msg) if log else print(msg)
            msg = f"    95th percentile: {np.percentile(diff_abs[abs_mask], 95):.4f}"
            log_message(log, msg) if log else print(msg)

    if n_rel > 0:
        msg = f"Basin {basin}: {n_rel}/{len(aligned_df.values)} values more than 1% different."
        log_message(log, msg) if log else print(msg)
        if n_rel > 750:
            msg = f"    Relative difference stats for {n_rel} mismatches:"
            log_message(log, msg) if log else print(msg)
            msg = f"    Max rel diff: {diff_rel[rel_mask].max():.4f}"
            log_message(log, msg) if log else print(msg)
            msg = f"    Mean rel diff: {diff_rel[rel_mask].mean():.4f}"
            log_message(log, msg) if log else print(msg)
            msg = f"    95th percentile: {np.percentile(diff_rel[rel_mask], 95):.4f}"
            log_message(log, msg) if log else print(msg)

In [16]:
def subdaily_flow_csv_to_netcdf(csv, nc_path, country, station, tz, center_window):
    
    '''Converts a standardized csv file with flow observations to xarray data set and saves as netcdf'''
    
    # 1. Define standard values
    # -------------------------
    
    # Auxiliary
    global_att_countries = ['USA', 'CAN', 'MEX']
    global_att_i = global_att_countries.index(country)
    global_att_now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    # Global attributes
    global_att_ttl = 'CAMELS-spat streamflow data'
    global_att_con = 'CF-1.10'
    global_att_src = 'Streamflow derived from observed water levels'
    global_att_ins = ['United States Geological Survey',
                      'Water Survey of Canada']
    global_att_ref = [('U.S. Geological Survey, 2016, National Water Information System data available ' +
                       'on the World Wide Web (USGS Water Data for the Nation), accessed 2023-03-23, at '+
                       'URL [http://waterdata.usgs.gov/nwis/]'),
                      ('Original data extracted from the Environment and Climate Change Canada Real-time ' +
                       'Hydrometric Data web site (https://wateroffice.ec.gc.ca/mainmenu/real_time_data_index_e.html) ' + 
                       'on 2023-04-05')]
    global_att_his = (f'{global_att_now} | File prepared using CAMELS-spat scripts. See:' + 
                       'https://github.com/CH-Earth/camels-spat')
    global_att_com = 'n/a'
    
    # Data variables
    q_obs_unit = 'm3 s-1'
    q_obs_long = 'observed streamflow values'
    q_obs_anc = [column for column in csv.columns if '_is_' in column] # Get names of all ancillary variables in .csv 
    q_obs_anc.append('q_obs_data_quality') # add the 'q_obs_data_quality' variable that's not captured by the above
    q_obs_anc = ' '.join([f"'{anc}'" for anc in q_obs_anc]) # convert full list into single string
    
    # Time settings
    time_unit = 'minutes since 1950-01-01 00:00:00'
    time_cal = 'proleptic_gregorian'
    
    # 2. Create a basic data set to build from
    ds = csv.to_xarray()
    
    # 3. Global attributes
    ds.attrs['title'] = global_att_ttl
    ds.attrs['conventions'] = global_att_con
    ds.attrs['source'] = global_att_src
    ds.attrs['country'] = country
    ds.attrs['station'] = station
    ds.attrs['institution'] = global_att_ins[global_att_i]
    ds.attrs['references'] = global_att_ref[global_att_i]
    ds.attrs['history'] = global_att_his
    #ds.attrs['comment'] = global_att_com

    # 4a. Time attributes (coordinate already exists)
    # NOTE: attributes 'units' and 'calendar' are automatically specified when writing to netcdf
    #       This can be checked by saving to netcdf, and then loading as follows: xr.open_dataset(nc_path, decode_times=False)
    ds.time.attrs['standard_name'] = 'time'
    ds.time.attrs['bounds'] = 'time_bnds'
    ds.time.encoding['units'] = time_unit
    ds.time.encoding['calendar'] = time_cal
        
    # 4b. Time bounds variable
    ds = ds.assign_coords(nbnds=[1,2])
    if center_window:    
        ds = ds.assign(time_bnds=(['nbnds','time'], [csv.index - pd.Timedelta('30min'), csv.index + pd.Timedelta('30min')]))
    else:
        ds = ds.assign(time_bnds=(['nbnds','time'], [csv.index, csv.index + pd.Timedelta('1h')]))
    ds.nbnds.attrs['standard_name'] = 'bounds for timestep intervals'
    ds.time_bnds.attrs['long_name'] = 'start and end points of each time step'
    ds.time_bnds.attrs['time_zone'] = tz
    
    # 5. Observed streamflow
    ds.q_obs.attrs['units'] = q_obs_unit
    ds.q_obs.attrs['long_name'] = q_obs_long
    ds.q_obs.attrs['cell_methods'] = 'time:mean' # indicating that values are average values over the timestep
    ds.q_obs.attrs['ancillary_variables'] = q_obs_anc
    ## TO DO: add other variables to ancillary_variables list
    
    # 6. Data quality flags
    flags = [str(s) for s in csv['q_obs_data_quality'].unique()]
    flags.sort()
    meanings = cs.return_data_quality_flag_meaning(flags,country)
    ds.q_obs_data_quality.attrs['standard_name'] = 'quality_flag'
    ds.q_obs_data_quality.attrs['long_name'] = 'lowest data quality flag listed in the values used to generate an average flow value for each timestep'
    ds.q_obs_data_quality.attrs['flag_values'] = ' '.join([f"'{flag}'" for flag in flags])
    ds.q_obs_data_quality.attrs['flag_meanings'] = ' '.join([f"'{meaning}'" for meaning in meanings])
    
    # 7. Other status variables
    for variable in ds.variables:
        if '_is_' in variable:
            ds[variable].attrs['standard_name'] = 'quality_flag'
            ds[variable].attrs['long_name'] = 'flag indicating if main variable is affected by process in variable name'
            ds[variable].attrs['flag_values'] = "'0' '1'"
            ds[variable].attrs['flag_meanings'] = "'no' 'yes'"
    
    # Save to file
    #ds = ds.drop_indexes(['time','nbnds'])
    ds.to_netcdf(nc_path)
    
    return ds

In [17]:
def lst_to_tz_name(lst):
    # https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
    tz_map = {
        'CST': 'America/Chicago',
        'PST': 'America/Los_Angeles',
        'MST': 'America/Denver',
        'EST': 'America/New_York',
        'AST': 'America/Halifax',
        'NST': 'America/St_Johns'
    }   
    return tz_map[lst]

In [49]:
def log_message(log_path, message):
    with open(log_path, "a") as f:
        f.write(message + "\n")

In [61]:
def merge_logs(log_dir, output_file, pattern="log_*"):
    log_files = sorted(Path(log_dir).glob(pattern))  # Sort for consistent order
    with open(output_file, "w") as outfile:
        for log_file in log_files:
            with open(log_file, "r") as infile:
                outfile.write(infile.read())
                outfile.write("\n")  # Optional: add newline between logs
    print(f"Merged {len(log_files)} files into {output_file}")

In [75]:
def summarize_log(log_file_path):
    with open(log_file_path, "r") as f:
        lines = f.readlines()

    basins_checked = 0
    last_was_basin_line = False
    no_warning = 0
    abs_warning = 0
    rel_warning = 0
    abs_big = 0
    rel_big = 0
    
    for line in lines:
        line = line.strip()
        if line == "" and last_was_basin_line:
            no_warning += 1        
        if 'Processing' in line:
            basins_checked += 1
            last_was_basin_line = True
        else:
            last_was_basin_line = False
        if 'more than 0.1 m3/s different.' in line:
            abs_warning += 1
        if 'more than 1% different.' in line:
            rel_warning += 1
        if 'Absolute difference stats' in line:
            abs_big += 1
        if 'Relative difference stats' in line:
            rel_big += 1

    print(f"Total basins checked:                        {basins_checked}")
    print(f"Basins with no warnings:                     {no_warning}")
    print(f"Basins with absolute diff warning:           {abs_warning}")
    print(f"Basins with more than 750 abs diff warnings: {abs_warning}")
    print(f"Basins with relative diff warning:           {abs_big}")
    print(f"Basins with more than 750 rel diff warnings: {rel_big}")

In [108]:
def compare_netcdf_timeseries_scatter(file_list1, file_list2, var_name, color_var, save_dir):
    # Build dict for quick lookup by filename
    file_map2 = {Path(f).name: f for f in file_list2}

    for f1 in file_list1:
        name = Path(f1).name
        basin = f"{name.split('_')[0]}_{name.split('_')[1]}"
        f2 = file_map2.get(name)
        if not f2:
            print(f"No match found for {name} in second list. Skipping.")
            continue

        # Open both datasets
        ds1 = xr.open_dataset(f1)
        ds2 = xr.open_dataset(f2)

        # Extract variables and align by time
        var1 = ds1[var_name].to_series()
        var2 = ds2[var_name].to_series()
        color_series = ds2[color_var].to_series()

        # Align for scatter plot
        aligned1, aligned2 = var1.align(var2, join="inner")
        aligned_colors = color_series.reindex(aligned2.index)

        # ---- Plotting ----
        fig, axes = plt.subplots(2, 1, figsize=(12, 8), constrained_layout=True)

        # 1. Time series
        axes[0].plot(var1.index, var1.values, label="File 1")
        axes[0].plot(var2.index, var2.values, label="File 2", alpha=0.7)
        axes[0].set_ylabel(f"{var_name}  [m3 s-1]")
        axes[0].legend()

        # 2. Scatter plot
        scatter = axes[1].scatter(aligned1, 
                                  aligned2, 
                                  c=aligned_colors, 
                                  cmap="viridis", 
                                  alpha=0.7,
                                  vmin=0,
                                  vmax=1)
        axes[1].set_xlabel("Q_obs (File 1) [m3 s-1]")
        axes[1].set_ylabel("Q_obs (File 2) [m3 s-1]")
        plt.colorbar(scatter, ax=axes[1], label=color_var)

        all_vals = pd.concat([aligned1, aligned2])
        min_val, max_val = all_vals.min(), all_vals.max()
        axes[1].plot([min_val, max_val], [min_val, max_val], color="black", linestyle="--", linewidth=1)
                
        plt.suptitle(f"{basin}: old and new data")
        plt.savefig(save_dir/f"{basin}.png", dpi=150)
        plt.close()

        # Close datasets
        ds1.close()
        ds2.close()

In [18]:
# Make an even more temporary working dir
cs_tmp = cs_working_folder / f"tmp_new_csv_and_netcdf"
cs_tmp.mkdir(exist_ok=True, parents=True)

In [56]:
# Wrap the processing in a function that simply accepts an index
def process_new_file(new_file):

    # Setup
    # ----------------------------------------------------------------------
    # Extract the basin name from the file
    file_name = new_file.name
    basin = file_name.split("@")[1].split(".")[0]

    # Define the log path
    log_path = cs_tmp / f"log_{basin}.txt"
    log_message(log_path, f"Processing {basin}")

    # Get the associated row from the metadata
    row = cs_meta[cs_meta['Station_id'] == basin].iloc[0]

    # Output file (useful for resuming after interrupts
    nc_path = cs_tmp / f"{row.Country}_{basin}_hourly_flow_observations.nc"
    #if nc_path.exists():
    #    log_message(log_path, f"Basin {basin}: final netcdf already found. Skipping.")
    #    return f"Basin {basin} skipped (already complete)'"

    # Handle file header
    # ----------------------------------------------------------------------
    # Extract the header
    start_line = extract_csv_header_metadata(new_file, cs_tmp/f'header_{basin}.txt')
    
    # Get header values and match against expectations
    header_info = extract_metadata_fields_from_txt(cs_tmp/f'header_{basin}.txt')
    check_header_info(basin, row, header_info) # is discharge, is m3/s, same timezone as metadata

    # Process data
    # ----------------------------------------------------------------------
    # Read the xz file into memory
    df = pd.read_csv(new_file, skiprows=start_line-1, # line-1 accounts for 0-index
                     index_col=0, parse_dates=True,
                     usecols=range(5), # only get 'ISO 8601 UTC','Timestamp (UTC-HH:mm)', 'Value, 'Approval Level', 'Grade' 
                     low_memory=False)
    
    # Rename the columns
    df = df.rename(columns={'Value': 'Value/Valeur',
                            'Grade': 'Qualifier/Qualificatif',
                            'Approval Level': 'Approval/Approbation'})
    df.index.name = 'Date'
    
    # Subset the data to not go before or beyond what we already have forcing data for
    lst = row['dv_flow_obs_timezone'] # e.g. 'AST'
    tz_name = lst_to_tz_name(lst)
    time_s = min(pd.to_datetime(row['iv_flow_obs_availability_start']), 
                 pd.to_datetime(row['dv_flow_obs_availability_start'])).tz_localize(tz_name)
    time_e = max(pd.to_datetime(row['iv_flow_obs_availability_end']), 
                 pd.to_datetime(row['dv_flow_obs_availability_end'])).tz_localize(tz_name)
    raw = df.loc[time_s:time_e].copy()

    # Timezone stuff
    # ----------------------------------------------------------------------    
    # UTC to LST
    # 2a. Find what we're converting to
    #lst = row['dv_flow_obs_timezone'] # e.g. 'AST' - we already use this above
    utc = cs.tz_abbreviation_to_utc(lst) # e.g. 'UTC-04'
    offset = cs.relative_utc_to_timedelta(utc) # e.g. '+4:00:00'
    
    # 2b. Convert the UTC timestamps into Local Standard Time
    raw['datetime_str'] = raw.index.astype(str)
    raw['offset_str'] = raw['datetime_str'].apply(extract_offset)
    # ensure that we know that what we're modifying is in fact UTC
    assert all(raw['offset_str'] == '+00:00'), f'Not all timezone offsets are +00:00'
    raw.index = raw.index.tz_convert(None) + pd.Timedelta(offset)
    
    # Check that we now match the Local time column
    timestamp_col = next(col for col in df.columns if col.startswith("Timestamp (UTC-"))
    
    # 2c. Clean up the extra columns
    raw = raw.drop(['datetime_str','offset_str'], axis=1)
    
    # 2d. Replace any streamflow codes that are not part of the docs with 0
    known_codes = set([-1, 10, 20, 30, 40, 50])
    is_known = raw['Qualifier/Qualificatif'].isin(known_codes) | raw['Qualifier/Qualificatif'].isna()
    invalid_values = raw.loc[~is_known, 'Qualifier/Qualificatif']
    n_replaced = invalid_values.shape[0]
    unique_replaced = invalid_values.unique()
    raw['Qualifier/Qualificatif'] = raw['Qualifier/Qualificatif'].where(is_known, 0)
    if n_replaced > 0:
        log_message(log_path, f"Basin {basin}: Replaced {n_replaced} value(s): {', '.join(map(str, unique_replaced))}")
    
    # 2e. Replace any random statuses that don't exist in the docs
    raw.loc[raw['Approval/Approbation'] == 'Checked', 'Approval/Approbation'] = 'Reviewed'
    raw.loc[raw['Approval/Approbation'] == 'Ready for Approval', 'Approval/Approbation'] = 'Reviewed'
    raw.loc[raw['Approval/Approbation'] == 'Undefined', 'Approval/Approbation'] = 'nan'

    # hourly average
    # ---------------------------------------------------------------------- 
    # 3a. Create a temporary dataframe for hourly averaging
    tmp = raw.copy()
    tmp = tmp.drop(columns={'Qualifier/Qualificatif','Approval/Approbation',  # remove the QC column
                            timestamp_col})

    # 3b. Replace any negative streamflow values with nan
    tmp.loc[tmp['Value/Valeur'] < 0, 'Value/Valeur'] = np.nan   

    # 4. Create hourly average flow rates
    _,raw_H = cs.resample_arbitrary_flux_observations_to_hourly(tmp, data='Value/Valeur', center_window=False)   

    # 5. Assign quality flags to the dataframe
    raw['Qualifier/Qualificatif'] = raw['Qualifier/Qualificatif'].fillna(0).astype(int) # Replace any NaN with 0 (unknown)
    raw_H = cs.assign_hourly_quality_flag(raw, raw_H, 'CAN', center_window=False)

    # 6. Check that we have similar values now as in the existing file
    # Note that there will be some differences due to two reasons:
    # - the old files had flow values in 3 decimals, the new ones use 6
    # - most of the old files were "provisional" data, in the new ones all should be final
    old_file_path = old_nc_lookup[old_nc_lookup['basin'] == f"CAN_{basin}"]['path'].iloc[0]
    compare_netcdf_dataframe_hourly_values(raw_H, old_file_path, basin, log=log_path)

    # 7. Save the hourly file as .csv
    csv_path = cs_tmp/f'{basin}_hourly.csv'
    raw_H.to_csv(csv_path)

    # hourly netcdf
    # ---------------------------------------------------------------------- 
    # 8. Load the csv
    csv = cs.prep_subdaily_country_csv_for_netcdf(csv_path,row.Country)
    
    # 9. Convert to netcdf and save
    ds = subdaily_flow_csv_to_netcdf(csv, nc_path, row.Country, basin, lst, False) # False to set center_window = False
    ds.close()

    return f"Basin {basin} complete"

In [57]:
with ProcessPoolExecutor(max_workers=30) as executor:
    results = list(executor.map(process_new_file, new_files))

  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.v

In [76]:
# Check the outcomes
print(f"Results contains entries for {len(results)} out of {len(new_files)} expected")

# Check the logs
merge_logs(cs_tmp, cs_tmp/'logs_merged.txt', pattern="log_*.txt")
summarize_log(cs_tmp/'logs_merged.txt')

Results contains entries for 764 out of 764 expected
Merged 764 files into /scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/tmp_new_csv_and_netcdf/logs_merged.txt
Total basins checked:                        764


In [80]:
# Create summary plots
new_files = list(cs_tmp.glob('CAN_*_hourly_flow_observations.nc'))
old_files = []
for obs_path_part2 in obs_path_parts2:
    old_files.extend(
        list(
            (cs_update_folder / obs_path_part1 / obs_path_part2 / obs_path_part3).glob('CAN_*_hourly_flow_observations.nc')
        )
    )

cs_img = cs_working_folder / 'img'
cs_img.mkdir(exist_ok=True, parents=True)

compare_netcdf_timeseries_scatter(old_files, new_files, 
                                  'q_obs', 'q_obs_is_ice_affected',
                                  cs_img)

In [127]:
# Update the metadata file with new start and end times, and missing values
cs_meta = pd.read_csv(cs_main_folder / "camels-spat-metadata.csv") # reload the file

# loop
for new_file in new_files:
    basin = new_file.name.split("_")[1]
    basin_mask = cs_meta['Station_id'] == basin
    ds = xr.open_dataset(new_file)
    h_start = pd.to_datetime(ds['time'][0].values).strftime('%Y-%m-%d %X')
    h_end = pd.to_datetime(ds['time'][-1].values).strftime('%Y-%m-%d %X')
    cs_meta.loc[basin_mask,'iv_flow_obs_availability_start'] = h_start
    cs_meta.loc[basin_mask,'iv_flow_obs_availability_end'] = h_end
    cs_meta.loc[basin_mask,'flow_obs_missing_hourly'] = ds['q_obs'].isnull().sum()
    ds.close()

In [130]:
# Save new metadata to file
cs_meta.to_csv(cs_update_folder / "camels-spat-metadata.csv", index=False)

### Replace existing netcdf files with these longer new ones

In [151]:
import hashlib
import shutil

In [144]:
def file_hash(path, algo='sha256'):
    hasher = hashlib.new(algo)
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hasher.update(chunk)
    return hasher.hexdigest()

In [152]:
for new_file in new_files:

    # identify where we want to move this file
    file_name = new_file.name
    mask = old_nc_lookup['filename'] == file_name
    assert mask.sum() == 1, "Indentified file paths not equal to 1"
    move_here = old_nc_lookup[mask]['path'].iloc[0]

    # we want to be sure the move succeeded, so we'll compute a hash before and after move
    src_hash = file_hash(new_file)
    shutil.copy(new_file, move_here)
    dst_hash = file_hash(move_here)
    assert src_hash == dst_hash, f"Basin {basin}: file copy may not have succeeded."

### BACKUP (SERIAL CODE)

In [23]:
# Loop over the new files and process into hourly netcdfs
for ix,new_file in enumerate(new_files):

    # Setup
    # ----------------------------------------------------------------------
    # Extract the basin name from the file
    file_name = new_file.name
    basin = file_name.split("@")[1].split(".")[0]
    print(f"\n{ix:03}. Processing {basin}")

    # Get the associated row from the metadata
    row = cs_meta[cs_meta['Station_id'] == basin].iloc[0]

    # Output file (useful for resuming after interrupts
    nc_path = cs_tmp / f"{row.Country}_{basin}_hourly_flow_observations.nc"
    if nc_path.exists():
        print(f"Basin {basin}: final netcdf already found. Skipping.")
        continue

    # Handle file header
    # ----------------------------------------------------------------------
    # Extract the header
    start_line = extract_csv_header_metadata(new_file, cs_tmp/f'header_{basin}.txt')
    
    # Get header values and match against expectations
    header_info = extract_metadata_fields_from_txt(cs_tmp/f'header_{basin}.txt')
    check_header_info(basin, row, header_info) # is discharge, is m3/s, same timezone as metadata

    # Process data
    # ----------------------------------------------------------------------
    # Read the xz file into memory
    df = pd.read_csv(new_file, skiprows=start_line-1, # line-1 accounts for 0-index
                     index_col=0, parse_dates=True,
                     usecols=range(5), # only get 'ISO 8601 UTC','Timestamp (UTC-HH:mm)', 'Value, 'Approval Level', 'Grade' 
                     low_memory=False)
    
    # Rename the columns
    df = df.rename(columns={'Value': 'Value/Valeur',
                            'Grade': 'Qualifier/Qualificatif',
                            'Approval Level': 'Approval/Approbation'})
    df.index.name = 'Date'
    
    # Subset the data to not go before or beyond what we already have forcing data for
    lst = row['dv_flow_obs_timezone'] # e.g. 'AST'
    tz_name = lst_to_tz_name(lst)
    time_s = min(pd.to_datetime(row['iv_flow_obs_availability_start']), 
                 pd.to_datetime(row['dv_flow_obs_availability_start'])).tz_localize(tz_name)
    time_e = max(pd.to_datetime(row['iv_flow_obs_availability_end']), 
                 pd.to_datetime(row['dv_flow_obs_availability_end'])).tz_localize(tz_name)
    raw = df.loc[time_s:time_e].copy()

    # Timezone stuff
    # ----------------------------------------------------------------------    
    # UTC to LST
    # 2a. Find what we're converting to
    #lst = row['dv_flow_obs_timezone'] # e.g. 'AST' - we already use this above
    utc = cs.tz_abbreviation_to_utc(lst) # e.g. 'UTC-04'
    offset = cs.relative_utc_to_timedelta(utc) # e.g. '+4:00:00'
    
    # 2b. Convert the UTC timestamps into Local Standard Time
    raw['datetime_str'] = raw.index.astype(str)
    raw['offset_str'] = raw['datetime_str'].apply(extract_offset)
    # ensure that we know that what we're modifying is in fact UTC
    assert all(raw['offset_str'] == '+00:00'), f'Not all timezone offsets are +00:00'
    raw.index = raw.index.tz_convert(None) + pd.Timedelta(offset)
    
    # Check that we now match the Local time column
    timestamp_col = next(col for col in df.columns if col.startswith("Timestamp (UTC-"))
    
    # 2c. Clean up the extra columns
    raw = raw.drop(['datetime_str','offset_str'], axis=1)
    
    # 2d. Replace any streamflow codes that are not part of the docs with 0
    known_codes = set([-1, 10, 20, 30, 40, 50])
    is_known = raw['Qualifier/Qualificatif'].isin(known_codes) | raw['Qualifier/Qualificatif'].isna()
    invalid_values = raw.loc[~is_known, 'Qualifier/Qualificatif']
    n_replaced = invalid_values.shape[0]
    unique_replaced = invalid_values.unique()
    raw['Qualifier/Qualificatif'] = raw['Qualifier/Qualificatif'].where(is_known, 0)
    if n_replaced > 0:
        print(f"Basin {basin}: Replaced {n_replaced} value(s): {', '.join(map(str, unique_replaced))}")
    
    # 2e. Replace any random statuses that don't exist in the docs
    raw.loc[raw['Approval/Approbation'] == 'Checked', 'Approval/Approbation'] = 'Reviewed'
    raw.loc[raw['Approval/Approbation'] == 'Ready for Approval', 'Approval/Approbation'] = 'Reviewed'
    raw.loc[raw['Approval/Approbation'] == 'Undefined', 'Approval/Approbation'] = 'nan'

    # hourly average
    # ---------------------------------------------------------------------- 
    # 3a. Create a temporary dataframe for hourly averaging
    tmp = raw.copy()
    tmp = tmp.drop(columns={'Qualifier/Qualificatif','Approval/Approbation',  # remove the QC column
                            timestamp_col})

    # 3b. Replace any negative streamflow values with nan
    tmp.loc[tmp['Value/Valeur'] < 0, 'Value/Valeur'] = np.nan   

    # 4. Create hourly average flow rates
    _,raw_H = cs.resample_arbitrary_flux_observations_to_hourly(tmp, data='Value/Valeur', center_window=False)   

    # 5. Assign quality flags to the dataframe
    raw['Qualifier/Qualificatif'] = raw['Qualifier/Qualificatif'].fillna(0).astype(int) # Replace any NaN with 0 (unknown)
    raw_H = cs.assign_hourly_quality_flag(raw, raw_H, 'CAN', center_window=False)

    # 6. Check that we have similar values now as in the existing file
    # Note that there will be some differences due to two reasons:
    # - the old files had flow values in 3 decimals, the new ones use 6
    # - most of the old files were "provisional" data, in the new ones all should be final
    old_file_path = old_nc_lookup[old_nc_lookup['basin'] == f"CAN_{basin}"]['path'].iloc[0]
    compare_netcdf_dataframe_hourly_values(raw_H, old_file_path, basin)

    # 7. Save the hourly file as .csv
    csv_path = cs_tmp/'hourly.csv'
    raw_H.to_csv(csv_path)

    # hourly netcdf
    # ---------------------------------------------------------------------- 
    # 8. Load the csv
    csv = cs.prep_subdaily_country_csv_for_netcdf(csv_path,row.Country)
    
    # 9. Convert to netcdf and save
    ds = subdaily_flow_csv_to_netcdf(csv, nc_path, row.Country, basin, lst, False) # False to set center_window = False
    ds.close()

    # metadata update
    # ---------------------------------------------------------------------- 
    # 10. Update metadata start and end date
    # We'll do this at the very end, after we've ensured we've not lost any data
    basin_mask = cs_meta['Station_id'] == basin
    h_start = pd.to_datetime(ds['time'][0].values).strftime('%Y-%m-%d %X')
    h_end = pd.to_datetime(ds['time'][-1].values).strftime('%Y-%m-%d %X')
    cs_meta.loc[basin_mask,'iv_flow_obs_availability_start'] = h_start
    cs_meta.loc[basin_mask,'iv_flow_obs_availability_end'] = h_end
    cs_meta.loc[basin_mask,'flow_obs_missing_hourly'] = ds['q_obs'].isnull().sum()


000. Processing 05BL022
Basin 05BL022: final netcdf already found. Skipping.

001. Processing 05MC003
Basin 05MC003: final netcdf already found. Skipping.

002. Processing 07AA001
Basin 07AA001: final netcdf already found. Skipping.

003. Processing 08KA001
Basin 08KA001: final netcdf already found. Skipping.

004. Processing 10FA002
Basin 10FA002: final netcdf already found. Skipping.

005. Processing 08NN015
Basin 08NN015: final netcdf already found. Skipping.

006. Processing 05DB002
Basin 05DB002: final netcdf already found. Skipping.

007. Processing 02CG003
Basin 02CG003: final netcdf already found. Skipping.

008. Processing 02FE011
Basin 02FE011: final netcdf already found. Skipping.

009. Processing 02EC018
Basin 02EC018: final netcdf already found. Skipping.

010. Processing 07JC001
Basin 07JC001: final netcdf already found. Skipping.

011. Processing 02ZA002
Basin 02ZA002: final netcdf already found. Skipping.

012. Processing 08MA003
Basin 08MA003: final netcdf already fou

  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values


Basin 07FD012: 1436/5880 values more than 1% different.
    Relative difference stats for 1436 mismatches:
    Max rel diff: 1.4768
    Mean rel diff: 0.2805
    95th percentile: 1.0000

128. Processing 01EF001
Basin 01EF001: Replaced 13 value(s): -2
Basin 01EF001: 300/9794 values more than 0.1 m3/s different.
Basin 01EF001: 1/9794 values more than 1% different.

129. Processing 08NB016
Basin 08NB016: Replaced 907 value(s): -2
Basin 08NB016: 1/9770 values more than 1% different.

130. Processing 08HF006
Basin 08HF006: 1/9770 values more than 0.1 m3/s different.
Basin 08HF006: 1/9770 values more than 1% different.

131. Processing 08ND012
Basin 08ND012: 5852/9770 values more than 0.1 m3/s different.
    Absolute difference stats for 5852 mismatches:
    Max abs diff: 8.2040
    Mean abs diff: 0.5340
    95th percentile: 1.3824
Basin 08ND012: 3804/9770 values more than 1% different.
    Relative difference stats for 3804 mismatches:
    Max rel diff: 0.6595
    Mean rel diff: 0.0920
    

  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values
  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values


Basin 05LJ045: 2065/5880 values more than 0.1 m3/s different.
    Absolute difference stats for 2065 mismatches:
    Max abs diff: 10.6861
    Mean abs diff: 1.2187
    95th percentile: 2.8253
Basin 05LJ045: 5040/5880 values more than 1% different.
    Relative difference stats for 5040 mismatches:
    Max rel diff: inf
    Mean rel diff: inf
    95th percentile: 1.0000

134. Processing 01EE005
Basin 01EE005: Replaced 10 value(s): -2
Basin 01EE005: 448/9794 values more than 0.1 m3/s different.
Basin 01EE005: 1952/9794 values more than 1% different.
    Relative difference stats for 1952 mismatches:
    Max rel diff: 0.5948
    Mean rel diff: 0.0942
    95th percentile: 0.3013

135. Processing 07DA006
Basin 07DA006: 1/9774 values more than 1% different.

136. Processing 05QE012
Basin 05QE012: 603/9770 values more than 0.1 m3/s different.
Basin 05QE012: 3018/9770 values more than 1% different.
    Relative difference stats for 3018 mismatches:
    Max rel diff: 0.1001
    Mean rel diff: 

  diff_rel = np.abs(aligned_df.values - aligned_xr.values) / aligned_df.values


Basin 05LJ007: 3114/5879 values more than 0.1 m3/s different.
    Absolute difference stats for 3114 mismatches:
    Max abs diff: 58.2120
    Mean abs diff: 2.1515
    95th percentile: 4.4860
Basin 05LJ007: 5497/5879 values more than 1% different.
    Relative difference stats for 5497 mismatches:
    Max rel diff: 5.5159
    Mean rel diff: 0.1366
    95th percentile: 0.6624

140. Processing 07DC001
Basin 07DC001: 182/9775 values more than 0.1 m3/s different.
Basin 07DC001: 1/9775 values more than 1% different.

141. Processing 05DB005
Basin 05DB005: Replaced 17544 value(s): -2
Basin 05DB005: 1675/6038 values more than 0.1 m3/s different.
    Absolute difference stats for 1675 mismatches:
    Max abs diff: 20.2437
    Mean abs diff: 0.7180
    95th percentile: 2.8674
Basin 05DB005: 2641/6038 values more than 1% different.
    Relative difference stats for 2641 mismatches:
    Max rel diff: 0.4634
    Mean rel diff: 0.1189
    95th percentile: 0.3231

142. Processing 06DA004
Basin 06DA

KeyboardInterrupt: 

## DEV

In [15]:
# we'll work through the whole process with 1 file first
new_file = new_files[0]
new_file

PosixPath('/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@05BL022.20110101_corrected.csv.xz')

In [20]:
new_file = Path('/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@10LC007.20110101_corrected.csv.xz')

In [21]:
new_file

PosixPath('/scratch/gwf/gwf_cmt/wknoben/camels-spat-wip/Discharge.Working@10LC007.20110101_corrected.csv.xz')

In [22]:
# Extract the basin name from the file
file_name = new_file.name
basin = file_name.split("@")[1].split(".")[0]

In [23]:
# Get the associated row from the metadata
row = cs_meta[cs_meta['Station_id'] == basin].iloc[0]

In [24]:
# Make an even more temporary working dir
cs_tmp = cs_working_folder / f"tmp_{basin}"
cs_tmp.mkdir(exist_ok=True, parents=True)

#### Header

In [25]:
# Extract the header
start_line = extract_csv_header_metadata(new_file, cs_tmp/'header.txt')

In [26]:
# Get header values and match against expectations
header_info = extract_metadata_fields_from_txt(cs_tmp/'header.txt')
check_header_info(basin, row, header_info) # is discharge, is m3/s, same UTC as metadata

#### Data 

In [27]:
# Read the xz file into memory
df = pd.read_csv(new_file, skiprows=start_line-1, # line-1 accounts for 0-index
                 index_col=0, parse_dates=True,
                 usecols=range(5), # only get 'ISO 8601 UTC','Timestamp (UTC-HH:mm)', 'Value, 'Approval Level', 'Grade' 
                 low_memory=False) 

In [28]:
# Rename the columns
df = df.rename(columns={'Value': 'Value/Valeur',
                        'Grade': 'Qualifier/Qualificatif',
                        'Approval Level': 'Approval/Approbation'})
df.index.name = 'Date'

In [30]:
# Subset the data to not go before or beyond what we already have forcing data for
lst = row['dv_flow_obs_timezone'] # e.g. 'AST'
tz_name = lst_to_tz_name(lst)
time_s = min(pd.to_datetime(row['iv_flow_obs_availability_start']), 
             pd.to_datetime(row['dv_flow_obs_availability_start'])).tz_localize(tz_name)
time_e = max(pd.to_datetime(row['iv_flow_obs_availability_end']), 
             pd.to_datetime(row['dv_flow_obs_availability_end'])).tz_localize(tz_name)
raw = df.loc[time_s:time_e].copy()

#### Timezone

In [31]:
# UTC to LST
# 2a. Find what we're converting to
#lst = row['dv_flow_obs_timezone'] # e.g. 'AST' - we already use this above
utc = cs.tz_abbreviation_to_utc(lst) # e.g. 'UTC-04'
offset = cs.relative_utc_to_timedelta(utc) # e.g. '+4:00:00'
print(lst,utc,offset)

MST UTC-07 -7:00:00


In [32]:
# 2b. Convert the UTC timestamps into Local Standard Time
raw['datetime_str'] = raw.index.astype(str)
raw['offset_str'] = raw['datetime_str'].apply(extract_offset)
# ensure that we know that what we're modifying is in fact UTC
assert all(raw['offset_str'] == '+00:00'), f'Not all timezone offsets are +00:00'
raw.index = raw.index.tz_convert(None) + pd.Timedelta(offset)

# Check that we now match the Local time column
timestamp_col = next(col for col in df.columns if col.startswith("Timestamp (UTC-"))

In [33]:
# 2c. Clean up the extra columns
raw = raw.drop(['datetime_str','offset_str'], axis=1)

In [34]:
# 2d. Replace any streamflow codes that are not part of the docs with 0
known_codes = set([-1, 10, 20, 30, 40, 50])
is_known = raw['Qualifier/Qualificatif'].isin(known_codes) | raw['Qualifier/Qualificatif'].isna()
invalid_values = raw.loc[~is_known, 'Qualifier/Qualificatif']
n_replaced = invalid_values.shape[0]
unique_replaced = invalid_values.unique()
raw['Qualifier/Qualificatif'] = raw['Qualifier/Qualificatif'].where(is_known, 0)
print(f"Basin {basin}: Replaced {n_replaced} value(s): {', '.join(map(str, unique_replaced))}")

Basin 10LC007: Replaced 74 value(s): -2


In [35]:
# 2e. Replace any random statuses that don't exist in the docs
raw.loc[raw['Approval/Approbation'] == 'Checked', 'Approval/Approbation'] = 'Reviewed'
raw.loc[raw['Approval/Approbation'] == 'Ready for Approval', 'Approval/Approbation'] = 'Reviewed'
raw.loc[raw['Approval/Approbation'] == 'Undefined', 'Approval/Approbation'] = 'nan'

#### hourly average

In [36]:
# 3a. Create a temporary dataframe for hourly averaging
tmp = raw.copy()
tmp = tmp.drop(columns={'Qualifier/Qualificatif','Approval/Approbation',  # remove the QC column
                        timestamp_col})

In [37]:
# 3b. Replace any negative streamflow values with nan
tmp.loc[tmp['Value/Valeur'] < 0, 'Value/Valeur'] = np.nan

In [38]:
# 4. Create hourly average flow rates
_,raw_H = cs.resample_arbitrary_flux_observations_to_hourly(tmp, data='Value/Valeur', center_window=False)   

In [39]:
# 5. Assign quality flags to the dataframe
raw['Qualifier/Qualificatif'] = raw['Qualifier/Qualificatif'].fillna(0).astype(int) # Replace any NaN with 0 (unknown)
raw_H = cs.assign_hourly_quality_flag(raw, raw_H, 'CAN', center_window=False)

ValueError: 'nan:10' is not in list

In [195]:
# 6. Check that we have similar values now as in the existing file
# Note that there will be some differences due to two reasons:
# - the old files had flow values in 3 decimals, the new ones use 6
# - most of the old files were "provisional" data, in the new ones all should be final
old_file_path = old_nc_lookup[old_nc_lookup['basin'] == f"CAN_{basin}"]['path'].iloc[0]

In [196]:
compare_netcdf_dataframe_hourly_values(raw_H, old_file_path, basin)

Basin 05BL022: 1/9774 values more than 0.1% different.


In [198]:
# 7. Save the hourly file as .csv
csv_path = cs_tmp/'hourly.csv'
raw_H.to_csv(csv_path)

#### make the netcdf

In [199]:
# 8. Load the csv
csv = cs.prep_subdaily_country_csv_for_netcdf(csv_path,row.Country)

In [200]:
nc_path = cs_tmp / f"{row.Country}_{basin}_hourly_flow_observations.nc"

In [201]:
# 9. Convert to netcdf and save
ds = subdaily_flow_csv_to_netcdf(csv, nc_path, row.Country, basin, lst, False) # False to set center_window = False
ds.close()

#### metadata

In [203]:
# 10. Update metadata start and end date
# We'll do this at the very end, after we've ensured we've not lost any data
basin_mask = cs_meta['Station_id'] == basin
h_start = pd.to_datetime(ds['time'][0].values).strftime('%Y-%m-%d %X')
h_end = pd.to_datetime(ds['time'][-1].values).strftime('%Y-%m-%d %X')
cs_meta.loc[basin_mask,'iv_flow_obs_availability_start'] = h_start
cs_meta.loc[basin_mask,'iv_flow_obs_availability_end'] = h_end
cs_meta.loc[basin_mask,'flow_obs_missing_hourly'] = ds['q_obs'].isnull().sum()

## End DEV