<b><i> preprocess </i></b>

Created by Eduardo Alastrué de Asenjo 

- Purpose: preprocess data before analysis for paper "European heat extremes under net-zero emissions"
- Data: Model data from ACCESS-ESM-1.5 & reanalysis data from ERA5
- Packages: use xarray with dask to handle data, xclim for computing the extreme indices
- Comments: It originally runs on Gadi, NCI's HPC, using conda env:analysis3 kernel

# Initialisation

Load modules

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import matplotlib.lines as mlines
import matplotlib.colors as mcolors
from matplotlib.gridspec import GridSpec
import matplotlib.path as mpath
import seaborn as sns
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import os
import subprocess
import math as math
import netCDF4 as nc
from scipy import stats
from random import choices
import copy
import xarray as xr
import pandas as pd
import datetime
import cftime
from tqdm import tqdm
import glob
import pickle
import cdo
import statsmodels.api as sm 
import warnings
import pylab as py 
import regionmask
import xclim as xc
import logging # supress flox info messages 
logging.getLogger('flox').setLevel(logging.WARNING)
from IPython.core.magic import register_cell_magic # Custom magic command for cell running time 
import time
@register_cell_magic
def timing(line, cell):
    start_time = time.time()
    exec(cell, globals())
    end_time = time.time()
    print(f"Cell run at: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}, Exec. time: {end_time - start_time:.2f} seconds")

load functions

In [2]:
def weighted_mon_to_year_mean(ds, var):
    """
    weight by days in each month when doing annual mean from monthly values in xarray
    taken from https://ncar.github.io/esds/posts/2021/yearly-averages-xarray/
    """
    month_length = ds.time.dt.days_in_month # Determine the month length

    wgts = month_length.groupby("time.year") / month_length.groupby("time.year").sum() # Calculate the weights

    np.testing.assert_allclose(wgts.groupby("time.year").sum(xr.ALL_DIMS), 1.0) # Make sure the weights in each year add up to 1

    obs = ds[var] # Subset our dataset for our variable

    cond = obs.isnull() # Setup our masking for nan values
    ones = xr.where(cond, 0.0, 1.0)
    
    obs_sum = (obs * wgts).resample(time="YS").sum(dim="time") # Calculate the numerator
    ones_out = (ones * wgts).resample(time="YS").sum(dim="time") # Calculate the denominator

    return obs_sum / ones_out # Return the weighted average

start dask cluster

In [None]:
# local cluster
from dask.distributed import Client
client = Client(memory_limit=None, threads_per_worker=1, n_workers=18)

# stab runs

tas

In [3]:
stab_postpro_path = "/g/data/ob22/as8561/data/enso_trans_stable/stable_runs/tas_field/"
tas_mon_sta2030 = xr.open_dataset(f"{stab_postpro_path}fld_s03i236_PI-GWL-B2030_mon.nc", chunks={'time': 2000}, use_cftime=True).assign_coords(exp=2030)
tas_mon_sta2035 = xr.open_dataset(f"{stab_postpro_path}fld_s03i236_PI-GWL-B2035_mon.nc", chunks={'time': 2000}, use_cftime=True).assign_coords(exp=2035)
tas_mon_sta2040 = xr.open_dataset(f"{stab_postpro_path}fld_s03i236_PI-GWL-B2040_mon.nc", chunks={'time': 2000}, use_cftime=True).assign_coords(exp=2040)
tas_mon_sta2045 = xr.open_dataset(f"{stab_postpro_path}fld_s03i236_PI-GWL-B2045_mon.nc", chunks={'time': 2000}, use_cftime=True).assign_coords(exp=2045)
tas_mon_sta2050 = xr.open_dataset(f"{stab_postpro_path}fld_s03i236_PI-GWL-B2050_mon.nc", chunks={'time': 2000}, use_cftime=True).assign_coords(exp=2050)
tas_mon_sta2055 = xr.open_dataset(f"{stab_postpro_path}fld_s03i236_PI-GWL-B2055_mon.nc", chunks={'time': 2000}, use_cftime=True).assign_coords(exp=2055)
tas_mon_sta2060 = xr.open_dataset(f"{stab_postpro_path}fld_s03i236_PI-GWL-B2060_mon.nc", chunks={'time': 2000}, use_cftime=True).assign_coords(exp=2060)

