In [134]:
import numpy as np
import xarray as xr
import pandas as pd
import numcodecs
from dask.distributed import Client, progress, LocalCluster
import zarr
import glob
from datetime import datetime
import time

def generate_file_list(start_doy, end_doy):   
    """
    Given a start day and end end day, generate a list of file locations.
    Assumes a 'prefix' and 'year' variables have already been defined.
    'Prefix' should be a local directory or http url and path.
    'Year' should be a 4 digit year.
    """
    days_of_year = list(range(start_doy, end_doy))
    fileObjs = []
    for doy in days_of_year:
        if doy < 10:
            doy = f"00{doy}"
        elif doy >= 10 and doy < 100:
            doy = f"0{doy}"            
        file = glob.glob(f"{prefix}/{doy}/*.nc")[0]
        fileObjs.append(file)
    return fileObjs

# Invariants - but should be made configurable
year = 2002
prefix = f"/fsx/eodc/mursst_netcdf/{year}"
chunks = {'time': 1, 'lat': 1799, 'lon': 3600}
path = 'x'.join(map(str, chunks.values()))
store_dir = f"/fsx/eodc/mursst_zarr/{path}"
numcodecs.blosc.use_threads = False
print(f"zarr store directory: {store_dir}")

zarr store directory: /fsx/eodc/mursst_zarr/1x1799x3600


In [2]:
cluster = LocalCluster(n_workers=4)
client = Client(cluster)
print(f"Dask client {client}")

Dask client <Client: 'tcp://127.0.0.1:45917' processes=4 threads=32, memory=125.83 GB>


In [236]:
%%time
# Loop and append
start_doy = 357
end_doy = start_doy
number_batches_to_append = 1
batch_size = 9
final_end_doy = start_doy + (number_batches_to_append * batch_size)

while start_doy < final_end_doy:
    end_doy = start_doy + batch_size
    end_doy = min(366, end_doy)
    fileObjs = generate_file_list(start_doy, end_doy)
    first_file = fileObjs[0].split('/')[-1]
    last_file = fileObjs[-1].split('/')[-1]
    print(f"start doy: {start_doy}, file: {first_file}")
    print(f"end doy: {end_doy}, file: {last_file}")
    # Check here that the next day we will append is the next day in the year
    current_ds = xr.open_zarr(store_dir)
    next_day = current_ds.time[-1].values + np.timedelta64(1,'D')
    next_day_str = str(next_day)[0:10].replace('-', '') 
    if not (first_file[0:8] == next_day_str):
        raise Exception("starting file is not the next day of the year")
        break
    else:
        # Open dataset
        ds = xr.open_mfdataset(fileObjs, chunks=chunks, parallel=True, combine='by_coords')
        ds = ds.astype(np.float64)
        # Either append or initiate store
        args = {'consolidated': True}
        if start_doy == 152 and year == 2002:
            args['mode'] = 'w'
        else:
            args['mode'] = 'a'
            args['append_dim'] = 'time'
        ds.to_zarr(store_dir, **args)
    start_doy = end_doy
    print(f"Done with this batch")
    print()

start doy: 357, file: 20021223090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc
end doy: 366, file: 20021231090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc
Done with this batch

CPU times: user 22 s, sys: 2.07 s, total: 24 s
Wall time: 1min 14s


## Test the output

Assuming we are using 1x1799x3600

In [237]:
%%time
time_slice = slice(datetime.strptime(f"{year}-06-10", '%Y-%m-%d'), datetime.strptime(f"{year}-06-30", '%Y-%m-%d'))

start_doy = 152
end_doy = 366

fileObjs = generate_file_list(start_doy, end_doy)
print(f"start doy: {start_doy}, file: {fileObjs[0].split('/')[-1]}")
print(f"end doy: {end_doy}, file: {fileObjs[-1].split('/')[-1]}")          
ds_netcdf = xr.open_mfdataset(fileObjs, chunks=chunks, parallel=True, combine='by_coords')
ds_netcdf = ds_netcdf.astype(np.float64)
ds_zarr = xr.open_zarr('/fsx/eodc/mursst_zarr/1x1799x3600')
assert(ds_netcdf.dims == ds_zarr.dims)

start doy: 152, file: 20020601090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc
end doy: 366, file: 20021231090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc
CPU times: user 3.34 s, sys: 333 ms, total: 3.68 s
Wall time: 5.39 s


