In [5]:
'''Script to regrid CMIP6 (HighResMIP) datasets to a target grid, extract predictors around tide gauges and store them.'''
import numpy as np
import xarray as xr
import dask
import os
import intake
import pandas as pd
from sklearn.metrics import confusion_matrix
from collections import defaultdict
from tqdm.autonotebook import tqdm
from xmip.utils import google_cmip_col
from xmip.postprocessing import combine_datasets,_concat_sorted_time, merge_variables
from cmip_catalogue_operations import reduce_cat_to_max_num_realizations, drop_vars_from_cat, drop_older_versions
from cmip_ds_dict_operations import select_period, pr_flux_to_m, drop_duplicate_timesteps, drop_coords, drop_incomplete
import xesmf as xe
import gcsfs
from get_era5_around_tgs import get_era5_around_tgs
fs = gcsfs.GCSFileSystem() #Google Cloud Storage filesystem handle (original note: list stores, strip zarr from filename, load)

In [2]:
#configure settings

#location under which the predictor stores are written
output_path = 'gs://leap-persistent/timh37/HighResMIP/surgeNN_predictors/'

#whether or not to process files for which output already exists in the output path
overwrite_existing = False

#variables to process
query_vars = [
    'psl',
    'uas',
    'vas',
]

#variables that included models should provide
required_vars = [
    'psl',
    'uas',
    'vas',
]

#source_id of the HighResMIP model to process
highresmip_model = 'HadGEM3-GC31-HM'

In [3]:
#query simulations & manipulate data catalog:
col = intake.open_esm_datastore("https://storage.googleapis.com/cmip6/cmip6-pgf-ingestion-test/catalog/catalog.json") #temporary pangeo-leap-forge catalogue

#search on the configured `query_vars` instead of a second hard-coded variable list,
#so that the settings cell is the single place where the processed variables are chosen
#(search variant left in the original cell for reference: table_id='3hr',require_all_on=['member_id','grid_label','experiment_id'])
cat = col.search(
    activity_id='HighResMIP',
    table_id=['3hr','E3hr'],
    source_id=[highresmip_model],
    experiment_id=['highres-future','hist-1950'],
    variable_id=query_vars,
)

#keyword arguments for generating dictionary of datasets from cmip6 catalogue
kwargs = {'zarr_kwargs':{'consolidated':True,'use_cftime':True},'aggregate':False}
ddict = cat.to_dataset_dict(**kwargs) #open datasets into dictionary
ddict = drop_duplicate_timesteps(ddict) #remove duplicate timesteps from ds if present
ddict = drop_coords(ddict,['bnds','nbnd','height']) #remove some unused auxiliary coordinates

#concatenate the experiments of each simulation along time; datasets are matched on the listed attributes
with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    hist_fut = combine_datasets(
        ddict,
        _concat_sorted_time,
        match_attrs=['source_id', 'grid_label','table_id','variant_label','variable_id'],
        combine_func_kwargs={'join':'inner','coords':'minimal'},
    )
hist_fut = drop_duplicate_timesteps(hist_fut) #applied again to the concatenated datasets
hist_fut = drop_incomplete(hist_fut) #NOTE(review): completeness criterion is defined in cmip_ds_dict_operations - confirm there


--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.member_id.table_id.variable_id.grid_label.sub_experiment_id.variant_label.version.zstore'


  ddict = cat.to_dataset_dict(**kwargs) #open datasets into dictionary


In [4]:
#regrid to sufficiently large grid to derive predictors around TGs, at ERA5 resolution
target_grid = xr.Dataset( #grid to interpolate CMIP6 simulations to
        {   "longitude": (["longitude"], np.arange(-13,13,.25), {"units": "degrees_east"}),
            "latitude": (["latitude"], np.arange(63,34,-.25), {"units": "degrees_north"}),})

regridded_datasets = {} #regridded dataset per key of hist_fut (a plain dict suffices; no default values are needed)
for key,ds in tqdm(hist_fut.items()):
    #wrap longitudes to [-180,180) and sort; assign_coords returns a new object,
    #so the datasets held in hist_fut are not modified in place and the cell can be re-run safely
    ds = ds.assign_coords(lon=(ds.coords['lon'] + 180) % 360 - 180)
    ds = ds.sortby('lon')

    #crop to a box slightly larger than the target grid before regridding (not sure if necessary/more efficient)
    ds = ds.where((ds.lat>30)&(ds.lat<65)&(ds.lon>-15)&(ds.lon<15),drop=True)
    ds = ds.isel(sub_experiment_id=0,drop=True) #remove this coordinate

    #the source grid is regional after cropping, so it must not be treated as periodic in longitude:
    #xESMF documents `periodic` as only meaningful for global grids (periodic=True would connect the
    #western and eastern edges of the cropped box)
    regridder = xe.Regridder(ds,target_grid,'bilinear',ignore_degenerate=True,periodic=False)
    #chunk sizes of 10000 exceed the cropped spatial extent, i.e. a single chunk along lat/lon for the regridding
    regridded_ds = regridder(ds.chunk({'time':2000,'lat':10000,'lon':10000}),keep_attrs=True)

    regridded_datasets[key] = regridded_ds.unify_chunks().chunk({'time':500})

  0%|          | 0/3 [00:00<?, ?it/s]

