# Calculate velocity potential

### This notebook is adapted from:

https://github.com/dougrichardson/Richardson_2022_coffee/blob/main/0e_calculate_velocity_potential.ipynb

In [2]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client

In [2]:
# One node on Gadi has 48 cores - try and use up a full core before going to multiple nodes (jobs)

walltime = '00:10:00'
cores = 48
memory = str(cores * 4) + 'GB'

cluster = PBSCluster(walltime=str(walltime), cores=cores, memory=str(memory), processes=cores,
                     job_extra_directives=['-q normal',
                                           '-P w42',
                                           '-l ncpus='+str(cores),
                                           '-l mem='+str(memory),
                                           '-l storage=gdata/w42+gdata/rt52'],
                     local_directory='$TMPDIR',
                     job_directives_skip=["select"])

In [3]:
cluster.scale(jobs=1)
client = Client(cluster)

In [4]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: http://10.6.62.32:8787/status,

0,1
Dashboard: http://10.6.62.32:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.6.62.32:40711,Workers: 0
Dashboard: http://10.6.62.32:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [1]:
import xarray as xr
import os
import numpy as np

In [55]:
years = range(2022, 2023)
levels = [150, 850]

# Daily u and v data - use standard dask and xarray tools

- To process hourly data to daily, for two isobaric levels, takes around 4 hours (using 3 full nodes; ~576 GB)

In [4]:
def get_files(file_path, var, years):
    """
    Get list of files
    """
    fp_list = []
    for year in years:
        fp_dir = file_path+var+'/'+str(year)+'/'
        for fp in sorted(os.listdir(fp_dir)):
            fp_list.append(fp_dir+fp)
    return fp_list

In [5]:
load = True

In [6]:
%%time
if load:
    u = xr.open_zarr('/g/data/w42/dr6273/work/data/era5/u/u_era5_daily_'+str(years[0])+'-'+str(years[-1])+'.zarr', consolidated=True)
    v = xr.open_zarr('/g/data/w42/dr6273/work/data/era5/v/v_era5_daily_'+str(years[0])+'-'+str(years[-1])+'.zarr', consolidated=True)
else:
    u_files = get_files('/g/data/rt52/era5/pressure-levels/reanalysis/', 'u', years)
    v_files = get_files('/g/data/rt52/era5/pressure-levels/reanalysis/', 'v', years)
    
    # Using preprocess in open_mfdataset to select desired levels improves performance
    #  versus doing a .sel() afterwards
    def preprocess(ds):
        return ds.sel(level=levels)
    
    u = xr.open_mfdataset(u_files,
                          chunks={'time': 24, 'level': 1},
                          preprocess=preprocess,
                          compat='override',
                          coords='minimal',
                          engine='netcdf4')

    v = xr.open_mfdataset(v_files,
                          chunks={'time': 24, 'level': 1},
                          compat='override',
                          preprocess=preprocess,
                          coords='minimal',
                          engine='netcdf4')
    
    u = u.resample(time='1D').mean()
    v = v.resample(time='1D').mean()
    
    u_encoding = {'u': {'dtype': 'float32'}}
    v_encoding = {'v': {'dtype': 'float32'}}
    
    u.to_zarr('/g/data/w42/dr6273/work/data/era5/u/u_era5_daily_'+str(years[0])+'-'+str(years[-1])+'.zarr',
              mode='w',
              consolidated=True,
              encoding=u_encoding)

    v.to_zarr('/g/data/w42/dr6273/work/data/era5/v/v_era5_daily_'+str(years[0])+'-'+str(years[-1])+'.zarr',
                mode='w',
                consolidated=True,
                encoding=v_encoding)
    
    # Close cluster
    client.close()
    cluster.close()

CPU times: user 615 ms, sys: 443 ms, total: 1.06 s
Wall time: 2.51 s


# Calculate velocity potential using `windspharm`

- Non-lazy, so we do this separately for each year and isobaric level
- Used 10 cores at 40GB

In [7]:
from windspharm.standard import VectorWind
from windspharm.tools import prep_data, recover_data, order_latdim

In [8]:
# Need the system path to xeof package
import sys
sys.path.append('/g/data/w42/dr6273/work/')

