In [1]:
from dask.distributed import Client
import multiprocessing

ncpu = multiprocessing.cpu_count()
threads = 6
nworker = ncpu // threads
print(ncpu, threads, nworker)

48 6 8


In [2]:
client = Client(processes=False, threads_per_worker=threads, n_workers=nworker, memory_limit='256GB')
client

0,1
Client  Scheduler: tcp://127.0.0.1:33165  Dashboard: http://localhost:8888/proxy/8787/status,Cluster  Workers: 8  Cores: 48  Memory: 515.40 GB


In [3]:
import pandas as pd
import fnmatch
import dask.dataframe as dd
from intake.source.utils import reverse_format
import os
import re
import subprocess
from tqdm.auto import tqdm
from pathlib import Path
import shutil
import numpy as np

## Create text file containing all files available

In [4]:
def get_file_list(persist_path):
    root = Path("/work/kd0956/CMIP5/data/cmip5/")
    p_path = Path(persist_path)
    p_path.mkdir(exist_ok=True)
    dirs = [x for x in root.iterdir() if x.is_dir()]
    for directory in tqdm(dirs):
        print(directory)
        stem = directory.stem
        f = open(f"{persist_path}/{stem}.txt", "w")
        cmd = ["find", "-L", directory.as_posix(), "-name", "*.nc"]
        p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=f)
        p.wait()

In [5]:
persist_path = "./CMIP5_filelist"
# get_file_list(persist_path)


## Extract attributes of a file using information from CMIP5 DRS.



Reference:
- CMIP5 DRS: https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=27

Directory:
```
  <activity>/
    <product>/
        <institute>/
            <model>/
                <experiment>/
                    <frequency>/
                        <modeling realm>/
                            <MIP table>/
                                <ensemble member>/
                                    <version number>/
                                        <variable name>/
                                            <CMOR filename>.nc
```
                                                
