In [1]:
import os
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

pd.options.display.float_format = '{:,.4f}'.format

## User Settings
Here is every tunable parameter...

In [4]:
# -------------------------
# ---Input/Output Files ---
# -------------------------
input_csv_path = r"/Users/carynv5/Documents/NV5/USACE/dredge/Dodge Island 2023 Data.csv"

output_csv_path = '/Users/carynv5/Documents/NV5/USACE/dredge/OUTLIER_DETECTION/load_number_fix/Dodge_Data_LoadFlags.csv'

# ----------------------
# ---Time Thresholds ---
# ----------------------
# this is the maximum allowable gap between loads on a contract. This threshold is used to 
# remove outliers when differencing time stamps between rows to avoid outliers in analysis.
contract_gap_threshold = pd.to_timedelta('5 days') # 5 days

# if the average time deltas of a load (or group) exceed this threshold, the entire load is 
# flagged as having potential data gaps. Additionally, this threshold is used to identify
# non-contiguous loads within a single contract
load_gap_threshold = pd.to_timedelta('0 days 00:00:20') # 20 seconds

# -------------------------
# ---Min/Max Thresholds ---
# -------------------------
# any loads (or groups) whose durations that fall outside these thresholds will be flagged as
# having invalid durations
duration_threshold_min = pd.to_timedelta('0 days 01:00:00') # 1 hour
duration_threshold_max = pd.to_timedelta('1 days 00:00:00') # 1 day

# any loads (or groups) whose total point count falls outside these thresholds will be flagged
# as having an invalid number of points
point_threshold_min = 375 # set to approximate duration_threshold_min (assuming 10 second ping rate)
point_threshold_max = 8750 # set to approximate  duration_threshold_max (assuming 10 second ping rate)

# we estimate ping rate for each group based on the group's duration and the total number of points.
# If the actual ping rate is different, we flag that group (or load). In seconds.
ping_rate_threshold = 1.0 # 1 second

## Step 1: Data Preprocessing
Ingest CSV, compute per-point statistics

In [5]:
orig_df = pd.read_csv(input_csv_path)

In [6]:
orig_df['msg_time'] = pd.to_datetime(orig_df['msg_time']).dt.tz_localize(None)
orig_df = orig_df.sort_values('msg_time')

In [7]:
# determine the length of time between each data transmission for each datapoint in each contract,
# and set the outliers to pd.NA (i.e. gaps within a contract)

# to be implemented in Spark
orig_df['time_delta'] = orig_df.groupby('contract_id')['msg_time'].diff()
orig_df.loc[orig_df['time_delta'] > contract_gap_threshold, 'time_delta'] = pd.NA

In [9]:
# assign load group numbers based on contract_id and contiguous load numbers
# This sorts the DataFrame orig_df based on the values in the msg_time column in ascending order. This ensures that the data is in chronological order, which is important for the subsequent grouping and calculations.
orig_df = orig_df.sort_values('msg_time')
# spark - sorted_df = orig_df.orderBy(col("msg_time").asc())


# This function is defined to operate on each group of the DataFrame.
# group['load_number'].shift() shifts the load_number column by one position, creating a new Series where each value is the previous row's load_number.
# group['load_number'].ne(group['load_number'].shift()) compares each load_number with the previous one, resulting in a boolean Series where True indicates a change in load_number from the previous row.
# .cumsum() computes the cumulative sum of this boolean Series. This essentially assigns a new group number every time there is a change in load_number.
# The result is stored in a new column group_number in the DataFrame group.
def calculate_group_number(group):
    group['group_number'] = group['load_number'].ne(group['load_number'].shift()).cumsum()
    return group

orig_df = orig_df.groupby('contract_id').apply(calculate_group_number)

# Quash the multindex back down to a single index with reset_index()
orig_df = orig_df.reset_index(drop=True)

## Step 2. Create Load Group Intervals

Create a dataframe of "load group intervals". These intervals represent contiguous points in time
that all share a single load number. These are useful because load numbers can be mis-reported,
so partitioning data by load number alone will lead to misleading statistics.

