# Tracking eddies; to be run after identifying_eddies.ipynb


In [None]:
import os 
import glob

from py_eddy_tracker.featured_tracking.area_tracker import AreaTracker
from py_eddy_tracker.tracking import Correspondances

import numpy as np
from datetime import datetime, timedelta
from netCDF4 import Dataset
import xarray as xr


### Set up the SLURMCluster

In [None]:
import dask
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

In [None]:
# Point dask's dashboard link at a proxy-style URL template
# (presumably the JupyterHub proxy, judging by the placeholder name).
_dashboard_cfg = dask.config.config.get('distributed').get('dashboard')
_dashboard_cfg.update({'link': '{JUPYTERHUB_SERVICE_PREFIX}/proxy/{port}/status'})

In [None]:
# Settings handed to SLURMCluster: resources per job, the network
# interface, and the SLURM queue/account/walltime to submit under.
slurm_options = dict(
    name='dask-cluster',
    cores=10,
    memory='256GB',
    processes=5,
    interface='ib0',
    queue='compute',
    account='mh0033',
    walltime='01:00:00',
    asynchronous=0,  # falsy value, i.e. asynchronous mode is not requested
)
cluster = SLURMCluster(**slurm_options)

In [None]:
# Ask the cluster to scale to a total of 200 cores, then connect a client to it.
cluster.scale(cores=200)
client = Client(cluster)
# Bare expression: presumably here so the notebook displays the client summary.
client

### Define the functions to run in parallel (eddy tracking)

In [None]:
#Functions from eddy-tracking.py (aided by Malcolm Roberts)
def tracking(file_objects, previous_correspondance, eddy_type, zarr=False, nb_obs_min=10, raw=True, cmin=0.05, virtual=4):
    """Track eddies over a set of identification files and write the results.

    A ``Correspondances`` object using the ``AreaTracker`` (contour overlap)
    method is built and run. The resulting correspondence table is written to
    a temporary ``*_new.nc`` file, re-opened as a sanity check and then moved
    over ``previous_correspondance``. Finally the observation files are
    written through ``write_obs_files``.

    Parameters
    ----------
    file_objects
        Passed as ``datasets`` to ``Correspondances``.
    previous_correspondance : str
        Path of the correspondence file. If it already exists it is handed to
        ``Correspondances`` as the previous state; in every case it is the
        destination of the new correspondence file, and its directory is used
        as the output directory for the observation files.
    eddy_type : str
        Prefix of the output file names written by ``write_obs_files``.
    zarr : bool
        Forwarded to ``write_obs_files``.
    nb_obs_min : int
        Track-length threshold forwarded to ``write_obs_files``.
    raw : bool
        Forwarded to ``write_obs_files`` (used there as ``raw_data``).
    cmin : float
        Passed to the tracker class as ``class_kw=dict(cmin=cmin)``.
    virtual : int
        Passed as ``virtual`` to ``Correspondances``.

    Raises
    ------
    Exception
        If the freshly written correspondence file cannot be re-opened.
    """
    output_dir = os.path.dirname(previous_correspondance)
    resume = os.path.isfile(previous_correspondance)

    correspondance_kw = dict(
        datasets=file_objects,
        class_method=AreaTracker,
        class_kw=dict(cmin=cmin),
        virtual=virtual,
    )
    if resume:
        correspondance_kw["previous_correspondance"] = previous_correspondance

    c = Correspondances(**correspondance_kw)
    c.track()
    c.prepare_merging()
    if resume:
        # Kept from the original workflow: merge() is only called here when
        # an earlier correspondence file was loaded.
        c.merge()

    # Write next to the old file first, so the old file is only replaced
    # once the new one is known to be readable.
    new_correspondance = os.path.splitext(previous_correspondance)[0]+'_new.nc'
    with Dataset(new_correspondance, "w") as h:
        c.to_netcdf(h)

    try:
        # Sanity check only; the handle is closed again before the move.
        with Dataset(new_correspondance, 'r'):
            pass
    except Exception as err:
        raise Exception('Error opening new correspondance file '+new_correspondance) from err
    os.replace(new_correspondance, previous_correspondance)

    write_obs_files(c, raw, output_dir, zarr, eddy_type, nb_obs_min)
    

