In [1]:
import pathlib
from pathlib import Path
import xarray as xr

import intake
from ecgtools import Builder
from ecgtools.parsers.cesm import parse_cesm_timeseries
from ecgtools.parsers import parse_cmip6
from ecgtools.builder import INVALID_ASSET, TRACEBACK
import traceback

import numpy as np


In [5]:
# get ready for e3sm

# clmcalipso_CFmon_E3SM-1-0_amip_r1i1p1f1_gr_197001-199412.nc
# stem.split("_") -> ['clmcalipso', 'CFmon', 'E3SM-1-0', 'amip', 'r1i1p1f1', 'gr', '197001-199412']

# component,stream,case,member_id,variable,start_time,end_time,time_range,long_name,units,vertical_levels,frequency,path

def parse_e3sm_cmip(file):
    """E3SM data downloaded from CMIP archive"""
    file = Path(file)
    info = {}

    try:
        stem = file.stem
        split = stem.split('_')
        variable = split[0]
        stream = split[1] # e.g., 'Amon', 'CFmon'
        if 'mon' in stream:
            frequency = 'month_1'
        experiment = split[3]
        case = "_".join([split[2],split[3]])  # E3SM-1-0_amip
        member_id = split[4]
        start_time = split[6].split('-')[0]
        end_time = split[6].split('-')[1]
        time_range = split[6]

        with xr.open_dataset(file, chunks={}, decode_times=False) as ds:
            long_name = ds[variable].attrs.get('long_name')
            units = ds[variable].attrs.get('units')
            if 'lev' in ds[variable].coords:
                vertical_levels = ds[variable]['lev'].size
            elif 'ilev' in ds[variable].coords:
                vertical_levels = ds[variable]['ilev'].size
            elif 'alt40' in ds[variable].coords:
                vertical_levels = ds[variable]['alt40'].size
            elif 'plev' in ds[variable].coords:
                vertical_levels = ds[variable]['plev'].size
            else:
                vertical_levels = 1

            if hasattr(ds, 'frequency'):
                if ds.attrs['frequency'] == 'mon':
                    frequency = 'month_1'
                else:
                    frequency = ds.attrs['frequency']
            if hasattr(ds, 'realm'):
                if ds.attrs['realm'] == 'atmos':
                    component = 'atm'
                else:
                    component = ds.attrs['realm']

            grid = getattr(ds, 'grid_label', 'na')
            spatial_domain = 'global'

            info = {
                'component': component,
                'case': case,
                'experiment': experiment,
                'variable': variable,
                'long_name': long_name.lower(),
                'frequency': frequency,
                'stream': stream,
                'member_id': member_id,
                'vertical_levels': vertical_levels,
                'units': units,
                'spatial_domain': spatial_domain,
                'grid': grid,
                'start_time': start_time,
                'end_time': end_time,
                'path': str(file),
            }

        return info

    except Exception:
        return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()}

In [2]:
# case_to_build = 'cam5_1deg_release_amip'

# case_to_build = 'f.e21.FHIST_BGC.f09_f09_mg17.CMIP6-AMIP.001_cosp1'

case_to_build = 'E3SM-1-0_amip'

use_ts_path = False
use_remap_path = True

ts_path_str = 'atm/proc/tseries/month_1'

In [3]:
data_root = pathlib.Path(f'/Volumes/Drudonna/{case_to_build}/').absolute()
if use_ts_path:
    ts_path = data_root / ts_path_str
elif use_remap_path:
    ts_path = data_root / "remapped"
else:
    ts_path = data_root

ts_files = sorted(ts_path.rglob('*'))
print(f'Found {len(ts_files)} time series files.')

if len(ts_files) != 0:
    cat_dir = data_root / 'catalog'
    pathlib.Path.mkdir(cat_dir, parents=True, exist_ok=True)


Found 234 time series files.


In [4]:
cat_builder = Builder(
    root_path=ts_path, # Directory with the output
    depth=1, # Depth of 1 since we don't need to descend
    njobs=5, # Number of jobs to execute - should be equal to # threads you are using
    exclude_patterns=["*/hist/*", "*/rest/*"],

)

cat_builder

Builder(root_path=PosixPath('/Volumes/Drudonna/E3SM-1-0_amip/remapped'), extension='.nc', depth=1, exclude_patterns=['*/hist/*', '*/rest/*'], njobs=5)

In [6]:

parse_func = parse_e3sm_cmip # parse_cesm_timeseries
cat_builder = cat_builder.build(parse_func)

