# Processing PiC_UVnudge data

### Set up
#### Packages

In [1]:
import numpy as np
import xarray as xr
import pandas as pd
from scipy import stats
import warnings
warnings.simplefilter('ignore', UserWarning)
warnings.filterwarnings('ignore')
import datetime as dt
from datetime import timedelta
from Processing_functions import FixLongitude, FixTime, FixGrid, InterPlevels
xr.set_options(display_expand_data=False);
xr.set_options(keep_attrs=True);
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client
from dask.diagnostics import ProgressBar
from xgcm import Grid

#### Filepaths & name variables

In [2]:
## Test numbers
# Run suffixes 001-003; used further down as the 'ensemble_member' coordinate
tst_nums = np.arange(1,4)

## File name
run_name = 'PiC_UVnudge_MM2006'
piC_UVnudge_name = 'b.e21.B1850cmip6.f09_g17.'+run_name

## Filepaths
path_to_arch = "/glade/derecho/scratch/glydia/archive/"
path_to_outdata = '/glade/work/glydia/Arctic_controls_processed_data/processed_'+run_name+'_data/'

## Selection: which component / output frequency / variable this run processes
comp = 'ocn'
freq = 0 # 0: monthly, 1: daily
var_ind = 0

# var_ind reference (index into var_list[comp] -> variable)
# ATM
# 8, 9, 11, -2, -1
# U, V, TREFHT, RESTOM, Z3

# ICE
# 0
# aice

# OCN
# 0
# MOC

# Variables
var_list = {'atm': ['TS','FLDS','CLOUD','FLNS','FSNS','FLNT','FSNT','PSL','U','V','T','TREFHT',
                    'Target_U','Target_V','Target_T','RESTOM','Z3'],
            'ice': ['aice','hi'],
            'ocn': ['MOC']}
# Suffix appended to the variable name, keyed by freq
var_ext = {0: '', 1: '_d'}

# Extensions
# History-file infix per component; list position corresponds to freq
h_ext = {'atm': ['.h0.'],
         'ice': ['.h.','.h1.'],
         'ocn': ['.h.']}
mod_com = {'atm': 'cam',
           'ice': 'cice',
           'ocn': 'pop'}
# Time-series sub-directory per component; list position corresponds to freq
time_path = {'atm': ['month_1'],
             'ice': ['month_1','day_1'],
             'ocn': ['month_1']}
# Date-range part of the file name; list position corresponds to freq
yr_extn = {'in': [".195001-202312.", ".*0101-*1231."],
           'out': [".195001-202312.", ".19500101-20231231."]}
# One flag per entry of var_list[comp] (same order): True routes the variable
# through the vertical-level handling in the processing cells below
vert_lev = {'atm': [False,False,True,False,False,False,False,False,True,True,True,
                    False,True,True,True,False,True],
            'ice': [False,False],
            'ocn': [False]}


def validate_selection(comp, freq, var_ind):
    """Check that (comp, freq, var_ind) is covered by the lookup tables above.

    The tables are index-aligned lists, so an unsupported combination would
    otherwise only show up later as a bare IndexError/KeyError while building
    file paths. Negative ``var_ind`` values are accepted (list-style indexing).

    Raises
    ------
    ValueError
        If the component is unknown, the frequency has no entry for that
        component, the variable index is out of range, or ``vert_lev`` is not
        the same length as ``var_list`` for that component.
    """
    if comp not in var_list:
        raise ValueError(f"unknown component {comp!r}; expected one of {sorted(var_list)}")
    if freq not in var_ext:
        raise ValueError(f"unknown freq {freq!r}; expected one of {sorted(var_ext)}")
    if freq >= len(h_ext[comp]) or freq >= len(time_path[comp]):
        raise ValueError(f"freq={freq!r} is not configured for component {comp!r} "
                         "(no matching h_ext/time_path entry)")
    n_vars = len(var_list[comp])
    if len(vert_lev[comp]) != n_vars:
        raise ValueError(f"vert_lev[{comp!r}] has {len(vert_lev[comp])} flags "
                         f"but var_list[{comp!r}] has {n_vars} variables")
    if not -n_vars <= var_ind < n_vars:
        raise ValueError(f"var_ind={var_ind!r} is out of range for component {comp!r} "
                         f"({n_vars} variables)")


