# Datasets for archival

Creating archival datasets

In [None]:
import glob

import cf_xarray
import dask
import distributed
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr

import pump

In [None]:
import ncar_jobqueue

cluster = ncar_jobqueue.NCARCluster(
    account="ncgd0048", scheduler_options=dict(dashboard_address=":9797")
)
cluster.scale(12)

In [None]:
client = distributed.Client(cluster)
dask.config.set(scheduler=client)
client

0,1
Client  Scheduler: tcp://10.12.205.21:42130  Dashboard: https://jupyterhub.ucar.edu/dav/user/dcherian/proxy/9797/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


## Attributes

In [None]:
dataset_attrs = {
        "Conventions": "CF-1.6",
        "institution": "National Center for Atmospheric Research",
        "source": "MIT General Circulation Model (MITgcm) checkpoint 64v",
        "references": "Cherian et al. (2021) Off-equatorial deep-cycle turbulence forced by Tropical Instability Waves in the equatorial Pacific. Journal of Physical Oceanography.",
    }

In [None]:
variable_attrs = {
    "latitude": {"standard_name": "latitude", "units": "degrees_north", "axis": "Y"},
    "longitude": {"standard_name": "longitude", "units": "degrees_east", "axis": "X"},
    "time": {"standard_name": "time"},
    "depth": {"units": "m", "positive": "up"},
    "ETAN": {"standard_name": "sea_surface_height", "units": "m"},
    "theta": {"standard_name": "sea_water_potential_temperature", "units": "degC"},
    "salt": {"standard_name": "sea_water_practical_salinity", "units": "psu"},
    "u": {"standard_name": "eastward_sea_water_velocity", "units": "m/s"},
    "v": {"standard_name": "northward_sea_water_velocity", "units": "m/s"},
    "w": {"standard_name": "upward_sea_water_velocity", "units": "m/s"},
    "oceQnet": {
        "long_name": "net surface heat flux into the ocean",
        "units": "W/m^2 ",
        "description": "(+=down), >0 increases theta",
    },
    "oceQsw": {
        "long_name": "net short wave radiation into the ocean",
        "units": "W/m^2",
        "description": "(+=down), >0 increases theta",
    },
    "DFrI_TH": {
        "units": "degC m^3/s",
        "long_name": "Vertical Diffusive Flux of Pot.Temperature (Implicit part)",
    },
    "KPP_diffusivity": {
        "long_name": "KPP diffusivity for temperature",
        "units": "m^2/s^2",
    },
    "KPPhbl": {"long_name": "KPP boundary layer depth", "units": "m"},
    "KPPRi": {"long_name": "KPP bulk Richardson number", "units": "1"},
    "KPPbo": {"long_name": "Surface turbulence buoyancy forcing", "units": "m^2/s^3"},
    "KPPviscA": {
        "long_name": "KPP vertical eddy viscosity coefficient",
        "units": "m^2/s",
    },
    "KPPdiffT": {
        "long_name": "KPP Vertical diffusion coefficient for heat",
        "units": "m^2/s",
    },
    "KPPg_TH": {
        "long_name": "KPP non-local flux of potential temperature",
        "units": "degC m^3/s",
    },
    "VISrI_Um": {
        "long_name": "Vertical   Viscous Flux of U momentum (Implicit part)",
        "units": "m^4/s^2",
    },
    "VISrI_Vm": {
        "long_name": "Vertical   Viscous Flux of U momentum (Implicit part)",
        "units": "m^4/s^2",
    },
    "Um_Diss": {"long_name": "U momentum tendency from Dissipation", "units": "m/s^2"},
    "Vm_Diss": {"long_name": "V momentum tendency from Dissipation", "units": "m/s^2"},
}

## Full domain TIW season

In [None]:
dirname = "/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC"
outdir = "/glade/campaign/cgd/oce/people/dcherian/Cherian-et-al-2021-TIW/full_domain"

In [None]:
files = sorted(glob.glob(f"{dirname}/*.nc*"))
files[:10]

['/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC/Day_0001.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC/Day_0001_hb.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC/Day_0001_sf.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC/Day_0002.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC/Day_0002_hb.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC/Day_0002_sf.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC/Day_0003.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC/Day_0003_hb.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC/Day_0003_sf.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/full_domain/HOLD_NC/Day_0004.nc']

