In [1]:
import numpy as np
import pandas as pd
import xarray as xr
import dask
import os, sys
import glob
import zarr
from joblib import Parallel, delayed
import os

sys.path.append('/')
from libraries import *

# Time axis of the output store: every hour from 2011-01-01 00:00 to 2020-12-31 23:00 (inclusive).
dates = pd.date_range(
    start='2011-01-01T00',
    end='2020-12-31T23',
    freq='h',
)
# Destination Zarr store for the combined height-level wind speeds.
zarr_store = '/data/harish/CERRA_wind_profiles_and_Chebyshev_coefficients/CERRA_height_level_winds.zarr'

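- My original plan was to combine wind-speed data at both analysis and forecast times.
- However, for the remaining vertical levels I mistakenly downloaded analysis data rather than forecast data.
- The forecast data can still be downloaded, but doing so takes a long time.
- Therefore, I first combine the analysis data across height levels.
- Note: the `read_monthly_data` cell below also reads the step-1 and step-2 forecast files for the remaining levels, so each month combines analysis and forecast times at every height.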

In [7]:
# Shut down the Dask client and cluster left over from an earlier run of the cluster cell.
# On a fresh kernel this cell executes before that cell, so neither name exists yet; the
# globals() lookup turns the cell into a no-op then, instead of raising NameError.
for _dask_name in ('client', 'cluster'):
    _dask_obj = globals().get(_dask_name)
    if _dask_obj is not None:
        _dask_obj.close()
del _dask_name, _dask_obj

In [2]:
# Start a local Dask cluster: 12 single-threaded worker processes with a 16 GB memory
# limit each (the client summary therefore reports ~12 x 16 GB of total memory).
print("Starting parallel computing...")
import dask.distributed as dd

cluster = dd.LocalCluster(
    n_workers=12,
    threads_per_worker=1,
    memory_limit='16GB',        # applied per worker, not to the cluster as a whole
    dashboard_address=':8787',  # documented "[host]:port" form (the Dask default)
)
# Connect to the cluster
client = dd.Client(cluster)
print(client)

Starting parallel computing...
<Client: 'tcp://127.0.0.1:39583' processes=12 threads=12, memory=178.81 GiB>


In [3]:
def preprocess(ds):
    """Put a CERRA DataArray on its valid-time axis and drop auxiliary coordinates.

    The ``time`` coordinate is replaced by the values of ``valid_time``, and the
    ``valid_time``, ``step``, ``latitude`` and ``longitude`` coordinates are removed.
    The files opened in ``read_monthly_data`` can then be concatenated along ``time``.

    Parameters
    ----------
    ds : xarray.DataArray
        Must carry ``valid_time``, ``step``, ``latitude`` and ``longitude``
        (``drop_vars`` raises if any of them is missing).

    Returns
    -------
    xarray.DataArray
        A new object; the caller's array is left unmodified.
    """
    # assign_coords returns a copy; item assignment (ds['time'] = ...) mutated the argument.
    retimed = ds.assign_coords(time=ds['valid_time'])
    # drop_vars is the non-deprecated replacement for DataArray.drop(list_of_names).
    return retimed.drop_vars(['valid_time', 'step', 'latitude', 'longitude'])
def preprocess_2(ds):
    """Rename ``valid_time`` to ``time`` and drop auxiliary coordinates.

    ``read_monthly_data`` uses this for the step-1/step-2 remaining-height-level files
    and for the 2015-12 remaining-height base file. Those files already carry a
    ``valid_time`` axis (renamed here rather than copied as in ``preprocess``).

    Parameters
    ----------
    ds : xarray.DataArray
        Must carry ``valid_time``, ``expver``, ``latitude`` and ``longitude``
        (``drop_vars`` raises if any of the dropped names is missing).

    Returns
    -------
    xarray.DataArray
        A new object with ``time`` as its time axis.
    """
    renamed = ds.rename({'valid_time': 'time'})
    # drop_vars is the non-deprecated replacement for DataArray.drop(list_of_names).
    return renamed.drop_vars(['expver', 'latitude', 'longitude'])

# Initialize the Zarr store from a sample data file
- This only needs to be done once: running it again recreates the store (`mode='w'`) and discards any data already written.

In [4]:
def init_zarr_store(zarr_store, dates, sample_file=None, levels=None, mode='w'):
    """Create the combined wind-speed Zarr store on disk (layout only, no array data).

    The ``y``/``x`` grid is taken from a sample remaining-height-level file. The store
    holds one variable, ``wind_speed``, with dimensions ``(time, y, x, heightAboveGround)``
    and coordinates ``dates`` / ``levels``. It is chunked ``(24, 256, 256, 12)``; the
    24-step ``time`` chunk must match ``batch_size`` in ``write_to_zarr_parallel``.

    ``to_zarr`` is called with ``compute=False``, so the lazy all-zero data array is
    never written. Only metadata (and in-memory coordinate arrays) reach disk. Regions
    not yet filled by later writes therefore read back as the array's fill value, not
    as zeros. Run this once; afterwards write with ``mode="r+"`` (as ``write_chunk``
    does), because calling it again with ``mode='w'`` overwrites the existing store.

    Parameters
    ----------
    zarr_store : str
        Destination path of the store.
    dates : pandas.DatetimeIndex
        Full time axis of the store.
    sample_file : str, optional
        NetCDF file with a ``ws`` variable laid out as ``preprocess`` expects; it only
        supplies the grid. Defaults to the January 2020 remaining-height file.
    levels : sequence, optional
        ``heightAboveGround`` values; defaults to ``CERRA_levels`` (presumably provided
        by ``from libraries import *``).
    mode : str, optional
        Passed to ``to_zarr``; use ``'w-'`` to refuse to overwrite an existing store.

    Returns
    -------
    xarray.DataArray
        The lazy template that defined the store layout.
    """
    if sample_file is None:
        sample_file = '/media/harish/External_3/CERRA_ws_15_30_50_75_200_250_300_400_500/2020/CERRA_gridded_15_30_50_75_200_250_300_400_500_wind_2020_1.nc'
    if levels is None:
        levels = CERRA_levels
    sample = preprocess(xr.open_dataset(sample_file).ws)
    # Build a lazy zero array with one time step / level, then broadcast it to the full axes.
    template = (
        sample.chunk({'time': 1, 'y': -1, 'x': -1, 'heightAboveGround': 1})
        .pipe(xr.zeros_like)
        .isel(time=0, heightAboveGround=0, drop=True)
        .expand_dims(time=len(dates), heightAboveGround=len(levels))
    )
    template['time'] = dates
    template['heightAboveGround'] = levels
    template = template.chunk({'time': 24, 'y': 256, 'x': 256, 'heightAboveGround': 12})
    template = template.transpose('time', 'y', 'x', 'heightAboveGround')
    template.to_dataset(name='wind_speed').to_zarr(zarr_store, compute=False, consolidated=True, mode=mode)
    return template


# Run once to create the store; kept commented out because re-running overwrites existing data.
#init_zarr_store(zarr_store, dates)

Unnamed: 0,Array,Chunk
Bytes,4.37 TiB,72.00 MiB
Shape,"(87672, 1069, 1069, 12)","(24, 256, 256, 12)"
Dask graph,91325 chunks in 5 graph layers,91325 chunks in 5 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 4.37 TiB 72.00 MiB Shape (87672, 1069, 1069, 12) (24, 256, 256, 12) Dask graph 91325 chunks in 5 graph layers Data type float32 numpy.ndarray",87672  1  12  1069  1069,

Unnamed: 0,Array,Chunk
Bytes,4.37 TiB,72.00 MiB
Shape,"(87672, 1069, 1069, 12)","(24, 256, 256, 12)"
Dask graph,91325 chunks in 5 graph layers,91325 chunks in 5 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [5]:
def open_file(file_path, var, preprocess_fn=None, chunks=None):
    """Lazily open one variable of a NetCDF file and optionally preprocess it.

    Parameters
    ----------
    file_path : str
        Path of the NetCDF file.
    var : str
        Name of the data variable to extract (e.g. ``'ws'`` or ``'si10'``).
    preprocess_fn : callable, optional
        Applied to the extracted DataArray; its result is returned.
    chunks : dict, optional
        Dask chunking passed to ``xr.open_dataset``; defaults to 256 x 256 in ``x``/``y``.

    Returns
    -------
    xarray.DataArray or None
        ``None`` when the file cannot be opened, lacks ``var``, or preprocessing fails.
        The reason is printed, and ``read_monthly_data`` treats ``None`` as
        "skip this month".
    """
    if chunks is None:
        chunks = {'x': 256, 'y': 256}
    try:
        dataset = xr.open_dataset(file_path, chunks=chunks)
    except Exception as e:
        print(f"Error opening file: {file_path}: {e}")
        return None
    try:
        data = dataset[var]
        if preprocess_fn is not None:
            data = preprocess_fn(data)
        return data
    except Exception as e:
        # Nothing will reference this dataset after a failure, so release its file handle.
        dataset.close()
        print(f"Error reading variable '{var}' from file: {file_path}: {e}")
        return None
    
def read_monthly_data(year, month):
    """Open and combine every CERRA wind-speed file for one month.

    Three height groups are read: 10 m (``si10``), 100/150 m (``ws``) and the remaining
    levels 15-500 m (``ws``). Within each group the base file and its step-1 and step-2
    files are stacked along ``time``. The groups are then stacked along
    ``heightAboveGround``.

    Parameters
    ----------
    year : int
    month : int
        1-12; inserted unpadded into the file names.

    Returns
    -------
    xarray.DataArray or None
        Sorted by ``heightAboveGround`` and ``time``, rechunked to 24 time steps and a
        single ``heightAboveGround`` chunk. ``None`` (after printing the offending
        paths) if any of the ten files could not be opened or preprocessed.
    """
    # 2015-12 is the one month whose remaining-height base file needs preprocess_2.
    remaining_height_prep = preprocess_2 if (year == 2015 and month == 12) else preprocess

    # key -> (file path, variable name, preprocessing function). Insertion order is the
    # order in which the files are opened and in which failures are reported.
    sources = {
        "ds_10m": (
            f"/media/harish/External_3/CERRA_ws10/{year}/CERRA_{year}_{month}.nc",
            'si10', preprocess,
        ),
        "ds_10m_1": (
            f"/media/harish/External_3/CERRA_ws10_step1/{year}/CERRA_{year}_{month}.nc",
            'si10', preprocess,
        ),
        "ds_10m_2": (
            f"/media/harish/External_3/CERRA_ws10_step2/{year}/CERRA_{year}_{month}.nc",
            'si10', preprocess,
        ),
        "ds_100m": (
            f"/media/harish/External_3/CERRA_ws100/{year}/CERRA_gridded_100_m_wind_{year}_{month}.nc",
            'ws', preprocess,
        ),
        "ds_150m": (
            f"/media/harish/External_3/CERRA_ws150/{year}/CERRA_gridded_150_m_wind_{year}_{month}.nc",
            'ws', preprocess,
        ),
        "ds_100_150m_1": (
            f"/media/harish/External_3/CERRA_ws_100_150_step1/{year}/CERRA_gridded_wind_{year}_{month}_1.nc",
            'ws', preprocess,
        ),
        "ds_100_150m_2": (
            f"/media/harish/External_3/CERRA_ws_100_150_step2/{year}/CERRA_gridded_wind_{year}_{month}_2.nc",
            'ws', preprocess,
        ),
        "ds_remaining_height": (
            f"/media/harish/External_3/CERRA_ws_15_30_50_75_200_250_300_400_500/{year}/CERRA_gridded_15_30_50_75_200_250_300_400_500_wind_{year}_{month}.nc",
            'ws', remaining_height_prep,
        ),
        "ds_remaining_height_1": (
            f"/media/harish/External_3/CERRA_ws_15_30_50_75_200_250_300_400_500_step1/{year}/CERRA_gridded_15_30_50_75_200_250_300_400_500_wind_{year}_{month}_1.nc",
            'ws', preprocess_2,
        ),
        "ds_remaining_height_2": (
            f"/media/harish/External_3/CERRA_ws_15_30_50_75_200_250_300_400_500_step2/{year}/CERRA_gridded_15_30_50_75_200_250_300_400_500_wind_{year}_{month}_2.nc",
            'ws', preprocess_2,
        ),
    }
    file_paths = {key: spec[0] for key, spec in sources.items()}
    datasets = {key: open_file(path, var, prep) for key, (path, var, prep) in sources.items()}

    # Abort the month if any source failed to open.
    missing = [key for key in datasets if datasets[key] is None]
    if missing:
        print(f"Skipping {year}-{month} due to the following file issues:")
        for key in missing:
            print(f"  - {file_paths[key]}")
        return None

    ws_10m = xr.concat([datasets[k] for k in ("ds_10m", "ds_10m_1", "ds_10m_2")], dim='time')
    ws_100_150m_base = xr.concat([datasets["ds_100m"], datasets["ds_150m"]], dim='heightAboveGround')
    ws_100_150m = xr.concat(
        [ws_100_150m_base, datasets["ds_100_150m_1"], datasets["ds_100_150m_2"]],
        dim='time',
    )
    ws_remaining = xr.concat(
        [datasets[k] for k in ("ds_remaining_height", "ds_remaining_height_1", "ds_remaining_height_2")],
        dim='time',
    )

    return (
        xr.concat([ws_10m, ws_100_150m, ws_remaining], dim='heightAboveGround')
        .sortby('heightAboveGround')
        .sortby('time')
        .chunk({'time': 24, 'heightAboveGround': -1})
    )


In [6]:
def write_chunk(ds_chunk, zarr_store, region):
    """Write ``ds_chunk`` into ``region`` of an already existing Zarr store.

    ``mode="r+"`` only modifies values of arrays that already exist in the store, so
    the store must have been created beforehand (see ``init_zarr_store``).
    """
    ds_chunk.to_zarr(store=zarr_store, mode="r+", region=region)

def write_to_zarr_parallel(ds, zarr_store, n_jobs=os.cpu_count(), time_index=None, batch_size=24):
    """Write a contiguous time span of wind speed into the Zarr store in parallel.

    The data are cut into ``batch_size``-step slabs along ``time``. A joblib task writes
    each slab to its own ``region`` of the store.

    Parameters
    ----------
    ds : xarray.DataArray
        Data to write; every timestamp must be present on ``time_index``.
    zarr_store : str
        Path of an existing store (see ``init_zarr_store``).
    n_jobs : int
        Number of joblib workers.
    time_index : pandas.DatetimeIndex, optional
        Time axis of the store; defaults to the notebook-level ``dates``.
    batch_size : int
        Time steps per task. Must equal the store's ``time`` chunk size (24 in
        ``init_zarr_store``), so that no two tasks write to the same Zarr chunk.

    Raises
    ------
    ValueError
        If a timestamp of ``ds`` is not on ``time_index``, or if the first timestamp is
        not on a ``batch_size`` boundary.
    """
    if time_index is None:
        time_index = dates  # notebook-level time axis defined in the first cell

    times = ds.time.values
    if times.size == 0:
        return  # nothing to write

    axis = time_index.values
    positions = np.searchsorted(axis, times)
    # searchsorted returns an insertion point for absent values; verify each timestamp is
    # actually on the axis instead of silently writing into the wrong region.
    if (positions >= axis.size).any() or (axis[positions] != times).any():
        raise ValueError("ds contains timestamps that are not on the store's time axis")

    start, end = int(positions.min()), int(positions.max())
    # Parallel region writes are only safe when they touch distinct Zarr chunks, which
    # requires batches to start on chunk boundaries.
    if start % batch_size != 0:
        raise ValueError(
            f"first time index {start} is not a multiple of batch_size={batch_size}"
        )

    tasks = []
    for t_idx in range(start, end + 1, batch_size):
        batch_end = min(t_idx + batch_size, end + 1)
        region = {"time": slice(t_idx, batch_end)}
        # Coordinates are dropped: a region write must not include variables that share no
        # dimension with the region (heightAboveGround). The store received its
        # coordinate values when it was created.
        ds_chunk = (
            ds.sel(time=time_index[t_idx:batch_end])
            .to_dataset(name="wind_speed")
            .drop_vars(['time', 'heightAboveGround'])
        )
        tasks.append(delayed(write_chunk)(ds_chunk, zarr_store, region))

    with Parallel(n_jobs=n_jobs, verbose=10) as parallel:
        parallel(tasks)

In [None]:
# Fill the store month by month. A failing month is reported and skipped so it does not
# abort the whole run; all failed months are listed at the end.
failed_months = []
for year in range(2011, 2021):
    for month in range(1, 13):
        label = f"{year}-{month:02d}"
        try:
            print(f"Processing {label}...")
            ds = read_monthly_data(year, month)
            if ds is None:
                print(f"Skipping {label} due to missing or corrupted files.")
                failed_months.append(label)
                continue
            # Load the month into memory once (via the Dask cluster) before the joblib writers
            # slice it into batches.
            write_to_zarr_parallel(ds.load(), zarr_store)
            print(f"Successfully processed {label}.")
        except Exception as e:
            # The failure may come from reading, loading or writing, hence "processing".
            print(f"Unexpected error in processing {label}: {e}")
            failed_months.append(label)
        finally:
            # Release the (large) loaded month before reading the next one.
            ds = None
print(f"Finished. Months not written: {failed_months or 'none'}")

In [8]:
# Sanity check: the 2020 series of every height level at grid point (y=100, x=100).
(
    xr.open_zarr(zarr_store)
    .wind_speed
    .sel(time=slice('2020-01-01', '2020-12-31T23'))
    .isel(y=100, x=100)
    .load()
)