In [1]:
# Template notebook for rechunking a Zarr dataset:
# convert image-oriented chunks into time-series-oriented chunks.


import xarray
import numpy as np
import zarr
from dask.diagnostics import ProgressBar
from dask.distributed import Client, LocalCluster, progress
from rechunker import rechunk
from tqdm import tqdm
import pandas as pd
import gc
import os

In [2]:
from dask.distributed import Client

# Local dask cluster with 6 workers (the output below shows they run as separate processes).
client = Client(n_workers=6)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 24,Total memory: 59.57 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:34259,Workers: 6
Dashboard: http://127.0.0.1:8787/status,Total threads: 24
Started: Just now,Total memory: 59.57 GiB

0,1
Comm: tcp://127.0.0.1:36301,Total threads: 4
Dashboard: http://127.0.0.1:33935/status,Memory: 9.93 GiB
Nanny: tcp://127.0.0.1:35727,
Local directory: /tmp/dask-scratch-space/worker-73z4urju,Local directory: /tmp/dask-scratch-space/worker-73z4urju

0,1
Comm: tcp://127.0.0.1:43753,Total threads: 4
Dashboard: http://127.0.0.1:33013/status,Memory: 9.93 GiB
Nanny: tcp://127.0.0.1:40313,
Local directory: /tmp/dask-scratch-space/worker-sfq_ecpr,Local directory: /tmp/dask-scratch-space/worker-sfq_ecpr

0,1
Comm: tcp://127.0.0.1:33195,Total threads: 4
Dashboard: http://127.0.0.1:35603/status,Memory: 9.93 GiB
Nanny: tcp://127.0.0.1:41563,
Local directory: /tmp/dask-scratch-space/worker-n1mvsony,Local directory: /tmp/dask-scratch-space/worker-n1mvsony

0,1
Comm: tcp://127.0.0.1:39281,Total threads: 4
Dashboard: http://127.0.0.1:40337/status,Memory: 9.93 GiB
Nanny: tcp://127.0.0.1:46627,
Local directory: /tmp/dask-scratch-space/worker-drds44o6,Local directory: /tmp/dask-scratch-space/worker-drds44o6

0,1
Comm: tcp://127.0.0.1:38081,Total threads: 4
Dashboard: http://127.0.0.1:34153/status,Memory: 9.93 GiB
Nanny: tcp://127.0.0.1:46501,
Local directory: /tmp/dask-scratch-space/worker-f74vze2p,Local directory: /tmp/dask-scratch-space/worker-f74vze2p

0,1
Comm: tcp://127.0.0.1:44595,Total threads: 4
Dashboard: http://127.0.0.1:43263/status,Memory: 9.93 GiB
Nanny: tcp://127.0.0.1:38489,
Local directory: /tmp/dask-scratch-space/worker-42iw0kfq,Local directory: /tmp/dask-scratch-space/worker-42iw0kfq


In [None]:
import shutil
import gc

