# Rechunk variables
This notebook rechunks each variable into zarr stores whose chunks span the full time axis (with 48 × 48 lat/lon tiles), so that operations along the time dimension are efficient.

In [1]:
# Notebook author.
__author__ = "@andrewbrettin"

In [None]:
import os
import sys
import shutil
import warnings
import json
from datetime import datetime
from itertools import product
import numpy as np
import xarray as xr
from rechunker import rechunk
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client

sys.path.append('..')
import utils

In [None]:
## Globals
# Project configuration read from JSON files one directory up.
# PATHS supplies the 'rechunked' and 'tmp' directories; GLOBALS supplies
# 'init_years' and 'members' (both consumed by main()).
# JSON is UTF-8 by spec, so decode explicitly instead of relying on the
# locale default (batch jobs often run under a C/POSIX locale).
with open("../paths.json", encoding="utf-8") as paths_json:
    PATHS = json.load(paths_json)
with open("../globals.json", encoding="utf-8") as globals_json:
    GLOBALS = json.load(globals_json)

# Absolute path of this notebook; assumes the kernel's working directory
# is the directory containing rechunk.ipynb.
FILE = os.path.join(os.path.abspath('.'), 'rechunk.ipynb')

# Variables to rechunk.
# NOTE(review): the saved outputs of the main() cells below were produced with
# different variables (SSH, SHF); re-run the notebook to refresh them.
VARIABLES = ['zos', 'SST', 'UBOT', 'VBOT']

Dask

In [4]:
# Start a PBS-backed dask cluster (walltime='02:30:00' is the PBS job time limit)
# and connect a client to it. `client` is also used as a global inside main().
cluster = PBSCluster(walltime='02:30:00')
client = Client(cluster)

In [5]:
# Scale the cluster to 4 workers; the bare `cluster` expression renders its status.
cluster.scale(4)
cluster

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/abrettin/proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.206.60:32955,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/abrettin/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [6]:
# Link to the dask dashboard, for monitoring the rechunk tasks.
client.dashboard_link

'https://jupyterhub.hpc.ucar.edu/stable/user/abrettin/proxy/8787/status'

Functions

In [7]:
def rm_stores(*stores):
    """Recursively delete every store directory in ``stores`` that exists.

    Paths that are not present on disk are skipped, so callers may pass
    stores that have not been created yet.
    """
    # filter() is lazy: each path's existence is checked right before it is removed.
    for store_path in filter(os.path.exists, stores):
        shutil.rmtree(store_path)

def execute_rechunk(ds, target_store, temp_store, chunks_dict=None, max_mem='8GB'):
    """Build a rechunker plan for ``ds`` and execute it.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset to rechunk.
    target_store : str
        Path of the output zarr store.
    temp_store : str
        Path of the intermediate zarr store used by rechunker.
    chunks_dict : dict, optional
        Target chunk sizes keyed by dimension name. Defaults to
        ``{'time': -1, 'lat': 48, 'lon': 48}``; the -1 for time presumably
        requests one chunk spanning the whole time axis (dask convention).
    max_mem : str, optional
        Memory budget passed to ``rechunk`` as ``max_mem`` (default ``'8GB'``).
    """
    # None sentinel avoids a shared mutable default argument.
    if chunks_dict is None:
        chunks_dict = {
            'time': -1,
            'lat': 48,
            'lon': 48
        }

    array_plan = rechunk(
        ds, chunks_dict, max_mem, target_store, temp_store=temp_store
    )

    array_plan.execute()

