In [1]:
import argparse
import dask
import json
import netCDF4 as nc4
import numpy as np
import pandas as pd
from pathlib import Path
from pprint import pprint
import time
import warnings
import xarray as xr
from dask import delayed
warnings.filterwarnings('ignore')

In [2]:
# Silence warnings again so this cell behaves the same when re-run on its own.
import warnings
warnings.filterwarnings('ignore')

# Direct dask's temporary files to this directory.
import dask
dask.config.set({'temporary_directory': '/mnt/intraid/ian1/ifenty/dask_tmp'})

# Start a local dask.distributed cluster with default settings.
# Tuned alternatives that worked on Ian's machine (optional):
#   Client(processes=False, threads_per_worker=48)
#   Client(processes=False, n_workers=8, threads_per_worker=1, memory_limit='128GB')
from dask.distributed import Client, progress
client = Client()

# Show the client summary (scheduler, dashboard link, workers) as cell output.
client

0,1
Client  Scheduler: tcp://127.0.0.1:45376  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 64  Memory: 201.40 GB


In [3]:
def get_groupings(base_dir, grid_type, time_type):
    """Index the grouping directories found under ``base_dir/grid_type/time_type``.

    Parameters
    ----------
    base_dir : str or pathlib.Path
        Root of the dataset tree.
    grid_type : str
        Grid subdirectory name (e.g. ``'native'``).
    time_type : str
        Averaging subdirectory name (e.g. ``'mon_mean'``).

    Returns
    -------
    dict
        Maps a 0-based index (in sorted path order) to a dict with keys
        ``'name'`` (the directory name), ``'grid'``, ``'time_type'`` and
        ``'directory'`` (the ``pathlib.Path`` of the grouping). Returns an
        empty dict when ``base_dir/grid_type/time_type`` does not exist.
        Entries that are not directories (stray files) are skipped.
    """
    group_root = Path(base_dir) / grid_type / time_type
    # Progress message: which directory is being scanned.
    print(group_root)
    if not group_root.exists():
        return {}

    # Only subdirectories are groupings; a stray file would otherwise be
    # listed as a grouping whose glob later finds nothing.
    group_dirs = sorted(p for p in group_root.iterdir() if p.is_dir())
    return {
        index: {
            'name': p.name,
            'grid': grid_type,
            'time_type': time_type,
            'directory': p,
        }
        for index, p in enumerate(group_dirs)
    }

In [4]:
@delayed
def load_ecco_file(filename):
    """Read one ECCO netCDF file fully into memory and return it.

    The file is opened in a ``with`` block so its handle is released as soon
    as the data has been read; the returned Dataset holds in-memory data.

    Parameters
    ----------
    filename : pathlib.Path
        File to read; its ``.name`` is printed as a progress message.

    Returns
    -------
    xarray.Dataset
    """
    print(filename.name)
    with xr.open_dataset(filename) as ecco_field:
        # load() pulls every variable into memory, so the result stays usable
        # after the underlying file is closed on leaving the ``with`` block.
        return ecco_field.load()

In [5]:
@delayed
def get_minmax(ecco_field):
    """Return the minimum and maximum of every data variable in ``ecco_field``.

    Returns
    -------
    dict
        ``{var_name: {'valid_min': <scalar>, 'valid_max': <scalar>}}``, one
        entry per data variable, with plain Python scalars obtained via
        ``.values.item()``.
    """
    def extremes(field_var):
        lowest = field_var.min()
        highest = field_var.max()
        return {
            'valid_min': lowest.values.item(),
            'valid_max': highest.values.item(),
        }

    return {name: extremes(ecco_field[name]) for name in ecco_field.data_vars}

In [6]:
@delayed
def save_DS(DS, filename, output_dir):
    filename = ['valid_minmax_' + filename]
    DS.to_netcdf(output_dir / filename)

In [7]:
@delayed
def construct_DS(ecco_field, results, grouping_info, ds_title, ds_id, delta_time, filename):
    DAs = []

    # loop through all data varaibles
    for dv in ecco_field.data_vars:
        valid_min = results[dv]['valid_min']
        valid_max = results[dv]['valid_max']

        # construct data array with valid min and max
        tmp = xr.DataArray([valid_min, valid_max], dims=['valid_min_max'])
        tmp.name = dv
        DAs.append(tmp)
       
    DS = xr.merge(DAs)
    DS.attrs['title']     = ds_title
    DS.attrs['name']      = grouping_info['name']
    DS.attrs['grid']      = grouping_info['grid']
    DS.attrs['time_type'] = grouping_info['time_type']
    DS.attrs['id']        = ds_id
    DS.attrs['shortname'] = ds_id.split('/')[1]
    DS.attrs['directory'] = str(grouping_info['directory'])
    DS.attrs['filename'] = filename
    DS.attrs['delta_time'] = delta_time

    return DS

In [8]:
def f1(ecco_files, grouping_info, ds_title, ds_id, output_dir):
    results = []
    start_time= time.time()
    for file in ecco_files:
        ecco_field = load_ecco_file(file)
        result = get_minmax(ecco_field)
        #results.append(result)
        delta_time = time.time() - start_time
        DS = construct_DS(ecco_field, result, grouping_info, ds_title, ds_id, delta_time, file.name)
        save_DS(DS, file.name, output_dir)
        results.append(DS)
    return results

