In [1]:
import os

import zarr
import numpy as np

In [2]:
import dask

# Send dask's scratch/spill files to the scratch filesystem; both keys get the same directory.
dask.config.set(
    {
        "temporary-directory": "/ec/res4/scratch/syma/dask-tmp",
        "local-directory": "/ec/res4/scratch/syma/dask-tmp",
    }
)

<dask.config.set at 0x146c87d193a0>

In [3]:
import ctypes

def trim_memory() -> int:
    """Ask glibc to return freed heap memory to the operating system.

    Loads ``libc.so.6`` (so this only works where that library exists) and
    calls ``malloc_trim(0)``.

    Returns:
        The integer result of ``malloc_trim``: 1 if some memory was released
        back to the system, 0 otherwise.
    """
    glibc = ctypes.CDLL("libc.so.6")
    released = glibc.malloc_trim(0)
    return released

In [4]:
from dask.distributed import Client, LocalCluster

# Start a local dask cluster with 8 workers of 4 threads each and attach a client to it.
cluster = LocalCluster(n_workers=8, threads_per_worker=4)
client = Client(cluster)
# Run trim_memory on the workers right away; binding the result to `_` keeps the cell output empty.
_ = client.run(trim_memory)
# Uncomment to inspect the scheduler and worker state:
# client.scheduler_info()

2023-03-20 14:17:37,323 - distributed.diskutils - INFO - Found stale lock file and directory '/ec/res4/scratch/syma/dask-tmp/dask-worker-space/worker-76qikyfn', purging
2023-03-20 14:17:37,329 - distributed.diskutils - INFO - Found stale lock file and directory '/ec/res4/scratch/syma/dask-tmp/dask-worker-space/worker-6c5v1_zc', purging
2023-03-20 14:17:37,335 - distributed.diskutils - INFO - Found stale lock file and directory '/ec/res4/scratch/syma/dask-tmp/dask-worker-space/worker-8uq9zepc', purging
2023-03-20 14:17:37,339 - distributed.diskutils - INFO - Found stale lock file and directory '/ec/res4/scratch/syma/dask-tmp/dask-worker-space/worker-tuigfadk', purging
2023-03-20 14:17:37,344 - distributed.diskutils - INFO - Found stale lock file and directory '/ec/res4/scratch/syma/dask-tmp/dask-worker-space/worker-vb3e9ugj', purging
2023-03-20 14:17:37,348 - distributed.diskutils - INFO - Found stale lock file and directory '/ec/res4/scratch/syma/dask-tmp/dask-worker-space/worker-ywn0e

In [5]:
from aifs.utils.config import YAMLConfig

# Load the configuration that get_data_filename() reads the dataset locations from.
# NOTE(review): absolute, user-specific path - the notebook only runs as-is where this file exists.
config = YAMLConfig("/home/syma/dask/codes/aifs/aifs/config/era_config_atos.yaml")

In [6]:
def get_data_filename(type: str, config: YAMLConfig) -> str:
    """Build the full path of the training dataset for one input type.

    Args:
        type: Input type used in the config keys; the notebook uses "pl" and "sfc".
        config: Configuration object indexed with colon-separated string keys.

    Returns:
        ``input:<type>:training:basedir`` joined with
        ``input:<type>:training:filename``, after the ``{resolution}``
        placeholder in each has been filled from ``input:resolution``.
    """
    key_prefix = f"input:{type}:training"
    # The resolution is looked up separately for each part, in the same order as before.
    directory = config[f"{key_prefix}:basedir"].format(resolution=config["input:resolution"])
    file_name = config[f"{key_prefix}:filename"].format(resolution=config["input:resolution"])
    return os.path.join(directory, file_name)

In [None]:
# Resolve the path of the "pl" training dataset from the config and echo it as the cell output.
fname = get_data_filename("pl", config)
fname

In [None]:
import dask.array

# Open the zarr store as a lazy dask array (nothing is loaded yet) and show its summary.
ds_wb = dask.array.from_zarr(fname)
display(ds_wb)

In [None]:
# leave the var and plev dimensions (1, 2) untouched
# Build both reductions lazily and evaluate them with a single dask.compute() call: the
# chunk-loading tasks are then shared, so the store is read once instead of once per statistic.
_pl_mean_lazy = ds_wb.mean(axis=(0, -1), keepdims=True)
# Variance along axis 0 at every remaining index, then averaged over the last axis.
# NOTE(review): this is the root of the spatially averaged temporal variance, not the standard
# deviation about var_means - confirm that this is the intended normalisation.
_pl_var_lazy = ds_wb.var(axis=0, keepdims=True).mean(axis=-1, keepdims=True)
var_means, _pl_var = dask.compute(_pl_mean_lazy, _pl_var_lazy)
var_sds = _pl_var ** (1.0 / 2.0)

In [None]:
# Show the computed means and standard deviations side by side as a sanity check.
var_means, var_sds

In [None]:
# keepdims=True keeps the reduced axes (0 and -1) with length 1, so the rank matches ds_wb.
var_means.shape

In [None]:
# Should equal var_means.shape: both reductions collapse axes 0 and -1 with keepdims=True.
var_sds.shape

In [None]:
# Standard deviations for the last entry along axis 1 (the variable axis of the statistics).
var_sds[:, -1, ...]

In [None]:
# Output files for the pressure-level statistics: "_mu" holds the means, "_sd" the standard deviations.
# NOTE(review): the resolution is hard-coded here although the config also carries "input:resolution";
# keep the two in sync.
resolution = "o160"
var_means_file = f"/ec/res4/scratch/syma/era5/{resolution}/zarr/statistics/pl_1979_2016_mu.npy"
var_sds_file = f"/ec/res4/scratch/syma/era5/{resolution}/zarr/statistics/pl_1979_2016_sd.npy"

