# Demonstration of the tropical precipitation module on different datasets

###  Loading the necessary packages

In [1]:
from dask_jobqueue import SLURMCluster # pip 
from dask.distributed import Client, progress 
import dask

import re
import matplotlib as mpl
# Define Agg as Backend for matplotlib when no X server is running
mpl.use('Agg')
import socket
import os
import importlib

import inspect
import timeit
import sys

### Function that shows the user's jobs in the Slurm queue

In [2]:
def squeue_user(username = "$USER"):
    """Run ``squeue`` for one user and hand back the status of that call.

    Args:
        username: Value placed after ``--user=`` on the ``squeue`` command
            line. It is converted with ``str`` first. The default ``"$USER"``
            is passed through unchanged, so any expansion is left to the
            shell that ``os.system`` starts.

    Returns:
        Whatever ``os.system`` returns for the ``squeue`` command.
    """
    command = "squeue --user={}".format(str(username))
    return os.system(command)

### Storing the path of the current working directory in a variable

In [3]:
# Absolute path of the directory the notebook kernel is running in.
# Asking the interpreter directly avoids spawning a shell for `pwd` and
# parsing its output, and it also works where no `pwd` command exists.
# Note: os.getcwd() returns the path as the OS reports it, so a directory
# reached through a symlink may be shown by its resolved location.
pwd = os.getcwd()

### Setting the slurm job

In [20]:

# Directories that receive the stderr / stdout files of the dask workers.
# Create them up front: the paths are handed to SLURM via --error/--output,
# and a worker job cannot write its log files if the directory is missing.
_slurm_log_dir = str(pwd)+"/slurm/logs"
_slurm_output_dir = str(pwd)+"/slurm/output"
os.makedirs(_slurm_log_dir, exist_ok=True)
os.makedirs(_slurm_output_dir, exist_ok=True)

# Extra #SBATCH directives; %j is replaced by SLURM with the job ID.
extra_args=[
    "--error="+_slurm_log_dir+"/dask-worker-%j.err",
    "--output="+_slurm_output_dir+"/dask-worker-%j.out"
]

# Resource request for each SLURM job that hosts dask workers.
# NOTE(review): newer dask-jobqueue releases rename `project` to `account`
# and `job_extra` to `job_extra_directives` - confirm against the installed
# version before changing the keyword names.
cluster = SLURMCluster(
    name='dask-cluster', 
    cores=256,    
    memory="500 GB", 
    project="bb1153",
    queue= "compute", 
    walltime='04:30:50',
    job_extra=extra_args,
)
# Connect a client to the cluster so later dask work is sent to it.
client = Client(cluster)
# Print the generated batch script for inspection before any job is submitted.
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p compute
#SBATCH -A bb1153
#SBATCH -n 1
#SBATCH --cpus-per-task=256
#SBATCH --mem=466G
#SBATCH -t 04:30:50
#SBATCH --error=/work/bb1153/b382267/AQUA/diagnostics/tropical_rainfall/notebooks/time_memory_estimation/slurm/logs/dask-worker-%j.err
#SBATCH --output=/work/bb1153/b382267/AQUA/diagnostics/tropical_rainfall/notebooks/time_memory_estimation/slurm/output/dask-worker-%j.out

/home/b/b382267/mambaforge/envs/tropical-rainfall/bin/python -m distributed.cli.dask_worker tcp://136.172.124.1:39181 --nthreads 16 --nworkers 16 --memory-limit 29.10GiB --name dummy-name --nanny --death-timeout 60



Perhaps you already have a cluster running?
Hosting the HTTP server on port 35665 instead


### Submitting the Slurm job to the queue

In [21]:
# Ask the cluster for one job built from the job script printed above
cluster.scale(jobs=1)

### Checking our slurm job in the queue 

In [22]:
# List the queue entries for the default user ("$USER"); the trailing number in the output is the command's return status
squeue_user()

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           4600495   compute dask-wor  b382267 PD       0:00      1 (None)


0

### Canceling the job if it is necessary. Set the job ID manually

In [8]:
# ID of the SLURM job to cancel. This is a hard-coded value: replace it with
# the JOBID reported by squeue before running the cell.
# NOTE(review): on "Run All" this cell executes too and cancels whatever ID is set here.
Job_ID =   4523604
# Run `scancel` on that ID through the shell; the displayed number is the value returned by os.system
os.system("scancel " +str(Job_ID)) 

0

### Setting the path to the diagnostic repository 

In [7]:
# Put the directory two levels above the working directory on the import path,
# so that the `src` package imported below can be found
# (presumably the root of the diagnostic - confirm against the repository layout).
sys.path.append(str(pwd)+'/../../')
import src.shared_func
import src.tr_pr_mod
import src.memory_estimator

### Loading the shared helper functions, which can be used in any diagnostic

In [8]:
# Reload src.shared_func so that edits made on disk are picked up without
# restarting the kernel, then (re)bind the helper names used in this notebook.
try:
    importlib.reload(src.shared_func)
except (NameError, AttributeError):
    # `src` is not bound yet (NameError) or the submodule was never imported
    # (AttributeError): fall back to a first-time import.
    # The exceptions must be given as a tuple; `NameError and AttributeError`
    # evaluates to AttributeError alone and would let NameError escape.
    import src.shared_func
from src.shared_func import time_interpreter, animation_creator, image_creator, xarray_attribute_update, data_size

### Loading the tropical precipitation module

In [9]:
# Reload src.tr_pr_mod so that edits made on disk are picked up without
# restarting the kernel, then (re)bind the diagnostic class.
try:
    importlib.reload(src.tr_pr_mod)
