# Calculate 2D arrival time maps for each shelf basin

In [1]:
import xarray as xr
import numpy as np
import pandas as pd
from glob import glob
import time
from datetime import datetime
from dask import dataframe
from scipy.optimize import leastsq
import matplotlib.pyplot as plt

from dask.distributed import Client
import memory_profiler

import warnings
warnings.filterwarnings("ignore")

In [2]:
import sys
from pathlib import Path
module_path = str(Path.cwd().parents[0] )
#print(module_path)
if module_path not in sys.path:
    sys.path.append(module_path)
import custom_functions as cf

In [3]:
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: /proxy/38095/status,

0,1
Dashboard: /proxy/38095/status,Workers: 6
Total threads: 18,Total memory: 200.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:38539,Workers: 6
Dashboard: /proxy/38095/status,Total threads: 18
Started: Just now,Total memory: 200.00 GiB

0,1
Comm: tcp://127.0.0.1:37953,Total threads: 3
Dashboard: /proxy/35869/status,Memory: 33.33 GiB
Nanny: tcp://127.0.0.1:36079,
Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-ie8fj8zt,Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-ie8fj8zt

0,1
Comm: tcp://127.0.0.1:39773,Total threads: 3
Dashboard: /proxy/36633/status,Memory: 33.33 GiB
Nanny: tcp://127.0.0.1:36175,
Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-4w4htquh,Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-4w4htquh

0,1
Comm: tcp://127.0.0.1:36517,Total threads: 3
Dashboard: /proxy/34797/status,Memory: 33.33 GiB
Nanny: tcp://127.0.0.1:40135,
Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-ih2ttyte,Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-ih2ttyte

0,1
Comm: tcp://127.0.0.1:43739,Total threads: 3
Dashboard: /proxy/46017/status,Memory: 33.33 GiB
Nanny: tcp://127.0.0.1:35413,
Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-d84izku3,Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-d84izku3

0,1
Comm: tcp://127.0.0.1:45059,Total threads: 3
Dashboard: /proxy/38553/status,Memory: 33.33 GiB
Nanny: tcp://127.0.0.1:34637,
Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-b78_72rn,Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-b78_72rn

0,1
Comm: tcp://127.0.0.1:43549,Total threads: 3
Dashboard: /proxy/41401/status,Memory: 33.33 GiB
Nanny: tcp://127.0.0.1:46711,
Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-xzkk4w3p,Local directory: /jobfs/70037796.gadi-pbs/dask-worker-space/worker-xzkk4w3p


#### Define custom functions

In [4]:
def geolocate_basin(ds, basinmask): #THIS IS NOT RIGHT FOR PARTICLES _ ONLY FOR BASIN MAP
    """
    Use basinmask to add ocean basin id to dataset
    """
    
    #ds['basin'] = basinmask.sel(yt_ocean=ds.lat, xt_ocean=ds.lon, method='nearest').basins.drop_vars(['yt_ocean','xt_ocean'])
    ds['basin'] = basinmask.sel(yu_ocean=ds.lat, xu_ocean=ds.lon, method='nearest').basins.drop_vars(['yu_ocean','xu_ocean'])
    
    return ds

# Inverse Gaussian (scaled)
def inversegaussian(x, Gamma, Delta, Scaling):
    inversegaussian = []
    for i in range(x.size):
        inversegaussian += [Scaling*(((Gamma**3)/(4 * np.pi * (Delta**2) * (x[i]**3)))**(0.5))
                            * np.exp(((-Gamma)*((x[i]-Gamma)**2))/(4*(Delta**2)*x[i]))]
    return np.array(inversegaussian)

# Residual Calculation for least squares
def res(p, y, x):
    Gam, Del, Sca = p
    y_fit = inversegaussian(x, Gam, Del, Sca)
    err = y - y_fit
    return err

#### Load required data

In [5]:
# Open Antarctic basins file
antarctic_basins = xr.open_dataset('/g/data/e14/hd4873/runs/parcels/output/AntConn/data/basin_masks/Antarctic_shelf_basin_mask_hu_coarse.nc')