In [None]:
# np.save does not create missing parent directories. Create them first so that a fresh
# scratch area cannot make the save fail after the expensive reductions have finished.
for _stats_path in (var_means_file, var_sds_file):
    os.makedirs(os.path.dirname(os.path.abspath(_stats_path)), exist_ok=True)
# allow_pickle=False: write plain numeric arrays only, never pickled objects.
np.save(var_means_file, var_means, allow_pickle=False)
np.save(var_sds_file, var_sds, allow_pickle=False)

In [None]:
# List the statistics directory to confirm that the .npy files were written.
!ls -l /ec/res4/scratch/syma/era5/o160/zarr/statistics/

In [7]:
# Resolve the path of the "sfc" training dataset; this rebinds fname, which earlier held the "pl" path.
fname = get_data_filename("sfc", config)
fname

'/ec/res4/scratch/syma/era5/o160/zarr/sfc/era5_o160_blh_lsm_msl_z_sfc_training.zarr'

In [9]:
import dask.array

# Open the surface zarr store as a lazy dask array and show its summary.
# In the recorded run this is a float32 array of shape (54056, 4, 108160) with one chunk per
# index along axis 0 (about 87 GiB in total).
ds_sfc =  dask.array.from_zarr(fname)
display(ds_sfc)

Unnamed: 0,Array,Chunk
Bytes,87.12 GiB,1.65 MiB
Shape,"(54056, 4, 108160)","(1, 4, 108160)"
Count,2 Graph Layers,54056 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 87.12 GiB 1.65 MiB Shape (54056, 4, 108160) (1, 4, 108160) Count 2 Graph Layers 54056 Chunks Type float32 numpy.ndarray",108160  4  54056,

Unnamed: 0,Array,Chunk
Bytes,87.12 GiB,1.65 MiB
Shape,"(54056, 4, 108160)","(1, 4, 108160)"
Count,2 Graph Layers,54056 Chunks
Type,float32,numpy.ndarray


In [10]:
# leave the var dimension (1) untouched
# Build both reductions lazily and evaluate them with a single dask.compute() call: the
# chunk-loading tasks are then shared, so the store is read once instead of once per statistic.
_sfc_mean_lazy = ds_sfc.mean(axis=(0, 2), keepdims=True)
# Variance along axis 0 at every (variable, grid point), then averaged over axis 2.
# NOTE(review): this is the root of the spatially averaged temporal variance, not the standard
# deviation about var_sfc_means - confirm that this is the intended normalisation.
_sfc_var_lazy = ds_sfc.var(axis=0, keepdims=True).mean(axis=2, keepdims=True)
var_sfc_means, _sfc_var = dask.compute(_sfc_mean_lazy, _sfc_var_lazy)
var_sfc_sds = _sfc_var ** (1.0 / 2.0)

In [11]:
# Per-variable extrema over axes 0 and 2. Both reductions go through one dask.compute() call
# so the chunks are loaded once and feed the min and the max together.
var_sfc_min, var_sfc_max = dask.compute(
    ds_sfc.min(axis=(0, 2), keepdims=True),
    ds_sfc.max(axis=(0, 2), keepdims=True),
)

In [12]:
# keepdims=True leaves the reduced axes 0 and 2 with length 1; only axis 1 (variable) keeps its extent.
var_sfc_max.shape

(1, 4, 1)

In [13]:
# Allocate the table of normalisation constants: one row per surface variable (axis 1 of the
# statistics arrays) and two float32 columns, filled in by the next cell.
var_2d_stats = np.zeros((var_sfc_means.shape[1], 2), dtype=np.float32)
var_2d_stats.shape

(4, 2)

In [14]:
# Fill the table row by row; rows follow the variable order blh, lsm, msl, z.
# Row 0 - blh: (min, max) of the data.
var_2d_stats[0, :] = (var_sfc_min[0, 0, 0], var_sfc_max[0, 0, 0])
# Row 1 - lsm: fixed (0, 1) rather than values measured from the data.
var_2d_stats[1, :] = (0.0, 1.0)
# Row 2 - msl: (mean, standard deviation).
var_2d_stats[2, :] = (var_sfc_means[0, 2, 0], var_sfc_sds[0, 2, 0])
# Row 3 - z needs special treatment: its (min, max) are taken from the first index along
# axis 0 only, not from the reductions over the whole dataset.
_z_first_step = ds_sfc[0, -1, ...]
z_min = _z_first_step.min().compute()
z_max = _z_first_step.max().compute()
var_2d_stats[3, :] = (z_min, z_max)

In [15]:
# Rows: blh, lsm, msl, z. Columns are (min, max) for blh, lsm and z, and (mean, sd) for msl.
var_2d_stats

array([[ 7.1217775e+00,  7.2502432e+03],
       [ 0.0000000e+00,  1.0000000e+00],
       [ 1.0115370e+05,  7.4635583e+02],
       [-7.5737769e+02,  5.4746457e+04]], dtype=float32)

In [17]:
# Output file for the surface-variable normalisation table.
resolution = "o160"
var_2d_file = f"/ec/res4/scratch/syma/era5/{resolution}/zarr/statistics/sfc_1979_2016.npy"

# np.save does not create missing parent directories. Create the statistics directory first
# so that the save cannot fail after the expensive reductions have finished.
os.makedirs(os.path.dirname(os.path.abspath(var_2d_file)), exist_ok=True)
# allow_pickle=False: write a plain numeric array only, never pickled objects.
np.save(var_2d_file, var_2d_stats, allow_pickle=False)