In [None]:
patterns = np.unique([ff.split("/")[-1][:8] for ff in files[::3]])
patterns[:10]

array(['Day_0001', 'Day_0002', 'Day_0003', 'Day_0004', 'Day_0005',
       'Day_0006', 'Day_0007', 'Day_0008', 'Day_0009', 'Day_0010'],
      dtype='<U8')

In [None]:
def process(patterns):
    import cf_xarray

    cmpr = dict(zlib=True, complevel=4, shuffle=True)

    decimal_places = {
        "u": 4,
        "v": 4,
        "w": 9,
        "salt": 4,
        "theta": 4,
        "DFrI_TH": 4,
        "oceQsw": 3,
        "oceQnet": 3,
    }

    if isinstance(patterns, str):
        patterns = [patterns]

    datasets = []
    for pattern in patterns:
        toopen = [f"{dirname}/{pattern}{suffix}.nc" for suffix in ["", "_hb", "_sf"]]
        # idx = ds.indexes["longitude"].get_loc(-110, method="nearest")

        datasets.append(
            xr.merge([xr.open_dataset(file, chunks=-1) for file in toopen]).sel(
                depth=slice(-305)
            )
            # .isel(longitude=[idx - 1, idx, idx + 1])
        )

    if len(datasets) > 1:
        with xr.set_options(keep_attrs=True):
            dset = xr.concat(datasets, "time").mean("time")
            dset["time"] = (
                ("time"),
                datasets[0].time.data + pd.Timedelta("12h"),
                datasets[0].time.attrs,
            )
    else:
        dset = datasets[0]

    dset = dset.cf.guess_coord_axis()
    dset = dset.drop_vars("TFLUX")
    for var in dset.variables:
        dset[var].encoding = datasets[0][var].encoding
        del dset[var].encoding["contiguous"]
        dset[var].encoding.update(cmpr)

        if var in variable_attrs:
            dset[var].attrs.update(variable_attrs[var])
        # if var in decimal_places:
        #    dset[var].encoding["least_significant_digit"] = decimal_places[var]

    del dset["time"].attrs["long_name"]
    dset["time"].attrs["description"] = "Time in Universal Coordinated Time (UTC)"
    
    dset.attrs = dataset_attrs
    dset.attrs["title"] = "Daily averaged fields from tropical Pacific cold tongue regional model simulation"
    
    time = str(datasets[0].time.data.squeeze())[:10]
    with dask.config.set(scheduler="single-threaded"):
        dset.sel(latitude=slice(-8, 8), longitude=slice(-160, -100)).load().to_netcdf(f"{outdir}/{time}.nc")

    return [ds.time.data for ds in datasets]


# dset = process(patterns[0])
# dset.cf.describe()

These are 4 hourly snapshots (:facepalm:)

Construct daily averages by
- Average Day 1 00:00 to Day 2, 00:00
- then    Day 2 00:00 to Day 3, 00:00
etc.

In [None]:
time = str(xr.open_mfdataset(f"{dirname}/*Day_0001*.nc").time.data.squeeze())[:10]
f"{time}"

'1995-09-01'

