In [1]:
import xarray as xr
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
from cartopy.feature import ShapelyFeature
import cartopy.feature as cfeature
import matplotlib.gridspec as gridspec
from pylab import *
from tqdm.notebook import tqdm
import dask
from dask.distributed import Client, progress, wait
#from dask.diagnostics import ProgressBar
#ProgressBar().register()

In [2]:
chunk_size = {'time': 40000, 'latitude': 10, 'longitude': 10, 'level':50}
data_type_size = 4

num_elements_per_chunk = chunk_size['time'] * chunk_size['latitude'] * chunk_size['longitude'] * chunk_size['level']
memory_per_chunk = num_elements_per_chunk * data_type_size
memory_per_chunk_gb = memory_per_chunk / (1024 ** 3)
print(memory_per_chunk_gb)

0.7450580596923828


In [14]:
# Initialize Dask client
client = Client()

# Define a function to compute with progress and retry on failure
def compute_with_progress(dask_array, retries=3):
    attempt = 0
    while attempt < retries:
        try:
            # Create a future object for the computation
            future = dask_array.persist()  # Persist to keep it in memory
            progress(future)  # Attach the progress bar
            result = future.compute()  # Compute the final result
            return result
        except Exception as e:
            print(f"Attempt {attempt + 1} failed with error: {e}")
            attempt += 1
            if attempt >= retries:
                raise
            print("Retrying...")
            
def save_dataset_with_encoding(dataset, file_path, encoding):
    """
    Save an xarray Dataset to a NetCDF file with specified encoding options for all variables.

    Parameters:
    - dataset (xr.Dataset): The xarray Dataset to save.
    - file_path (str): Path to the NetCDF file.
    - encoding (dict): Encoding options to apply to all variables.
    """
    # Create encoding dictionary with encoding for all variables
    encoding_dict = {var: encoding for var in dataset.data_vars}

    # Save the dataset to a NetCDF file with the specified encoding
    dataset.to_netcdf(file_path, encoding=encoding_dict)
    
# Define common encoding options for all variables
comp = dict(zlib=True, complevel=5)

In [4]:
#client

In [5]:
# Read ERA5 data from google services, takes a little time, not much
url_pl_era5 = 'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3'

# Define a reasonable chunk size based on your system's memory
chunk_size = {'time': 40000, 'latitude': 10, 'longitude': 10, 'level':10}
ds_pl = xr.open_dataset(url_pl_era5, engine="zarr", chunks={})

# Pressure levels
pressure_var = ['u_component_of_wind', 'v_component_of_wind']
ds_pl_cut = ds_pl[pressure_var]

del ds_pl

# We have all the data here, so we should first cut for our domain and also select the variables we need
# This takes a little time, not much

# Area Lat: -15 to 15 --- Lon: -85 to -48 AMERICA
lat1,lat2 = 2, 9
lon1,lon2 = 135, 145
levels = [100, 125, 150, 175, 200, 225, 250, 300, 350, 
          400, 450, 500, 550, 600, 650, 700, 750, 775, 
          800, 825, 850, 875, 900, 925, 950, 975, 1000]

wind_data = ds_pl_cut.sel(time=slice("2016-01-01T00:00", "2019-12-31T23:00"), level=levels,
                          latitude=slice(lat2, lat1), longitude=slice(lon1, lon2))

In [22]:
# Calculate wind speed
wind_speed = np.sqrt(wind_data['u_component_of_wind']**2 + wind_data['v_component_of_wind']**2)

# Add wind_speed as a new variable in the Dataset
wind_data['wind_speed'] = wind_speed

# Persist intermediate results to avoid recomputation
wind_data = wind_data.persist()

# Calculate the seasonal mean of u and v components
seasonal_mean_u = wind_data['u_component_of_wind'].groupby('time.season').mean('time').persist()
seasonal_mean_v = wind_data['v_component_of_wind'].groupby('time.season').mean('time').persist()

# Calculate wind speed from the seasonal mean u and v components
seasonal_mean_speed = wind_data['wind_speed'].groupby('time.season').mean('time')
# Calculate wind direction from the seasonal mean u and v components
seasonal_mean_direction_radians = np.arctan2(seasonal_mean_v, seasonal_mean_u)
seasonal_mean_direction_degrees = np.degrees(seasonal_mean_direction_radians).persist()

# Convert direction to the range [0, 360)
seasonal_mean_direction = (seasonal_mean_direction_degrees + 360) % 360

# Persist these seasonal means as well to avoid recomputation
seasonal_mean_speed_pro = seasonal_mean_speed.mean(dim=['latitude', 'longitude'])
seasonal_mean_direction_pro = seasonal_mean_direction.mean(dim=['latitude', 'longitude'])

In [23]:
seasonal_mean_speed_pro.to_netcdf()
seasonal_mean_direction_pro.to_netcdf()

Unnamed: 0,Array,Chunk
Bytes,432 B,108 B
Shape,"(4, 27)","(1, 27)"
Dask graph,4 chunks in 49 graph layers,4 chunks in 49 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 432 B 108 B Shape (4, 27) (1, 27) Dask graph 4 chunks in 49 graph layers Data type float32 numpy.ndarray",27  4,

Unnamed: 0,Array,Chunk
Bytes,432 B,108 B
Shape,"(4, 27)","(1, 27)"
Dask graph,4 chunks in 49 graph layers,4 chunks in 49 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [None]:
fig = plt.figure(figsize=(6,6)) #16,8
gs = gridspec.GridSpec(1,1, left=0.05, right=0.975, hspace=0.05, wspace=0.13, top=0.9, bottom=0.1)
ax = plt.subplot(gs[0],projection= ccrs.PlateCarree())
wind_data.u_component_of_wind[0,0,:,:].plot()
ax.add_feature(cfeature.COASTLINE)
ax.set_extent([120, 180, -10, 20], crs=ccrs.PlateCarree())