In [1]:
from climakitae.core.data_interface import (
    get_data_options, 
    get_subsetting_options, 
    get_data
)
# import climakitae as ck

import numpy as np
import matplotlib.pyplot as plt
import xarray as xr
import cartopy.crs as ccrs
import cartopy.feature as cfeature
from matplotlib.backends.backend_pdf import PdfPages
import time
from pyproj import Transformer
import geopandas as gpd
from shapely.geometry import Point
import contextily as cx
import psutil
import xarray as xr
import dask
import dask.array as da
from dask.distributed import Client, LocalCluster, progress
from dask.diagnostics import ProgressBar
import logging

# Create a log file
logging.basicConfig(filename='wildfire_output_tests.log', level=logging.INFO)
print('done')

done


In [2]:
# variables_units = ("Fosberg fire weather index","[0 to 100]","Dynamical","hourly",[2.0])
# Define the sets of variables
variables_units_list = [
    ("Air Temperature at 2m", "DegF", "Dynamical", "hourly", [2.0]),
    ("Wind speed at 10m", "mph", "Dynamical", "hourly", [2.0]),
    ("Relative humidity", "percent", "Dynamical", "hourly", [2.0])
]

# Initialize dataset variables
t_ds = None
rh_ds = None
wd_ds = None

# Loop through each set of variables and process them
for variables_units in variables_units_list:
    variable, unit, downscale, timescale, GWL = variables_units
    print(variable, unit, downscale, timescale, GWL)
    print(f"Processing variable: {variable}")
    logging.info(f"Processing variable: {variable}")
    start_time = time.time()
    ds = get_data(
        variable=variable,
        units=unit,
        downscaling_method=downscale,
        resolution="3 km",
        timescale=timescale,
        cached_area="Southern California Edison",
        approach="Warming Level",
        warming_level_window=15,
        warming_level=GWL
    )
    print(f"data retreived in {time.time() - start_time:.2f} seconds.")
    logging.info(f"data retreived in {time.time() - start_time:.2f} seconds.")
    # Assign the dataset to the corresponding variable
    if variable == "Air Temperature at 2m":
        t_ds = ds
    elif variable == "Wind speed at 10m":
        wd_ds = ds
    elif variable == "Relative humidity":
        rh_ds = ds

start_time = time.time()
ds = xr.combine_by_coords([t_ds, rh_ds, wd_ds])
del t_ds, rh_ds, wd_ds

hours_in_year = 365 * 24
# Calculate the hour of the year
hour_of_year = ds['time_delta'].values % hours_in_year
# Assign the new hour_of_year coordinate to the dataset
ds = ds.assign_coords(hour_of_year=('time_delta', hour_of_year))

hours_in_month = np.array([31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]) * 24
cumulative_hours = np.cumsum(hours_in_month)

# Function to map hour_of_year to month
def hour_to_month(hour):
    return np.searchsorted(cumulative_hours, hour) + 1

# Apply the function to calculate the month for each hour_of_year
months = hour_to_month(ds['hour_of_year'].values)
ds = ds.assign_coords(month=('time_delta', months))

print(f"data grouped in {time.time() - start_time:.2f} seconds.")
logging.info(f"data grouped in {time.time() - start_time:.2f} seconds.")

print(ds)

Air Temperature at 2m DegF Dynamical hourly [2.0]
Processing variable: Air Temperature at 2m
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!! Returned data array is huge. Operations could take 10x to infinity longer than 1GB of data !!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

data retreived in 54.31 seconds.
Wind speed at 10m mph Dynamical hourly [2.0]
Processing variable: Wind speed at 10m
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!! Returned data array is huge. Operations could take 10x to infinity longer than 1GB of data !!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

data retreived in 125.25 seconds.
Relative humidity percent Dynamical hourly [2.0]
Processing variable: Relative humidity
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

In [3]:
def fosberg_fire_index(t2_F, rh_percent, windspeed_mph):
    """Compute the Fosberg Fire Weather Index."""
    print("Compute the Fosberg Fire Weather Index...")
    m_low, m_mid, m_high = _equilibrium_moisture_constant(h=rh_percent, T=t2_F)
    m = xr.where(rh_percent < 10, m_low, m_mid)
    m = xr.where(rh_percent > 50, m_high, m)
    n = _moisture_dampening_coeff(m)
    U = windspeed_mph
    tmp = (n * ((1 + U**2) ** 0.5)) / 0.3002
    FFWI = tmp.clip(min=0.0, max=100.0)
    for coord in list(FFWI.coords):
        FFWI[coord].attrs = t2_F[coord].attrs
    FFWI.name = "Fosberg Fire Weather Index"
    FFWI.attrs["units"] = "[0 to 100]"
    return FFWI