# Fail here, with a readable message, rather than deep inside the load loop
validate_selection(comp, freq, var_ind)
var = var_list[comp][var_ind]+var_ext[freq]

In [3]:
# Settings handed to PBSCluster; the job name carries the variable being processed
pbs_options = dict(
    cores=1,
    memory='25GiB',
    queue='casper',
    walltime='02:00:00',
    account='UCUB0155',
    name='piControl_'+var,
)
cluster = PBSCluster(**pbs_options)

# Scale the cluster to 4*9 = 36, then attach a client to it
cluster.scale(4*9)
client = Client(cluster)

In [4]:
# Show the client summary (dashboard link, worker/thread/memory totals)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/glydia/Arctic_breakdown/proxy/33627/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/glydia/Arctic_breakdown/proxy/33627/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://128.117.208.97:34317,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/glydia/Arctic_breakdown/proxy/33627/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [5]:
## Chunking variables
# Time chunk length: 365*2 steps for daily output (freq == 1), 600 steps otherwise
time_ch = 600 if freq != 1 else 365*2

# Per-component chunk sizes, keyed by dimension name
chunks = dict(
    atm=dict(time=time_ch, lat=96, lon=144, lev=-1),
    ice=dict(time=time_ch, nj=192, ni=160),
    ocn=dict(time=time_ch, nlat=64, nlon=96, z_t=5),
)

### Load & modify data
#### Control data

In [6]:
%%time

# Zero-padded year labels 1950..2024; consecutive pairs delimit one processing window
yr_range = np.array([str(year).zfill(4) for year in np.arange(1950,2025)])

ens_list = []
for member in tst_nums:
    member_id = str(member).zfill(3)
    case_name = f"{piC_UVnudge_name}.{member_id}"

    ## Load data
    # Location of this member's time-series file(s)
    src_dir = f"{path_to_arch}{case_name}/{comp}/proc/tseries/{time_path[comp][freq]}/"
    src_file = f"{case_name}{h_ext[comp][freq]}{var}{yr_extn['in'][freq]}nc"
    src = src_dir+src_file
    print(src)

    if freq == 0:
        # Monthly: the open itself is wrapped in dask.delayed, so everything
        # derived from it below is a lazy task as well
        ds = dask.delayed(xr.open_dataset)(src,chunks=chunks[comp])
    else:
        # Daily: the input name is a glob pattern, opened as a multi-file dataset
        ds = xr.open_mfdataset(paths=src,chunks=chunks[comp],parallel=True)

    # Variables without a vertical dimension are reduced to the single field of interest
    horiz_only = not vert_lev[comp][var_ind]
    source = ds[var] if horiz_only else ds

    # One entry per year. Note the progress messages are emitted while the
    # delayed tasks are being defined, i.e. before any of them has been computed.
    year_tasks = []
    for first_yr, next_yr in zip(yr_range[:-1], yr_range[1:]):
        if freq == 0:
            # If monthly data: window runs from Feb 1 of this year to Jan 17 of
            # the next, and the time axis is passed through FixTime
            window = source.sel(time=slice(first_yr+'-02-01',next_yr+'-01-17'))
            print('sliced '+first_yr+'-02-01 to '+next_yr+'-01-17')

            staged = dask.delayed(FixTime)(window)
            print('   fixed time')
        else:
            # If daily data: window is the calendar year, no time fix
            window = source.sel(time=slice(first_yr+'-01-01',first_yr+'-12-31'))
            print('sliced '+first_yr+'-01-01 to '+first_yr+'-12-31')

            staged = window

        # Component-specific grid handling
        if comp == 'ice':
            result = dask.delayed(FixGrid)(staged,'gx1v7')
            print('   fixed CICE grid')
        elif comp == 'ocn':
            result = staged
        else:
            result = dask.delayed(FixLongitude)(staged, False)
            # If 3D monthly data, interpolate to pressure levels
            if freq == 0 and not horiz_only:
                result = dask.delayed(InterPlevels)(result, var)
            print('   fixed longitude')

        year_tasks.append(result)

    if horiz_only and freq == 0:
        # Horizontal-only monthly data: compute every year now and join along time
        computed_years = dask.compute(*year_tasks)
        print('computed list for ensemble member '+member_id)

        member_data = xr.concat(computed_years,dim='time').chunk({'time':111})
        ens_list.append(member_data)
        print('concatenated data for ensemble member '+member_id)
    else:
        # Otherwise keep the per-year list uncomputed; the write-out cell
        # processes it one year at a time
        ens_list.append(year_tasks)
        print('add uncomputed list for '+member_id)