# Open initial particle parameters file
startfile = '/g/data/e14/hd4873/runs/parcels/output/AntConn/data/CircumAntarcticParticles_initial_values.nc'
ref = xr.open_dataset(startfile, decode_cf=False).drop_vars(['shelf_exit_lat', 'shelf_exit_lon', 'shelf_exit_z', 
                                                             'shelf_exit_S', 'shelf_exit_T', 'z', 'temp', 'salt', 
                                                             'shelf_weighted_val', 'shelf', 'lat', 'lon',
                                                            ]).load()

# Define particle files
npart = 130146
datadir = '/g/data/e14/hd4873/runs/parcels/output/AntConn/data/xarray_files/traj_chunked_basin/'
files = sorted(glob(datadir+'CircumAntarcticParticles_*.nc')) 

## open complete particle dataset
ds = xr.open_mfdataset(files, decode_cf=False).drop_vars(['psal','thermo','mixedlayershuffle', 'mldepth', 
                                                          'unbeachCount', 'z', 'basin', 'shelf','basin_ZonalConn',])

print(ds.nbytes/1000**3)

115.0719818


In [6]:
print(antarctic_basins.region.values)
print(antarctic_basins.basin_lookup.values)

['East Antarctica 01' 'East Antarctica 02' 'Ross Sea' 'West Antarctica'
 'West Antarctic Peninsula' 'Weddell Sea' 'Southern Ocean']
[  2.   3.   4.   5.   6.   7. 100.]


### Calculate 2D maps of arrival time

First define histogram parameters. 

In [7]:
# Size of bins for histograms
ntime = 21
total_months = ntime*12
bin_months = 6
delta_time_bin = ntime/total_months*bin_months

# Bins for histograms
time_bin = np.arange(0,ntime,delta_time_bin)

# X in timestep increments
x = time_bin[:-1]
x_long = np.arange(delta_time_bin, ntime*3, delta_time_bin)

# Convert X to years and center points on middle of timesteps
x_years = (x + delta_time_bin/2.0) 
x_years_long = (x_long + delta_time_bin/2.0)

# Initial guess for Gam, Del, Sca
Gam, Del, Sca = [20, 50, 1000000] 
p = [Gam, Del, Sca] # Initial guesses for leastsq
y_init = inversegaussian(x_years, Gam, Del, Sca) 

# set lat and lon bins
dlon = 1
dlat = 0.5
lat_S, lat_N = -81, -48
lon_W, lon_E = -280, 80
xarange = np.arange(lon_W, lon_E+dlon, dlon)
yarange = np.arange(lat_S, lat_N+dlat, dlat)
xmid = (xarange[1:]+xarange[:-1])/2. # longitude midpoints
ymid = (yarange[1:]+yarange[:-1])/2. # latitude midpoints

# create basin map to test if timemap is in ocean or not
basinmap = np.zeros((len(ymid), len(xmid)))
basinmap = xr.DataArray(basinmap, coords=[ymid, xmid], dims=["lat", "lon"])
basinmap = geolocate_basin(basinmap, antarctic_basins)

Now loop through each basin and save 2D timemap arrays to file. 

