# Batch SnowRadar Processing Example
A simple workflow using multiple CPUs and landmask/QA filtering

In [1]:
# Community imports
import pandas as pd
from glob import glob
import os
from pathlib import Path
import geopandas as gpd

# pySnowRadar imports|
from pySnowRadar import SnowRadar
from pySnowRadar.qc import error_check
from pySnowRadar.processing import geo_filter,geo_filter_insitu_sites, batch_process
from pySnowRadar.algorithms import Wavelet_TN, Peakiness
from pySnowRadar.processing import extract_layers

from tqdm import tqdm
from scipy.signal import find_peaks
from datetime import datetime, timedelta
from thefuzz import process, fuzz
from scipy.spatial.distance import cdist
from tqdm import tqdm
import h5py
import numpy as np

In [2]:
def open_data(path, filetype, mode, instrument):
    if mode == 'dict':
        df_dict = {}
        if filetype == 'csv' or filetype=='txt':
            if instrument == 'MP':
                lon_col = detect_cols(path, keywords=['lon','Longitude'])
                lat_col = detect_cols(path, keywords=['lat', 'Latitude'])
                time_col = detect_cols(path, keywords=['timestamp', 'datetime'])
                var_col = detect_cols(path, keywords=['snow_depth', 'MagnaProbe','depth'])
                site_col = detect_cols(path, keywords=['site','site_id'])
                type_col = detect_cols(path, keywords=['type','ice_type'])

                cols = []
                cols.append(lon_col) if lon_col != None else None
                cols.append(lat_col) if lat_col != None else None
                cols.append(time_col) if time_col != None else None
                cols.append(var_col) if var_col != None else None
                cols.append(site_col) if site_col != None else None
                cols.append(type_col) if type_col != None else None

                names = []
                names.append('lon') if lon_col != None else None
                names.append('lat') if lat_col != None else None
                names.append('time') if time_col != None else None
                names.append('snow_depth') if var_col != None else None
                names.append("site_id") if site_col != None else None
                names.append("ice_type") if type_col != None else None

                df_import = pd.DataFrame({'names':names}, index=cols)

                df_import.sort_index(inplace=True)
                df = pd.read_csv(path, skiprows=1,  usecols=list(df_import.index), names=df_import['names'])
                if np.mean(df['snow_depth'] > 5): #convert [cm] to [m] in snowdepth
                    df['snow_depth'] /= 100
                df.loc[df['lon'] < 0, 'lon'] += 360
                
                df = strtime_to_datetime(df)
                
                if site_col != None:
                    df_dict = split_data(df, df_dict)
                    
                else:
                    df_dict['1'] = df

        elif filetype == 'h5':
            #This could (should) be more flexible to detect data fields etc.
            #Since we only (atm) use 1 h5 file, it is tailored for this file
            f = h5py.File(path, 'r')
            group = f['eureka_data']
            data = group['magnaprobe']
            df = pd.DataFrame({'lat':data['latitude'][()], 'lon':data['longitude'][()], 'snow_depth':data['snow_depth'][()], 'site_id':data['site_id'][()]})
            df.loc[df['lon'] < 0, 'lon'] += 360
            df_dict = split_data(df, df_dict)

        return df_dict

    elif mode == 'df':
        
        if filetype == 'csv' or filetype=='txt':
            if instrument == 'OIB':
                df = pd.read_csv(path, usecols=[0,1,2,7,8,15,16])
                df['date'] = df['date'].apply(lambda x: datetime.strptime(str(x), '%Y%m%d'))
                df['datetime'] = df['date'] + df['elapsed'].apply(lambda x: timedelta(seconds=x))
                df = df[df['snow_depth'] != -99999]
                del df['date'], df['elapsed']
                
            elif instrument == 'ATM':
                df = pd.read_csv(path, index_col=0)
                
        return df

def strtime_to_datetime(df):
    try:
        df['time'] = df['time'].apply(lambda x: datetime.strptime(x, '%d/%m/%Y %H:%M') )
        
    except:
        df['time'] = clean_time_col(df['time'])
        df['time'] = df['time'].apply(lambda x: datetime.strptime(x, '%d/%m/%Y %H:%M') )

    return df 

def clean_time_col(time): 
    weird_indices = time.apply(lambda x: len(x) > 16) #length of weird timestamps is higher
    time.loc[weird_indices] = np.nan
    time = time.ffill()
    return time
 
