Name: 01_observed_climatology_data_processing<br>
Description: This notebook contains the core code used for processing Livneh and nClimGrid historic observed climatology data into standard climate indices. The Livneh and nClimGrid data files are in netCDF format and contain daily tmax, tmin, and prcp. The Python xclim package was used to process the input data into the indices. xclim official documentation: https://xclim.readthedocs.io/en/stable/<br>
Date: August 2024<br>
Requirements: Python 3.11.10, xclim 0.47.0, xarray 2023.6.0<br>
Author: Mark Gilbert, Principal GIS Engineer, ArcGIS Living Atlas of the World, Esri (mgilbert@esri.com)<br>
<br>
Livneh, B., T. J. Bohn, D. W. Pierce, F. Munoz-Arriola, B. Nijssen, R. Vose, D. R. Cayan, and L. Brekke, 2015: A spatially comprehensive, hydrometeorological data set for Mexico, the U.S., and Southern Canada 1950–2013. Scientific Data, 2, https://doi.org/10.1038/sdata.2015.42.<br>
<br>
Durre, I., M. F. Squires, R. S. Vose, A. Arguez, W. S. Gross, J. R. Rennie, and C. J. Schreck, 2022b: NOAA's nClimGrid-Daily Version 1 – Daily gridded temperature and precipitation for the Contiguous United States since 1951. NOAA National Centers for Environmental Information, since 6 May 2022, https://doi.org/10.25921/c4gt-r169


### Python Imports

In [None]:
from __future__ import annotations
from distributed import Client

import xarray as xr
import xclim
from xclim.core import units

import time
import logging
import os
import sys

### Dask Parallel Processing Setup

In [None]:
# Optional: uncomment the two lines below to start a local Dask client (disabled by default).
# Depending on your workstation specifications, you may need to adjust these values.
# On a single machine, n_workers=1 is usually better.
#
# client = Client(n_workers=1, threads_per_worker=8, memory_limit="12GB")
# client

### Constants

In [None]:
# Dataset selection.
# IN_DATASET is the name of the dataset to process, IN_PATH is the glob pattern of the
# source netCDF files, and OUT_PATH is the folder that receives the output netCDF files.
# TODO: Uncomment the dataset you wish to work with (and comment out the other one).
#
# IN_DATASET = "Livneh"
# IN_PATH = r"C:\noaa_data\Livneh_Source\*.nc"
# OUT_PATH = r"C:\noaa_data\Livneh_Thresholds_netCDF"

IN_DATASET = "nClimGrid"
IN_PATH = r"Z:\nClimGrid_Source\*.nc" # "C:\noaa_data\nClimGrid_Source\*.nc"
OUT_PATH = r"Z:\nClimGrid_Thresholds_netCDF" # "C:\noaa_data\nClimGrid_Thresholds_netCDF"

# Folder that receives the processing log.
# TODO: Add your path to the log file.
#
LOG_FILE_PATH = r"[add_path]\Log_Output"

### Setup Logger

In [3]:
# Send log records to both a timestamped log file and stdout (the notebook output).
#
file_time = time.strftime("%Y%m%d-%H%M%S")

log_file_name = "Output_Log_xclim_Processing_" + file_time + ".log"
LOG_FILE = os.path.join(LOG_FILE_PATH, log_file_name)

# force=True removes any handlers already attached to the root logger before
# installing the ones below. Without it, basicConfig() silently does nothing when
# the root logger is already configured (for example when this cell is re-run),
# and records would keep going to the previous log file instead of LOG_FILE.
#
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s:%(levelname)s: %(message)s",
    handlers=[
        logging.FileHandler(filename=LOG_FILE),
        logging.StreamHandler(sys.stdout),
    ],
    force=True,
)

### Load Data Files

In [None]:
# Open every source file matching IN_PATH as a single multi-file dataset.
# Note: xr.open_mfdataset also accepts parallel=True to open the files in parallel with Dask.
#
logging.info("Beginning data processing...")
if IN_DATASET == "Livneh":
    # The "wind" variable is dropped on load; the indices computed in this notebook do not use it.
    ds_source = xr.open_mfdataset(IN_PATH, drop_variables=["wind"])
elif IN_DATASET == "nClimGrid":
    ds_source = xr.open_mfdataset(IN_PATH)
