In [1]:
import os
import xarray as xr
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

import climtas.nci
from dask.distributed import Client, as_completed

In [2]:
def himawari_era5_diff(date):

    date_dt = datetime.strptime(date, "%m-%Y")
    year = date_dt.strftime("%Y")
    month = date_dt.strftime("%m")
    
    ##### Himawari Data
    if date_dt <= datetime.strptime('2019-03-31', '%Y-%m-%d'):
        version = 'v1.0'
    else:
        version = 'v1.1'
    directory=Path(f'/g/data/rv74/satellite-products/arc/der/himawari-ahi/solar/p1s/{version}/{year}/{month}')
    files = sorted(str(p) for p in directory.rglob("*.nc"))
    def _preprocess(ds):
        return ds.drop_vars(set(ds.data_vars) - {'surface_global_irradiance'})
    
    himawari = xr.open_mfdataset(
        files,
        combine='by_coords',
        preprocess=_preprocess,
        engine='h5netcdf',
    )
    himawari = himawari.chunk({'time':100})
    

    ##### ERA5 Data
    era5_dir = [
        Path(f"/g/data/rt52/era5/single-levels/reanalysis/msdwswrf/{year}"),
    ]
    era5_file = [f for d in era5_dir for f in d.glob(f"msdwswrf_era5_oper_sfc_{year}{month}*.nc")][0]
    era5 = xr.open_dataset(era5_file, chunks={'time':100})
    
    # Restrict ERA5 domain to Himawari domain
    era5_aus = era5.sel(
        latitude=slice(-10, -44.5),
        longitude=slice(156.26, 112)
    )
    
    # Get monthly mean
    himawari_clim = himawari.surface_global_irradiance.mean(dim='time')
    era5_clim = era5_aus.msdwswrf.mean(dim='time')
    
    # Regrid ERA5 for comparison
    era5_clim_interp = era5_clim.interp(
        latitude=himawari_clim.latitude,
        longitude=himawari_clim.longitude,
        method='linear'
    )
    
    # Find the difference
    diff = era5_clim_interp - himawari_clim
    
    # Save results
    file_name = f'msdwswrf-era5-himawari_{year}{month}.nc'
    file_path = Path('/g/data/er8/users/cd3022/Irradiance-comparisons/era5-himawari/monthly/')
    file_path.mkdir(parents=True, exist_ok=True)
    diff.to_netcdf(file_path / file_name)
    return f"Finished processing {year}-{month}"

In [3]:
client = Client(
    n_workers=10,
    threads_per_worker=1
)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: /proxy/8787/status,

0,1
Dashboard: /proxy/8787/status,Workers: 10
Total threads: 10,Total memory: 95.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:40031,Workers: 10
Dashboard: /proxy/8787/status,Total threads: 10
Started: Just now,Total memory: 95.00 GiB

0,1
Comm: tcp://127.0.0.1:39329,Total threads: 1
Dashboard: /proxy/34057/status,Memory: 9.50 GiB
Nanny: tcp://127.0.0.1:33939,
Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-8d03appf,Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-8d03appf

0,1
Comm: tcp://127.0.0.1:35535,Total threads: 1
Dashboard: /proxy/46807/status,Memory: 9.50 GiB
Nanny: tcp://127.0.0.1:44021,
Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-sds6nsw5,Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-sds6nsw5

0,1
Comm: tcp://127.0.0.1:46525,Total threads: 1
Dashboard: /proxy/35165/status,Memory: 9.50 GiB
Nanny: tcp://127.0.0.1:37679,
Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-ysf116id,Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-ysf116id

0,1
Comm: tcp://127.0.0.1:43961,Total threads: 1
Dashboard: /proxy/38745/status,Memory: 9.50 GiB
Nanny: tcp://127.0.0.1:41429,
Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-2v9mmvfc,Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-2v9mmvfc

0,1
Comm: tcp://127.0.0.1:45887,Total threads: 1
Dashboard: /proxy/34929/status,Memory: 9.50 GiB
Nanny: tcp://127.0.0.1:44187,
Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-fjygr52c,Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-fjygr52c

0,1
Comm: tcp://127.0.0.1:41347,Total threads: 1
Dashboard: /proxy/40011/status,Memory: 9.50 GiB
Nanny: tcp://127.0.0.1:40033,
Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-v2vxlkow,Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-v2vxlkow

0,1
Comm: tcp://127.0.0.1:39811,Total threads: 1
Dashboard: /proxy/34907/status,Memory: 9.50 GiB
Nanny: tcp://127.0.0.1:39683,
Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-emmnoi8r,Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-emmnoi8r

0,1
Comm: tcp://127.0.0.1:41997,Total threads: 1
Dashboard: /proxy/46435/status,Memory: 9.50 GiB
Nanny: tcp://127.0.0.1:42141,
Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-gq5kok0a,Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-gq5kok0a

0,1
Comm: tcp://127.0.0.1:40907,Total threads: 1
Dashboard: /proxy/40291/status,Memory: 9.50 GiB
Nanny: tcp://127.0.0.1:44231,
Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-z7v56pyi,Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-z7v56pyi

0,1
Comm: tcp://127.0.0.1:44869,Total threads: 1
Dashboard: /proxy/39801/status,Memory: 9.50 GiB
Nanny: tcp://127.0.0.1:46365,
Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-8b7n8myj,Local directory: /jobfs/141792892.gadi-pbs/dask-scratch-space/worker-8b7n8myj


In [4]:
if __name__ == '__main__':
    date = '09-2022'
    futures = {}
    future = client.submit(himawari_era5_diff, date)
    futures[future] = f"The job for {date}"
    for future in as_completed(futures):
        _ = future.result()  # Raises if the function failed
    
        # Remove reference to free up memory
        del futures[future]

ERROR 1: PROJ: proj_create_from_database: Open of /g/data/hh5/public/apps/miniconda3/envs/analysis3-24.04/share/proj failed
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
Key:       ('chunked_aware_interpnd-7be5a644031cd77dc86210f0781f87d3', 0, 0)
State:     executing
Function:  execute_task
args:      ((subgraph_callable-7d326c09c12f2f51c986210272e3a590, (<function concatenate_axes at 0x148eacf61900>, [array([], shape=(1726, 0), dtype=float64)], [1]), (<function concatenate_axes at 0x148eacf61900>, [array([], dtype=float32)], [0]), array([112.  , 112.02, 112.04, ..., 156.22, 156.24, 156.26], dtype=float32)))
kwargs:    {}
Exception: "ValueError('cannot reshape array of size 0 into shape (0,newaxis)')"
Traceback: '  File "/g/data/hh5/public/apps/miniconda3/envs/analysis3-24.04/lib/python3.10/site-packages/dask/optimization.py", line 1001, in __call__\n    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))\n  File "/g/data/hh5/p

ValueError: cannot reshape array of size 0 into shape (0,newaxis)