[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   1 out of   1 | elapsed:    2.7s finished
[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   8 tasks      | elapsed:    1.2s
[Parallel(n_jobs=5)]: Done 165 tasks      | elapsed:    1.4s
[Parallel(n_jobs=5)]: Done 234 out of 234 | elapsed:    1.4s finished


In [7]:
# inspect the dataframe
cat_builder.df

# We can also check to see which files were not parsed by calling .invalid_assets
# print(cat_builder.invalid_assets.to_string())

Unnamed: 0,component,case,experiment,variable,long_name,frequency,stream,member_id,vertical_levels,units,spatial_domain,grid,start_time,end_time,path
0,atm,E3SM-1-0_amip,amip,clcalipso,calipso percentage cloud cover,month_1,CFmon,r1i1p1f1,40,%,global,gr,187001,189412,/Volumes/Drudonna/E3SM-1-0_amip/remapped/clcal...
1,atm,E3SM-1-0_amip,amip,clcalipso,calipso percentage cloud cover,month_1,CFmon,r1i1p1f1,40,%,global,gr,189501,191912,/Volumes/Drudonna/E3SM-1-0_amip/remapped/clcal...
2,atm,E3SM-1-0_amip,amip,clcalipso,calipso percentage cloud cover,month_1,CFmon,r1i1p1f1,40,%,global,gr,192001,194412,/Volumes/Drudonna/E3SM-1-0_amip/remapped/clcal...
3,atm,E3SM-1-0_amip,amip,clcalipso,calipso percentage cloud cover,month_1,CFmon,r1i1p1f1,40,%,global,gr,194501,196912,/Volumes/Drudonna/E3SM-1-0_amip/remapped/clcal...
4,atm,E3SM-1-0_amip,amip,clcalipso,calipso percentage cloud cover,month_1,CFmon,r1i1p1f1,40,%,global,gr,197001,199412,/Volumes/Drudonna/E3SM-1-0_amip/remapped/clcal...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
229,atm,E3SM-1-0_amip,amip,wap,omega (=dp/dt),month_1,Amon,r3i1p1f1,19,Pa s-1,global,gr,189501,191912,/Volumes/Drudonna/E3SM-1-0_amip/remapped/wap_A...
230,atm,E3SM-1-0_amip,amip,wap,omega (=dp/dt),month_1,Amon,r3i1p1f1,19,Pa s-1,global,gr,192001,194412,/Volumes/Drudonna/E3SM-1-0_amip/remapped/wap_A...
231,atm,E3SM-1-0_amip,amip,wap,omega (=dp/dt),month_1,Amon,r3i1p1f1,19,Pa s-1,global,gr,194501,196912,/Volumes/Drudonna/E3SM-1-0_amip/remapped/wap_A...
232,atm,E3SM-1-0_amip,amip,wap,omega (=dp/dt),month_1,Amon,r3i1p1f1,19,Pa s-1,global,gr,197001,199412,/Volumes/Drudonna/E3SM-1-0_amip/remapped/wap_A...


In [8]:
cat_builder.save(
    data_root / 'catalog' / f'catalog_{case_to_build}.csv',
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variable',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["component", "stream", "case"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {'type': 'union', 'attribute_name': 'variable'},
        {
            "type": "join_existing",
            "attribute_name": "time_range",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        },
    ],
)

Saved catalog location: /Volumes/Drudonna/E3SM-1-0_amip/catalog/catalog_E3SM-1-0_amip.json and /Volumes/Drudonna/E3SM-1-0_amip/catalog/catalog_E3SM-1-0_amip.csv


In [3]:
# testing
# tfil = '/Volumes/Drudonna/E3SM-1-0_amip/remapped/rsut_Amon_E3SM-1-0_amip_r2i1p1f1_gr_187001-189412.nc'
# parse_e3sm_cmip(tfil)

In [9]:
b = Builder('/Volumes/Drudonna/E3SM-1-0_amip/remapped/')

In [10]:
b.build(parse_e3sm_cmip)

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    7.2s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done  54 tasks      | elapsed:    1.5s
[Parallel(n_jobs=-1)]: Done 216 out of 216 | elapsed:    1.6s finished


Builder(root_path=PosixPath('/Volumes/Drudonna/E3SM-1-0_amip/remapped'), extension='.nc', depth=0, exclude_patterns=None, njobs=-1)

In [11]:
b.df

Unnamed: 0,component,case,experiment,variable,long_name,frequency,stream,member_id,vertical_levels,units,spatial_domain,grid,start_time,end_time,path
0,atm,E3SM-1-0_amip,amip,clcalipso,calipso percentage cloud cover,month_1,CFmon,r1i1p1f1,40,%,global,gr,187001,189412,/Volumes/Drudonna/E3SM-1-0_amip/remapped/clcal...
1,atm,E3SM-1-0_amip,amip,clcalipso,calipso percentage cloud cover,month_1,CFmon,r1i1p1f1,40,%,global,gr,189501,191912,/Volumes/Drudonna/E3SM-1-0_amip/remapped/clcal...
2,atm,E3SM-1-0_amip,amip,clcalipso,calipso percentage cloud cover,month_1,CFmon,r1i1p1f1,40,%,global,gr,192001,194412,/Volumes/Drudonna/E3SM-1-0_amip/remapped/clcal...
3,atm,E3SM-1-0_amip,amip,clcalipso,calipso percentage cloud cover,month_1,CFmon,r1i1p1f1,40,%,global,gr,194501,196912,/Volumes/Drudonna/E3SM-1-0_amip/remapped/clcal...
4,atm,E3SM-1-0_amip,amip,clcalipso,calipso percentage cloud cover,month_1,CFmon,r1i1p1f1,40,%,global,gr,197001,199412,/Volumes/Drudonna/E3SM-1-0_amip/remapped/clcal...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
211,atm,E3SM-1-0_amip,amip,rsutcs,toa outgoing clear-sky shortwave radiation,month_1,Amon,r3i1p1f1,1,W m-2,global,gr,189501,191912,/Volumes/Drudonna/E3SM-1-0_amip/remapped/rsutc...
212,atm,E3SM-1-0_amip,amip,rsutcs,toa outgoing clear-sky shortwave radiation,month_1,Amon,r3i1p1f1,1,W m-2,global,gr,192001,194412,/Volumes/Drudonna/E3SM-1-0_amip/remapped/rsutc...
213,atm,E3SM-1-0_amip,amip,rsutcs,toa outgoing clear-sky shortwave radiation,month_1,Amon,r3i1p1f1,1,W m-2,global,gr,194501,196912,/Volumes/Drudonna/E3SM-1-0_amip/remapped/rsutc...
214,atm,E3SM-1-0_amip,amip,rsutcs,toa outgoing clear-sky shortwave radiation,month_1,Amon,r3i1p1f1,1,W m-2,global,gr,197001,199412,/Volumes/Drudonna/E3SM-1-0_amip/remapped/rsutc...


In [12]:
b.save(
    '/Volumes/Drudonna/E3SM-1-0_amip/catalog_E3SM-1-0_amip.csv',
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variable',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["component", "stream", "case"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {'type': 'union', 'attribute_name': 'variable'},
        {
            "type": "join_existing",
            "attribute_name": "time_range",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        },
    ],
)