def get_extent_latlon(lon, lat):
    '''
    Calculates the extent of a given list of latlon coordinates. 
    Jumps from 360 deg to 0 deg (or visce versa) in longitude are handled.

    Inputs:
    lon: List of longitude coordinates
    lat: List of latitude coordinates

    Returns:
    extent_latlon: Four point tuple containing the extent: (x0, y0, x1, y2)
    '''
    
    minlat = min(lat)
    maxlat = max(lat)
    minlon = min(lon)
    maxlon = max(lon)

    diff_lon = np.diff(lon)

    if max(abs(diff_lon)) > 100: #find discontinuity in lon
        indices_max,_ = find_peaks(diff_lon, height= 200)
        indices_min,_ = find_peaks(diff_lon*-1, height= 200)
        indices = sorted(np.append(indices_max, [x for x in indices_min]))
        indices.append(len(diff_lon)) 

        offset = np.ones_like(indices)

        if indices[0] in indices_max:
            indices.insert(0,0)
            offset = np.insert(offset,0,0)

        tmp_lon = lon.copy()
        for i in range(0,len(indices)-1,2):
            ind1 = indices[i] + offset[i]
            ind2 = indices[i+1] + 1
            tmp_lon[ind1:ind2] = tmp_lon.iloc[ind1:ind2] + 360
            
        minlon = min(tmp_lon)
        if minlon > 360:
            minlon -= 360
        maxlon = max(tmp_lon)
        if maxlon > 360:
            maxlon -= 360

    extent_latlon = (minlon, minlat, maxlon, maxlat)
    return extent_latlon

def ordering_latlon(df):
    bounds = get_extent_latlon(df['lon'],df['lat'])
    idx = df.loc[(df['lat'] == bounds[1])].index[0]

    reind = np.arange(0,len(df))
    reind = np.delete(reind, idx)
    reind = np.insert(reind, 0, idx)
    df = df.reindex(reind)
    df.reset_index(inplace=True, drop=True)

    dists = cdist(list(zip(df['lon'],df['lat'])),list(zip(df['lon'],df['lat'])))
    mask = np.zeros(len(df['lon'])).astype(bool)
    tst = []
    i = 0
    for j in range(len(mask)-1):
        mask[i] = True
        dist_zero = dists[0,:]
        dist_zero[mask] = np.nan
        row = dists[:,i].copy() + dist_zero
        i = np.nanargmin(row)
        tst.append(i)
    df = df.loc[tst]
    df.reset_index(inplace=True, drop=True)
    return df

def split_data(df, df_dict):
    for uniq in df['site_id'].unique():
        df_tmp = df[df['site_id'] == uniq]
        if len(df_tmp) > 10000:
            df_tmp = ordering_latlon(df_tmp)
            df_tmp.dropna(inplace=True)

            num_steps = int(np.ceil(len(df_tmp) / 3000))
            steps = np.linspace(0,len(df_tmp), num_steps).astype(int)
            for i in range(len(steps)-1):
                df_tmp_part = df_tmp.loc[steps[i]:steps[i+1]]
                df_dict[f'{uniq}.{i}'] = df_tmp_part

        else:
            df_dict[str(uniq)] = df_tmp

    return df_dict   

def detect_cols(path, keywords):

    df_test = pd.read_csv(path)
    cols = list(df_test.columns)
    scores = np.zeros(len(cols),dtype='int')
    df_scores = pd.DataFrame({'scores':scores},index=cols)
    dt = np.dtype(np.str_,np.int_)

    for key in keywords:
        fuzzy = process.extract(key, cols, limit=len(cols)) 
        inds = np.array(fuzzy,dtype=dt)[:,0]
        scrs = pd.Series(np.array(fuzzy,dtype=dt)[:,1].astype(int), index=inds)
        df_scores['scores'] += scrs

        if True in (scrs > 80).unique():
            tst = df_scores.reset_index()
            col_ind = tst[tst['index'] == (scrs > 80).index[0]].index[0]
            return col_ind

    col_ind = df_scores['scores'].argmax() if df_scores['scores'].max() > len(keywords) * 65 else None
    return col_ind

In [None]:
#WAVELET, 2016

sites = ['grid3', 'grid4', 'grid5', 'grid6', 'grid7', 'grid8'] #'grid1', 'grid2', 

input_sr_data_path = '/Users/torka/Library/CloudStorage/OneDrive-Personal/MarineSciences/MasterThs-T/Data/OIB/Echograms/20160419/*/*_deconv.nc'
path_to_shapes = '/Users/torka/Library/CloudStorage/OneDrive-Personal/MarineSciences/MasterThs-T/Data/Eureka/grid_extents/'
output_path = f'/Users/torka/Library/CloudStorage/OneDrive-Personal/MarineSciences/MasterThs-T/Data/OIB/Wavelet/20160419_max_testing'


