## Develop a method for harmonizing data from OzWALD

Requirements:
* Must be reproducible in an operational context, i.e. minimum of fuss to rerun the whole process each year, but first off we need a ~20yr archive to build the models and run historic predictions
* For now, run at 5 km resolution
* Intermediate files are fine, but let's keep the number of steps to a minimum
* Some variables are already computed by OzWALD, but others need to be either computed on-the-fly or saved and stored as intermediate files.
* Many of the pre-computed variables available in OzWALD require resampling spatially and temporally
* A python environment is required, but should be as small as possible (but will undoubtedly still be cumbersome)
* There is a soft requirement that the model be built on features as close as possible to those in the published 'AusEFlux' article in Biogeosciences.



In [1]:
import os
import pandas as pd
import xarray as xr
import numpy as np
from odc.geo.xr import assign_crs
from odc.geo.geobox import zoom_out

import warnings
warnings.simplefilter(action='ignore')

import sys
sys.path.append('/g/data/os22/chad_tmp/AusEFlux/src/')
from _utils import start_local_dask, round_coords

In [2]:
# Start a local Dask cluster through the project helper imported from `_utils`.
# NOTE(review): `mem_safety_margin` presumably keeps some memory back from the
# workers -- confirm against `_utils.start_local_dask`.
client = start_local_dask(mem_safety_margin='2Gb')
# Display the client so the dashboard link and worker resources can be checked
client



0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: /proxy/8787/status,

0,1
Dashboard: /proxy/8787/status,Workers: 1
Total threads: 24,Total memory: 95.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:39585,Workers: 1
Dashboard: /proxy/8787/status,Total threads: 24
Started: Just now,Total memory: 95.00 GiB

0,1
Comm: tcp://127.0.0.1:45845,Total threads: 24
Dashboard: /proxy/40193/status,Memory: 95.00 GiB
Nanny: tcp://127.0.0.1:36669,
Local directory: /jobfs/105625903.gadi-pbs/dask-scratch-space/worker-sq5lot1e,Local directory: /jobfs/105625903.gadi-pbs/dask-scratch-space/worker-sq5lot1e


In [3]:
# Root of the source archive; the relative input paths below are appended to it
base = '/g/data/ub8/au/'

# Folder that receives the harmonized 5 km outputs
results = '/g/data/os22/chad_tmp/AusEFlux/data/5km/'

# Years to process (2003 to 2022 inclusive), kept as strings because they are
# concatenated straight into file names
years = list(map(str, range(2003, 2023)))

## Define dictionaries for groups of vars

In [4]:
# clim_inputs = {
#         'rain': 'OzWALD/daily/meteo/Pg/OzWALD.daily.Pg.'+year+'.nc',
#         'Tmin': 'OzWALD/daily/meteo/Tmin/OzWALD.Tmin.'+year+'.nc', 
#         'Tmax': 'OzWALD/daily/meteo/Tmax/OzWALD.Tmax.'+year+'.nc',
#         'kTavg': 'OzWALD/daily/meteo/kTavg/OzWALD.kTavg.'+year+'.nc',
#         'VPeff': 'OzWALD/daily/meteo/VPeff/OzWALD.VPeff.'+year+'.nc',
#          }

# # NDWI = (NIR860 - NIR1240) / (NIR860 + NIR1240)
# # band5=NIR1240
# # band2=NIR860

# modis_sr_inputs = {
#         'SR_B2': 'MODIS/mosaic/MCD43A4.006/MCD43A4.006.b02.500m_0841_0876nm_nbar.'+year+'.nc',
#         'SR_B5': 'MODIS/mosaic/MCD43A4.006/MCD43A4.006.b05.500m_1230_1250nm_nbar.'+year+'.nc',
#          }

# rs_inputs = {
#         'NDVI': 'OzWALD/8day/NDVI/OzWALD.NDVI.'+year+'.nc',
#         'LST':'MODIS/mosaic/MYD11A1.006/MYD11A1.006.LST_Day_1km.'+year+'.nc',
#         'VegH': 'LandCover/OzWALD_LC/VegH_2007-2010_mosaic_AustAlb_25m.nc'
#          }

