# Notebook to create aggregated reference files (json) into monthly or yearly zarrs

In [1]:
%load_ext autoreload
%autoreload 2

## Step 0: Import the library of code

In [2]:
import sys
import os
sys.path.append('/home/jovyan/intake-aodn/')
import intake_aodn
import intake

from intake_aodn.utils import get_local_cluster, get_distributed_cluster
from intake_aodn.indexing import process_aggregate

In [3]:
# client = get_local_cluster()
client = get_distributed_cluster(worker_cores=1, worker_memory=2.0, min_workers=8, max_workers=8)

Creating new cluster. Please wait for this to finish.


VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [7]:
# requires a working installation of kerchunk in the notebook environment and on the workers if using distributed
from dask.distributed import PipInstall
plugin = PipInstall(packages=["kerchunk"], pip_options=["--upgrade"])
client.register_worker_plugin(plugin)

# If using a distributed cluster on EASI build eggs using "python setup.py bdist_egg" and upload to workers
# otherwise dask workers wont have code for imports
client.upload_file('/home/jovyan/intake-aodn/dist/intake_aodn-0+untagged.57.g0b7665b.dirty-py3.8.egg')

{'tls://10.0.42.37:43811': {'status': 'OK'},
 'tls://10.0.49.28:35887': {'status': 'OK'},
 'tls://10.0.49.69:41237': {'status': 'OK'},
 'tls://10.0.50.183:33535': {'status': 'OK'},
 'tls://10.0.54.132:46451': {'status': 'OK'},
 'tls://10.0.57.7:42813': {'status': 'OK'},
 'tls://10.0.58.166:46673': {'status': 'OK'},
 'tls://10.0.60.196:34881': {'status': 'OK'}}

# Unzip existing references

In [4]:
!cd ../../intake_aodn/catalogs/ && unzip -q -f aodn_refs.zip

# SST Data

In [5]:
def sst_preprocess(ds):
    variables = ['dt_analysis',
                 'l2p_flags',
                 'quality_level',
                 'satellite_zenith_angle',
                 'sea_surface_temperature',
                 'sses_bias',
                 'sses_count',
                 'sses_standard_deviation',
                 'sst_dtime']
    ds = ds[variables]
    return ds

In [8]:
%%time
# s3://imos-data-pixeldrill/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/2016/20161001152000-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc
kwargs = dict(root='imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/',
               year='2021',
               month='07',
               mask='{year}/{year}{month}',
               suffix='-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night',
               extension='nc',
               check_chunking='sea_surface_temperature',
               preprocess=sst_preprocess,
               storage_options=dict(anon=True),
               dest='../../intake_aodn/catalogs/',
               dask=True)
process_aggregate(**kwargs)

Aggregating s3://imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/2021/202107*-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc - 23 found.
Loading references...
... using dask ...
Checking chunk layout...
Aggregating into ../../intake_aodn/catalogs/imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/202107-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night_a.json
CPU times: user 1.34 s, sys: 104 ms, total: 1.45 s
Wall time: 14.9 s


{'2021/202107': ['imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/202107-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night_a.json']}

In [12]:
import pandas as pd
# dt = pd.date_range('2021-07-01',pd.Timestamp.now(),freq='M')
dt = pd.date_range('2022-02-01',pd.Timestamp.now() + pd.DateOffset(months=1),freq='M')
print(dt)

DatetimeIndex(['2022-02-28'], dtype='datetime64[ns]', freq='M')


In [13]:
results = []
for d in dt:
    kws = kwargs.copy()
    kws['year'] = d.strftime('%Y')
    kws['month'] = d.strftime('%m')
    results.append(process_aggregate(**kws))

Aggregating s3://imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/2022/202202*-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc - 18 found.
Loading references...
... using dask ...
Checking chunk layout...
Aggregating into ../../intake_aodn/catalogs/imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/202202-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night_a.json


# MODIS Ocean Colour

In [14]:
kwargs = dict(root='imos-data/IMOS/SRS/OC/gridded/aqua/P1D/',
              mask='{year}/{month}/A.P1D.{year}{month}',
              dest='../../intake_aodn/catalogs/',
              dask=True
             ) 

results = []

for d in dt:
    for var in ['K_490','chl_oc3','chl_oc3','chl_gsm']:
        kws = kwargs.copy()
        kws['year'] = d.strftime('%Y')
        kws['month'] = d.strftime('%m')
        kws['suffix'] = f'.aust.{var}'
        kws['check_chunking'] = var
        results.append(process_aggregate(**kws))


Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/2022/02/A.P1D.202202*.aust.K_490.nc - 18 found.
Loading references...
... using dask ...
Checking chunk layout...
Aggregating into ../../intake_aodn/catalogs/imos-data/IMOS/SRS/OC/gridded/aqua/P1D/202202.aust.K_490_a.json
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/2022/02/A.P1D.202202*.aust.chl_oc3.nc - 18 found.
Loading references...
... using dask ...
Checking chunk layout...
Aggregating into ../../intake_aodn/catalogs/imos-data/IMOS/SRS/OC/gridded/aqua/P1D/202202.aust.chl_oc3_a.json
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/2022/02/A.P1D.202202*.aust.chl_oc3.nc - 18 found.
Loading references...
... using dask ...
Checking chunk layout...
Aggregating into ../../intake_aodn/catalogs/imos-data/IMOS/SRS/OC/gridded/aqua/P1D/202202.aust.chl_oc3_a.json
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/2022/02/A.P1D.202202*.aust.chl_gsm.nc - 18 found.
Loading references...
... using dask ...
Checking ch

## Zip references

In [15]:
!cd ../../intake_aodn/catalogs/ && zip -r -q aodn_refs.zip imos-data

# Update catalog entries

In [12]:
cat = intake.open_catalog('../../intake_aodn/catalogs/main.yaml')

In [25]:
entry=cat.aodn_s3.SST_L3S_1d_ngt()
entry

In [33]:
from intake.catalog.local import LocalCatalogEntry
new_entry = entry.describe()
ee = LocalCatalogEntry(**new_entry)

TypeError: __init__() got an unexpected keyword argument 'container'