In [None]:
from dask.distributed import LocalCluster

# Start a fully-featured local Dask cluster and attach a client to it.
N_WORKERS = 20  # number of workers requested from LocalCluster
cluster = LocalCluster(n_workers=N_WORKERS)
client = cluster.get_client()
# Bare last expression: the client's repr is rendered as the cell output.
client

In [None]:
import xarray as xr


# Lookup from a variable's long name to its short name.
# NOTE(review): nothing in the visible cells reads this mapping — presumably the
# short name is the data-variable key inside the NetCDF files; confirm.
long_short_name_dict = {
    "2m_temperature": "t2m",
}


def gen_file_list(
    variable: str,
    start_datetime: str,
    end_datetime: str,
    time_resolution: str,  # e.g., "hour", "day", "month", "year"
    time_agg_method: str,  # e.g., "mean", "max", "min"
    raw_dir: str = "/data/era5/raw",
    agg_dir: str = "/home/huan1531/iharp-quick-aggregate/data/output",
) -> list:
    """Build the list of NetCDF file paths needed for a query.

    For ``time_resolution == "hour"`` one path per calendar year is produced,
    covering the start year through the end year inclusive, in the form
    ``{raw_dir}/{variable}/{variable}-{year}.nc``. For any other resolution a
    single path ``{agg_dir}/{variable}-{time_resolution}-{time_agg_method}.nc``
    is produced.

    Args:
        variable: Variable name used in the directory and file names.
        start_datetime: Start timestamp string; only its first four
            characters (the year) are used, and only for hourly data.
        end_datetime: End timestamp string; only its first four characters
            (the year) are used, and only for hourly data.
        time_resolution: Temporal resolution; ``"hour"`` selects the per-year
            raw files, anything else selects the single aggregated file.
        time_agg_method: Aggregation name embedded in the aggregated file
            name; ignored for hourly data.
        raw_dir: Directory holding the per-year raw files. Defaults to the
            location that was previously hard-coded.
        agg_dir: Directory holding the aggregated files. Defaults to the
            location that was previously hard-coded.

    Returns:
        List of path strings. It is empty when the start year is after the
        end year for hourly data. The list is also printed to stdout.

    Raises:
        ValueError: If the first four characters of ``start_datetime`` or
            ``end_datetime`` are not an integer (hourly data only).
    """
    if time_resolution == "hour":
        first_year = int(start_datetime[:4])
        last_year = int(end_datetime[:4])
        # Raw files are split by year, so every year touched by the range is needed.
        file_list = [
            f"{raw_dir}/{variable}/{variable}-{year}.nc"
            for year in range(first_year, last_year + 1)
        ]
    else:
        file_list = [f"{agg_dir}/{variable}-{time_resolution}-{time_agg_method}.nc"]
    print(file_list)
    return file_list

In [None]:
# ---- Query configuration ----
variable = "2m_temperature"
start_datetime = "2000-01-01 00:00:00"
end_datetime = "2023-12-31 23:00:00"

# Named bounding boxes in degrees. Change REGION to switch between them
# instead of commenting blocks in and out.
REGION_PRESETS = {
    "20% space": {"max_lat": 80, "min_lat": -13, "min_lon": 0, "max_lon": 140},
    "10% space": {"max_lat": 81, "min_lat": 0, "min_lon": 0, "max_lon": 80},
    "Greenland": {"max_lat": 85, "min_lat": 60, "min_lon": -70, "max_lon": -10},
    "Alaska": {"max_lat": 75, "min_lat": 50, "min_lon": -170, "max_lon": -140},
}
REGION = "Alaska"
max_lat = REGION_PRESETS[REGION]["max_lat"]
min_lat = REGION_PRESETS[REGION]["min_lat"]
min_lon = REGION_PRESETS[REGION]["min_lon"]
max_lon = REGION_PRESETS[REGION]["max_lon"]
# Fraction of the full 180 x 360 degree lat/lon extent covered by the box.
print((max_lat - min_lat) / 180 * (max_lon - min_lon) / 360)

time_resolution = "hour"
time_agg_method = "mean"
time_series_aggregation_method = "mean"

# Validate before any file is opened. Without this check an unsupported method
# would leave `ts` unassigned (NameError) or, in a live kernel, silently reuse
# a stale `ts` from an earlier run of this cell.
SUPPORTED_SPATIAL_REDUCTIONS = ("mean", "max", "min")
if time_series_aggregation_method not in SUPPORTED_SPATIAL_REDUCTIONS:
    raise ValueError(
        f"Unsupported time_series_aggregation_method {time_series_aggregation_method!r}; "
        f"expected one of {SUPPORTED_SPATIAL_REDUCTIONS}"
    )

# ---- Load, subset and reduce ----
file_list = gen_file_list(variable, start_datetime, end_datetime, time_resolution, time_agg_method)
ds = xr.open_mfdataset(file_list, engine="netcdf4", parallel=True, chunks={"time": 2000})
# The latitude slice runs max -> min — presumably the latitude coordinate is
# stored in descending order; confirm against the files.
# NOTE(review): negative longitudes assume a -180..180 convention in the files — confirm.
ds = ds.sel(
    time=slice(start_datetime, end_datetime),
    latitude=slice(max_lat, min_lat),
    longitude=slice(min_lon, max_lon),
)
# Collapse both spatial dimensions with the chosen reduction, leaving the
# remaining dimensions (e.g. time) intact.
ts = getattr(ds, time_series_aggregation_method)(dim=["latitude", "longitude"])
# Bare last expression: triggers the computation and displays the result.
ts.compute()

In [6]:
# Release Dask resources: disconnect the client first so it is not left
# pointing at a stopped scheduler, then shut the cluster down.
client.close()
cluster.close()