In [1]:
import sys
import pathlib
import datetime
import xarray as xr
import cftime
import variables
import pandas as pd
import re
import dask
import numpy as np
import os
from cluster import PerlmutterSLURMCluster
import joblib

In [2]:
scratch = pathlib.Path(os.environ['SCRATCH']) / 'dor'
joblib_cache_dir = scratch / 'joblib'
dask_log_directory = pathlib.Path(os.environ['SCRATCH']) / 'dask' / 'logs'
dask_local_directory = pathlib.Path(os.environ['SCRATCH']) / 'dask' / 'local-dir'


scratch.mkdir(parents=True, exist_ok=True)
dask_local_directory.mkdir(parents=True, exist_ok=True)
dask_log_directory.mkdir(parents=True, exist_ok=True)

memory = joblib.Memory(joblib_cache_dir, verbose=0)

In [3]:
n_workers = 1 # Number of Slurm jobs to launch in parallel
n_nodes_per_calc = 1 # Number of nodes to reserve for each Slurm job
n_cores_per_node = 48 # Number of CPU cores per node
mem_per_node = "512 GB" # Total memory per node
cluster_kwargs = {
 
    # Dask worker options
    "processes" : n_cores_per_node,
    "cores": n_cores_per_node, # total number of cores (per Slurm job) for Dask worker
    "memory": mem_per_node, # total memory (per Slurm job) for Dask worker
    
    # SLURM options
    "job_name" : 'dor-dataset-compression',
    "shebang": "#!/bin/bash",
    "walltime": "00:30:00", # DD:HH:SS
    "job_mem": "0", # all memory on node
    "job_script_prologue": ["source ~/.bashrc"], # commands to run before calculation, including exports
    "job_directives_skip": ["-n", "--cpus-per-task"], # Slurm directives we can skip
    "job_extra_directives": [f"-N {n_nodes_per_calc}", "-q debug", "-C cpu"], # num. of nodes for calc (-N), queue (-q), and constraints (-c)
    "log_directory" : str(dask_log_directory),
    "local_directory": str(dask_local_directory)
}

cluster = PerlmutterSLURMCluster(
    
    **cluster_kwargs
)

client = dask.distributed.client.Client(cluster)
cluster

0,1
Dashboard: https://jupyter.nersc.gov/user/abanihi/perlmutter-login-node-base/proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://128.55.64.19:38715,Workers: 0
Dashboard: https://jupyter.nersc.gov/user/abanihi/perlmutter-login-node-base/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [4]:
cluster.scale(1)

In [5]:
#print(cluster.job_script())

In [6]:
# cluster.scale(0)

In [7]:
parent_dir = pathlib.Path.cwd().parent
sys.path.append(str(parent_dir))

In [8]:
import atlas

In [9]:
calc = atlas.global_irf_map(cdr_forcing="DOR", vintage="001")
calc.df