def rechunk_zarr_store(zarr, chunks, target_zarr, steps=200,
                       slice_lengths=None, iterate_over='time',
                       chunk_subset=None, overwrite=False):
    """Rechunk a dataset into the Zarr store ``target_zarr``, one slice at a time.

    The source is cut into consecutive slices along ``iterate_over``. Each slice
    has its encodings cleared, is rechunked with ``chunks`` and is written to
    ``target_zarr``. The first write creates the store (``mode="w"``) and later
    writes append along ``iterate_over``.

    The function is resumable. Unless ``overwrite`` is True, an existing target is
    opened and the number of entries it already holds along ``iterate_over`` is
    skipped. This assumes the target contains exactly the first N entries of the
    source, in order (TODO confirm for partially failed runs).

    Parameters
    ----------
    zarr : xarray.Dataset (presumably)
        Source dataset. Note that inside this function the name shadows the
        ``zarr`` module.
    chunks : dict
        Chunk specification passed to ``.chunk``.
    target_zarr : str
        Path of the output Zarr store.
    steps : int
        Approximate number of slices, used only when ``slice_lengths`` is None.
    slice_lengths : int, optional
        Number of entries per slice along ``iterate_over``. Must be positive.
    iterate_over : str
        Dimension to slice along and append along.
    chunk_subset : str, optional
        If given, only this variable is rechunked. Otherwise the whole slice is.
    overwrite : bool
        If True, any existing target is ignored and rewritten from scratch.

    Raises
    ------
    ValueError
        If ``slice_lengths`` is not positive.
    """
    total = len(zarr[iterate_over])
    if slice_lengths is None:
        # Same floor division as int(total / steps), but never 0: range() rejects a zero step.
        slice_lengths = max(1, total // steps)
    if slice_lengths <= 0:
        raise ValueError(f'slice_lengths must be positive, got {slice_lengths}')

    if os.path.exists(target_zarr) and overwrite is not True:
        with xarray.open_zarr(target_zarr) as existing:
            already_rechunked = len(existing[iterate_over])
    else:
        already_rechunked = 0

    # Create (mode="w") the store only if nothing has been written yet. A resumed run
    # must append even for a partially written first slice, otherwise "w" wipes the target.
    store_created = already_rechunked > 0

    for i in tqdm(range(0, total, slice_lengths)):
        stop = i + slice_lengths
        if stop <= already_rechunked:
            print(f'skipped {i}')
            continue

        # When the slice was partially written, start right after the entries already present.
        start = max(i, already_rechunked)
        zarr_slice = zarr.isel({iterate_over: slice(start, stop)})

        # Drop encodings inherited from the source store before writing to a new store.
        for v in list(zarr_slice.coords.keys()):
            zarr_slice[v].encoding.clear()
        for var in zarr_slice:
            zarr_slice[var].encoding.clear()

        if chunk_subset is not None:
            zarr_slice[chunk_subset] = zarr_slice[chunk_subset].chunk(chunks)
        else:
            zarr_slice = zarr_slice.chunk(chunks)

        if store_created:
            zarr_slice.to_zarr(target_zarr, append_dim=iterate_over)
        else:
            zarr_slice.to_zarr(target_zarr, mode="w")
            store_created = True
        del zarr_slice
        gc.collect()

In [None]:
# SARAH-3 source dataset, read from a zipped Zarr store.
sarah = xarray.open_zarr(store='/scratch/snx3000/kschuurm/ZARR/SARAH3.zip')


In [None]:
# Rechunk SARAH-3 `channel_data` into 50x50 lat/lon tiles. Each tile spans all channels
# and the full time extent of each 5000-step slice written by rechunk_zarr_store.
target_zarr = '/scratch/snx3000/kschuurm/ZARR/SARAH3_timeseries_chunk.zarr'

chunks = dict(channel=-1, time=-1, lat=50, lon=50)

rechunk_zarr_store(
    sarah,
    chunks,
    target_zarr,
    slice_lengths=5000,
    iterate_over='time',
    chunk_subset='channel_data',
)

In [None]:
# Open the 2018 SEVIRI store for inspection (the original line had a duplicated `=`,
# which is a SyntaxError).
test = xarray.open_zarr('/scratch/snx3000/kschuurm/ZARR/SEVIRI_2018.zarr')

In [None]:
# `channel_data` was chunked on channel/time/lat/lon above, so those are the dataset's dims.
# 'data'/'y'/'x' are the SEVIRI names and would make transpose raise ValueError.
# The trailing `...` passes any remaining dims through unchanged.
sarah.transpose('channel', 'time', 'lat', 'lon', ...)

In [None]:
# Combined SEVIRI full-disk store; the bare name below displays its summary.
seviri = xarray.open_zarr(store='/scratch/snx3000/kschuurm/ZARR/SEVIRI_FULLDISK.zarr')
seviri

# Combine yearly Zarr stores

In [None]:

target_zarr = '/capstor/scratch/cscs/kschuurm/ZARR/SEVIRI_FULLDISK.zarr'

# One store per year, 2016 through 2022 inclusive, all following the same naming scheme.
zarrstores_fn = {
    year: f'/capstor/scratch/cscs/kschuurm/ZARR/SEVIRI_FULLDISK_{year}.zarr'
    for year in range(2016, 2023)
}

zarrstores = {year: xarray.open_zarr(path) for year, path in zarrstores_fn.items()}

In [None]:
# Write every yearly store into `target_zarr`, `steps` time stamps per to_zarr call.
# If the target already exists, the time stamps it holds are skipped and the rest
# are appended. An interrupted run can therefore be resumed without writing duplicates.
first = True

already_processed = None

if os.path.exists(target_zarr):
    first = False
    with xarray.open_zarr(target_zarr) as a:
        already_processed = a.time.values

steps = 20000  # time stamps written per to_zarr call

for year, store in zarrstores.items():

    # Restrict to time stamps of `year` and drop duplicated time stamps.
    yr_store = store.sel(time=(store.time.dt.year == year)).drop_duplicates(dim='time', keep='first')
    # np.unique returns sorted unique values (same as set + sort).
    timeindex = pd.DatetimeIndex(np.unique(yr_store.time.values))
    if already_processed is not None:
        # Skip time stamps that are already in the target.
        timeindex = timeindex[~timeindex.isin(already_processed)]
    idx_start = 0
    idx_end = len(timeindex)
    print(idx_end)

    for istart in tqdm(range(idx_start, idx_end, steps)):
        iend = min(istart + steps, idx_end)

        times = timeindex[istart:iend]
        ds_slice = yr_store.sel(time=times)

        # Drop encodings inherited from the source store before writing to a new store.
        for v in list(ds_slice.coords.keys()):
            ds_slice[v].encoding.clear()

        for var in ds_slice:
            ds_slice[var].encoding.clear()

        # One chunk per time step, spanning all channels and the full x/y extent.
        ds_slice['channel_data'] = ds_slice['channel_data'].chunk({'channel':-1, 'time':1, 'x':-1, 'y':-1})
        ds_slice['time'] = ds_slice.time.chunk({'time':-1})

        if first:
            ds_slice.to_zarr(target_zarr, mode='w')
            first = False
        else:
            ds_slice.to_zarr(target_zarr, append_dim ='time')

        gc.collect()


In [None]:
import zarr
import xarray

# Copy the combined full-disk store into a single zip archive.
fulldisk = xarray.open_zarr('/capstor/scratch/cscs/kschuurm/ZARR/SEVIRI_FULLDISK.zarr')

# Drop encodings inherited from the source store before writing to a new store.
for v in list(fulldisk.coords.keys()):
    fulldisk[v].encoding.clear()

for var in fulldisk:
    fulldisk[var].encoding.clear()

# One chunk per time step, spanning all channels and the full x/y extent.
fulldisk['channel_data'] = fulldisk['channel_data'].chunk({'channel':-1, 'time':1, 'x':-1, 'y':-1})
fulldisk['time'] = fulldisk.time.chunk({'time':-1})

store = zarr.storage.ZipStore('/capstor/scratch/cscs/kschuurm/ZARR/SEVIRI_FULLDISK.zip')
try:
    fulldisk.to_zarr(store)
finally:
    # A ZipStore must be closed. Otherwise the zip's central directory is never
    # written and the archive cannot be read back.
    store.close()

In [None]:

fulldisk = xarray.open_zarr(store='/capstor/scratch/cscs/kschuurm/ZARR/SEVIRI_FULLDISK.zarr')  # combined full-disk store

In [None]:
fulldisk  # display the dataset repr

In [None]:
fulldisk.drop_duplicates(dim='time', keep='first')  # displayed only; `fulldisk` itself is unchanged

In [5]:
# Italy subset of SEVIRI, with duplicated time stamps removed.
a = xarray.open_zarr(
    '/scratch/snx3000/kschuurm/ZARR/SEVIRI_FULLDISK_Italy.zarr'
).drop_duplicates('time')

In [4]:
# Solar-position store for the Italy subset; the bare name below displays its summary.
b = xarray.open_zarr(store='/scratch/snx3000/kschuurm/ZARR/SOLARPOS_Italy.zarr')
b

Unnamed: 0,Array,Chunk
Bytes,16.61 GiB,32.96 MiB
Shape,"(3, 51613, 240, 240)","(3, 100, 240, 240)"
Dask graph,517 chunks in 2 graph layers,517 chunks in 2 graph layers
Data type,float16 numpy.ndarray,float16 numpy.ndarray
"Array Chunk Bytes 16.61 GiB 32.96 MiB Shape (3, 51613, 240, 240) (3, 100, 240, 240) Dask graph 517 chunks in 2 graph layers Data type float16 numpy.ndarray",3  1  240  240  51613,

Unnamed: 0,Array,Chunk
Bytes,16.61 GiB,32.96 MiB
Shape,"(3, 51613, 240, 240)","(3, 100, 240, 240)"
Dask graph,517 chunks in 2 graph layers,517 chunks in 2 graph layers
Data type,float16 numpy.ndarray,float16 numpy.ndarray


In [9]:
# Drop encodings inherited from the source store (coordinates and data variables alike)
# before writing `a` to a new store. `a.variables` holds the same Variable objects that
# `a[name]` wraps, so clearing them here is equivalent.
for variable in a.variables.values():
    variable.encoding.clear()

In [10]:
# Write the de-duplicated Italy subset to a fresh store, overwriting any existing one.
a.to_zarr(store='/scratch/snx3000/kschuurm/ZARR/SEVIRI_FULLDISK_Italy2.zarr', mode='w')

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


<xarray.backends.zarr.ZarrStore at 0x1554be0d59e0>