# Prepare atmospheric and ocean data

In [1]:
from dask.distributed import Client,LocalCluster
from dask_jobqueue import PBSCluster

In [2]:
# %reload_ext autoreload
# %autoreload 2

In [3]:
# client.close()
# cluster.close()

In [4]:
import xarray as xr
import numpy as np
import pandas as pd

import os

In [5]:
import functions as fn

In [6]:
def anomalise_and_process(
    load,
    dataset,
    filepath,
    data_root_path,
    var_name,
    years,
    subset_region,
    subset_level,
    preprocess_func,
    mfdataset_chunks,
    anom_freq,
    new_var_name,
    long_name,
    units,
    chunks
):
    """
    Load or process and write data.
    
    Data is processed by loading from the desired dataset a variable over a specified period.
    The data are anomalised by removing the anom_freq means computed over the entire
    period.
    
    load : Bool. Whether to load or process.
    dataset : "era5" or "hadisst"
    filepath : file to load saved data
    data_root_path : path to raw era5 or hadisst data on Gadi.
    var_name : string. name of era5 variable (ignored if other dataset).
    years : range. Period to process.
    subset_region : list. Region to subset over. See functions.open_era_data
    subset_level : list. Pressure levels to select. See functions.open_era_data
    preprocess_func : Preprocess using xr.open_mfdataset if desired. See functions.open_era_data
    mfdataset_chunks : Chunks to use for preprocessing in xr.open_mfdataset. See functions.open_era_data
    anom_freq : string. Over what frequency to anomalise, e.g. month or dayofyear
    new_var_name : desired name of variable.
    long_name : long name for xarray Dataset attributes.
    units : units for xarray Dataset attributes.
    chunks : dict. Specify chunks before writing to zarr.
    """
    if load:
        return xr.open_zarr(filepath, consolidated=True)
    else:
        print('Opening files...')
        if dataset == 'era5':
            ds = fn.open_era_data(
                root_path=data_root_path,
                variable=var_name,
                years=years,
                subset_region=subset_region,
                subset_level=subset_level,
                preprocess_func=preprocess_func,
                mfdataset_chunks=mfdataset_chunks
            )
        elif dataset == 'hadisst':
            ds = xr.open_zarr(data_root_path, consolidated=True)
            ds = ds.sel(time=slice(str(years[0]), str(years[-1])))
            ds['time'] = pd.date_range(str(years[0]), str(years[-1])+'-12-01', freq='1MS')
            ds = xr.where(ds > -2, ds, np.nan)
        else:
            raise ValueError("Incorrect dataset.")
            
        ds = ds.rename({
            'latitude': 'lat',
            'longitude': 'lon',
            var_name: new_var_name
        })
        da = ds[new_var_name]
        da = da.chunk(chunks)
            
        print('Anomalising...')
        da_anoms = da.groupby('time.' + anom_freq).apply(lambda x: x - x.mean('time'))
            
        print('Attributes and writing...')
        da_anoms = da_anoms.assign_attrs({
            'long_name': long_name,
            'short_name': new_var_name,
            'units': units
        })
            
        ds_anoms = da_anoms.to_dataset(name=new_var_name + '_anom')
        encoding = {new_var_name + '_anom': {'dtype': 'float32'}}
        ds_anoms.to_zarr(
            filepath,
            mode='w',
            consolidated=True,
            encoding=encoding
        )
        
        return ds_anoms

In [7]:
data_fp = '/g/data/w42/dr6273/work/data/'
years = range(1959, 2022)

# Daily data on extended Australia region

### To process the hourly data we need significant resources - 3 full nodes.

In [6]:
# One node on Gadi has 48 cores - try and use up a full node before going to multiple nodes (jobs)

walltime = '00:30:00'
cores = 48
memory = str(4 * cores) + 'GB'

cluster = PBSCluster(walltime=str(walltime), cores=cores, memory=str(memory), processes=cores,
                     job_extra_directives=['-q normal',
                                           '-P w42',
                                           '-l ncpus='+str(cores),
                                           '-l mem='+str(memory),
                                           '-l storage=gdata/w42+gdata/rt52+gdata/xv83'],
                     local_directory='$TMPDIR',
                     job_directives_skip=["select"])
                     # python=os.environ["DASK_PYTHON"])