def _equilibrium_moisture_constant(h, T):
    print("Compute the EMC...")
    m_low = 0.03229 + 0.281073 * h - 0.000578 * h * T
    m_mid = 2.22749 + 0.160107 * h - 0.01478 * T
    m_high = 21.0606 + 0.005565 * (h**2) - 0.00035 * h * T - 0.483199 * h
    return (m_low, m_mid, m_high)

def _moisture_dampening_coeff(m):
    print("Compute the MDC...")
    n = 1 - 2 * (m / 30) + 1.5 * (m / 30) ** 2 - 0.5 * (m / 30) ** 3
    return n

In [4]:

# Define larger chunk dimensions and data type
chunk_dims = {'warming_level': 1, 'time_delta': 10000, 'y': 240, 'x': 149, 'simulation': 1}
data_type = np.float32  # Change this to the appropriate data type

# Calculate the number of elements in the chunk
num_elements = np.prod(list(chunk_dims.values()))

# Calculate the size of each element
element_size = data_type().nbytes

# Calculate the total memory usage
total_memory_bytes = num_elements * element_size
total_memory_mb = total_memory_bytes / (1024 * 1024)

print(f"Total memory usage for the chunk: {total_memory_mb:.2f} MB")

Total memory usage for the chunk: 1364.14 MB


In [None]:


# Write to the log file
logging.info("Processing FWI")

def print_resource_usage():
    """Print the current CPU and memory usage."""
    cpu_percent = psutil.cpu_percent(interval=1)
    memory_info = psutil.virtual_memory()
    memory_percent = memory_info.percent
    logging.info(f"CPU usage: {cpu_percent}%")
    logging.info(f"Memory usage: {memory_percent}%")

# Initialize Dask client
logging.info("Initializing Dask client...")
client = Client(n_workers=14, threads_per_worker=1, memory_limit='10GB', dashboard_address=':8788')
logging.info("Dask client initialized.")
print_resource_usage()

# Define chunk size based on your data dimensions and memory constraints
chunk_size = {'warming_level': 1, 'time_delta': 288, 'y': 24, 'x': 9, 'simulation': 1}
logging.info("Chunking dataset...")
ds_chunked = ds.chunk(chunks=chunk_size)
logging.info("Dataset chunked.")
print_resource_usage()

# Extract the variables from the dataset
logging.info("Extracting variables...")
temp_month = ds_chunked['Air Temperature at 2m']
humidity_month = ds_chunked['Relative humidity']
wind_month = ds_chunked['Wind speed at 10m']
logging.info("Variables extracted.")
print_resource_usage()

# Calculate the Fosberg Fire Weather Index for the current month
logging.info("Calculating Fosberg Fire Weather Index...")
start_time = time.time()
fwi_dask = fosberg_fire_index(temp_month, humidity_month, wind_month)
end_time = time.time()
logging.info(f"Fosberg Fire Weather Index calculated in {(end_time - start_time) / 60:.2f} minutes.")
print_resource_usage()

# Average by hour_of_year dimension to make an hourly climatology
logging.info("Calculating hourly climatology...")
start_time = time.time()
pbar = ProgressBar()
pbar.register()
hour_of_year_mean = fwi_dask.groupby('hour_of_year').mean(dim='time_delta', skipna=True).compute()
end_time = time.time()
logging.info(f"Hourly climatology calculated in {(end_time - start_time) / 60:.2f} minutes.")
print_resource_usage()

# Calculate monthly MAX
logging.info("Calculating monthly maximum...")
start_time = time.time()
pbar = ProgressBar()
pbar.register()
monthly_max_FWI = hour_of_year_mean.groupby('month').max(dim='hour_of_year', skipna=True).compute()
end_time = time.time()
logging.info(f"Monthly maximum calculated in {(end_time - start_time) / 60:.2f} minutes.")
print_resource_usage()

# Save to NetCDF with compression
logging.info("Saving to NetCDF...")
start_time = time.time()
encoding = {monthly_max_FWI.name: {'zlib': True, 'complevel': 5}}
with ProgressBar():
    monthly_max_FWI.to_netcdf('FFWI_GWL2_avg.nc', encoding=encoding)