Saved catalog location: /Volumes/Drudonna/E3SM-1-0_amip/catalog_E3SM-1-0_amip.json and /Volumes/Drudonna/E3SM-1-0_amip/catalog_E3SM-1-0_amip.csv


In [2]:
#
# Can we make a catalog for satellite data? 
#

# The answer is yes... the question is whether we can make it look a lot like the model catalogs

# component,stream,case,member_id,variable,start_time,end_time,time_range,long_name,units,vertical_levels,frequency,path

# I think we should make {'case': 'satellite'}
# Then we need to figure out the variables ... which we can get from ds.data_vars (list ok)
# The frequency will be "month_1" for all the data we'll ingest
# We can check the file name to figure out "source"
# What other info do we actually need... We need to know how to aggregate-> just by variable, I guess.


# files
# MapLowMidHigh330m_201901_avg_CFMIP2_sat_3.1.2.nc


### CUSTOM PARSER ###

sat_sources = ["ISCCP", "MODIS", "MISR", "CALIPSO", "CERES"]

def parse_satellites(file):
    """Parse satellite data for comparison with models. """
    file = Path(file)
    info = {}

    # determine source
    stem = file.stem
    for src in sat_sources:
        if src.lower() in stem:
            info['source'] = src
        elif src.lower() in str(file.parent).lower():
            info['source'] = src
    if 'source' not in info:
        print("UNKNOWN SOURCE: {stem}")              

    try:
        # to get the variables we make a list
        with xr.open_dataset(file, chunks={}, decode_times=True) as ds:
            variable_list = [var for var in ds if 'long_name' in ds[var].attrs]
            if 'time' in ds:
                info['start_time'] = ds.time.min().dt.strftime("%Y%m").item()
                info['end_time'] = ds.time.max().dt.strftime("%Y%m").item()

        info['variable']= variable_list
        info['path'] = str(file)
        info['frequency'] = 'month_1'  # We know a priori 
        if not variable_list:
            return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()}
        else:
            return info
    except Exception:
        return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()}
        

###