In [14]:
# create an interval dataframe from each load group which describes
# high-level statistics about each group, then compute an IndexInterval for each
# A dictionary interval_dict is initialized with empty lists for each of the required summary statistics.
interval_dict = {'contract': [], 'group': [], 'load': [], 'msg_time_min': [], 
                 'msg_time_max': [], 'time_delta_avg': [], 'total_points':[]}

# The orig_df DataFrame is grouped by contract_id and group_number.
# For each group, the following statistics are calculated and appended to the respective lists in the dictionary:
# contract: The contract_id of the group.
# group: The group_number of the group.
# load: The minimum load_number within the group.
# msg_time_min: The minimum msg_time within the group.
# msg_time_max: The maximum msg_time within the group.
# time_delta_avg: The average time_delta within the group.
# total_points: The total number of points (rows) in the group.
# Example:

# SELECT 
#     contract_id AS contract,
#     group_number AS `group`,
#     MIN(load_number) AS load,
#     MIN(msg_time) AS msg_time_min,
#     MAX(msg_time) AS msg_time_max,
#     AVG(time_delta) AS time_delta_avg,
#     COUNT(*) AS total_points
# FROM orig_table
# GROUP BY contract_id, group_number;

for idx, df in orig_df.groupby(['contract_id', 'group_number']):
    interval_dict['contract'].append(idx[0])
    interval_dict['group'].append(idx[1])
    interval_dict['load'].append(df['load_number'].min())
    interval_dict['msg_time_min'].append(df['msg_time'].min())
    interval_dict['msg_time_max'].append(df['msg_time'].max())
    interval_dict['time_delta_avg'].append(df['time_delta'].mean())
    interval_dict['total_points'].append(df.shape[0])

# Creating dataframe of the result - SQL would create a table or view
group_intervals = pd.DataFrame(interval_dict)
#group_intervals = pd.DataFrame(interval_dict).set_index(['contract', 'group'])

In [15]:
# What the output looks like
group_intervals.head()

Unnamed: 0,contract,group,load,msg_time_min,msg_time_max,time_delta_avg,total_points
0,445,1,1,2023-10-30 19:55:32,2023-10-31 00:11:32,0 days 00:00:05.245901639,2929
1,445,2,2,2023-10-31 00:11:34,2023-10-31 02:59:54,0 days 00:00:05.226073460,1933
2,445,3,3,2023-10-31 03:00:09,2023-10-31 07:00:39,0 days 00:00:05.243194192,2755
3,445,4,4,2023-10-31 07:00:49,2023-10-31 10:11:36,0 days 00:00:05.142280071,2228
4,445,5,5,2023-10-31 10:11:46,2023-10-31 15:14:46,0 days 00:00:05.339007924,3407


In [18]:
# compute useful columns:
# interval index

# SQL databases do not have a direct equivalent of pandas' IntervalIndex, but you can achieve a similar result by using the minimum and maximum timestamps to define intervals.
# Example:

# CREATE VIEW interval_view AS
# SELECT 
#     contract,
#     group,
#     msg_time_min,
#     msg_time_max,
#     CONCAT('[',
#            DATE_FORMAT(msg_time_min, '%Y-%m-%d %H:%i:%s'),
#            ', ',
#            DATE_FORMAT(msg_time_max, '%Y-%m-%d %H:%i:%s'),
#            ']') AS interval
# FROM group_intervals;
group_intervals['interval'] = pd.IntervalIndex.from_arrays(np.array(group_intervals['msg_time_min']), 
                                                      np.array(group_intervals['msg_time_max']), 
                                                      closed='both')



# This calculates the total time duration for each interval in the group_intervals DataFrame and storing the result in a new column called total_time.

# You can calculate the duration between the msg_time_min and msg_time_max columns in SQL.
# Example:

# SELECT 
#     contract,
#     group,
#     msg_time_min,
#     msg_time_max,
#     TIMESTAMPDIFF(SECOND, msg_time_min, msg_time_max) AS total_time_seconds
# FROM 
#     group_intervals;


# total length of an interval
group_intervals['total_time'] = group_intervals['interval'].apply(lambda x: (x.right - x.left))



#  This calculates the midpoint of each interval in the group_intervals DataFrame and storing the result in a new column called msg_time_mid

# To achieve the same result in SQL, you can calculate the midpoint between the msg_time_min and msg_time_max columns. 
# Example:

