In [1]:
import glob
import os
import re
from os.path import join

import dask
import numpy as np
import pandas as pd
import xarray as xr
from dask.distributed import Client
from dask_jobqueue import PBSCluster

In [2]:
# Describe the PBS jobs that will host the Dask workers, then start the cluster
# and attach a client to it.
pbs_options = dict(
    account='NAML0001',
    queue='main',
    walltime='02:30:00',
    memory="40GB",
    cores=36,
)
cluster = PBSCluster(**pbs_options)
client = Client(cluster)

# Scale the cluster: target of 10 jobs and 400GB of memory.
cluster.scale(jobs=10, memory="400GB")

# Bare last expression so the notebook renders the client summary.
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/cbecker/proxy/8787/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/cbecker/proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.14.11.121:42829,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/cbecker/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [3]:
def _member_id_from_source(source):
    """Return the integer ensemble-member id encoded in a history-file path.

    The id is taken from the four digits that follow "LHC" in the file *name*
    (e.g. ``PPEn11_transient_LHC0042.clm2.h0.1850-02-01-00000.nc`` -> ``42``).
    Only the basename is inspected, so an "LHC" in a parent directory
    (such as ``.../PPEn11_LHC/...``) cannot change the result.

    Parameters
    ----------
    source : str
        Path of the file a dataset was opened from.

    Raises
    ------
    ValueError
        If the file name contains no ``LHC<4 digits>`` tag.
    """
    match = re.search(r"LHC(\d{4})", os.path.basename(source))
    if match is None:
        raise ValueError(f"Cannot determine ensemble member from file name: {source!r}")
    return int(match.group(1))


def write_intermediate(files, variables, out_path, name):
    """Combine selected variables from CLM PPE files and write one NetCDF file per year.

    Every input file is reduced to ``variables`` and tagged with a ``member``
    coordinate parsed from its file name. The files are then combined by
    coordinates, restricted to members 1 through 500, persisted, split by
    calendar year and written to ``<out_path>/<name>_<year>.nc``.

    Parameters
    ----------
    files : list of str
        Paths of the input files; each file name must contain ``LHC<4 digits>``.
    variables : str or sequence of str
        Data variable(s) to keep from every file.
    out_path : str
        Directory for the yearly output files (created if it does not exist).
    name : str
        File-name prefix for the outputs.

    Raises
    ------
    ValueError
        If a member id cannot be parsed from an input file name.
    """
    # Always index with a list so preprocess() yields a Dataset, even when a
    # single variable name (or a tuple of names) is passed.
    if isinstance(variables, str):
        variables = [variables]
    else:
        variables = list(variables)

    # Create the output directory before the expensive load, so a missing
    # directory does not surface only when the files are finally written.
    os.makedirs(out_path, exist_ok=True)

    def preprocess(ds):
        """Keep only the requested variables and add a length-1 ``member`` dimension."""
        member_id = _member_id_from_source(ds.encoding["source"])
        return ds[variables].expand_dims(member=[member_id])

    ds = (
        xr.open_mfdataset(files, combine='by_coords', parallel=True, preprocess=preprocess)
        .sel(member=slice(1, 500))  # label-based selection on the member coordinate
        .persist()
    )

    # NOTE(review): grouping uses the calendar year of the stored time stamps as-is.
    # The input file names start at "-02-01", which suggests each mean may be stamped
    # at the end of its averaging interval; if so, the last sample of a year would be
    # written to the following year's file -- confirm before relying on year boundaries.
    years, datasets = zip(*ds.groupby("time.year"))
    paths = [join(out_path, f"{name}_{y}.nc") for y in years]
    xr.save_mfdataset(datasets, paths)

In [15]:
## took about 1 hour for a single variable for all 165 years

start_year = 1850
end_year = 2015  # exclusive: with a stride of 5 the last file requested starts in 2010

# One file per (member, 5-year step). Only members 1-500 are requested because
# write_intermediate keeps member=slice(1, 500); files for member 0 would be
# opened and then discarded.
# NOTE(review): the stride of 5 presumably matches the number of years stored per file -- confirm.
files = [
    f"/glade/campaign/cgd/tss/projects/PPE/PPEn11_LHC/transient/hist/PPEn11_transient_LHC{member:04d}.clm2.h0.{year}-02-01-00000.nc"
    for member in range(1, 501)
    for year in range(start_year, end_year, 5)
]

base_out_path = "/glade/derecho/scratch/cbecker/PPE_intermediate_data/LAI"
file_out_prefix = "LAI_GPP"
variables = ["TLAI", "GPP"]
write_intermediate(files=files,
                   variables=variables,
                   out_path=base_out_path,
                   name=file_out_prefix)