### For each level and year

~ Takes around 3 minutes per level and year
~ 2 levels, 42 years takes around 4.5 hours

In [12]:
def write_vpot(u, v, level, year):
    
    lons = u.longitude.values
    lats = u.latitude.values
    year = str(year)

    # Subsample u and v
    u_ = u.u.sel(time=year, level=level)
    v_ = v.v.sel(time=year, level=level)

    # Transpose to ensure time is out front
    u_ = u_.transpose('time', 'latitude', 'longitude')
    v_ = v_.transpose('time', 'latitude', 'longitude')

    # Load values
    uwnd = u_.values
    vwnd = v_.values

    # Ensure data is in correct shape for windspharm
    print('Prepping data for windspharm...')
    uwnd, uwnd_info = prep_data(uwnd, 'tyx') # 'tyx' because data is in format time, lat, lon
    vwnd, vwnd_info = prep_data(vwnd, 'tyx')
    lats, uwnd, vwnd = order_latdim(lats, uwnd, vwnd)

    # Create a VectorWind instance to handle computation of streamfunction and velocity potential
    print('Creating VectorWind instance...')
    w = VectorWind(uwnd, vwnd)

    # Calculate velocity potential
    print('Calculating VPOT...')
    _, vp = w.sfvp()

    # Re-shape to original format
    print('Reshaping...')
    vp = recover_data(vp, uwnd_info)

    # Put into DataArray and format for writing
    print('Putting into dataArray and writing...')
    vp = xr.DataArray(vp,
                     dims=['time', 'latitude', 'longitude'],
                     coords={'time': u_['time'].values,
                             'latitude': u_['latitude'].values,
                             'longitude': u_['longitude'].values})
    vp = vp.assign_attrs({'short_name': 'vpot',
                          'long name': 'velocity potential',
                          'units': 'm^2 / s^-1'})
    
    # Specify lat/lon units and transpose to time first so we can use cdo later
    vp['latitude'].attrs = {'units': 'degrees_north'}
    vp['longitude'].attrs = {'units': 'degrees_east'}
    vp = vp.transpose('time', 'level', 'latitude', 'longitude')
    
    vp = vp.expand_dims({'level': [level]})
    
    vp = vp.to_dataset(name='vpot')
    
    vp_encoding = {'vpot': {'dtype': 'float32'}}

    vp.to_netcdf('/g/data/w42/dr6273/work/data/era5/vpot/nc/vpot_'+str(level)+'_era5_daily_'+str(year)+'.nc',
                mode='w',
                encoding=vp_encoding)
    print()

In [47]:
# %%time
# for level in levels:
#     print(level)
#     for year in years:
#         print(year)
#         write_vpot(u, v, level, year)

# Regrid VPOT to 2x2

- Takes around 2 minutes but need quite a few resources. I used two nodes here.

In [3]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client

In [4]:
# One node on Gadi has 48 cores - try and use up a full core before going to multiple nodes (jobs)

walltime = '00:15:00'
cores = 24
memory = str(cores * 4) + 'GB'

cluster = PBSCluster(walltime=str(walltime), cores=cores, memory=str(memory), processes=cores,
                     job_extra_directives=['-q normal',
                                           '-P w42',
                                           '-l ncpus='+str(cores),
                                           '-l mem='+str(memory),
                                           '-l storage=gdata/w42+gdata/rt52'],
                     local_directory='$TMPDIR',
                     job_directives_skip=["select"])

In [5]:
cluster.scale(jobs=1)
client = Client(cluster)

In [6]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: http://10.6.52.15:8787/status,

0,1
Dashboard: http://10.6.52.15:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.6.52.15:36419,Workers: 0
Dashboard: http://10.6.52.15:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [1]:
import xarray as xr

# Load daily vpot data

In [48]:
vpot = xr.open_mfdataset('/g/data/w42/dr6273/work/data/era5/vpot/nc/*.nc',
                         coords='minimal',
                         compat='override')

In [51]:
vpot = vpot.chunk({'latitude': -1, 'longitude': -1, 'level': 1, 'time': 50})

### Use CDO to regrid because xesmf is not working