# SELECT 
#     contract,
#     group,
#     msg_time_min,
#     msg_time_max,
#     DATE_ADD(msg_time_min, INTERVAL TIMESTAMPDIFF(SECOND, msg_time_min, msg_time_max) / 2 SECOND) AS msg_time_mid
# FROM 
#     group_intervals;

# midpoint of an interval
group_intervals['msg_time_mid'] = group_intervals['interval'].apply(lambda x: x.mid)

## Flag Any Outliers in the Load Groups

In [19]:
# flag any groups where the duration is either too short or two long... 1 means the group 
# is positive for exceeding threshold, 0 means group is within valid range


# Example:

# Variable: duration_threshold_min defined in seconds (1hr)
# SET @duration_threshold_min = 3600; or pass a variable into the query.

# SELECT 
#     contract,
#     group,
#     total_time,
#     CASE 
#         WHEN total_time < @duration_threshold_min THEN 1
#         ELSE 0
#     END AS invalid_load_duration_short
# FROM 
#     group_intervals;

group_intervals['invalid_load_duration_short'] = np.where(
    (group_intervals['total_time'] < duration_threshold_min), 1, 0)

group_intervals['invalid_load_duration_long'] = np.where(
    (group_intervals['total_time'] > duration_threshold_max), 1, 0)

In [11]:
# You can use the COUNT function along with a GROUP BY clause to count the occurrences of each value in the invalid_load_duration_short column. 

# SELECT 
#     invalid_load_duration_short,
#     COUNT(*) AS count
# FROM 
#     group_intervals
# GROUP BY 
#     invalid_load_duration_short;

group_intervals['invalid_load_duration_short'].value_counts()

invalid_load_duration_short
0    1887
1      53
Name: count, dtype: int64

In [12]:
group_intervals['invalid_load_duration_long'].value_counts()

invalid_load_duration_long
0    1920
1      20
Name: count, dtype: int64

In [13]:
# flag any groups where the point count is either too low or too high... 1 means the group 
# is positive for falling outside the threshold, 0 means group is within valid range
group_intervals['invalid_load_point_count_low'] = np.where(
    (group_intervals['total_points'] < point_threshold_min), 1, 0)

group_intervals['invalid_load_point_count_high'] = np.where(
    (group_intervals['total_points'] > point_threshold_max), 1, 0)


# defined above ^
# duration_threshold_min = pd.to_timedelta('0 days 01:00:00') # 1 hour
# duration_threshold_max = pd.to_timedelta('1 days 00:00:00') # 1 day

In [14]:
group_intervals['invalid_load_point_count_low'].value_counts()

invalid_load_point_count_low
0    1900
1      40
Name: count, dtype: int64

In [15]:
group_intervals['invalid_load_point_count_high'].value_counts()

invalid_load_point_count_high
0    1906
1      34
Name: count, dtype: int64

In [16]:
# flag groups with long durations between points
group_intervals['invalid_load_reporting_gap'] = np.where(
    group_intervals['time_delta_avg'] > load_gap_threshold, 1, 0)

group_intervals['invalid_load_reporting_gap'].value_counts()

invalid_load_reporting_gap
0    1938
1       2
Name: count, dtype: int64

In [17]:
# check if the estimated ping rate and the actual ping rates are within a threshold of each other
def flag_invalid_ping_rates(row, ping_rate_threshold):
    est_ping_rate = row['total_time'].total_seconds() / row['total_points']
    time_delta_seconds = row['time_delta_avg'].total_seconds()
    if abs(est_ping_rate - time_delta_seconds) > ping_rate_threshold:
        return 1
    else:
        return 0

group_intervals['invalid_load_ping_rate'] = group_intervals.apply(
    flag_invalid_ping_rates, args=(ping_rate_threshold,), axis=1)

group_intervals['invalid_load_ping_rate'].value_counts()

invalid_load_ping_rate
0    1914
1      26
Name: count, dtype: int64

In [18]:
# look for load numbers that are duplicated on a contract
def flag_duplicated_load_numbers(df):
    mask = df['load'].duplicated(keep=False)
    df.loc[mask, 'invalid_duplicate_loads'] = 1
    return df