mon_arrays = [tas_mon_sta2030,tas_mon_sta2035,tas_mon_sta2040,tas_mon_sta2045,tas_mon_sta2050,tas_mon_sta2055,tas_mon_sta2060]
tas_mon_sta = xr.concat(mon_arrays, dim='exp')

def subtract_100_years(time):
    return cftime.DatetimeGregorian(time.year - 100, time.month, time.day, time.hour, time.minute, time.second, time.microsecond)
new_times = [subtract_100_years(t) for t in tas_mon_sta['time'].values]
tas_mon_sta['time'] = new_times
tas_yr_sta = weighted_mon_to_year_mean(tas_mon_sta, 'fld_s03i236')

In [5]:
tas_yr_sta.to_netcdf(f"/scratch/p66/ea1694/data/stab_runs/tas_yr_sta.nc")

tasmax

In [None]:
%%timing
year = 2030
def _preprocess(ds):
    return ds['fld_s03i236_max']
for i in tqdm(range(1, 12)):
    century = f"{i:02}"
    files = glob.glob(f"/g/data/p73/archive/non-CMIP/ACCESS-ESM1-5/PI-GWL-t6/history/atm/netCDF/*pe-{century}*dai.nc") # name is different!
    ds = xr.open_mfdataset(files, preprocess=_preprocess, parallel=True, chunks={'time': 1200}, use_cftime=True)
    ds = ds.drop_vars(['height']).rename({'fld_s03i236_max': 'tasmax'})
    ds = ds.load()
    ds.to_netcdf(f"/g/data/ob22/ea1694/data/stab_runs/tasmaxday/tasmax_PI-GWL-B{year}_{century}00s.nc")

In [None]:
%%timing
year = 2040 #2035, 2040, 2045, 2050, 2055, 2060
def _preprocess(ds):
    return ds['fld_s03i236_max']
for i in tqdm(range(1, 12)):
    century = f"{i:02}"
    files = glob.glob(f"/g/data/p73/archive/non-CMIP/ACCESS-ESM1-5/PI-GWL-B{year}/history/atm/netCDF/*pe-{century}*dai.nc")
    ds = xr.open_mfdataset(files, preprocess=_preprocess, parallel=True, chunks={'time': 1200}, use_cftime=True)
    ds = ds.drop_vars(['height']).rename({'fld_s03i236_max': 'tasmax'})
    ds = ds.load()
    ds.to_netcdf(f"/g/data/ob22/ea1694/data/stab_runs/tasmaxday/tasmax_PI-GWL-B{year}_{century}00s.nc")

Load all and calculate TXx index

In [None]:
%%timing
path = "/g/data/ob22/ea1694/data/stab_runs/tasmaxday/"
tasmax_day_sta2030 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2030_*00s.nc", use_cftime=True).assign_coords(exp=2030)
tasmax_day_sta2035 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2035_*00s.nc", use_cftime=True).assign_coords(exp=2035)
tasmax_day_sta2040 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2040_*00s.nc", use_cftime=True).assign_coords(exp=2040)
tasmax_day_sta2045 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2045_*00s.nc", use_cftime=True).assign_coords(exp=2045)
tasmax_day_sta2050 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2050_*00s.nc", use_cftime=True).assign_coords(exp=2050)
tasmax_day_sta2055 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2055_*00s.nc", use_cftime=True).assign_coords(exp=2055)
tasmax_day_sta2060 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2060_*00s.nc", use_cftime=True).assign_coords(exp=2060)

day_arrays = [tasmax_day_sta2030,tasmax_day_sta2035,tasmax_day_sta2040,tasmax_day_sta2045,tasmax_day_sta2050,tasmax_day_sta2055,tasmax_day_sta2060]
tasmax_day_sta = xr.concat(day_arrays, dim='exp')

