# Build catalog from the input files for the bias correction

Date: 7 June 2024

Author = {"name": "Richard Matear","affiliation": "CSIRO", "email": "thomas.moore@csiro.au", "orcid": "0000-0003-3930-1946"}

#### Reference documents: https://ecgtools.readthedocs.io/en/latest

In [1]:
import glob
import pathlib
import traceback
from datetime import datetime

import xarray as xr

from ecgtools import Builder
from ecgtools.builder import INVALID_ASSET, TRACEBACK

from matplotlib import pyplot as plt

In [2]:
from dask.distributed import Client
# Client() with no arguments starts a local cluster (the cell output below
# reports a distributed.LocalCluster) and connects this session to it.
client = Client()
# Bare expression: displays the client/cluster summary as the cell output.
client

2024-06-09 20:33:21,326 - distributed.preloading - INFO - Creating preload: /g/data/hh5/public/apps/dask-optimiser/schedplugin.py
2024-06-09 20:33:21,328 - distributed.utils - INFO - Reload module schedplugin from .py file
2024-06-09 20:33:21,351 - distributed.preloading - INFO - Import preload module: /g/data/hh5/public/apps/dask-optimiser/schedplugin.py
Perhaps you already have a cluster running?
Hosting the HTTP server on port 41173 instead


Modifying workers


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: /node/gadi-cpu-bdw-0005.gadi.nci.org.au/27994/proxy/41173/status,

0,1
Dashboard: /node/gadi-cpu-bdw-0005.gadi.nci.org.au/27994/proxy/41173/status,Workers: 2
Total threads: 2,Total memory: 0 B
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:38125,Workers: 2
Dashboard: /node/gadi-cpu-bdw-0005.gadi.nci.org.au/27994/proxy/41173/status,Total threads: 2
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:39019,Total threads: 1
Dashboard: /node/gadi-cpu-bdw-0005.gadi.nci.org.au/27994/proxy/43631/status,Memory: 0 B
Nanny: tcp://127.0.0.1:35259,
Local directory: /jobfs/117718290.gadi-pbs/dask-scratch-space/worker-dcll1qwe,Local directory: /jobfs/117718290.gadi-pbs/dask-scratch-space/worker-dcll1qwe

0,1
Comm: tcp://127.0.0.1:37693,Total threads: 1
Dashboard: /node/gadi-cpu-bdw-0005.gadi.nci.org.au/27994/proxy/45283/status,Memory: 0 B
Nanny: tcp://127.0.0.1:41555,
Local directory: /jobfs/117718290.gadi-pbs/dask-scratch-space/worker-vt08yj47,Local directory: /jobfs/117718290.gadi-pbs/dask-scratch-space/worker-vt08yj47


### Get the catalogue output path from the config file

In [3]:
import configparser

# Location of the configuration file that holds the catalogue output path
config_file = '/g/data/xv83/rxm599/acs/data-catalogue/config.ini'

# Create a ConfigParser object
config = configparser.ConfigParser()

# Read the config file.
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so check that list: otherwise a missing file
# only shows up later as a confusing NoSectionError from config.get().
if not config.read(config_file):
    raise FileNotFoundError(f'Could not read config file: {config_file}')

# Directory the catalogue files are written to ([paths] catalogue_path)
catalogue_path = config.get('paths', 'catalogue_path')

### Build the catalogue from the listing of files on `ia39`

In [90]:
# Collect the future-scenario simulation directories; historical runs are left out.

# Base directory that holds the bias-adjustment input files
root_version = '/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/'

# The fourth path level below the domain uses '[!hi]*', which matches names
# whose first character is neither 'h' nor 'i'. That is what drops
# 'historical' (it would also drop any other name starting with 'h' or 'i').
mBoM = glob.glob(root_version + 'AGCD-05i/BOM/*/[!hi]*/*/*/*/*')
mBoM.sort()
mCSIRO = glob.glob(root_version + 'AGCD-05i/CSIRO/*/[!hi]*/*/*/*/*')
mCSIRO.sort()

# BOM runs first, then CSIRO runs, each group sorted by path
mRuns = [*mBoM, *mCSIRO]

# Extract the part of a simulation path that lies below the root directory
def r_model(mRun, root_depth=8):
    """Return the trailing part of a '/'-separated path as a relative path.

    Parameters
    ----------
    mRun : str
        Path of a simulation directory, using '/' as the separator.
    root_depth : int, optional
        Number of leading '/'-separated fields to drop. An absolute path
        starts with an empty field (the text before the first '/'), which
        counts as one. The default of 8 matches the depth of `root_version`
        as set in this notebook; pass a different value if the root directory
        moves to another depth.

    Returns
    -------
    str
        The remaining fields joined with '/'. Empty if the path has
        `root_depth` fields or fewer.
    """
    return '/'.join(mRun.split('/')[root_depth:])

# Number of simulation directories matched by the glob patterns above
# (BOM and CSIRO combined)
print(len(mRuns))

17


In [76]:
# Sanity check: print each simulation directory and the number of entries
# found two levels below it.
for file in mRuns:
    root_model = r_model(file)
    root_source_path = f'{root_version}{root_model}/'
    files = glob.glob(f'{root_source_path}*/*')
    files.sort()
    print(root_source_path)
    print(len(files))
    
    