In [None]:
reshaped = np.array(patterns[:1458]).reshape(1458//6, 6)
lastcol = np.append(reshaped[1:, 0], patterns[1458])
reshaped = np.hstack((reshaped, lastcol[:, np.newaxis]))
reshaped

array([['Day_0001', 'Day_0002', 'Day_0003', ..., 'Day_0005', 'Day_0006',
        'Day_0007'],
       ['Day_0007', 'Day_0008', 'Day_0009', ..., 'Day_0011', 'Day_0012',
        'Day_0013'],
       ['Day_0013', 'Day_0014', 'Day_0015', ..., 'Day_0017', 'Day_0018',
        'Day_0019'],
       ...,
       ['Day_1441', 'Day_1442', 'Day_1443', ..., 'Day_1445', 'Day_1446',
        'Day_1447'],
       ['Day_1447', 'Day_1448', 'Day_1449', ..., 'Day_1451', 'Day_1452',
        'Day_1453'],
       ['Day_1453', 'Day_1454', 'Day_1455', ..., 'Day_1457', 'Day_1458',
        'Day_1459']], dtype='<U8')

In [None]:
tasks = [dask.delayed(process)(row) for row in reshaped]

In [None]:
results = dask.compute(*tasks)

tornado.application - ERROR - Uncaught exception GET /individual-cpu/ws (::1)
HTTPServerRequest(protocol='http', host='localhost:9999', method='GET', uri='/individual-cpu/ws', version='HTTP/1.1', remote_ip='::1')
Traceback (most recent call last):
  File "/glade/u/home/dcherian/miniconda3/envs/dcpy/lib/python3.8/site-packages/tornado/websocket.py", line 956, in _accept_connection
    open_result = handler.open(*handler.open_args, **handler.open_kwargs)
  File "/glade/u/home/dcherian/miniconda3/envs/dcpy/lib/python3.8/site-packages/tornado/web.py", line 3178, in wrapper
    return method(self, *args, **kwargs)
  File "/glade/u/home/dcherian/miniconda3/envs/dcpy/lib/python3.8/site-packages/bokeh/server/views/ws.py", line 137, in open
    raise ProtocolError("Token is expired.")
bokeh.protocol.exceptions.ProtocolError: Token is expired.
tornado.application - ERROR - Uncaught exception GET /individual-memory-by-key/ws (::1)
HTTPServerRequest(protocol='http', host='localhost:9999', method='

In [None]:
metrics = pump.model.read_metrics(
    "/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/"
)
metrics["longitude"] = metrics.longitude - 169.025
cmpr = dict(zlib=True, complevel=4, shuffle=True)

for var in metrics.variables:
    metrics[var].encoding.update(cmpr)
(
    metrics
    .sel(depth=slice(-305), latitude=slice(-8, 8), longitude=slice(-160, -100))
    .isel(depth_left=slice(270))
    .to_netcdf(f"{outdir}/metrics.nc")
)

## TIW hourly

In [None]:
dirname = "/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/"
outdir = "/glade/campaign/cgd/oce/people/dcherian/Cherian-et-al-2021-TIW/tiw_hourly/"

In [None]:
files = sorted(glob.glob(f"{dirname}/*deepak*.nc*"))
files[:10]

['/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/File_0001_deepak_KPP2D.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/File_0001_deepak_KPP3D.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/File_0001_deepak_buoy.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/File_0001_deepak_diss.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/File_0001_deepak_etan.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/File_0002_deepak_KPP2D.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/File_0002_deepak_KPP3D.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/File_0002_deepak_buoy.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/File_0002_deepak_diss.nc',
 '/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/tiw/File_0002_deepak_etan.nc']

In [None]:
patterns = np.unique([ff.split("/")[-1][:9] for ff in files[::5]])
patterns[:10]

array(['File_0001', 'File_0002', 'File_0003', 'File_0004', 'File_0005',
       'File_0006', 'File_0007', 'File_0008', 'File_0009', 'File_0010'],
      dtype='<U9')

In [None]:
def process(pattern):
    toopen = [
        f"{dirname}/{pattern}_deepak_{suffix}.nc"
        for suffix in ["KPP2D", "KPP3D", "buoy", "diss", "etan"]
    ]
    
    dset = (
        xr.merge([xr.open_dataset(file) for file in toopen])
        .sel(depth=slice(-305))
    )
    idx = dset.indexes["longitude"].get_loc(-110, method="nearest")
    dset = dset.isel(longitude=[idx - 1, idx, idx + 1])
    
    cmpr = dict(zlib=True, complevel=4, shuffle=True)

    for var in dset.variables:
        dset[var].encoding.update(cmpr)
        if var in variable_attrs:
            dset[var].attrs.update(variable_attrs[var])
        del dset[var].encoding["contiguous"]
        
    del dset["time"].attrs["long_name"]
        
    dset.attrs = dataset_attrs
    dset.attrs["title"] = "Hourly averaged fields from tropical Pacific cold tongue regional model simulation"

    dset.load().to_netcdf(f"{outdir}/{pattern}.nc")

    return pattern


# process(patterns[0])

In [None]:
tasks = [dask.delayed(process)(pattern) for pattern in patterns]

In [None]:
computed = dask.compute(*tasks, scheduler=client)

In [None]:
metrics = pump.model.read_metrics(
    "/glade/campaign/cgd/oce/people/dcherian/TPOS_MITgcm_1_hb/"
)
metrics["longitude"] = metrics.longitude - 169.025
idx = metrics.indexes["longitude"].get_loc(-110, method="nearest")
(
    metrics.isel(longitude=[idx - 1, idx, idx + 1])
    .sel(depth=slice(-305), latitude=slice(-10, 10))
    .isel(depth_left=slice(270))
    .to_netcdf(f"{outdir}/metrics.nc")
)

In [None]:
tiw = xr.open_mfdataset(f"{outdir}/*.nc", parallel=True)
tiw

In [None]:
tiw.get("T")

In [None]:
cluster.close(); client.close();

## Sections 4 hourly output 

In [None]:
sections = xr.open_zarr(
    "/glade/work/dcherian/pump/zarrs/gcm1-sections-rechunked.zarr", consolidated=True
)
#sections["dens"] = pump.mdjwf.dens(sections.salt, sections.theta, np.array([0.0]))
#sections = pump.calc.calc_reduced_shear(sections)
#sections["mld"] = pump.calc.get_mld(sections.dens)
sections

Unnamed: 0,Array,Chunk
Bytes,15.62 GB,57.60 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 240, 1)"
Count,321 Tasks,320 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 15.62 GB 57.60 MB Shape (2947, 345, 480, 4) (300, 100, 240, 1) Count 321 Tasks 320 Chunks Type float64 numpy.ndarray",2947  1  4  480  345,

Unnamed: 0,Array,Chunk
Bytes,15.62 GB,57.60 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 240, 1)"
Count,321 Tasks,320 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 7.81 GB 16.56 MB Shape (2947, 345, 480, 4) (300, 100, 138, 1) Count 641 Tasks 640 Chunks Type float32 numpy.ndarray",2947  1  4  480  345,

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 7.81 GB 16.56 MB Shape (2947, 345, 480, 4) (300, 100, 138, 1) Count 641 Tasks 640 Chunks Type float32 numpy.ndarray",2947  1  4  480  345,

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 7.81 GB 16.56 MB Shape (2947, 345, 480, 4) (300, 100, 138, 1) Count 641 Tasks 640 Chunks Type float32 numpy.ndarray",2947  1  4  480  345,

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 7.81 GB 16.56 MB Shape (2947, 345, 480, 4) (300, 100, 138, 1) Count 641 Tasks 640 Chunks Type float32 numpy.ndarray",2947  1  4  480  345,

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 7.81 GB 16.56 MB Shape (2947, 345, 480, 4) (300, 100, 138, 1) Count 641 Tasks 640 Chunks Type float32 numpy.ndarray",2947  1  4  480  345,

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 7.81 GB 16.56 MB Shape (2947, 345, 480, 4) (300, 100, 138, 1) Count 641 Tasks 640 Chunks Type float32 numpy.ndarray",2947  1  4  480  345,

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 7.81 GB 16.56 MB Shape (2947, 345, 480, 4) (300, 100, 138, 1) Count 641 Tasks 640 Chunks Type float32 numpy.ndarray",2947  1  4  480  345,

Unnamed: 0,Array,Chunk
Bytes,7.81 GB,16.56 MB
Shape,"(2947, 345, 480, 4)","(300, 100, 138, 1)"
Count,641 Tasks,640 Chunks
Type,float32,numpy.ndarray


In [None]:
subset = sections.drop_vars("dens").sel(latitude=slice(-10, 10), depth=slice(-300), time=slice("1996-05-01"))
subset = subset.cf.guess_coord_axis()
subset.attrs.update(dataset_attrs)
subset.attrs["title"] = "4-hourly snapshots of fields from regional simulation"
del subset.time.attrs["long_name"]
for var in subset:
    if var in variable_attrs:
        subset[var].attrs.update(variable_attrs[var])

In [None]:
subset.coords.to_dataset().to_netcdf(f"{rootdir}/sections.nc")

for var in subset:
    subset[var].encoding.update(dict(zlib=True, shuffle=True, complevel=4))
    subset[[var]].compute().to_netcdf(f"{rootdir}/sections.nc", mode="a")