In [1]:
import os

from itertools import product

import xarray as xr
import dask
import numpy as np
import pandas as pd

import util

# Project identifier handed to util.get_ClusterClient(project=...) when the
# dask cluster is created (presumably the HPC allocation code — TODO confirm).
PROJECT = "NCGD0011"
# Login name taken from the environment; raises KeyError if USER is not set.
USER = os.environ["USER"]

In [2]:
# Number of polygons per basin. The insertion order of this mapping defines
# the basin order, which in turn fixes how the running `polygon_id` offset is
# accumulated when the file catalogue is built.
npolygon = {
    'North_Atlantic_basin': 150,
    'North_Pacific_basin': 200,
    'South': 300,
    'Southern_Ocean': 40,
}

# Basin names, in the same order as the keys of `npolygon`.
basins = list(npolygon)

In [3]:
# Month suffixes '-01' ... '-12' and zero-padded 4-digit years '0347' ... '0362'.
mths = [f'-{m:02d}' for m in range(1, 13)]
yrs = np.array([f'{yr:04d}' for yr in range(347, 363)])

# Every 'YYYY-MM' label in chronological order (year-major, month-minor).
timestamps = np.array([f'{yr}{mth}' for yr in yrs for mth in mths])

In [4]:
%%time
path = '/glade/campaign/cesm/development/bgcwg/projects/OAE-Global-Efficiency/Mengyang_Global_OAE_Experiments/archive/'


def _case_files(basin, polygon, start_month):
    """Return ``(start_date, files)`` for one basin / polygon / start-month case.

    `start_month` is a two-digit month string; the case covers the 180
    consecutive entries of the global `timestamps` array beginning at that
    month of the first year. `start_date` is the first of those entries and
    `files` holds one `.pop.h.` file path per entry, built from the global
    `path`.
    """
    first = int(start_month) - 1
    dates = timestamps[first:first + 180]
    case = f'smyle-fosi.{basin}.alk-forcing-{basin}.{polygon:03d}-1999-{start_month}'
    return dates[0], [f'{path}/{case}/ocn/hist/{case}.pop.h.{d}.nc' for d in dates]


# One catalogue row per (polygon, basin, start month).
# `polygon` restarts at 0 in every basin, whereas `polygon_id` keeps counting
# across basins (zero-padded to three digits) in the order given by `basins`.
rows = []
offset = 0
for b in basins:
    for i in range(npolygon[b]):
        p_id = f'{offset + i:03d}'
        for m in ('01', '04', '07', '10'):
            start_date, files = _case_files(b, i, m)
            rows.append(
                dict(polygon=i, polygon_id=p_id, basin=b, start_date=start_date, files=files)
            )
    offset += npolygon[b]

index_fields = ['polygon', 'basin', 'start_date']
df = pd.DataFrame(rows).set_index(index_fields)
df

