In [1]:
import pandas as pd
import intake.catalog
import intake.catalog.base
import intake.catalog.local
import intake.catalog.remote
import intake.source.base
import intake_xarray.base
import os
import numpy as np
import xarray as xr

In [2]:
df = pd.read_csv("/glade/u/home/abanihi/sorted_cmip5_database.csv")
df.head()

Unnamed: 0,ensemble,experiment,file_basename,file_fullpath,frequency,institution,model,realm,root,varname,version
0,r1i1p1,rcp85,ua_Amon_GFDL-CM3_rcp85_r1i1p1_209601-210012.nc,/glade/collections/cmip/cmip5/output1/NOAA-GFD...,mon,NOAA-GFDL,GFDL-CM3,atmos,/glade/collections/cmip/cmip5/output1/NOAA-GFD...,ua,v0
1,r4i1p1,historical,ua_Amon_GFDL-CM3_historical_r4i1p1_200501-2005...,/glade/collections/cmip/cmip5/output1/NOAA-GFD...,mon,NOAA-GFDL,GFDL-CM3,atmos,/glade/collections/cmip/cmip5/output1/NOAA-GFD...,ua,v0
2,r5i1p1,historical,ua_Amon_GFDL-CM3_historical_r5i1p1_200501-2005...,/glade/collections/cmip/cmip5/output1/NOAA-GFD...,mon,NOAA-GFDL,GFDL-CM3,atmos,/glade/collections/cmip/cmip5/output1/NOAA-GFD...,ua,v0
3,r2i1p1,historical,ua_Amon_GFDL-CM3_historical_r2i1p1_200501-2005...,/glade/collections/cmip/cmip5/output1/NOAA-GFD...,mon,NOAA-GFDL,GFDL-CM3,atmos,/glade/collections/cmip/cmip5/output1/NOAA-GFD...,ua,v0
4,r4i1p1,rcp85,ua_Amon_CanESM2_rcp85_r4i1p1_200601-210012.nc,/glade/collections/cmip/cmip5/output1/CCCma/Ca...,mon,CCCma,CanESM2,atmos,/glade/collections/cmip/cmip5/output1/CCCma/Ca...,ua,v0


In [3]:
class CMIP5DataSource(intake_xarray.base.DataSourceMixin):
    container = 'xarray'
    name = 'cmip5'
    version = '0.0.1'
    partition_access = True
    
    def __init__(self, query_result, metadata=None):
        self._df = query_result
        self.urlpath = ''
        self._ds = None
        super().__init__(metadata=metadata)
        if self.metadata is None:
            self.metadata = {}
            
    def _open_dataset(self):
        ds_list = []
        for ens_i in self._df.ensemble.unique():
            ens_match = (self._df.ensemble == ens_i)
            paths = self._df.loc[ens_match].file_fullpath.tolist()
            ds_list.append(xr.open_mfdataset(paths))
            
        self._ds = xr.concat(ds_list, dim='ensemble')

In [4]:
CMIP5DataSource

__main__.CMIP5DataSource

In [5]:
database_file="/glade/u/home/abanihi/sorted_cmip5_database.csv"

