#### Imports

In [1]:
# Core scientific and data libraries
import numpy as np
from pathlib import Path
from src.coordinate_utils import *
from dask.distributed import Client

#### Variables


In [2]:
dataset_url = 'gs://gcp-public-data-arco-era5/co/single-level-reanalysis.zarr'
relevant_regions = ['Central_Arctic', 'Beaufort', 'Chukchi-NA', 'Chukchi-Asia', 'E_Siberian', 'Laptev', 'Kara', 'Barents', 'E_Greenland', 'Baffin', 'Hudson', 'Can_Arch', 'Bering-NA', 'Bering-Asia', 'pan_arctic']

In [3]:
dataset = xr.open_zarr(
    dataset_url,
    consolidated=True,
    decode_timedelta=False,
    chunks={'time': 240},
)
dataset = dataset.assign_coords(longitude=((dataset.longitude + 180) % 360) - 180)


In [4]:
selected_vars_dataset = dataset[["t2m", "u10", "v10", "msl"]]

#### Helper Functions

In [5]:
def create_and_cache_mask(region_name):
    """
    Creates and caches a mask for a specified geographical region.

    This function generates a mask by determining whether geographical points
    belong to a specified region shape, using longitude and latitude coordinates
    from a dataset. The mask is then cached as a .npy file for future use.
    It supports both "pan_arctic" and other custom region names.

    Parameters:
        region_name (str): Name of the geographical region for which the mask
            is to be created. If "pan_arctic", a predefined shape is used, otherwise
            a specified regional shape is retrieved.

    Raises:
        Any exceptions associated with file I/O or operations on the dataset.

    Returns:
        numpy.ndarray: A boolean array where each element indicates whether
            the corresponding geographical point is within the specified region.
    """
    coords_only = xr.open_zarr(dataset_url, chunks={'values': -1},
                               decode_timedelta=False, )
    lon = coords_only["longitude"].compute().values
    lon = lon_to_180(lon)
    lat = coords_only["latitude"].compute().values

    if region_name == "pan_arctic":
        geom = get_pan_arctic_shape()
    else:
        geom = get_region_shape(region_name)

    pts = shapely.points(lon, lat)
    mask_vals = shapely.contains(geom, pts)

    mask_file = f'../data/processed/area_masks/{region_name}_mask.npy'
    Path(mask_file).parent.mkdir(parents=True, exist_ok=True)
    np.save(mask_file, mask_vals)
    return mask_vals


def load_and_apply_mask(dataset, region_name="pan_arctic"):
    """
    Loads a dataset, applies a mask, and returns the masked dataset. If the mask
    does not already exist in the specified path, it is created and cached.

    Parameters:
    dataset (xarray.Dataset): The dataset to which the mask will be applied.
    region_name (str): The name of the region defining the mask. Defaults to
        "pan_arctic".

    Returns:
    xarray.Dataset: The dataset after applying the mask.

    Raises:
    FileNotFoundError: If the mask file cannot be created or found in the specified
        path.
    """
    mask_file = f'../data/processed/area_masks/{region_name}_mask.npy'

    if not Path(mask_file).exists():
        mask_vals = create_and_cache_mask(region_name)
    else:
        mask_vals = np.load(mask_file)

    values_dim = dataset.sizes.get("values", None)
    time_dim = dataset.sizes.get("time", None)
    mask = xr.DataArray(mask_vals, dims=("values",))
    masked = dataset.where(mask, drop=True)
    values_dim_after = masked.sizes.get("values", None)
    return masked


def list_available_masks():
    """
    Lists all available masks from the specified directory.

    This function scans through a designated directory to identify files with
    a specific naming pattern. It extracts the base names of the mask files,
    removes unnecessary suffixes, and returns them in a sorted order. If the
    designated directory is not found, an empty list is returned.

    Returns
    -------
    list of str
        A sorted list of available mask names (excluding file suffixes and
        unnecessary suffixes), or an empty list if the directory does not exist.
    """
    mask_dir = Path('../data/processed/area_masks')
    if mask_dir.exists():
        masks = [f.stem.replace('_mask', '') for f in mask_dir.glob('*_mask.npy')]
        masks_sorted = sorted(masks)
        return masks_sorted
    return []

def daily_stats_all_vars(region_ds, year, variables):
    """
    region_ds: masked + persisted dataset containing *all* variables
    returns: pandas DataFrame with daily mean/std/p15/p85 for each variable
    """
    # 1) daily temporal mean (still per-gridcell)
    daily = region_ds[variables].sel(time=slice(f"{year}-01-01", f"{year}-12-31")) \
                                .resample(time="1D").mean()

    # 2) spatial reductions stay in Dask
    mu  = daily.mean(dim="values")                    # mean over grid
    sd  = daily.std(dim="values")                     # std over grid
    q   = daily.quantile([0.15, 0.85], dim="values")  # quantiles over grid  (lazy)

    # 3) assemble compact Dataset (dims ~ time only)
    out = xr.Dataset()
    for var in variables:
        out[f"{var}_mean"] = mu[var]
        out[f"{var}_std"]  = sd[var]
        out[f"{var}_p15"]  = q.sel(quantile=0.15)[var]
        out[f"{var}_p85"]  = q.sel(quantile=0.85)[var]

    # 4) small df now; compute happens here
    df = out.to_dataframe().reset_index()
    return df