CMOR filename: `<variable name>_<MIP table>_<model>_<experiment>_ <ensemble member>[_<temporal subset>][_<geographical info>].nc`
"""

In [6]:
products = list(Path(persist_path).rglob("*.txt"))
products = [product.stem for product in products]
products

['output1', 'output2', 'output1-checkpoint']

In [7]:
df = dd.read_csv(f"{persist_path}/*.txt", header=None).compute()
df.columns = ["path"]
df.head()

Unnamed: 0,path
0,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...
1,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...
2,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...
3,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...
4,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...


In [8]:
len(df)

3542137

In [9]:
def _reverse_filename_format(file_basename, filename_template=None, gridspec_template=None):
    """
    Uses intake's ``reverse_format`` utility to reverse the string method format.
    Given format_string and resolved_string, find arguments
    that would give format_string.format(arguments) == resolved_string
    """
    try:
        return reverse_format(filename_template, file_basename)
    except ValueError:
        try:
            return reverse_format(gridspec_template, file_basename)
        except:
            print(
                f'Failed to parse file: {file_basename} using patterns: {filename_template} and {gridspec_template}'
            )
            return {}


def _extract_attr_with_regex(input_str, regex, strip_chars=None):
    pattern = re.compile(regex, re.IGNORECASE)
    match = re.findall(pattern, input_str)
    if match:
        match = max(match, key=len)
        if strip_chars:
            match = match.strip(strip_chars)

        else:
            match = match.strip()

        return match

    else:
        return None


exclude_patterns = ['*/files/*', '*/latest/*']


def _filter_func(path):
    return not any(fnmatch.fnmatch(path, pat=exclude_pattern) for exclude_pattern in exclude_patterns)

In [10]:
%%time
files = df.path.tolist()
filelist = list(filter(_filter_func, files))

CPU times: user 15.6 s, sys: 115 ms, total: 15.7 s
Wall time: 15.5 s


In [11]:
len(filelist)

3542137

In [12]:
def get_attrs(filepath):
    """Extract attributes of a file using information from CMIP5 DRS.
    Notes
    -----
    Reference:
    - CMIP5 DRS: https://pcmdi.llnl.gov/mips/cmip5/docs/cmip5_data_reference_syntax.pdf?id=27
    """

    fileparts = {}

    freq_regex = r'/3hr/|/6hr/|/day/|/fx/|/mon/|/monClim/|/subhr/|/yr/'
    realm_regex = r'aerosol|atmos|land|landIce|ocean|ocnBgchem|seaIce'
    version_regex = r'v\d{4}\d{2}\d{2}|v\d{1}'

    file_basename = os.path.basename(filepath)
    fileparts['path'] = filepath

    filename_template = '{variable}_{mip_table}_{model}_{experiment}_{ensemble_member}_{temporal_subset}.nc'
    gridspec_template = '{variable}_{mip_table}_{model}_{experiment}_{ensemble_member}.nc'
    f = _reverse_filename_format(
        file_basename, filename_template=filename_template, gridspec_template=gridspec_template
    )
    fileparts.update(f)

    frequency = _extract_attr_with_regex(filepath, regex=freq_regex, strip_chars='/')
    realm = _extract_attr_with_regex(filepath, regex=realm_regex)
    version = _extract_attr_with_regex(filepath, regex=version_regex) or 'v0'
    fileparts['frequency'] = frequency
    fileparts['modeling_realm'] = realm
    fileparts['version'] = version
    try:
        part1, part2 = os.path.dirname(filepath).split(fileparts['experiment'])
        part1 = part1.strip("/").split("/")
        fileparts['institute'] = part1[-2]
        fileparts['product_id'] = part1[-3]
    except Exception:
        print(fileparts)

    return fileparts

In [13]:
get_attrs(filelist[0])

{'path': '/work/kd0956/CMIP5/data/cmip5/output1/MOHC/HadGEM2-CC/historical/day/atmos/day/r2i1p1/v20111129/rhs/rhs_day_HadGEM2-CC_historical_r2i1p1_19991201-20041130.nc',
 'variable': 'rhs',
 'mip_table': 'day',
 'model': 'HadGEM2-CC',
 'experiment': 'historical',
 'ensemble_member': 'r2i1p1',
 'temporal_subset': '19991201-20041130',
 'frequency': 'day',
 'modeling_realm': 'atmos',
 'version': 'v20111129',
 'institute': 'MOHC',
 'product_id': 'output1'}

In [15]:
%%time
entries = list(map(get_attrs, filelist))



CPU times: user 3min 5s, sys: 4.75 s, total: 3min 9s
Wall time: 3min 7s


In [16]:
entries[10]

{'path': '/work/kd0956/CMIP5/data/cmip5/output1/MOHC/HadGEM2-CC/historical/day/atmos/day/r2i1p1/v20111129/rhsmax/rhsmax_day_HadGEM2-CC_historical_r2i1p1_19791201-19841130.nc',
 'variable': 'rhsmax',
 'mip_table': 'day',
 'model': 'HadGEM2-CC',
 'experiment': 'historical',
 'ensemble_member': 'r2i1p1',
 'temporal_subset': '19791201-19841130',
 'frequency': 'day',
 'modeling_realm': 'atmos',
 'version': 'v20111129',
 'institute': 'MOHC',
 'product_id': 'output1'}

In [17]:
len(entries)



3542137

In [18]:
df = pd.DataFrame(entries)
df = df.drop_duplicates(subset=['path'], keep='last').reset_index(drop=True)
df.head()

Unnamed: 0,path,variable,mip_table,model,experiment,ensemble_member,temporal_subset,frequency,modeling_realm,version,institute,product_id
0,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...,rhs,day,HadGEM2-CC,historical,r2i1p1,19991201-20041130,day,atmos,v20111129,MOHC,output1
1,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...,rhs,day,HadGEM2-CC,historical,r2i1p1,20041201-20051130,day,atmos,v20111129,MOHC,output1
2,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...,rhs,day,HadGEM2-CC,historical,r2i1p1,19791201-19841130,day,atmos,v20111129,MOHC,output1
3,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...,rhs,day,HadGEM2-CC,historical,r2i1p1,19591201-19641130,day,atmos,v20111129,MOHC,output1
4,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...,rhs,day,HadGEM2-CC,historical,r2i1p1,19841201-19891130,day,atmos,v20111129,MOHC,output1


In [19]:
len(df)

3542137

In [20]:
# Some entries are invalid
invalids = df[~df.product_id.isin(products)]
invalids
# df = df[df.activity_id.isin(activity_ids)]

Unnamed: 0,path,variable,mip_table,model,experiment,ensemble_member,temporal_subset,frequency,modeling_realm,version,institute,product_id
1617349,/work/kd0956/CMIP5/data/cmip5/output1/NOAA-GFD...,gridspec,ocean,fx,GFDL-ESM2G,esmHistorical,r0i0p0,fx,ocean,v20110601,output1,cmip5
1617353,/work/kd0956/CMIP5/data/cmip5/output1/NOAA-GFD...,gridspec,seaIce,fx,GFDL-ESM2G,esmHistorical,r0i0p0,fx,seaIce,v20110601,output1,cmip5
1627745,/work/kd0956/CMIP5/data/cmip5/output1/NOAA-GFD...,gridspec,ocean,fx,GFDL-ESM2G,historical,r0i0p0,fx,ocean,v20110601,output1,cmip5
1627750,/work/kd0956/CMIP5/data/cmip5/output1/NOAA-GFD...,gridspec,seaIce,fx,GFDL-ESM2G,historical,r0i0p0,fx,seaIce,v20110601,output1,cmip5
1634485,/work/kd0956/CMIP5/data/cmip5/output1/NOAA-GFD...,gridspec,seaIce,fx,GFDL-ESM2G,esmrcp85,r0i0p0,fx,seaIce,v20120301,output1,cmip5
...,...,...,...,...,...,...,...,...,...,...,...,...
2394846,/work/kd0956/CMIP5/data/cmip5/output1/ICHEC/EC...,orog,atmos,fx,EC-EARTH,historical,r0i0p0,fx,atmos,v20130211,output1,cmip5
2394848,/work/kd0956/CMIP5/data/cmip5/output1/ICHEC/EC...,areacella,atmos,fx,EC-EARTH,rcp85,r0i0p0,fx,atmos,v20130211,output1,cmip5
2394850,/work/kd0956/CMIP5/data/cmip5/output1/ICHEC/EC...,areacella,atmos,fx,EC-EARTH,rcp26,r0i0p0,fx,atmos,v20130211,output1,cmip5
2394851,/work/kd0956/CMIP5/data/cmip5/output1/ICHEC/EC...,areacella,atmos,fx,EC-EARTH,historical,r0i0p0,fx,atmos,v20130211,output1,cmip5


In [21]:
df = df[df.product_id.isin(products)]
len(df)

3541938

In [22]:
df.ensemble_member.unique()

array(['r2i1p1', 'r1i1p1', 'r3i1p1', 'r0i0p0', 'r5i3p1', 'r4i3p1',
       'r2i3p1', 'r1i2p1', 'r3i2p1', 'r3i3p1', 'r4i2p1', 'r1i3p1',
       'r7i3p1', 'r2i2p1', 'r8i3p1', 'r5i2p1', 'r9i2p1', 'r10i3p1',
       'r6i3p1', 'r7i2p1', 'r6i2p1', 'r10i2p1', 'r9i3p1', 'r8i2p1',
       'r5i1p1', 'r10i1p1', 'r6i1p1', 'r8i1p1', 'r4i1p1', 'r7i1p1',
       'r9i1p1', 'r11i1p1', 'r12i1p1', 'r1i2p2', 'r6i1p12', 'r4i1p10',
       'r6i1p11', 'r4i1p12', 'r4i1p11', 'r4i1p15', 'r4i1p16', 'r1i1p17',
       'r1i1p12', 'r6i1p15', 'r1i1p13', 'r6i1p16', 'r4i1p17', 'r1i1p11',
       'r1i1p10', 'r1i1p16', 'r1i1p15', 'r6i1p13', 'r6i1p14', 'r4i1p14',
       'r6i1p17', 'r2i1p11', 'r1i1p14', 'r6i1p10', 'r3i1p13', 'r1i1p2',
       'r4i1p4', 'r5i1p4', 'r3i1p3', 'r1i1p3', 'r2i1p2', 'r4i1p2',
       'r5i1p2', 'r2i1p3', 'r5i1p3', 'r3i1p2', 'r4i1p3', 'r3i1p4',
       'r2i1p4', 'r1i1p4', 'r2i1p6', 'r4i1p5', 'r1i1p6', 'r5i1p6',
       'r1i1p5', 'r4i1p6', 'r3i1p5', 'r5i1p5', 'r3i1p6', 'r2i1p5',
       'r1i1p125', 'r1i1p126', '

## Pick the latest versions only

In [23]:
grpby = list(set(df.columns.tolist()) - {'path', 'version'})
groups = df.groupby(grpby)

In [24]:
%%time
idx_to_remove = []
for _, group in groups:
    if group.version.nunique() > 1:
        idx_to_remove.extend(group.sort_values(by=['version'], ascending=False).index[1:].values.tolist())

tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x2ba33abb27b8>>
Traceback (most recent call last):
  File "/mnt/lustre01/pf/zmaw/m300524/conda-envs/ESMValtool/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/mnt/lustre01/pf/zmaw/m300524/conda-envs/ESMValtool/lib/python3.7/site-packages/bokeh/server/tornado.py", line 558, in _keep_alive
    c.send_ping()
  File "/mnt/lustre01/pf/zmaw/m300524/conda-envs/ESMValtool/lib/python3.7/site-packages/bokeh/server/connection.py", line 80, in send_ping
    self._socket.ping(codecs.encode(str(self._ping_count), "utf-8"))
  File "/mnt/lustre01/pf/zmaw/m300524/conda-envs/ESMValtool/lib/python3.7/site-packages/tornado/websocket.py", line 447, in ping
    raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError


CPU times: user 25min 54s, sys: 1min 28s, total: 27min 22s
Wall time: 25min 38s


In [25]:
len(idx_to_remove)

277602

In [26]:
len(df)

3541938

In [27]:
df1 = df.copy()
df = df.drop(index=idx_to_remove)
len(df)

3264336

In [28]:
df.head()

Unnamed: 0,path,variable,mip_table,model,experiment,ensemble_member,temporal_subset,frequency,modeling_realm,version,institute,product_id
0,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...,rhs,day,HadGEM2-CC,historical,r2i1p1,19991201-20041130,day,atmos,v20111129,MOHC,output1
1,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...,rhs,day,HadGEM2-CC,historical,r2i1p1,20041201-20051130,day,atmos,v20111129,MOHC,output1
2,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...,rhs,day,HadGEM2-CC,historical,r2i1p1,19791201-19841130,day,atmos,v20111129,MOHC,output1
3,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...,rhs,day,HadGEM2-CC,historical,r2i1p1,19591201-19641130,day,atmos,v20111129,MOHC,output1
4,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...,rhs,day,HadGEM2-CC,historical,r2i1p1,19841201-19891130,day,atmos,v20111129,MOHC,output1


In [29]:
df.columns.shape

(12,)

In [30]:
# Re-arange columns
columns = [
    "product_id",
    "institute",
    "model",
    "experiment",
    "frequency",
    "modeling_realm",
    "mip_table",
    "ensemble_member",
    "variable",
    "temporal_subset",
    "version",
    "path",
]
df = df[columns]
df.head()

Unnamed: 0,product_id,institute,model,experiment,frequency,modeling_realm,mip_table,ensemble_member,variable,temporal_subset,version,path
0,output1,MOHC,HadGEM2-CC,historical,day,atmos,day,r2i1p1,rhs,19991201-20041130,v20111129,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...
1,output1,MOHC,HadGEM2-CC,historical,day,atmos,day,r2i1p1,rhs,20041201-20051130,v20111129,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...
2,output1,MOHC,HadGEM2-CC,historical,day,atmos,day,r2i1p1,rhs,19791201-19841130,v20111129,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...
3,output1,MOHC,HadGEM2-CC,historical,day,atmos,day,r2i1p1,rhs,19591201-19641130,v20111129,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...
4,output1,MOHC,HadGEM2-CC,historical,day,atmos,day,r2i1p1,rhs,19841201-19891130,v20111129,/work/kd0956/CMIP5/data/cmip5/output1/MOHC/Had...


In [31]:
df.to_csv("../catalogs/mistral-cmip5.csv.gz", compression="gzip", index=False)