#
# isccp misr calipso modis
#
# sat_paths = ["/Volumes/Drudonna/ISCCPH/cltisccp_mon_HGGproc.198307-201706.nc",
# "/Volumes/Drudonna/ISCCPH/clhisccp_mon_HGGproc.198307-201706.nc",
# "/Volumes/Drudonna/ISCCPH/clmisccp_mon_HGGproc.198307-201706.nc",
# "/Volumes/Drudonna/ISCCPH/cllisccp_mon_HGGproc.198307-201706.nc",
# "/Volumes/Drudonna/MISR/misr_L3_V7/remapped/cltmisr_obs4MIPs_MISR_V7_200003-202111.nc",
# "/Volumes/Drudonna/MISR/misr_L3_V7/remapped/clhmisr_obs4MIPs_MISR_V7_200003-202111.nc",
# "/Volumes/Drudonna/MISR/misr_L3_V7/remapped/clmmisr_obs4MIPs_MISR_V7_200003-202111.nc",
# "/Volumes/Drudonna/MISR/misr_L3_V7/remapped/cllmisr_obs4MIPs_MISR_V7_200003-202111.nc",
# "/Volumes/Drudonna/CALIPSO/GOCCP/2Ddata/grid_2x2_L40/remapped/fv09", # the variables are contained in these monthly files
# "/Volumes/Drudonna/MODIS/processed/remapped/fv09/cltmodis_MCD06COSPM3_MODIS_v062_200207-202207.nc",
# "/Volumes/Drudonna/MODIS/processed/remapped/fv09/clhmodis_MCD06COSPM3_MODIS_v062_200207-202207.nc",
# "/Volumes/Drudonna/MODIS/processed/remapped/fv09/clmmodis_MCD06COSPM3_MODIS_v062_200207-202207.nc",
# "/Volumes/Drudonna/MODIS/processed/remapped/fv09/cllmodis_MCD06COSPM3_MODIS_v062_200207-202207.nc"]

sat_paths = ["/Volumes/Drudonna/ISCCPH/processed/fv09", 
 "/Volumes/Drudonna/MISR/misr_L3_V7/remapped",
 "/Volumes/Drudonna/CALIPSO/GOCCP/2Ddata/grid_2x2_L40/remapped/fv09",
 "/Volumes/Drudonna/CALIPSO/GOCCP/3Ddata/remapped/fv09",
 "/Volumes/Drudonna/MODIS/processed/remapped/fv09/",
 "/Volumes/Drudonna/CERES/monthly_files/remapped/fv09"]

sat_cat_builder = Builder(
    root_path=sat_paths, # Directory with the output
    depth=1, # Depth of 1 since we don't need to descend
    njobs=5, # Number of jobs to execute - should be equal to # threads you are using
    include_patterns=["MapLowMidHigh330m_*_avg_CFMIP2_sat_3.1.2.nc", 
    "3D_CloudFraction330m_*_avg_CFMIP2_sat_3.1.2.nc",
    "cl[thml]isccp_mon_ISCCP-Basic.HGG.GLOBAL.10KM_198307-201706.nc",
    "cl[thml]misr_obs4MIPs_MISR_V7_200003-202111.nc",
    "cl[thml]modis_MCD06COSPM3_MODIS_v062_200207-202207.nc",
    "CERES_EBAF_Ed4.1_Subset_*.nc"],
    exclude_patterns=["ISCCP-Basic.*", "mapping*"]
)

sat_cat_builder = sat_cat_builder.build(parse_satellites)

[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   3 out of   6 | elapsed:    2.8s remaining:    2.8s
[Parallel(n_jobs=5)]: Done   6 out of   6 | elapsed:    2.8s finished
[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   8 tasks      | elapsed:    1.4s
[Parallel(n_jobs=5)]: Done 202 tasks      | elapsed:    1.7s
[Parallel(n_jobs=5)]: Done 1127 out of 1127 | elapsed:    4.6s finished
  ).clean_dataframe()


In [3]:
print(sat_cat_builder.invalid_assets.to_string())

                                                               INVALID_ASSET         TRACEBACK
1126  /Volumes/Drudonna/MODIS/processed/remapped/fv09/mapping_1x1_to_fv09.nc  NoneType: None\n


In [4]:
# inspect the dataframe
# sat_cat_builder.df

# np.sum(sat_cat_builder.df['source'] == 'CALIPSO')

# for s in set(sat_cat_builder.df['source']):
#     print(f"There are {np.sum(sat_cat_builder.df['source'] == s)} rows with source = {s}")

# We can also check to see which files were not parsed by calling .invalid_assets
# print(sat_cat_builder.invalid_assets.to_string())


sat_cat_builder.save(
    '/Volumes/Drudonna/catalog_satellite_clouds_monthly.csv',
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variable',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["source"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {'type': 'union', 'attribute_name': 'variable'},
        {
            "type": "join_existing",
            "attribute_name": "variable",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        },
    ],
)

Saved catalog location: /Volumes/Drudonna/catalog_satellite_clouds_monthly.json and /Volumes/Drudonna/catalog_satellite_clouds_monthly.csv


  sat_cat_builder.save(
