In [1]:
import os
from time import time

import numpy as np
import PyCO2SYS as pyco2
import xarray as xr
from dask.diagnostics import ProgressBar
from dask.distributed import Client, progress
from dask_jobqueue import SLURMCluster
from distributed.utils import tmpfile


In [2]:
# Dask cluster on SLURM. Each submitted job runs a single worker process that
# gets 48 cores (threads) and 192 GB of RAM for up to 12 hours.
cluster = SLURMCluster(
    walltime='12:00:00',
    processes=1,
    cores=48,
    memory='192GB',
)

# Ask SLURM for 12 of those jobs, then attach a client so that the
# dask-backed xarray work in later cells runs on this cluster.
cluster.scale(jobs=12)
client = Client(cluster)

In [3]:
# Paths and the spatial window shared by the processing cells.
datapath = '/lus/scratch/shao/data/NEP36_extremes/'
# os.path.join avoids the '//' that f'{datapath}/processed' produced, because
# datapath already ends with a separator.
processed_datapath = os.path.join(datapath, 'processed')
# Filename prefix of the raw 3-hourly benthic-shelf files.
prefix = 'NEP36-CanOE_3h_benthic_shelf_'
# Index windows applied to the model grid's x and y dimensions when
# subsetting (end-exclusive, like any Python slice).
x_slice = slice(215,713)
y_slice = slice(180,750)

In [5]:
def calc_omega_a( alkalinity, dic, salinity=None, temperature=None, pressure=None ):
    """Aragonite saturation state (Omega_A) from alkalinity and DIC via PyCO2SYS.

    Parameters
    ----------
    alkalinity : array_like
        Total alkalinity, passed to PyCO2SYS as ``par1`` with ``par1_type=1``.
    dic : array_like
        Dissolved inorganic carbon, passed as ``par2`` with ``par2_type=2``.
        NOTE(review): PyCO2SYS expects both in umol/kg; confirm the model's
        ALK/DIC units (they may be per volume) before trusting absolute values.
    salinity, temperature, pressure : array_like, optional
        Practical salinity, temperature (deg C) and pressure (dbar) at which
        the inputs are given, forwarded to PyCO2SYS. Any argument left as
        ``None`` is not passed, so PyCO2SYS uses its own default for it
        (S=35, 25 deg C, 0 dbar). With none of them given, this is exactly the
        original two-argument behaviour. Omega_A is sensitive to all three,
        so pass them when evaluating bottom-water conditions.

    Returns
    -------
    The ``'saturation_aragonite'`` entry of the PyCO2SYS results dict.
    """
    # Forward only the hydrographic inputs that were actually supplied, so the
    # two-argument call keeps relying on PyCO2SYS's defaults.
    conditions = {
        name: value
        for name, value in (
            ('salinity', salinity),
            ('temperature', temperature),
            ('pressure', pressure),
        )
        if value is not None
    }
    results = pyco2.sys(
        par1 = alkalinity,
        par1_type = 1,
        par2 = dic,
        par2_type = 2,
        **conditions
    )
    return results['saturation_aragonite']

# Inspect one raw 3-hourly file to see its variables and dimensions.
sample_path = f'{datapath}/{prefix}20200219-20200223.nc'
with xr.open_dataset(sample_path) as ds:
    print(ds)

<xarray.Dataset>
Dimensions:  (time: 40, y: 1020, x: 714)
Coordinates:
    nav_lon  (y, x) float32 ...
  * time     (time) datetime64[ns] 2020-02-19T01:30:00 ... 2020-02-23T22:30:00
    nav_lat  (y, x) float32 ...
Dimensions without coordinates: y, x
Data variables:
    S        (time, y, x) float32 ...
    depths   (y, x) float64 ...
    T        (time, y, x) float32 ...
    DIC      (time, y, x) float32 ...
    ALK      (time, y, x) float32 ...
    NO3      (time, y, x) float32 ...
    O2       (time, y, x) float32 ...
Attributes:
    description:  benthic data for the shelf


In [None]:
# Subset and resample all fields to daily averages.
# Raw filenames carry a start-end date range (e.g. ..._20200219-20200223.nc),
# so the `*{year}*` glob may also pick up a file that straddles the year
# boundary; the time axis is therefore filtered back to `year` after opening.
daily_dir = f'{processed_datapath}/daily'
# The netCDF writers do not create missing parent directories.
os.makedirs(daily_dir, exist_ok=True)
for year in range(1996,2021):
    start_time = time()
    with xr.open_mfdataset(f'{datapath}/{prefix}*{year}*',parallel=True) as ds:
        # Keep only the timestamps that fall in `year`, then subset in space
        ds_year = ds.groupby('time.year')[year]
        ds_year = ds_year.isel(
            x = x_slice,
            y = y_slice
        )
        # NOTE(review): calc_omega_a receives only ALK and DIC here, so Omega_A
        # is evaluated at PyCO2SYS's default salinity/temperature/pressure,
        # not the model's S, T and depths -- confirm this is intended.
        ds_year['OmegaA'] = xr.apply_ufunc(
            calc_omega_a,
            ds_year.ALK,
            ds_year.DIC,
            dask='parallelized'
        )
        # Average the 3-hourly records into calendar days
        ds_year = ds_year.resample(time='1D').mean(dim='time')
        ds_year.to_netcdf(f'{daily_dir}/{year}.nc',engine='h5netcdf')
    print(f'Year {year}: {time() - start_time}s')

In [8]:
# Subset all fields at highest resolution for 1 year.
# `year` used to be inherited from whatever cell had last set it (hidden
# kernel state). Pin it so the cell is self-contained. 2020 is the value left
# behind by a daily loop over range(1996, 2021) -- TODO confirm intended year.
year = 2020
start_time = time()
highres_dir = f'{processed_datapath}/3hr'
# The netCDF writers do not create missing parent directories.
os.makedirs(highres_dir, exist_ok=True)
with xr.open_mfdataset(f'{datapath}/{prefix}*{year}*',parallel=True,engine='h5netcdf') as ds:
    # Keep only timestamps in `year` (the glob can match a file whose date
    # range straddles the year boundary), then subset in space with the same
    # window as the other cells.
    ds_year = ds.groupby('time.year')[year]
    ds_year = ds_year.isel(
        x = x_slice,
        y = y_slice
    )
    # NOTE(review): Omega_A is computed from ALK and DIC only, i.e. at
    # PyCO2SYS's default salinity/temperature/pressure -- confirm intended.
    ds_year['OmegaA'] = xr.apply_ufunc(
        calc_omega_a,
        ds_year.ALK,
        ds_year.DIC,
        dask='parallelized'
    )
    ds_year.to_netcdf(f'{highres_dir}/{year}.nc',engine='h5netcdf')
# start_time was previously measured but never reported
print(f'Year {year} (3hr): {time() - start_time}s')

In [4]:
# Subset the mask file to the same spatial window as the model output.
static_dir = f'{processed_datapath}/static'
# The netCDF writers do not create missing parent directories.
os.makedirs(static_dir, exist_ok=True)
# Open with a context manager so the source file handle is released. The
# subset is loaded into memory before the file closes so `mask_sub` remains
# usable afterwards (the mask subset is presumably small -- confirm).
with xr.open_dataset(f'{datapath}/NEP36-mask_channel.nc') as mask:
    mask_sub = mask.isel(
        x = x_slice,
        y = y_slice
    ).load()

mask_sub.to_netcdf(f'{static_dir}/mask.nc')