#### Systematic Regional-Yearly Processing

In [6]:
def get_parquet_path(region, year, variable):
    """
    Generates the file path for parquet files following the project structure.
    
    Parameters:
        region (str): Name of the geographical region
        year (int): Year for the data
        variable (str): ERA5 variable name (t2m, u10, v10, msl)
    
    Returns:
        Path: Path object for the parquet file
    """
    base_dir = Path('../data/processed/parquet/year')
    filename = f"{region}_{year}_{variable}.parquet"
    return base_dir / filename

In [7]:
def calculate_daily_statistics(dataset, region, year, variable):
    """
    Calculates daily statistics for a specific region, year, and variable.
    
    Parameters:
        dataset (xarray.Dataset): The ERA5 dataset
        region (str): Name of the geographical region
        year (int): Year for the data processing
        variable (str): ERA5 variable name
    
    Returns:
        pandas.DataFrame: DataFrame with daily statistics
    """
    # Apply regional mask
    masked_dataset = load_and_apply_mask(dataset[[variable]], region_name=region)
    masked_dataset = masked_dataset.persist()
    
    # Select year and resample to daily means
    year_data = masked_dataset.sel(time=slice(f"{year}-01-01", f"{year}-12-31"))
    daily_means = year_data.resample(time='1D').mean().compute()
    
    # Convert to DataFrame for easier statistics calculation
    df = daily_means.to_dataframe().reset_index()
    
    # Calculate daily statistics
    daily_stats = df.groupby(df['time'].dt.date).agg({
        variable: ['mean', 'std', lambda x: np.percentile(x, 15), lambda x: np.percentile(x, 85)]
    }).round(4)
    
    # Flatten column names
    daily_stats.columns = [f'{variable}_mean', f'{variable}_std', f'{variable}_p15', f'{variable}_p85']
    daily_stats = daily_stats.reset_index()
    daily_stats['region'] = region
    daily_stats['year'] = year
    
    return daily_stats

In [8]:
def process_regions_and_years(dataset, regions, years, variables):
    total_combinations = len(regions) * len(years) * len(variables)
    processed_count = 0
    skipped_count = 0
    print(f"Processing {total_combinations} region-year-variable combinations...")

    for region in regions:
        print(f"\nProcessing region: {region}")

        # Mask once per region and persist for reuse across all years
        region_ds = load_and_apply_mask(dataset[variables], region_name=region)
        region_ds = region_ds.persist()

        for year in years:
            # If *all* var files for this (region,year) exist, skip quickly
            if all(get_parquet_path(region, year, v).exists() for v in variables):
                skipped_count += len(variables)
                continue

            try:
                stats_df_all = daily_stats_all_vars(region_ds, year, variables)
                stats_df_all["region"] = region
                stats_df_all["year"] = year

                # write one file per variable (keeps your existing structure)
                for v in variables:
                    parquet_path = get_parquet_path(region, year, v)
                    if parquet_path.exists():
                        skipped_count += 1
                        continue
                    parquet_path.parent.mkdir(parents=True, exist_ok=True)
                    cols = ["time", "region", "year",
                            f"{v}_mean", f"{v}_std", f"{v}_p15", f"{v}_p85"]
                    # Rename time→date if you prefer that schema
                    out = stats_df_all[["time", "region", "year",
                                        f"{v}_mean", f"{v}_std", f"{v}_p15", f"{v}_p85"]].copy()
                    out.rename(columns={"time": "date"}, inplace=True)
                    out.to_parquet(parquet_path, index=False)
                    processed_count += 1
                    print(f"  ✓ {parquet_path.name} created")

            except Exception as e:
                print(f"  ✗ Error processing {region}_{year}: {str(e)}")

    print(f"\nProcessing complete!")
    print(f"Files processed: {processed_count}")
    print(f"Files skipped (already exist): {skipped_count}")


In [9]:
# Define processing parameters
years_to_process = list(range(1979, 2022))  # 1979 to 2021 inclusive
variables_to_process = ["t2m", "u10", "v10", "msl"]

print(f"Total years: {len(years_to_process)} (1979-2021)")
print(f"Total regions: {len(relevant_regions)}")
print(f"Total variables: {len(variables_to_process)}")
print(f"Total combinations: {len(relevant_regions) * len(years_to_process) * len(variables_to_process)}")

Total years: 43 (1979-2021)
Total regions: 15
Total variables: 4
Total combinations: 2580


In [10]:
# Test with a small subset first
test_regions = ['Beaufort']  # Start with one region
test_years = [2019, 2020]    # Test with two years
test_variables = ['t2m']     # Test with one variable

print("Starting test run with subset...")
print(f"Test regions: {test_regions}")
print(f"Test years: {test_years}")  
print(f"Test variables: {test_variables}")

# Restart dask client for clean processing
client = Client(n_workers=4, threads_per_worker=3, memory_limit='8GB')

Starting test run with subset...
Test regions: ['Beaufort']
Test years: [2019, 2020]
Test variables: ['t2m']


In [None]:
process_regions_and_years(
    dataset=selected_vars_dataset,
    regions=relevant_regions,
    years=years_to_process,
    variables=variables_to_process)


Processing 2580 region-year-variable combinations...

Processing region: Central_Arctic
