In [1]:
from dask.distributed import Client, progress
from dask_kubernetes import KubeCluster
cluster = KubeCluster()
cluster.adapt(maximum=30)
cluster

VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [2]:
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://10.32.12.80:43727  Dashboard: /user/0000-0002-7053-9066/proxy/8787/status,Cluster  Workers: 4  Cores: 8  Memory: 46.00 GB


In [3]:
import os
os.environ['NUMPY_EXPERIMENTAL_ARRAY_FUNCTION'] = '0'

from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
import gcsfs
import gsw
from tqdm.autonotebook import tqdm
import cartopy.crs as ccrs
import cartopy
import xesmf as xe

from xhistogram.xarray import histogram

%matplotlib inline
plt.rcParams['figure.figsize'] = 12, 6
%config InlineBackend.figure_format = 'retina' 



In [4]:
def Catalog_filter(catalog, var, ense, exp, freq):
    '''
    Function that takes LISTS of:
    var = desired variable name
    ense = desired ensembles
    exp = deisred epxeriments
    freq = desired sample rate (frequency)
    and returns list of model names and filtered catalog
    '''
    
    # Creates a new dataframe based on the filters
    df_filter = catalog[(catalog['experiment_id'].isin(exp)) & \
          (catalog['table_id'].isin(freq)) & \
         (catalog['member_id'].isin(ense)) & \
         (catalog['variable_id'].isin(var))]
    
    models = df_filter.source_id.unique()
    
    return models, df_filter
 
    
def get_data(df, variable, model):
    '''
    Function that downloads variable data from google-cloud
    and returns xarray dataset with all the variable data
    
    IMPORANT - variables in list should have same dimensions!!!
    '''
    
    ds_l = []
    for var in variable:
        uri = df[(df.variable_id == var) & 
                (df.source_id == model)].zstore.values[0]
        gcs = gcsfs.GCSFileSystem(token='anon')
        ds = xr.open_zarr(gcs.get_mapper(uri), consolidated=True)
        ds_l.append(ds)
        
    ds_full = xr.merge(ds_l)
    return ds_full

In [5]:
df = pd.read_csv('https://storage.googleapis.com/pangeo-cmip6/pangeo-cmip6-zarr-consolidated-stores.csv')
df

variable_ids = ['so', 'thetao']
experiment_ids = ['historical']
table_ids = ['Omon']
member_ids = ['r1i1p1f1']

source_list, df2 = Catalog_filter(df, variable_ids, 
                                  member_ids, experiment_ids,
                                  table_ids)

xrays = []
 
for source in tqdm(source_list[1:5]):
    xray = get_data(df2, variable_ids, source)
    xrays.append(xray)
    
xray_dic = {key:value for key, value in zip(source_list, xrays)}

HBox(children=(IntProgress(value=0, max=4), HTML(value='')))




In [6]:
def url_to_xray(urls_l):
    '''
    Function that takes urls for ONE VARIABLE and returns
    a concatenated xarray 
    '''
    
    xrays = []
    
    for key in tqdm(urls_l):
        xray = xr.open_dataset(urls_l[key], decode_times=False)
        xrays.append(xray)
        
    xrays_f = xr.concat(xrays, dim='time')
    
    return xrays_f


urls_T = {'WOA_T_85_94': 'https://data.nodc.noaa.gov/thredds/dodsC/ncei/woa/temperature/8594/1.00/woa18_8594_t00_01.nc',
          'WOA_T_95_04': 'https://data.nodc.noaa.gov/thredds/dodsC/ncei/woa/temperature/95A4/1.00/woa18_95A4_t00_01.nc'
          }
          
urls_S = {'WOA_S_85-94': 'https://data.nodc.noaa.gov/thredds/dodsC/ncei/woa/salinity/8594/1.00/woa18_8594_s00_01.nc',
          'WOA_S_95-04': 'https://data.nodc.noaa.gov/thredds/dodsC/ncei/woa/salinity/95A4/1.00/woa18_95A4_s00_01.nc'
          }


xr_obs_T = url_to_xray(urls_T)
xr_obs_S = url_to_xray(urls_S)
xr_obs_full = xr.merge([xr_obs_T, xr_obs_S])
xr_obs_full

HBox(children=(IntProgress(value=0, max=2), HTML(value='')))




HBox(children=(IntProgress(value=0, max=2), HTML(value='')))




<xarray.Dataset>
Dimensions:             (depth: 102, lat: 180, lon: 360, nbounds: 2, time: 2)
Coordinates:
  * lat                 (lat) float32 -89.5 -88.5 -87.5 -86.5 ... 87.5 88.5 89.5
  * depth               (depth) float32 0.0 5.0 10.0 ... 5300.0 5400.0 5500.0
  * lon                 (lon) float32 -179.5 -178.5 -177.5 ... 177.5 178.5 179.5
  * time                (time) float64 438.0 438.0