### Grab a common grid to reproject to and create a mask

In [5]:
# Open the 5 km reference product once: it supplies both the target grid for
# reprojection and the footprint used to build the mask.
wcf_5km_path = '/g/data/os22/chad_tmp/climate-carbon-interactions/data/5km/WCF_5km_monthly_1982_2022.nc'
wcf_5km = xr.open_dataset(wcf_5km_path)

# common grid that every input is reprojected onto
gbox = wcf_5km.odc.geobox

# make a mask of aus extent: 1 where the time-mean WCF holds a value
# (NaN fails the comparison and becomes 0), 0 everywhere else
mask = wcf_5km['WCF'].mean('time')
mask = xr.where(mask>-99, 1, 0)

## Process MODIS Surface Reflectance

This will give us NDWI

In [None]:
# Build monthly 5 km NDWI from two MODIS NBAR bands, one year per iteration.
#   NDWI = (NIR860 - NIR1240) / (NIR860 + NIR1240)
#   band 2 = NIR860, band 5 = NIR1240
chunks=dict(latitude=1000, longitude=1000, time=1)

# the output folder is the same for every year, so create it once up front
folder = results+'NDWI'
os.makedirs(folder, exist_ok=True)

#loop through each year
for year in years:

    modis_sr_inputs = {
        'SR_B2': 'MODIS/mosaic/MCD43A4.006/MCD43A4.006.b02.500m_0841_0876nm_nbar.'+year+'.nc',
        'SR_B5': 'MODIS/mosaic/MCD43A4.006/MCD43A4.006.b05.500m_1230_1250nm_nbar.'+year+'.nc',
         }

    d = {}
    for k,i in modis_sr_inputs.items():
        print(k, year)

        #open lazily (dask chunks) and do some prelim processing
        ds = xr.open_dataset(base+i, chunks=chunks)
        ds = assign_crs(ds, crs='epsg:4326')
        ds = ds.to_array()
        ds = ds.squeeze().drop_vars('variable')
        ds.attrs['nodata'] = np.nan
        ds = ds.rename(k)
        d[k] = ds #add to dict

    #calculate NDWI; a zero denominator is blanked to NaN so the ratio cannot
    #become +/-inf and contaminate the temporal and spatial averages below
    denominator = d['SR_B2'] + d['SR_B5']
    ndwi = (d['SR_B2'] - d['SR_B5']) / denominator.where(denominator != 0)

    #resample to monthly means (still lazy), then shift each month-start label
    #forward by 14 days. The `loffset` argument of resample() is deprecated,
    #so the offset is applied to the time coordinate explicitly instead.
    ndwi = ndwi.resample(time='MS').mean()
    ndwi = ndwi.assign_coords(time=ndwi.get_index('time') + pd.Timedelta(14, 'd'))

    # resample spatial onto the common grid
    ndwi = ndwi.odc.reproject(gbox, resampling='average').compute()  # bring into memory
    ndwi = round_coords(ndwi)
    ndwi.attrs['nodata'] = np.nan
    ndwi = ndwi.rename('NDWI')

    #mask to aus extent (mask is 1 inside, 0 outside)
    ndwi = ndwi.where(mask)

    #export result
    ndwi.astype('float32').to_netcdf(folder+'/NDWI_5km_'+year+'.nc')


SR_B2 2003
SR_B5 2003


  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))


In [None]:
# ds = xr.open_dataarray('/g/data/os22/chad_tmp/NEE_modelling/data/1km/NDWI_1km_monthly_2002_2022.nc')
# ndwi.where(mask).plot.imshow(col='time', col_wrap=6, robust=True)

