# Calculate wind speed (hourly)

In [13]:
from dask.distributed import Client,LocalCluster
from dask_jobqueue import PBSCluster

In [14]:
# One node on Gadi has 48 cores - try to use up a full node before going to multiple nodes (jobs)

# Resources requested for each PBS job
cores = 24
walltime = "01:00:00"
memory = f"{4 * cores}GB"  # 4 GB for every requested core

# Extra PBS flags: queue, project, CPU/memory request and the storage mounts
pbs_directives = [
    "-q normal",
    "-P w42",
    f"-l ncpus={cores}",
    f"-l mem={memory}",
    "-l storage=gdata/w42+gdata/rt52",
]

cluster = PBSCluster(
    cores=cores,
    processes=cores,  # as many worker processes as cores
    memory=memory,
    walltime=walltime,
    local_directory="$TMPDIR",
    job_extra_directives=pbs_directives,
    # Leave out the generated "select" directive; ncpus/mem are passed explicitly above
    job_directives_skip=["select"],
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 43115 instead


In [15]:
# Ask for one PBS job using the resources configured on the cluster
cluster.scale(jobs=1)
# Attach this notebook session to the cluster
client = Client(cluster)

In [16]:
# Display the client summary (dashboard link, worker and memory totals)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: /proxy/43115/status,

0,1
Dashboard: /proxy/43115/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.6.121.4:34389,Workers: 0
Dashboard: /proxy/43115/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [2]:
import xarray as xr
import numpy as np

import matplotlib.pyplot as plt

In [48]:
%cd /g/data/w42/dr6273/work/energy_climate_modes

import functions as fn

/g/data/w42/dr6273/work/energy_climate_modes


In [47]:
%load_ext autoreload
%autoreload 2

In [6]:
# Years to process: 1959 through 2022 inclusive (the range end is exclusive)
years = range(1959, 2023)

In [10]:
# Input root; load_hourly appends "<variable>/<year>/*.nc" to this path
root_path = "/g/data/rt52/era5/single-levels/reanalysis/"
# Output directory for the per-year wind speed files written by the yearly loop
write_path = "/g/data/w42/dr6273/work/data/era5/100w/hourly/"

In [50]:
# Region bounds used by preprocess: elements [0], [1] bound longitude, [2], [3] bound latitude
aus_region = fn.get_east_Aus_boundary()

# Compute wind speed by year

In [5]:
def windspeed(u, v):
    """
    Magnitude of the horizontal wind vector, sqrt(u^2 + v^2).

    u: zonal wind component (scalar or array-like)
    v: meridional wind component (scalar or array-like)

    Returns the element-wise wind speed.
    """
    zonal_sq = u ** 2
    meridional_sq = v ** 2
    return np.sqrt(zonal_sq + meridional_sq)

In [30]:
def preprocess(ds):
    """
    Per-file hook for open_mfdataset.

    Subsets the dataset to the box described by the module-level
    `aus_region`, shortens the horizontal coordinate names to lon/lat,
    and rechunks so that time, lat and lon each sit in a single chunk.

    ds: dataset with `longitude` and `latitude` coordinates
    """
    # aus_region holds the longitude bounds at [0], [1] and the latitude bounds at [2], [3]
    lon_window = slice(aus_region[0], aus_region[1])
    lat_window = slice(aus_region[2], aus_region[3])

    return (
        ds.sel(longitude=lon_window, latitude=lat_window)
        .rename({"longitude": "lon", "latitude": "lat"})
        # -1 asks for one chunk spanning the whole dimension
        .chunk({"time": -1, "lat": -1, "lon": -1})
    )

In [31]:
def load_hourly(preprocess, variable, year, first_hour, data_path=root_path):
    """
    Load and preprocess hourly data for a given year

    preprocess: preprocess function, passed to open_mfdataset
    variable: name of variable to process (also the directory name under data_path)
    year: year to process
    first_hour: desired first hour from which to compute 24-hour aggregations
    data_path: path to hourly data

    Returns the opened dataset with leading time steps dropped so that it
    begins at `first_hour` (assumes the time axis has one step per hour).
    """
    # Open all hours in the year (~33 GB)
    hourly = xr.open_mfdataset(
        data_path + variable + "/" + str(year) + "/*.nc",
        preprocess=preprocess
    )

    # Start the aggregation on the desired hour (e.g. 0000):
    # count how many leading steps precede the first occurrence of first_hour
    data_first_hour = hourly["time"].dt.hour.item(0)
    desired_start_index = (first_hour - data_first_hour) % 24

    # Drop the leading steps with a slice rather than an explicit list of
    # every remaining integer position; the selected steps are the same
    hourly = hourly.isel(time=slice(desired_start_index, None))

    return hourly

In [55]:
for year in years:
    # Sparse progress marker so the output is not flooded
    if year in (1959, 1980, 2000, 2020):
        print(year)

    # Both wind components for the whole year, starting at hour 0
    u100 = load_hourly(preprocess, "100u", year, first_hour=0)
    v100 = load_hourly(preprocess, "100v", year, first_hour=0)

    # Give both components the same variable name before combining them;
    # presumably so the result comes out as a single "w100" variable
    w100 = windspeed(
        u100.rename({"u100": "w100"}),
        v100.rename({"v100": "w100"}),
    ).chunk({"time": 24 * 7 * 12})  # 2016 time steps per chunk

    # Store the wind speed as float32 on disk
    encoding = {"w100": {"dtype": "float32"}}
    out_file = f"{write_path}100w_era5_hourly_{year}_east_Aus.nc"
    w100.to_netcdf(out_file, mode="w", encoding=encoding)

1959
1980
2000
2020


# Close cluster

In [35]:
# Disconnect the client first, then shut the cluster down
client.close()
cluster.close()