In [1]:
import sys
print("Python version")
print (sys.version)
print("Version info.")
print (sys.version_info)


Python version
3.10.1 | packaged by conda-forge | (main, Dec 22 2021, 01:39:05) [GCC 9.4.0]
Version info.
sys.version_info(major=3, minor=10, micro=1, releaselevel='final', serial=0)


In [2]:
import numpy as np
import cfgrib
import xarray as xr
import matplotlib.pyplot as plt
import cProfile
import pstats
import io
import time
from pstats import SortKey
import dask

# Computing the Bulk Richardson Number

> The Bulk Richardson Number (BRN) is an approximation of the Gradient Richardson number.[1] The BRN is a dimensionless ratio in meteorology related to the consumption of turbulence divided by the shear production (the generation of turbulence kinetic energy caused by wind shear) of turbulence. It is used to show dynamic stability and the formation of turbulence.

https://en.wikipedia.org/wiki/Bulk_Richardson_number

$R_{B} = \frac{g / T_{v} \Delta \theta _ {v} \Delta z}{(\Delta U)^{2} + (\Delta V)^2} $

In [3]:
pc_g = 9.80665

def fthetav(p, t, qv):
    pc_r_d = 287.05
    pc_r_v = 461.51  # Gas constant for water vapour[J kg-1 K-1]
    pc_cp_d = 1005.0
    pc_rvd = pc_r_v / pc_r_d

    pc_rdocp = pc_r_d/pc_cp_d
    pc_rvd_o = pc_rvd - 1.0

    # Reference surface pressure for computation of potential temperature
    p0 = 1.0e5
    return (p0 / p) ** pc_rdocp * t * (1.+(pc_rvd_o*qv / (1.-qv)))

def fbrn(p,t,qv,u,v,hhl,hsurf):
    nlevels = len(p.coords["generalVerticalLayer"])

    thetav = fthetav(p,t,qv)
    thetav_sum = thetav.isel(generalVerticalLayer=slice(None, None, -1)).cumsum(dim='generalVerticalLayer') 

    # dask.delayed(thetav_sum.data).visualize(filename='thetasum.svg')

    nlevels_xr =xr.DataArray(data=np.arange(nlevels,0,-1), dims=["generalVerticalLayer"])

    brn = pc_g* (hhl-hsurf)*(thetav - thetav.isel(generalVerticalLayer=79)) / \
        ( (thetav_sum/nlevels_xr)*(u**2 + v**2))
    return brn


# Load the COSMO-1E data from grib using cfgrib

In [9]:
data_dir ='/scratch/cosuna/postproc_np_products/grib_files/cosmo-1e/'
def load_data(data_dir, chunk_size=None):
    
    chunk_arg = {}
    if chunk_size:
        chunk_arg = {"chunks": {"generalVerticalLayer": chunk_size}}

    dss = cfgrib.open_datasets(
        data_dir + "/lfff00000000",
        backend_kwargs={
            "read_keys": ["typeOfLevel", "gridType"],
            "filter_by_keys": {"typeOfLevel": "generalVerticalLayer"},
        },
        encode_cf=("time", "geography", "vertical"),
        **chunk_arg
    )

    massds = dss[0]
    uds = cfgrib.open_dataset(
        data_dir + "/lfff00000000",
        backend_kwargs={
            "read_keys": ["cfVarName"],
            "filter_by_keys": {"cfVarName": "u"},
        },
        encode_cf=("time", "geography", "vertical"),
        **chunk_arg
    )

    vds = cfgrib.open_dataset(
        data_dir + "/lfff00000000",
        backend_kwargs={
            "read_keys": ["cfVarName"],
            "filter_by_keys": {"cfVarName": "v"},
        },
        encode_cf=("time", "geography", "vertical"),
        **chunk_arg
    )

    hsurf_ds = cfgrib.open_dataset(
        data_dir + "/lfff00000000c",
        backend_kwargs={
            "read_keys": ["shortName"],
            "filter_by_keys": {"shortName": "HSURF"},
        },
        encode_cf=("time", "geography", "vertical"),
        **chunk_arg
    )

    if chunk_size:
        chunk_arg = {"chunks": {"generalVertical": chunk_size}}

    cds = cfgrib.open_dataset(
        data_dir + "/lfff00000000c",
        backend_kwargs={
            "read_keys": ["typeOfLevel", "gridType"],
            "filter_by_keys": {"typeOfLevel": "generalVertical"},
        },
        encode_cf=("time", "geography", "vertical"),
        **chunk_arg
    )

    hhl = cds["HHL"].rename({"generalVertical": "generalVerticalLayer"})

    return (
        massds["P"],
        massds["T"],
        massds["QV"],
        hhl,
        hsurf_ds["HSURF"],
        uds["U"],
        vds["V"],
    )