In [None]:
# def pre_preprocess(ds, var):
#     ds = ds.transpose('latitude', 'longitude', 'time')
#     ds = assign_crs(ds, crs='epsg:4326')
#     if var=='rain':
#         ds = ds.resample(time='MS', loffset=pd.Timedelta(14, 'd')).sum()
#     else:
#         ds = ds.resample(time='MS', loffset=pd.Timedelta(14, 'd')).mean()
#     ds = ds.to_array()
#     ds.attrs['nodata'] = np.nan
#     ds = ds.odc.reproject(common_grid, resampling='average')
#     ds = ds.squeeze().drop('variable')
#     ds = round_coords(ds)
#     ds.attrs['nodata'] = np.nan
#     return ds

In [None]:
def pre_preprocess(ds, var):
    """
    Convert one opened input dataset into a monthly array on the common grid.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset with 'latitude', 'longitude' and 'time' dimensions.
    var : str
        Key of the variable being processed. 'rain' is summed within each
        month; every other variable is averaged.

    Returns
    -------
    xarray.DataArray
        Lazy monthly array reprojected onto `gbox`, with time labels set
        14 days after each month start and `nodata` set to NaN.
    """
    ds = ds.transpose('latitude', 'longitude', 'time')
    ds = assign_crs(ds, crs='epsg:4326')

    monthly = ds.resample(time='MS')
    ds = monthly.sum() if var=='rain' else monthly.mean()
    # `loffset` is deprecated in resample(), so shift the labels explicitly
    ds = ds.assign_coords(time=ds.get_index('time') + pd.Timedelta(14, 'd'))

    ds = ds.to_array()
    ds.attrs['nodata'] = np.nan
    ds = ds.odc.reproject(gbox, resampling='average')
    ds = ds.squeeze().drop_vars('variable')
    ds = round_coords(ds)
    ds.attrs['nodata'] = np.nan
    return ds


# Datasets that are handled differently from the generic monthly pipeline
different = ['WCF', 'VegH']

#loop through each year
for year in years:

    modis_sr_inputs = {
        'SR_B2': 'MODIS/mosaic/MCD43A4.006/MCD43A4.006.b02.500m_0841_0876nm_nbar.'+year+'.nc',
        'SR_B5': 'MODIS/mosaic/MCD43A4.006/MCD43A4.006.b05.500m_1230_1250nm_nbar.'+year+'.nc',
         }

    for k,i in modis_sr_inputs.items():

        print(k, year)

        # exact membership test (a substring test would also match partial keys)
        if k in different:
            if k=='VegH':
                # os.path.join keeps an absolute path untouched and prefixes
                # `base` onto a relative one
                ds = xr.open_dataset(os.path.join(base, i), chunks=chunks)
                ds = ds.isel(time=-1).squeeze().drop_vars('time') #grab any time, its all the same

            if k=='WCF':
                ds = xr.open_dataset(base+i, chunks=chunks)
                ds = ds.sel(time=year).squeeze().drop_vars('time')
                ds = ds.transpose('latitude', 'longitude')
                ds = assign_crs(ds, crs='epsg:4326')

            ds = ds.to_array()
            ds.attrs['nodata'] = np.nan
            ds = ds.odc.reproject(gbox, resampling='average')

            # repeat the single layer for every month of the year, using the
            # same 14-days-after-month-start labels as the monthly products
            month_starts = pd.date_range(year+"-01", year+"-12", freq='MS')
            mid_months = [t+pd.Timedelta(14, 'd') for t in month_starts]
            ds = ds.expand_dims(time=mid_months)
            ds = ds.squeeze().drop_vars('variable')
            ds = round_coords(ds)
            ds.attrs['nodata'] = np.nan

        else:
            ds = xr.open_dataset(base+i, chunks=chunks)
            ds = pre_preprocess(ds, k)

        ds = ds.rename(k)

        #bring into memory
        ds = ds.compute()

        #export result
        folder = results+k
        os.makedirs(folder, exist_ok=True)

        ds.to_netcdf(folder+'/'+k+'_5km_monthly_'+year+'.nc')

        del ds
