In [2]:
from pathlib import Path

import intake
import pandas as pd

In [3]:
def create_local_table(paths):
    """Read every CSV file in ``paths`` and stack the rows into one table.

    Parameters
    ----------
    paths : iterable
        Locations of the CSV files; each one is passed to ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The rows of all files in input order, re-indexed from 0.
    """
    frames = [pd.read_csv(csv_path) for csv_path in paths]
    return pd.concat(frames, ignore_index=True)
# Table of locally available files, built from the CSV inputs listed in
# `snakemake.input`.
local_tab = create_local_table(snakemake.input)

# JSON description of the Pangeo CMIP6 catalog; intake-esm opens it and
# exposes the catalog table as `.df`.
url = "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"
remote_col = intake.open_esm_datastore(url)
remote_tab = remote_col.df

In [5]:
# Convert the local version labels to integers. Presumably they look like
# "v20190101" -- TODO confirm against the input CSVs. Only a single leading
# non-digit character is removed, so a value that is already purely numeric
# keeps all of its digits, and the cast to `str` first lets this cell be
# re-run after the column has already been converted to int.
local_tab['version'] = (
    local_tab['version']
    .astype(str)
    .str.replace(r'^\D', '', regex=True)
    .astype(int)
)

# Columns present in both tables, in the order they appear in the local one.
keys = [key
        for key in local_tab.columns.values
        if key in remote_tab.columns.values
       ]

# Multi-column indexes over the shared columns; used to find remote rows that
# also exist locally.
i1 = local_tab.set_index(keys).index
i2 = remote_tab.set_index(keys).index

In [8]:
# Keep only the remote rows whose shared-key combination is absent locally,
# then stack them underneath the local rows.
remote_filtered = remote_tab.loc[~i2.isin(i1)]
merged_tab = pd.concat([local_tab, remote_filtered], ignore_index=True)

# Rows without a `path` get their location from `zstore` below; they are
# tagged "zarr", everything else "netcdf".
missing_path = merged_tab["path"].isna()
merged_tab["format"] = "netcdf"
merged_tab.loc[missing_path, "format"] = "zarr"

# Rows with no time range get a wildcard placeholder.
missing_time_range = merged_tab["time_range"].isna()
merged_tab.loc[missing_time_range, "time_range"] = "*"

# Fold `zstore` into `path` so a single column holds the asset location.
merged_tab.loc[missing_path, "path"] = merged_tab["zstore"]
merged_tab = merged_tab.drop(columns="zstore")

merged_tab = merged_tab.drop_duplicates(subset=["path", "version"])

In [16]:
# Column names in directory order; each becomes one catalog attribute whose
# vocabulary URL is derived from the column name.
directory_structure_template = "mip_era/activity_id/institution_id/source_id/experiment_id/member_id/table_id/variable_id/grid_label/version"

attributes = [
    {
        "column_name": att,
        "vocabulary": f"https://raw.githubusercontent.com/WCRP-CMIP/CMIP6_CVs/master/CMIP6_{att}.json",
    }
    for att in directory_structure_template.split("/")
]

# The asset location lives in `path`, its storage format in `format`.
assets = {
    "column_name": "path",
    "format_column_name": "format",
}

merge_catalog = {
    # we follow the esmcat specs version 0.1.0:
    "esmcat_version": "0.1.0",
    "id": "Intake-Betzy-Nird-Pangeo",
    "description": "Intake catalog for CLIMASO, combining data local and cloud data",
    "attributes": attributes,
    "assets": assets,
}

In [17]:
# Aggregation rules attached to the catalog: entries are grouped by
# activity and institution, and three aggregations are declared in order --
# a union over `variable_id`, a join along the existing "time" dimension
# over `time_range`, and a join along a new dimension over `member_id`.
aggregation_control = {
    "variable_column_name": "variable_id",
    "groupby_attrs": ["activity_id", "institution_id"],
    "aggregations": [
        {
            "attribute_name": "variable_id",
            "type": "union",
        },
        {
            "attribute_name": "time_range",
            "type": "join_existing",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        },
        {
            "attribute_name": "member_id",
            "type": "join_new",
            "options": {"coords": "minimal", "compat": "override"},
        },
    ],
}

merge_catalog["aggregation_control"] = aggregation_control

In [18]:
# Build the combined datastore from the catalog description (`esmcat`) and
# the merged asset table (`df`).
esm_merged = intake.open_esm_datastore(
    {"esmcat": merge_catalog, "df": merged_tab}
)

In [26]:
# Split the requested JSON output path into the target directory and the
# catalog name. pathlib copes with any directory depth (including none,
# where the parent is "."), and `.stem` removes only the final extension so
# a base name that itself contains dots is preserved.
catalog_path = Path(snakemake.output.json)
outdir = str(catalog_path.parent)
name = catalog_path.stem

In [27]:
# Serialize the merged catalog into `outdir` under `name`.
esm_merged.serialize(
    name=name,
    directory=outdir,
    catalog_type="file",
)