Unnamed: 0_level_0,blueprint,polygon,polygon_master,basin,start_date,cdr_forcing,cdr_forcing_file,simulation_key,refdate,stop_n,wallclock,curtail_output
case,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
smyle.cdr-atlas-v0.control.001,smyle,,,,1999-01,,,baseline,0347-01-01,16,12:00:00,False
smyle.cdr-atlas-v0.glb-dor_North_Atlantic_basin_000_1999-01-01_00000.001,smyle,0.0,0.0,North_Atlantic_basin,1999-01,DOR,/global/cfs/projectdirs/m4746/Projects/OAE-Eff...,glb-dor_North_Atlantic_basin_000_1999-01-01_00000,0347-01-01,15,10:00:00,True
smyle.cdr-atlas-v0.glb-dor_North_Atlantic_basin_000_1999-04-01_00001.001,smyle,0.0,0.0,North_Atlantic_basin,1999-04,DOR,/global/cfs/projectdirs/m4746/Projects/OAE-Eff...,glb-dor_North_Atlantic_basin_000_1999-04-01_00001,0347-04-01,15,10:00:00,True
smyle.cdr-atlas-v0.glb-dor_North_Atlantic_basin_000_1999-07-01_00002.001,smyle,0.0,0.0,North_Atlantic_basin,1999-07,DOR,/global/cfs/projectdirs/m4746/Projects/OAE-Eff...,glb-dor_North_Atlantic_basin_000_1999-07-01_00002,0347-07-01,15,10:00:00,True
smyle.cdr-atlas-v0.glb-dor_North_Atlantic_basin_000_1999-10-01_00003.001,smyle,0.0,0.0,North_Atlantic_basin,1999-10,DOR,/global/cfs/projectdirs/m4746/Projects/OAE-Eff...,glb-dor_North_Atlantic_basin_000_1999-10-01_00003,0347-10-01,15,10:00:00,True
...,...,...,...,...,...,...,...,...,...,...,...,...
smyle.cdr-atlas-v0.glb-dor_Southern_Ocean_038_1999-10-01_02755.001,smyle,38.0,688.0,Southern_Ocean,1999-10,DOR,/global/cfs/projectdirs/m4746/Projects/OAE-Eff...,glb-dor_Southern_Ocean_038_1999-10-01_02755,0347-10-01,15,10:00:00,True
smyle.cdr-atlas-v0.glb-dor_Southern_Ocean_039_1999-01-01_02756.001,smyle,39.0,689.0,Southern_Ocean,1999-01,DOR,/global/cfs/projectdirs/m4746/Projects/OAE-Eff...,glb-dor_Southern_Ocean_039_1999-01-01_02756,0347-01-01,15,10:00:00,True
smyle.cdr-atlas-v0.glb-dor_Southern_Ocean_039_1999-04-01_02757.001,smyle,39.0,689.0,Southern_Ocean,1999-04,DOR,/global/cfs/projectdirs/m4746/Projects/OAE-Eff...,glb-dor_Southern_Ocean_039_1999-04-01_02757,0347-04-01,15,10:00:00,True
smyle.cdr-atlas-v0.glb-dor_Southern_Ocean_039_1999-07-01_02758.001,smyle,39.0,689.0,Southern_Ocean,1999-07,DOR,/global/cfs/projectdirs/m4746/Projects/OAE-Eff...,glb-dor_Southern_Ocean_039_1999-07-01_02758,0347-07-01,15,10:00:00,True


In [None]:
%%time

data = calc.df_case_status
done = data.loc[data.archive]

done_cases = done.index.to_list()
done_cases.remove('smyle.cdr-atlas-v0.control.001')
done_cases = sorted(done_cases)

df = calc.df.loc[done_cases]
len(done_cases)

In [None]:
df

In [None]:
def glob_nc_files(base_path: str | pathlib.Path, case:str):
    if case not in done_cases:
        raise ValueError(f'{case} not in list of done cases')
    base_path = pathlib.Path(base_path)
    print("Globbing files (this may take a while)...")
    pattern = base_path / case / 'ocn' / 'hist' / '*.pop.h.*.nc'
    nc_files = sorted(base_path.glob(str(pattern.relative_to(base_path))))
    return nc_files


In [None]:
base_directory = pathlib.Path('/global/cfs/projectdirs/m4746/Projects/Ocean-CDR-Atlas-v0/data/archive')

In [None]:
nc_files = glob_nc_files(base_path=base_directory, case='smyle.cdr-atlas-v0.glb-dor_Southern_Ocean_026_1999-04-01_02705.001')

In [None]:
len(nc_files)

## Add coordinate information and expand dimensions

In [None]:
def get_year_and_month(ds):
    value = ds.time_bound.isel(d2=0).data.item()
    year = value.year
    month = value.month
    return year, month
    

def set_coords(ds: xr.Dataset) -> xr.Dataset:
    return ds.set_coords(variables.COORDS)

def get_case_metadata(case:str) -> pd.Series:
    case_metadata = df.loc[case]
    return case_metadata

def add_additional_coords(ds: xr.Dataset):
    case = ds.title
    case_metadata = get_case_metadata(case)

    polygon_master = int(case_metadata.polygon_master)
    if polygon_master < 0 or polygon_master > 689:
        raise ValueError(f'Polygon id must be in range [0, 690). Found polygon_id={polygon_master}')
    
    # add as an integer coordinate
    polygon_id_coord = xr.DataArray(
        name='polygon_id', 
        dims='polygon_id', 
        data=[polygon_master], 
        attrs={'long_name': "polygon ID"},
    )

    # injenction date
    injection_date_coord = xr.DataArray(
        data=[cftime.DatetimeNoLeap.strptime(case_metadata.start_date, '%Y-%m', calendar='noleap', has_year_zero=True)],
        dims=['injection_date'],
        attrs={'long_name': "injection date"}
    )

    # add elapsed time coord
    year, month = get_year_and_month(ds)
    current_year = 1999 + int(year) - int('0347')  # all simulations start in the same year
    current_time = cftime.datetime(year=current_year, month=int(month), day=1, calendar='noleap', has_year_zero=True)
    # print(f'current_date: {current_time}')

    injection_date = injection_date_coord.data[0]
   
    elapsed_time = current_time - injection_date

    elapsed_time_coord = xr.DataArray(
        data=[elapsed_time], 
        dims=['elapsed_time'],
    )

    renamed = ds.drop_indexes('time').rename_dims(time='elapsed_time')
    

    return renamed.assign_coords(
        polygon_id=polygon_id_coord,
        injection_date=injection_date_coord,
        elapsed_time=elapsed_time_coord
    )

