In [1]:
import multiprocessing
import os
import time
from pathlib import Path

# Configure Dask with MULTICORE processing for maximum speed
import dask
import xarray as xr
from dask.diagnostics import ProgressBar
from dask.distributed import Client, LocalCluster

# Number of logical CPUs reported by the OS
n_cores = multiprocessing.cpu_count()
print(f"Detected {n_cores} CPU cores")

# Leave one core free for the system, but never request zero workers
# (n_cores - 1 would be 0 on a single-core machine).
n_workers = max(1, n_cores - 1)

# Worker processes inherit the environment at spawn time, so this has to be
# set BEFORE the cluster is created for the workers to see it.
# setdefault keeps any value the user already exported.
os.environ.setdefault("HDF5_USE_FILE_LOCKING", "FALSE")

# Configure Dask BEFORE starting the cluster: the scheduler and workers pick up
# their configuration when they start, so setting these values after
# LocalCluster() has been created is too late for that cluster.
dask.config.set({
    'array.slicing.split_large_chunks': True,
    'distributed.worker.memory.target': 0.7,   # fraction of memory_limit: start spilling to disk
    'distributed.worker.memory.spill': 0.8,    # fraction of memory_limit: spill based on process memory
    'distributed.worker.memory.pause': 0.9,    # fraction of memory_limit: pause task execution
    'distributed.scheduler.worker-saturation': 1.1,  # Allow slight oversubscription
})

# Create a local cluster with multiple worker processes.
# Adjust n_workers and threads_per_worker based on your system.
cluster = LocalCluster(
    n_workers=n_workers,
    threads_per_worker=2,      # 2 threads per worker
    memory_limit='auto',       # Auto-detect memory per worker
    processes=True,            # Use processes (not threads) for true parallelism
    dashboard_address=':8787'  # Optional: view dashboard at localhost:8787
)

# Connect to the cluster; this client becomes the default Dask scheduler
client = Client(cluster)

print(f"Dask Client initialized with {n_workers} workers")
print(f"Dashboard available at: {client.dashboard_link}")
print(client)

print("\nDask cluster ready for parallel processing!")

Detected 32 CPU cores
Dask Client initialized with 31 workers
Dashboard available at: http://127.0.0.1:8787/status
<Client: 'tcp://127.0.0.1:62276' processes=31 threads=62, memory=123.56 GiB>

Dask cluster ready for parallel processing!


In [2]:
# Input directory: hourly NetCDF files, one file per month
HOURLY_DIR = Path(r"E:\backup\download")
# Output directories for the monthly and quarterly means
MONTH_DIR = Path(r"E:\backup\trp_climate_model_data\era5land_1970_2024_monthmean")
QUARTER_DIR = Path(r"E:\backup\trp_climate_model_data\era5land_1970_2024_qtrmean")
MONTH_DIR.mkdir(parents=True, exist_ok=True)      # output: monthly means
QUARTER_DIR.mkdir(parents=True, exist_ok=True)    # output: quarterly means

# Years to process. The range end is exclusive, so this covers 1980-1989;
# narrow it to range(1980, 1981) for a quick single-year test run.
YEARS = range(1980, 1990)

# Two-digit month numbers grouped by calendar quarter
QUARTERS = [
    ("01", "02", "03"),  # Q1
    ("04", "05", "06"),  # Q2
    ("07", "08", "09"),  # Q3
    ("10", "11", "12"),  # Q4
]

# Hourly filename pattern, one file per month: YYYY_MM.nc
PATTERN_PER_MONTH = "{y}_{m}.nc"

# Disable HDF5 file locking unless the user already set the variable.
# NOTE: processes that were started before this line runs (for example Dask
# workers created by an earlier cell) do not see this change.
os.environ.setdefault("HDF5_USE_FILE_LOCKING", "FALSE")

'FALSE'

In [None]:
# os.getcwd()
# local_data_directory = Path(rf'E:\backup\download')
# """
# file_name = []
# for n in range(1, 4):
#     file_name.append(local_data_directory / f'1980_0{n}.nc')"""

# file_name = local_data_directory / '1980_01.nc'

# ds = xr.open_mfdataset(file_name, engine='netcdf4')

# # Resample from hourly to monthly data
# monthly_avg = ds.resample(valid_time='M').mean()

# # Alternative: Use 'Q' for quarter end instead of 'QS' (quarter start)
# # quarterly_avg = ds.resample(valid_time='Q').mean()

# print(f"Original shape: {ds.dims}")
# print(f"Monthly shape: {monthly_avg.dims}")
#monthly_avg

In [None]:
# monthly_avg.to_netcdf(MONTH_DIR / '1980_01_month.nc')

