In [2]:
import xarray as xr
import rioxarray as rioxr

import geopandas as geopd
import pandas as pd

import numpy as np

import os

from multiprocessing import Pool

from tqdm.notebook import tqdm

import glob

import matplotlib.pyplot as plt
# Notebook-wide figure defaults: 300 dpi rendering on a dark background
plt.rcParams.update({'figure.dpi': 300})
plt.style.use('dark_background')

In [3]:
# Catchment polygons are read from layer "v1" of the vector file below
vector_watershed_path = "/path/to/CAMELS-FI_catchments.gpkg"
watersheds = geopd.read_file(
    vector_watershed_path,
    layer="v1",
)

In [None]:
# Quick visual check: draw every catchment semi-transparently with the axes hidden
ax = watersheds.plot(color='lightgreen', alpha=0.1)
bounds = ax.axis('off')

In [36]:
def mean_weather(args):
    """Compute per-catchment means for every time step of one raster file.

    The result is not returned; it is written to ``dst_path`` as a CSV with one
    row per time step of the file and one column per ``Paikka_Id``.

    Parameters
    ----------
    args : tuple
        ``(src_path, dst_path, watersheds)`` packed into one tuple so the
        function can be used with ``Pool.map``:

        * ``src_path``   - raster file opened with ``rioxr.open_rasterio``.
        * ``dst_path``   - path of the CSV file to write.
        * ``watersheds`` - GeoDataFrame of catchments with a ``Paikka_Id`` column.

    Notes
    -----
    Each value is the mean of the pixels selected by ``rio.clip`` for the
    catchment, rounded to one decimal. When that selection is empty or all
    missing, the clip is retried with ``all_touched=True`` so that every pixel
    touching the polygon counts. If that also yields nothing, NaN is stored.
    """
    src_path, dst_path, watersheds = args

    def clipped_mean(day_data, watershed, **clip_options):
        # rioxarray raises NoDataInBounds when the clip leaves no pixels at all;
        # that is reported as NaN so the caller can retry or record a gap.
        try:
            clipped = day_data.rio.clip(watershed.geometry.values, crs=watershed.crs, **clip_options)
        except rioxr.exceptions.NoDataInBounds:
            return np.nan
        return clipped.mean().item()

    weather = pd.DataFrame(index=pd.to_datetime([]), columns=watersheds.Paikka_Id)
    weather.index.name = 'date'

    with rioxr.open_rasterio(src_path, mask_and_scale=True) as data_array:
        # Iterating over the time steps in the file
        for time_step in data_array.Time:
            time = time_step.item()
            one_day_data = data_array.sel({'Time': time})

            # No progress bar here: this function runs inside Pool workers,
            # where a notebook widget per time step cannot be displayed usefully.
            row = []
            for i in range(len(watersheds)):
                # Double brackets keep a one-row GeoDataFrame (geometry + crs)
                watershed = watersheds.iloc[[i]]

                average = clipped_mean(one_day_data, watershed)

                # Failsafe for catchments smaller than the pixel size.
                # NaN never compares identical to np.nan, so test with isnan,
                # and retry on the same single-day slice (not the whole file).
                if np.isnan(average):
                    average = clipped_mean(one_day_data, watershed, all_touched=True)

                row.append(round(average, 1))

            weather.loc[str(time)] = row

    weather.to_csv(dst_path)

In [6]:
# Root directories for input and output data
root = "/path/to/finland_climate/fmi_grid"
dst_root = "/path/to/CAMELS-FI/data/raw_time_series"

# Source sub-directories (one per variable) and the attribute name each maps to
# in the output directory and file names
dirs = ['RRday', 'ET0_FAO', 'Tday',
        'Tgmin', 'Tmin', 'Tmax',
        'Rh', 'Globrad', 'Snow']
attributes = {'Rh' : 'humidity' ,'ET0_FAO': 'pet', 'Tday': 'temperature_mean',
              'Tmin': 'temperature_min', 'Tgmin': 'temperature_gmin', 'Tmax': 'temperature_max',
              'RRday': 'precipitation', 'Globrad': 'radiation_global', 'Snow': 'snow_depth'}

# Years to process (both ends inclusive)
years = (1961, 2023)