In [11]:
%%time
# Loop through each basin and save to 2Dpxy file
for basinid in antarctic_basins.basin_lookup[0:5]:
    print(basinid.values)
    # create empty arrays for timemap
    timemap_peak = np.zeros((len(ymid), len(xmid)))
    timemap_peak = xr.DataArray(timemap_peak, coords=[ymid, xmid], dims=["lat", "lon"])
    timemap_peak.attrs["Long name"] = "Peak arrival time since release to reach specific grid box - weighted by transport"
    timemap_peak.attrs["Units"] = "Years"
    
    timemap_peak_gaussian = np.zeros((len(ymid), len(xmid)))
    timemap_peak_gaussian = xr.DataArray(timemap_peak_gaussian, coords=[ymid, xmid], dims=["lat", "lon"])
    timemap_peak_gaussian.attrs["Long name"] = "Peak arrival time based on analytical inverse gaussian TTD fit - weighted by transport"
    timemap_peak_gaussian.attrs["Units"] = "Years"
    
    timemap_peak_unweighted_gaussian =  np.zeros((len(ymid), len(xmid)))
    timemap_peak_unweighted_gaussian = xr.DataArray(timemap_peak_unweighted_gaussian, coords=[ymid, xmid], dims=["lat", "lon"])
    timemap_peak_unweighted_gaussian.attrs["Long name"] = "Peak arrival time based on analytical inverse gaussian TTD fit - unweighted, just based on particle count"
    timemap_peak_unweighted_gaussian.attrs["Units"] = "Years"
           
    timemap_mean = np.zeros((len(ymid), len(xmid)))
    timemap_mean = xr.DataArray(timemap_mean, coords=[ymid, xmid], dims=["lat", "lon"])
    timemap_mean.attrs["Long name"] = "Mean time since release to reach specific grid box"
    timemap_mean.attrs["Units"] = "Days"
    
    timemap_median = np.zeros((len(ymid), len(xmid)))
    timemap_median = xr.DataArray(timemap_median, coords=[ymid, xmid], dims=["lat", "lon"])
    timemap_median.attrs["Long name"] = "Median time since release to reach specific grid box"
    timemap_median.attrs["Units"] = "Days"
    
    # set land cells to nan
    timemap_peak = timemap_peak.where(basinmap.basin > 0, np.nan)
    timemap_peak_gaussian = timemap_peak_gaussian.where(basinmap.basin > 0, np.nan)
    timemap_peak_unweighted_gaussian = timemap_peak_unweighted_gaussian.where(basinmap.basin > 0, np.nan)
    timemap_mean = timemap_mean.where(basinmap.basin > 0, np.nan)
    timemap_median = timemap_median.where(basinmap.basin > 0, np.nan)
    
    # select out particle trajectories for this basin
    traj = np.where(ref.basin == basinid.values)[0]
    
    # subset ds for basin and convert to dataframe
    print("loading basin dataset...")
    dfloop = ds.isel(trajectory = traj).to_dataframe()
    print("dropping NaNs...")
    dfloop.dropna(subset = ["lat", "lon"], inplace=True)
    
    # convert transport and releasetime to df
    print("convert transport and releasetime to df...")
    refloop = ref.isel(trajectory=traj).to_dataframe().reset_index().drop(columns = ['basin', 'shelf_exit_indx']) #RELEASE TIME IS IN DAYS!!!!!!!

    for j, lat in enumerate(yarange[:-1]): #loop through latitude bins
        print(j,lat)
        # create latitude dataframe subset
        dflattmp = dfloop[(dfloop['lat'] >= lat) & (dfloop['lat'] < (lat+dlat))]
        dflattmp = dflattmp.reset_index()
        
        # merge transport and release_time values into dataframe
        dflattmp = pd.merge(dflattmp, np.abs(refloop), 
                     left_on = 'trajectory', 
                     right_on = 'trajectory', 
                     how='left')
        dflattmp['crossing_time'] = dflattmp['time'] - dflattmp['release_time']*24*60*60

        for i, lon in enumerate(xarange[:-1]):  #loop through longitude bins
            if np.isnan(basinmap.basin[j,i]):
                continue
            else:
                dflontmp = dflattmp[(dflattmp.lon>=lon) & (dflattmp.lon<(lon+dlon))]
                if dflontmp.shape[0] > 0:                    
                    # add mean and median time to map arrays
                    timemap_mean[j,i] = dflontmp.groupby('trajectory').first().crossing_time.mean(skipna=True)/60/60/24 # converts to days
                    timemap_median[j,i] = dflontmp.groupby('trajectory').first().crossing_time.median(skipna=True)/60/60/24 # converts to days
                    n, x = np.histogram(dflontmp.groupby('trajectory').first().crossing_time.values/60/60/24/365, 
                            bins=time_bin, weights= dflontmp.groupby('trajectory').first().trans.values)
                    m = n.max()
                    maxval = np.where(n == m)[0]
                    pa = (x[maxval] + x[maxval+1])/2
                    timemap_peak[j,i] = pa[0]
                    
                    # transport weighted inverse gaussian method
                    y_data = np.histogram(dflontmp.groupby('trajectory').first().crossing_time.values/60/60/24/365,
                                          bins=time_bin, weights= dflontmp.groupby('trajectory').first().trans.values)
                    # Least squares
                    plsq = leastsq(res, p, args = (y_data[0], x_years))
                    # Fitted inverse gaussian distribution, optimizing three parameters
                    y_est_long = inversegaussian(x_years_long, plsq[0][0], plsq[0][1], plsq[0][2])
                    # Index of max of the distribution
                    argmax_dist = np.argmax(y_est_long)
                    # save time of peak to array
                    timemap_peak_gaussian[j,i] = x_years_long[argmax_dist]
                    
                    # unweighted inverse gaussian method (particle count)
                    y_data = np.histogram(dflontmp.groupby('trajectory').first().crossing_time.values/60/60/24/365,
                                          bins=time_bin,)
                    # Least squares
                    plsq = leastsq(res, p, args = (y_data[0], x_years))
                    # Fitted inverse gaussian distribution, optimizing three parameters
                    y_est_long = inversegaussian(x_years_long, plsq[0][0], plsq[0][1], plsq[0][2])
                    # Index of max of the distribution
                    argmax_dist = np.argmax(y_est_long)
                    # save time of peak to array
                    timemap_peak_unweighted_gaussian[j,i] = x_years_long[argmax_dist]
                    

    timemap = xr.Dataset(data_vars=dict(mean_arrival=timemap_mean,  
                                        median_arrival=timemap_median, 
                                        peak_arrival=timemap_peak, 
                                        peak_gaussian=timemap_peak_gaussian, 
                                        peak_unweighted_gaussian = timemap_peak_unweighted_gaussian))
    timemap = timemap.drop('basin')
    timemap.attrs['History'] = 'Created {}.'.format(datetime.now().strftime("%Y-%m-%d"))
    outfile = '/g/data/e14/hd4873/runs/parcels/output/AntConn/data/2Dtime/2Dtimemap_6mobins_coarsebasin_{:02d}_initial_G-{}_D-{}_Sca-{}_allbins_REVIEWSREDONE.nc'.format(int(basinid.values), Gam, Del, Sca)
    # Save to netCDF
    print("Saving to netCDF file", outfile)
    encod={}
    for var in timemap.data_vars:
        encod[var]={'zlib':True}
    timemap.to_netcdf(outfile)