for site in tqdm(sites):

        input_sr_data = glob(input_sr_data_path)
        # geo_filtered = geo_filter(input_sr_data)
        insitu_site_filtered = geo_filter_insitu_sites(path_to_shapes, site, input_sr_data)

        # Generate error codes for SR data
        sr_data = [SnowRadar(sr, 'full') for sr in insitu_site_filtered]
        error_codes = [pd.Series(error_check(sr).tolist()) for sr in sr_data]

        workers = 8    
        picker = Wavelet_TN # IS CURRENTLY IN LN ISNTEAD OF 10LOG10 MODE!

        # FOR WAVELET_TN
        params={'snow_density':0.3,
                'ref_snow_layer': 1,
                'cwt_precision': 10}

        res = batch_process(insitu_site_filtered, picker, params, workers,
                        dump_results=True,
                        overwrite=True,
                        path=os.path.join(output_path, f'{site}')
                        )

  0%|          | 0/8 [00:00<?, ?it/s]

100%|██████████| 8/8 [05:17<00:00, 39.72s/it]


In [None]:
#PEAKINESS

sites = ['grid1', 'grid2', 'grid3', 'grid4', 'grid5', 'grid6', 'grid7', 'grid8']

for site in tqdm(sites):

        input_sr_data = glob('/Users/torka/Library/CloudStorage/OneDrive-Personal/MarineSciences/MasterThs-T/Data/OIB/Echograms/20160419/*/*_deconv.nc')
        # geo_filtered = geo_filter(input_sr_data)
        insitu_site_filtered = geo_filter_insitu_sites(site, input_sr_data)

        # Generate error codes for SR data
        sr_data = [SnowRadar(sr, 'full') for sr in insitu_site_filtered]
        error_codes = [pd.Series(error_check(sr).tolist()) for sr in sr_data]

        workers = 8    
        picker = Peakiness # IS CURRENTLY IN LN ISNTEAD OF 10LOG10 MODE!
        log_peak_threshold = .7
        lin_peak_threshold = .5
        pp_r_threshold = 30
        pp_l_threshold = 25
        
        # FOR PEAKINESS
        params={
                'snow_density': 0.3,
                'log_peak_threshold' : log_peak_threshold,
                'lin_peak_threshold' : lin_peak_threshold, 
                'pp_r_threshold' : pp_r_threshold, 
                'pp_l_threshold' : pp_l_threshold
        }       

        res = batch_process(insitu_site_filtered, picker, params, workers,
                        dump_results=True,
                        overwrite=True,
                        path=f'/Users/torka/Library/CloudStorage/OneDrive-Personal/MarineSciences/MasterThs-T/Data/OIB/Peakiness/20160419/{site}/'
                        )

In [None]:
input_sr_data = glob('/Users/torka/Library/CloudStorage/OneDrive-Personal/MarineSciences/MasterThs-T/Data/OIB/Echograms/20160419/*/*_deconv.nc')

log_peak_threshold = .7
lin_peak_threshold = .5
pp_r_threshold = 30
pp_l_threshold = 25
folder_name = str(log_peak_threshold) + '_' + str(lin_peak_threshold) + '_' + str(pp_r_threshold) + '_' + str(pp_l_threshold)
outer_path = '/Users/torka/Library/CloudStorage/OneDrive-Personal/MarineSciences/MasterThs-T/Data/OIB/Peakiness/2016'
inner_path = os.path.join(outer_path, folder_name)
print(inner_path)
Path(inner_path).mkdir(exist_ok=True)

params={
    'snow_density': 0.3,
    'log_peak_threshold' : log_peak_threshold,
    'lin_peak_threshold' : lin_peak_threshold, 
    'pp_r_threshold' : pp_r_threshold, 
    'pp_l_threshold' : pp_l_threshold
}

geo_filtered = geo_filter(input_sr_data)
# Generate error codes for SR data
sr_data = [SnowRadar(sr, 'full') for sr in geo_filtered]
error_codes = [pd.Series(error_check(sr).tolist()) for sr in sr_data]


workers = 8
picker = Peakiness
res = batch_process(geo_filtered, picker, params, workers, dump_results=True, overwrite=True, path=inner_path)

'<function Peakiness at 0x300343a60>'