In [6]:
class CMIP5DataStoreCatalog(intake.catalog.Catalog):
    name = "CMIP5_metadatastore"
    def __init__(self, database_file=None, **kwargs):
        self._database_file = database_file
        self._df = self._read_database()
        self.models = self._df.model.unique()
        self.experiments = self._df.experiment.unique()
        self.ensembles = self._df.ensemble.unique()
        self.frequencies = self._df.frequency.unique()
        self.realms = self._df.realm.unique()
        kwargs.setdefault('name', database_file)
        
        super().__init__(**kwargs)
        if self.metadata is None:
            self.metadata = {}
            
        catalog = self
        
        
    def _read_database(self):
        if os.path.exists(self._database_file):
            return pd.read_csv(self._database_file)
        else:
            raise FileNotFoundError(f"{self._database_file}")
            
    def _query_database(self, model, experiment, frequency, realm, ensemble, varname):
        query = {'model': model,
                'experiment': experiment,
                'frequency': frequency,
                'realm': realm,
                'ensemble': ensemble,
                'varname': varname}
        
        condition = np.ones(len(self._df), dtype=bool)
        
        for key, val in query.items():
            if val is not None:
                condition = condition & (self._df[key] == val)
        df_subset = self._df.loc[condition]
        if df_subset.empty:
            raise ValueError(f"No dataset found for:\n \
                              \tmodel = {model} \n \
                              \texperiment = {experiment} \n \
                              \tfrequency = {frequency} \n \
                              \trealm = {realm} \n \
                              \tensemble = {ensemble} \n \
                              \tvarname = {varname}")
        #-- realm is optional arg so check that the same varname is not in multiple realms
        realm_list = df_subset.realm.unique()
        if len(realm_list) != 1:
            raise ValueError(f"{varname} found in multiple realms:\n \
                          '\t{realm_list}. Please specify the realm to use")
        return df_subset
        
    def search(self, model=None, experiment=None, frequency=None, realm=None, ensemble=None, varname=None):
        self._latest_query_results = self._query_database(model, experiment, frequency, realm, ensemble, varname)
        return self._latest_query_results
    
    def _load(self):
        query_result = self._latest_query_results
        args = {'query_result': query_result}
        return intake.catalog.local.LocalCatalogEntry(
        name='test',
        description={},
        driver= '__main__.CMIP5DataSource',
        direct_access='forbid',
        args = args,
        cache = None,
        parameters = {},
        metadata = None,
        catalog_dir = None)
        

In [7]:
a = CMIP5DataStoreCatalog(database_file=database_file)

In [29]:
b = a.search(model='GFDL-CM3', realm="atmos", frequency="mon", experiment='rcp85', ensemble='r1i1p1', varname='ua')

In [30]:
c = a._load()

In [31]:
c.describe()

{'container': 'xarray',
 'description': {},
 'direct_access': 'forbid',
 'user_parameters': []}

In [32]:
c.to_dask()

<xarray.Dataset>
Dimensions:     (bnds: 2, ensemble: 1, lat: 90, lon: 144, plev: 23, time: 60)
Coordinates:
  * plev        (plev) float64 1e+05 9.25e+04 8.5e+04 ... 300.0 200.0 100.0
  * time        (time) datetime64[ns] 2096-01-16T12:00:00 ... 2100-12-16T12:00:00
  * lat         (lat) float64 -89.0 -87.0 -85.0 -83.0 ... 83.0 85.0 87.0 89.0
  * lon         (lon) float64 1.25 3.75 6.25 8.75 ... 351.2 353.8 356.2 358.8