# # Convert monthly averages to quarterly averages
# quarterly_avg = monthly_avg.resample(valid_time='QS').mean()

# print(f"Monthly shape: {monthly_avg.dims}")
# print(f"Quarterly shape: {quarterly_avg.dims}")
# print(f"\nQuarterly time points: {quarterly_avg.valid_time.values}")

In [None]:
# # PARALLEL VERSION: Process multiple files using ALL CPU cores
# # This will be MUCH faster than sequential processing

# import gc
# from concurrent.futures import ProcessPoolExecutor, as_completed
# import time

# def process_single_file(file_path, output_dir):
#     """
#     Process a single file - this function will run in parallel
#     """
#     import xarray as xr
#     from pathlib import Path
    
#     print(f"Processing: {file_path}")
#     start_time = time.time()
    
#     # Open with explicit chunking
#     ds = xr.open_dataset(
#         file_path, 
#         engine='netcdf4', 
#         chunks={'valid_time': 100, 'latitude': 600, 'longitude': 600}
#     )
    
#     # Resample to monthly
#     monthly_avg = ds.resample(valid_time='M').mean()
    
#     # Output file
#     output_file = output_dir / f"{Path(file_path).stem}_month.nc"
    
#     # Compute and save with compression
#     monthly_avg.to_netcdf(
#         output_file, 
#         compute=True,
#         encoding={var: {'zlib': True, 'complevel': 4, 'dtype': 'float32'} 
#                  for var in monthly_avg.data_vars}
#     )
    
#     # Cleanup
#     monthly_avg.close()
#     ds.close()
    
#     elapsed = time.time() - start_time
#     print(f"  ✓ Completed {output_file.name} in {elapsed:.2f} seconds")
    
#     return str(output_file)

# # List of files to process
# files_to_process = [
#     'E:\\backup\\download\\1980_01.nc',
#     'E:\\backup\\download\\1980_02.nc',
#     'E:\\backup\\download\\1980_03.nc'
# ]

# # Process files in parallel using all CPU cores
# print(f"\nProcessing {len(files_to_process)} files in parallel...")
# start_total = time.time()

# # Use ProcessPoolExecutor for true multicore parallelism
# max_workers = multiprocessing.cpu_count() - 1  # Leave one core free
# print(f"Using {max_workers} parallel workers\n")

# with ProcessPoolExecutor(max_workers=max_workers) as executor:
#     # Submit all tasks
#     futures = {
#         executor.submit(process_single_file, f, MONTH_DIR): f 
#         for f in files_to_process
#     }
    
#     # Wait for completion
#     completed_files = []
#     for future in as_completed(futures):
#         try:
#             result = future.result()
#             completed_files.append(result)
#         except Exception as e:
#             print(f"ERROR processing {futures[future]}: {e}")

# total_time = time.time() - start_total
# print(f"\n{'='*60}")
# print(f"All files completed in {total_time:.2f} seconds")
# print(f"Processed {len(completed_files)}/{len(files_to_process)} files successfully")
# print(f"Average time per file: {total_time/len(completed_files):.2f} seconds")
# print(f"{'='*60}")

In [4]:
# Hourly -> quarterly means, one output file per (year, quarter).
# open_mfdataset builds a lazy Dask graph over the quarter's monthly files;
# the actual computation runs when to_netcdf() is called, on whichever Dask
# scheduler is active at that point.


def _quarter_input_files(year, months):
    """Return the hourly files that exist for `months` of `year`.

    Parameters
    ----------
    year : int
        Year used to fill PATTERN_PER_MONTH.
    months : iterable of str
        Two-digit month strings belonging to one quarter.

    Returns
    -------
    list of str
        Paths of the files found under HOURLY_DIR. A warning is printed for
        every expected file that is missing.
    """
    found = []
    for month in months:
        monthly_file = HOURLY_DIR / PATTERN_PER_MONTH.format(y=year, m=month)
        if monthly_file.exists():
            found.append(str(monthly_file))
        else:
            print(f"  [warning] Missing: {monthly_file.name}")
    return found