else:
    # Unsupported dataset name: report it as an error and stop with an exception.
    # (A bare sys.exit() ends with a "success" status and only logged at INFO level.)
    message = f"{IN_DATASET} does not exist. Ending."
    logging.error(message)
    raise ValueError(message)

logging.info(f"{IN_DATASET} loaded from {IN_PATH}")

2024-08-20 16:05:12,730:INFO: Beginning data processing...
2024-08-20 16:09:25,447:INFO: nClimGrid loaded from Z:\nClimGrid_Source\*.nc


### Data Cleanup

Do some cleanup of the poorly documented source files.

In [None]:
# Work on a deep copy so the dataset loaded from disk (ds_source) is left untouched.
#
ds_working = ds_source.copy(deep=True)

# First and last calendar year on the time axis; used for the output file names.
#
years = ds_working.time.dt.year
begin_year = years[0].values.item()
end_year = years[-1].values.item()

logging.info(f"Doing data cleanup on {IN_DATASET}...")
if IN_DATASET == "Livneh":
    # Livneh variable names do not follow all the CF Conventions; rename them to the
    # names used by the rest of this notebook.
    #
    ds_working = ds_working.rename({"Prec": "prcp", "Tmax": "tmax", "Tmin": "tmin"})

    # standard_name avoids a warning when tmax/tmin are passed to indicators.atmos.tg;
    # cell_methods avoids warnings when calculating annual averages.
    #
    ds_working["tmax"] = ds_working.tmax.assign_attrs(
        standard_name="air_temperature",
        cell_methods="time: maximum within days",
    )
    ds_working["tmin"] = ds_working.tmin.assign_attrs(
        standard_name="air_temperature",
        cell_methods="time: minimum within days",
    )

    # The Livneh source has no tavg, which is needed for CDD/HDD, so derive it from
    # tmin/tmax. The indicator function supplies tavg's standard_name and cell_methods.
    #
    logging.info(f"Calculating tavg for {IN_DATASET}...")
    ds_working["tavg"] = xclim.indicators.atmos.tg(tasmin=ds_working.tmin, tasmax=ds_working.tmax)

elif IN_DATASET == "nClimGrid":
    # nClimGrid only needs cell_methods on its temperature variables, to avoid CF
    # warnings when calculating CDD/HDD and annual averages.
    #
    nclimgrid_cell_methods = (
        ("tavg", "time: mean within days"),
        ("tmax", "time: maximum within days"),
        ("tmin", "time: minimum within days"),
    )
    for var_name, cell_method in nclimgrid_cell_methods:
        ds_working[var_name] = ds_working[var_name].assign_attrs(cell_methods=cell_method)

# Both datasets need a precipitation standard_name to avoid warnings.
# "lwe_thickness_of_precipitation_amount" is a CF Convention standard_name for precip.
# The source data had no cell_methods for prcp.
#
ds_working["prcp"] = ds_working.prcp.assign_attrs(standard_name="lwe_thickness_of_precipitation_amount")

### Calculate Thresholds

In [None]:
logging.info("Starting threshold processing...")

# Collected (DataArray, output file name) pairs; the file-writing cell loops over this list.
out_file_list = []
file_suffix = f"{begin_year}_{end_year}.nc"

# Cooling and heating degree days, reported in degF-days.
# Both use a 65 degF threshold (per NOAA) and Year End as the frequency, e.g. YYYY-12-31.
#
logging.info(f"Calculating cooling/heating degree days for {IN_DATASET}")
degree_day_indicators = (
    ("cdd", xclim.indicators.atmos.cooling_degree_days),
    ("hdd", xclim.indicators.atmos.heating_degree_days),
)
for tag, indicator in degree_day_indicators:
    degree_days = indicator(tas=ds_working.tavg, thresh="65.0 degF", freq="YE")
    degree_days_F_days = units.convert_units_to(degree_days, "day fahrenheit")
    out_file_list.append((degree_days_F_days, f"{IN_DATASET}_{tag}_{file_suffix}"))

