# compress-3d
This notebook opens the raw 3D model output, concatenates the partial runs along the time dimension, and writes the result to compressed `.zarr` stores.

In [1]:
from itertools import product
from pathlib import Path
import numpy as np
import xarray as xr
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import numcodecs
from xmitgcm import open_mdsdataset
import f90nml

In [2]:
# Absolute project root; every other path in this notebook is derived from it.
base_path = Path("/work/n01/n01/fwg/irminger-proj")
# Parent directory of the per-run folders opened by open_partial_run.
run_folder = base_path.joinpath("data/raw/3d-models-200m")

In [None]:
# Run folders are named <prefix><suffix> (see open_partial_run). For each
# suffix, the datasets are later concatenated along "time" in the prefix
# order listed here.
prefix = list("abcdef")
suffix = [
    "standard",
    "control",
]


def open_partial_run(suff, pref):
    """Open the output of a single partial run with xmitgcm.

    The run directory is ``run_folder / (pref + suff)``; its path is printed
    before anything is read. The time step is taken from ``parm03.deltat`` in
    that directory's ``data`` namelist and passed on as ``delta_t``.

    Parameters
    ----------
    suff : str
        Trailing part of the run folder name.
    pref : str
        Leading part of the run folder name.

    Returns
    -------
    The dataset returned by ``open_mdsdataset`` for the ``ZLevelVars`` and
    ``IntLevelVars`` output prefixes, opened with a cartesian geometry.
    """
    run_path = run_folder.joinpath(pref + suff)
    print(run_path)

    namelist = f90nml.read(run_path / "data")
    mds_options = {
        "prefix": ['ZLevelVars', 'IntLevelVars'],
        "delta_t": namelist["parm03"]["deltat"],
        "geometry": 'cartesian',
    }
    return open_mdsdataset(run_path, **mds_options)

# Open every partial run, grouped by suffix. The nesting reproduces the call
# order of product(suffix, prefix): suffix outermost, prefix innermost.
partial_run_dict = {
    suff: [open_partial_run(suff, pref) for pref in prefix]
    for suff in suffix
}

# Join the pieces of each run along "time": one dataset per suffix.
full_run_dict = {
    suff: xr.concat(pieces, dim="time")
    for suff, pieces in partial_run_dict.items()
}

In [None]:
def create_encoding_for_ds(ds, clevel=5):
    """Build a per-variable ``encoding`` mapping that applies Blosc/zstd.

    Parameters
    ----------
    ds
        Object whose iteration yields variable names (for an xarray Dataset
        these are its data variables).
    clevel : int, optional
        Blosc compression level. Defaults to 5.

    Returns
    -------
    dict
        ``{name: {"compressor": codec}}`` for every name yielded by ``ds``.
        All entries share the same codec instance.
    """
    codec = numcodecs.Blosc(
        cname="zstd",
        clevel=clevel,
        shuffle=numcodecs.Blosc.AUTOSHUFFLE,  # same value as the literal -1
    )
    encoding = {}
    for var_name in ds:
        encoding[var_name] = {"compressor": codec}
    return encoding

In [None]:
# Shut down any client/cluster left over from a previous run of this cell.
# Looking the names up in globals() makes this a no-op on a fresh kernel, and
# each object is closed independently so a failure on one does not skip the
# other. The client goes first so it disconnects before its scheduler stops.
for stale_name in ("client", "scluster"):
    stale = globals().get(stale_name)
    if stale is None:
        continue
    try:
        stale.close()
    except Exception as err:  # best effort: report instead of hiding silently
        print(f"Ignoring error while closing stale {stale_name}: {err!r}")

log_path = base_path / 'src/post_processing/.tmp/slurm-out'
dask_worker_path = base_path / 'src/post_processing/.tmp/dask-worker-space'
env_path = base_path / 'irminger-proj/bin/activate'

scluster = SLURMCluster(queue='standard',
                        account="n01-siAMOC",
                        job_cpu=256,  # CPUs booked per SLURM job
                        log_directory=log_path,
                        local_directory=dask_worker_path,
                        cores=24,
                        processes=24,  # Can change this
                        memory="256 GiB",
                        # Drop the generated memory directive from the job script
                        # (presumably rejected by this scheduler - TODO confirm).
                        job_directives_skip=['#SBATCH --mem='],
                        walltime="00:25:00",
                        death_timeout=60,
                        interface='hsn0',
                        job_extra_directives=["--qos=standard", "--partition=standard"],
                        # Commands run in the job script before the worker starts.
                        job_script_prologue=['module load cray-python',
                                             f'source {env_path.absolute()}']
                        )


client = Client(scluster)

# Scale between 1 and 16 SLURM jobs depending on the workload.
scluster.adapt(minimum_jobs=1, maximum_jobs=16,
               interval="1000 ms", wait_count=30)

In [None]:
client

In [None]:
# Write one compressed Zarr store per run suffix under data/interim.
for suff in suffix:
    compressed_path = base_path.joinpath(f"data/interim/{suff}.zarr")
    print(compressed_path)

    run_ds = full_run_dict[suff]
    enc = create_encoding_for_ds(run_ds, clevel=8)

    run_ds.to_zarr(store=compressed_path, encoding=enc)

In [None]:
# Disconnect the client first, then stop the cluster. Closing in the opposite
# order leaves the client attached to a scheduler that has already shut down.
client.close()
scluster.close()

## Compress run32

In [5]:
# Absolute project root (same value as in the first section of the notebook).
base_path = Path("/work/n01/n01/fwg/irminger-proj")
# From here on, runs are read from the 2d-models directory instead.
run_folder = base_path.joinpath("data/raw/2d-models")

