In [24]:
import os
import gc
import dask
import dask.array as da
import numpy as np
import xarray as xr
from dask.diagnostics import ProgressBar
from dask.distributed import Client, LocalCluster, progress
gc.collect()

# Every relative path used below ("supplementary/era5_download/...") is resolved
# against this directory. The default is the original mount point; set the
# AIRCON_PROJECT_DIR environment variable to run the notebook on another
# machine without editing the code.
PROJECT_DIR = os.environ.get(
    "AIRCON_PROJECT_DIR",
    "/mnt/cloud/wwu1/ec_bronze/_nogroup/ae78a1ca-a0e8-4e4e-8992-69c34947db65/UseCase_AIRCON",
)
os.chdir(PROJECT_DIR)

# Custom helper: wind speed from the two horizontal wind components
def wind_speed(u, v):
    """Return the magnitude ``sqrt(u**2 + v**2)`` of the wind vector ``(u, v)``.

    The square root is taken with ``dask.array.sqrt``; the inputs only need to
    support element-wise multiplication and addition.
    """
    squared_magnitude = u * u + v * v
    return da.sqrt(squared_magnitude)

def wind_dir(u, v):
    """Return the meteorological wind direction in degrees, in ``[0, 360]``.

    The result is the direction the wind blows *from*, measured clockwise from
    north, for an eastward component ``u`` and a northward component ``v``:
    a westerly wind (u > 0, v = 0) gives 270, a northerly wind (u = 0, v < 0)
    gives 0.

    ``arctan2(v, u)`` is the mathematical angle of the vector (counter-clockwise
    from east); ``270 - angle`` converts it to the "from" compass bearing.
    The previous argument order ``arctan2(u, v)`` is the one that belongs to the
    alternative ``180 + arctan2(u, v)`` form and produced bearings that were
    mirrored (e.g. 180 for a westerly wind).
    """
    return (270 - da.arctan2(v, u) * 180 / np.pi) % 360

# Packing step (degrees per integer code) for wind direction stored as int16.
# int16 offers 2**16 codes; keeping one of them free for a fill value leaves
# 2**16 - 1 usable codes, i.e. 2**16 - 2 steps to span the closed range
# 0..360 degrees. A step of 360 / (2**16 - 1) is one code too fine for that.
scale_factor_wd = 360 / (2**16 - 2)  # applies to all datasets

In [25]:
# Reuse the client that is already registered in this kernel, if any.
# Calling Client() unconditionally starts a fresh local cluster on every
# re-run of this cell, leaving the previous workers alive and competing for
# memory. Client.current() raises ValueError when no client exists yet.
try:
    client = Client.current()
except ValueError:
    client = Client()
print(client)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 41907 instead


<Client: 'tcp://127.0.0.1:39255' processes=4 threads=16, memory=64.00 GiB>


In [33]:
def _int16_packing(lo, hi):
    """Build a netCDF encoding that packs floats in ``[lo, hi]`` into int16.

    The integer code -32768 is reserved as fill value, so data are mapped onto
    the symmetric code range -32767..32767 (2**16 - 2 steps) centred on the
    middle of ``[lo, hi]``. Stored values decode as
    ``code * scale_factor + add_offset``.
    """
    n_steps = 2**16 - 2
    if not (np.isfinite(lo) and np.isfinite(hi)):
        # All-missing data: any packing works, pick a harmless one.
        lo = hi = 0.0
    span = hi - lo
    return {
        "dtype": "int16",
        # A constant field has span 0; avoid a zero scale factor.
        "scale_factor": span / n_steps if span > 0 else 1.0,
        "add_offset": (lo + hi) / 2,
        "_FillValue": -32768,
    }


def process_wind(y):
    """Prepare the wind-speed and wind-direction NetCDF writes for year ``y``.

    Reads the hourly ERA5 10 m u and v component files of the year, derives
    wind speed (``ws``) and wind direction (``wd``) lazily and sets up one
    int16-packed NetCDF write for each.

    Parameters
    ----------
    y : int
        Year used in the input and output file names.

    Returns
    -------
    list or None
        ``[ws_write, wd_write]``, the two deferred writes returned by
        ``to_netcdf(..., compute=False)``; nothing is written until they are
        computed. ``None`` when both output files already exist with a size of
        at least 50000 bytes, in which case the year is skipped.
    """
    out_ws = f"supplementary/era5_download/ERA5_10m_wind_speed_hrly_{y}.nc"
    out_wd = f"supplementary/era5_download/ERA5_10m_wind_direction_hrly_{y}.nc"

    # An output below 50000 bytes is treated as missing and is regenerated.
    outputs_ready = all(
        os.path.isfile(path) and os.path.getsize(path) >= 50000
        for path in (out_ws, out_wd)
    )
    if outputs_ready:
        return None

    d = xr.open_mfdataset(
        [
            f"supplementary/era5_download/ERA5_10m_u_component_of_wind_hrly_{y}.nc",
            f"supplementary/era5_download/ERA5_10m_v_component_of_wind_hrly_{y}.nc",
        ],
        chunks="auto",
        combine="by_coords",
        parallel=True,
    )

    if 'expver' in d.dims:
        # Collapse the extra 'expver' dimension by summing while ignoring NaN.
        d = d.reduce(np.nansum, 'expver')
        print(f"\nFound experimental version of data (recent dates) in {y}. Reducing dimension.")

    d["ws"] = wind_speed(d.u10, d.v10)
    # One pass over the data for both extremes instead of two separate computes.
    ws_min, ws_max = (float(stat) for stat in dask.compute(d.ws.min(), d.ws.max()))
    # Without an add_offset the packed values ran up to ~65535 and overflowed
    # int16; the encoding below centres the data range on the int16 range.
    ws = d["ws"].to_dataset().to_netcdf(
        out_ws,
        compute=False,
        encoding={"ws": _int16_packing(ws_min, ws_max)},
    )

    d["wd"] = wind_dir(d.u10, d.v10)
    wd_encoding = _int16_packing(0.0, 360.0)
    # Use the shared module-level step when it is coarse enough; never go finer
    # than what fits 0..360 degrees into the non-fill int16 codes.
    wd_encoding["scale_factor"] = max(wd_encoding["scale_factor"], scale_factor_wd)
    wd = d["wd"].to_dataset().to_netcdf(
        out_wd,
        compute=False,
        encoding={"wd": wd_encoding},
    )

    return [ws, wd]

In [30]:
%%time

years = range(2015, 2024)
lazy_results = []

for y in years:
    # process_wind returns None when both output files of the year already
    # exist; indexing that result would raise TypeError, so skip such years.
    p_wind = process_wind(y)
    if p_wind is not None:
        lazy_results.extend(p_wind)
    print(y, sep=' ', end=' ', flush=True)

2015 2016 2017 2018 2019 2020 2021 2022 Found experimental version of SRR data (recent dates). Reducing dimension.
2023 CPU times: user 2min 1s, sys: 26.2 s, total: 2min 28s
Wall time: 18min 20s


In [31]:
# Number of collected entries before and after discarding None placeholders.
print(len(lazy_results))

lazy_results = list(filter(lambda task: task is not None, lazy_results))
print(len(lazy_results))

18
18


In [32]:
# Hand all collected deferred writes to dask in a single call so they are
# scheduled together; the returned objects are kept in agg_res.
agg_res = dask.persist(*lazy_results)
# Show progress for the persisted work (rendered as a widget in the notebook).
progress(agg_res)

VBox()