# Notebook to create aggregated reference files (json) into monthly or yearly zarrs

In [1]:
%load_ext autoreload
%autoreload 2

## Step 0: Import the library of code

In [2]:
import sys
import os
sys.path.append('/home/jovyan/intake-aodn/')

import intake_aodn
import intake

from intake_aodn.utils import get_local_cluster, get_distributed_cluster
from intake_aodn.indexing import process_aggregate
from intake_aodn.indexing import keep_fields  

In [3]:
# client = get_local_cluster()
client = get_distributed_cluster(worker_cores=1, worker_memory=2.0, min_workers=1, max_workers=31)

Creating new cluster. Please wait for this to finish.


VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [4]:
# requires a working installation of kerchunk in the notebook environment and on the workers if using distributed
from dask.distributed import PipInstall
plugin = PipInstall(packages=["kerchunk"], pip_options=["--upgrade"])
client.register_worker_plugin(plugin)

# If using a distributed cluster on EASI build eggs using "python setup.py bdist_egg" and upload to workers
# otherwise dask workers wont have code for imports
client.upload_file('/home/jovyan/intake-aodn/dist/intake_aodn-0+untagged.71.gf409e17.dirty-py3.8.egg')

{'tls://10.0.56.122:45175': {'status': 'OK'}}

# Unzip existing references

In [16]:
#!cd ../../intake_aodn/catalogs/ && unzip -q aodn_refs.zip

# SST Data

In [17]:
variables = ['time',
             'lat','lon','latitude','longitude'
             'dt_analysis',
             'l2p_flags',
             'quality_level',
             'satellite_zenith_angle',
             'sea_surface_temperature',
             'sses_bias',
             'sses_count',
             'sses_standard_deviation',
             'sst_dtime']


In [18]:
'quality_level1' in variables

False

In [19]:
%%time
# s3://imos-data-pixeldrill/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/2016/20161001152000-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc
kwargs = dict(root='imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/',
               year='2021',
               month='01',
               mask='{year}/{year}{month}',
               suffix='-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night',
               extension='nc',
               check_chunking='sea_surface_temperature',
               preprocess=keep_fields(variables),
               storage_options=dict(anon=True),
               dest='../../intake_aodn/catalogs/',
               dask=True)
#process_aggregate(**kwargs)

CPU times: user 13 µs, sys: 0 ns, total: 13 µs
Wall time: 18.4 µs


In [20]:
import pandas as pd
dt = pd.date_range('1988-01-01',pd.Timestamp.now() + pd.DateOffset(months=1),freq='M')
# dt = pd.date_range('2022-02-01',pd.Timestamp.now() + pd.DateOffset(months=1),freq='M')
print(dt)

DatetimeIndex(['1988-01-31', '1988-02-29', '1988-03-31', '1988-04-30',
               '1988-05-31', '1988-06-30', '1988-07-31', '1988-08-31',
               '1988-09-30', '1988-10-31',
               ...
               '2021-10-31', '2021-11-30', '2021-12-31', '2022-01-31',
               '2022-02-28', '2022-03-31', '2022-04-30', '2022-05-31',
               '2022-06-30', '2022-07-31'],
              dtype='datetime64[ns]', length=415, freq='M')


In [21]:
results = []
for d in dt:
    kws = kwargs.copy()
    kws['year'] = d.strftime('%Y')
    kws['month'] = d.strftime('%m')
    results.append(process_aggregate(**kws))

Aggregating s3://imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/1988/198801*-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/1988/198802*-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/1988/198803*-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/1988/198804*-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/1988/198805*-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/1988/198806*-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/1988/198807*-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/1988/198808*-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc - 0

# MODIS Ocean Colour

In [None]:
kwargs = dict(root='imos-data/IMOS/SRS/OC/gridded/aqua/P1D/',
              mask='{year}/{month}/A.P1D.{year}{month}',
              dest='../../intake_aodn/catalogs/',
              dask=True
             ) 

results = []

for d in dt:
    for var in ['K_490','chl_oc3','chl_oc3','chl_gsm']:
        kws = kwargs.copy()
        kws['year'] = d.strftime('%Y')
        kws['month'] = d.strftime('%m')
        kws['suffix'] = f'.aust.{var}'
        kws['check_chunking'] = var
        results.append(process_aggregate(**kws))

Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/1988/01/A.P1D.198801*.aust.K_490.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/1988/01/A.P1D.198801*.aust.chl_oc3.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/1988/01/A.P1D.198801*.aust.chl_oc3.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/1988/01/A.P1D.198801*.aust.chl_gsm.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/1988/02/A.P1D.198802*.aust.K_490.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/1988/02/A.P1D.198802*.aust.chl_oc3.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/1988/02/A.P1D.198802*.aust.chl_oc3.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/1988/02/A.P1D.198802*.aust.chl_gsm.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/1988/03/A.P1D.198803*.aust.K_490.nc - 0 found.
Aggregating s3://imos-data/IMOS/SRS/OC/gridded/aqua/P1D/1988/

## Zip references

In [22]:
!cd ../../intake_aodn/catalogs/ && rm aodn_refs.zip  && zip -r -q aodn_refs.zip imos-data && rm -rf ../../intake_aodn/catalogs/imos-data/

2022-07-21 13:29:25,275 - distributed.batched - INFO - Batched Comm Closed <TLS (closed) Client->Scheduler local=tls://10.0.67.107:58630 remote=gateway://traefik-dask-gateway.easihub:80/easihub.613eedabdf2647bb9061603a8bb4a881>
Traceback (most recent call last):
  File "/env/lib/python3.8/site-packages/distributed/comm/tcp.py", line 303, in write
    raise StreamClosedError()
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/env/lib/python3.8/site-packages/distributed/batched.py", line 94, in _background_send
    nbytes = yield self.comm.write(
  File "/env/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/env/lib/python3.8/site-packages/distributed/comm/tcp.py", line 319, in write
    convert_stream_closed_error(self, e)
  File "/env/lib/python3.8/site-packages/distributed/comm/tcp.py", line 150, in convert_stream_closed

In [None]:
client.shutdown()