/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/AGCD-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1/day/
602
/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/AGCD-05i/BOM/ACCESS-ESM1-5/ssp370/r6i1p1f1/BARPA-R/v1-r1/day/
602
/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/AGCD-05i/BOM/CESM2/ssp370/r11i1p1f1/BARPA-R/v1-r1/day/
602
/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/AGCD-05i/BOM/CMCC-ESM2/ssp370/r1i1p1f1/BARPA-R/v1-r1/day/
602
/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/AGCD-05i/BOM/EC-Earth3/ssp370/r1i1p1f1/BARPA-R/v1-r1/day/
602
/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/AGCD-05i/BOM/MPI-ESM1-2-HR/ssp370/r1i1p1f1/BARPA-R/v1-r1/day/
602
/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/AGCD-05i/BOM/NorESM2-MM/ssp

In [79]:
def parse_CCAM(file):
    """Turn one data-file path into a catalogue record.

    The file stem is split on '_' and its first eight fields are read by
    position as: variable, domain, host_GCM, run_type, host_ensemble,
    downscale_model, downscale_version, period.

    Parameters
    ----------
    file : str or path-like
        Path of the file to describe.

    Returns
    -------
    dict
        On success: the eight fields above, plus 'time_period' (a readable
        label derived from 'period'; any unrecognised period is labelled
        'fixed') and 'path' (the file path as a string).
        On failure (for example a stem with fewer than eight fields): a dict
        with INVALID_ASSET (the path) and TRACEBACK (the formatted traceback)
        instead of raising.
    """
    # NOTE(review): the positions below assume there is no extra token (e.g.
    # an institution name) between the ensemble member and the downscaling
    # model in the file name. If there is one, fields 5-7 are shifted and
    # 'period' never matches a known frequency -- confirm against a real file.
    field_names = (
        'variable',
        'domain',
        'host_GCM',
        'run_type',
        'host_ensemble',
        'downscale_model',
        'downscale_version',
        'period',
    )
    period_labels = {
        '1hr': 'hourly',
        '6hr': 'six_hourly',
        'day': 'daily',
        'mon': 'monthly',
    }

    file = pathlib.Path(file)

    try:
        parts = file.stem.split('_')
        # Positional lookup raises IndexError when the stem is too short,
        # which the handler below turns into an invalid-asset record.
        record = {key: parts[position] for position, key in enumerate(field_names)}
        record['time_period'] = period_labels.get(record['period'], 'fixed')
        record['path'] = str(file)
        return record

    except Exception:
        return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()}

# Create a catalogue for each future simulation in the `bias-adjustment-input` directory

In [88]:
%%time
# Build and save one catalogue per simulation directory in mRuns.
for file in mRuns:
    root_model = r_model(file)
    root_source_path = f'{root_version}{root_model}/'

    # Report the directory and how many entries sit two levels below it
    files = glob.glob(f'{root_source_path}*/*')
    files.sort()
    print(root_source_path)
    print(len(files))

    # Crawl the directory and parse every file name into a catalogue row
    b = Builder([root_source_path], depth=2)
    b.build(parsing_func=parse_CCAM)

    # Catalogue name: the relative run path with '/' replaced by '_'
    model = "_".join(root_model.split("/"))
    print(model)

    b.save(
        # Name of the catalogue; the output shows it written as <name>.json
        name=model,
        # Directory the catalogue is written to (read from the config file)
        directory=catalogue_path,
        # Column that holds the file path
        path_column_name='path',
        # Column that holds the variable name
        variable_column_name='variable',
        # Data file format - could be netcdf or zarr (in this case, netcdf)
        data_format="netcdf",
        # Attributes to group by when reading in variables using intake-esm
        groupby_attrs=[
            "domain",
            "host_GCM",
            "run_type",
            "host_ensemble",
            "downscale_model",
            "downscale_version",
            "period",
        ],
        # Aggregations which are fed into xarray when reading in data using intake.
        # NOTE(review): parse_CCAM as defined in this notebook does not emit a
        # 'date' column -- confirm that join_existing on 'date' behaves as
        # intended when the catalogue is opened.
        aggregations=[
            {
                "type": "join_existing",
                "attribute_name": "date",
                "options": {"dim": "time", "coords": "minimal", "compat": "override"},
            }
        ],
    )



/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/AGCD-05i/BOM/ACCESS-CM2/ssp370/r4i1p1f1/BARPA-R/v1-r1/day/
602
AGCD-05i_BOM_ACCESS-CM2_ssp370_r4i1p1f1_BARPA-R_v1-r1_day
Successfully wrote ESM catalog json file to: file:///g/data/xv83/rxm599/acs/data-catalogue/catalogues/AGCD-05i_BOM_ACCESS-CM2_ssp370_r4i1p1f1_BARPA-R_v1-r1_day.json
/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/AGCD-05i/BOM/ACCESS-ESM1-5/ssp370/r6i1p1f1/BARPA-R/v1-r1/day/
602
AGCD-05i_BOM_ACCESS-ESM1-5_ssp370_r6i1p1f1_BARPA-R_v1-r1_day
Successfully wrote ESM catalog json file to: file:///g/data/xv83/rxm599/acs/data-catalogue/catalogues/AGCD-05i_BOM_ACCESS-ESM1-5_ssp370_r6i1p1f1_BARPA-R_v1-r1_day.json
/g/data/ia39/australian-climate-service/test-data/CORDEX-CMIP6/bias-adjustment-input/AGCD-05i/BOM/CESM2/ssp370/r11i1p1f1/BARPA-R/v1-r1/day/
602
AGCD-05i_BOM_CESM2_ssp370_r11i1p1f1_BARPA-R_v1-r1_day
Successfully wrote ESM catalog json file to: file:/

# THE END

In [28]:
# Shut down the Dask client that was created earlier in the notebook.
client.shutdown()