7.0
loading basin dataset...
dropping NaNs...
convert transport and releasetime to df...
0 -81.0
1 -80.5
2 -80.0
3 -79.5
4 -79.0
5 -78.5
6 -78.0
7 -77.5
8 -77.0
9 -76.5
10 -76.0
11 -75.5
12 -75.0
13 -74.5
14 -74.0
15 -73.5
16 -73.0
17 -72.5
18 -72.0
19 -71.5
20 -71.0
21 -70.5
22 -70.0
23 -69.5
24 -69.0
25 -68.5
26 -68.0
27 -67.5
28 -67.0
29 -66.5
30 -66.0
31 -65.5
32 -65.0
33 -64.5
34 -64.0
35 -63.5
36 -63.0
37 -62.5
38 -62.0
39 -61.5
40 -61.0
41 -60.5
42 -60.0
43 -59.5
44 -59.0
45 -58.5
46 -58.0
47 -57.5
48 -57.0
49 -56.5
50 -56.0
51 -55.5
52 -55.0
53 -54.5
54 -54.0
55 -53.5
56 -53.0
57 -52.5
58 -52.0
59 -51.5
60 -51.0
61 -50.5
62 -50.0
63 -49.5
64 -49.0
65 -48.5
Saving to netCDF file /g/data/e14/hd4873/runs/parcels/output/AntConn/data/2Dtime/sensitivityTest_2Dtimemap_6mobins_coarsebasin_07_initial_G-20_D-50_Sca-1000000_allbins.nc
CPU times: user 53min 28s, sys: 9min 36s, total: 1h 3min 4s
Wall time: 59min 49s
