#### This notebook takes raw ERA5 (pressure-coordinate) data and uses Dask to perform the following operations:
1. Concatenate the disjoint ERA5 files over a region of interest, keeping daily means of the IVT components (`p71.162`, `p72.162`) from 1992 onward
2. Save the concatenated data
3. Use the concatenated data to compute a daily climatology


In [1]:
import os, gc; from os.path import exists
main_dir = "/vortexfs1/home/anthony.meza/CTWPC"
# Run from the project's scripts directory (presumably so `help_funcs`, imported below, resolves).
os.chdir(main_dir + "/scripts")


# Path helpers: each returns a fixed sub-path of main_dir followed by the optional suffix `x`.
def plotsdir(x=""):
    return main_dir + "/plots/" + x


def GLORYS_dir(x=""):
    # NOTE(review): unlike the other helpers there is no "/" after "GLORYS_data", so
    # GLORYS_dir("f.nc") yields ".../GLORYS_dataf.nc" — confirm callers pass a leading "/".
    return main_dir + "/GLORYS_data" + x


def GLORYS_data_dir(x=""):
    return main_dir + "/GLORYS_processed/" + x


def ERA5_data_dir(x=""):
    return main_dir + "/ERA5_data/" + x

In [2]:
from help_funcs import * 
import xarray as xr
import pandas as pd
import netCDF4 as nc
from pathlib import Path
from natsort import natsorted
import matplotlib.pyplot as plt
import dask_labextension

In [3]:
# Set up a Dask cluster on SLURM and connect a client to it.
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Per-job resources: each SLURM job runs a single worker process (processes=1) with 36 cores.
# NOTE(review): the job script printed below shows 30 CPUs / 140G, which does not match these
# settings — that output looks stale from an earlier configuration.
worker_spec = dict(
    queue='compute',
    walltime='02:00:00',
    cores=36,
    processes=1,
    memory='150GB',
    interface='ib0',  # presumably the InfiniBand network interface on the compute nodes
)
cluster = SLURMCluster(**worker_spec)
print(cluster.job_script())
cluster.scale(jobs=14)  # ask SLURM for 14 such jobs
client = Client(cluster)
client

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p compute
#SBATCH -n 1
#SBATCH --cpus-per-task=30
#SBATCH --mem=140G
#SBATCH -t 02:00:00

/vortexfs1/home/anthony.meza/mambaforge/envs/atm_rivers/bin/python -m distributed.cli.dask_worker tcp://172.16.3.56:37106 --nthreads 30 --memory-limit 139.70GiB --name dummy-name --nanny --death-timeout 60 --interface ib0



In [5]:
# Find the raw ERA5 NetCDF files: every *.nc / *.NC under ERA5_data_dir(), in natural sort order.
# Full paths of 81+ characters are dropped — presumably to skip longer-named files that are not
# raw downloads (TODO confirm which files this length test is meant to exclude).
MAX_RAW_PATH_LEN = 81
# The daily climatology saved at the end of this notebook is written into ERA5_data_dir() and
# its full path (80 characters with the current main_dir) passes the length test. Exclude it by
# name; otherwise a Restart & Run All would hand it (it has no `time` dimension) to open_mfdataset.
DERIVED_ERA5_FILES = {"ERA5_WaterVars_Daily_Climatology.nc"}

results_era5 = natsorted(str(path) for path in Path(ERA5_data_dir()).rglob("*.[nN][cC]"))
results_era5_filtered = [
    path
    for path in results_era5
    if len(path) < MAX_RAW_PATH_LEN and Path(path).name not in DERIVED_ERA5_FILES
]

In [6]:
#extract IVT components
def _preprocess_ERA5(ds):
    return ds[["p71.162", "p72.162"]].sel(latitude = slice(60, -2)).sel(longitude = slice(-150, -75)).resample(time="1D").mean().sel(time = slice("1992", None))

# Open all raw files lazily as one dataset; each file is subset by _preprocess_ERA5 first.
# Chunks of -1 request a single chunk along each dimension when each file is opened.
daily_out_path = GLORYS_data_dir("ERA5_WaterVars_NE_PAC_daily.nc")
ds = xr.open_mfdataset(
    results_era5_filtered,
    preprocess=_preprocess_ERA5,
    engine="netcdf4",
    parallel=True,
    data_vars="minimal",
    coords="minimal",
    compat="override",
    chunks={"latitude": -1, "longitude": -1, "time": -1},
)

# Persist the concatenated daily fields so later steps can reopen them with different chunking.
ds.to_netcdf(daily_out_path, mode="w", format="NETCDF4", engine="netcdf4", compute=True)

In [8]:
# Reload the saved daily fields, re-chunked into 320-step blocks along time.
gc.collect()
daily_path = GLORYS_data_dir("ERA5_WaterVars_NE_PAC_daily.nc")
open_kwargs = dict(
    engine="netcdf4",
    parallel=True,
    data_vars="minimal",
    coords="minimal",
    compat="override",
    chunks={"longitude": -1, "latitude": -1, "time": 320},
)
# convert_calendar('noleap') drops Feb 29 (leap days, not whole leap years), so day-of-year
# runs 1..365 in every year.
era5_daily = xr.open_mfdataset(daily_path, **open_kwargs).convert_calendar('noleap')

In [9]:
# Daily climatology: the mean over all years for each day of year.
# NOTE(review): the output is written into ERA5_data_dir(), the same tree the raw-file search
# scans with rglob — keep it out of that search.
climatology_path = ERA5_data_dir("ERA5_WaterVars_Daily_Climatology.nc")
by_day_of_year = era5_daily.groupby("time.dayofyear")
era5_climatology = by_day_of_year.mean(dim="time")
era5_climatology.to_netcdf(
    climatology_path, mode="w", format="NETCDF4", engine="netcdf4", compute=True
)