def subtract_100_years(time): # start years experiment at year 0
    return cftime.DatetimeGregorian(time.year - 100, time.month, time.day, time.hour, time.minute, time.second, time.microsecond)
new_times = [subtract_100_years(t) for t in tasmax_day_sta['time'].values]
tasmax_day_sta['time'] = new_times
tasmax_day_sta.coords['lon'] = (tasmax_day_sta.coords['lon'] + 180) % 360 - 180 # center longitudes at 0
tasmax_day_sta = tasmax_day_sta.sortby(tasmax_day_sta.lon)

In [None]:
txx_year = xc.indices.tx_max(tasmax_day_sta.tasmax, freq='YS')
txx_year = txx_year.compute()
txx_year.to_netcdf(f"/g/data/ob22/ea1694/data/stab_runs/indices/txx_year.nc")

In [None]:
txx_mon = xc.indices.tx_max(tasmax_day_sta.tasmax, freq='MS')
txx_mon = txx_mon.compute()
txx_mon.to_netcdf(f"/g/data/ob22/ea1694/data/stab_runs/indices/txx_mon.nc")

Load stab & hist tasmax and calculate TX90p index wrt 1961-1990

In [None]:
%%timing
path_hist = {'historical': '/g/data/fs38/publications/CMIP6/CMIP/CSIRO/ACCESS-ESM1-5/historical/'}
array = []
for i in range(1,41):
    rea = "r"+str(i)+"i1p1f1"
    if os.path.isdir(f'{path_hist["historical"]}/{rea}'):
        infiles = glob.glob(f'{path_hist["historical"]}/{rea}/day/tasmax/gn/latest/*.nc', recursive=True)
        array.append(xr.open_mfdataset(infiles, use_cftime=True, data_vars="minimal", coords="minimal", chunks={'time': 1000}, parallel=True, compat="override").assign_coords(realiz=rea)) 
tasmax_day_hist = xr.concat(array, dim='realiz')
tasmax_day_hist.coords['lon'] = (tasmax_day_hist.coords['lon'] + 180) % 360 - 180 # center longitudes at 0
tasmax_day_hist = tasmax_day_hist.sortby(tasmax_day_hist.lon)
tasmax_day_histref1960 = tasmax_day_hist.sel(time=slice(cftime.DatetimeProlepticGregorian(1961, 1, 1),cftime.DatetimeProlepticGregorian(1990, 12, 31)))

In [None]:
from xclim.core.calendar import percentile_doy

tasmax_histref1960_per90 = percentile_doy(tasmax_day_histref1960.tasmax, per=90).sel(percentiles=90).mean("realiz")
tasmax_histref1960_per90 = tasmax_histref1960_per90.compute()
tasmax_histref1960_per90.attrs["units"] = "K"
tasmax_histref1960_per90.to_netcdf(f"/scratch/p66/ea1694/data/stab_runs/indices/tasmax_histref1960_per90.nc")

In [None]:
%%timing
path = "/scratch/p66/txz599/processed_output/data/stab_runs/tasmaxday/"
tasmax_day_sta2030 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2030_*00s.nc", use_cftime=True).assign_coords(exp=2030)
tasmax_day_sta2035 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2035_*00s.nc", use_cftime=True).assign_coords(exp=2035)
tasmax_day_sta2040 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2040_*00s.nc", use_cftime=True).assign_coords(exp=2040)
tasmax_day_sta2045 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2045_*00s.nc", use_cftime=True).assign_coords(exp=2045)
tasmax_day_sta2050 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2050_*00s.nc", use_cftime=True).assign_coords(exp=2050)
tasmax_day_sta2055 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2055_*00s.nc", use_cftime=True).assign_coords(exp=2055)
tasmax_day_sta2060 = xr.open_mfdataset(f"{path}tasmax_PI-GWL-B2060_*00s.nc", use_cftime=True).assign_coords(exp=2060)

day_arrays = [tasmax_day_sta2030,tasmax_day_sta2035,tasmax_day_sta2040,tasmax_day_sta2045,tasmax_day_sta2050,tasmax_day_sta2055,tasmax_day_sta2060]
tasmax_day_sta = xr.concat(day_arrays, dim='exp')

