# Complete partially-finished MERGE step of a historical run
The 48 hour walltime limit on Gadi means that it's not possible to finish the MERGE step in a single job for long (> ~60 year) large ensemble runs. I need a better fix in the long run, but for now this notebook continues each MERGE job where it left off

Here I use `region` to fill the incomplete block and then pick up continue writing with `append`

In [None]:
import os
import json
import math
import glob
import zarrtools
import numpy as np
import xarray as xr

In [None]:
from dask.distributed import Client
from dask_jobqueue import PBSCluster

walltime = '48:00:00'
cores = 48
memory = '192GB'
cluster = PBSCluster(processes=1,
                     walltime=str(walltime), cores=cores, memory=str(memory),
                     job_extra=['-l ncpus='+str(cores),
                                '-l mem='+str(memory),
                                '-P xv83',
                                '-l jobfs=100GB',
                                '-l storage=gdata/xv83+gdata/v14+scratch/v14'],
                     local_directory='$PBS_JOBFS',
                     # env_extra=['export MALLOC_TRIM_THRESHOLD_="0"'],
                     header_skip=["select"])

cluster.scale(jobs=1)
client = Client(cluster)
client

### Change this depending on what `realm` your doing and when it died

In [None]:
realm = 'atmos_isobaric_daily'
incomplete_block = slice(23716, 24024)

### How does the partially-written data look?

In [None]:
test = xr.open_zarr(f'/g/data/xv83/ds0092/CAFE/historical/WIP/c5-d60-pX-hist-19601101-96mem/ZARR/{realm}.zarr')

In [None]:
test['temp'].isel(time=incomplete_block).isnull().sum().values

### Build the dataset

In [None]:
paths_to_merge = sorted(glob.glob('/g/data/xv83/ds0092/CAFE/historical/WIP/c5-d60-pX-hist-19601101-96mem/ZARR/mem???'))
zarr_path = '/g/data/xv83/ds0092/CAFE/historical/WIP/c5-d60-pX-hist-19601101-96mem/ZARR/'
config_file = '/g/data/xv83/ds0092/CAFE/historical/WIP/c5-d60-pX-hist-19601101-96mem/mem004/zarr_specs_CAFE-f6.json'
number_of_time_blocks = 100

with open(config_file) as json_file:
    specs = json.load(json_file)

In [None]:
zarrify_kwargs = specs[realm].copy()
if 'load_chunks' in zarrify_kwargs:
    zarrify_kwargs.pop('load_chunks')
zarr_name = zarrify_kwargs.pop('zarr_name')
zarrify_kwargs['output_file'] = os.path.join(zarr_path, zarr_name)
done_file = os.path.join(zarr_path, f'{zarr_name}{os.path.extsep}done')

In [None]:
import dask
import zarr

print(f'Merging {zarr_name}...', flush=True)
_open = dask.delayed(xr.open_zarr)
datasets = []
merged_files = []
for path in paths_to_merge:
    input_file = os.path.join(path, zarr_name)
    input_zip_file = os.path.join(path, zarr_name+os.path.extsep+'zip')
    # Preferably use DirectoryStore if available, else ZipStore
    if os.path.exists(input_file):
        datasets.append(_open(input_file, consolidated=True))
        merged_files.append(input_file)
    elif os.path.exists(input_zip_file):
        datasets.append(_open(zarr.ZipStore(input_zip_file), consolidated=True))
        merged_files.append(input_zip_file)
    else:
        raise FileNotFoundError(f'Cannot find {input_file} or {input_zip_file}')
dataset = xr.concat(dask.compute(datasets)[0], dim='ensemble').sortby('ensemble')

In [None]:
for var in dataset:
    dataset[var].encoding = {}

### Use `region` to write incomplete block

In [None]:
def _drop_vars(ds, sequence_dim='time'):
    # writing a region means that all the variables MUST have sequence_dim
    to_drop = [v for v in ds.variables if sequence_dim not in ds[v].dims]
    return ds.drop_vars(to_drop)
                      
print(f'Writing {incomplete_block}...', flush=True)
dataset_write = dataset.isel({'time': incomplete_block})
zarrtools.zarrify(_drop_vars(dataset_write),
                  **zarrify_kwargs,
                  region={'time': incomplete_block},
                  compute=True)

### Complete rest of collection using `append`

In [None]:
def _blocks(l, n):
    """Yield successive n-sized chunks from l"""
    for i in range(0, len(l), n):
        yield l[i:i + n]
        
# Loop over time blocks 
BLOCK_DIM = 'time'
time_chunk_size = zarrify_kwargs['save_chunks'][BLOCK_DIM]
N = dataset.sizes[BLOCK_DIM]
block_size = time_chunk_size * math.ceil(N / number_of_time_blocks / time_chunk_size) # round to nearest multiple of time chunks

for idx, block in enumerate(_blocks(range(incomplete_block.stop, N), block_size)):
    print(f'Writing {block} of {N}...', flush=True)
    dataset_write = dataset.isel({BLOCK_DIM: slice(block.start, block.stop)})
    zarrtools.zarrify(dataset_write,
                      **zarrify_kwargs,
                      append_dim=BLOCK_DIM,
                      compute=True)