/glade/derecho/scratch/glydia/archive/b.e21.B1850cmip6.f09_g17.PiC_UVnudge_MM2006.001/ocn/proc/tseries/month_1/b.e21.B1850cmip6.f09_g17.PiC_UVnudge_MM2006.001.h.MOC.195001-202312.nc
sliced 1950-02-01 to 1951-01-17
   fixed time
sliced 1951-02-01 to 1952-01-17
   fixed time
sliced 1952-02-01 to 1953-01-17
   fixed time
sliced 1953-02-01 to 1954-01-17
   fixed time
sliced 1954-02-01 to 1955-01-17
   fixed time
sliced 1955-02-01 to 1956-01-17
   fixed time
sliced 1956-02-01 to 1957-01-17
   fixed time
sliced 1957-02-01 to 1958-01-17
   fixed time
sliced 1958-02-01 to 1959-01-17
   fixed time
sliced 1959-02-01 to 1960-01-17
   fixed time
sliced 1960-02-01 to 1961-01-17
   fixed time
sliced 1961-02-01 to 1962-01-17
   fixed time
sliced 1962-02-01 to 1963-01-17
   fixed time
sliced 1963-02-01 to 1964-01-17
   fixed time
sliced 1964-02-01 to 1965-01-17
   fixed time
sliced 1965-02-01 to 1966-01-17
   fixed time
sliced 1966-02-01 to 1967-01-17
   fixed time
sliced 1967-02-01 to 1968-01-17
   f

In [7]:
%%time

# Output location without its suffix; 'nc' or 'zarr' is appended per branch
out_stem = path_to_outdata+piC_UVnudge_name+h_ext[comp][freq]+var+yr_extn['out'][freq]
# Coordinate labelling the dimension along which the members are stacked
member_index = pd.Index(tst_nums,name='ensemble_member')


def _stacked_year(idx):
    """Return year ``idx`` of every member, stacked along 'ensemble_member'.

    Each element of ``ens_list`` is expected to be a per-year list here; the
    entries at position ``idx`` are computed together, concatenated, and then
    restricted to the calendar year ``yr_range[idx]``.
    """
    yr = str(yr_range[idx])
    computed = dask.compute(*[member_years[idx] for member_years in ens_list])
    stacked = xr.concat(computed,member_index)
    return stacked.sel(time=slice(yr+'-01-01',yr+'-12-31'))


if not vert_lev[comp][var_ind] and freq == 0:
    # Horizontal-only monthly data: each element of ens_list is one member's
    # full record, so everything goes into a single compressed netCDF file
    ens_comp = dask.compute(*ens_list)
    print('computed list of ensemble members')

    ens_out = xr.concat(ens_comp,member_index)
    print('concatenated data of ensemble members')

    ens_out.to_netcdf(out_stem+'nc',
                      format='NETCDF4',encoding={var: {"zlib": True, "complevel": 1}})
    print('wrote data to disk')

else:
    # Everything else is written year by year into a zarr store
    zarr_store = out_stem+'zarr'

    # First year creates the group. mode='w' is explicit so that re-running this
    # cell (e.g. after an interrupted append loop) replaces the partial store
    # instead of failing on xarray's default create-only mode. The first year
    # goes through the same calendar-year selection as every later year.
    _stacked_year(0).to_zarr(zarr_store,mode='w',group=var)
    print('saved initial zarr store')

    # Remaining years are appended along time
    for i in range(1,len(yr_range)-1):
        print('   saving year '+str(yr_range[i]))
        _stacked_year(i).to_zarr(zarr_store,append_dim='time',mode='a-',group=var)
    print('wrote data to disk')

computed list of ensemble members
concatenated data of ensemble members
wrote data to disk
CPU times: user 25.3 s, sys: 3.41 s, total: 28.7 s
Wall time: 41.6 s


In [8]:
# Processing is finished: shut down the Dask client started earlier in the notebook
client.shutdown()