def subtract_100_years(time): # start years experiment at year 0
    return cftime.DatetimeGregorian(time.year - 100, time.month, time.day, time.hour, time.minute, time.second, time.microsecond)
new_times = [subtract_100_years(t) for t in tasmax_day_sta['time'].values]
tasmax_day_sta['time'] = new_times
tasmax_day_sta.coords['lon'] = (tasmax_day_sta.coords['lon'] + 180) % 360 - 180 # center longitudes at 0
tasmax_day_sta = tasmax_day_sta.sortby(tasmax_day_sta.lon)

In [None]:
from xclim.indices import tx90p
for exp in tasmax_day_sta.exp.values:
    for sta in [1, 101, 201, 301, 401, 501, 601, 701, 801, 901]:
        tasmax = tasmax_day_sta.sel(exp=exp, drop=True).tasmax.sel(time=slice(cftime.DatetimeProlepticGregorian(sta, 1, 1), cftime.DatetimeProlepticGregorian(sta+99, 12, 31))).load()
        tx90p_yr = tx90p(tasmax, tasmax_histref1960_per90, freq='YS')
        tx90p_yr.to_netcdf(f"/scratch/p66/ea1694/data/stab_runs/indices/tx90p/tx90p_yr_NZ{exp}_{sta+99}.nc")

# cmip6

In [None]:
path_ssp = {
    'ssp585': '/g/data/fs38/publications/CMIP6/ScenarioMIP/CSIRO/ACCESS-ESM1-5/ssp585/'
}

path_hist = {
    'historical': '/g/data/fs38/publications/CMIP6/CMIP/CSIRO/ACCESS-ESM1-5/historical/'
}

historical

In [None]:
%%timing
array = []
for i in range(1,41):
    rea = "r"+str(i)+"i1p1f1"
    if os.path.isdir(f'{path_hist["historical"]}/{rea}'):
        infiles = glob.glob(f'{path_hist["historical"]}/{rea}/day/tasmax/gn/latest/*.nc', recursive=True)
        array.append(xr.open_mfdataset(infiles, use_cftime=True, data_vars="minimal", coords="minimal", chunks={'time': 1000}, parallel=True, compat="override").assign_coords(realiz=rea)) 
tasmax_day_hist = xr.concat(array, dim='realiz')
tasmax_day_hist.coords['lon'] = (tasmax_day_hist.coords['lon'] + 180) % 360 - 180 # center longitudes at 0
tasmax_day_hist = tasmax_day_hist.sortby(tasmax_day_hist.lon)

In [None]:
%%timing
# yearly TXx index 
txx_year_hist = xc.indices.tx_max(tasmax_day_hist.tasmax, freq='YS')
txx_year_hist = txx_year_hist.compute()
txx_year_hist.to_netcdf(f"/g/data/ob22/ea1694/data/ACCESS-ESM1.5/scenarios/indices/txx_year_hist.nc")

In [None]:
%%timing
# monthly TXx index 
txx_mon_hist = xc.indices.tn_max(tasmax_day_hist.tasmax, freq='MS')
txx_mon_hist = txx_mon_hist.compute()
txx_mon_hist.to_netcdf(f"/g/data/ob22/ea1694/data/ACCESS-ESM1.5/scenarios/indices/txx_mon_hist.nc")

In [None]:
%%timing
# yearly TX90p
from xclim.indices import tx90p
tasmax_histref1960_per90 = xr.open_mfdataset(f"/scratch/p66/ea1694/data/stab_runs/indices/tasmax_histref1960_per90.nc", use_cftime=True).per
tasmax_histref1960_per90 = tasmax_histref1960_per90.load()

for rea in tqdm(tasmax_day_hist.realiz.values):
    tasmax = tasmax_day_hist.sel(realiz=rea, drop=True).tasmax.load()
    tx90p_yr_hist = tx90p(tasmax, tasmax_histref1960_per90, freq='YS')
    tx90p_yr_hist.to_netcdf(f"/scratch/p66/ea1694/data/stab_runs/indices/tx90p/tx90p_yr_hist_{rea}.nc")