In [7]:
cluster.scale(jobs=3)
client = Client(cluster)

In [8]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: http://10.6.44.33:8787/status,

0,1
Dashboard: http://10.6.44.33:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.6.44.33:37627,Workers: 0
Dashboard: http://10.6.44.33:8787/status,Total threads: 0
Started: 1 minute ago,Total memory: 0 B


In [9]:
extended_aus = fn.get_extended_AUS_boundary()

### Note:

I couldn't process hourly data to daily using the function above.
So, try loading hourly data beforehand, using preprocess to select subregion, level and calculate averages.
Write these daily data to file, then load using this function to anomalise as before.

In [10]:
def get_files(file_path, var, years):
    """
    Get list of files
    """
    fp_list = []
    for year in years:
        fp_dir = file_path+var+'/'+str(year)+'/'
        for fp in sorted(os.listdir(fp_dir)):
            fp_list.append(fp_dir+fp)
    return fp_list

In [11]:
def hourly_to_daily(
    data_path,
    variable,
    years,
    pre_process_func,
    daily_func,
    output_path,
    new_var_name=None,
    ):
    """
    Process hourly data to daily (24 hour means starting at 1400UTC).
    
    data_path : directory for hourly data
    variable : variable name
    years : range of years to process
    pre_process_fun : function to preprocess data in open_mfdataset
    daily_func : function which processes hourly data (i.e. to 24 or 12 hours in this analysis)
    output_path : path to write output
    new_var_name : None, or string. Desired new variable name.
    """
    files = get_files(data_path, variable, years) # Get files
    
    ds = xr.open_mfdataset( # Open dataset
        files,
        chunks={'time': 24, 'level': 1},
        preprocess=pre_process_func,
        compat='override',
        coords='minimal',
        engine='netcdf4'
    )
    
    # ds = fn.daily_mean_1400(ds) # daily mean (24 hours from 1400UTC)
    ds = daily_func(ds)
    ds = ds.chunk({'time': 365})
        
    if isinstance(new_var_name, str):
        print('Renaming variable')
        ds = ds.rename({variable: new_var_name})
        variable = new_var_name
    
    encoding = {variable: {'dtype': 'float32'}} # specify encoding
    
    ds.to_zarr( # Write
        output_path,
        mode='w',
        consolidated=True,
        encoding=encoding
    )

### 500 hPa geopotential height

- convert by dividing the geopotential by the Earth's gravitational acceleration, g (=9.80665 m s-2)

In [14]:
def z_preprocess(ds):
    """
    Select desired levels and subregion. Convert to
    geopotential height by dividing the geopotential
    by Earth's gravitational acceleration
    g = 9.80665 m s-2
    
    Source: https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-pressure-levels-monthly-means
    """
    ds = ds.rename({
        'longitude': 'lon',
        'latitude': 'lat'
    })
    ds = ds.sel(
        level=levels,
        lon=slice(extended_aus[0], extended_aus[1]),
        lat=slice(extended_aus[2], extended_aus[3])
    )
    
    # Convert from geopotential to geopotential height
    g = 9.80665
    ds['z'] = ds['z'] / g
    
    # New attributes
    ds['z'] = ds['z'].assign_attrs({
        'long_name': 'Geopotential height',
        'standard_name': 'geopotential height',
        'units': 'm'
    })

    return ds

In [15]:
dataset = 'era5'
levels = [500]
z_output_path = data_fp + dataset + '/z/z_' + dataset + '_daily_1400UTC_pl_' + str(years[0]) + '-' + str(years[-1]) + '_extended_Aus_region.zarr'
print(z_output_path)

/g/data/w42/dr6273/work/data/era5/z/z_era5_daily_1400UTC_pl_1959-2021_extended_Aus_region.zarr


In [16]:
%%time
hourly_to_daily(
    data_path='/g/data/rt52/era5/pressure-levels/reanalysis/',
    variable='z',
    years=years,
    pre_process_func=z_preprocess,
    daily_func=fn.daily_mean_1400,
    output_path=z_output_path
)