Dimensions without coordinates: nbounds
Data variables:
    crs                 (time) int32 -2147483647 -2147483647
    lat_bnds            (time, lat, nbounds) float32 -90.0 -89.0 ... 89.0 90.0
    lon_bnds            (time, lon, nbounds) float32 -180.0 -179.0 ... 180.0
    depth_bnds          (time, depth, nbounds) float32 0.0 2.5 ... 5450.0 5500.0
    climatology_bounds  (time, nbounds) float32 384.0 504.0 384.0 504.0
    t_an                (time, depth, lat, lon) float32 nan nan nan ... nan nan
    t_mn                (time, depth, lat, lon) float32 nan nan nan ... nan nan
    t_dd       

In [7]:
def create_pressure(data, var):
    '''
    Creates new pressure array from depth and latitudes using TEOS-10
    Input: xarray
    Output: xarray with pressure as new variable
    '''  
    z = -1*xr_obs_full.depth
    p = np.zeros((len(data.depth),len(data.lat)))

    for i in range(len(data.lat)):
        p[:,i] = gsw.p_from_z(z, data.lat[i])

    b = np.repeat(p[:, :, np.newaxis], len(data.lon), axis=2)

    P = np.empty_like(data[var])
    
    for i in range(P.shape[0]):
        P[i,:,:,:] = b
    
    xray = xr.DataArray(P, 
                        coords=xr_obs_full[var].coords, 
                        dims=xr_obs_full[var].dims,
                        name ='Pressure')
    data2 = xr.merge([data, xray])
    
    return data2



par = 't_an'
ds = create_pressure(xr_obs_full, par)

In [8]:
xray_dic.keys()

dict_keys(['AWI-CM-1-1-MR', 'BCC-CSM2-MR', 'BCC-ESM1', 'CAMS-CSM1-0'])

In [None]:
def Regridding(data, grid_out):

    grid_11 = xr.Dataset({'lat': (grid_out.lat), 'lon': (grid_out.lon)})
    regrid = xe.Regridder(data, grid_11, 
                      'bilinear', periodic=True, reuse_weights=True)
    return regrid(data)


S = xray_dic['BCC-CSM2-MR'].sel(time=slice('1986','2005')).mean(dim='time').mean(dim='lev').so

ds_plt = Regridding(S, ds)
ds_plt.to_netcdf('Shit.nc')

fig = plt.figure(figsize=(12,20))
ax = plt.axes(projection=ccrs.Robinson())
ax.coastlines()
ax.gridlines()
S.plot(ax = ax, transform=ccrs.PlateCarree())

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
    for key, value in data.items()
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
    if type(value) is Serialize
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in serialize
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
    for obj in x
  File "/srv/conda/envs/notebook/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type SubgraphCallable.', 'subgraph_callable')
