# Create Kerchunk references from a specified pattern or list of files on Azure Blob Storage

In [None]:
#!pip install autopep8

import fsspec
import ujson
import xarray as xr
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
from tqdm import tqdm

In [None]:
import adlfs

# Print the installed adlfs version.
print(adlfs.__version__)

In [None]:
#If you want to see the available file systems
#fsspec.available_protocols()

## Credentials: needs to be improved, e.g. by obtaining them from another credential source instead of hard-coding them

In [None]:
# Storage-account settings for the Azure connection.
# Needs to be improved: the values are hard-coded placeholders that must be
# filled in by hand from the Azure portal.
account_dict = {
    "account_name": "<get this from the azure portal: storage account name>",
    "account_key": "<get this from the azure portal for this account name: key>",
}

In [None]:
# Initiate fsspec filesystems for reading and writing
# fs_read: the "abfs" filesystem, configured with the settings in account_dict.
fs_read = fsspec.filesystem("abfs", **account_dict)

#fs_write = fsspec.filesystem("")
# fs_write: the "file" filesystem.
# NOTE(review): no other cell shown here references fs_write (the JSON outputs
# are written with the builtin open()) — confirm whether it is still needed.
fs_write = fsspec.filesystem("file")

#!az login --use-device-code

In [None]:
# (first, stop) file numbers. The tuple is expanded with range(*file_range)
# when the file URLs are built, so the stop value is exclusive: the last file
# that is looked up is numbered file_range[1] - 1.
file_range=(2000,3949) # (1,1000)

# Retrieve list of available files. Can take a long time
So instead, a lookup of a single file name was used to find the highest available file number (3948 in this case) by trial and error; a binary search could be used here.

In [None]:
# URL of the last expected output file, i.e. the one numbered file_range[1] - 1.
blob_name = f"abfs://bay-delta-schism2-v58/eli/simulations/hindcast_clinic2/outputs/schout_0000_{file_range[1]-1}.nc"
print(f'Looking for last blob name: {blob_name}')
# The name contains no wildcard, so the glob acts as an existence check for
# that single blob (the assert below expects exactly one match).
files_paths = fs_read.glob(blob_name)

assert len(files_paths) == 1 # else you have specified too far a range, if this fails reduce the range above

In [None]:
# Build the full list of source URLs. Every entry carries the 'abfs://'
# prefix, which points to Azure Blobs.
# Generating the names directly is faster than listing when the expected
# pattern is already known; the listing-based alternative would be:
#file_pattern = sorted(["abfs://" + f for f in files_paths])
file_pattern = [
    f'abfs://bay-delta-schism2-v58/eli/simulations/hindcast_clinic2/outputs/schout_0000_{file_no}.nc'
    for file_no in range(*file_range)
]

In [None]:
# Show the first three generated URLs as a sanity check.
file_pattern[0:3]

In [None]:
# seems to hang for later operations if I introspect here ...
#ds = xr.open_dataset(fs_read.open(file_pattern[0]))
#ds

## Generate the zarr jsons for each file

In [None]:
# Extra keyword arguments used when opening each source file.
so_dict = {"default_fill_cache": False, "default_cache_type": "first"}
# Directory that receives the per-file reference JSONs.
output_dir = "./hindcast2"

In [None]:
import time
import functools

def retry_on_failure(max_retries=3, delay=1):
    """Build a decorator that re-runs a function when it raises.

    Args:
        max_retries: Maximum number of attempts before giving up.
        delay: Seconds to sleep between two consecutive attempts.

    Returns:
        A decorator. The decorated function returns the result of the first
        attempt that succeeds.

    Raises:
        Exception: Raised by the decorated function once every attempt has
            failed. The last underlying error is chained as ``__cause__``
            and its repr is appended to the message, so the reason survives
            even where the exception chain is not carried along.
    """
    def decorator_retry_on_failure(func):
        @functools.wraps(func)
        def wrapper_retry_on_failure(*args, **kwargs):
            last_error = None
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                # Catch ``Exception`` rather than using a bare ``except`` so
                # that KeyboardInterrupt / SystemExit still stop the run.
                except Exception as err:
                    last_error = err
                    # Sleeping after the final attempt would only add delay.
                    if attempt < max_retries:
                        time.sleep(delay)
            message = "Function failed after %d retries" % max_retries
            if last_error is not None:
                message = "%s: %r" % (message, last_error)
            raise Exception(message) from last_error
        return wrapper_retry_on_failure
    return decorator_retry_on_failure