group_intervals['invalid_load_duplicate'] = 0
group_intervals = group_intervals.groupby('contract').apply(flag_duplicated_load_numbers)
group_intervals = group_intervals.reset_index(drop=True)

group_intervals['invalid_load_duplicate'].value_counts()

invalid_load_duplicate
0    1940
Name: count, dtype: int64

In [19]:
# check if any load groups within a contract overlap one another
def check_overlaps(df):
    for i in df.itertuples():
        for j in df.itertuples():
            if i.Index != j.Index and i.interval.overlaps(j.interval):
                df.loc[i.Index, 'invalid_load_overlap'] = 1
                print(f"Contract {i.contract}: Load {i.load} overlaps {j.load}")
    return df

group_intervals['invalid_load_overlap'] = 0
group_intervals = group_intervals.groupby('contract').apply(check_overlaps)
group_intervals = group_intervals.reset_index(drop=True)

group_intervals['invalid_load_overlap'].value_counts()

invalid_load_overlap
0    1940
Name: count, dtype: int64

In [20]:
# look for non-contiguous loads
group_intervals = group_intervals.sort_values(['contract', 'interval'])

group_intervals['time_diff'] = abs(group_intervals['msg_time_min'] - 
                                            group_intervals.groupby('contract')['msg_time_max'].shift(1))

group_intervals['invalid_load_noncontiguous'] = np.where(group_intervals['time_diff'] > load_gap_threshold, 1, 0)

group_intervals['invalid_load_noncontiguous'].value_counts()

invalid_load_noncontiguous
0    1913
1      27
Name: count, dtype: int64

In [21]:
# look for load numbers that are duplicated on a contract
def flag_duplicated_load_numbers(df):
    mask = df['load'].duplicated(keep=False)
    df.loc[mask, 'invalid_duplicate_loads'] = 1
    return df

group_intervals['invalid_load_duplicate'] = 0
group_intervals = group_intervals.groupby('contract').apply(flag_duplicated_load_numbers)
group_intervals = group_intervals.reset_index(drop=True)

group_intervals['invalid_load_duplicate'].value_counts()

invalid_load_duplicate
0    1940
Name: count, dtype: int64

In [22]:
# Flag loads where ANY of the outlier flags are positive (=1)
outlier_columns = ['invalid_load_duration_short', 'invalid_load_duration_long',
                   'invalid_load_point_count_low', 'invalid_load_point_count_high',
                   'invalid_load_reporting_gap', 'invalid_load_ping_rate', 
                   'invalid_load_overlap', 'invalid_load_noncontiguous', 'invalid_load_duplicate']

group_intervals['invalid_load'] = group_intervals[outlier_columns].any(axis=1).astype(int)

group_intervals['invalid_load'].value_counts()

invalid_load
0    1829
1     111
Name: count, dtype: int64

## Rejoin Intervals with Outlier Flags to Original Data (map intervals back to points)

In [23]:
# NOTE: this should be easier to do with a pd.cut() but their was a conflict in the time formats
#       between points and intervals. Need to troubleshoot that...

# create a key field shared between the two data frames
group_intervals['msg_time'] = group_intervals['msg_time_min']

# pd.merge_asof requires the data to be sorted
orig_df = orig_df.sort_values('msg_time')
group_intervals = group_intervals.sort_values('msg_time')

# merge only the needed fields into the original dataframe. 
# NOTE: pd.merge_asof() does a "closest time match" and be default only matches "backwards", so 
#       using the minimum time of each load should work out just fine.
group_intervals_out = group_intervals[['msg_time', 
                                       'invalid_load_duration_short', 'invalid_load_duration_long',
                                       'invalid_load_point_count_low', 'invalid_load_point_count_high',
                                       'invalid_load_reporting_gap', 'invalid_load_ping_rate', 
                                       'invalid_load_overlap', 'invalid_load_noncontiguous', 
                                       'invalid_load_duplicate', 'invalid_load']]
 
orig_df_out = pd.merge_asof(orig_df, group_intervals_out, on='msg_time')

In [None]:
# write the df to CSV with the repaired load numbers, drop the columns we created
orig_df_out.drop(columns=['group_number', 'time_delta']).to_csv(output_csv_path)