# Prep variables for HydroTas 2020-2021 workplan:
- Skill assessment
  - rainfall, surface temp and surface wind over Australia region and Tasmania region
  - Assess skill as a function of start month and ensemble size
- UNSEEN
  - Tasmanian rainfall and Melbourne surface temperature

In [1]:
import glob
import cftime
import geopandas
import regionmask
import numpy as np
import xarray as xr
import myfuncs as my
from dask.distributed import Client
from dask_jobqueue import PBSCluster

In [2]:
# Resources requested for each PBS job
walltime = '01:00:00'
cores = 24
memory = '48GB'

# Extra PBS directives: cpu/memory request, project code, job-local disk
# and the storage mounts that the job needs access to
pbs_directives = [
    f'-l ncpus={cores}',
    f'-l mem={memory}',
    '-P ux06',
    '-l jobfs=100GB',
    '-l storage=gdata/xv83+gdata/v14+scratch/v14',
]

# One worker process per job; workers use the job-local filesystem as scratch
cluster = PBSCluster(
    cores=cores,
    memory=memory,
    walltime=walltime,
    processes=1,
    job_extra=pbs_directives,
    local_directory='$PBS_JOBFS',
    header_skip=['select'],
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 37757 instead


In [3]:
# Ask the cluster for this many PBS jobs, then connect a client to it
n_jobs = 2
cluster.scale(jobs=n_jobs)
client = Client(cluster)
client  # last expression: display the client summary

0,1
Client  Scheduler: tcp://10.6.63.72:40935  Dashboard: http://10.6.63.72:37757/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


# Create Australia land mask on CAFE grid

In [15]:
def preprocess_f6_atmos(ds):
    """ Preprocess steps for the CAFE-f6 atmos forecasts

        Removes a fixed set of auxiliary coordinates (when present) and rounds
        every latitude-like coordinate to 10 decimal places.

        Parameters
        ----------
        ds : xarray object
            Object exposing ``coords``, ``dims``, ``drop_vars`` and
            ``assign_coords`` (in this notebook it is called on both a full
            dataset and on a single ``'area'`` selection).

        Returns
        -------
        xarray object
            The input with the coordinates dropped and latitudes rounded.
    """
    # Drop some coordinates. `drop_vars` replaces the deprecated `drop`
    unwanted = ['average_DT', 'average_T1', 'average_T2', 'zsurf', 'area']
    present = [name for name in unwanted if name in ds.coords]
    if present:
        ds = ds.drop_vars(present)
    # Round (not truncate) latitudes to 10dp
    # NOTE(review): presumably this avoids floating-point mismatches between
    # latitude coordinates from different sources - confirm
    for dim in [d for d in ds.dims if 'lat' in d]:
        ds = ds.assign_coords({dim: ds[dim].round(decimals=10)})
    return ds

In [16]:
# Grid-cell areas, read from the daily atmospheric output of one f6 forecast
area_store = '/g/data/v14/vxk563/CAFE/forecasts/f6/WIP/c5-d60-pX-f6-20201101/ZARR/atmos_isobaric_daily.zarr.zip'
CAFE_area = preprocess_f6_atmos(my.open_zarr(area_store)['area'])

# Build regionmask regions from the NRM cluster shapefile
NRM = geopandas.read_file('data/NRM_clusters/NRM_clusters.shp')
regions = regionmask.Regions(
    outlines=list(NRM.geometry),
    numbers=list(NRM.index),
    names=list(NRM.label),
    abbrevs=list(NRM.code),
    name='NRM_regions',
)

# Rasterise the regions onto the grid of CAFE_area
regions_mask = regions.mask(CAFE_area, lon_name='lon', lat_name='lat')

# True wherever a grid cell was assigned to any region, False elsewhere
Australia_mask = xr.where(regions_mask.notnull(), True, False)

# Parameters

In [17]:
# Region definitions handed to my.get_region: a boolean mask for Australia and
# coordinate pairs for the other two
# NOTE(review): the pairs look like (lat, lon) points - confirm against my.get_region
REGIONS = dict(
    AUS=Australia_mask,
    TAS=(-42, 146.5),
    MEL=(-37.81, 144.96),
)

# Get f6 atmospheric monthly variables

In [22]:
# Source variable -> output name and the regions to extract it over.
# The four surface fields share the same regions; h500 is kept on its native grid.
VARIABLES = {
    **{field: {'name': field, 'regions': ['AUS', 'TAS', 'MEL']}
       for field in ('precip', 't_ref', 'u_ref', 'v_ref')},
    'h500': {'name': 'h500', 'regions': ['NATIVE']},
}

In [23]:
# f6 monthly atmospheric stores are spread over two project directories
paths_xv83 = glob.glob('/g/data/xv83/ds0092/CAFE/forecasts/f6/WIP/c5-d60-pX-f6-??????01/ZARR/atmos_isobaric_month.zarr.zip')
paths_v14 = glob.glob('/g/data/v14/vxk563/CAFE/forecasts/f6/WIP/c5-d60-pX-f6-??????01/ZARR/atmos_isobaric_month.zarr.zip')

# Merge both lists and order them by forecast directory name
# (third path component from the end)
paths = paths_xv83 + paths_v14
paths.sort(key=lambda store: store.split('/')[-3])

In [24]:
%%time
# Open all f6 atmospheric forecasts and rename each variable to its output name
output_names = {key: spec['name'] for key, spec in VARIABLES.items()}
ds = my.open_zarr_forecasts(
    paths,
    variables=VARIABLES.keys(),
    preprocess=preprocess_f6_atmos,
).rename(output_names)

for var_key, spec in VARIABLES.items():
    var_name = spec['name']
    region_keys = spec['regions']
    var_ds = ds[[var_name]]
    print(f'Extracting {var_key} over region(s) {region_keys}...')

    for region_key in region_keys:
        if region_key == 'NATIVE':
            # No spatial reduction: keep the field on its native grid
            extracted = var_ds
        else:
            # Area-weighted mean over the region
            in_region = my.get_region(var_ds, REGIONS[region_key])
            extracted = in_region.weighted(CAFE_area).mean(['lat', 'lon'])

            # One chunk spanning all initial dates and lead times
            extracted = extracted.chunk({'init_date': -1, 'lead_time': -1})

        # Replace missing times with a dummy date so that time operations work nicely
        filled_time = extracted.time.fillna(cftime.DatetimeJulian(1800, 1, 1))
        extracted = extracted.assign_coords({'time': filled_time})

        # Save
        my.to_zarr(extracted, f'./data/f6_{var_name}_{region_key}_raw.zarr')

Extracting precip over region(s) ['AUS', 'TAS', 'MEL']...
Extracting t_ref over region(s) ['AUS', 'TAS', 'MEL']...
Extracting u_ref over region(s) ['AUS', 'TAS', 'MEL']...
Extracting v_ref over region(s) ['AUS', 'TAS', 'MEL']...
Extracting h500 over region(s) ['NATIVE']...
CPU times: user 1min 47s, sys: 41.5 s, total: 2min 29s
Wall time: 4min 50s


In [25]:
# Keep the f6 forecast coordinates; they are combined with the f5 ones later
# to define how the observations are stacked
f6_lead_times, f6_init_dates = ds['lead_time'], ds['init_date']

# Get f6 ocean monthly variables

In [26]:
# Only sea surface temperature is extracted from the ocean output, on its native grid
VARIABLES = dict(
    sst=dict(name='sst', regions=['NATIVE']),
)

In [27]:
# f6 monthly ocean stores are spread over two project directories
paths_xv83 = glob.glob('/g/data/xv83/ds0092/CAFE/forecasts/f6/WIP/c5-d60-pX-f6-??????01/ZARR/ocean_month.zarr.zip')
paths_v14 = glob.glob('/g/data/v14/vxk563/CAFE/forecasts/f6/WIP/c5-d60-pX-f6-??????01/ZARR/ocean_month.zarr.zip')

# Merge both lists and order them by forecast directory name
# (third path component from the end)
paths = paths_xv83 + paths_v14
paths.sort(key=lambda store: store.split('/')[-3])

In [28]:
def preprocess_f6_ocean(ds):
    """ Preprocess steps for the CAFE-f6 ocean forecasts

        Removes a fixed set of auxiliary coordinates when they are present.

        Parameters
        ----------
        ds : xarray object
            Object exposing ``coords`` and ``drop_vars``.

        Returns
        -------
        xarray object
            The input without the listed coordinates.
    """
    # Drop some coordinates. `drop_vars` replaces the deprecated `drop`
    unwanted = ['average_DT', 'average_T1', 'average_T2', 'geolat_t', 'geolon_t', 'area_t']
    present = [name for name in unwanted if name in ds.coords]
    if present:
        ds = ds.drop_vars(present)
    return ds

In [29]:
%%time
# Open all f6 ocean forecasts and rename each variable to its output name
output_names = {key: spec['name'] for key, spec in VARIABLES.items()}
ds = my.open_zarr_forecasts(
    paths,
    variables=VARIABLES.keys(),
    preprocess=preprocess_f6_ocean,
).rename(output_names)

for var_key, spec in VARIABLES.items():
    var_name = spec['name']
    region_keys = spec['regions']
    var_ds = ds[[var_name]]
    print(f'Extracting {var_key} over region(s) {region_keys}...')

    for region_key in region_keys:
        if region_key == 'NATIVE':
            # No spatial reduction: keep the field on its native grid
            extracted = var_ds
        else:
            # Area-weighted mean over the region. The weights were previously
            # misspelled as `CAFE_aarea`, which raised a NameError as soon as a
            # non-NATIVE region was requested.
            # NOTE(review): CAFE_area is read from the atmospheric output -
            # confirm it matches the ocean grid before adding regions here
            in_region = my.get_region(var_ds, REGIONS[region_key])
            extracted = in_region.weighted(CAFE_area).mean(['lat', 'lon'])

            # One chunk spanning all initial dates and lead times
            extracted = extracted.chunk({'init_date': -1, 'lead_time': -1})

        # Replace missing times with a dummy date so that time operations work nicely
        filled_time = extracted.time.fillna(cftime.DatetimeJulian(1800, 1, 1))
        extracted = extracted.assign_coords({'time': filled_time})

        # Save
        my.to_zarr(extracted, f'./data/f6_{var_name}_{region_key}_raw.zarr')

Extracting sst over region(s) ['NATIVE']...
CPU times: user 5min 17s, sys: 5min 39s, total: 10min 56s
Wall time: 15min 33s


# Get f5 atmospheric monthly variables

In [30]:
# Source variable -> output name and the regions to extract it over.
# The four surface fields share the same regions; h500 is kept on its native grid.
surface_fields = ['precip', 't_ref', 'u_ref', 'v_ref']
VARIABLES = {}
for field in surface_fields:
    VARIABLES[field] = {'name': field, 'regions': ['AUS', 'TAS', 'MEL']}
VARIABLES['h500'] = {'name': 'h500', 'regions': ['NATIVE']}

In [31]:
# Zarr store holding the CAFE f5 monthly atmospheric output
path = '/g/data/v14/vxk563/CAFE/forecasts/f5/ZARR/atmos_isobaric_month.zarr.zip'

In [32]:
def preprocess_f5(ds):
    """ Preprocess steps for the CAFE-f5 forecasts

        Removes a fixed set of auxiliary coordinates (when present) and rounds
        every latitude-like coordinate to 10 decimal places.

        Parameters
        ----------
        ds : xarray object
            Object exposing ``coords``, ``dims``, ``drop_vars`` and
            ``assign_coords``.

        Returns
        -------
        xarray object
            The input with the coordinates dropped and latitudes rounded.
    """
    # Drop some coordinates. `drop_vars` replaces the deprecated `drop`
    unwanted = ['average_DT', 'average_T1', 'average_T2', 'zsurf', 'area']
    present = [name for name in unwanted if name in ds.coords]
    if present:
        ds = ds.drop_vars(present)
    # Round (not truncate) latitudes to 10dp
    for dim in [d for d in ds.dims if 'lat' in d]:
        ds = ds.assign_coords({dim: ds[dim].round(decimals=10)})
    return ds

In [33]:
%%time
# Open the f5 atmospheric forecasts and rename each variable to its output name
output_names = {key: spec['name'] for key, spec in VARIABLES.items()}
ds = my.open_zarr(
    path,
    variables=VARIABLES.keys(),
    preprocess=preprocess_f5,
).rename(output_names)

for var_key, spec in VARIABLES.items():
    var_name = spec['name']
    region_keys = spec['regions']
    var_ds = ds[[var_name]]
    print(f'Extracting {var_key} over region(s) {region_keys}...')

    for region_key in region_keys:
        if region_key == 'NATIVE':
            # No spatial reduction: keep the field on its native grid
            extracted = var_ds
        else:
            # Area-weighted mean over the region
            in_region = my.get_region(var_ds, REGIONS[region_key])
            extracted = in_region.weighted(CAFE_area).mean(['lat', 'lon'])

            # One chunk spanning all initial dates and lead times
            extracted = extracted.chunk({'init_date': -1, 'lead_time': -1})

        # Tag the calendar, then replace missing times with a dummy date so
        # that time operations work nicely
        extracted.time.attrs['calendar_type'] = 'JULIAN'
        filled_time = extracted.time.fillna(cftime.DatetimeJulian(1800, 1, 1))
        extracted = extracted.assign_coords({'time': filled_time})

        # Save
        my.to_zarr(extracted, f'./data/f5_{var_name}_{region_key}_raw.zarr')

Extracting precip over region(s) ['AUS', 'TAS', 'MEL']...
Extracting t_ref over region(s) ['AUS', 'TAS', 'MEL']...
Extracting u_ref over region(s) ['AUS', 'TAS', 'MEL']...
Extracting v_ref over region(s) ['AUS', 'TAS', 'MEL']...
Extracting h500 over region(s) ['NATIVE']...
CPU times: user 30.5 s, sys: 5.52 s, total: 36 s
Wall time: 1min 32s


In [34]:
# Keep the f5 forecast coordinates; they are combined with the f6 ones
# to define how the observations are stacked
f5_lead_times = ds['lead_time']
f5_init_dates = ds['init_date']

In [35]:
# Union of the f5 and f6 lead times: concatenate, then keep the first
# occurrence of each distinct value (np.unique returns them in sorted order)
all_lead_times = xr.concat([f5_lead_times, f6_lead_times], dim='lead_time')
lead_first_idx = np.unique(all_lead_times, return_index=True)[1]
obsv_lead_times = all_lead_times.isel(lead_time=lead_first_idx)

# Same for the initial dates
all_init_dates = xr.concat([f5_init_dates, f6_init_dates], dim='init_date')
init_first_idx = np.unique(all_init_dates, return_index=True)[1]
obsv_init_dates = all_init_dates.isel(init_date=init_first_idx)

# JRA-55 surface data

In [62]:
# JRA-55 variable name -> output name; all four are extracted over the same regions
VARIABLES = {
    jra_name: {'name': out_name, 'regions': ['AUS', 'TAS', 'MEL']}
    for jra_name, out_name in [
        ('TPRAT_GDS0_SFC', 'precip'),
        ('TMP_GDS0_HTGL', 't_ref'),
        ('UGRD_GDS0_HTGL', 'u_ref'),
        ('VGRD_GDS0_HTGL', 'v_ref'),
    ]
}

In [63]:
# JRA-55 monthly surface fields; the store name indicates they are on the CAFE grid
path = '/g/data/v14/ds0092/data/ZARR/csiro-dcfp-jra55/surface_month_cafe-grid.zarr'

In [64]:
def preprocess_jra(ds):
    """ Preprocess steps for the JRA data

        Renames the JRA time/level coordinates (when present), removes the
        ``filename`` attribute (when present) and rounds every latitude-like
        coordinate to 10 decimal places.

        Parameters
        ----------
        ds : xarray object
            Object exposing ``coords``, ``dims``, ``attrs``, ``rename`` and
            ``assign_coords``.

        Returns
        -------
        xarray object
            The preprocessed object.
    """
    # Rename time and level
    for key, val in {'initial_time0_hours': 'time',
                     'lv_ISBL1': 'level'}.items():
        if key in ds.coords:
            ds = ds.rename({key: val})
    # Drop filename attribute. `pop` with a default tolerates inputs that do
    # not carry the attribute, where `del` would raise a KeyError.
    # Note this edits `ds.attrs` in place.
    ds.attrs.pop('filename', None)
    # Round (not truncate) latitudes to 10dp
    for dim in [d for d in ds.dims if 'lat' in d]:
        ds = ds.assign_coords({dim: ds[dim].round(decimals=10)})
    return ds

In [65]:
%%time
# Open the JRA-55 store and rename each variable to its output name
output_names = {key: spec['name'] for key, spec in VARIABLES.items()}
ds = my.open_zarr(
    path,
    variables=VARIABLES.keys(),
    preprocess=preprocess_jra,
).rename(output_names)

for var_key, spec in VARIABLES.items():
    var_name = spec['name']
    region_keys = spec['regions']
    var_ds = ds[[var_name]]
    print(f'Extracting {var_key} over region(s) {region_keys}...')

    for region_key in region_keys:
        if region_key == 'NATIVE':
            # No spatial reduction: keep the field on its native grid
            series = var_ds
        else:
            # Area-weighted mean over the region
            in_region = my.get_region(var_ds, REGIONS[region_key])
            series = in_region.weighted(CAFE_area).mean(['lat', 'lon'])

            # One chunk spanning the whole time axis
            series = series.chunk({'time': -1})

        # Stack by initial date, using the combined f5/f6 initial dates and
        # the number of combined lead times
        stacked = my.stack_by_init_date(
            series, obsv_init_dates, len(obsv_lead_times))
        stacked = stacked.chunk({'init_date': -1, 'lead_time': -1})

        # Tag the calendar, then replace missing times with a dummy date so
        # that time operations work nicely
        stacked.time.attrs['calendar_type'] = 'Proleptic_Gregorian'
        filled_time = stacked.time.fillna(
            cftime.DatetimeProlepticGregorian(1800, 1, 1))
        stacked = stacked.assign_coords({'time': filled_time})

        # Save both the plain time series and the stacked version
        my.to_zarr(series, f'./data/jra55_{var_name}_{region_key}_ts.zarr')
        my.to_zarr(stacked, f'./data/jra55_{var_name}_{region_key}.zarr')

CPU times: user 33.3 ms, sys: 4.28 ms, total: 37.5 ms
Wall time: 52.1 ms


# AWAP monthly data

In [40]:
# Only rainfall is taken from AWAP, over the three regions
VARIABLES = dict(
    precip=dict(name='precip', regions=['AUS', 'TAS', 'MEL']),
)

In [41]:
# AWAP daily rainfall; the store name indicates 1900-01-01 to 2020-12-02 on the CAFE grid
path = '/g/data/v14/ds0092/data/ZARR/csiro-dcfp-csiro-awap/rain_day_19000101-20201202_cafe-grid.zarr'

In [42]:
def preprocess_awap(ds):
    """ Preprocess steps for the AWAP data

        Rounds every coordinate whose dimension name contains 'lat' to
        10 decimal places and returns the result.
    """
    lat_dims = [name for name in ds.dims if 'lat' in name]
    for name in lat_dims:
        rounded = ds[name].round(decimals=10)
        ds = ds.assign_coords({name: rounded})
    return ds

In [43]:
%%time
# Open the AWAP store and rename each variable to its output name
output_names = {key: spec['name'] for key, spec in VARIABLES.items()}
ds = my.open_zarr(
    path,
    variables=VARIABLES.keys(),
    preprocess=preprocess_awap,
)
ds = ds.rename(output_names)

# Sum to monthly values
def sum_min_samples(ds, dim, min_samples):
    """ Return the sum along dim if there are at least min_samples entries
        along dim, otherwise the same sum multiplied by NaN

        The sum is taken with skipna=False.
    """
    total = ds.sum(dim, skipna=False)
    if len(ds[dim]) < min_samples:
        return np.nan*total
    return total
# Monthly totals; months with fewer than 28 samples are set to NaN
ds = ds.resample(time='MS').map(
    sum_min_samples,
    dim='time',
    min_samples=28,
)

for var_key, spec in VARIABLES.items():
    var_name = spec['name']
    region_keys = spec['regions']
    var_ds = ds[[var_name]]
    print(f'Extracting {var_key} over region(s) {region_keys}...')

    for region_key in region_keys:
        if region_key == 'NATIVE':
            # No spatial reduction: keep the field on its native grid
            series = var_ds
        else:
            # Area-weighted mean over the region
            in_region = my.get_region(var_ds, REGIONS[region_key])
            series = in_region.weighted(CAFE_area).mean(['lat', 'lon'])

            # One chunk spanning the whole time axis
            series = series.chunk({'time': -1})

        # Stack by initial date, using the combined f5/f6 initial dates and
        # the number of combined lead times
        stacked = my.stack_by_init_date(
            series, obsv_init_dates, len(obsv_lead_times))
        stacked = stacked.chunk({'init_date': -1, 'lead_time': -1})

        # Tag the calendar, then replace missing times with a dummy date so
        # that time operations work nicely
        stacked.time.attrs['calendar_type'] = 'Proleptic_Gregorian'
        filled_time = stacked.time.fillna(
            cftime.DatetimeProlepticGregorian(1800, 1, 1))
        stacked = stacked.assign_coords({'time': filled_time})

        # Save both the plain time series and the stacked version
        my.to_zarr(series, f'./data/awap_{var_name}_{region_key}_ts.zarr')
        my.to_zarr(stacked, f'./data/awap_{var_name}_{region_key}.zarr')

Extracting precip over region(s) ['AUS', 'TAS', 'MEL']...
CPU times: user 47.2 s, sys: 1.74 s, total: 49 s
Wall time: 1min


# HadISST monthly data

In [68]:
# Only sea surface temperature is taken from HadISST, on its native grid
sst_spec = {'name': 'sst', 'regions': ['NATIVE']}
VARIABLES = {'sst': sst_spec}

In [69]:
# HadISST monthly ocean store
path = '/g/data/v14/ds0092/data/ZARR/csiro-dcfp-hadisst/ocean_month.zarr.zip'

In [94]:
import cftime
def preprocess_had(ds):
    """ Preprocess steps for the HadISST data

        Replaces every entry of the time coordinate with a
        cftime.DatetimeGregorian at day 1 of the same year and month.
    """
    # Move each time stamp to the start of its month
    month_starts = []
    for stamp in ds.time.values:
        month_starts.append(
            cftime.DatetimeGregorian(stamp.year, stamp.month, 1))
    return ds.assign_coords({'time': month_starts})

In [99]:
%%time
# Open the HadISST store and rename each variable to its output name
output_names = {key: spec['name'] for key, spec in VARIABLES.items()}
ds = my.open_zarr(
    path,
    variables=VARIABLES.keys(),
    preprocess=preprocess_had,
).rename(output_names)

for var_key, spec in VARIABLES.items():
    var_name = spec['name']
    region_keys = spec['regions']
    var_ds = ds[[var_name]]
    print(f'Extracting {var_key} over region(s) {region_keys}...')

    for region_key in region_keys:
        if region_key == 'NATIVE':
            # No spatial reduction: keep the field on its native grid
            series = var_ds
        else:
            # Area-weighted mean over the region
            in_region = my.get_region(var_ds, REGIONS[region_key])
            series = in_region.weighted(CAFE_area).mean(['lat', 'lon'])

            # One chunk spanning the whole time axis
            series = series.chunk({'time': -1})

        # Stack by initial date, using the combined f5/f6 initial dates and
        # the number of combined lead times. Only lead_time is rechunked here.
        stacked = my.stack_by_init_date(
            series, obsv_init_dates, len(obsv_lead_times))
        stacked = stacked.chunk({'lead_time': -1})

        # Tag the calendar, then replace missing times with a dummy date so
        # that time operations work nicely
        stacked.time.attrs['calendar_type'] = 'Gregorian'
        filled_time = stacked.time.fillna(
            cftime.DatetimeGregorian(1800, 1, 1))
        stacked = stacked.assign_coords({'time': filled_time})

        # Save both the plain time series and the stacked version
        my.to_zarr(series, f'./data/had_{var_name}_{region_key}_ts.zarr')
        my.to_zarr(stacked, f'./data/had_{var_name}_{region_key}.zarr')

Extracting sst over region(s) ['NATIVE']...
CPU times: user 4.77 s, sys: 2.62 s, total: 7.4 s
Wall time: 10.4 s


# End

In [107]:
# Disconnect the client first, then shut down the cluster it was attached to.
# Closing the cluster first leaves the client connected to a scheduler that
# no longer exists.
client.close()
cluster.close()