In [1]:
import xarray as xr
import dask.array as da
import glob
import os
from dask_jobqueue import PBSCluster
from dask.distributed import Client
import numpy as np

In [2]:
# REZ mask
mask_file = "/home/548/cd3022/aus-historical-solar-droughts/data/boundary_files/REZ_mask.npz"
loaded_mask = np.load(mask_file)
mask = loaded_mask["mask"]

# Define processing function

In [3]:
def solar_generation_processing(ds):
    print(ds)
    print(ds.latitude)
    # apply region mask to lat/lon coordinates
    mask_da = xr.DataArray(mask, coords={"latitude": ds.latitude, "longitude": ds.longitude}, dims=["latitude", "longitude"])
    masked_ds = ds.where(mask_da, drop=True)
    
    # get irradiance data, ensuring to flatten and remove all unnecessary nan values
    ghi = masked_ds.surface_global_irradiance.values.ravel()
    dni = masked_ds.direct_normal_irradiance.values.ravel()
    dhi = masked_ds.surface_diffuse_irradiance.values.ravel()
    nan_mask = np.isnan(ghi) # same shape as dni, dhi
    ghi_clean = ghi[~nan_mask]
    dni_clean = dni[~nan_mask]
    dhi_clean = dhi[~nan_mask]
    
    # get correct time and coordinate data, so that it matches up with the remaining irradiance values
    lat_1d = masked_ds.latitude.values
    lon_1d = masked_ds.longitude.values
    lon_grid, lat_grid = np.meshgrid(lon_1d, lat_1d, indexing="xy")
    lat_grid_1d = lat_grid.ravel()
    lon_grid_1d = lon_grid.ravel()
    lat_1d_expanded = np.tile(lat_grid_1d, dataset.sizes["time"])  # Tile lat for all times
    lon_1d_expanded = np.tile(lon_grid_1d, dataset.sizes["time"])  # Tile lon for all times
    time_1d = np.repeat(masked_ds.time.values, len(lat_grid_1d))  # Repeat time for all lat/lon
    lat_1d_expanded_clean = lat_1d_expanded[~nan_mask]
    lon_1d_expanded_clean = lon_1d_expanded[~nan_mask]
    time_1d_clean = time_1d[~nan_mask]
        
    # calculate capacity factors using pvlib
    # the function defined in utils_V2 is essentially the same as the workflow in pv-output-tilting.ipynb
    actual_ideal_ratio = utils_V2.tilting_panel_pr(
        pv_model = 'Canadian_Solar_CS5P_220M___2009_',
        inverter_model = 'ABB__MICRO_0_25_I_OUTD_US_208__208V_',
        ghi=ghi_clean,
        dni=dni_clean,
        dhi=dhi_clean,
        time=time_1d_clean,
        lat=lat_1d_expanded_clean,
        lon=lon_1d_expanded_clean
    )  
    
    # template to refit data to
    mask_template = masked_ds.surface_global_irradiance
    
    # Now need to get data back in line with coordinates
    # fill cf array with nan values so it can fit back into lat/lon coords
    filled = np.empty_like(ghi)
    # nan values outside the data
    filled[nan_mask] = np.nan
    # add the data to the same mask the input irradiance data was taken from
    filled[~nan_mask] = actual_ideal_ratio
    # convert data back into 3D xarray
    reshaped = filled.reshape(mask_template.shape)
    ratio_da = xr.DataArray(reshaped, coords=mask_template.coords, dims=mask_template.dims)
    return ratio_da

# Set up PBSCluster

In [4]:
cluster = PBSCluster(
    queue='normal',  # Specify your PBS queue
    cores=10,                  # Number of cores per worker
    memory='40GB',            # Memory per worker
    walltime='10:00:00',      # Walltime limit
)
client = Client(cluster)
print('cluster done')
# edit path variable to determine data to process
# path = '/g/data/rv74/satellite-products/arc/der/himawari-ahi/solar/p1s/v*/**/*.nc' # all files!!!
path = '/g/data/rv74/satellite-products/arc/der/himawari-ahi/solar/p1s/v1.1/2023/08/01/*.nc'
files = glob.glob(path, recursive = True)
print('files read')

ds = xr.open_mfdataset(files, combine='by_coords', chunks={'time': 5, 'latitude': 1726, 'longitude': 2214})  # Adjust chunk size based on data
print(ds.chunks)
# Apply processing lazily
processed_data = ds.map_blocks(solar_generation_processing)

processed_data = processed_data.persist()  # Store processed data in memory

# Compute mean over spatial dimensions
NEM_performance = processed_data.mean(dim=["latitude", "longitude"], skipna=True)

file_path = '/g/data/er8/users/cd3022/solar_drought/dask_trial'
os.makedirs(file_path, exist_ok=True)

NEM_performance.to_netcdf(f'{file_path}/output.nc', compute=False)  # Async write

# Close the Dask client and cluster when done
try:
    # Perform computation
    result = NEM_performance.compute()  # Trigger execution

    # Save output
    result.to_netcdf(f'{file_path}/output.nc')
    
finally:
    # Cleanup
    client.close()
    cluster.close()

cluster done
files read
Frozen({'time': (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 'latitude': (1726,), 'longitude': (2214,)})
<xarray.Dataset> Size: 0B
Dimensions:                     (time: 0, latitude: 0, longitude: 0)
Coordinates:
  * time                        (time) datetime64[ns] 0B 
  * latitude                    (latitude) float32 0B 
  * longitude                   (longitude) float32 0B 
Data variables:
    crs                         (time) int32 0B 
    surface_global_irradiance   (time, latitude, longitude) float64 0B 
    direct_normal_irradiance    (time, latitude, longitude) float64 0B 
    surface_diffuse_irradiance  (time, latitude, longitude) float64 0B 
    quality_mask                (time, latitude

Exception: Cannot infer object returned from running user provided function. Please supply the 'template' kwarg to map_blocks.