except (NameError, AttributeError):
    # `src` is not bound yet (NameError) or the submodule was never imported
    # (AttributeError): fall back to a first-time import.
    # The exceptions must be given as a tuple; `NameError and AttributeError`
    # evaluates to AttributeError alone and would let NameError escape.
    import src.tr_pr_mod
from src.tr_pr_mod import TR_PR_Diagnostic

### Loading the memory estimator

In [26]:
# Reload src.memory_estimator so that edits made on disk are picked up without
# restarting the kernel, then (re)bind the estimator helpers.
try:
    importlib.reload(src.memory_estimator)
except (NameError, AttributeError):
    # `src` is not bound yet (NameError) or the submodule was never imported
    # (AttributeError): fall back to a first-time import.
    # The exceptions must be given as a tuple; `NameError and AttributeError`
    # evaluates to AttributeError alone and would let NameError escape.
    import src.memory_estimator
from src.memory_estimator import expected_total_memory_usage, adaptive_data_load, mem_units_converter

In [14]:
# Location of the configuration directory, relative to the working directory;
# passed to `catalogue` and `Reader` further down.
configdir = '../../../../config/'
# Name of this diagnostic and of the machine it runs on.
diagname = 'tr_pr'
machine = 'levante'

### Setting the class attributes

In [15]:
# Diagnostic object whose histogram binning is configured below.
diag = TR_PR_Diagnostic()

# Uniform binning: 15 bins starting at 0, each 1e-4/15 wide.
diag.num_of_bins = 15
diag.first_edge = 0
diag.width_of_bin = (1*10**(-4)) / diag.num_of_bins

# Upper edge of the last bin, derived from the three settings above.
last_edge = diag.first_edge + (diag.num_of_bins * diag.width_of_bin)

### Importing the aqua module 

In [16]:
import aqua

In [17]:
from aqua import Reader
from aqua.reader import catalogue

In [18]:
# Display the catalogue found under `configdir` (the models, experiments and sources listed in the output)
catalogue(configdir=configdir)

IFS	tco3999-ng5	2.8km experiment, coupled with FESOM
	- ICMGG_atm2d	
	- ICMU_atm2d	
	- ICMU_atm3d	
	- interpolated_global	
	- interpolated_np	
	- interpolated_sp	
	- interpolated_sp_ci	
	- lra-r100-day	
	- lra-r100-mon	
IFS	tco2559-ng5	4km experiment, coupled with FESOM
	- ICMGG_atm2d	
	- ICMU_atm2d	
	- ICMU_atm3d	
	- interpolated_global	
	- interpolated_np	
	- interpolated_sp	
	- lra-r100-day	
	- lra-r100-mon	
IFS	tco1279-orca025	9km baseline, coupled to NEMO, deep conv ON
	- ICMGG_atm2d	
	- ICMU_atm2d	
	- ICMU_atm3d	
	- lra-r100-day	
	- lra-r100-mon	
IFS	test-tco2559	4km experiment, coupled with FESOM
	- ICMGG_atm2d	2d output
	- ICMU_atm3d	3d output

FESOM	tco3999-ng5	2.5km experiment, coupuled with IFS
	- elem_grid	
	- node_grid	
	- np	nearest-neighbor interpolation to lat-lon grid
	- interpolated_global2d	
	- interpolated_global_TS	
	- interpolated_global_UV	
	- interpolated_np	
	- interpolated_sp	
	- original_2d	original 2d output
	- original_3d	original 3d output
FESOM	tco2559-ng

levante:
  args:
    path: ../../../../config//levante/catalog.yaml
  description: ''
  driver: intake.catalog.local.YAMLFileCatalog
  metadata: {}


## Memory check

In [23]:
# Upper bound of the slice taken along the leading axis of 'tprate' when the test chunk is loaded below
max_time_step = 2

### See description in README.md file

In [None]:

# Reading taken before the data is touched; it is handed to the estimator
# below as VmRSS_1 (presumably the baseline resident memory - see README.md).
VmRSS_1, Vm_1_units = src.memory_estimator.read_VmRSS_av()

# ---------------- workload being measured: start ----------------
reader = Reader(
    model="ICON",
    exp="ngc2009",
    configdir=configdir,
    source="atm_2d_ml_R02B09",
    regrid="r200",
)
ICON_2009 = reader.retrieve()

# Load only the first `max_time_step` entries along the leading axis of 'tprate'.
ICON_2009_chunk = ICON_2009['tprate'][0:max_time_step,:].compute()
ICON_2009_chunk = xarray_attribute_update(ICON_2009_chunk, ICON_2009)

# Fastest histogram
hist_fast_ICON = diag.hist1d_fast(ICON_2009_chunk, preprocess=False)
# ---------------- workload being measured: end ------------------

# Estimate for the full 'tprate' variable, based on the small chunk above.
mem_max, mem_units = expected_total_memory_usage(
    ds_part=ICON_2009_chunk,
    ds_full=ICON_2009['tprate'],
    VmRSS_1=VmRSS_1,
)
# Show the estimate converted to GB (last expression, so the cell displays it).
mem_units_converter(
    old_mem_unit=mem_units,
    old_value=mem_max,
    desirable_mem_unit='GB',
)

## Adaptive loading of data

In [None]:
# Take a fresh reading right before the adaptive load; it is passed on as VmRSS_1.
VmRSS_1, Vm_1_units = src.memory_estimator.read_VmRSS_av()

# Mem_Perc_Max=0.5 is the limit handed to the loader
# (presumably a fraction of the available memory - confirm in README.md).
adaptive_data_load(
    ds_part=ICON_2009_chunk,
    ds_full=ICON_2009['tprate'],
    VmRSS_1=VmRSS_1,
    Mem_Perc_Max=0.5,
)