In [None]:
start=time.time()
p,t,qv,hhl,hsurf,u,v = load_data(data_dir)
end=time.time()
print("time:", end-start)

In [None]:
start=time.time()
brn = fbrn(p,t,qv,u,v,hhl,hsurf)
brn.name = "BRN"
end=time.time()
print("Time:", end-start)

In [None]:
brn.data.visualize(filename='thetav.svg')

# Compute and Store

In [None]:
start=time.time()
brn.compute()
end=time.time()
print("Time:", end-start)

In [None]:
start=time.time()
brn.to_netcdf(path="brn_out.nc")
end=time.time()
print("Time:", end-start)

# Fieldextra timing BRN:

| section  | time (s) | 
| --- | --- |
| collect data | 3.654 |
| generate output | 22.292 |

# Numpy timing BRN:

In [5]:
import importlib
import perfdata_brn
importlib.reload(perfdata_brn)
display(perfdata_brn.ssync)

Unnamed: 0,chunk,#cores/node,#nodes,time (s),load (s),compute (s),compute & store (s)
0,0,1,1,18.26,6.9,8.77,11.36
1,10,1,1,19.86,8.43,11.22,11.43


# Allocate a cluster and defined the scheduler

In [6]:
scheduler = "distributed"
cluster = None
if scheduler == "distributed":
    from dask.distributed import Client

    from dask_jobqueue import SLURMCluster

    cluster = SLURMCluster(
        queue="postproc",
        cores=16,
        memory="24GB",
        job_extra=["--exclusive"],
    )
    client = Client(cluster)
elif scheduler == "localcluster":
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(n_workers=16, threads_per_worker=2)
    client = Client(cluster)
elif scheduler == "threads":
    from multiprocessing.pool import ThreadPool

    dask.config.set(pool=ThreadPool(1))
    # dask.config.set(scheduler="threads")
elif scheduler == "synchronous":
    dask.config.set(
        scheduler="synchronous"
    )  # overwrite default with single-threaded scheduler
elif scheduler == "processes":
    from multiprocessing.pool import Pool
    dask.config.set(pool=Pool(2))
   

  from distributed.utils import tmpfile


In [7]:
cluster

Tab(children=(HTML(value='<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-outpu…

# Distributed Computing

In [12]:
start=time.time()
p,t,qv,hhl,hsurf,u,v = load_data(data_dir, chunk_size=1)
brn = fbrn(p,t,qv,u,v,hhl,hsurf)
brn.to_netcdf(path="brn_out.nc")
end=time.time()
print("Time:", end-start)

Time: 16.754263877868652


In [14]:
brn = fbrn(p,t,qv,u,v,hhl,hsurf)
brn_ds =xr.Dataset(data_vars={"BRN": brn})
xr.save_mfdataset([dat], paths=[apath], format="NETCDF4")

In [None]:
%%html
<style>
table {float:left}
</style>

# Visualization

# Distributed Dask Timing BRN:

In [None]:
import importlib
import perfdata_brn
importlib.reload(perfdata_brn)
display(perfdata_brn.ssync)
display(perfdata_brn.sthreads)
display(perfdata_brn.slocalc)
display(perfdata_brn.sdist)


# Dask timing THETAV:

In [None]:
import importlib
import perfdata, perfdata_chunk4
importlib.reload(perfdata)
importlib.reload(perfdata_chunk4)

from IPython.display import display_html
from itertools import chain,cycle
def display_side_by_side(*args,titles=cycle([''])):
    html_str=''
    for df,title in zip(args, chain(titles,cycle(['</br>'])) ):
        html_str+='<th style="text-align:center"><td style="vertical-align:top">'
        html_str+=f'<h2>{title}</h2>'
        html_str+=df.to_html().replace('table','table style="display:inline"')
        html_str+='</td></th>'
    display_html(html_str,raw=True)

display_side_by_side(perfdata.ssync_thetav, perfdata_chunk4.ssync_thetav)
display_side_by_side(perfdata.sthreads_thetav, perfdata_chunk4.sthreads_thetav)
display_side_by_side(perfdata.slocalc_thetav, perfdata_chunk4.slocalc_thetav)
display_side_by_side(perfdata.sdist_thetav, perfdata_chunk4.sdist_thetav)