# Generate metrics with WIT data
* **Lineage**: This notebook was derived from code by Geoscience Australia and modified for:
* batch input of multiple WIT CSV files in a folder (currently the ANAEv3 WIT output includes 270,653 polygons, each with its own CSV file)
* multiple processor pool support to speed execution when running on a PC workstation
* linear interpolation of the observation dates to daily data to improve estimates of inundation duration
* some bug fixes in the inundation event metrics that were required when using the interpolated data
* output formatting

* Original Source: https://github.com/GeoscienceAustralia/dea-notebooks/blob/develop/Scientific_workflows/Wetlands_Insight_Tool/metrics/wit_metrics.ipynb

* **Dependencies**: This code requires two things to run (see the analysis parameters section for more information):
     * A folder containing pre-calculated WIT CSV files (obtained for the BWS Priorities Project from Geoscience Australia for each ANAE polygon > 1 ha)
     * A shapefile (or equivalent) that contains the area that the WIT result was run over.
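
The linear interpolation to daily data mentioned in the lineage notes can be illustrated with a minimal pandas sketch. The observation dates and values below are made up, and this is not the notebook's own `interpolate_wit` routine, only a compact illustration of the idea: reindex the sparse observations onto a daily calendar and fill the gaps linearly.

```python
import pandas as pd

# Hypothetical sparse observations for a single polygon (values are made up).
obs = pd.DataFrame({
    "date": pd.to_datetime(["2020-01-01", "2020-01-05", "2020-01-07"]),
    "water": [0.0, 0.4, 0.2],
})

# Build a daily calendar spanning the first to the last observation.
calendar = pd.date_range(obs["date"].min(), obs["date"].max(), freq="D")

# Reindex onto the calendar (new days become NaN), then fill linearly in time.
daily = obs.set_index("date").reindex(calendar).interpolate(method="time")

print(daily)
```

With daily values, an event that starts or ends between two satellite observations is given an estimated start or end day rather than being snapped to the nearest observation date.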
  
     
## Background
The WIT data are generated by DEA for given wetland polygons and stored in a database on NCI. The data can be dumped to a CSV file when required. Any statistic can then be generated from the WIT data. This notebook provides a way to compute temporal statistics (metrics).

## WIT Data definition
* WIT data provides the following metrics for each polygon unit

```
   date: time of observation
   bs: percentage of bare soil
   npv: percentage of non photosynthetic vegetation
   pv: percentage of green/photosynthetic vegetation
   wet: percentage of wetness
   water: percentage of water
```
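
As a rough illustration of this layout, the sketch below reads a small in-memory CSV with these columns plus a `feature_id` column and derives the combined `water+wet` value used by the event metrics. The polygon ID and all values are invented, and storing the cover values as fractions between 0 and 1 is an assumption of this example; check the scale of your own files.

```python
import io
import pandas as pd

# Made-up sample in the assumed WIT CSV layout (feature_id plus the columns listed above).
sample_csv = io.StringIO(
    "feature_id,date,bs,npv,pv,wet,water\n"
    "abc123,2020-01-01,0.10,0.20,0.40,0.20,0.10\n"
    "abc123,2020-01-17,0.30,0.30,0.35,0.05,0.00\n"
)

wit = pd.read_csv(sample_csv, parse_dates=["date"])

# Combined inundation signal: open water plus wet vegetation, rounded to 4 decimals.
wit["water+wet"] = (wit["water"] + wit["wet"]).round(4)

print(wit[["feature_id", "date", "water+wet"]])
```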

## Description
This notebook uses existing WIT data to compute metrics.
* First we load the existing WIT csv data from a saved csv location
* Then we compute the metrics for all polygons and output the results to CSV files.  The input CSV files are processed in "batches" that are spread across multiple CPU cores.  When execution is complete the various batch outputs are merged together into single result files that contain metrics for every CSV feature ID (ANAE polygons)
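
The batching idea described above can be sketched as follows. The file names, `make_batches`, `count_files` and `run_batches` are hypothetical names for this illustration only; the notebook's real workers read the CSV files and compute the metrics.

```python
from multiprocessing import Pool


def make_batches(items, batchsize):
    """Split a list into consecutive batches of at most `batchsize` items."""
    return [items[i:i + batchsize] for i in range(0, len(items), batchsize)]


def count_files(batch):
    """Stand-in worker: a real worker would read the CSVs and compute metrics."""
    return len(batch)


def run_batches(batches, worker, processes=2):
    """Hand each batch to a pool of worker processes; results keep batch order."""
    with Pool(processes=processes) as pool:
        return pool.map(worker, batches)


files = [f"polygon_{n}.csv" for n in range(450)]  # hypothetical file names
batches = make_batches(files, batchsize=200)
print([len(b) for b in batches])
```

In a script, `run_batches(batches, count_files)` would be called under an `if __name__ == "__main__":` guard. Each batch produces its own output, and the per-batch outputs are merged once all batches have finished.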

The following files are created:


 - **RESULT_WIT_ANAE_yearly_metrics**: min, max, mean, median of each WIT metric per calendar year
 - **RESULT_WIT_ANAE_event_threshold**: for the BWS project we defined an "event" as exceeding the median [water+wet]. This file is read in to calculate the event metrics, but it could be replaced with user-selected values if custom thresholds are wanted, or the routine that generates it could be altered to change the threshold formula.
 - **RESULT_WIT_ANAE_time_since_last_inundation**: number of days from the end of the last inundation event to the final date in the WIT record
 - **RESULT_WIT_ANAE_event_times**: start and end time, duration, and duration of the preceding gap (the dry period)
 - **RESULT_WIT_ANAE_event_stats**: for each event calculates the area of the polygon that was wet using the combination water+wet
 - **RESULT_WIT_ANAE_inundation_metrics**: this is a join of RESULT_WIT_ANAE_event_times and RESULT_WIT_ANAE_event_stats
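
To make the event definitions concrete, here is a small sketch on a made-up daily `water+wet` series. It uses the median as the threshold and counts duration and gap in whole days, as described above. It is a simplified illustration using run labelling, not the `searchsorted`-based routine in the workers module.

```python
import pandas as pd

# Made-up daily water+wet values for one polygon.
dates = pd.date_range("2020-01-01", periods=10, freq="D")
ww = pd.Series([0.0, 0.1, 0.6, 0.7, 0.1, 0.0, 0.5, 0.8, 0.9, 0.0], index=dates)

threshold = ww.median()          # event threshold: the all-time median
above = ww > threshold           # True on days that count as inundated

# Label consecutive runs of equal state; each run of True days is one event.
run_id = (above != above.shift()).cumsum()
wet_days = ww.index[above.to_numpy()].to_series()
events = (
    wet_days.groupby(run_id[above])
    .agg(start_time="min", end_time="max")
    .reset_index(drop=True)
)

# Duration is inclusive of both end days; gap is the dry period before the event.
events["duration"] = (events["end_time"] - events["start_time"]).dt.days + 1
gap = (events["start_time"] - events["end_time"].shift()).dt.days - 1
events["gap"] = gap.fillna(0).astype(int)  # the first event has no preceding gap

# Days from the end of the last event to the final date in the record.
time_since_last = (ww.index.max() - events["end_time"].max()).days

print(events)
print(time_since_last)
```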


* WIT metrics: refer to [WIT metrics](https://docs.google.com/document/d/1JBZzVRW6K0fJT4jws3lRranPLPBYBkTDvpu94knv5dY/edit?usp=sharing)
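
The yearly metrics file (min, max, mean and median per calendar year) can be approximated with a single pandas `groupby`. The sketch below uses invented values and a compact formulation; the workers module instead loops over years and merges the results, so treat this as an equivalent-in-spirit example rather than the notebook's implementation.

```python
import pandas as pd

# Made-up observations for one polygon across two calendar years.
wit = pd.DataFrame({
    "feature_id": ["a", "a", "a", "a"],
    "date": pd.to_datetime(["2019-03-01", "2019-09-01", "2020-02-01", "2020-08-01"]),
    "water": [0.1, 0.3, 0.0, 0.2],
    "wet": [0.2, 0.1, 0.1, 0.4],
})
wit["water+wet"] = wit["water"] + wit["wet"]
wit["year"] = wit["date"].dt.year

# One row per polygon and year, one column per (variable, statistic) pair.
grouped = wit.groupby(["feature_id", "year"])
yearly = grouped[["water", "wet", "water+wet"]].agg(["min", "max", "mean", "median"])
yearly.columns = [f"{col}_{stat}" for col, stat in yearly.columns]

# Number of observations contributing to each year.
yearly["count"] = grouped["date"].count()
yearly = yearly.reset_index()

print(yearly)
```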

## Processing Environment
For the project, the analysis was conducted in the Python environment of ArcGIS Pro 3.0, but the code uses common open-source Python data processing libraries (GeoPandas, pandas, NumPy) that should enable the analysis to be repeated in most environments.

***
     
# Contact
- Dr Shane Brooks
- shane@brooks.eco
- https://brooks.eco

# User Defined Parameters

In [10]:
#  set the working directory
working_directory = './BWSVulnerability/WIT'


# shapefile: the shapefile mentioned above, used to find the polygons and get their area
# set to '' to disable area lookup
shapefile = './BWSVulnerability/WIT/ANAEv3_WIT_clean19042022/ANAEv3_WIT.shp'
# shapefile field name that identifies each polygon -  the ANAEv3 UID geohash was used here.
# The ANAE UID code is also used in the naming convention for the CSV files
skey='UID'

# Path with wildcard that defines the list of csv files to process.  When debugging a single filename might be prudent.
csvfiles = './BWSVulnerability/WIT/csv'

# csv feature_id - the WIT csv output files include a column 'feature_id' that in this case is the ANAE UID
pkey='feature_id'

# whether to interpolate the WIT observation dates (typically 10-50+ per year) to daily data or not.
# This is computationally expensive but improves estimates of inundation duration.
interpolate = True
# number of WIT csv files to include in each 'batch' for processing by a single CPU core.
batchsize = 200

# Set working directory

In [None]:
# change to the user specified working directory so the worker gets written to the correct location
import os
os.chdir(working_directory)
cwdpath = os.getcwd()

# Write Python module workers.py to working directory
* To speed up the processing of many CSV files (we initially processed 270,653 ANAE CSV files), we divide the work across multiple processor cores. To achieve this, the notebook calls an external worker module. For convenience, the worker module code is maintained within the notebook.
* Executing the cell below writes **workers.py** to the current working directory so that it can be imported as the source of the worker functions for multiprocessing.
* The workers module contains the routines for summarising the WIT CSV data.
* The notebook also contains code that can be run as a single-core process in the "normal way".
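
The pattern of writing a module to disk and then importing it can be sketched in plain Python. On platforms where multiprocessing starts workers with the "spawn" method (the default on Windows), child processes locate the worker function by importing its module, which is why the functions need to live in an importable file rather than only in a notebook cell. The module name `demo_workers` and the function `double` are hypothetical stand-ins for this illustration.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Write a tiny stand-in module to a temporary directory
# (the notebook writes workers.py to the working directory instead).
module_dir = Path(tempfile.mkdtemp())
(module_dir / "demo_workers.py").write_text(
    "def double(x):\n"
    "    return 2 * x\n"
)

# Make the directory importable, then import the freshly written module.
sys.path.append(str(module_dir))
importlib.invalidate_caches()
demo_workers = importlib.import_module("demo_workers")

result = demo_workers.double(21)
print(result)
```

If the module file is edited and rewritten during a session, `importlib.reload(demo_workers)` picks up the changes without restarting the kernel.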

In [8]:
%%writefile workers.py

"""
    workers.py
    
    This is a python module that is saved as workers.py to the current working directory when the cell is run in the Jupyter environment
    
    workers.py is imported in the executable code below this cell for multiprocessing
    
    This conveniently allows the python code of the workers.py module to be edited in the Jupyter environment and stored with the multiprocessing code

"""
import os

import numpy as np
import pandas as pd
import fiona
from shapely import geometry


def shape_list(key, values, shapefile):
    """
        Get a generator of shapes from the given shapefile
            key: the key to match in 'properties' in the shape file
            values: a list of property values
            shapefile: the name of your shape file
            e.g. key='ORIGID', values=[1, 2, 3, 4, 5], 
            shapefile='/g/data/r78/DEA_Wetlands/shapefiles/MDB_ANAE_Aug2017_modified_2019_SB_3577.shp'
    """
    count = len(values)
    with fiona.open(shapefile) as allshapes:
        for shape in allshapes:
            shape_id = shape['properties'].get(key)
            if shape_id is None:
                continue
            if shape_id in values:
                yield(shape_id, shape)
                count -= 1
            if count <= 0:
                break
    
def get_areas(features, pkey='feature_id'):
    """
        Calculate the area of a list/generator of shapes
        input:
            features: a list of (key, shape) pairs, e.g. as yielded by shape_list
        output:
            a dataframe of areas indexed by the key
            (area is divided by 1e4, i.e. hectares if the shapefile units are metres)
    """
    #collect the rows first and build the dataframe once (DataFrame.append was removed in pandas 2.0)
    rows = [[f[0], geometry.shape(f[1]['geometry']).area/1e4] for f in features]
    return pd.DataFrame(rows, columns=[pkey, 'area']).set_index(pkey)

    
def annual_metrics(wit_data, members=['pv', 'wet', 'water', 'bs', 'npv', ['npv', 'pv', 'wet'], ['pv', 'wet'], ['water', 'wet']], threshold=[25, 75], pkey='feature_id'):
                                              
    """
        Compute the annual max, min, mean, median, count with given wit data, members and threshold
        input:
            wit_data: dataframe of WIT
            members: the elements which the metrics are computed against, can be a column from wit_data, e.g. 'pv'
                         or the sum of wit columns, e.g. ['water', 'wet']
            threshold: a list of thresholds such that (elements >= threshold[i]) is True, 
                        where i = 0, 1...len(threshold)-1
        output:
            dataframe of metrics
    """
    wit_df = wit_data.copy(deep=True)

    #append a summed column for each multi-column member, e.g. 'water+wet'
    for m in members:
        if isinstance(m, list):
            wit_df.insert(wit_df.columns.size, '+'.join(m), wit_df[m].sum(axis=1))
    years = pd.DatetimeIndex(wit_df['date']).year.unique()
    shape_id_list = wit_df[pkey].unique()
    #shane changed 4 to 5 to accomodate median added below 
    wit_metrics = [pd.DataFrame()] * 5
    for y in years:
        wit_yearly = wit_df[pd.DatetimeIndex(wit_df['date']).year==y].drop(columns=['date']).groupby(pkey).max()
        wit_yearly.insert(0, 'year', y)
        wit_yearly = wit_yearly.rename(columns={n: n+'_max' for n in wit_yearly.columns[1:]})
        wit_metrics[0] = wit_metrics[0].append(wit_yearly, sort=False)
    for y in years:
        wit_yearly = wit_df[pd.DatetimeIndex(wit_df['date']).year==y].drop(columns=['date']).groupby(pkey).min()
        wit_yearly.insert(0, 'year', y)
        wit_yearly = wit_yearly.rename(columns={n: n+'_min' for n in wit_yearly.columns[1:]})
        wit_metrics[1] = wit_metrics[1].append(wit_yearly, sort=False)
    for y in years:
        wit_yearly = wit_df[pd.DatetimeIndex(wit_df['date']).year==y].drop(columns=['date']).groupby(pkey).mean()
        wit_yearly.insert(0, 'year', y)
        wit_yearly = wit_yearly.rename(columns={n: n+'_mean' for n in wit_yearly.columns[1:]})
        wit_metrics[2] = wit_metrics[2].append(wit_yearly, sort=False)
        
    #*********************** START ADDED BY SHANE ***********************
    #adding median
    for y in years:
        wit_yearly = wit_df[pd.DatetimeIndex(wit_df['date']).year==y].drop(columns=['date']).groupby(pkey).median()
        wit_yearly.insert(0, 'year', y)
        wit_yearly = wit_yearly.rename(columns={n: n+'_median' for n in wit_yearly.columns[1:]})
        wit_metrics[3] = wit_metrics[3].append(wit_yearly, sort=False)
    #*********************** END ADDED BY SHANE ***********************      
    for y in years:
        wit_yearly = wit_df[pd.DatetimeIndex(wit_df['date']).year==y][[pkey, 'bs']].groupby(pkey).count()
        wit_yearly.insert(0, 'year', y)
        wit_yearly = wit_yearly.rename(columns={n: 'count' for n in wit_yearly.columns[1:]})
        #shane changed index from 3 to 4 to accomodate median added above 
        wit_metrics[4] = wit_metrics[4].append(wit_yearly, sort=False)
    #for t in threshold:
    #    wit_df_ts = wit_df.copy(deep=True)
    #    wit_metrics += [pd.DataFrame()]
    #    wit_df_ts.loc[:, wit_df_ts.columns[2:]] = wit_df_ts.loc[:, wit_df_ts.columns[2:]].mask((wit_df_ts[wit_df_ts.columns[2:]] < t/100), np.nan)
    #    for y in years:
    #        wit_yearly = wit_df_ts[pd.DatetimeIndex(wit_df_ts['date']).year==y].drop(columns=['date']).groupby(pkey).count()
    #        wit_yearly.insert(0, 'year', y)
    #        wit_yearly = wit_yearly.rename(columns={n: n+'_count'+str(t) for n in wit_yearly.columns[1:]})
    #        wit_metrics[-1] = wit_metrics[-1].append(wit_yearly, sort=False)
    wit_yearly_metrics = wit_metrics[0]
    wit_yearly_metrics.sort_values(by=[pkey, 'year'],inplace=True)
    for i in range(len(wit_metrics)-1):
        wit_yearly_metrics = pd.merge(wit_yearly_metrics, wit_metrics[i+1], on=[pkey, 'year'], how='inner')
    ofn = "WIT_ANAE_yearly_metrics"+str(wit_data['chunk'].iat[0])+".csv"
    wit_yearly_metrics.to_csv(ofn)
    return wit_yearly_metrics

def get_event_time(wit_ww, threshold, pkey='feature_id'):
    """

        Compute inundation event time by given threshold
        input:
            wit_ww: wetness (water+wet) computed from wit data for a single polygon
            threshold: a scalar, or a dataframe of per-polygon thresholds indexed by pkey,
                       such that (water+wet > threshold) = inundation
        output:
            dataframe of inundation event time
    """
    if isinstance(threshold, pd.DataFrame):
        gid = wit_ww.index.unique()[0]
        poly_threshold = threshold.loc[gid].to_numpy()[0]
    else:
        poly_threshold = threshold
    i_start = wit_ww[wit_ww['water+wet'] >= poly_threshold]['date'].min()
    if pd.isnull(i_start):
        re = pd.DataFrame([[np.nan] * 5], columns=['threshold', 'start_time', 'end_time', 'duration', 'gap'], index=wit_ww.index.unique())
        re.index.name = pkey
        return re
    #SSB - moved equal to needed for when threshold = 0
    #re_idx = np.searchsorted(wit_ww[(wit_ww['water+wet'] < poly_threshold)]['date'].values, 
    #                         wit_ww[(wit_ww['water+wet'] >= poly_threshold)]['date'].values)
    re_idx = np.searchsorted(wit_ww[(wit_ww['water+wet'] <= poly_threshold)]['date'].values,
                         wit_ww[(wit_ww['water+wet'] > poly_threshold)]['date'].values)

    re_idx, count = np.unique(re_idx, return_counts=True)
    start_idx = np.zeros(len(count)+1, dtype='int')
    start_idx[1:] = np.cumsum(count)

    #SSB removed "equals" sorts correctly when threshold is zero
    #re_start = wit_ww[(wit_ww['water+wet'] >= poly_threshold)].iloc[start_idx[:-1]][['date']].rename(columns={'date': 'start_time'})
    #re_end = wit_ww[(wit_ww['water+wet'] >= poly_threshold)].iloc[start_idx[1:] - 1][['date']].rename(columns={'date': 'end_time'})
    re_start = wit_ww[(wit_ww['water+wet'] > poly_threshold)].iloc[start_idx[:-1]][['date']].rename(columns={'date': 'start_time'})
    re_end = wit_ww[(wit_ww['water+wet'] > poly_threshold)].iloc[start_idx[1:] - 1][['date']].rename(columns={'date': 'end_time'})

    re = pd.concat([re_start, re_end], axis=1)
    if not re.empty:
        re.insert(2, 'duration', 
                  (re['end_time'] - re['start_time'] + np.timedelta64(1, 'D')).astype('timedelta64[D]').astype('timedelta64[D]'))
        re.insert(3, 'gap', np.concatenate([[np.timedelta64(0, 'D')],
                                            (re['start_time'][1:].values - re['end_time'][:-1].values - np.timedelta64(1, 'D')).astype('timedelta64[D]')]))
        re.insert(0, 'threshold', poly_threshold)
        re.insert(0, pkey, wit_ww.index.unique()[0])
        re = re.set_index(pkey)
    return re
    
def get_im_stats(grouped_wit, im_time, wit_area):
    """
        Get inundation stats given wit data and events
        input:
            grouped_wit: wit data
            im_time: inundation events in time
        output:
            the stats of inundation events
    """
    gid = grouped_wit.index.unique()[0]
    if gid not in im_time.indices.keys():
        return pd.DataFrame([[np.nan]*5], columns=['start_time', 'max_water+wet', 'mean_water+wet', 'max_wet_area', 'mean_wet_area'],
                           index=[gid])
    re_left = np.searchsorted(grouped_wit['date'].values.astype('datetime64'),
                         im_time.get_group(gid)['start_time'].values, side='left')
    re_right = np.searchsorted(grouped_wit['date'].values.astype('datetime64'),
                         im_time.get_group(gid)['end_time'].values, side='right')
    re = pd.DataFrame()
    for a, b in zip(re_left, re_right):
        tmp = pd.concat([grouped_wit.iloc[a:a+1]['date'].rename('start_time').astype('datetime64'),
                         pd.Series(grouped_wit.iloc[a:b]['water+wet'].max(),index=[gid], name='max_water+wet'),
                         pd.Series(grouped_wit.iloc[a:b]['water+wet'].mean(),index=[gid], name='mean_water+wet')],
                        axis=1)
        if isinstance(wit_area, pd.DataFrame):
            tmp.insert(3, 'max_wet_area', tmp['max_water+wet'].values * wit_area[wit_area.index==gid].values)
            tmp.insert(4, 'mean_wet_area', tmp['mean_water+wet'].values * wit_area[wit_area.index==gid].values)
        re = re.append(tmp, sort=False)
    #name the index after the pkey
    #(the bare re.reset_index() call that was here discarded its result, so it had no effect)
    re.index.name = grouped_wit.index.name
    return re

def event_time(wit_df, threshold=0.01, pkey='feature_id'):
    """
        Compute the inundation events with given wit data and threshold
        input:
            wit_df: wetness computed from wit data
            threshold: a value such that (water+wet > threshold) = inundation,
        output:
            dataframe of events
    """
    return wit_df.groupby(pkey).apply(get_event_time, threshold=threshold, pkey=pkey).dropna().droplevel(0)

def event_stats(wit_df, wit_im, wit_area, pkey='feature_id'):
    """
        Compute inundation event stats with given wit wetness, events defined by (start_time, end_time) 
        and polygon areas
        input:
            wit_df: wetness computed from wit data
            wit_im: inundation event
            wit_area: polygon areas indexed by the key
        output:
            dataframe of event stats
    """
    grouped_im = wit_im[['start_time', 'end_time']].groupby(pkey)
    #was droplevel(0) but this left first column without a header (1) deletes that column instead
    return wit_df.groupby(pkey).apply(get_im_stats, im_time=grouped_im, wit_area=wit_area).droplevel(1)

def inundation_metrics(wit_data, threshold=0.01, shapefile = 'shapefile', skey='UID', pkey='feature_id'):
    """
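        Compute inundation metrics with given wit data, polygon areas and threshold
        input:
            wit_data: a dataframe of wit_data
            threshold: a value such that (water+wet > threshold) = inundation
            shapefile: path to the polygon shapefile used to look up polygon areas;
                       if the file does not exist the wet area calculation is skipped
            skey: shapefile field that identifies each polygon
            pkey: wit_data column that identifies each polygon
        output:
            dataframe of inundation metrics
    """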
    wit_area=[]
    if os.path.isfile(shapefile):
        features = shape_list(skey, wit_data[pkey].unique(), shapefile)
        wit_area = get_areas(features, pkey=pkey)

    wit_df = wit_data.copy(deep=True)
    wit_df.insert(2, 'water+wet', wit_df[['water', 'wet']].sum(axis=1).round(decimals = 4))
    #wit_df = wit_df.drop(columns=wit_df.columns[3:])
    wit_df = wit_df[[pkey,'date','water+wet']]
    wit_df['date'] = wit_df['date'].astype('datetime64')
    wit_df = wit_df.set_index(pkey)
    wit_im_time = event_time(wit_df, threshold, pkey=pkey)
    ofn = "WIT_ANAE_event_times"+str(wit_data['chunk'].iat[0])+".csv"
    wit_im_time.to_csv(ofn)
    
    wit_im_stats = event_stats(wit_df, wit_im_time, wit_area, pkey=pkey)
    ofn = "WIT_ANAE_event_stats"+str(wit_data['chunk'].iat[0])+".csv"
    wit_im_stats.to_csv(ofn)
    
    wit_im = pd.DataFrame()
    if not wit_im_time.empty:
        wit_im =pd.merge(wit_im_time, wit_im_stats, on=[pkey, 'start_time'], how='inner')
        ofn = "WIT_ANAE_inundation_metrics"+str(wit_data['chunk'].iat[0])+".csv"
        wit_im.to_csv(ofn)
    return wit_im

def interpolate_daily(wit_data, pkey='feature_id'):
    return wit_data.groupby(pkey).apply(interpolate_wit, pkey=pkey).droplevel(0)

def interpolate_wit(grouped_wit, pkey='feature_id'):
    daily_wit = pd.DataFrame({pkey: grouped_wit[pkey].unique()[0], 'date': pd.date_range(grouped_wit['date'].astype('datetime64[D]').min(), grouped_wit['date'].astype('datetime64[D]').max(), freq='D'),
                          'bs': np.nan, 'npv': np.nan, 'pv': np.nan, 'wet': np.nan, 'water': np.nan})
    _, nidx, oidx = np.intersect1d(daily_wit['date'].to_numpy().astype('datetime64[D]'), grouped_wit['date'].to_numpy().astype('datetime64[D]'),
                  return_indices=True)
    daily_wit.loc[nidx, ["bs","npv","pv","wet","water"]]  = grouped_wit[["bs","npv","pv","wet","water"]].iloc[oidx].to_numpy()
    #daily_wit = daily_wit.interpolate(axis=0)
    #recent versions of pandas throw an error due to the date column; the workaround is to interpolate only the data columns
    #daily_wit holds a single polygon here, so no groupby is needed
    daily_wit[["bs","npv","pv","wet","water"]] = daily_wit[["bs","npv","pv","wet","water"]].interpolate(axis=0)
    if 'chunk' in grouped_wit.columns:
        daily_wit['chunk'] = grouped_wit['chunk'].unique()[0]
    return daily_wit

def time_since_last_inundation(wit_data, wit_im, pkey='feature_id'):
    """
        create a pivot table to gather the time since last inundation using the event metrics
        timesincelast = number of days from last event end-date to final date in WIT record
    """
    maxdate = pd.pivot_table(wit_data, index=pkey, values=['date'], aggfunc=np.max)
    maxdate['final-date'] = maxdate['date'].astype('datetime64')
    lastevent = pd.pivot_table(wit_im, index=pkey, values=['end_time'], aggfunc=np.max)
    time_since_last = pd.merge(maxdate, lastevent, on=[pkey], how='inner')
    time_since_last.insert(2, 'timesincelast', 
              (time_since_last['final-date'] - time_since_last['end_time']).astype('timedelta64[D]'))
    ofn = "WIT_ANAE_time_since_last_inundation"+str(wit_data['chunk'].iat[0])+".csv"
    time_since_last.to_csv(ofn)
    return time_since_last

def all_time_median(wit_data, members=[['water', 'wet']], pkey='feature_id'):
    """
        Compute the all time median
        input:
            wit_data: dataframe of WIT
            members: the elements which the metrics are computed against, can be a column from wit_data, e.g. 'pv'
                         or the sum of wit columns, e.g. ['water', 'wet']
        output:
            dataframe of median indexed by pkey
    """
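    wit_df = wit_data.copy(deep=True)
    #append a summed column for each multi-column member, e.g. 'water+wet'
    #(inserting at columns.size always appends; an extra running offset would overshoot with more than one member)
    for m in members:
        if isinstance(m, list):
            wit_df.insert(wit_df.columns.size, '+'.join(m), wit_df[m].sum(axis=1))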
    wit_median = wit_df.groupby(pkey).median().round(decimals = 4)
    ofn = "WIT_ANAE_event_threshold"+str(wit_data['chunk'].iat[0])+".csv"
    wit_median.to_csv(ofn)
    return wit_median


Overwriting workers.py


# Load packages
Import the Python packages used for the analysis, then import the `workers` module from the current working directory.


In [9]:
import io
#import os  # is loaded above to set the working directory
import sys  # needed for sys.path.append below
import numpy as np
import pandas as pd
import fiona
import glob
from shapely import geometry
from tqdm.notebook import tqdm
import time
import multiprocessing
from itertools import repeat

sys.path.append(os.getcwd())   #appends cwd to path allowing python to find the workers.py in the notebook directory
import workers


# Function definitions
* **Note:**
   * Many of the functions defined below duplicate code in the workers module; they are kept here for debugging or for running on a single core
   * The main code below uses multiprocessing to call the worker functions in workers.py, which is located in the current working directory


In [None]:
"""
    Jupyter can switch this code off to get it out of your way when running multiprocessor
    esc y to activate code so it can run
    esc r to switch to raw and disable the cell
"""


def shape_list(key, values, shapefile):
    """
        Get a generator of shapes from the given shapefile
            key: the key to match in 'properties' in the shape file
            values: a list of property values
            shapefile: the name of your shape file
            e.g. key='ORIGID', values=[1, 2, 3, 4, 5], 
            shapefile='/g/data/r78/DEA_Wetlands/shapefiles/MDB_ANAE_Aug2017_modified_2019_SB_3577.shp'
    """
    count = len(values)
    with fiona.open(shapefile) as allshapes:
        for shape in allshapes:
            shape_id = shape['properties'].get(key)
            if shape_id is None:
                continue
            if shape_id in values:
                yield(shape_id, shape)
                count -= 1
            if count <= 0:
                break
    
def get_areas(features, pkey='feature_id'):
    """
        Calculate the area of a list/generator of shapes
        input:
            features: a list of (key, shape) pairs, e.g. as yielded by shape_list
        output:
            a dataframe of areas indexed by the key
            (area is divided by 1e4, i.e. hectares if the shapefile units are metres)
    """
    #collect the rows first and build the dataframe once (DataFrame.append was removed in pandas 2.0)
    rows = [[f[0], geometry.shape(f[1]['geometry']).area/1e4] for f in features]
    return pd.DataFrame(rows, columns=[pkey, 'area']).set_index(pkey)

def dump_wit_data(key, feature_list, output):
    """
        dump wit data from the database into a file
        input:
            key: Name to id the polygon
            feature_list: a list or generator of features
        output:
            a csv file to save all the wit data
    """
    for f_id, f in feature_list:
        _, wit_data = query_wit_data(f)
        csv_buf = io.StringIO()
        wit_df = pd.DataFrame(data=wit_data, columns=['date', 'bs', 'npv', 'pv', 'wet', 'water'])
        wit_df.insert(0, key, f_id)
        wit_df.to_csv(csv_buf, index=False, header=False)
        csv_buf.seek(0)
        with open(output, 'a') as f:
            f.write(csv_buf.read())
    with open(output, 'a') as f:
        f.write(','.join(list(wit_df.columns))) 

In [5]:
"""
    esc y to activate code so it can run
    esc r to switch to raw and disable the cell
"""
def annual_metrics(wit_data, members=['pv', 'wet', 'water', 'bs', 'npv', ['npv', 'pv', 'wet'],
                                          ['pv', 'wet'], ['water', 'wet']], threshold=[25, 75], pkey='feature_id'):
    """
        Compute the annual max, min, mean, median, count with given wit data, members and threshold
        input:
            wit_data: dataframe of WIT
            members: the elements which the metrics are computed against, can be a column from wit_data, e.g. 'pv'
                         or the sum of wit columns, e.g. ['water', 'wet']
            threshold: a list of thresholds such that (elements >= threshold[i]) is True, 
                        where i = 0, 1...len(threshold)-1
        output:
            dataframe of metrics
    """
    wit_df = wit_data.copy(deep=True)
    #append a summed column for each multi-column member, e.g. 'water+wet'
    for m in members:
        if isinstance(m, list):
            wit_df.insert(wit_df.columns.size, '+'.join(m), wit_df[m].sum(axis=1))
    years = pd.DatetimeIndex(wit_df['date']).year.unique()
    shape_id_list = wit_df[pkey].unique()
    #shane changed 4 to 5 to accomodate median added below 
    wit_metrics = [pd.DataFrame()] * 5
    for y in years:
        wit_yearly = wit_df[pd.DatetimeIndex(wit_df['date']).year==y].drop(columns=['date']).groupby(pkey).max()
        wit_yearly.insert(0, 'year', y)
        wit_yearly = wit_yearly.rename(columns={n: n+'_max' for n in wit_yearly.columns[1:]})
        wit_metrics[0] = wit_metrics[0].append(wit_yearly, sort=False)
    for y in years:
        wit_yearly = wit_df[pd.DatetimeIndex(wit_df['date']).year==y].drop(columns=['date']).groupby(pkey).min()
        wit_yearly.insert(0, 'year', y)
        wit_yearly = wit_yearly.rename(columns={n: n+'_min' for n in wit_yearly.columns[1:]})
        wit_metrics[1] = wit_metrics[1].append(wit_yearly, sort=False)
    for y in years:
        wit_yearly = wit_df[pd.DatetimeIndex(wit_df['date']).year==y].drop(columns=['date']).groupby(pkey).mean()
        wit_yearly.insert(0, 'year', y)
        wit_yearly = wit_yearly.rename(columns={n: n+'_mean' for n in wit_yearly.columns[1:]})
        wit_metrics[2] = wit_metrics[2].append(wit_yearly, sort=False)

    #*********************** START ADDED BY SHANE ***********************
    #adding median
    for y in years:
        wit_yearly = wit_df[pd.DatetimeIndex(wit_df['date']).year==y].drop(columns=['date']).groupby(pkey).median()
        wit_yearly.insert(0, 'year', y)
        wit_yearly = wit_yearly.rename(columns={n: n+'_median' for n in wit_yearly.columns[1:]})
        wit_metrics[3] = wit_metrics[3].append(wit_yearly, sort=False)
    #*********************** END ADDED BY SHANE ***********************      
    for y in years:
        wit_yearly = wit_df[pd.DatetimeIndex(wit_df['date']).year==y][[pkey, 'bs']].groupby(pkey).count()
        wit_yearly.insert(0, 'year', y)
        wit_yearly = wit_yearly.rename(columns={n: 'count' for n in wit_yearly.columns[1:]})
        #shane changed index from 3 to 4 to accomodate median added above 
        wit_metrics[4] = wit_metrics[4].append(wit_yearly, sort=False)
    #for t in threshold:
    #    wit_df_ts = wit_df.copy(deep=True)
    #    wit_metrics += [pd.DataFrame()]
    #    wit_df_ts.loc[:, wit_df_ts.columns[2:]] = wit_df_ts.loc[:, wit_df_ts.columns[2:]].mask((wit_df_ts[wit_df_ts.columns[2:]] < t/100), np.nan)
    #    for y in years:
    #        wit_yearly = wit_df_ts[pd.DatetimeIndex(wit_df_ts['date']).year==y].drop(columns=['date']).groupby(pkey).count()
    #        wit_yearly.insert(0, 'year', y)
    #        wit_yearly = wit_yearly.rename(columns={n: n+'_count'+str(t) for n in wit_yearly.columns[1:]})
    #        wit_metrics[-1] = wit_metrics[-1].append(wit_yearly, sort=False)
    wit_yearly_metrics = wit_metrics[0]
    wit_yearly_metrics.sort_values(by=[pkey, 'year'],inplace=True)
    for i in range(len(wit_metrics)-1):
        wit_yearly_metrics = pd.merge(wit_yearly_metrics, wit_metrics[i+1], on=[pkey, 'year'], how='inner')
    return wit_yearly_metrics
    

In [6]:
"""
    esc y to activate code so it can run
    esc r to switch to raw and disable the cell
"""

def get_event_time(wit_ww, threshold, pkey='feature_id'):
    """

        Compute inundation event time by given threshold
        input:
            wit_ww: wetness (water+wet) computed from wit data for a single polygon
            threshold: a scalar, or a dataframe of per-polygon thresholds indexed by pkey,
                       such that (water+wet > threshold) = inundation
        output:
            dataframe of inundation event time
    """
    if isinstance(threshold, pd.DataFrame):
        gid = wit_ww.index.unique()[0]
        poly_threshold = threshold.loc[gid].to_numpy()[0]
    else:
        poly_threshold = threshold
    i_start = wit_ww[wit_ww['water+wet'] >= poly_threshold]['date'].min()
    if pd.isnull(i_start):
        re = pd.DataFrame([[np.nan] * 5], columns=['threshold', 'start_time', 'end_time', 'duration', 'gap'], index=wit_ww.index.unique())
        re.index.name = pkey
        return re
    #SSB - moved equal to needed for when threshold = 0
    #re_idx = np.searchsorted(wit_ww[(wit_ww['water+wet'] < poly_threshold)]['date'].values, 
    #                         wit_ww[(wit_ww['water+wet'] >= poly_threshold)]['date'].values)
    re_idx = np.searchsorted(wit_ww[(wit_ww['water+wet'] <= poly_threshold)]['date'].values,
                         wit_ww[(wit_ww['water+wet'] > poly_threshold)]['date'].values)

    re_idx, count = np.unique(re_idx, return_counts=True)
    start_idx = np.zeros(len(count)+1, dtype='int')
    start_idx[1:] = np.cumsum(count)

    #SSB removed "equals" sorts correctly when threshold is zero
    #re_start = wit_ww[(wit_ww['water+wet'] >= poly_threshold)].iloc[start_idx[:-1]][['date']].rename(columns={'date': 'start_time'})
    #re_end = wit_ww[(wit_ww['water+wet'] >= poly_threshold)].iloc[start_idx[1:] - 1][['date']].rename(columns={'date': 'end_time'})
    re_start = wit_ww[(wit_ww['water+wet'] > poly_threshold)].iloc[start_idx[:-1]][['date']].rename(columns={'date': 'start_time'})
    re_end = wit_ww[(wit_ww['water+wet'] > poly_threshold)].iloc[start_idx[1:] - 1][['date']].rename(columns={'date': 'end_time'})

    re = pd.concat([re_start, re_end], axis=1)
    if not re.empty:
        re.insert(2, 'duration', 
                  (re['end_time'] - re['start_time'] + np.timedelta64(1, 'D')).astype('timedelta64[D]').astype('timedelta64[D]'))
        re.insert(3, 'gap', np.concatenate([[np.timedelta64(0, 'D')],
                                            (re['start_time'][1:].values - re['end_time'][:-1].values - np.timedelta64(1, 'D')).astype('timedelta64[D]')]))
        re.insert(0, 'threshold', poly_threshold)
        re.insert(0, pkey, wit_ww.index.unique()[0])
        re = re.set_index(pkey)
    return re
    
def get_im_stats(grouped_wit, im_time, wit_area):
    """
        Get inundation stats given wit data and events
        input:
            grouped_wit: wit data
            im_time: inundation events in time
        output:
            the stats of inundation events
    """
    gid = grouped_wit.index.unique()[0]
    if gid not in im_time.indices.keys():
        return pd.DataFrame([[np.nan]*5], columns=['start_time', 'max_water+wet', 'mean_water+wet', 'max_wet_area', 'mean_wet_area'],
                           index=[gid])
    re_left = np.searchsorted(grouped_wit['date'].values.astype('datetime64'),
                         im_time.get_group(gid)['start_time'].values, side='left')
    re_right = np.searchsorted(grouped_wit['date'].values.astype('datetime64'),
                         im_time.get_group(gid)['end_time'].values, side='right')
    frames = []
    for a, b in zip(re_left, re_right):
        tmp = pd.concat([pd.to_datetime(grouped_wit.iloc[a:a+1]['date']).rename('start_time'),
                         pd.Series(grouped_wit.iloc[a:b]['water+wet'].max(),index=[gid], name='max_water+wet'),
                         pd.Series(grouped_wit.iloc[a:b]['water+wet'].mean(),index=[gid], name='mean_water+wet')],
                        axis=1)
        if isinstance(wit_area, pd.DataFrame):
            tmp.insert(3, 'max_wet_area', tmp['max_water+wet'].values * wit_area[wit_area.index==gid].values)
            tmp.insert(4, 'mean_wet_area', tmp['mean_water+wet'].values * wit_area[wit_area.index==gid].values)
        
        frames.append(tmp)
    # DataFrame.append was removed in pandas 2.0, so collect the rows and concatenate once
    re = pd.concat(frames, sort=False) if frames else pd.DataFrame()
    # name the index after the pkey
    re.index.name = grouped_wit.index.name
    #re.to_csv('ssb.csv')
    return re

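The event logic in `get_event_time` is compact and can be hard to follow from the `searchsorted` calls alone. The following is a minimal sketch of the same idea on a toy daily series, not the notebook's implementation: `find_events` is a hypothetical helper, and the dates and values are made up. Consecutive days with `water+wet` above the threshold form one event, `duration` counts the start and end days inclusively, and `gap` is the number of dry days since the previous event.

```python
import pandas as pd

def find_events(dates, values, threshold):
    """Hypothetical helper: group consecutive above-threshold days into events."""
    s = pd.Series(values, index=pd.DatetimeIndex(dates))
    wet = s > threshold
    # a new run starts wherever the wet/dry state changes
    run_id = wet.astype(int).diff().ne(0).cumsum()
    rows = []
    for _, run in s[wet].groupby(run_id[wet]):
        rows.append({'start_time': run.index[0], 'end_time': run.index[-1]})
    events = pd.DataFrame(rows, columns=['start_time', 'end_time'])
    if events.empty:
        return events
    # inclusive day count, mirroring the "+ 1 day" in get_event_time
    events['duration'] = (events['end_time'] - events['start_time']).dt.days + 1
    # dry days between the previous event's end and this event's start
    gap = (events['start_time'] - events['end_time'].shift()).dt.days - 1
    events['gap'] = gap.fillna(0).astype(int)
    return events

demo_dates = pd.date_range('2020-01-01', periods=10, freq='D')
demo_values = [0, 0.2, 0.3, 0, 0, 0, 0.5, 0.6, 0.7, 0]
demo_events = find_events(demo_dates, demo_values, threshold=0.1)
print(demo_events)
```

With daily (interpolated) data the inclusive day count is meaningful; with the raw, irregular observation dates the same calculation would only bound the true duration.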
In [7]:
"""
    esc y to activate code so it can run
    esc r to switch to raw and disable the cell
"""
def event_time(wit_df, threshold=0.01, pkey='feature_id'):
    """
        Compute the inundation events with given wit data and threshold
        input:
            wit_df: wetness computed from wit data
            threshold: a value such that (water+wet > threshold) = inundation,
        output:
            dataframe of events
    """
    return wit_df.groupby(pkey).apply(get_event_time, threshold=threshold, pkey=pkey).dropna().droplevel(0)

def event_stats(wit_df, wit_im, wit_area, pkey='feature_id'):
    """
        Compute inundation event stats with given wit wetness, events defined by (start_time, end_time) 
        and polygon areas
        input:
            wit_df: wetness computed from wit data
            wit_im: inundation event
            wit_area: polygon areas indexed by the key
        output:
            dataframe of event stats
    """
    grouped_im = wit_im[['start_time', 'end_time']].groupby(pkey)
    #was droplevel(0) but this left first column without a header (1) deletes that column instead
    return  wit_df.groupby(pkey).apply(get_im_stats, im_time=grouped_im, wit_area=wit_area).droplevel(1)

def inundation_metrics(wit_data, threshold=0.01, shapefile = 'shapefile', skey='UID', pkey='feature_id'):
    """
        Compute inundation metrics with given wit data, polygon areas and threshold
        input:
            wit_data: a dataframe of wit_data
            threshold: a value such that (water+wet > threshold) = inundation
            shapefile: path to the polygon shapefile; areas are only computed if this file exists
            skey: key field in the shapefile used to match polygons
            pkey: column in wit_data that identifies each polygon
        output:
            dataframe of inundation metrics
    """
    wit_area=[]
    if os.path.isfile(shapefile):
        features = shape_list(skey, wit_data[pkey].unique(), shapefile)
        wit_area = get_areas(features, pkey=pkey)
        
    wit_df = wit_data.copy(deep=True)
    wit_df.insert(2, 'water+wet', wit_df[['water', 'wet']].sum(axis=1).round(4))
    #wit_df = wit_df.drop(columns=wit_df.columns[3:])
    wit_df = wit_df[[pkey,'date','water+wet']]
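    # pd.to_datetime works across pandas versions; a unit-less astype('datetime64') is rejected by pandas 2.x
    wit_df['date'] = pd.to_datetime(wit_df['date'])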

    ofn = "ANAE_wit_df"+str(wit_data['chunk'].iat[0])+".csv"
    wit_df.to_csv(ofn)
    wit_df = wit_df.set_index(pkey)
    wit_im_time = event_time(wit_df, threshold, pkey)
    ofn = "ANAE_event_times"+str(wit_data['chunk'].iat[0])+".csv"
    wit_im_time.to_csv(ofn)
    wit_im_stats = event_stats(wit_df, wit_im_time, wit_area, pkey)
    if not wit_im_time.empty:
        ofn = "ANAE_event_stats"+str(wit_data['chunk'].iat[0])+".csv"
        wit_im_stats.to_csv(ofn)

    wit_im = pd.DataFrame()
    if not wit_im_time.empty:
        wit_im =pd.merge(wit_im_time, wit_im_stats, on=[pkey, 'start_time'], how='inner')
        ofn = "ANAE_inundation_metrics"+str(wit_data['chunk'].iat[0])+".csv"
        wit_im.to_csv(ofn)

    return wit_im

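The per-event statistics produced by `get_im_stats` come down to slicing the daily series between each event's start and end and summarising the slice. A small sketch of that windowing follows, with made-up values and a hypothetical polygon area `area_ha`; it assumes `water+wet` is stored as a fraction of the polygon, which the notebook does not state explicitly.

```python
import numpy as np
import pandas as pd

dates = pd.date_range('2020-01-01', periods=10, freq='D')
water_wet = np.array([0, 0.2, 0.3, 0, 0, 0, 0.5, 0.6, 0.7, 0])
events = pd.DataFrame({
    'start_time': pd.to_datetime(['2020-01-02', '2020-01-07']),
    'end_time': pd.to_datetime(['2020-01-03', '2020-01-09']),
})
area_ha = 12.0  # hypothetical polygon area

# side='left' finds the first row of the event, side='right' one past its last row
left = np.searchsorted(dates.values, events['start_time'].values, side='left')
right = np.searchsorted(dates.values, events['end_time'].values, side='right')

rows = []
for a, b in zip(left, right):
    window = water_wet[a:b]
    rows.append({'start_time': dates[a],
                 'max_water+wet': window.max(),
                 'mean_water+wet': window.mean()})
stats = pd.DataFrame(rows)
# convert the fractions to areas
stats['max_wet_area'] = stats['max_water+wet'] * area_ha
stats['mean_wet_area'] = stats['mean_water+wet'] * area_ha
print(stats)
```

Keeping `start_time` in the result is what lets `inundation_metrics` join the statistics back onto the event table with `pd.merge(..., on=[pkey, 'start_time'])`.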
In [8]:
"""
    esc y to activate code so it can run
    esc r to switch to raw and disable the cell
"""

def interpolate_wit(grouped_wit, pkey='feature_id'):
    daily_wit = pd.DataFrame({pkey: grouped_wit[pkey].unique()[0], 'date': pd.date_range(grouped_wit['date'].astype('datetime64[D]').min(), grouped_wit['date'].astype('datetime64[D]').max(), freq='D'),
                          'bs': np.nan, 'npv': np.nan, 'pv': np.nan, 'wet': np.nan, 'water': np.nan})
    _, nidx, oidx = np.intersect1d(daily_wit['date'].to_numpy().astype('datetime64[D]'), grouped_wit['date'].to_numpy().astype('datetime64[D]'),
                  return_indices=True)
    daily_wit.loc[nidx, ["bs","npv","pv","wet","water"]]  = grouped_wit[["bs","npv","pv","wet","water"]].iloc[oidx].to_numpy()
    #daily_wit = daily_wit.interpolate(axis=0)
    #newer pandas cannot interpolate the date column, so only the data columns are interpolated
    #(daily_wit holds a single polygon, so no groupby is needed)
    daily_wit[["bs","npv","pv","wet","water"]] = daily_wit[["bs","npv","pv","wet","water"]].interpolate(axis=0)
    return daily_wit

def time_since_last_inundation(wit_data, wit_im, pkey='feature_id'):
    """
        create a pivot table to gather the time since last inundation using the event metrics
        timesincelast = number of days from last event end-date to final date in WIT record
    """
    maxdate = pd.pivot_table(wit_data, index=pkey, values=['date'], aggfunc='max')
    # pd.to_datetime works across pandas versions; a unit-less astype('datetime64') is rejected by pandas 2.x
    maxdate['final-date'] = pd.to_datetime(maxdate['date'])
    lastevent = pd.pivot_table(wit_im, index=pkey, values=['end_time'], aggfunc='max')
    time_since_last = pd.merge(maxdate, lastevent, on=[pkey], how='inner')
    # number of whole days between the last event and the end of the record
    time_since_last.insert(2, 'timesincelast', 
              (time_since_last['final-date'] - time_since_last['end_time']).dt.days)
    return time_since_last

def all_time_median(wit_data, members=[['water', 'wet']], pkey='feature_id'):
    """
        Compute the all time median
        input:
            wit_data: dataframe of WIT
            members: the elements which the metrics are computed against, can be a column from wit_data, e.g. 'pv'
                         or the sum of wit columns, e.g. ['water', 'wet']
        output:
            dataframe of median indexed by pkey
    """
    wit_df = wit_data.copy(deep=True)
    for m in members:
        if isinstance(m, list):
            # append the summed column, e.g. 'water+wet'
            wit_df['+'.join(m)] = wit_df[m].sum(axis=1)
    # numeric_only skips non-numeric columns such as 'date'
    wit_median = wit_df.groupby(pkey).median(numeric_only=True).round(decimals = 4)
    ofn = "ANAE_event_threshold"+str(wit_data['chunk'].iat[0])+".csv"
    wit_median.to_csv(ofn)
    return wit_median

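The daily interpolation in `interpolate_wit` can be illustrated on a few observations. This is a simplified sketch using `reindex` rather than the `np.intersect1d` approach in the notebook, and the observation values are made up. Because the reindexed series is evenly spaced at one day, pandas' default linear interpolation gives a straight line in time between observations.

```python
import pandas as pd

# three irregularly spaced observations for one polygon (made-up values)
obs = pd.DataFrame({
    'date': pd.to_datetime(['2020-01-01', '2020-01-05', '2020-01-07']),
    'water': [0.0, 0.4, 0.2],
    'wet': [0.1, 0.1, 0.3],
})

# build a complete daily index, then fill the gaps linearly
full_range = pd.date_range(obs['date'].min(), obs['date'].max(), freq='D')
daily = obs.set_index('date').reindex(full_range).interpolate(method='linear')
print(daily)
```

Only the span between the first and last observation is filled, so no values are extrapolated beyond the record.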
In [9]:
def merge_batches (path, output_filenames):
    '''
        merges the outputs from each batch into a single file of results for all ANAE polygons
    '''
    for fname in output_filenames:
        ofns = glob.glob(os.path.join(path, fname+'*.csv'))
        dfs = []
        for ofn in ofns:
            try:
                df = pd.read_csv(ofn)
                dfs.append(df)
            except Exception as ex:
                # report the cause instead of silently swallowing every exception
                print("Error reading file: ", ofn, ex)
        if dfs:
            out_data = pd.concat(dfs)
            out_data.reset_index(drop=True,inplace=True)
            ofn = "RESULT_"+fname+".csv"
            try:
                out_data.to_csv(os.path.join(path, ofn))
            except Exception as ex:
                print("Error writing merged file: ", os.path.join(path, ofn), ex)


def delete_old_batch_outputs (path, output_filenames):
    '''
        deletes the individual batch results which are no longer required after they have been merged
    '''
    for fname in output_filenames:
        ofns = glob.glob(os.path.join(path, fname+'*.csv'))
        for ofn in ofns:
            try:
                os.remove(ofn)
            except OSError as ex:
                print("Error while deleting file : ", ofn, ex)
                

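The merge step can be tried in isolation on throwaway files. The sketch below is an assumption-laden illustration rather than a replacement for `merge_batches`: `merge_prefix` and the `demo_metrics` prefix are hypothetical, and it writes into a temporary directory so no real outputs are touched.

```python
import glob
import os
import tempfile

import pandas as pd

def merge_prefix(path, prefix):
    """Hypothetical helper: concatenate every '<prefix>*.csv' in path into one file."""
    files = sorted(glob.glob(os.path.join(path, prefix + '*.csv')))
    frames = [pd.read_csv(f) for f in files]
    if not frames:
        return None
    merged = pd.concat(frames, ignore_index=True)
    # the RESULT_ prefix keeps the merged file out of later '<prefix>*.csv' globs
    merged.to_csv(os.path.join(path, 'RESULT_' + prefix + '.csv'), index=False)
    return merged

with tempfile.TemporaryDirectory() as tmp:
    # write two small per-batch outputs, named by their chunk offset
    for chunk in (0, 200):
        batch = pd.DataFrame({'feature_id': ['p' + str(chunk)], 'chunk': [chunk]})
        batch.to_csv(os.path.join(tmp, 'demo_metrics' + str(chunk) + '.csv'), index=False)
    merged = merge_prefix(tmp, 'demo_metrics')
    result_exists = os.path.isfile(os.path.join(tmp, 'RESULT_demo_metrics.csv'))
print(merged)
```

Writing with `index=False` is a design choice of this sketch; it avoids carrying an unnamed index column into the merged file.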

# multiprocessing code  -  this is the loop that executes to compute the metrics for all CSV in the specified path

In [11]:
#note: some debug code is commented out in various places below

multiprocessing.set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

output_filenames = ['WIT_ANAE_yearly_metrics','WIT_ANAE_event_threshold','WIT_ANAE_inundation_metrics','WIT_ANAE_time_since_last_inundation','WIT_ANAE_event_times','WIT_ANAE_event_stats']

start = time.process_time()


def nicetime(s):
    return time.strftime("%H hours %M minutes %S seconds", time.gmtime(s))

# use glob to get all the csv files 
# in the folder
cwdpath = os.getcwd()


delete_old_batch_outputs(cwdpath, output_filenames)

if interpolate:
    interpath = os.path.join(os.path.dirname(csvfiles),'interpolated')
    try:
        if not os.path.isdir(interpath): os.mkdir(interpath)
    except Exception as ex:
        print(ex)

csv_files = glob.glob(csvfiles)

if __name__ ==  '__main__': 
    shape_area = False
    debug = False
    pkey = 'feature_id'
    
    # cpu core count -1 to leave the os a crumb to work with
    CPU = multiprocessing.cpu_count() - 1
    chunksize = batchsize * CPU
    #l_index, r_index = 0, batchsize
    #batch = csv_files[l_index:r_index]
    print ("Found",len(csv_files),"WIT csv in",csvfiles)
    print ("Processing using",CPU,"cores, and batches of",batchsize,"files.")
    pool=multiprocessing.Pool(processes = CPU)
    try:
        for j in tqdm(range(0, len(csv_files), chunksize)):
            mpbatch = csv_files[j:j + chunksize]
            wd = []
            for i in range(0, len(mpbatch), batchsize):
                batch = mpbatch[i:i + batchsize]
                #print(i, batch)
                startbatch = time.process_time()
                dfs = []
                UIDs = []
                for f in batch:
                    df = pd.read_csv(f)
                    df = df[df['pc_missing'] < 0.1]
                    df = df.sort_values(by=['date'])
                    dfs.append(df)
                wit_data = pd.concat(dfs)
                wit_data['chunk'] = j+i
                wd.append(wit_data)
                #print ("Batch",j+i,"completed in", time.process_time() - startbatch, "seconds.")
            
            #print (j,"**** send to pool *********")

            if interpolate:
                wd = pool.map(workers.interpolate_daily, wd)
                #debug code below writes out the interpolated data for inspection/saving
                #ofn = os.path.join(interpath,"ANAE_interpolated"+str(j)+".csv")
                #print ("saving interpolated data: "+ ofn)
                #wd[0].to_csv(ofn)
            #Annual metrics  min, max, median for all WIT params per year.  Also 'water+wet'
            output = pool.map(workers.annual_metrics,wd)
            
            #Calc threshold for inundation events as the all-time median 'water+wet' 
            mmedian = pool.map(workers.all_time_median,wd)
            threshold_list=[]
            for e in mmedian:
                threshold_list.append(pd.DataFrame(e['water+wet']))
                
            #Inundation metrics - applying threshold to 'water+wet'   
            wit_im = pool.starmap(workers.inundation_metrics, zip(wd, threshold_list, repeat(shapefile), repeat(skey)))
            
            #Time since last inundation
            time_since_last = pool.starmap(workers.time_since_last_inundation, zip(wd, wit_im))
            

    except Exception as ex:
        print(ex)
    finally:
        pool.close()
        pool.join()       
 
    print (len(csv_files),"All batches completed in", nicetime(time.process_time() - start))
    
    merge_batches (cwdpath, output_filenames)
    print ("All batches merged in", nicetime(time.process_time() - start))
    delete_old_batch_outputs (cwdpath, output_filenames)
    

Found 37 WIT csv in s:/luke/*.csv
Processing using 31 cores, and batches of 200 files.


  0%|          | 0/1 [00:00<?, ?it/s]

37 All batches completed in 00 hours 00 minutes 00 seconds
All batches merged in 00 hours 00 minutes 01 seconds

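The two-level slicing in the loop above (chunks of `batchsize * CPU` files, each split into batches of `batchsize`) and the `starmap` call with `zip` and `repeat` can be sketched without the WIT data. Everything here is illustrative: `make_batches` and `count_files` are hypothetical names, and the sketch uses the thread-backed `multiprocessing.dummy.Pool`, which exposes the same `map`/`starmap` interface as `multiprocessing.Pool`, so that it runs inside a notebook without an external worker module.

```python
from itertools import repeat
from multiprocessing.dummy import Pool  # thread-backed stand-in with the Pool API

def make_batches(files, batchsize, n_workers):
    """Hypothetical helper: yield one list of (offset, batch) pairs per pool round."""
    chunksize = batchsize * n_workers
    for j in range(0, len(files), chunksize):
        chunk = files[j:j + chunksize]
        # j + i matches the 'chunk' label the notebook attaches to each batch
        yield [(j + i, chunk[i:i + batchsize]) for i in range(0, len(chunk), batchsize)]

def count_files(batch, label):
    """Toy worker: report how many files a batch holds."""
    offset, names = batch
    return (label, offset, len(names))

files = ['poly_' + str(n) + '.csv' for n in range(10)]
results = []
with Pool(processes=2) as pool:
    for group in make_batches(files, batchsize=3, n_workers=2):
        # repeat() supplies the same extra argument to every worker call
        results.extend(pool.starmap(count_files, zip(group, repeat('demo'))))
print(results)
```

Each round hands the pool at most one batch per worker, which is why the notebook sizes its chunks as `batchsize * CPU`.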

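The elapsed times printed above are close to zero. One possible explanation, offered as an assumption rather than a diagnosis of this run, is that `time.process_time()` counts only the CPU time of the calling process, so time spent waiting on pool workers is not included. The sketch below contrasts it with `time.perf_counter()`, which measures wall-clock time; the `time.sleep` call merely stands in for waiting on workers.

```python
import time

def nicetime(s):
    return time.strftime("%H hours %M minutes %S seconds", time.gmtime(s))

cpu_start = time.process_time()
wall_start = time.perf_counter()

time.sleep(0.2)  # stands in for the main process waiting on pool workers

cpu_elapsed = time.process_time() - cpu_start
wall_elapsed = time.perf_counter() - wall_start
print("CPU time: ", nicetime(cpu_elapsed), "(", round(cpu_elapsed, 3), "s )")
print("Wall time:", nicetime(wall_elapsed), "(", round(wall_elapsed, 3), "s )")
```

If wall-clock reporting is wanted, the start time and every later difference would need to use the same clock.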
### Debug Code: Single core processing using functions in this notebook (not calling the external workers)

* This is slower because the batches of CSV files are processed one after another on a single core, but it may be easier to debug.

In [None]:
"""
    esc y to activate code so it can run
    esc r to switch to raw and disable the cell

"""

debug = False            
if debug:

    output_filenames = ['WIT_ANAE_yearly_metrics','WIT_ANAE_event_threshold','WIT_ANAE_inundation_metrics','WIT_ANAE_time_since_last_inundation','WIT_ANAE_event_times','WIT_ANAE_event_stats']


    start = time.process_time()
    # use glob to get all the csv files 
    # in the folder
    cwdpath = os.getcwd()

    delete_old_batch_outputs (cwdpath, output_filenames)


    #csv_files = glob.glob(os.path.join(path, './test_data/*.csv'))
    csv_files = glob.glob('s:/ANAE_WIT_result3/*.csv')
    # loop over the list of csv files

    shapefile ='' #disables area calcs... faster

    shape_area = False
    debug = True
    pkey = 'feature_id'
    batchsize = 1000
    l_index, r_index = 0, batchsize
    batch = csv_files[l_index:r_index]
    while batch:
        startbatch = time.process_time()
        dfs = []
        for f in batch:
            df = pd.read_csv(f)
            df = df[df['pc_missing'] < 0.5]
            df = df.sort_values(by=['date'])
            dfs.append(df)
        wit_data = pd.concat(dfs)
        wit_data['chunk'] = l_index
        #wd.append(wit_data)
        ##if shape_area:
        #    features = shape_list('UID', UIDs, shapefile)    
        #    wit_area = get_areas(features, pkey='UID')

        if debug: wit_data.to_csv("wit_data"+str(l_index)+".csv")
        #wit_data = wit_data.groupby(pkey).apply(interpolate_wit, pkey=pkey).droplevel(0)

        wit_data.sort_values(by=['feature_id', 'date'],inplace=True)
        wit_data.reset_index(drop=True,inplace=True)
        if debug: wit_data.to_csv("wit_data_interp"+str(l_index)+".csv")


        # compute yearly metrics and save the results to a csv
        # set the output file to your own path
        wit_yearly_metrics = annual_metrics(wit_data, pkey=pkey)
        ofn = "ANAE_yearly_metrics"+str(l_index)+".csv"
        wit_yearly_metrics.to_csv(ofn)

        # compute all time median, serve as the threshold
        # save the threshold in a csv, set the output file to your own path
        wit_median = all_time_median(wit_data, pkey=pkey)
        threshold_list = (wit_median[['water+wet']])
        ofn = "ANAE_event_threshold"+str(l_index)+".csv"
        threshold_list.to_csv(ofn)

        # compute event metrics with threshold_list then save the results to a csv
        # set the output file to your own path
        wit_im =inundation_metrics(wit_data, threshold_list, shapefile, pkey=pkey)
        ofn = "ANAE_inundation_metrics"+str(l_index)+".csv"
        wit_im.to_csv(ofn)

        # compute time since the last inundation event then save the results to a csv
        time_since_last = time_since_last_inundation(wit_data, wit_im, pkey)
        ofn = "ANAE_time_since_last_inundation"+str(l_index)+".csv"
        time_since_last.to_csv(ofn)
 



        print ("Batch",l_index,"completed in", time.process_time() - startbatch, "seconds.")
        l_index, r_index = r_index, r_index + batchsize
        batch = csv_files[l_index:r_index]



    merge_batches (cwdpath, output_filenames)
    print ("All batches completed in", time.process_time() - start, "seconds.")


### END