CPU times: user 14min 46s, sys: 37.2 s, total: 15min 23s
Wall time: 19min 17s


Open daily data, anomalise and write

In [17]:
z = xr.open_zarr(z_output_path, consolidated=True)

In [18]:
z_anoms = z.groupby('time.dayofyear').apply(lambda x: x - x.mean('time'))

In [19]:
z_anoms = z_anoms.chunk({'time': 365})

In [20]:
%%time
z_anoms.to_zarr( # Write
    data_fp + dataset + '/z/z_anoms_' + dataset + '_daily_1400UTC_pl_' + str(years[0]) + '-' + str(years[-1]) + '_extended_Aus_region.zarr',
    mode='w',
    consolidated=True,
)

CPU times: user 1min 47s, sys: 2.26 s, total: 1min 49s
Wall time: 1min 50s


<xarray.backends.zarr.ZarrStore at 0x14d2dff1be60>

### MSLP

In [17]:
# Using preprocess in open_mfdataset to select desired levels improves performance
#  versus doing a .sel() afterwards
def mslp_preprocess(ds):
    """
    Select desired levels and subregion
    """
    ds = ds.rename({
        'longitude': 'lon',
        'latitude': 'lat'
    })
    ds = ds.sel(
        lon=slice(extended_aus[0], extended_aus[1]),
        lat=slice(extended_aus[2], extended_aus[3])
    )
    return ds

In [18]:
dataset = 'era5'
mslp_output_path = data_fp + dataset + '/mslp/mslp_' + dataset + '_daily_1400UTC_' + str(years[0]) + '-' + str(years[-1]) + '_extended_Aus_region.zarr'
print(mslp_output_path)

/g/data/w42/dr6273/work/data/era5/mslp/mslp_era5_daily_1400UTC_1959-2021_extended_Aus_region.zarr


In [19]:
%%time
hourly_to_daily(
    data_path='/g/data/rt52/era5/single-levels/reanalysis/',
    variable='msl',
    years=years,
    pre_process_func=mslp_preprocess,
    daily_func=fn.daily_mean_1400,
    output_path=mslp_output_path,
    new_var_name='mslp'
)

Renaming variable
CPU times: user 9min 44s, sys: 17.7 s, total: 10min 2s
Wall time: 10min 58s


Open daily data, anomalise and write

In [20]:
mslp = xr.open_zarr(mslp_output_path, consolidated=True)

In [21]:
mslp_anoms = mslp.groupby('time.dayofyear').apply(lambda x: x - x.mean('time'))

In [22]:
mslp_anoms = mslp_anoms.chunk({'time': 365})

In [23]:
%%time
mslp_anoms.to_zarr( # Write
    data_fp + dataset + '/mslp/mslp_anoms_' + dataset + '_daily_1400UTC_' + str(years[0]) + '-' + str(years[-1]) + '_extended_Aus_region.zarr',
    mode='w',
    consolidated=True
)

CPU times: user 2min 19s, sys: 2.63 s, total: 2min 22s
Wall time: 2min 22s


<xarray.backends.zarr.ZarrStore at 0x154f80908b30>

### Fraction of cloud cover (tcc). Just do this for daytime hours (~2000 - 0800 UTC)

In [16]:
def daytime_mean(ds):
    """
    Daytime average of ds. Computed for 12 hour periods
    starting at 2000UTC (so an eastern Aus 6am, roughly).
    Currently hard-coded as its easier with open_mfdataset preprocess/
    """
    ds_12 = ds.rolling(time=12).mean()
    ds_daily = ds_12.isel(time=ds_12.time.dt.hour == 8)
    return ds_daily

In [17]:
# Using preprocess in open_mfdataset to select desired levels improves performance
#  versus doing a .sel() afterwards
def tcc_preprocess(ds):
    """
    Select desired levels and subregion
    """
    ds = ds.rename({
        'longitude': 'lon',
        'latitude': 'lat'
    })
    ds = ds.sel(
        lon=slice(extended_aus[0], extended_aus[1]),
        lat=slice(extended_aus[2], extended_aus[3])
    )
    return ds