In [5]:
#merge the regridded datasets into one dataset, rename the variables and record the target resolution
predictors_eu = (
    xr.merge([regridded for regridded in regridded_datasets.values()])
    .rename({'vas':'v10','uas':'u10','psl':'msl'})
    .assign_attrs(resolution='0p25x0p25')
)

In [6]:
#load the merged predictors into memory before subsetting around the tide gauges
predictors_eu = predictors_eu.load() #this takes a while (~40GB, depending on target grid)

In [7]:
#get predictors around each tide gauge and store
grid_size_around_tgs=5 #degrees around TGs
tg_coords = xr.open_dataset('../gesla3_tg_coordinates_eu.nc')#.sel(tg=['den_helder-denhdr-nld-rws.csv']) #load TG coordinates

#tide gauges to generate output for
tgs        = ['stavanger-svg-nor-nhs.csv','wick-wic-gbr-bodc.csv','esbjerg-esb-dnk-dmi.csv',
                  'immingham-imm-gbr-bodc.csv','den_helder-denhdr-nld-rws.csv', 'fishguard-fis-gbr-bodc.csv',  
                  'brest-822a-fra-uhslc.csv', 'vigo-vigo-esp-ieo.csv',  'alicante_i_outer_harbour-alio-esp-da_mm.csv']

for output_tg in tqdm(tgs):#tg_coords.tg.values:
    print('processing: '+output_tg)
    predictors = get_era5_around_tgs(predictors_eu,grid_size_around_tgs,tg_coords.sel(tg=[output_tg])).chunk({'tg':1,'time':1000000})
    predictors = predictors.isel(variant_label=0,tg=0) #keep the first variant and drop the singleton tg dimension

    #store name: predictors_<source_id>_<first year>_<last year>_<tide gauge>.zarr
    output_fn = os.path.join(output_path,'predictors_'+predictors.source_id+'_'+str(predictors.time.dt.year[0].values)+'_'+str(predictors.time.dt.year[-1].values)+'_'+output_tg.replace('.csv','')+'.zarr')

    #honour the `overwrite_existing` setting: mode='w' would otherwise always replace an existing store
    #NOTE: a partially written store from an interrupted run also counts as existing; remove it or set overwrite_existing=True
    if not overwrite_existing and fs.exists(output_fn):
        print('skipping, output already exists: '+output_fn)
        continue

    predictors.to_zarr(output_fn,mode='w') #store 

  0%|          | 0/9 [00:00<?, ?it/s]

processing: stavanger-svg-nor-nhs.csv
processing: wick-wic-gbr-bodc.csv
processing: esbjerg-esb-dnk-dmi.csv
processing: immingham-imm-gbr-bodc.csv
processing: den_helder-denhdr-nld-rws.csv
processing: fishguard-fis-gbr-bodc.csv
processing: brest-822a-fra-uhslc.csv
processing: vigo-vigo-esp-ieo.csv
processing: alicante_i_outer_harbour-alio-esp-da_mm.csv


In [None]:
#NOTE(review): the string below is disabled post-processing code kept for reference; as a bare string it is never executed.
#It calls deseasonalize_da, which is not imported or defined in the visible cells, so that helper is needed before re-enabling it.
'''
var_names = ['msl','u10','v10','w']

if 'w' in var_names and 'w' not in predictors.variables:
    predictors['w'] = np.sqrt((predictors.u10**2+predictors.v10**2))
    
for var in var_names: #remove amean
    predictors[var] = predictors[var].groupby(predictors.time.dt.year) - predictors[var].groupby(predictors.time.dt.year).mean('time') #remove annual means
    predictors[var] = deseasonalize_da(predictors[var]) #remove mean seasonal cycle

predictors = (predictors - predictors.mean(dim='time'))/predictors.std(dim='time',ddof=0) #standardize
'''