# Notebook to aggregate daily reference files (JSON) into monthly or yearly reference sets (JSON or Zarr)

In [1]:
import sys

In [2]:
# Put the local filesystem_spec checkout first on the import path so it is imported instead of any installed fsspec.
sys.path.insert(0,'/home/jovyan/filesystem_spec')

In [3]:
from fsspec_reference_maker.combine import MultiZarrToZarr
import fsspec
import xarray as xr
import os, glob
# Plain-text dataset reprs; the trailing ';' suppresses the set_options object repr in the cell output.
xr.set_options(display_style='text');

<xarray.core.options.set_options at 0x7f3948531d00>

In [4]:
# Show which fsspec package was actually imported (expected: the /home/jovyan/filesystem_spec checkout).
fsspec.__file__

'/home/jovyan/filesystem_spec/fsspec/__init__.py'

In [5]:
# True: connect to (or start) a Dask Gateway cluster; False: start a LocalCluster in this kernel.
gateway_cluster = False
from dask.distributed import Client, LocalCluster
from dask_gateway import Gateway

if not gateway_cluster:
    # Local cluster
    cluster = LocalCluster()
    client = Client(cluster)
else:
    # Dask gateway: reuse the first listed cluster if one exists, otherwise create one
    gateway = Gateway()
    clusters = gateway.list_clusters()
    if clusters:
        print(f'An existing cluster was found. Connected to cluster \033[1m{clusters[0].name}\033[0m')
        cluster = gateway.connect(clusters[0].name)
    else:
        print('Creating new cluster. Please wait for this to finish.')
        cluster = gateway.new_cluster()
    cluster.adapt(minimum=1, maximum=30)
    client = cluster.get_client()
    client.wait_for_workers(n_workers=1)

display(cluster, client)