In [238]:
%%time
print(ds_netcdf.analysed_sst[150:214,:,:].sel(lat=slice(0,50),lon=slice(-170,-110)).mean().values)
#ds_netcdf.analysed_sst.sel(time=time_slice).mean().values

294.760208857499
CPU times: user 2.13 s, sys: 143 ms, total: 2.27 s
Wall time: 13 s


In [239]:
%%time
print(ds_zarr.analysed_sst[150:214,:,:].sel(lat=slice(0,50),lon=slice(-170,-110)).mean().values)
#ds_zarr.analysed_sst.sel(time=time_slice).mean().values

294.760208857499
CPU times: user 1.44 s, sys: 85.1 ms, total: 1.53 s
Wall time: 3.71 s


In [243]:
import s3fs

s3 = s3fs.S3FileSystem(anon=False, client_kwargs=dict(region_name='us-east-1'))
s3_dir = 'ds-data-projects/eodc/eodc/mursst_zarr/1x1799x3600'
s3_store = s3fs.S3Map(root=s3_dir, s3=s3, check=True)
ds=xr.open_zarr(s3_store)

In [244]:
ds

## Performance testing different chunking

In [110]:
time_slice = slice(datetime.strptime(f"{year}-06-02", '%Y-%m-%d'), datetime.strptime(f"{year}-06-04", '%Y-%m-%d'))
chunk_opts = ['1x1799x3600', '5x1000x1000', '1x3600x7200']
n_tests = 10
test_results = {}
for chunk_opt in chunk_opts:
    print(f"chunk_opt {chunk_opt}")
    ds_zarr = xr.open_zarr(f"/fsx/eodc/mursst_zarr/{chunk_opt}")
    durations = []
    for testi in range(n_tests):
        start = time.time()
        ds_zarr.analysed_sst[1,:,:].sel(lat=slice(0,50),lon=slice(-170,-110)).mean().values
        ds_zarr.analysed_sst.sel(time=time_slice).mean().values
        end = time.time()
        durations.append(end - start)
    test_results[chunk_opt] = np.sum(durations)/n_tests
        
test_results

chunk_opt 1x1799x3600
chunk_opt 5x1000x1000
chunk_opt 1x3600x7200


{'1x1799x3600': 1.7027885675430299,
 '5x1000x1000': 2.600662112236023,
 '1x3600x7200': 1.9275601387023926}

In [111]:
time_slice = slice(datetime.strptime(f"{year}-06-02", '%Y-%m-%d'), datetime.strptime(f"{year}-06-04", '%Y-%m-%d'))
chunk_opts = ['1x1799x3600', '5x1000x1000', '1x3600x7200']
n_tests = 10
test_results = {}
for chunk_opt in chunk_opts:
    print(f"chunk_opt {chunk_opt}")
    ds_zarr = xr.open_zarr(f"/fsx/eodc/mursst_zarr/{chunk_opt}")
    durations = []
    for testi in range(n_tests):
        start = time.time()
        ds_zarr.analysed_sst[1,:,:].sel(lat=slice(0,50),lon=slice(-170,-110)).mean().values
        ds_zarr.analysed_sst.sel(time=time_slice).mean().values
        end = time.time()
        durations.append(end - start)
    test_results[chunk_opt] = np.sum(durations)/n_tests
        
test_results

chunk_opt 1x1799x3600
chunk_opt 5x1000x1000
chunk_opt 1x3600x7200


{'1x1799x3600': 1.63055739402771,
 '5x1000x1000': 2.6444275856018065,
 '1x3600x7200': 1.914821243286133}

The best option is 1x1799x3600

### Handle Failures through by creating a subset and replacing it

In [None]:
# %%time
# start_doy = 157
# end_doy = 162 
# fileObjs = generate_file_list(start_doy, end_doy)
# print(f"start doy: {start_doy}, file: {fileObjs[0].split('/')[-1]}")
# print(f"end doy: {end_doy}, file: {fileObjs[-1].split('/')[-1]}")          
# ds = xr.open_mfdataset(fileObjs, chunks=chunks, parallel=True, combine='by_coords')
# ds = ds.astype(np.float64)
# subset_dir = 'subset'
# ds.to_zarr(subset_dir, consolidated=True, mode='w')
# existing_group = zarr.open(store=store_dir)
# subset_group = zarr.open(store=subset_dir)
# zarr.copy(subset_group, existing_group, name='mursst', if_exists='replace')