def expand_ensemble_dims(ds: xr.Dataset) -> xr.Dataset:
    """Add new dimensions across the ensemble"""

    copied = ds.copy()

    # all data variables should be ensemble variables
    for name in list(ds.data_vars):
        copied[name] = copied[name].expand_dims(['polygon_id', 'injection_date'])

    # absolute time is a function of injection_date because of the different starting times
    copied['time'] = copied['time'].expand_dims(['injection_date'])
    copied['time_bound'] = copied['time_bound'].expand_dims(['injection_date'])
            
    return copied

In [None]:
path = nc_files[146]
path

In [None]:
expanded = xr.open_dataset(path, engine='netcdf4').pipe(set_coords).pipe(add_additional_coords).pipe(expand_ensemble_dims)
expanded

## Compute anomalies

In [None]:
def compute_anomalies(ds: xr.Dataset) -> xr.Dataset:
    """Subtract counterfactual from experimental values, and leave only these anomalies in the resulting dataset."""

    # do this manually instead of a loop over variable names because there are too many variable names that don't follow a consistent pattern

    def compute_anomaly_for_variable(ds, name, alt_name, new_name):
        ds[new_name] = ds[name] - ds[alt_name]
        ds[new_name].attrs = ds[name].attrs
        ds = ds.drop_vars([name, alt_name], errors='raise')
        return ds

    ds = compute_anomaly_for_variable(ds, name="DIC", alt_name="DIC_ALT_CO2", new_name="DIC_ANOM")
    ds = compute_anomaly_for_variable(ds, name="ALK", alt_name="ALK_ALT_CO2", new_name="ALK_ANOM")
    ds = compute_anomaly_for_variable(ds, name="FG_CO2", alt_name="FG_ALT_CO2", new_name="FG_ANOM")
    ds = compute_anomaly_for_variable(ds, name="PH", alt_name="PH_ALT_CO2", new_name="PH_ANOM")
    ds = compute_anomaly_for_variable(ds, name="CO2STAR", alt_name="CO2STAR_ALT_CO2", new_name="CO2STAR_ANOM")
    ds = compute_anomaly_for_variable(ds, name="CO3", alt_name="CO3_ALT_CO2", new_name="CO3_ANOM")
    ds = compute_anomaly_for_variable(ds, name="pH_3D", alt_name="pH_3D_ALT_CO2", new_name="pH_3D_ANOM")
    ds = compute_anomaly_for_variable(ds, name="DCO2STAR", alt_name="DCO2STAR_ALT_CO2", new_name="DCO2STAR_ANOM")
    ds = compute_anomaly_for_variable(ds, name="pCO2SURF", alt_name="pCO2SURF_ALT_CO2", new_name="pCO2SURF_ANOM")
    ds = compute_anomaly_for_variable(ds, name="DpCO2", alt_name="DpCO2_ALT_CO2", new_name="DpCO2_ANOM")
    ds = compute_anomaly_for_variable(ds, name="DIC_zint_100m", alt_name="DIC_ALT_CO2_zint_100m", new_name="DIC_ANOM_zint_100m")
    ds = compute_anomaly_for_variable(ds, name="ALK_zint_100m", alt_name="ALK_ALT_CO2_zint_100m", new_name="ALK_ANOM_zint_100m")
    ds = compute_anomaly_for_variable(ds, name="tend_zint_100m_DIC", alt_name="tend_zint_100m_DIC_ALT_CO2", new_name="tend_zint_100m_DIC_ANOM")
    ds = compute_anomaly_for_variable(ds, name="tend_zint_100m_ALK", alt_name="tend_zint_100m_ALK_ALT_CO2", new_name="tend_zint_100m_ALK_ANOM")
    ds = compute_anomaly_for_variable(ds, name="STF_ALK", alt_name="STF_ALK_ALT_CO2", new_name="STF_ALK_ANOM")
    
    return ds

In [None]:
anomalies = compute_anomalies(expanded)
anomalies

## Quick check for polygon's correctness

In [None]:
anomalies['ALK_ANOM'].isel(z_t=0).plot();

## Fix encoding