end_time = time.time()
logging.info(f"Saved to NetCDF in {(end_time - start_time) / 60:.2f} minutes.")
print_resource_usage()

# Close the dataset
logging.info("Closing dataset...")
ds_chunked.close()
logging.info("Dataset closed.")
print_resource_usage()

# Properly shut down the Dask client
logging.info("Shutting down Dask client...")
client.close()
logging.info("Dask client shut down.")

Compute the Fosberg Fire Weather Index...
Compute the EMC...
Compute the MDC...


In [None]:
xx

In [None]:


def print_resource_usage():
    """Print the current CPU and memory usage."""
    cpu_percent = psutil.cpu_percent(interval=1)
    memory_info = psutil.virtual_memory()
    memory_percent = memory_info.percent
    print(f"CPU usage: {cpu_percent}%")
    print(f"Memory usage: {memory_percent}%")

# Initialize Dask client
print("Initializing Dask client...")
client = Client(n_workers=14, threads_per_worker=1, memory_limit='10GB', dashboard_address=':8788')
print("Dask client initialized.")
print_resource_usage()

# Define chunk size based on your data dimensions and memory constraints
chunk_size = {'warming_level': 1, 'time_delta': 288, 'y': 24, 'x': 9, 'simulation': 1}
print("Chunking dataset...")
ds_chunked = ds.chunk(chunks=chunk_size)
print("Dataset chunked.")
print_resource_usage()

# Extract the variables from the dataset
print("Extracting variables...")
temp_month = ds_chunked['Air Temperature at 2m']
humidity_month = ds_chunked['Relative humidity']
wind_month = ds_chunked['Wind speed at 10m']
print("Variables extracted.")
print_resource_usage()

# Calculate the Fosberg Fire Weather Index for the current month
print("Calculating Fosberg Fire Weather Index...")
start_time = time.time()
fwi_dask = fosberg_fire_index(temp_month, humidity_month, wind_month)
end_time = time.time()
print(f"Fosberg Fire Weather Index calculated in {(end_time - start_time) / 60:.2f} minutes.")
print_resource_usage()

# Average by hour_of_year dimension to make an hourly climatology
print("Calculating hourly climatology...")
start_time = time.time()
pbar = ProgressBar()
pbar.register()
hour_of_year_mean = fwi_dask.groupby('hour_of_year').mean(dim='time_delta', skipna=True).compute()
end_time = time.time()
print(f"Hourly climatology calculated in {(end_time - start_time) / 60:.2f} minutes.")
print_resource_usage()

# Calculate monthly MAX
print("Calculating monthly maximum...")
start_time = time.time()
pbar = ProgressBar()
pbar.register()
monthly_max_FWI = hour_of_year_mean.groupby('month').max(dim='hour_of_year', skipna=True).compute()
end_time = time.time()
print(f"Monthly maximum calculated in {(end_time - start_time) / 60:.2f} minutes.")
print_resource_usage()

# Save to NetCDF with compression
print("Saving to NetCDF...")
start_time = time.time()
encoding = {monthly_max_FWI.name: {'zlib': True, 'complevel': 5}}
with ProgressBar():
    monthly_max_FWI.to_netcdf('FFWI_GWL2_avg.nc', encoding=encoding)
end_time = time.time()
print(f"Saved to NetCDF in {(end_time - start_time) / 60:.2f} minutes.")
print_resource_usage()

# Close the dataset
print("Closing dataset...")
ds_chunked.close()
print("Dataset closed.")
print_resource_usage()

# Properly shut down the Dask client
print("Shutting down Dask client...")
client.close()
print("Dask client shut down.")

In [None]:
print(fwi_dask)

<xarray.DataArray 'Fosberg Fire Weather Index' (warming_level: 1,
                                                time_delta: 262800, y: 240,
                                                x: 149, simulation: 8)> Size: 301GB