ssp585

In [None]:
%%timing
array = []
for i in range(1,41):
    rea = "r"+str(i)+"i1p1f1"
    if os.path.isdir(f'{path_ssp["ssp585"]}/{rea}'):
        infiles = glob.glob(f'{path_ssp["ssp585"]}/{rea}/day/tasmax/gn/latest/*gn_20*.nc', recursive=True)
        array.append(xr.open_mfdataset(infiles, use_cftime=True, data_vars="minimal", coords="minimal", chunks={'time': 1000}, parallel=True, compat="override").assign_coords(realiz=rea)) 
tasmax_day_ssp585 = xr.concat(array, dim='realiz')
tasmax_day_ssp585.coords['lon'] = (tasmax_day_ssp585.coords['lon'] + 180) % 360 - 180 # center longitudes at 0
tasmax_day_ssp585 = tasmax_day_ssp585.sortby(tasmax_day_ssp585.lon)

In [None]:
%%timing
# yearly TXx index 
txx_year_ssp585 = xc.indices.tx_max(tasmax_day_ssp585.tasmax, freq='YS')
txx_year_ssp585 = txx_year_ssp585.compute()
txx_year_ssp585.to_netcdf(f"/g/data/ob22/ea1694/data/ACCESS-ESM1.5/scenarios/indices/txx_year_ssp585.nc")

In [None]:
%%timing
# monthly TXx index 
txx_mon_ssp585 = xc.indices.tn_max(tasmax_day_ssp585.tasmax, freq='MS')
txx_mon_ssp585 = txx_mon_ssp585.compute()
txx_mon_ssp585.to_netcdf(f"/g/data/ob22/ea1694/data/ACCESS-ESM1.5/scenarios/indices/txx_mon_ssp585.nc")

In [None]:
%%timing
# yearly TX90p
from xclim.indices import tx90p
tasmax_histref1960_per90 = xr.open_mfdataset(f"/scratch/p66/ea1694/data/stab_runs/indices/tasmax_histref1960_per90.nc", use_cftime=True).per
tasmax_histref1960_per90 = tasmax_histref1960_per90.load()

for rea in tqdm(tasmax_day_ssp585.realiz.values):
    tasmax = tasmax_day_ssp585.sel(realiz=rea, drop=True).tasmax.load()
    tx90p_yr_ssp585 = tx90p(tasmax, tasmax_histref1960_per90, freq='YS')
    tx90p_yr_ssp585.to_netcdf(f"/scratch/p66/ea1694/data/stab_runs/indices/tx90p/tx90p_yr_ssp585_{rea}.nc")

# ERA5

In [None]:
era5_paths = {'hr': "/g/data/rt52/era5/single-levels/reanalysis/",
             'mon': "/g/data/rt52/era5/single-levels/monthly-averaged/"
            }

In [None]:
%%timing
var = '2t'
array = []
for yea in range (1940, 2025):
    infiles = glob.glob(f"{era5_paths['hr']}{var}/{yea}/*.nc", recursive=True)
    array.append(xr.open_mfdataset(infiles, use_cftime=True, data_vars="minimal", coords="minimal", parallel=True, compat="override"))
tas_day_era = xr.concat(array, dim='time')
tas_day_era = tas_day_era.rename({'t2m':'tas', 'longitude': 'lon', 'latitude': 'lat'})

In [None]:
%%timing
for yea in tqdm(range(1940, 2025)):
    tas_yr = tas_day_era.where(tas_day_era.time.dt.year == yea, drop=True).tas.resample(time='Y').mean()
    tas_yr = tas_yr.compute()
    tas_yr.to_netcdf(f"/g/data/ob22/ea1694/data/era5/tas/tas_yr_era_{yea}.nc")

In [None]:
%%timing
var = 'mx2t'
array = []
for yea in range (1940, 2025):
    infiles = glob.glob(f"{era5_paths['hr']}{var}/{yea}/*.nc", recursive=True)
    hr_var = xr.open_mfdataset(infiles, use_cftime=True, data_vars="minimal", coords="minimal", parallel=True, compat="override")
    array.append(hr_var.resample(time='1D').max()) 
