In [1]:
import numpy as np
import pandas as pd
import xarray as xr
import dask
import os
import glob
import sys

In [16]:
# Shut down any Dask client/cluster left over from a previous execution of the
# cluster-setup cell. On a fresh kernel neither name exists yet, so this is a
# no-op instead of a NameError; on a re-run it stops the old client and cluster
# before the setup cell creates new ones. Client is closed before the cluster.
for _name in ('client', 'cluster'):
    _handle = globals().get(_name)
    if _handle is not None:
        _handle.close()

In [2]:
import dask.distributed as dd
import dask
import dask.array as da

# Start a local multi-process Dask cluster and attach a client to it; the
# dask-backed xarray calls later in the notebook (open_mfdataset(parallel=True),
# to_netcdf) are executed through this client.
N_WORKERS = 8
DASHBOARD_ADDRESS = ':22722'  # dashboard served at http://127.0.0.1:22722/status

print("Starting parallel computing...")
cluster = dd.LocalCluster(n_workers=N_WORKERS, dashboard_address=DASHBOARD_ADDRESS)
client = dd.Client(cluster)
client  # last expression: render the client/cluster summary

Starting parallel computing...


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:22722/status,

0,1
Dashboard: http://127.0.0.1:22722/status,Workers: 8
Total threads: 96,Total memory: 187.55 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:35861,Workers: 8
Dashboard: http://127.0.0.1:22722/status,Total threads: 96
Started: Just now,Total memory: 187.55 GiB

0,1
Comm: tcp://127.0.0.1:46151,Total threads: 12
Dashboard: http://127.0.0.1:35523/status,Memory: 23.44 GiB
Nanny: tcp://127.0.0.1:34407,
Local directory: /tmp/dask-worker-space/worker-rfsmo_fb,Local directory: /tmp/dask-worker-space/worker-rfsmo_fb

0,1
Comm: tcp://127.0.0.1:42033,Total threads: 12
Dashboard: http://127.0.0.1:33799/status,Memory: 23.44 GiB
Nanny: tcp://127.0.0.1:40273,
Local directory: /tmp/dask-worker-space/worker-59of1crn,Local directory: /tmp/dask-worker-space/worker-59of1crn

0,1
Comm: tcp://127.0.0.1:39771,Total threads: 12
Dashboard: http://127.0.0.1:45995/status,Memory: 23.44 GiB
Nanny: tcp://127.0.0.1:42221,
Local directory: /tmp/dask-worker-space/worker-hv3v1tjd,Local directory: /tmp/dask-worker-space/worker-hv3v1tjd

0,1
Comm: tcp://127.0.0.1:39155,Total threads: 12
Dashboard: http://127.0.0.1:40261/status,Memory: 23.44 GiB
Nanny: tcp://127.0.0.1:41739,
Local directory: /tmp/dask-worker-space/worker-61d8jlc3,Local directory: /tmp/dask-worker-space/worker-61d8jlc3

0,1
Comm: tcp://127.0.0.1:42303,Total threads: 12
Dashboard: http://127.0.0.1:42321/status,Memory: 23.44 GiB
Nanny: tcp://127.0.0.1:41915,
Local directory: /tmp/dask-worker-space/worker-chv4g2k2,Local directory: /tmp/dask-worker-space/worker-chv4g2k2

0,1
Comm: tcp://127.0.0.1:46435,Total threads: 12
Dashboard: http://127.0.0.1:44919/status,Memory: 23.44 GiB
Nanny: tcp://127.0.0.1:43159,
Local directory: /tmp/dask-worker-space/worker-a7dff392,Local directory: /tmp/dask-worker-space/worker-a7dff392

0,1
Comm: tcp://127.0.0.1:37525,Total threads: 12
Dashboard: http://127.0.0.1:35447/status,Memory: 23.44 GiB
Nanny: tcp://127.0.0.1:39329,
Local directory: /tmp/dask-worker-space/worker-nh6i_kba,Local directory: /tmp/dask-worker-space/worker-nh6i_kba

0,1
Comm: tcp://127.0.0.1:44951,Total threads: 12
Dashboard: http://127.0.0.1:33961/status,Memory: 23.44 GiB
Nanny: tcp://127.0.0.1:39103,
Local directory: /tmp/dask-worker-space/worker-1w1k28_l,Local directory: /tmp/dask-worker-space/worker-1w1k28_l