CPU times: user 255 ms, sys: 47.7 ms, total: 302 ms
Wall time: 303 ms


Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,polygon_id,files
polygon,basin,start_date,Unnamed: 3_level_1,Unnamed: 4_level_1
0,North_Atlantic_basin,0347-01,000,[/glade/campaign/cesm/development/bgcwg/projec...
0,North_Atlantic_basin,0347-04,000,[/glade/campaign/cesm/development/bgcwg/projec...
0,North_Atlantic_basin,0347-07,000,[/glade/campaign/cesm/development/bgcwg/projec...
0,North_Atlantic_basin,0347-10,000,[/glade/campaign/cesm/development/bgcwg/projec...
1,North_Atlantic_basin,0347-01,001,[/glade/campaign/cesm/development/bgcwg/projec...
...,...,...,...,...
38,Southern_Ocean,0347-10,688,[/glade/campaign/cesm/development/bgcwg/projec...
39,Southern_Ocean,0347-01,689,[/glade/campaign/cesm/development/bgcwg/projec...
39,Southern_Ocean,0347-04,689,[/glade/campaign/cesm/development/bgcwg/projec...
39,Southern_Ocean,0347-07,689,[/glade/campaign/cesm/development/bgcwg/projec...


In [5]:
# Distinct start-date labels present in the catalogue index.
start_dates = list(df.index.unique(level='start_date'))

# For each basin, the first polygon index among the rows that share the first
# start date.
first_start = start_dates[0]
polygons = []
for b in basins:
    basin_rows = df.xs((b, first_start), level=('basin', 'start_date'))
    polygons.append(basin_rows.index[0])

In [6]:
# Spot check: show the catalogue row for polygon 0 of the 'South' basin with
# the '0347-01' start date.
df.loc[(0, 'South', '0347-01')]

polygon_id                                                  350
files         [/glade/campaign/cesm/development/bgcwg/projec...
Name: (0, South, 0347-01), dtype: object

In [7]:
# Create a dask cluster and client through the project-local `util` helper,
# passing PROJECT as the project together with memory="2GB" and a 12-hour
# walltime (presumably per-job limits — confirm in util.get_ClusterClient).
cluster, client = util.get_ClusterClient(memory="2GB", project=PROJECT, walltime="12:00:00")
# Ask the cluster to scale to 256 (workers, in dask's `Cluster.scale` API).
cluster.scale(256)
# Display the client summary as the cell output.
client



0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mclong/calcs/proxy/8787/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mclong/calcs/proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://128.117.208.78:33671,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mclong/calcs/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [8]:
%%time

@dask.delayed
def get_reference_data(index):
    """Load the `ALK_ALT_CO2` slice from every file of one catalogue row.

    Parameters
    ----------
    index : tuple
        Key into the global catalogue `df`, i.e. ``(polygon, basin, start_date)``.

    Returns
    -------
    list of xarray.DataArray
        One array per entry of ``df.loc[index].files``, holding the first
        `time` record and the first `z_t` level of `ALK_ALT_CO2`.

    Notes
    -----
    Each file is opened in a context manager and the selected slice is loaded
    into memory before the file is closed, so no file handle outlives the call
    and the returned arrays do not depend on the files staying open.
    """
    fields = []
    for f in df.loc[index].files:
        with xr.open_dataset(f) as ds:
            # .load() must happen inside the `with`: after the dataset is
            # closed a lazily-indexed array could no longer read its data.
            fields.append(ds['ALK_ALT_CO2'].isel(time=0, z_t=0).load())
    return fields
        

#reference_dsets = {f'{b}-{d}': get_reference_data((0, b, d))
#    for b, d in product(basins, start_dates)
#}

#reference_dsets = dask.compute(reference_dsets)[0]

CPU times: user 37 µs, sys: 13 µs, total: 50 µs
Wall time: 52.9 µs


In [9]:
%%time 

@dask.delayed
def comparison(index, index_ref):
    """Return the per-file sum of squared differences against a reference case.

    The files listed in ``df.loc[index].files`` are paired, position by
    position, with those in ``df.loc[index_ref].files``. For each pair the
    first `time` record and first `z_t` level of `ALK_ALT_CO2` are differenced,
    squared and summed over the remaining dimensions.

    NOTE(review): callers store this result under the name ``rmse``, but no
    mean and no square root are taken here — the values are sums of squared
    differences. The numbers are left unchanged so existing results stay
    comparable; confirm whether a true RMSE was intended.

    Parameters
    ----------
    index : tuple
        Key into the global catalogue `df` for the case under test.
    index_ref : tuple
        Key into `df` for the reference case.

    Returns
    -------
    numpy.ndarray
        One value per file pair, in file order.

    Raises
    ------
    ValueError
        If the two cases do not list the same number of files (a plain `zip`
        would silently drop the unmatched tail).
    """
    files = df.loc[index].files
    files_ref = df.loc[index_ref].files
    if len(files) != len(files_ref):
        raise ValueError(
            f'file count mismatch: {len(files)} for {index} vs '
            f'{len(files_ref)} for {index_ref}'
        )

    sq_err = []
    for f_test, f_ref in zip(files, files_ref):
        # Open one pair at a time and close it as soon as the scalar has been
        # extracted, rather than keeping every dataset of both cases open.
        with xr.open_dataset(f_test) as ds_test, xr.open_dataset(f_ref) as ds_ref:
            da_test = ds_test['ALK_ALT_CO2'].isel(time=0, z_t=0)
            da_reference = ds_ref['ALK_ALT_CO2'].isel(time=0, z_t=0)
            sq_err.append(
                ((da_test - da_reference) ** 2).sum().values.item()
            )
    return np.array(sq_err)
        

# Accumulates one dict per (polygon, basin, start_date) holding the array
# returned by `comparison` for that case.
rmse = []
for b, d in product(basins, start_dates):
    # Polygon indexes available for this basin / start date. Polygon 0 of the
    # same basin and start date is the reference every polygon is compared to.
    polygons = df.xs((b, d), level=('basin', 'start_date')).index
    print((b, d))

    ref_ndx = (0, b, d)
    objs_rmse = [
        dict(polygon=p, basin=b, start_date=d, rmse=comparison((p, b, d), ref_ndx))
        for p in polygons
    ]

    # Evaluate this group's delayed comparisons together before moving on.
    (computed,) = dask.compute(objs_rmse)
    rmse.extend(computed)



('North_Atlantic_basin', '0347-01')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('North_Atlantic_basin', '0347-04')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('North_Atlantic_basin', '0347-07')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('North_Atlantic_basin', '0347-10')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('North_Pacific_basin', '0347-01')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('North_Pacific_basin', '0347-04')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('North_Pacific_basin', '0347-07')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('North_Pacific_basin', '0347-10')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('South', '0347-01')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('South', '0347-04')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('South', '0347-07')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('Southern_Ocean', '0347-04')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('Southern_Ocean', '0347-07')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


('Southern_Ocean', '0347-10')


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


CPU times: user 14min 33s, sys: 5min 59s, total: 20min 33s
Wall time: 55min 44s


In [10]:
# Turn the list of per-case result dicts into a table, then index it by the
# same (polygon, basin, start_date) fields as the file catalogue.
df_comp = pd.DataFrame(rmse)
df_comp = df_comp.set_index(index_fields)
df_comp

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,rmse
polygon,basin,start_date,Unnamed: 3_level_1
0,North_Atlantic_basin,0347-01,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,North_Atlantic_basin,0347-01,"[23.45962905883789, 727.4760131835938, 1109.60..."
2,North_Atlantic_basin,0347-01,"[17.13382339477539, 564.4078979492188, 1187.89..."
3,North_Atlantic_basin,0347-01,"[20.131689071655273, 339.907470703125, 932.739..."
4,North_Atlantic_basin,0347-01,"[25.47551918029785, 812.4988403320312, 859.795..."
...,...,...,...
35,Southern_Ocean,0347-10,"[7.947857856750488, 126.8885726928711, 423.771..."
36,Southern_Ocean,0347-10,"[7.872915267944336, 130.29339599609375, 427.05..."
37,Southern_Ocean,0347-10,"[10.409200668334961, 185.58572387695312, 376.2..."
38,Southern_Ocean,0347-10,"[10.534895896911621, 175.25706481933594, 445.7..."


In [11]:
# Persist the comparison table as a pickle file; the relative path means it is
# written to the current working directory.
df_comp.to_pickle('comparison_data.pkl')