def _write_quarter_mean(input_files, q_out):
    """Average `input_files` per quarter along `valid_time` and write to `q_out`.

    The result is first written to a temporary file next to `q_out` and only
    renamed to the final name once the write has finished. An interrupted or
    failed run therefore never leaves a truncated file under the final name,
    which the "[exists]" check in the driver loop would otherwise skip on
    every later run. A leftover temporary file is overwritten by the next
    attempt for the same quarter.

    Parameters
    ----------
    input_files : list of str
        Hourly NetCDF files to combine.
    q_out : pathlib.Path
        Final output path.
    """
    tmp_out = q_out.with_name(q_out.name + ".tmp")

    # The context manager closes the input files even if the write fails.
    with xr.open_mfdataset(
        input_files,
        engine='netcdf4',
        chunks={'valid_time': 100, 'latitude': 600, 'longitude': 600},
        parallel=True,  # Enable parallel file reading
        combine='by_coords',
        coords='minimal',
        compat='override'
    ) as combined_ds:
        print(f"Loaded combined dataset: {combined_ds.sizes}")

        # Resample to quarterly means - this only creates a lazy task graph.
        # NOTE(review): newer pandas releases deprecate the 'Q' alias in
        # favour of 'QE'; confirm against the installed pandas version.
        quarterly_avg = combined_ds.resample(valid_time='Q').mean()

        print(f"Quarter averaged shape: {quarterly_avg.sizes}")
        print("Computing in parallel across all CPU cores...")

        # The computation is triggered here
        quarterly_avg.to_netcdf(
            tmp_out,
            engine='netcdf4',  # explicit, so the temporary suffix cannot affect engine choice
            compute=True,
            encoding={var: {'zlib': True, 'complevel': 4, 'dtype': 'float32'}
                      for var in quarterly_avg.data_vars}
        )
        quarterly_avg.close()

    # Publish the finished file under its final name
    tmp_out.replace(q_out)


# --------------------------
# PROCESS HOURLY → QUARTERLY
# --------------------------
print("Starting hourly to quarterly processing...")
print(f"Input directory: {HOURLY_DIR}")
print(f"Output directory: {QUARTER_DIR}")

total_processed = 0
total_skipped = 0

for y in YEARS:
    for q_idx, months in enumerate(QUARTERS, start=1):
        # Output file
        q_out = QUARTER_DIR / f"{y}_Q{q_idx}_qmean.nc"

        # Check the output first so finished quarters are skipped without
        # scanning (and warning about) their input files.
        # Delete the file to reprocess a quarter.
        if q_out.exists():
            print(f"[exists] {q_out.name} - skipping")
            total_skipped += 1
            continue

        # Gather the monthly hourly files for the quarter
        files_to_process = _quarter_input_files(y, months)

        if not files_to_process:
            print(f"[skip] No hourly files for {y} Q{q_idx}")
            total_skipped += 1
            continue

        if len(files_to_process) < len(months):
            # Best effort: the mean is still written, but it only covers
            # the months that were found.
            print(f"  [warning] {y} Q{q_idx} is incomplete: "
                  f"{len(files_to_process)} of {len(months)} monthly files found")

        print(f"[processing] {y} Q{q_idx} - combining {files_to_process} file(s)")

        start_time = time.time()
        _write_quarter_mean(files_to_process, q_out)
        total_processed += 1

        elapsed = time.time() - start_time
        print(f"\n{'='*60}")
        print(f"✓ Completed in {elapsed:.2f} seconds using multicore processing")
        print(f"Output: {q_out.name}")
        print(f"{'='*60}\n")

print(f"Finished: {total_processed} quarter(s) written, {total_skipped} skipped")

Starting hourly to quarterly processing...
Input directory: E:\backup\download
Output directory: E:\backup\trp_climate_model_data\era5land_1970_2024_qtrmean
[exists] 1980_Q1_qmean.nc - skipping
[exists] 1980_Q2_qmean.nc - skipping
[exists] 1980_Q3_qmean.nc - skipping
[exists] 1980_Q4_qmean.nc - skipping
[exists] 1981_Q1_qmean.nc - skipping
[exists] 1981_Q2_qmean.nc - skipping
[exists] 1981_Q3_qmean.nc - skipping
[processing] 1981 Q4 - combining ['E:\\backup\\download\\1981_10.nc', 'E:\\backup\\download\\1981_11.nc', 'E:\\backup\\download\\1981_12.nc'] file(s)
Loaded combined dataset: Frozen({'valid_time': 2208, 'latitude': 1801, 'longitude': 3600})
Quarter averaged shape: Frozen({'valid_time': 1, 'latitude': 1801, 'longitude': 3600})
Computing in parallel across all CPU cores...

✓ Completed in 234.74 seconds using multicore processing
Output: 1981_Q4_qmean.nc

[exists] 1982_Q1_qmean.nc - skipping
[exists] 1982_Q2_qmean.nc - skipping
[exists] 1982_Q3_qmean.nc - skipping
[exists] 1982_Q

In [15]:
# Sanity check: inspect the quarterly file for the last (year, quarter) the
# processing loop visited. `q_out` is the loop variable left over from that
# cell, so the loop must have been run in this kernel session first.
# The context manager releases the file handle after the summary is rendered.
with xr.open_dataset(q_out) as last_quarter_ds:
    display(last_quarter_ds)