In [3]:
# SFC: merge the per-file surface NetCDFs of each variable into one
# {source_dir}/{var}.nc file.
source_dir = 'CERRA_complete'
var_names = ['t2m', 'si10', 'skt']
for var in var_names:
    files = sorted(glob.glob(f'{source_dir}/{var}/*.nc'))
    if not files:
        # xarray would raise a bare "no files to open"; say which pattern was empty.
        raise FileNotFoundError(f'No input files matched {source_dir}/{var}/*.nc')
    # Concatenate along valid_time in sorted-filename order (combine='nested'),
    # and close the source files once the merged file has been written.
    with xr.open_mfdataset(files, concat_dim='valid_time', parallel=True, combine='nested') as ds:
        ds.to_netcdf(f'{source_dir}/{var}.nc')

In [12]:
# PRES: merge the per-file pressure-level NetCDFs into a single
# {source_dir}/PRES.nc file.
source_dir = 'CERRA_complete'
var = 'PRES'
files = sorted(glob.glob(f'{source_dir}/{var}/*.nc'))
if not files:
    # xarray would raise a bare "no files to open"; say which pattern was empty.
    raise FileNotFoundError(f'No input files matched {source_dir}/{var}/*.nc')
# Concatenate along valid_time in sorted-filename order (combine='nested'),
# and close the source files once the merged file has been written.
with xr.open_mfdataset(files, concat_dim='valid_time', parallel=True, combine='nested') as ds:
    ds.to_netcdf(f'{source_dir}/{var}.nc')

### Combining individual variables

In [13]:
def compute_wind_speed(ds1,ds2, par_name):
    """Return the magnitude sqrt(ds1**2 + ds2**2) of two components, named `par_name`.

    ds1, ds2 : array-likes supporting arithmetic and ``.rename`` (in this
        notebook, the u and v wind components at one level).
    par_name : name assigned to the result via ``.rename``.
    """
    squared_sum = ds1 ** 2 + ds2 ** 2
    return (squared_sum ** 0.5).rename(par_name)
def compute_alpha(dataset, par_name1,par_name2, par_name, z1=10, z2=100):
    """Exponent alpha relating two speeds at heights z1 and z2.

    Solves ``U2 / U1 = (z2 / z1) ** alpha`` for alpha, i.e.
    ``alpha = ln(U2 / U1) / ln(z2 / z1)`` with U1 = dataset[par_name1] and
    U2 = dataset[par_name2].

    Parameters
    ----------
    dataset : container indexable by variable name (e.g. an xarray Dataset).
    par_name1 : name of the speed at height z1.
    par_name2 : name of the speed at height z2.
    par_name : name assigned to the result via ``.rename``.
    z1, z2 : the two heights, in the same units. Defaults (10, 100) reproduce
        the previously hard-coded 100/10 ratio.

    Zero or negative speeds propagate as inf/NaN through the logarithm.
    """
    speed_ratio = dataset[par_name2] / dataset[par_name1]
    alpha = np.log(speed_ratio) / np.log(z2 / z1)
    return alpha.rename(par_name)
def compute_gradient(dataset, par_name1,par_name2, par_name):
    """Return ``dataset[par_name2] - dataset[par_name1]`` named `par_name`.

    Despite the name, this is a plain difference between the two variables;
    it is not divided by any height or pressure separation.
    """
    minuend, subtrahend = dataset[par_name2], dataset[par_name1]
    difference = minuend - subtrahend
    return difference.rename(par_name)

In [14]:
# Assemble every predictor into one Dataset. Each input is given its final
# variable name and merged in turn; the merge order fixes the variable order.
combined_dataset = xr.Dataset()

# --- 10 m wind speed (surface file) ---
var = 'si10'
ds = xr.open_dataset(f'{source_dir}/{var}.nc')
ds = ds[var].drop_vars('heightAboveGround')
ds = ds.rename('10ws')
combined_dataset = xr.merge([combined_dataset, ds])

# --- 100 m wind speed from the CERRA height-level files ---
files = sorted(glob.glob('CERRA_height_level/*.nc'))
ds = xr.open_mfdataset(files, concat_dim='time', parallel=True, combine='nested')
# Harmonise names with the other inputs: obs -> location, lat/lon -> latitude/longitude
ds = ds.rename({'obs': 'location', 'lat': 'latitude', 'lon': 'longitude'})
# Keep only every 3rd hour (hours 00, 03, ..., 21)
ds = ds.sel(time=ds.time.dt.hour % 3 == 0)
ds = ds.rename({'data': '100ws'})
# Select the 100 m level and drop the now-scalar level coordinate
ds = ds.sel(heightAboveGround=100).drop_vars('heightAboveGround')
ds = ds.rename({'time': 'valid_time'})
combined_dataset = xr.merge([combined_dataset, ds], compat='override')

