# Perform sigma coordinate remapping

In [1]:
import os

import tempfile
from subprocess import PIPE, Popen, check_call

import numpy as np
import xarray as xr
import dask

import pop_tools

import calc_z_to_sigma

Cannot write to data cache '/glade/p/cesmdata/cseg'. Will not be able to download remote data files. Use environment variable 'CESMDATAROOT' to specify another directory.


## Utility functions

Define a set of helper functions.

In [2]:
def gen_time_chunks(start, stop, chunk_size):
    """
    Generate a list of index pairs for brute-force chunking in time.
    """    
    time_level_count = stop - start
    nchunk =  time_level_count // chunk_size

    if time_level_count%chunk_size != 0:
        nchunk += 1
    
    return [
        (start +i * chunk_size, start + i * chunk_size + chunk_size)
        for i in range(nchunk -1)] + [(start + (nchunk - 1 ) * chunk_size, stop)]


def filename_chunk(file_in, time_index, dir_out):
    """
    Create a filename for a chunked file.
    """
    file_in = os.path.basename(file_in).replace('.nc', '')   
    return f'{dir_out}/{file_in}.tnx.{time_index[0]:03d}-{time_index[1]:03d}.nc'


def write_input_file(file_in, time_index, dir_out, clobber):      
    """
    Read a file and write a time-indexed subset to a new file.
    
    Parameters
    ----------    
    file_in : str
      The input filename
    time_index : tuple
       The upper and lower index bounds
    dir_out : str
       The output directory.
    clobber : boolean
       Overwrite all files.
       
    Returns
    -------
    file_out : str
      The output file that was written.
      
    """
    file_out = filename_chunk(file_in, time_index, dir_out) 
    
    if os.path.exists(file_out) and clobber:
        check_call(['rm', '-f', file_out])
               
    if not os.path.exists(file_out):         
        os.makedirs(os.path.dirname(file_out), exist_ok=True)
        
        time_slice = slice(time_index[0], time_index[1])

        open_kwargs = dict(decode_coords=False, decode_times=False, decode_cf=False) 
        with xr.open_dataset(file_in, **open_kwargs) as ds:
            ds.isel(time=time_slice).to_netcdf(file_out)

    return file_out 

@dask.delayed
def ncrcat(files, file_out):
    """
    Call ncrcat
    
    Parameter
    ---------
    files : list
      The files to concatentate
    file_out : str
      The output file.      
    """
    
    tmpfile = tempfile.NamedTemporaryFile(suffix='.filelist')
    with open(tmpfile.name, 'w') as fid:
        for f in files:
            fid.write(f'{f}\n')

    ncrcat_cmd = ' '.join(['cat', tmpfile.name, '|', 'ncrcat', '-O', '-o', file_out])
    cmd = ' && '.join(['module load nco', ncrcat_cmd])
    
    p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
    
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        print(stdout.decode('UTF-8'))
        print(stderr.decode('UTF-8'))
        raise   

## Spin up dask cluster

In [3]:
from ncar_jobqueue import NCARCluster
cluster = NCARCluster()
cluster.scale(36)
cluster