def write_obs_files(c, raw, output_dir, zarr, eddy_type, nb_obs_min):
    """Write the untracked, long-track and short-track observation files.

    Three files are written into ``output_dir``, in this order:
    ``<eddy_type>_untracked.nc``, ``<eddy_type>_tracks.nc`` and
    ``<eddy_type>_short.nc``.

    Parameters
    ----------
    c
        Correspondence object that has already been tracked; it must provide
        ``get_unused_data``, ``_copy``, ``shorter_than``, ``longer_than``,
        ``merge``, ``virtual`` and ``nb_obs_by_tracks``.
    raw : bool
        Passed as ``raw_data`` to ``get_unused_data`` and ``merge``.
    output_dir : str
        Directory receiving the three output files.
    zarr : bool
        Accepted for interface compatibility; not used by this function.
    eddy_type : str
        Prefix of the output file names.
    nb_obs_min : int
        Threshold handed to ``shorter_than`` / ``longer_than`` to separate
        short tracks from long ones.
    """
    fout = os.path.join(output_dir, eddy_type+'_untracked.nc')
    c.get_unused_data(raw_data=raw).write_file(filename=fout)

    # The short tracks are selected on a copy so that `c` can afterwards be
    # restricted to the long tracks.
    short_c = c._copy()
    short_c.shorter_than(size_max=nb_obs_min)
    short_track = short_c.merge(raw_data=raw)

    if c.longer_than(size_min=nb_obs_min) is False:
        long_track = short_track.empty_dataset()
    else:
        long_track = c.merge(raw_data=raw)

    # Flag observations with time == 0 as virtual and fill them by
    # interpolation; the same treatment is applied to long and short tracks.
    if c.virtual:
        for track in (long_track, short_track):
            track["virtual"][:] = track["time"] == 0
            track.normalize_longitude()
            track.filled_by_interpolation(track["virtual"] == 1)

    # print() does not interpolate extra arguments the way logging does, so
    # the values are formatted explicitly.
    print("Longer track saved have %d obs" % c.nb_obs_by_tracks.max())
    print(
        "The mean length is %d observations for long track"
        % c.nb_obs_by_tracks.mean()
    )

    fout = os.path.join(output_dir, eddy_type+'_tracks.nc')
    long_track.write_file(filename=fout)
    fout = os.path.join(output_dir, eddy_type+'_short.nc')
    short_track.write_file(filename=fout)



In [None]:
def tracking_eddytype(trackerdir,eddydir, exp_id, eddy_type, fq, zarr, nb_obs_min, raw, cmin, virtual=4):
    """Collect the identification files of one eddy type and track them.

    Parameters
    ----------
    trackerdir : str
        Directory holding the correspondence file (and, via ``tracking``,
        the track output files).
    eddydir : str
        Directory searched for the identification files.
    exp_id : str
        Experiment identifier used as the file-name prefix.
    eddy_type : str
        Eddy type used in the file names, e.g. 'anticyclonic' or 'cyclonic'.
    fq : str
        Frequency tag used in the file names.
    zarr, nb_obs_min, raw, cmin
        Forwarded unchanged to ``tracking``.
    virtual : int
        Forwarded to ``tracking``; the default matches the default there.

    Returns
    -------
    None
    """
    # Use the exp_id argument rather than a notebook-level global, so the
    # function also works when it is shipped to a worker on its own.
    file_stem = exp_id+'_'+eddy_type+'_'+fq
    previous_correspondance = os.path.join(trackerdir, file_stem+'_correspondance.nc')
    print(previous_correspondance)
    # The pattern only matches date stamps starting with 2002 through 2008.
    search = os.path.join(eddydir, file_stem+'_200[2-8]????.nc')
    print('search files ',search)
    file_objects = sorted(glob.glob(search))
    print('files for tracking: ', file_objects)
    tracking(file_objects, previous_correspondance, eddy_type, zarr=zarr, nb_obs_min=nb_obs_min, raw=raw, cmin=cmin, virtual=virtual)
    return None

### Do eddy tracking using data stored by identifying_eddies.ipynb

In [None]:
# Settings of the tracking run.
expid='erc1011'
varname='ssh'
fq='dm'
nb_obs_min = 10 # minimum of 10 points in track to be considered a long trajectory
raw = True # handed on as raw_data when merging/writing the observations
cmin = 0.05 # minimum contour
# number of consecutive timesteps with missing detection allowed
# NOTE: not passed on in the calls below, so the default of the tracking
# functions applies.
virtual = 4
class_kw = dict(cmin=cmin)  # not used below
zarr = False

lazy_results = []
# again looping over wavelength and shape error
for wavelength in [200,700]:
    print('wavelength = ', wavelength)
    for shape_error in [30,70]:
        print('shape_error = ', shape_error)
        eddy_dir='/path/to/eddydata/'+expid+'_eddytrack/'+'wv_'+str(int(wavelength))+'_se_'+str(int(shape_error))+'/'
        tracker_dir=eddy_dir+'tracks/'
        # exist_ok avoids the check-then-create race of os.path.exists + makedirs
        os.makedirs(tracker_dir, exist_ok=True)
        print('eddydir = ', eddy_dir)
        print('tracker_dir = ', tracker_dir)
        for eddy_type in ['cyclonic','anticyclonic']:
            # define the computation we want to do without doing it
            lazy_result = dask.delayed(tracking_eddytype)(
                trackerdir=tracker_dir,
                eddydir=eddy_dir,
                exp_id=expid,
                eddy_type=eddy_type,
                fq=fq,
                zarr=zarr,
                nb_obs_min=nb_obs_min,
                raw=raw,
                cmin=cmin,
            )
            # store all computations to be done in parallel
            lazy_results.append(lazy_result)
# Compute the results. dask.compute already returns the concrete results,
# so a second compute pass over them is unnecessary.
results = dask.compute(*lazy_results)
futures = results  # name kept in case other cells refer to it

### Shut down the cluster

In [None]:
# Close the client connection, then request a shutdown through the client.
# NOTE(review): shutdown() is called on a client that has already been
# closed — confirm this order works as intended, or whether shutdown()
# alone (or closing the cluster object) is sufficient.
client.close()
client.shutdown()