In [None]:
def set_encoding(ds: xr.Dataset) -> xr.Dataset:

    #ds = ds.drop_encoding()
    
    # merge encodings to include existing time encoding as well as previous compression encoding
    for name, var in ds.variables.items():

        # avoids some very irritating behaviour causing the netCDF files to be internally chunked
        if "original_shape" in ds[name].encoding:
            del ds[name].encoding["original_shape"]
        
        if np.issubdtype(var.dtype, np.floating):  # don't try to compress things that aren't floats
            ds[name].encoding["zlib"] = True
            ds[name].encoding["complevel"] = 4

        if var.ndim == 6:
            _3D_CHUNKS = (1, 1, 1, 60, 384, 320)
            #ds[name] = ds[name].chunk(_3D_CHUNKS)
            ds[name].encoding['chunksizes'] = _3D_CHUNKS
        elif var.ndim == 5:
            _2D_CHUNKS = (1, 1, 1, 384, 320)
            #ds[name] = ds[name].chunk(_2D_CHUNKS)
            ds[name].encoding['chunksizes'] = _2D_CHUNKS
    
    return ds

In [None]:
encoded = anomalies.pipe(set_encoding)
encoded

In [None]:
encoded.CO3_ANOM.encoding, encoded['DCO2STAR_ANOM'].encoding

In [None]:
def save_to_netcdf(
    ds: xr.Dataset, 
    out_filepath: str,
) -> None:
    ds.to_netcdf(out_filepath, format='NETCDF4')

In [None]:
save_to_netcdf(encoded, out_filepath=f'{scratch}/compressed-anomalies-test.nc')

In [None]:
!du -ch {scratch}/compressed-anomalies-test.nc

In [None]:
!du -ch {path}

## Whole pipeline for any task

In [None]:
def open_compress_and_save_file(filepath: str | pathlib.Path, out_path_prefix: str | pathlib.Path) -> None:
    ds = (xr.open_dataset(filepath, engine='netcdf4')
          .pipe(set_coords)
          .pipe(add_additional_coords)
          .pipe(expand_ensemble_dims)
          .pipe(compute_anomalies)
          .pipe(set_encoding)
         )

    polygon_id = ds.polygon_id.data.item()
    injection_month = ds.injection_date.dt.month.data.item()
    injection_year = ds.injection_date.dt.year.data.item()
    year, month = get_year_and_month(ds)

    # Pad the values with zeros
    padded_polygon_id = f"{polygon_id:03d}"
    padded_injection_month = f"{injection_month:02d}"
    padded_injection_year = f"{injection_year:04d}"
    padded_year = f"{year:04d}"
    padded_month = f"{month:02d}"

    out_dir = pathlib.Path(out_path_prefix) / f"{padded_polygon_id}/{padded_injection_month}/"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_filepath = out_dir / f"smyle.cdr-atlas-v0.glb-dor.{padded_polygon_id}-{padded_injection_year}-{padded_injection_month}.pop.h.{padded_year}-{padded_month}.nc"
    save_to_netcdf(ds, out_filepath=out_filepath)
    print(f"""
🎉 Processing Complete! 🎉
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📊 Polygon ID:       {padded_polygon_id}
💉 Injection Date:   {padded_injection_year}-{padded_injection_month}
📅 Processed Period: {padded_year}-{padded_month}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📁 Input File:
   {filepath}
📁 Output File:
   {out_filepath}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
✅ File saved successfully!
    """)

    ds.close()
    return out_filepath

In [None]:
%%time
out_path_prefix = scratch / "compressed" / "anomalies"

ds = open_compress_and_save_file(nc_files[53], out_path_prefix=out_path_prefix)
ds

In [30]:
tasks = []
for file in nc_files:
    task = dask.delayed(open_compress_and_save_file)(file, out_path_prefix=out_path_prefix)
    tasks.append(task)

In [None]:
%%time
results = dask.compute(*tasks)

In [48]:
%%time

@memory.cache
def process_case(case: str, base_path: str, out_path_prefix: str):
    nc_files = glob_nc_files(base_path=base_directory, case=case)
    tasks = []
    for file in nc_files:
        tasks.append(dask.delayed(open_compress_and_save_file)(file, out_path_prefix=out_path_prefix))

    results = dask.compute(*tasks)    
    return len(results)


for case in done_cases[:5]:
    print(process_case(case=case, base_path=base_directory, out_path_prefix=out_path_prefix))
    

180
180
180
180
180
CPU times: user 1.12 ms, sys: 5.77 ms, total: 6.89 ms
Wall time: 8.18 ms