distributed.comm.utils - ERROR - ('Could not serialize object of type SubgraphCallable.',

Reuse existing file: bilinear_232x360_180x360_peri.nc


In [18]:
def Regridding(data, grid_out):

    grid_11 = xr.Dataset({'lat': (grid_out.lat), 'lon': (grid_out.lon)})
    regrid = xe.Regridder(data, grid_11, 
                      'bilinear', periodic=True, reuse_weights=True)
    return regrid(data)

ds_plt = Regridding(xray_dic['BCC-CSM2-MR'], ds)

S = ds_plt.sel(time=slice('1986','1990')).mean(dim='time').mean(dim='lev').so



Reuse existing file: bilinear_232x360_180x360_peri.nc
using dimensions ('lat', 'lon') from data variable so as the horizontal dimensions for this dataset.


In [None]:
fig = plt.figure(figsize=(12,20))
ax = plt.axes(projection=ccrs.Robinson())
ax.coastlines()
ax.gridlines()
S.plot(ax = ax, transform=ccrs.PlateCarree())

In [40]:
f.shape

(2, 102, 180, 360)

In [92]:
xrays[0]['so'].lat.values

array([-47.39407752, -48.47549908, -49.20522144, ...,  59.96596739,
        33.4582047 ,  42.16660379])

In [78]:
#Taking Monthly Ocean data (Omon), for Salinity (so) and Potential Temperature (thetao)
variable_ids = ['so', 'thetao']
experiment_ids = ['historical']
table_ids = ['Omon']
member_ids = ['r1i1p1f1']
source_ids = []

for name, group in df.groupby('source_id'):
    if (all([expt in group.experiment_id.values
            for expt in experiment_ids]) and
        all([expt in group.variable_id.values
            for expt in variable_ids]) and
       all([expt in group.table_id.values
            for expt in table_ids]) and
       all([expt in group.member_id.values
            for expt in member_ids])):
        source_ids.append(name)

source_ids # Models with both monthly potential T and S

['AWI-CM-1-1-MR',
 'BCC-CSM2-MR',
 'BCC-ESM1',
 'CAMS-CSM1-0',
 'CESM2',
 'CESM2-WACCM',
 'CanESM5',
 'E3SM-1-0',
 'EC-Earth3',
 'EC-Earth3-Veg',
 'FGOALS-f3-L',
 'GFDL-CM4',
 'GFDL-ESM4',
 'GISS-E2-1-G',
 'GISS-E2-1-G-CC',
 'GISS-E2-1-H',
 'HadGEM3-GC31-LL',
 'IPSL-CM6A-LR',
 'MCM-UA-1-0',
 'MIROC6',
 'NESM3',
 'NorCPM1',
 'SAM0-UNICON']

In [58]:
len(df_S.source_id.unique())

15

In [38]:
len(df_S.source_id.unique())

27

In [56]:
df_T

Unnamed: 0,activity_id,institution_id,source_id,experiment_id,member_id,table_id,variable_id,grid_label,zstore,dcpp_init_year
169,CMIP,AWI,AWI-CM-1-1-MR,historical,r1i1p1f1,Omon,thetao,gn,gs://cmip6/CMIP/AWI/AWI-CM-1-1-MR/historical/r...,
325,CMIP,BCC,BCC-CSM2-MR,historical,r1i1p1f1,Omon,thetao,gn,gs://cmip6/CMIP/BCC/BCC-CSM2-MR/historical/r1i...,
588,CMIP,BCC,BCC-ESM1,historical,r1i1p1f1,Omon,thetao,gn,gs://cmip6/CMIP/BCC/BCC-ESM1/historical/r1i1p1...,
841,CMIP,CAMS,CAMS-CSM1-0,historical,r1i1p1f1,Omon,thetao,gn,gs://cmip6/CMIP/CAMS/CAMS-CSM1-0/historical/r1...,
904,CMIP,CAS,FGOALS-f3-L,historical,r1i1p1f1,Omon,thetao,gn,gs://cmip6/CMIP/CAS/FGOALS-f3-L/historical/r1i...,
5759,CMIP,E3SM-Project,E3SM-1-0,historical,r1i1p1f1,Omon,thetao,gr,gs://cmip6/CMIP/E3SM-Project/E3SM-1-0/historic...,
6034,CMIP,EC-Earth-Consortium,EC-Earth3-Veg,historical,r1i1p1f1,Omon,thetao,gn,gs://cmip6/CMIP/EC-Earth-Consortium/EC-Earth3-...,
11497,CMIP,NASA-GISS,GISS-E2-1-G-CC,historical,r1i1p1f1,Omon,thetao,gn,gs://cmip6/CMIP/NASA-GISS/GISS-E2-1-G-CC/histo...,
13388,CMIP,NCAR,CESM2-WACCM,historical,r1i1p1f1,Omon,thetao,gr,gs://cmip6/CMIP/NCAR/CESM2-WACCM/historical/r1...,
14583,CMIP,NCAR,CESM2,historical,r1i1p1f1,Omon,thetao,gn,gs://cmip6/CMIP/NCAR/CESM2/historical/r1i1p1f1...,


In [47]:
df_S

Unnamed: 0,activity_id,institution_id,source_id,experiment_id,member_id,table_id,variable_id,grid_label,zstore,dcpp_init_year
165,CMIP,AWI,AWI-CM-1-1-MR,historical,r1i1p1f1,Omon,so,gn,gs://cmip6/CMIP/AWI/AWI-CM-1-1-MR/historical/r...,
323,CMIP,BCC,BCC-CSM2-MR,historical,r1i1p1f1,Omon,so,gn,gs://cmip6/CMIP/BCC/BCC-CSM2-MR/historical/r1i...,
586,CMIP,BCC,BCC-ESM1,historical,r1i1p1f1,Omon,so,gn,gs://cmip6/CMIP/BCC/BCC-ESM1/historical/r1i1p1...,
838,CMIP,CAMS,CAMS-CSM1-0,historical,r1i1p1f1,Omon,so,gn,gs://cmip6/CMIP/CAMS/CAMS-CSM1-0/historical/r1...,
903,CMIP,CAS,FGOALS-f3-L,historical,r1i1p1f1,Omon,so,gn,gs://cmip6/CMIP/CAS/FGOALS-f3-L/historical/r1i...,
1455,CMIP,CCCma,CanESM5,historical,r10i1p1f1,Omon,so,gn,gs://cmip6/CMIP/CCCma/CanESM5/historical/r10i1...,
4476,CMIP,CNRM-CERFACS,CNRM-CM6-1,historical,r10i1p1f2,Omon,so,gn,gs://cmip6/CMIP/CNRM-CERFACS/CNRM-CM6-1/histor...,
5411,CMIP,CNRM-CERFACS,CNRM-ESM2-1,historical,r2i1p1f2,Omon,so,gn,gs://cmip6/CMIP/CNRM-CERFACS/CNRM-ESM2-1/histo...,
5755,CMIP,E3SM-Project,E3SM-1-0,historical,r1i1p1f1,Omon,so,gr,gs://cmip6/CMIP/E3SM-Project/E3SM-1-0/historic...,
6030,CMIP,EC-Earth-Consortium,EC-Earth3-Veg,historical,r1i1p1f1,Omon,so,gn,gs://cmip6/CMIP/EC-Earth-Consortium/EC-Earth3-...,