# Annual mean of the daily tmax, tmin and tavg, converted to degF.
# NOTE(review): the earlier TODO about cell_methods warnings looks addressed by the
# cleanup cell, which sets cell_methods on the temperature variables - confirm.
#
logging.info(f"Calculating annual average temperatures for {IN_DATASET}")
annual_mean_specs = (
    # (variable / file tag, xclim indicator, name of the indicator's input argument)
    ("tmax", xclim.indicators.atmos.tx_mean, "tasmax"),
    ("tmin", xclim.indicators.atmos.tn_mean, "tasmin"),
    ("tavg", xclim.indicators.atmos.tg_mean, "tas"),
)
for var_name, indicator, arg_name in annual_mean_specs:
    annual_mean = indicator(**{arg_name: ds_working[var_name]}, freq="YE")
    annual_mean_F = units.convert_units_to(annual_mean, "degF")
    out_file_list.append((annual_mean_F, f"{IN_DATASET}_{var_name}_{file_suffix}"))


# Resample Precipitation to Monthly and Annual Sums in Inches (pr_annual, pr_monthly)
#
# min_count=1: xarray's sum() skips NaN by default and returns 0.0 for a period whose
# daily values are all NaN. Requiring at least one valid value keeps such periods as
# NaN (no data) instead of reporting them as zero precipitation.
#
logging.info(f"Calculating pr_annual for {IN_DATASET}")
pr_annual = ds_working.prcp.resample(time="YE").sum(min_count=1)

logging.info(f"Calculating pr_monthly for {IN_DATASET}")
pr_monthly = ds_working.prcp.resample(time="ME").sum(min_count=1)

pr_annual_in = units.convert_units_to(pr_annual, "in")
pr_monthly_in = units.convert_units_to(pr_monthly, "in")
out_file_list.append((pr_annual_in, f"{IN_DATASET}_pr-annual_{file_suffix}"))
out_file_list.append((pr_monthly_in, f"{IN_DATASET}_pr-monthly_{file_suffix}"))

# Maximum precipitation over a moving window of N days, per year, in inches.
#
# max_n_day_precipitation_amount requires the variable as a precipitation flux, while
# the source is a daily amount (originally in mm, thickness). The flux is kept in its
# own variable instead of overwriting ds_working["prcp"], so that:
#   * re-running this cell does not feed flux values into the amount-based
#     calculations (annual/monthly sums, seasonal maximum), and
#   * the seasonal maximum below is taken from the daily amounts.
# The units conversion doesn't seem to add a cell_methods attribute, so it is set
# here to avoid a warning.
#
prcp_flux = units.convert_units_to(ds_working.prcp, "kg m-2 s-1")
prcp_flux = prcp_flux.assign_attrs(cell_methods='time: mean within days')

for window in (1, 5, 10, 20, 30):
    logging.info(f"Calculating prmax{window}day for {IN_DATASET}...")
    prmax_nday = xclim.indicators.atmos.max_n_day_precipitation_amount(pr=prcp_flux, window=window, freq='YE')
    prmax_nday_in = units.convert_units_to(prmax_nday, "in")
    out_file_list.append((prmax_nday_in, f"{IN_DATASET}_prmax{window}day_{file_suffix}"))

# Seasonal maximum of daily precipitation, in inches ("QS-DEC": quarters starting in
# December, March, June and September).
# Taken from the daily amounts rather than the flux: once reduced to one value per
# season, a rate can no longer be turned back into a daily depth, whereas an amount
# converts to inches as a plain length conversion.
#
logging.info(f"Calculating prmax_seasonal for {IN_DATASET}...")
prmax_seasonal = ds_working.prcp.resample(time="QS-DEC").max(dim="time")
prmax_seasonal_in = units.convert_units_to(prmax_seasonal, "in")
out_file_list.append((prmax_seasonal_in, f"{IN_DATASET}_prmax-seasonal_{file_suffix}"))

# Highest daily maximum temperature of each year, in degF.
#
logging.info(f"Calculating tmax1day for {IN_DATASET}...")
tmax_by_year = ds_working.tmax.resample(time="YE")
tmax1day_F = units.convert_units_to(tmax_by_year.max(dim=["time"]), "degF")
out_file_list.append((tmax1day_F, f"{IN_DATASET}_tmax1day_{file_suffix}"))

# TMAX days ABOVE a threshold: days per year with tmax >= each threshold (degF).
#
for deg_f in (85, 86, 90, 95, 100, 105, 110, 115):
    logging.info(f"Calculating tmax_days_ge_{deg_f}F for {IN_DATASET}...")
    tmax_days_ge = xclim.indicators.atmos.tx_days_above(
        tasmax=ds_working.tmax, thresh=f"{deg_f}.0 degF", freq="YE", op="ge"
    )
    out_file_list.append((tmax_days_ge, f"{IN_DATASET}_tmax-days-ge-{deg_f}F_{file_suffix}"))