In [None]:
# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk` index from a NetCDF file.
@retry_on_failure()
def generate_json_reference(u, output_dir: str):
    """Write the Kerchunk reference JSON for a single NetCDF/HDF5 file.

    Args:
        u: URL of the source file. It is opened through ``fs_read`` and is
            also handed to ``SingleHdf5ToZarr`` as the file's URL.
        output_dir: Local directory that receives ``<file stem>.json``.
            It is created if it does not exist yet.

    Returns:
        Path of the JSON file that was written.
    """
    import os  # local import keeps this cell runnable on its own

    fname = u.split("/")[-1]
    # Remove only a trailing ".nc". ``str.strip(".nc")`` would instead strip
    # every leading/trailing '.', 'n' and 'c' character and so mangle names
    # such as "ocean.nc" (-> "ocea").
    if fname.endswith(".nc"):
        fname = fname[: -len(".nc")]
    outf = f"{output_dir}/{fname}.json"

    if output_dir:
        # exist_ok=True: several tasks may try to create the directory at once.
        os.makedirs(output_dir, exist_ok=True)

    with fs_read.open(u, **so_dict) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        # Translate before touching the output file, so a failure here does
        # not leave an empty or truncated JSON behind.
        payload = ujson.dumps(h5chunks.translate()).encode()

    with open(outf, "wb") as f:
        f.write(payload)
    return outf

In [None]:
import dask

In [None]:
# One lazy generate_json_reference call per source URL; nothing runs until
# the tasks are computed.
# Variant that names each task after its file:
#tasks = [dask.delayed(generate_json_reference)(file, output_dir, dask_key_name=file.split("/")[-1]) for file in file_pattern]
tasks = [
    dask.delayed(generate_json_reference)(source_url, output_dir)
    for source_url in file_pattern
]

In [None]:
# Show the first delayed task as a sanity check.
tasks[0:1]

## Start dask cluster

In [None]:
import logging

from distributed import Client

# Create the dask.distributed Client: 8 workers are requested, and
# silence_logs is set to the ERROR level.
client = Client(n_workers=8, silence_logs=logging.ERROR)
# Last expression of the cell, so the notebook displays the client.
client

In [None]:
# Execute all delayed tasks. Each task returns the path of the JSON it wrote;
# the collected result is bound to `_` because it is not used afterwards.
_=dask.compute(tasks)

## Combine the zarr jsons from above into a single combined one

In [None]:
from kerchunk.combine import MultiZarrToZarr

In [None]:
# Paths of the per-file reference JSONs: "<output_dir>/<file stem>.json".
# Only a trailing ".nc" is removed from each file name; str.strip('.nc')
# would strip every leading/trailing '.', 'n' and 'c' character instead and
# mangle names such as "ocean.nc".
# NOTE(review): file_pattern[:-1] leaves out the last source file — confirm
# that this is intentional.
json_files = [
    f"{output_dir}/{name[:-3] if name.endswith('.nc') else name}.json"
    for name in (url.split("/")[-1] for url in file_pattern[:-1])
]

In [None]:
# Set up the combination of the per-file reference JSONs:
#   - 'time' is given as the dimension to concatenate along,
#   - the two SCHISM grid dimensions are declared identical across files,
#   - the 'abfs' protocol and the account settings are supplied as
#     remote_protocol / remote_options.
zz = MultiZarrToZarr(json_files,
                     remote_protocol='abfs',remote_options=account_dict,
                     concat_dims=['time'], identical_dims=['nSCHISM_hgrid_node', 'nSCHISM_vgrid_layers'])

In [None]:
# Write the combined references as JSON into the current working directory.
# The file name embeds both values of file_range.
with open(f'hindcast2_combined_file_{file_range[0]}_{file_range[1]}.json','wb') as ofh: 
    # translate() produces the combined reference set that is serialized here.
    ofh.write(ujson.dumps(zz.translate()).encode())

## All of the above has a convenience function available...
I discovered this later. It is still a work in progress, because the step-by-step approach above currently gives greater flexibility.

from kerchunk.combine import auto_dask

# Single-call alternative to the per-file + combine steps (work in progress).
# NOTE(review): json_files holds the generated reference JSONs, while
# single_driver=SingleHdf5ToZarr is the HDF5 driver — confirm whether the
# original .nc URLs (file_pattern) should be passed here instead.
# NOTE(review): so_dict is used elsewhere as keyword arguments for
# fs_read.open(); confirm that it is valid as single_kwargs for the driver.
# NOTE(review): mzz_kwargs is empty, whereas the manual MultiZarrToZarr call
# passes concat_dims / identical_dims — confirm whether they are needed here.
auto_dask(json_files, single_driver=SingleHdf5ToZarr, single_kwargs = so_dict, mzz_kwargs = {}, 
          n_batches=8, remote_protocol='abfs', remote_options=account_dict,
          filename = f'hindcast2_combined_file_{file_range[0]}_{file_range[1]}.json')

## See other notebook to read using the combined files

[using combined json files to read](./azure_kerchunk_read_combined.ipynb)