tasmax_day_era = xr.concat(array, dim='time')
tasmax_day_era = tasmax_day_era.rename({var:'tasmax', 'longitude': 'lon', 'latitude': 'lat'})

In [None]:
%%timing
for yea in tqdm(range(1940, 2025)):
    txx_year = xc.indices.tx_max(tasmax_day_era.where(tasmax_day_era.time.dt.year == yea, drop=True).tasmax, freq='YS')
    txx_year = txx_year.compute()
    txx_year.to_netcdf(f"/g/data/ob22/ea1694/data/era5/indices/txx_year_{yea}.nc")

In [None]:
%%timing
for yea in tqdm(range(1940, 2025)):
    txx_mon = xc.indices.tx_max(tasmax_day_era.where(tasmax_day_era.time.dt.year == yea, drop=True).tasmax, freq='MS')
    txx_mon = txx_mon.compute()
    txx_mon.to_netcdf(f"/g/data/ob22/ea1694/data/era5/indices/txx_mon_{yea}.nc")

In [None]:
txx_year_era = tasmax_mon_era.resample(time='Y').max()

In [None]:
%%timing
var = 'mx2t'
array = []
for yea in range (1981, 1991):
    infiles = glob.glob(f"{era5_paths['hr']}{var}/{yea}/*.nc", recursive=True)
    hr_var = xr.open_mfdataset(infiles, use_cftime=True, data_vars="minimal", coords="minimal", parallel=True, compat="override"
                              , chunks={'latitude': 721, 'longitude': 1440})
    array.append(hr_var.resample(time='1D').max()) 
tasmax_day_era1970 = xr.concat(array, dim='time')
tasmax_day_era1970 = tasmax_day_era1970.rename({var:'tasmax', 'longitude': 'lon', 'latitude': 'lat'})
tasmax_day_era1970 = tasmax_day_era1970.compute()
tasmax_day_era1970.to_netcdf(f"/scratch/p66/ea1694/data/era5/tasmax/tasmax_day_era1970.nc")

In [None]:
tasmax_day_eraref1960 = xr.open_mfdataset(f"/scratch/p66/ea1694/data/era5/tasmax/tasmax_day_era19*", use_cftime=True, parallel=True,
                                          chunks={'lat': 103, 'lon': 36})

In [None]:
from xclim.core.calendar import percentile_doy
tasmax_eraref1960_per90 = percentile_doy(tasmax_day_eraref1960.tasmax, per=90).sel(percentiles=90)
tasmax_eraref1960_per90 = tasmax_eraref1960_per90.compute()
tasmax_eraref1960_per90.attrs["units"] = "K"
tasmax_eraref1960_per90.to_netcdf(f"/scratch/p66/ea1694/data/era5/indices/tasmax_eraref1960_per90.nc")

In [None]:
%%timing
# yearly TX90p
from xclim.indices import tx90p
tasmax_eraref1960_per90 = xr.open_mfdataset(f"/scratch/p66/ea1694/data/era5/indices/tasmax_eraref1960_per90.nc", use_cftime=True)
tasmax_eraref1960_per90 = tasmax_eraref1960_per90.per.load()
var = 'mx2t'
for yea in tqdm(range(2003, 2025)):
    infiles = glob.glob(f"{era5_paths['hr']}{var}/{yea}/*.nc", recursive=True)
    hr_var = xr.open_mfdataset(infiles, use_cftime=True, data_vars="minimal", coords="minimal", parallel=True, compat="override")
    tasmax = hr_var.resample(time='1D').max().rename({var:'tasmax', 'longitude': 'lon', 'latitude': 'lat'})
    tasmax = tasmax.tasmax.load()
    tx90p_yr_era = tx90p(tasmax, tasmax_eraref1960_per90, freq='YS')
    tx90p_yr_era.to_netcdf(f"/scratch/p66/ea1694/data/stab_runs/indices/tx90p/tx90p_yr_era_{yea}.nc")