In [8]:
def main():
    """Rechunk every (variable, init year, member) combination into a zarr store.

    For each combination in ``product(VARIABLES, GLOBALS['init_years'],
    GLOBALS['members'])`` this:

    1. loads the space-chunked data with ``utils.data.load_dataset``;
    2. wraps it in a Dataset;
    3. clears any previous target/temp stores;
    4. rechunks with ``execute_rechunk``;
    5. restarts the dask client.

    Elapsed wall time since the start of the run is printed after each step.
    The shared intermediate store is removed once all combinations are done.

    Relies on notebook globals: VARIABLES, GLOBALS, PATHS, utils, client.

    Returns
    -------
    int
        0 once every combination has been processed.
    """
    start_time = datetime.now()

    def elapsed():
        # Time since the run started; now - start, so it is never negative.
        return datetime.now() - start_time

    # Every iteration reuses the same intermediate store, so build its path once.
    temp_store = os.path.join(PATHS['tmp'], 'temp.zarr')

    for var, init_year, member in product(
            VARIABLES, GLOBALS['init_years'], GLOBALS['members']):
        print(f"LE-{init_year}.{member}.{var}")
        print(elapsed())

        # Load data as a dataset and give it time chunks before rechunking.
        # NOTE(review): 3650 time steps per chunk — presumably ~10 years of
        # daily data; confirm.
        array = utils.data.load_dataset(
            var, init_year, member, chunkedby='space')
        ds = xr.Dataset({var: array})
        ds = ds.chunk({'time': 3650})

        # Prepare paths; remove leftovers so rechunk starts from empty stores.
        print("Preparing zarr stores")
        print(elapsed())
        target_store = os.path.join(
            PATHS['rechunked'],
            f'LE2-{init_year}.{member}.{var}_rechunked.zarr'
        )
        rm_stores(target_store, temp_store)

        # Rechunk
        print("Rechunking")
        print(elapsed())
        execute_rechunk(ds, target_store, temp_store)

        print(f"Completed rechunk for LE-{init_year}.{member}.{var}")
        print(elapsed(), '\n')
        # Restart workers between combinations (presumably to release worker memory).
        client.restart()

    # The last iteration's intermediate store is no longer needed on scratch.
    rm_stores(temp_store)

    print("PROCESS_COMPLETED")
    # Total elapsed time for the whole run.
    print(elapsed())

    return 0

---

In [None]:
# NOTE(review): main() is called again in a later cell, so Restart & Run All
# runs the full rechunking loop twice; keep only one call.
main()

LE-1251.011.SSH
0:00:00.000035
Preparing zarr stores
0:00:00.736510
Rechunking
0:00:00.738820
Completed rechunk for LE-1251.011.SSH
0:01:10.371643 

LE-1251.012.SSH
0:01:14.713106
Preparing zarr stores
0:01:15.202158
Rechunking
0:01:15.603567
Completed rechunk for LE-1251.012.SSH
0:02:20.351156 

LE-1251.013.SSH
0:02:23.564562
Preparing zarr stores
0:02:24.110935
Rechunking
0:02:24.560562
Completed rechunk for LE-1251.013.SSH
0:03:22.068667 

LE-1281.011.SSH
0:03:25.281665
Preparing zarr stores
0:03:25.820717
Rechunking
0:03:26.244054
Completed rechunk for LE-1281.011.SSH
0:04:33.149712 

LE-1281.012.SSH
0:04:36.161316
Preparing zarr stores
0:04:36.639374
Rechunking
0:04:37.084018
Completed rechunk for LE-1281.012.SSH
0:05:44.906164 

LE-1281.013.SSH
0:05:48.346723
Preparing zarr stores
0:05:48.877961
Rechunking
0:05:49.293081
Completed rechunk for LE-1281.013.SSH
0:06:44.157413 

LE-1301.011.SSH
0:06:47.580025
Preparing zarr stores
0:06:48.122607
Rechunking
0:06:48.499406
Completed re

In [9]:
# NOTE(review): main() is also called in an earlier cell, so Restart & Run All
# runs the full rechunking loop twice; keep only one call.
main()

LE-1251.011.SHF
0:00:00.000146
Preparing zarr stores
0:00:00.588535
Rechunking
0:00:00.590744
Completed rechunk for LE-1251.011.SHF
0:01:39.553802 

LE-1251.012.SHF
0:01:45.488998
Preparing zarr stores
0:01:45.909732
Rechunking
0:01:46.128323
Completed rechunk for LE-1251.012.SHF
0:02:53.752917 

LE-1251.013.SHF
0:02:57.712715
Preparing zarr stores
0:02:58.337886
Rechunking
0:02:58.533433
Completed rechunk for LE-1251.013.SHF
0:04:09.235987 

LE-1281.011.SHF
0:04:13.609861
Preparing zarr stores
0:04:14.270614
Rechunking
0:04:14.476724
Completed rechunk for LE-1281.011.SHF
0:05:25.026887 

LE-1281.012.SHF
0:05:29.387913
Preparing zarr stores
0:05:29.810244
Rechunking
0:05:29.997003
Completed rechunk for LE-1281.012.SHF
0:06:38.890103 

LE-1281.013.SHF
0:06:43.244344
Preparing zarr stores
0:06:43.855556
Rechunking
0:06:44.047950
Completed rechunk for LE-1281.013.SHF
0:07:56.222396 

LE-1301.011.SHF
0:08:00.372004
Preparing zarr stores
0:08:01.020692
Rechunking
0:08:01.242468
Completed re

0

In [10]:
# Shut down the dask client first, then the PBS cluster.
client.close()
cluster.close()