# One (src_path, dst_path, watersheds) job per variable and year
args = []
for current_dir in dirs:
    dst_dir = os.path.join(dst_root, attributes[current_dir])
    # exist_ok avoids the check-then-create race and makes the cell re-runnable
    os.makedirs(dst_dir, exist_ok=True)

    for year in range(years[0], years[1] + 1):
        # Tgmin is skipped for 2023 (presumably no file for that year - TODO confirm)
        if year == 2023 and current_dir == 'Tgmin':
            continue

        # Evapotranspiration has differing scheme to all other sources
        if current_dir == 'ET0_FAO':
            # Pet is only available from 1981
            if year < 1981:
                continue
            # Different time range than other variables
            src_file_name = f"{current_dir}_{year}_months_4_to_9.nc"
        else:
            src_file_name = f"{current_dir.lower()}_{year}.nc"

        src_path = os.path.join(root, current_dir, src_file_name)
        if not os.path.exists(src_path):
            print(f"path {src_path} doesn't exist")
            with open(os.path.join(dst_root, "error_log.txt"), 'a') as error_log:
                # Terminate each entry with a newline so appended entries do not run together
                error_log.write(f"Path does not exist:{src_path}\n")
            continue

        dst_file_name = f"{attributes[current_dir]}_{year}.csv"
        dst_path = os.path.join(dst_dir, dst_file_name)
        args.append((src_path, dst_path, watersheds))

# Process the data in parallel: one file (variable + year) per task
with Pool() as p:
    p.map(mean_weather, args)

In [10]:
# Function to calculate the mean for a single catchment
def catchment_mean(args):
    """Return the mean of one time step of a raster over a single catchment.

    Parameters
    ----------
    args : tuple
        ``(data_array, watershed, time)`` packed into one tuple so the function
        can be used with ``Pool.map``:

        * ``data_array`` - raster with a ``Time`` coordinate.
        * ``watershed``  - one-row GeoDataFrame (provides ``geometry`` and ``crs``).
        * ``time``       - value selected from ``data_array.Time``.

    Returns
    -------
    float
        Mean of the pixels selected by ``rio.clip``, rounded to one decimal.
        When that selection is empty or all missing, the clip is retried with
        ``all_touched=True``; NaN is returned if that yields nothing either.
    """
    data_array, watershed, time = args
    one_day_data = data_array.sel({'Time': time})
    geometries = watershed.geometry.values

    def clipped_mean(**clip_options):
        # rioxarray raises NoDataInBounds when the clip leaves no pixels at all;
        # that is reported as NaN so the fallback below can take over.
        try:
            clipped = one_day_data.rio.clip(geometries, crs=watershed.crs, **clip_options)
        except rioxr.exceptions.NoDataInBounds:
            return np.nan
        return clipped.mean().item()

    # Calculating the average of the attribute for the whole catchment
    average = clipped_mean()

    # Failsafe for catchments smaller than the pixel size.
    # NaN never compares identical to np.nan, so test with isnan, and retry
    # on the same single-day slice rather than on the whole data array.
    if np.isnan(average):
        average = clipped_mean(all_touched=True)

    return round(average, 1)

In [None]:
# Process the data for each watershed: one CSV per catchment, one column per variable
dst_dir = "/path/to/CAMELS-FI/data/timeserie"

# exist_ok avoids the check-then-create race and makes the cell re-runnable
os.makedirs(dst_dir, exist_ok=True)

pbar = tqdm(range(len(watersheds)))
# A single worker pool is reused for every file instead of starting a new one
# for each (catchment, variable, year) combination
with Pool(6) as p:
    for i in pbar:
        # Positional indexing: i is a counter, not necessarily an index label
        watershed = watersheds.iloc[[i]]
        place_id = watershed['Paikka_Id'].iloc[0]
        # Constructing the path of the timeseries for this catchment
        path = os.path.join(dst_dir, f"CAMELS_FI_hydromet_timeseries_{place_id}_{years[0]}0101-{years[1]}1231.csv")

        # Fresh frame per catchment so no values carry over from the previous one
        weather = pd.DataFrame(index=pd.to_datetime([]))
        weather.index.name = 'date'

        for current_dir in dirs:

            for year in range(years[0], years[1] + 1):
                pbar.set_description(f"Processing watershed {place_id:>4}, {attributes[current_dir]:>16}, {year}")

                # Evapotranspiration has differing scheme to all other sources
                if current_dir == 'ET0_FAO':
                    # Different time range than other variables
                    file_name = f"{current_dir}_{year}_months_4_to_9.nc"
                else:
                    file_name = f"{current_dir.lower()}_{year}.nc"

                src_path = os.path.join(root, current_dir, file_name)
                if not os.path.exists(src_path):
                    print(f"Path {src_path} doesn't exists")
                    continue

                # Copy the array so it can still be used after the file handle is closed
                with rioxr.open_rasterio(src_path, mask_and_scale=True) as src:
                    data_array = src.copy()

                # Iterating over the time steps and adding them to a list for parallel mapping
                arg_list = []
                times = []
                for time_step in data_array.Time:
                    time = time_step.item()
                    arg_list.append((data_array, watershed, time))
                    times.append(str(time))

                # map() returns results in the same order as arg_list, so they pair up with times
                averages = p.map(catchment_mean, arg_list)

                for average, time in zip(averages, times):
                    weather.loc[time, attributes[current_dir]] = average

        weather.to_csv(path)