# TMAX days BELOW a threshold: days per year with tmax <= 32 degF.
#
logging.info(f"Calculating tmax_days_le_32F for {IN_DATASET}...")
tmax_days_le_32F = xclim.indicators.atmos.tx_days_below(
    tasmax=ds_working.tmax, thresh="32.0 degF", freq="YE", op="le"
)
out_file_list.append((tmax_days_le_32F, f"{IN_DATASET}_tmax-days-le-32F_{file_suffix}"))

# Summer (JJA season) statistics per year, in degF.
# The boolean mask selects only the time steps whose season is 'JJA'.
#
is_jja = ds_working['time.season'] == 'JJA'

# Mean of tavg over each year's JJA days.
logging.info(f"Calculating tmean_jja for {IN_DATASET}...")
tavg_jja = ds_working.tavg.sel(time=is_jja)
tmean_jja_F = units.convert_units_to(tavg_jja.resample(time="YE").mean(dim=["time"]), "degF")
out_file_list.append((tmean_jja_F, f"{IN_DATASET}_tmean-jja_{file_suffix}"))

# Minimum of tmin over each year's JJA days.
logging.info(f"Calculating tmin_jja for {IN_DATASET}...")
tmin_summer = ds_working.tmin.sel(time=is_jja)
tmin_jja_F = units.convert_units_to(tmin_summer.resample(time="YE").min(dim=["time"]), "degF")
out_file_list.append((tmin_jja_F, f"{IN_DATASET}_tmin-jja_{file_suffix}"))

# Lowest daily minimum temperature of each year, in degF.
#
logging.info(f"Calculating tmin1day for {IN_DATASET}...")
tmin_by_year = ds_working.tmin.resample(time="YE")
tmin1day_F = units.convert_units_to(tmin_by_year.min(dim=["time"]), "degF")
out_file_list.append((tmin1day_F, f"{IN_DATASET}_tmin1day_{file_suffix}"))

# TMIN days ABOVE a threshold: days per year with tmin >= each threshold (degF).
#
for deg_f in (60, 70, 75, 80, 85, 90):
    logging.info(f"Calculating tmin_days_ge_{deg_f}F for {IN_DATASET}...")
    tmin_days_ge = xclim.indicators.atmos.tn_days_above(
        tasmin=ds_working.tmin, thresh=f"{deg_f}.0 degF", freq="YE", op="ge"
    )
    out_file_list.append((tmin_days_ge, f"{IN_DATASET}_tmin-days-ge-{deg_f}F_{file_suffix}"))

# TMIN days BELOW a threshold: days per year with tmin <= each threshold (degF).
#
for deg_f in (0, 28, 32):
    logging.info(f"Calculating tmin_days_le_{deg_f}F for {IN_DATASET}...")
    tmin_days_le = xclim.indicators.atmos.tn_days_below(
        tasmin=ds_working.tmin, thresh=f"{deg_f}.0 degF", freq="YE", op="le"
    )
    out_file_list.append((tmin_days_le, f"{IN_DATASET}_tmin-days-le-{deg_f}F_{file_suffix}"))

logging.info("Threshold processing completed.")

### Write out files

In [None]:
# Loop over out_file_list and write each (DataArray, file name) pair to netCDF in OUT_PATH,
# logging the time taken per file and overall.
#
start_overall = time.perf_counter()
logging.info("Begin writting files...")

# Make sure the output folder exists before the first write.
os.makedirs(OUT_PATH, exist_ok=True)

for da, file_name in out_file_list:
    out_path = os.path.join(OUT_PATH, file_name)
    start_file = time.perf_counter()
    da.to_netcdf(out_path)
    end_file = time.perf_counter()
    logging.info(f"{da.name} saved to {out_path} in {round(end_file-start_file, 3)}")

# Taken after the loop so it is always defined, even when out_file_list is empty
# (inside the loop it raised NameError in the final log call for an empty list).
end_overall = time.perf_counter()

logging.info(f"Processing complete in {round(end_overall-start_overall, 3)} seconds.")