# Calculate daily FFDI

In [1]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

On Pearcey, each node has 20 cores, each with 6GB of memory. Using more than 20 cores therefore requires more than one worker. The `processes` argument controls the number of workers.

In [66]:
# One worker process per SLURM job: 15 cores and 90GB in total,
# submitted with a 20 minute walltime on the "express" QOS.
cluster = SLURMCluster(
    cores=15,
    memory='90GB',
    processes=1,
    walltime='00:20:00',
    job_extra=['--qos="express"'],
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 44217 instead


In [13]:
# Request a single SLURM job from the cluster.
cluster.scale(jobs=1)

In [14]:
# Attach a distributed client to the SLURM cluster created above.
client = Client(cluster)
# Bare expression so the notebook displays the client summary.
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.141.3.75:8787/status,

0,1
Dashboard: http://10.141.3.75:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.141.3.75:44993,Workers: 0
Dashboard: http://10.141.3.75:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [47]:
import xarray as xr
import numpy as np
import dask

import os

import matplotlib.pyplot as plt

In [17]:
# Directory containing the JRA-55 zarr stores that this notebook reads from.
jra_path = '/scratch1/projects/dcfp/data/csiro-dcfp-jra55/'
# Scratch directory for this project's data (same prefix as the zarr stores
# written by the cells below).
my_scratch_path = '/scratch1/ric368/projects/fire/data/'

In [18]:
# Not referenced by the cells shown here; presumably a font size for plots
# (TODO confirm where it is used).
font_size = 12

In [19]:
def write_and_read(ds, filename, overwrite=True):
    """Write `ds` to a zarr store and return the data re-opened from it.

    Parameters
    ----------
    ds : xarray.Dataset or xarray.DataArray
        Object to persist. A DataArray is converted to a single-variable
        Dataset under its own ``name`` before writing.
    filename : str
        Path of the zarr store to write to and read from.
    overwrite : bool, optional
        If True (default), the store is always (re)written with
        ``mode='w'``. If False and `filename` already exists, the write is
        skipped and the existing store is simply opened.

    Returns
    -------
    xarray.Dataset or xarray.DataArray
        The contents of the store at `filename`, returned as the same kind
        of object that was passed in.
    """
    is_DataArray = isinstance(ds, xr.DataArray)
    if is_DataArray:
        # Remember the name so the variable can be pulled back out of the
        # Dataset after the round trip.
        name = ds.name
        ds = ds.to_dataset(name=name)

    if overwrite or not os.path.exists(filename):
        ds.to_zarr(filename, consolidated=True, mode='w')
    ds = xr.open_zarr(filename, consolidated=True)

    return ds[name] if is_DataArray else ds

# Calculate FFDI

In [20]:
# Open the daily surface store and give the time dimension the name `time`.
jra_sfc = (
    xr.open_zarr(jra_path + 'surface_daily.zarr', consolidated=True)
    .rename({'initial_time0_hours': 'time'})
)

### Drought factor

In [23]:
%%time
# Extract TPRAT_GDS0_SFC, rename its grid dimensions to lat/lon and
# round-trip it through a zarr store on scratch.
jra_tprat = jra_sfc['TPRAT_GDS0_SFC'].rename({'g0_lat_2': 'lat',
                                              'g0_lon_3': 'lon'})
# Build the output path from `my_scratch_path` instead of repeating the directory.
jra_tprat = write_and_read(jra_tprat, my_scratch_path + 'jra_daily_tprat_global_19580101_20201231.zarr')

CPU times: user 143 ms, sys: 14.4 ms, total: 158 ms
Wall time: 1.57 s


In [24]:
%%time
# This step benefits from one worker only (I used 10 cores, 60GB)
# Rechunk so every chunk spans the full time axis, then take rolling sums
# over windows of 20 time steps.
jra_tprat = jra_tprat.chunk({'time': -1, 'lat': 25, 'lon': 50})
jra_tprat = jra_tprat.rolling(time=20).sum()
# Build the output path from `my_scratch_path` instead of repeating the directory.
jra_tprat = write_and_read(jra_tprat, my_scratch_path + 'jra_daily_tprat20d_global_19580101_20201231.zarr')

CPU times: user 1.23 s, sys: 98.3 ms, total: 1.32 s
Wall time: 6.04 s


In [25]:
%%time
# Linearly rescale the rolling sums at each grid point so that the minimum
# over time maps to 10 and the maximum over time maps to 0.
tprat_min = jra_tprat.min('time')
tprat_max = jra_tprat.max('time')
jra_df = -10 * ((jra_tprat - tprat_min) / (tprat_max - tprat_min)) + 10
# Build the output path from `my_scratch_path` instead of repeating the directory.
jra_df = write_and_read(jra_df, my_scratch_path + 'jra_daily_drought_factor_global_19580101_20201231.zarr')

CPU times: user 460 ms, sys: 49.3 ms, total: 509 ms
Wall time: 3.08 s


### Relative humidity, maximum temperature and maximum wind speed

In [26]:
%%time
# Extract RH_GDS0_HTGL, rename its grid dimensions to lat/lon and
# round-trip it through a zarr store on scratch.
jra_rh = jra_sfc['RH_GDS0_HTGL'].rename({'g0_lat_1': 'lat',
                                         'g0_lon_2': 'lon'})
# Build the output path from `my_scratch_path` instead of repeating the directory.
jra_rh = write_and_read(jra_rh, my_scratch_path + 'jra_daily_rh_global_19580101_20201231.zarr')

CPU times: user 213 ms, sys: 26.5 ms, total: 240 ms
Wall time: 2.13 s


In [27]:
%%time
# Extract TMAX_GDS4_HTGL, rename its grid dimensions to lat/lon, convert
# the values and round-trip the result through a zarr store on scratch.
jra_tmax = jra_sfc['TMAX_GDS4_HTGL'].rename({'g0_lat_1': 'lat',
                                             'g0_lon_2': 'lon'})
jra_tmax = jra_tmax - 273.15 # K to C
# Build the output path from `my_scratch_path` instead of repeating the directory.
jra_tmax = write_and_read(jra_tmax, my_scratch_path + 'jra_daily_tmax_global_19580101_20201231.zarr')

CPU times: user 198 ms, sys: 30 ms, total: 228 ms
Wall time: 2.43 s


In [28]:
%%time
# Extract WSMX_GDS4_HTGL, rename its grid dimensions to lat/lon, convert
# the values and round-trip the result through a zarr store on scratch.
jra_wmax = jra_sfc['WSMX_GDS4_HTGL'].rename({'g0_lat_1': 'lat',
                                             'g0_lon_2': 'lon'})
jra_wmax = jra_wmax * 3.6 # m/s to km/h
# Build the output path from `my_scratch_path` instead of repeating the directory.
jra_wmax = write_and_read(jra_wmax, my_scratch_path + 'jra_daily_wmax_global_19580101_20201231.zarr')

CPU times: user 206 ms, sys: 24.7 ms, total: 231 ms
Wall time: 2.53 s


In [29]:
# Start the dataset from the drought factor, rechunked to match Tmax,
# then attach the remaining FFDI inputs one variable at a time.
ffdi_ds = jra_df.chunk(jra_tmax.chunks).to_dataset(name='DF')
for input_name, input_field in (('Tmax', jra_tmax),
                                ('Wmax', jra_wmax),
                                ('RH', jra_rh)):
    ffdi_ds[input_name] = input_field

In [30]:
# FFDI from the drought factor, maximum temperature (deg C), relative
# humidity and maximum wind speed (km/h).
# The constant 0.243147 equals ln(2) - 0.45.
ffdi_ds['FFDI'] = ffdi_ds['DF']**0.987 * np.exp(
    0.0338*ffdi_ds['Tmax']
    - 0.0345*ffdi_ds['RH']
    + 0.0234*ffdi_ds['Wmax']
    + 0.243147
)

In [31]:
# Reset the `encoding` of every variable (data variables and coordinates)
# to an empty dict before writing.
# NOTE(review): presumably this drops encodings inherited from the source
# zarr stores so they don't conflict with the write below; confirm.
for var in ffdi_ds.variables:
    ffdi_ds[var].encoding = {}

In [32]:
%%time
# 15 cores, 90GB
# Write the full FFDI dataset (inputs and result) to scratch, overwriting
# any existing store. The path is built from `my_scratch_path` instead of
# repeating the directory.
ffdi_ds.to_zarr(my_scratch_path + 'jra_daily_ffdi_global_19580101_20201231_test.zarr', consolidated=True, mode='w')

CPU times: user 1.37 s, sys: 96.1 ms, total: 1.47 s
Wall time: 15.7 s


<xarray.backends.zarr.ZarrStore at 0x2aaae2d6c9e0>

# Close cluster

In [65]:
# Disconnect the client first, then shut down the SLURM cluster.
client.close()
cluster.close()