VBox(children=(HTML(value='<h2>LocalCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

0,1
Client  Scheduler: tcp://127.0.0.1:43481  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 32  Memory: 30.06 GB


In [6]:
# Load credentials
def load_creds(path=None, profile='default'):
    """Read an AWS access-key pair from a shared-credentials INI file.

    The file is parsed as INI rather than by line position. Comments, blank
    lines, extra keys (e.g. a session token) and any key order are therefore
    handled, and '=' characters inside values are preserved.

    Parameters
    ----------
    path : str, optional
        Credentials file to read. Defaults to ``$HOME/.aws/credentials``.
    profile : str, optional
        Section to read the keys from. Defaults to ``'default'``, the
        profile the rest of this notebook uses.

    Returns
    -------
    tuple of (str, str)
        ``(aws_access_key_id, aws_secret_access_key)`` for ``profile``.

    Raises
    ------
    FileNotFoundError
        If the credentials file does not exist.
    KeyError
        If the profile or either key is missing.
    """
    import configparser

    if path is None:
        path = os.path.join(os.path.expanduser('~'), '.aws', 'credentials')
    # interpolation=None so a '%' in a secret is taken literally.
    parser = configparser.ConfigParser(interpolation=None)
    with open(path, 'rt') as f:
        parser.read_file(f)
    section = parser[profile]
    return section['aws_access_key_id'], section['aws_secret_access_key']
# Module-level credentials; open_dataset reads these globals for both the reference file and the S3 data.
key,secret=load_creds()

In [7]:
# Check a file
def open_dataset(fo, preprocess=None):
    """Open an fsspec reference file (``reference://`` protocol) as an xarray Dataset.

    The module-level ``key``/``secret`` credentials are passed twice: once for
    reading ``fo`` itself (``target_options``) and once for the S3 data the
    references point to (``remote_options``).

    Parameters
    ----------
    fo : str
        URL of the reference file, e.g. ``'s3://bucket/path/file.json'``.
    preprocess : callable, optional
        Applied to the opened Dataset before it is returned.

    Returns
    -------
    xarray.Dataset
        Opened with ``chunks={}`` so variables are dask-backed.
    """
    reference_map = fsspec.get_mapper(
        'reference://',
        fo=fo,
        target_options={'key': key, 'secret': secret},
        remote_protocol='s3',
        remote_options={'key': key, 'secret': secret},
    )
    dataset = xr.open_zarr(reference_map, chunks={})
    if preprocess is None:
        return dataset
    return preprocess(dataset)

# Smoke test: open one daily reference file and show its contents.
fn = 's3://imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/2021/20210725152000-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.json'
ds = open_dataset(fn)
display(ds)

In [8]:
# Data variables in a single daily file (aggregate_json keeps only a subset of these).
list(ds.data_vars)

['dt_analysis',
 'l2p_flags',
 'quality_level',
 'satellite_zenith_angle',
 'sea_ice_fraction',
 'sea_ice_fraction_dtime_from_sst',
 'sea_surface_temperature',
 'sses_bias',
 'sses_count',
 'sses_standard_deviation',
 'sst_dtime',
 'wind_speed',
 'wind_speed_dtime_from_sst']

### Find source netCDF files with a different chunk layout (mostly in 2016)

In [9]:
# fs = fsspec.filesystem('s3',profile='default')

In [10]:
# files = fs.glob('s3://imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/2016/201605*')
# for f in files:
#     ds = open_dataset(f)
#     print(f + str(ds['satellite_zenith_angle'].encoding['chunks']))

In [11]:
# Badly chunked file: imos-data-pixeldrill/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/2016/20161001152000-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.nc
# fs.delete('imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/2016/20161001152000-ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night.json')

### Aggregate JSON references at different time scales — files with different chunk layouts are combined into separate aggregates, one per layout (suffixes _a, _b, …)

In [12]:
def aggregate_json(root='s3://imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/',
                   mask='2021/202101',
                   dest='./refs_monthly/',
                   suffix='_ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night',
                   extension='json'):
    """Combine the per-file reference JSONs matching ``{root}{mask}*.json``.

    References are grouped by the chunks of ``sea_surface_temperature`` as
    opened by ``open_dataset``, and each group is combined separately. Each
    group is written to ``{dest}{mask}{suffix}_{label}.{extension}``.
    ``label`` is ``a``, ``b``, ``c``, ... in the order the layouts are first
    seen, and ``z`` is followed by ``aa``, ``ab``, ...

    Parameters
    ----------
    root : str
        S3 prefix holding the per-file reference JSONs.
    mask : str
        Appended to ``root`` to select files (``'2021/202101'`` selects a
        month, ``'2021/2021'`` a year) and reused as the output sub-path.
    dest : str
        Local output prefix; expected to end with a path separator.
    suffix : str
        Inserted between ``mask`` and the layout label in output names.
    extension : str
        Output file extension. ``'zarr'`` calls ``translate`` with
        ``template_count=None``; anything else uses ``template_count=5``.

    Returns
    -------
    dict
        When files matched: ``{mask: [output, ...]}`` with one entry per
        chunk layout. A group whose ``translate`` raised
        ``NotImplementedError`` is recorded as
        ``'ERROR(CHUNK): <path> <message>'``.
        When nothing matched: ``{mask: 'ERROR(NOFILES)'}``.

    Raises
    ------
    Exception
        Any exception other than ``NotImplementedError`` raised while
        combining a group propagates unchanged.
    """
    import os

    def _preprocess(ds):
        # Only these variables are combined. NOTE(review): the dropped ones
        # (sea ice / wind fields) presumably do not concatenate cleanly — confirm.
        variables = ['dt_analysis',
                     'l2p_flags',
                     'quality_level',
                     'satellite_zenith_angle',
                     'sea_surface_temperature',
                     'sses_bias',
                     'sses_count',
                     'sses_standard_deviation',
                     'sst_dtime']
        return ds[variables]

    def _layout_label(index):
        # 0 -> 'a', ..., 25 -> 'z', 26 -> 'aa', ...; a plain chr(97 + i) would run
        # past 'z' into punctuation once there are more than 26 layouts.
        label = ''
        index += 1
        while index:
            index, rem = divmod(index - 1, 26)
            label = chr(ord('a') + rem) + label
        return label

    fs = fsspec.filesystem('s3', profile='default')
    # Sort explicitly so each group is combined in filename order
    # (the daily file names start with a timestamp).
    references = sorted(fs.glob(f"{root}{mask}*.json"))
    if not references:
        return {mask: 'ERROR(NOFILES)'}

    so = dict(
        protocol='s3',
        profile='default',
        default_fill_cache=False,
        default_cache_type='first',
    )
    template_count = None if extension == 'zarr' else 5

    # Group references by chunk layout. Dicts keep insertion order, so labels
    # follow the order in which layouts first appear.
    chunking = {}
    for r in references:
        layout = open_dataset('s3://' + r)['sea_surface_temperature'].chunks
        chunking.setdefault(layout, []).append(r)

    agg_files = []
    for i, refs in enumerate(chunking.values()):
        # Build the name directly rather than via str.replace, which would rewrite
        # every '.{extension}' occurring anywhere in the path (dest, mask, suffix).
        agg_file = f"{dest}{mask}{suffix}_{_layout_label(i)}.{extension}"
        print(agg_file)
        dirname = os.path.dirname(agg_file)
        if dirname:  # os.makedirs('') raises, e.g. for dest='' and a mask without '/'
            os.makedirs(dirname, exist_ok=True)

        if len(refs) == 1:
            # Only one reference in this set: reuse the source reference file as-is.
            # NOTE(review): this copies the JSON verbatim even when extension == 'zarr',
            # so that output is a single JSON file named '*.zarr', not a directory;
            # upload_zarr_to_s3 globs '<path>/*' and would find nothing — confirm intent.
            fs.get(refs[0], agg_file)
        else:
            mzz = MultiZarrToZarr(
                refs,
                storage_options=so,
                remote_protocol="s3",
                remote_options={'profile': 'default'},
                xarray_concat_args=dict(dim='time', coords='minimal', join='override',
                                        compat='override', combine_attrs='override',
                                        fill_value=''),
                preprocess=_preprocess,
            )
            try:
                mzz.translate(agg_file, template_count=template_count)
            except NotImplementedError as ex:
                # Record the failure and carry on with the remaining layouts.
                agg_file = f'ERROR(CHUNK): {agg_file} {str(ex)}'
            # Other exceptions are deliberately not caught, so they propagate
            # with their original traceback.

        agg_files.append(agg_file)

    return {mask: agg_files}
    
from dask import delayed, compute
# Lazy version of aggregate_json: each call builds a dask task that only runs when passed to compute().
d_aggregate_json=delayed(aggregate_json)

In [16]:
# Trial run for one month with extension='zarr'; 2016-05 produced two chunk layouts (_a and _b outputs).
aggregate_json(dest='./refs_monthly/',mask='2016/201605',extension='zarr')

./refs_monthly/2016/201605_ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night_a.zarr
./refs_monthly/2016/201605_ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night_b.zarr


{'2016/201605': ['./refs_monthly/2016/201605_ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night_a.zarr',
  './refs_monthly/2016/201605_ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night_b.zarr']}

### Monthly aggregates

In [50]:
# One lazy aggregate_json task per month (1992-01 .. 2021-12), then run them all on the cluster.
futures = [
    d_aggregate_json(dest='./refs_monthly/',
                     mask=f'{year}/{year}{month:02d}',
                     extension='json')
    for year in range(1992, 2022)
    for month in range(1, 13)
]
(results,) = compute(futures)

### Yearly aggregates

In [30]:
# One lazy aggregate_json task per year (1992 .. 2021), then run them all on the cluster.
futures = [
    d_aggregate_json(dest='./refs_yearly/', mask=f'{year}/{year}', extension='json')
    for year in range(1992, 2022)
]
(results,) = compute(futures)

### Upload to S3 bucket

In [21]:
def upload_to_s3(f, urlpath='imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/agg_monthly/'):
    """Upload one local file to S3 under ``urlpath``, keeping only its base name.

    This function is mapped over files with ``client.map``, so it may run on
    dask workers; its imports are kept local to the function.

    Parameters
    ----------
    f : str
        Local file path.
    urlpath : str
        Destination S3 prefix (bucket/key prefix, no scheme). A trailing ``/``
        is added when missing so the file lands inside the prefix.

    Returns
    -------
    str
        ``"Success"`` once ``put_file`` returns; upload failures raise.
    """
    import os
    import fsspec

    # Without a trailing '/', 'agg_monthly' + 'x.json' would become the key 'agg_monthlyx.json'.
    if urlpath and not urlpath.endswith('/'):
        urlpath += '/'
    fs = fsspec.filesystem('s3', profile='default')
    fs.put_file(f, urlpath + os.path.basename(f))
    return "Success"

In [16]:
#monthly files
agg_files = sorted(glob.glob('./refs_monthly/*/*.json'))
futures = client.map(upload_to_s3,agg_files,urlpath='imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/agg_monthly/')
results = client.gather(futures)

In [17]:
#yearly files
agg_files = sorted(glob.glob('./refs_yearly/*/*.json'))
futures = client.map(upload_to_s3,agg_files,urlpath='imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/agg_yearly/')
results = client.gather(futures)

In [3]:
import fsspec
# S3 filesystem handle (AWS 'default' profile) for housekeeping on the uploaded aggregates.
fs = fsspec.filesystem('s3',profile='default')

In [4]:
# WARNING: destructive. Assuming fsspec's delete/rm expands glob patterns (verify for the installed
# version), this removes every object in agg_yearly/ whose name ends in 'night.json'.
# NOTE(review): aggregates written by aggregate_json end in '_night_<label>.json' and are NOT matched;
# presumably this cleans up files from an earlier naming scheme — confirm before re-running.
fs.delete('imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/agg_yearly/*night.json')

## Try writing the aggregated references as Zarr instead of JSON

### Monthly aggregates

In [17]:
# One lazy aggregate_json task per month (1992-01 .. 2021-12), Zarr output, then run them all.
futures = [
    d_aggregate_json(dest='./refs_monthly/',
                     mask=f'{year}/{year}{month:02d}',
                     extension='zarr')
    for year in range(1992, 2022)
    for month in range(1, 13)
]
(results,) = compute(futures)

In [9]:
def upload_zarr_to_s3(f, urlpath='imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/agg_monthly/'):
    """Upload the contents of a local Zarr directory to ``{urlpath}{basename(f)}/`` on S3.

    This function is mapped over directories with ``client.map``, so it may
    run on dask workers; its imports are kept local to the function.

    Parameters
    ----------
    f : str
        Local path of the Zarr directory.
    urlpath : str
        Destination S3 prefix (bucket/key prefix, no scheme). A trailing ``/``
        is added when missing so the store lands inside the prefix.

    Returns
    -------
    str
        ``"Success"`` once ``put`` returns; upload failures raise.
    """
    import os
    import fsspec

    # Without a trailing '/', the prefix and the store name would be glued together.
    if urlpath and not urlpath.endswith('/'):
        urlpath += '/'
    fs = fsspec.filesystem('s3', profile='default')
    fn = os.path.basename(f)
    # The '/*' glob plus recursive=True is meant to copy everything inside the store.
    # NOTE(review): fsspec put/glob semantics have changed between versions — verify
    # that dot-files such as .zgroup/.zattrs are included.
    fs.put(f + '/*', urlpath + fn + '/', recursive=True)
    return "Success"

In [None]:
# Upload every local monthly Zarr aggregate, one dask task per store.
agg_files = glob.glob('./refs_monthly/*/*.zarr')
agg_files.sort()
futures = client.map(
    upload_zarr_to_s3,
    agg_files,
    urlpath='imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/agg_monthly/',
)
results = client.gather(futures)

### Yearly aggregates

In [24]:
# One lazy aggregate_json task per year (1992 .. 2021), Zarr output, then run them all.
futures = [
    d_aggregate_json(dest='./refs_yearly/', mask=f'{year}/{year}', extension='zarr')
    for year in range(1992, 2022)
]
(results,) = compute(futures)

In [10]:
# Upload every local yearly Zarr aggregate, one dask task per store.
agg_files = glob.glob('./refs_yearly/*/*.zarr')
agg_files.sort()
futures = client.map(
    upload_zarr_to_s3,
    agg_files,
    urlpath='imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/agg_yearly/',
)
results = client.gather(futures)

In [24]:
# Spot-check: open one uploaded yearly aggregate through open_dataset.
open_dataset('s3://imos-data-pixeldrill-refs/IMOS/SRS/SST/ghrsst/L3S-1d/ngt/agg_yearly/1992_ABOM-L3S_GHRSST-SSTskin-AVHRR_D-1d_night_a.zarr')