In [18]:
dataset = 'era5'
tcc_output_path = data_fp + dataset + '/tcc/tcc_' + dataset + '_12hr_2000-0800UTC_' + str(years[0]) + '-' + str(years[-1]) + '_extended_Aus_region.zarr'
print(tcc_output_path)

/g/data/w42/dr6273/work/data/era5/tcc/tcc_era5_12hr_2000-0800UTC_1959-2021_extended_Aus_region.zarr


In [19]:
%%time
hourly_to_daily(
    data_path='/g/data/rt52/era5/single-levels/reanalysis/',
    variable='tcc',
    years=years,
    pre_process_func=tcc_preprocess,
    daily_func=daytime_mean,
    output_path=tcc_output_path
)

CPU times: user 8min 59s, sys: 20.3 s, total: 9min 20s
Wall time: 11min 2s


Open daily data, anomalise and write

In [27]:
tcc = xr.open_zarr(tcc_output_path, consolidated=True)

In [28]:
tcc_anoms = tcc.groupby('time.dayofyear').apply(lambda x: x - x.mean('time'))

In [29]:
tcc_anoms = tcc_anoms.chunk({'time': 365})

In [30]:
%%time
tcc_anoms.to_zarr( # Write
    data_fp + dataset + '/tcc/tcc_anoms_' + dataset + '_12hr_2000-0800UTC_' + str(years[0]) + '-' + str(years[-1]) + '_extended_Aus_region.zarr',
    mode='w',
    consolidated=True
)

CPU times: user 1min 28s, sys: 2.88 s, total: 1min 31s
Wall time: 1min 36s


<xarray.backends.zarr.ZarrStore at 0x14c600c70660>

### Close cluster

In [51]:
client.close()
cluster.close()

# Monthly data, global

### To process monthly data we need fewer resources

In [52]:
# One node on Gadi has 48 cores - try and use up a full node before going to multiple nodes (jobs)

walltime = '00:20:00'
cores = 8
memory = str(4 * cores) + 'GB'

cluster = PBSCluster(walltime=str(walltime), cores=cores, memory=str(memory), processes=cores,
                     job_extra_directives=['-q normal',
                                           '-P w42',
                                           '-l ncpus='+str(cores),
                                           '-l mem='+str(memory),
                                           '-l storage=gdata/w42+gdata/rt52+gdata/xv83'],
                     local_directory='$TMPDIR',
                     job_directives_skip=["select"])
                     # python=os.environ["DASK_PYTHON"])

In [53]:
cluster.scale(jobs=1)
client = Client(cluster)

In [54]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: http://10.6.1.34:8787/status,

0,1
Dashboard: http://10.6.1.34:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.6.1.34:38595,Workers: 0
Dashboard: http://10.6.1.34:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


## SST data

In [55]:
dataset = 'hadisst'
sst_fp = data_fp + dataset + '/sst/sst_anom_'+dataset+'_moda_sfc_'+str(years[0])+'-'+str(years[-1])+'.zarr'
print(sst_fp)

/g/data/w42/dr6273/work/data/hadisst/sst/sst_anom_hadisst_moda_sfc_1959-2021.zarr


In [57]:
sst_anoms = anomalise_and_process(
    load=True,
    dataset='hadisst',
    filepath=sst_fp,
    data_root_path='/g/data/w42/dr6273/work/data/hadisst/sst/sst_hadisst_187001-202301.zarr',
    var_name='sst',
    years=years,
    subset_region=None,
    subset_level=None,
    preprocess_func=None,
    mfdataset_chunks=None,
    anom_freq='month',
    new_var_name='sst',
    long_name='Sea surface temperature',
    units='K',
    chunks={'time': 12, 'lat': -1, 'lon': -1}
)

### Nino 3.4

In [58]:
nino34 = sst_anoms.sst_anom.sel(
    lat=slice(5, -5),
    lon=slice(-170, -120)
).mean(['lat','lon']).to_dataset(name='nino34')