In [7]:
# Run folders are named <prefix><suffix>, i.e. run32_a ... run32_f.
# NOTE: unlike the first section, `suffix` is the list of letters here and
# `prefix` is a single string.
prefix = "run32_"
suffix = list("abcdef")


def open_partial_run(suff, pref):
    """Open the output of a single run32 partial run with xmitgcm.

    NOTE: this re-defines (and therefore shadows) the ``open_partial_run``
    from the first section. The only difference is the namelist location:
    here ``parm03.deltat`` is read from ``../input_data_files_a/data``
    relative to the run directory, so every partial run uses that one file.

    Parameters
    ----------
    suff : str
        Trailing part of the run folder name.
    pref : str
        Leading part of the run folder name.

    Returns
    -------
    The dataset returned by ``open_mdsdataset`` for the ``ZLevelVars`` and
    ``IntLevelVars`` output prefixes, opened with a cartesian geometry.
    """
    run_path = run_folder.joinpath(pref + suff)
    print(run_path)

    namelist = f90nml.read(run_path / "../input_data_files_a/data")
    mds_options = {
        "prefix": ['ZLevelVars', 'IntLevelVars'],
        "delta_t": namelist["parm03"]["deltat"],
        "geometry": 'cartesian',
    }
    return open_mdsdataset(run_path, **mds_options)

# Open the partial runs in suffix order, then join them along "time".
partial_run_list = [open_partial_run(suff, prefix) for suff in suffix]
full_run_ds = xr.concat(partial_run_list, dim="time")

/work/n01/n01/fwg/irminger-proj/data/raw/2d-models/run32_a
/work/n01/n01/fwg/irminger-proj/data/raw/2d-models/run32_b
/work/n01/n01/fwg/irminger-proj/data/raw/2d-models/run32_c
/work/n01/n01/fwg/irminger-proj/data/raw/2d-models/run32_d
/work/n01/n01/fwg/irminger-proj/data/raw/2d-models/run32_e
/work/n01/n01/fwg/irminger-proj/data/raw/2d-models/run32_f


In [8]:
def create_encoding_for_ds(ds, clevel=5):
    """Build a per-variable ``encoding`` mapping that applies Blosc/zstd.

    NOTE: this repeats the definition from the first section of the notebook
    so that this section can be run on its own.

    Parameters
    ----------
    ds
        Object whose iteration yields variable names (for an xarray Dataset
        these are its data variables).
    clevel : int, optional
        Blosc compression level. Defaults to 5.

    Returns
    -------
    dict
        ``{name: {"compressor": codec}}`` for every name yielded by ``ds``.
        All entries share the same codec instance.
    """
    codec = numcodecs.Blosc(
        cname="zstd",
        clevel=clevel,
        shuffle=numcodecs.Blosc.AUTOSHUFFLE,  # same value as the literal -1
    )
    encoding = {}
    for var_name in ds:
        encoding[var_name] = {"compressor": codec}
    return encoding

In [9]:
# Shut down any client/cluster left over from a previous run of this cell.
# Looking the names up in globals() makes this a no-op on a fresh kernel, and
# each object is closed independently so a failure on one does not skip the
# other. The client goes first so it disconnects before its scheduler stops.
for stale_name in ("client", "scluster"):
    stale = globals().get(stale_name)
    if stale is None:
        continue
    try:
        stale.close()
    except Exception as err:  # best effort: report instead of hiding silently
        print(f"Ignoring error while closing stale {stale_name}: {err!r}")

log_path = base_path / 'src/post_processing/.tmp/slurm-out'
dask_worker_path = base_path / 'src/post_processing/.tmp/dask-worker-space'
env_path = base_path / 'irminger-proj/bin/activate'

scluster = SLURMCluster(queue='standard',
                        account="n01-siAMOC",
                        job_cpu=256,  # CPUs booked per SLURM job
                        log_directory=log_path,
                        local_directory=dask_worker_path,
                        cores=24,
                        processes=24,  # Can change this
                        memory="256 GiB",
                        # Drop the generated memory directive from the job script
                        # (presumably rejected by this scheduler - TODO confirm).
                        job_directives_skip=['#SBATCH --mem='],
                        walltime="00:25:00",
                        death_timeout=60,
                        interface='hsn0',
                        job_extra_directives=["--qos=standard", "--partition=standard"],
                        # Commands run in the job script before the worker starts.
                        job_script_prologue=['module load cray-python',
                                             f'source {env_path.absolute()}']
                        )


client = Client(scluster)

# Scale between 1 and 16 SLURM jobs depending on the workload. Kept as the
# last expression so the notebook displays the returned Adaptive object.
scluster.adapt(
    minimum_jobs=1,
    maximum_jobs=16,
    interval="1000 ms",
    wait_count=30,
)

<distributed.deploy.adaptive.Adaptive at 0x7f6f5fa42490>

In [10]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.253.23.31:8787/status,

0,1
Dashboard: http://10.253.23.31:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.253.23.31:46471,Workers: 0
Dashboard: http://10.253.23.31:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [11]:
# Destination store for the concatenated run32 dataset.
compressed_path = base_path.joinpath("data/interim/run32.zarr")
print(compressed_path)

enc = create_encoding_for_ds(full_run_ds, clevel=8)

# Kept as the last expression so the notebook displays the returned store.
full_run_ds.to_zarr(store=compressed_path, encoding=enc)

/work/n01/n01/fwg/irminger-proj/data/interim/run32.zarr


<xarray.backends.zarr.ZarrStore at 0x7f70a9646900>

In [12]:
# Disconnect the client first, then stop the cluster. Closing in the opposite
# order leaves the client attached to a scheduler that has already shut down.
client.close()
scluster.close()