## Inputs

In [9]:
# Directory receiving the valid_minmax_* outputs. Create it up front so the
# compute step does not try to write into a missing directory.
output_dir = Path('/home/ifenty/ian1/ifenty/ECCOv4/Version4/Release4/podaac/valid_minmax_20210312')
output_dir.mkdir(parents=True, exist_ok=True)

In [10]:
# Root of the release tree; get_groupings looks under <root>/<grid>/<time_type>.
dataset_base_dir = Path('/home/ifenty/ian1/ifenty/ECCOv4', 'Version4', 'Release4', 'podaac')

In [11]:
# Grid types and averaging periods used to build the grouping paths.
grids = list(('native', 'latlon'))
time_avgs = 'day_inst day_mean mon_mean'.split()

## Calculate valid min/max per file

In [12]:
# Choose one averaging period / grid combination and index its groupings.
time_type, grid_type = time_avgs[2], grids[0]
groupings = get_groupings(dataset_base_dir, grid_type, time_type)

/home/ifenty/ian1/ifenty/ECCOv4/Version4/Release4/podaac/native/mon_mean


In [13]:
# List the available groupings by index so one can be picked below.
for gi, grouping in groupings.items():
    print(gi, grouping['name'])

0 ATM_SURFACE_TEMP_HUM_WIND_PRES
1 OCEAN_3D_MOMENTUM_TEND
2 OCEAN_3D_SALINITY_FLUX
3 OCEAN_3D_TEMPERATURE_FLUX
4 OCEAN_3D_VOLUME_FLUX
5 OCEAN_AND_ICE_SURFACE_FW_FLUX
6 OCEAN_AND_ICE_SURFACE_HEAT_FLUX
7 OCEAN_AND_ICE_SURFACE_STRESS
8 OCEAN_BOLUS_STREAMFUNCTION
9 OCEAN_BOLUS_VELOCITY
10 OCEAN_BOTTOM_PRESSURE
11 OCEAN_DENS_STRAT_PRESS
12 OCEAN_MIXED_LAYER_DEPTH
13 OCEAN_TEMPERATURE_SALINITY
14 OCEAN_VELOCITY
15 SEA_ICE_CONC_THICKNESS
16 SEA_ICE_HORIZ_VOLUME_FLUX
17 SEA_ICE_SALT_PLUME_FLUX
18 SEA_ICE_VELOCITY
19 SEA_SURFACE_HEIGHT


In [19]:
# Select one grouping by its index in the listing.
gi = 3
grouping_info = groupings[gi]
print(grouping_info)

{'name': 'OCEAN_3D_TEMPERATURE_FLUX', 'grid': 'native', 'time_type': 'mon_mean', 'directory': PosixPath('/home/ifenty/ian1/ifenty/ECCOv4/Version4/Release4/podaac/native/mon_mean/OCEAN_3D_TEMPERATURE_FLUX')}


In [22]:
# All ECCO netCDF files in the selected grouping, in sorted order.
glob_name = '*ECCO*nc'
data_dir = groupings[gi]['directory']
ecco_files = np.sort([*data_dir.glob(glob_name)])

In [21]:
# Restrict processing to the first two files (a small subset).
ecco_files = ecco_files[:2]
ecco_files

array([PosixPath('/home/ifenty/ian1/ifenty/ECCOv4/Version4/Release4/podaac/native/mon_mean/OCEAN_3D_TEMPERATURE_FLUX/OCEAN_3D_TEMPERATURE_FLUX_mon_mean_1992-01_ECCO_V4r4_native_llc0090.nc'),
       PosixPath('/home/ifenty/ian1/ifenty/ECCOv4/Version4/Release4/podaac/native/mon_mean/OCEAN_3D_TEMPERATURE_FLUX/OCEAN_3D_TEMPERATURE_FLUX_mon_mean_1992-02_ECCO_V4r4_native_llc0090.nc')],
      dtype=object)

In [23]:
# Read the dataset-level title and id from the first file; they are copied onto
# every output. Assumes all files in the grouping share them — TODO confirm.
# The context manager releases the file handle once the attributes are read.
with xr.open_dataset(ecco_files[0]) as tmp_file:
    ds_title = tmp_file.attrs['title']
    ds_id = tmp_file.attrs['id']

In [24]:
# Build the graph with f1 and run it. dask.compute returns a tuple with one
# entry per argument, so unpack the single list of results.
start_time = time.time()
(f1_out,) = dask.compute(f1(ecco_files, grouping_info, ds_title, ds_id, output_dir))
delta_time = time.time() - start_time

In [26]:
# NOTE(review): the factor 30 is unexplained — presumably an extrapolation of
# this run's wall time to a larger workload; confirm and name the constant.
30 * delta_time

1247.4147605895996

In [33]:
# NOTE(review): this differs from the output_dir used when computing/saving
# and is created only after that step ran; confirm which directory is intended.
output_dir = Path('/home/ifenty/ian1/ifenty/ECCOv4/Version4/Release4/podaac/valid_minmax_20210311c')
# Create it along with any missing parents; no-op if it already exists.
output_dir.mkdir(parents=True, exist_ok=True)