VBox(children=(HTML(value='<h2>NCARCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [4]:
from dask.distributed import Client
client = Client(cluster) # Connect this local process to remote workers
client

0,1
Client  Scheduler: tcp://128.117.181.225:45870  Dashboard: https://jupyterhub.ucar.edu/dav/user/mclong/proxy/8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


## Setup problem details

In [5]:
droot = '/glade/scratch/kristenk/archive/cesm22cocco.G1850ECOIAF.T62_g17.003/ocn/proc/tseries/month_1'

case = 'cesm22cocco.G1850ECOIAF.T62_g17.003'
stream = 'pop.h'
datestr = '024901-031012'
variables = ['ALK', 'ALK_ALT_CO2', 'DIC',]


USER = os.environ['USER']
dout = f'/glade/scratch/{USER}/besome/cesm_cases/{case}/sigma_coord'

Get the list of input files.

In [None]:
files = {}
for v in variables+['PD']:
    files[v] = f'{droot}/{case}.{stream}.{v}.{datestr}.nc'
    if not os.path.exists(files[v]):
        raise ValueError(f'{files[v]} dne')
files

## Setup the sigma coordinate

In [None]:
sigma_delta=0.2
sigma_start=24. - sigma_delta / 2
sigma_stop=28. +  sigma_delta / 2

sigma_edges = calc_z_to_sigma.sigma_coord_edges(sigma_start,sigma_stop,sigma_delta)
sigma = np.average(np.vstack((sigma_edges[0:-1],sigma_edges[1:])),axis=0)
sigma

## Define the chunks over-which to apply brute-force parallelism

In [None]:
with xr.open_dataset(files['PD'], decode_coords=False, decode_times=False) as ds:
    stop = len(ds.time)

time_chunks = gen_time_chunks(start=0, stop=stop, chunk_size=12)
print(f'{len(time_chunks)} chunks')

## Generate sigma2 dataset

In [None]:
#open_kwargs = dict(decode_coords=False, decode_times=False, decode_cf=False) 

#dsS =  xr.open_dataset(files['SALT'], chunks={'time': 12}, **open_kwargs)
#dsT =  xr.open_dataset(files['TEMP'], chunks={'time': 12}, **open_kwargs)

#sigma2 = pop_tools.eos(salt=dsS.SALT, temp=dsT.TEMP, pressure=2000.)


## Perform the remapping

In [9]:
%%time

@dask.delayed
def op_one_chunk(file_in_var, file_in_sigma, file_out, time_index, dir_out, clobber):
    """
    Remap one chunk to sigma coord.
    """
    
    dir_tmp = f'{dir_out}/tmp'
               
    # write the input files
    file_in_var_sub = write_input_file(file_in_var, time_index, dir_tmp, clobber)
    file_in_sigma_sub = write_input_file(file_in_sigma, time_index, dir_tmp, clobber)
     
    # construct output file name
    file_out = filename_chunk(file_out, time_index, dir_tmp)
    
    if os.path.exists(file_out) and clobber:
        check_call(['rm', '-f', file_out])
    
    # call the remapping routine
    if not os.path.exists(file_out):
        os.makedirs(os.path.dirname(file_out), exist_ok=True)
        calc_z_to_sigma.compute(
            file_in=file_in_var_sub,
            file_in_sigma=file_in_sigma_sub, 
            file_out=file_out,
            zname='z_t', 
            dzname='dz', 
            kmtname='KMT', 
            sigma_varname='PD',
            convert_from_pd=True,
            dimsub={},
            sigma_start=sigma_start,
            sigma_stop=sigma_stop,
            sigma_delta=sigma_delta,)    
        
    return file_out


def op_var(file_in_var, file_in_sigma, file_out, dir_out, time_chunks, clobber):
    """
    Remap variable to sigma coords, chunking in time.
    """
    kw=dict(
        file_in_var=file_in_var, 
        file_in_sigma=file_in_sigma, 
        file_out=file_out,
        dir_out=dir_out,
        clobber=clobber,
    )
    
    file_cat = [op_one_chunk(time_index=tnx, **kw) for tnx in time_chunks]    
    
    return dask.compute(*file_cat)


# loop over variables and remap each one
clobber = False
ncrcat_delayed = []
for v in variables:
    
    file_out = f'{dout}/{case}.{stream}.sigma.{v}.{datestr}.nc'    

    if not os.path.exists(file_out) or clobber:         
        os.makedirs(os.path.dirname(file_out), exist_ok=True)

        print(f'compute {v} remapping')
        file_cat = op_var(files[v], files['PD'], file_out, dout, time_chunks, clobber)
        
        ncrcat_delayed.append(ncrcat(file_cat, file_out))

print('concatenating chunks...')
dask.compute(*ncrcat_delayed)

compute ALK
	remapping time chunks...
compute ALK_ALT_CO2
	remapping time chunks...
compute DIC
	remapping time chunks...
concatenating chunks...
CPU times: user 2min 37s, sys: 18.9 s, total: 2min 56s
Wall time: 1h 11min 31s


(None, None, None)

## Clean up temporary files

In [12]:
check_call(['rm', '-frv', f'{dout}/tmp'])

0