# --- pressure-level fields: open PRES.nc once and slice it per level ---
pres = xr.open_dataset(f'{source_dir}/PRES.nc')


def pres_level(name, level):
    """Return PRES variable `name` at `level` hPa with the level coordinate dropped."""
    return pres[name].sel(isobaricInhPa=level).drop_vars('isobaricInhPa')


# 975 / 950 hPa wind speed from the u and v components
for level in (975, 950):
    ds = compute_wind_speed(pres_level('u', level), pres_level('v', level), f'{level}ws')
    combined_dataset = xr.merge([combined_dataset, ds])

# --- 2 m temperature ---
var = 't2m'
ds = xr.open_dataset(f'{source_dir}/{var}.nc')
ds = ds[var].drop_vars('heightAboveGround')
combined_dataset = xr.merge([combined_dataset, ds])

# --- skin temperature ---
var = 'skt'
ds = xr.open_dataset(f'{source_dir}/{var}.nc')
ds = ds[var].drop_vars('surface')
combined_dataset = xr.merge([combined_dataset, ds])

# --- 975 / 950 hPa temperature ---
# Temperature is a scalar field and is used as-is. Routing it through
# compute_wind_speed would return sqrt(t**2 + other**2), not the temperature.
for level in (975, 950):
    ds = pres_level('t', level).rename(f't_{level}')
    combined_dataset = xr.merge([combined_dataset, ds])

# The SURF variables currently only cover 2000-01-01T00 to 2019-12-31T21,
# so trim every variable to that period.
combined_dataset = combined_dataset.sel(valid_time=slice('2000-01-01T00', '2019-12-31T21'))

# === derived parameters === #
# --- 100 alpha: exponent between the 10 m and 100 m wind speeds ---
ds = compute_alpha(combined_dataset, '10ws', '100ws', '100alpha')
combined_dataset = xr.merge([combined_dataset, ds])

# --- differences between levels: (first, second, result) -> second - first ---
for first, second, result_name in [
    ('100ws', '975ws', '975wsgrad'),
    ('975ws', '950ws', '950wsgrad'),
    ('skt', 't2m', '2mtempgrad'),
    ('t_975', 't_950', '950tempgrad'),
    ('t2m', 't_975', '975tempgrad'),
]:
    ds = compute_gradient(combined_dataset, first, second, result_name)
    combined_dataset = xr.merge([combined_dataset, ds])

# Drop the auxiliary 'time' and 'step' coordinates and use 'valid_time' as the time axis.
combined_dataset = combined_dataset.drop_vars(['time', 'step']).rename({'valid_time': 'time'})
combined_dataset


Unnamed: 0,Array,Chunk
Bytes,4.90 MiB,501.88 kiB
Shape,"(11, 58440)","(11, 5840)"
Dask graph,19 chunks in 50 graph layers,19 chunks in 50 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.90 MiB 501.88 kiB Shape (11, 58440) (11, 5840) Dask graph 19 chunks in 50 graph layers Data type float64 numpy.ndarray",58440  11,

Unnamed: 0,Array,Chunk
Bytes,4.90 MiB,501.88 kiB
Shape,"(11, 58440)","(11, 5840)"
Dask graph,19 chunks in 50 graph layers,19 chunks in 50 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.90 MiB,501.88 kiB
Shape,"(11, 58440)","(11, 5840)"
Dask graph,19 chunks in 55 graph layers,19 chunks in 55 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.90 MiB 501.88 kiB Shape (11, 58440) (11, 5840) Dask graph 19 chunks in 55 graph layers Data type float64 numpy.ndarray",58440  11,

Unnamed: 0,Array,Chunk
Bytes,4.90 MiB,501.88 kiB
Shape,"(11, 58440)","(11, 5840)"
Dask graph,19 chunks in 55 graph layers,19 chunks in 55 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.90 MiB,501.88 kiB
Shape,"(58440, 11)","(5840, 11)"
Dask graph,19 chunks in 54 graph layers,19 chunks in 54 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.90 MiB 501.88 kiB Shape (58440, 11) (5840, 11) Dask graph 19 chunks in 54 graph layers Data type float64 numpy.ndarray",11  58440,

Unnamed: 0,Array,Chunk
Bytes,4.90 MiB,501.88 kiB
Shape,"(58440, 11)","(5840, 11)"
Dask graph,19 chunks in 54 graph layers,19 chunks in 54 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [15]:
# Write the assembled dataset to CERRA.nc; any copy left over from an earlier
# run is deleted before writing.
file_path = 'CERRA.nc'
stale_output_present = os.path.exists(file_path)
if stale_output_present:
    os.remove(file_path)
combined_dataset.to_netcdf(file_path)