In [62]:
nino34_dt = fn.detrend_dim(nino34.nino34, 'time').to_dataset(name='nino34_detrended')

In [63]:
nino34 = nino34.merge(nino34_dt)

In [64]:
nino34_fp = data_fp + dataset + '/climate_modes/'+dataset+'_nino34_'+str(years[0])+'-'+str(years[-1])+'.zarr'
nino34.to_zarr(nino34_fp, mode='w', consolidated=True)

<xarray.backends.zarr.ZarrStore at 0x14ae3cdd69d0>

### Dipole mode index

In [65]:
def calc_dmi(da):
    """
    Calculate Dipole Mode Index
    """    
    da_W = da.sel(lat=slice(10, -10), lon=slice(50, 70)).mean(['lat', 'lon'])
    da_E = da.sel(lat=slice(0, -10), lon=slice(90, 110)).mean(['lat', 'lon'])
    
    return (da_W - da_E)

In [66]:
dmi = calc_dmi(sst_anoms.sst_anom).to_dataset(name='dmi')

In [67]:
dmi_dt = fn.detrend_dim(dmi.dmi, 'time').to_dataset(name='dmi_detrended')

In [68]:
dmi = dmi.merge(dmi_dt)

In [69]:
dmi_fp = data_fp + dataset + '/climate_modes/' + dataset + '_dmi_' + str(years[0]) + '-' + str(years[-1]) + '.zarr'

In [70]:
dmi.to_zarr(dmi_fp, mode='w', consolidated=True)

<xarray.backends.zarr.ZarrStore at 0x14ae31cb66c0>

## MSLP data

In [15]:
dataset = 'era5'
mslp_fp = data_fp + dataset + '/mslp/mslp_anom_'+dataset+'_moda_sfc_'+str(years[0])+'-'+str(years[-1])+'.zarr'
print(mslp_fp)

/g/data/w42/dr6273/work/data/era5/mslp/mslp_anom_era5_moda_sfc_1959-2021.zarr


In [16]:
mslp_anoms = anomalise_and_process(
    load=True,
    dataset='era5',
    filepath=mslp_fp,
    data_root_path='/g/data/rt52/era5/single-levels/monthly-averaged/',
    var_name='msl',
    years=years,
    subset_region=None,
    subset_level=None,
    preprocess_func=None,
    mfdataset_chunks=None,
    anom_freq='month',
    new_var_name='mslp',
    long_name='Mean sea level pressure',
    units='Pa',
    chunks={'time': 12, 'lat': -1, 'lon': -1}
)

Opening files...
Anomalising...
Attributes and writing...


### Southern Annular Mode index

In [17]:
def calc_sam(mslp_anoms, lat_name='lat', lon_name='lon', time_name='time'):
    """
    Calculate Southern Annular Mode index from MSLP anomalies. The SAM index is
    defined as the difference in MSLP anomalies between 40 and 65 degrees South.
    MSLP anomalies are first normalised by dividing by their standard deviation
    (calculated as a function of calendar month).
    """
    
    mslp_40 = mslp_anoms.interp({lat_name: -40}).mean(lon_name)
    mslp_65 = mslp_anoms.interp({lat_name: -65}).mean(lon_name)
    
    norm_40 = mslp_40.groupby(time_name+'.month').apply(lambda x: x / x.std(time_name))
    norm_65 = mslp_65.groupby(time_name+'.month').apply(lambda x: x / x.std(time_name))
    
    return norm_40 - norm_65

In [18]:
sam = calc_sam(mslp_anoms)

In [19]:
sam = sam.rename({'mslp_anom': 'sam'})

In [20]:
sam_dt = fn.detrend_dim(sam.sam, 'time').to_dataset(name='sam_detrended')

In [21]:
sam = sam.merge(sam_dt)

In [22]:
sam_fp = data_fp + dataset + '/climate_modes/'+dataset+'_sam_'+str(years[0])+'-'+str(years[-1])+'.zarr'
sam.to_zarr(sam_fp, mode='w', consolidated=True)

<xarray.backends.zarr.ZarrStore at 0x1539ae3e6ce0>

# Close cluster

In [71]:
client.close()
cluster.close()