dask.array<clip, shape=(1, 262800, 240, 149, 8), dtype=float32, chunksize=(1, 288, 24, 9, 1), chunktype=numpy.ndarray>
Coordinates: (12/13)
  * warming_level      (warming_level) float64 8B 2.0
  * x                  (x) float64 1kB -4.236e+06 -4.233e+06 ... -3.792e+06
  * y                  (y) float64 2kB 5.899e+05 5.929e+05 ... 1.307e+06
    lakemask           (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    landmask           (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    lat                (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    ...                 ...
    Lambert_Conformal  int64 8B 0
  * time_delta         (time_delta) float64 2MB -1.314e+05 ... 1.314e+05
    centered_year      

In [None]:
start_time = time.time()
print(start_time)
hour_of_year_mean = fwi_dask.groupby('hour_of_year').mean(dim='time_delta', skipna=True)
print(hour_of_year_mean)
print(f"mean hour of year {time.time() - start_time:.2f} seconds.")

1743022698.2312553


In [None]:
# Apply the function to calculate the month for each hour_of_year
months = hour_to_month(hour_of_year_mean['hour_of_year'].values)
hour_of_year_mean = hour_of_year_mean.assign_coords(month=('hour_of_year', months))

print(hour_of_year_mean)

In [None]:
start_time = time.time()
print(start_time)
monthly_max_FWI = hour_of_year_mean.groupby('month').max(dim='hour_of_year', skipna=True).compute
print(f"mean hour of year {time.time() - start_time:.2f} seconds.")

1743016585.2668443
mean hour of year 239.09 seconds.


In [None]:
print(monthly_max_FWI)

<xarray.DataArray 'Fosberg Fire Weather Index' (warming_level: 1, month: 12,
                                                y: 240, x: 149, simulation: 8)> Size: 14MB
dask.array<transpose, shape=(1, 12, 240, 149, 8), dtype=float32, chunksize=(1, 1, 24, 9, 1), chunktype=numpy.ndarray>
Coordinates:
  * warming_level      (warming_level) float64 8B 2.0
  * x                  (x) float64 1kB -4.236e+06 -4.233e+06 ... -3.792e+06
  * y                  (y) float64 2kB 5.899e+05 5.929e+05 ... 1.307e+06
    lakemask           (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    landmask           (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    lat                (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    lon                (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    Lambert_Conformal  int64 8B 0
    centered_year      (simulation) int64 64B dask.array<chunksize=(1,), meta=np.ndarray>
  * simula

In [None]:
import climakitae as ck
ck.export(monthly_max_FWI, filename="monthly_max_FWI", format="NetCDF")

Exporting specified data to NetCDF...
Saving file locally as NetCDF4...


In [None]:
5181.42/60

In [None]:
5943/60

99.05

In [None]:
# Group by the new month coordinate and calculate the mean
# print(f"Calculating hourly climatology for {variable}...")
# hour_of_year_max = fwi_dask.groupby('hour_of_year').max(dim='time_delta', skipna=True)
# hour_of_year_med = fwi_dask.groupby('hour_of_year').median(dim='time_delta', skipna=True)
# hour_of_year_min = fwi_dask.groupby('hour_of_year').min(dim='time_delta', skipna=True)

In [None]:
print(hour_of_year_mean)

<xarray.DataArray 'Fosberg Fire Weather Index' (warming_level: 1,
                                                hour_of_year: 8760, y: 240,
                                                x: 149, simulation: 8)> Size: 10GB
dask.array<transpose, shape=(1, 8760, 240, 149, 8), dtype=float32, chunksize=(1, 1, 24, 9, 1), chunktype=numpy.ndarray>
Coordinates:
  * warming_level      (warming_level) float64 8B 2.0
  * x                  (x) float64 1kB -4.236e+06 -4.233e+06 ... -3.792e+06
  * y                  (y) float64 2kB 5.899e+05 5.929e+05 ... 1.307e+06
    lakemask           (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    landmask           (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    lat                (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    lon                (y, x) float32 143kB dask.array<chunksize=(24, 9), meta=np.ndarray>
    Lambert_Conformal  int64 8B 0
    centered_year      (simulation) int64

In [None]:
import xarray as xr
from dask.distributed import Client

# Initialize Dask client
# client = Client(n_workers=32, threads_per_worker=1, memory_limit='7GB')

# Define compression settings
encoding = {hour_of_year_mean.name: {'zlib': True, 'complevel': 5}}

# Save to NetCDF with compression
hour_of_year_mean.to_netcdf('FFWI_GWL2_avg.nc', encoding=encoding)

# Monitor Dask progress
# client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 43569 instead


In [None]:
ck.export(data_to_use, filename="my_filename1", format="NetCDF")

In [None]:
print('done')