Dimensions without coordinates: bnds, ensemble
Data variables:
    time_bnds   (ensemble, time, bnds) datetime64[ns] dask.array<shape=(1, 60, 2), chunksize=(1, 60, 2)>
    lat_bnds    (ensemble, lat, bnds) float64 dask.array<shape=(1, 90, 2), chunksize=(1, 90, 2)>
    lon_bnds    (ensemble, lon, bnds) float64 dask.array<shape=(1, 144, 2), chunksize=(1, 144, 2)>
    ua          (ensemble, time, plev, lat, lon) float32 dask.array<shape=(1, 60, 23, 90, 144), chunksize=(1, 60, 23, 90, 144)>
    average_T1  (ensemble, time) datetime64[ns] dask.array<shape=(1, 60), chunksize=(1

In [35]:
a.to_dask()

In [34]:
!ls

Untitled.ipynb			 slurm-1949796.out  slurm-1949813.out
cmip5_database_generation.ipynb  slurm-1949797.out  slurm-1949814.out
dask-worker-space		 slurm-1949798.out  slurm-1949821.out
slurm-1949785.out		 slurm-1949799.out  slurm-1949822.out
slurm-1949786.out		 slurm-1949800.out  slurm-1949823.out
slurm-1949787.out		 slurm-1949801.out  slurm-1949824.out
slurm-1949788.out		 slurm-1949802.out  slurm-1949825.out
slurm-1949789.out		 slurm-1949803.out  slurm-1949826.out
slurm-1949790.out		 slurm-1949807.out  slurm-1949827.out
slurm-1949791.out		 slurm-1949808.out  slurm-1949828.out
slurm-1949792.out		 slurm-1949809.out  slurm-1949829.out
slurm-1949793.out		 slurm-1949810.out  slurm-1949830.out
slurm-1949794.out		 slurm-1949811.out  slurm-1949831.out
slurm-1949795.out		 slurm-1949812.out


In [15]:
ds_list = []
for ens_i in b.ensemble.unique():
    ens_match = (b.ensemble == ens_i)
    paths = b.loc[ens_match].file_fullpath.tolist()
    #ds_list.append(xr.open_mfdataset(paths))

In [16]:
paths

['/glade/collections/cmip/cmip5/output1/NOAA-GFDL/GFDL-CM3/rcp85/mon/atmos/Amon/r1i1p1/ua_Amon_GFDL-CM3_rcp85_r1i1p1_209601-210012.nc',
 '/glade/collections/cmip/cmip5/output1/NOAA-GFDL/GFDL-CM3/rcp85/mon/atmos/Amon/r1i1p1/v20120227/ts/ts_Amon_GFDL-CM3_rcp85_r1i1p1_207601-208012.nc',
 '/glade/collections/cmip/cmip5/output1/NOAA-GFDL/GFDL-CM3/rcp85/mon/atmos/Amon/r1i1p1/v20120227/tas/tas_Amon_GFDL-CM3_rcp85_r1i1p1_209601-210012.nc',
 '/glade/collections/cmip/cmip5/output1/NOAA-GFDL/GFDL-CM3/rcp85/mon/atmos/Amon/r1i1p1/v20120227/rlds/rlds_Amon_GFDL-CM3_rcp85_r1i1p1_209601-210012.nc',
 '/glade/collections/cmip/cmip5/output1/NOAA-GFDL/GFDL-CM3/rcp85/mon/atmos/Amon/r1i1p1/v20120227/sfcWind/sfcWind_Amon_GFDL-CM3_rcp85_r1i1p1_209601-210012.nc',
 '/glade/collections/cmip/cmip5/output1/NOAA-GFDL/GFDL-CM3/rcp85/mon/atmos/Amon/r1i1p1/v20120227/pr/pr_Amon_GFDL-CM3_rcp85_r1i1p1_209601-210012.nc',
 '/glade/collections/cmip/cmip5/output1/NOAA-GFDL/GFDL-CM3/rcp85/mon/atmos/Amon/r1i1p1/v20120227/tasmin

In [28]:
paths[5]

'/glade/collections/cmip/cmip5/output1/NOAA-GFDL/GFDL-CM3/rcp85/mon/atmos/Amon/r1i1p1/v20120227/pr/pr_Amon_GFDL-CM3_rcp85_r1i1p1_209601-210012.nc'

In [24]:
d = xr.open_mfdataset(paths[:4])

In [25]:
d.height

<xarray.DataArray 'height' ()>
array(2.)
Coordinates:
    height   float64 ...
Attributes:
    units:          m
    positive:       up
    long_name:      height
    standard_name:  height
    axis:           Z

In [26]:
d

<xarray.Dataset>
Dimensions:     (bnds: 2, lat: 90, lon: 144, plev: 23, time: 120)
Coordinates:
  * time        (time) datetime64[ns] 2076-01-16T12:00:00 ... 2100-12-16T12:00:00
  * plev        (plev) float64 1e+05 9.25e+04 8.5e+04 ... 300.0 200.0 100.0
  * lat         (lat) float64 -89.0 -87.0 -85.0 -83.0 ... 83.0 85.0 87.0 89.0
  * lon         (lon) float64 1.25 3.75 6.25 8.75 ... 351.2 353.8 356.2 358.8
    height      float64 ...
Dimensions without coordinates: bnds
Data variables:
    time_bnds   (time, bnds) datetime64[ns] dask.array<shape=(120, 2), chunksize=(120, 2)>
    lat_bnds    (lat, bnds) float64 dask.array<shape=(90, 2), chunksize=(90, 2)>
    lon_bnds    (lon, bnds) float64 dask.array<shape=(144, 2), chunksize=(144, 2)>
    ua          (time, plev, lat, lon) float32 dask.array<shape=(120, 23, 90, 144), chunksize=(120, 23, 90, 144)>
    average_T1  (time) datetime64[ns] dask.array<shape=(120,), chunksize=(120,)>
    average_T2  (time) datetime64[ns] dask.array<shape=(120