In [74]:
from subprocess import call

In [75]:
directory = '/g/data/w42/dr6273/work/data/era5/vpot/nc/'

In [76]:
files = [
    'vpot_150_era5_daily_2022.nc',
    'vpot_850_era5_daily_2022.nc'
]

In [77]:
output = []

for file in files:
    cmd = [
        'cdo',
        '-f',
        'nc',
        'remapbil,' + directory + '2x2_grid.txt',
        '-selname,vpot',
        directory + file,
        directory + file[:-3] + '_2x2_grid.nc'
    ]
    output.append(call(cmd, stdout=None))

FileNotFoundError: [Errno 2] No such file or directory: 'cdo'

In [80]:
print(cmd)

['cdo', '-f', 'nc', 'remapbil,/g/data/w42/dr6273/work/data/era5/vpot/nc/2x2_grid.txt', '-selname,vpot', '/g/data/w42/dr6273/work/data/era5/vpot/nc/vpot_150_era5_daily_2022.nc', '/g/data/w42/dr6273/work/data/era5/vpot/nc/vpot_150_era5_daily_2022_2x2_grid.nc']


In [81]:
output = call(cmd, stdout=None)

FileNotFoundError: [Errno 2] No such file or directory: 'cdo'

### Set up 2x2 array

In [11]:
# NOTE:
#  The latest release of xesmf doesn't work because it tries to load the ESMF module, which has a different name.
#  From looking at the latest, unreleased version here: https://github.com/pangeo-data/xESMF/blob/master/xesmf/backend.py
#  I edited the backend.py script by changing "import ESMF" to "import esmpy as ESMF".

import xesmf as xe

In [12]:
target_grid = xr.Dataset({'latitude': (['latitude'], np.arange(90, -91, -2)),
                          'longitude': (['longitude'], np.arange(-180, 180, 2))})

In [13]:
regridder = xe.Regridder(vpot, target_grid, 'bilinear')
regridder

xESMF Regridder 
Regridding algorithm:       bilinear 
Weight filename:            bilinear_721x1440_91x180.nc 
Reuse pre-computed weights? False 
Input grid shape:           (721, 1440) 
Output grid shape:          (91, 180) 
Periodic in longitude?      False

In [14]:
vpot_2 = regridder(vpot)

In [15]:
vpot_2

Unnamed: 0,Array,Chunk
Bytes,127.97 kiB,63.98 kiB
Shape,"(2, 91, 180)","(1, 91, 180)"
Dask graph,2 chunks in 11 graph layers,2 chunks in 11 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 127.97 kiB 63.98 kiB Shape (2, 91, 180) (1, 91, 180) Dask graph 2 chunks in 11 graph layers Data type float32 numpy.ndarray",180  91  2,

Unnamed: 0,Array,Chunk
Bytes,127.97 kiB,63.98 kiB
Shape,"(2, 91, 180)","(1, 91, 180)"
Dask graph,2 chunks in 11 graph layers,2 chunks in 11 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [16]:
%%time
vpot_2.to_zarr(
    '/g/data/w42/dr6273/work/data/era5/vpot/vpot_era5_daily_'+str(years[0])+'-'+str(years[-1])+'_2x2_test.zarr',
    mode='w',
    consolidated=True
)

KeyboardInterrupt: 

In [34]:
xr.open_zarr('vpot_era5_daily2022-2022_2x2.zarr', consolidated=True)

Task exception was never retrieved
future: <Task finished name='Task-2629' coro=<Client._gather.<locals>.wait() done, defined at /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/distributed/client.py:2122> exception=AllExit()>
Traceback (most recent call last):
  File "/g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/distributed/client.py", line 2131, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-2630' coro=<Client._gather.<locals>.wait() done, defined at /g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/distributed/client.py:2122> exception=AllExit()>
Traceback (most recent call last):
  File "/g/data/w42/dr6273/apps/conda/envs/pangeo/lib/python3.10/site-packages/distributed/client.py", line 2131, in wait
    raise AllExit()
distributed.client.AllExit


KeyError: '.zmetadata'

# Close cluster

In [53]:
client.close()
cluster.close()