In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

# Set random seed 
RSEED = 100

In [2]:
# Load Training Data

train_data = pd.read_csv("../00_Data_Sets/1_mio_dataset_2010_2014.csv");

In [3]:
# Load Input Data
incoming_data_passenger_count_outlier = pd.read_csv("../01_Synthetic Data/02_Passenger Count/passenger count_outlier_01.csv");
# incoming_data_location_outlier = pd.read_csv("../01_Synthetic Data/01_Location/location_outlier_01.csv");
# incoming_data_distance_outlier = pd.read_csv("../01_Synthetic Data/03_Distance/distance_outlier_01.csv");
# incoming_data_distance_drift = pd.read_csv("../01_Synthetic Data/03_Distance/distance_drift_total_01.csv");
# incoming_data_location_drift = pd.read_csv("../01_Synthetic Data/01_Location/location_drift_total.csv");
incoming_data_passenger_drift = pd.read_csv("../01_Synthetic Data/02_Passenger Count/passenger count_drift_total_01.csv");

In [4]:
def monitor_drift_main_metrics(training_data, incoming_batch, start_index, end_index, batch_info, column, thresholds):
    
    batch_thresholds = thresholds[batch_info['name'].lower() + '_batch_drift']
    threshold = batch_thresholds['one_dim_drift_metric']
            
    if abs(incoming_batch.iloc[start_index:end_index,column].mean()) > abs((training_data.iloc[:,column].mean() * (1 + threshold))):
        print('[DRIFT][{} Window]: Upwards Data Drift detected!. ({} AVG: {})\t(Index: {})'.format(batch_info['name'], training_data.columns[column], incoming_batch.iloc[start_index:end_index,column].mean(), end_index))
        
    if abs(incoming_batch.iloc[start_index:end_index,column].quantile(0.25)) > abs((training_data.iloc[:,column].quantile(0.25) * (1 + threshold))):
        print('[DRIFT][{} Window]: Upwards Data Drift detected! ({} 25%-Quantile: {})\t(Index: {})'.format(batch_info['name'], training_data.columns[column], incoming_batch.iloc[start_index:end_index,column].mean(), end_index))
        
    if abs(incoming_batch.iloc[start_index:end_index,column].quantile(0.75)) > abs((training_data.iloc[:,column].quantile(0.75) * (1 + threshold))):
        print('[DRIFT][{} Window]: Upwards Data Drift detected! ({} 75%-Quantile: {})\t(Index: {})'.format(batch_info['name'], training_data.columns[column], incoming_batch.iloc[start_index:end_index,column].mean(), end_index))
        
    if abs(incoming_batch.iloc[start_index:end_index,column].mean()) < abs((training_data.iloc[:,column].mean() * (1 - threshold))):
        print('[DRIFT][{} Window]: Downwards Data Drift detected! ({} AVG: {})\t(Index: {})'.format(batch_info['name'], training_data.columns[column], incoming_batch.iloc[start_index:end_index,column].mean(), end_index))
        
    if abs(incoming_batch.iloc[start_index:end_index,column].quantile(0.25)) < abs((training_data.iloc[:,column].quantile(0.25) * (1 - threshold))):
        print('[DRIFT][{} Window]: Downwards Data Drift detected! ({} 25%-Quantile: {})\t(Index: {})'.format(batch_info['name'], training_data.columns[column], incoming_batch.iloc[start_index:end_index,column].mean(), end_index))
        
    if abs(incoming_batch.iloc[start_index:end_index,column].quantile(0.75)) < abs((training_data.iloc[:,column].quantile(0.75) * (1 - threshold))):
        print('[DRIFT][{} Window]: Downwards Data Drift detected! ({} 75%-Quantile: {})\t(Index: {})'.format(batch_info['name'], training_data.columns[column], incoming_batch.iloc[start_index:end_index,column].mean(), end_index))

        
from math import log2

def kl_divergence(p, q):
    return sum(p[i] * log2(p[i]/q[i]) for i in range(len(p)))

        
def monitor_drift_kl_divergence(training_data, incoming_batch, start_index, end_index, batch_info, column, thresholds):
    
    batch_thresholds = thresholds[batch_info['name'].lower() + '_batch_drift']
    threshold = batch_thresholds['one_dim_drift_kl_divergence']
    
    # Filter out zero values in datasets
    batch_df = incoming_batch[incoming_batch.iloc[:,column] != 0]
    train_df = training_data[training_data.iloc[:,column] != 0]
    
    # Filter out negative values in dataset, if mean is positive (= remove negative outliers)
    if batch_df.iloc[:,column].mean() > 0:
        batch_df = batch_df[batch_df.iloc[:,column] > 0]
        
    batch_df = batch_df.reset_index()
    train_df = train_df.reset_index()
    batch_df = batch_df.drop('index', axis=1)
    train_df = train_df.drop('index', axis=1)
    batch = batch_df.iloc[start_index:end_index,column]
    batch = batch.reset_index()
    batch = batch.drop('index', axis=1)
    batch = batch.iloc[:,0]
    train = train_df.iloc[:,column]
    
    divergence_score = kl_divergence(batch, train);
    
    if divergence_score > threshold:
        print('[DRIFT][{} Window]: KL-Divergence detected! ({} Divergence: {})\t(Index: {})'.format(batch_info['name'], batch_df.columns[column], divergence_score, end_index))        
        

from scipy.stats import wasserstein_distance

def monitor_drift_wasserstein_distance(training_data, incoming_batch, start_index, end_index, batch_info, column, thresholds):
    
    batch_thresholds = thresholds[batch_info['name'].lower() + '_batch_drift']
    threshold = batch_thresholds['one_dim_drift_wasserstein']
    
    batch = incoming_batch.iloc[start_index:end_index, column]
    train = training_data.iloc[:, column]
    wasserstein_dist = wasserstein_distance(batch, train);
    
    if wasserstein_dist > threshold:
        print('[DRIFT][{} Window]: Great Wasserstein Distance detected! ({} Distance: {})\t(Index: {})'.format(batch_info['name'], incoming_batch.columns[column], wasserstein_dist, end_index))
                    
def monitor_drift_one_dimension(training_data, train_data_infos, incoming_batch, start_index, end_index, batch_info, thresholds):
    
    columns_to_use = train_data_infos['columns_to_use']
    
    for column in columns_to_use:
        monitor_drift_main_metrics(training_data, incoming_batch, start_index, end_index, batch_info, column, thresholds);
        monitor_drift_kl_divergence(training_data, incoming_batch, start_index, end_index, batch_info, column, thresholds);
        monitor_drift_wasserstein_distance(training_data, incoming_batch, start_index, end_index, batch_info, column, thresholds);
    
def monitor_drift_multi_dimensions(training_data, train_data_infos, incoming_batch, start_index, end_index, batch_info, thresholds):
    columns_to_use = train_data_infos['columns_to_use'];
    pca_model = train_data_infos['pca_model'];
    
    incoming_batch_filtered = get_data_filtered(incoming_batch, columns_to_use);
    incoming_batch_transformed = transform_pca(pca_model, incoming_batch_filtered);
    distances = calculate_euclidean_distances(incoming_batch_transformed, columns_to_use);
    distances_metrics = get_distance_metrics(distances, columns_to_use, pca_model, thresholds);
    
    if distances_metrics['mean'] > train_data_infos['mean']:
        print('[DRIFT] Multi-Dimensional Drift Detected! Mean Distance: {}\t\t\t(Index: {})'.format(distances_metrics['maximum'], end_index));
        
    if distances_metrics['first_quarter'] > train_data_infos['first_quarter']:
        print('[DRIFT] Multi-Dimensional Drift Detected! Distance: {}\t\t\t(Index: {})'.format(distances_metrics['first_quarter'], end_index));
        
    if distances_metrics['third_quarter'] > train_data_infos['third_quarter']:
        print('[DRIFT] Multi-Dimensional Drift Detected! Distance: {}\t\t\t(Index: {})'.format(distances_metrics['third_quarter'], end_index));
        
        
def monitor_batch_drift(training_data, train_data_infos, incoming_batch, index, batch_info, thresholds):
    
    batch_size = batch_info['size'];
    start_index = index - batch_size + 1;
    end_index = index;
    
    monitor_drift_one_dimension(training_data, train_data_infos, incoming_batch, start_index, end_index, batch_info, thresholds);
    monitor_drift_multi_dimensions(training_data, train_data_infos, incoming_batch, start_index, end_index, batch_info, thresholds);
    
def monitor_drift(training_data, train_data_infos, batch, step_sizes, index, row, batch_infos, thresholds):
    
    # Add new sample to batch
    batch.loc[index] = row.values;
        
    if batch.shape[0] % step_sizes['small'] == 0 and batch.shape[0] > (step_sizes['small']):
        monitor_batch_drift(training_data, train_data_infos, batch, index, batch_infos['small'], thresholds);

    if batch.shape[0] % step_sizes['medium'] == 0:
        monitor_batch_drift(training_data, train_data_infos, batch, index, batch_infos['medium'], thresholds);

    if batch.shape[0] % step_sizes['large'] == 0:
        monitor_batch_drift(training_data, train_data_infos, batch, index, batch_infos['large'], thresholds);

In [13]:
def get_avg_samples_per_day(data):
    # TODO: More elegant solution possible?
    df = pd.DataFrame();
    df['pickup_datetime'] = train_data['pickup_datetime']
    df['count'] = 1
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])
    df.index = df['pickup_datetime'] 
    df = df.resample('D').sum()
    df = df[df['count'] != 0]
    return df['count'].mean()

def get_batch_infos(data):
    batch_size_day = get_avg_samples_per_day(data);
    batch_size_week = batch_size_day * 7;
    batch_size_month = batch_size_day * 30;
    
    batch_sizes = {
        'small': {'size': int(batch_size_day), 'name': 'Small'},
        'medium': {'size': int(batch_size_week), 'name': 'Medium'},
        'large': {'size': int(batch_size_month), 'name': 'Large'},        
    }
    
    return batch_sizes;

def get_step_sizes(batch_infos):
    
    step_sizes = {
        'small': (batch_infos['small']['size'] / 2),
        'medium': (batch_infos['medium']['size'] / 2),
        'large': (batch_infos['large']['size'] / 2) 
    }
    return step_sizes;

def get_data_filtered(data, columns_to_use):
    data_filtered = pd.DataFrame()
    i = 0

    # Filter Input Columns
    for column in columns_to_use:
        data_filtered.insert(i, data.columns[column], data.iloc[:,column])
        i = i + 1
        
    return data_filtered;

def apply_pca(train_data_filtered):
    pca_model = get_pca_model(train_data_filtered);
    return pca_model, transform_pca(pca_model, train_data_filtered);

def get_pca_model(train_data_filtered):
    from sklearn.decomposition import PCA
    
    pca = PCA(whiten=True)
    return pca.fit(train_data_filtered)

def transform_pca(pca_model, train_data_filtered):
    from sklearn.decomposition import PCA
    
    return pca_model.transform(train_data_filtered)

def get_zero_vector(columns_to_use):
    number_of_dimensions = len(columns_to_use)
    return np.zeros((number_of_dimensions)*1)

def calculate_euclidean_distance(sample, zero_vector):
    from scipy.spatial import distance
    
    return distance.euclidean(sample, zero_vector)

def calculate_euclidean_distances(transformed_data, columns_to_use):
    from scipy.spatial import distance
    
    zero_vector = get_zero_vector(columns_to_use);
    
    index = 0
    distances = np.ndarray(shape=(np.size(transformed_data,0),1))

    # Create distances to zero vector
    for sample in transformed_data:
        distances[index] = calculate_euclidean_distance(transformed_data[index], zero_vector);
        index = index + 1
        
    return distances;

def get_distance_metrics(distances, columns_to_use, pca_model, thresholds):
    
    raw_threshold = thresholds['multi_dim_outlier']
    threshold = raw_threshold * 100
    
    train_data_infos = {
        'columns_to_use': columns_to_use,
        'pca_model': pca_model,
        'minimum': distances.min(),
        'lower_threshold_percentile': np.percentile(distances, threshold),
        'first_quarter': np.percentile(distances, 25),
        'mean': distances.mean(),
        'third_quarter': np.percentile(distances, 75),
        'upper_threshold_percentile': np.percentile(distances, (100 - threshold)),
        'maximum': distances.max()
    }
    
    return train_data_infos;

def get_train_data_infos(train_data, columns_to_use, thresholds):
    
    # +++ REMOVE: Only for testing purposes +++
    test = {
        'columns_to_use': columns_to_use,
        'pca_model': get_pca_model(get_data_filtered(train_data, columns_to_use)),
        'minimum': 0.7249817600534729, 
        'lower_threshold_percentile': 1.0764902427267176,
        'first_quarter': 1.1133579226848411,
        'mean': 1.922411839793231,
        'third_quarter': 2.200820905397247,
        'upper_threshold_percentile': 8.095414750208105, 
        'maximum': 58.320576749831
    }
    
    # return test
    # +++ REMOVE: Only for testing purposes +++
    
    train_data_filtered = get_data_filtered(train_data, columns_to_use);
    pca_model, transformed_data = apply_pca(train_data_filtered)
    distances = calculate_euclidean_distances(transformed_data, columns_to_use)
    
    return get_distance_metrics(distances, columns_to_use, pca_model, thresholds);  

In [9]:
def monitor_outliers_one_dimension(train_data, train_data_infos, incoming_sample, index, thresholds):
    
    threshold = thresholds['one_dim_outlier']
    columns_to_use = train_data_infos['columns_to_use']
    
    for column in columns_to_use:
            
            if incoming_sample[column] > train_data.iloc[:,column].max():
                print('[OUTLIER] MAX Outlier Detected! {}: {}\t\t\t(Index: {})'.format(train_data.columns[column], incoming_sample[column], index));

            elif incoming_sample[column] > train_data.iloc[:,column].quantile(1 - threshold):
                print('[POT. OUTLIER] Potential MAX Outlier Detected! {}: {}\t\t\t(Index: {})'.format(train_data.columns[column], incoming_sample[column], index));

            if incoming_sample[column] < train_data.iloc[:,column].min():
                print('[OUTLIER] MIN Outlier Detected! {}: {}\t\t\t(Index: {})'.format(train_data.columns[column], incoming_sample[column], index));
                
            elif incoming_sample[column] < train_data.iloc[:,column].quantile(threshold):
                print('[POT. OUTLIER] Potential MIN Outlier Detected! {}: {}\t\t\t(Index: {})'.format(train_data.columns[column], incoming_sample[column], index));
                
    
def monitor_outliers_multi_dimensions(train_data, train_data_infos, incoming_sample, index):
    incoming_sample_df = pd.DataFrame(incoming_sample).transpose();
    incoming_sample_df_filtered = get_data_filtered(incoming_sample_df, train_data_infos['columns_to_use']);
    incoming_sample_df_transformed = transform_pca(train_data_infos['pca_model'], incoming_sample_df_filtered);
    distance = calculate_euclidean_distance(incoming_sample_df_transformed, get_zero_vector(train_data_infos['columns_to_use']));

    if distance > train_data_infos['maximum']:
        print('[OUTLIER] Multi-Dimensional Outlier Detected! Distance: {}\t\t\t(Index: {})'.format(distance, index));
        
    elif distance > train_data_infos['upper_threshold_percentile']:
        print('[POT. OUTLIER] Potential Multi-Dimensional Outlier Detected! Distance: {}\t\t\t(Index: {})'.format(distance, index));

def monitor_outliers(train_data, train_data_infos, incoming_sample, index, thresholds):
    monitor_outliers_one_dimension(train_data, train_data_infos, incoming_sample, index, thresholds);
    monitor_outliers_multi_dimensions(train_data, train_data_infos, incoming_sample, index);

In [10]:
# Define desired thresholds

def get_thresholds():
    thresholds = {
        'one_dim_outlier': 0.001,
        'multi_dim_outlier': 0.01,
        'small_batch_drift': {
            'one_dim_drift_metric': 0.03,
            'one_dim_drift_kl_divergence': 100,
            'one_dim_drift_wasserstein': 0.02
        },
        'medium_batch_drift': {
            'one_dim_drift_metric': 0.05,
            'one_dim_drift_kl_divergence': 150,
            'one_dim_drift_wasserstein': 0.03
        },
        'large_batch_drift': {
            'one_dim_drift_metric': 0.07,
            'one_dim_drift_kl_divergence': 200,
            'one_dim_drift_wasserstein': 0.04
        }
    }
    
    return thresholds;

In [11]:
def monitor(train_data, incoming_data):
        
    thresholds = get_thresholds();
    columns_to_use = [3, 4, 5, 6, 7, 12, 13, 14]
    batch = pd.DataFrame(columns=incoming_data.columns);
    batch_infos = get_batch_infos(train_data);
    train_data_infos = get_train_data_infos(train_data, columns_to_use, thresholds);
    step_sizes = get_step_sizes(batch_infos);
    
    for index, row in incoming_data.iterrows():
        
        monitor_outliers(train_data, train_data_infos, row, index, thresholds);
        monitor_drift(train_data, train_data_infos, batch, step_sizes, index, row, batch_infos, thresholds);

In [14]:
monitor(train_data, incoming_data_passenger_count_outlier.head(1000));

[POT. OUTLIER] Potential MAX Outlier Detected! dropoff_latitude: 40.887882232666016			(Index: 4)
[OUTLIER] MIN Outlier Detected! passenger_count: -1			(Index: 150)
[POT. OUTLIER] Potential MAX Outlier Detected! manhattan: 0.36305999755859375			(Index: 343)
[POT. OUTLIER] Potential MAX Outlier Detected! euclidean: 0.2629007645951818			(Index: 343)
[POT. OUTLIER] Potential MAX Outlier Detected! haversine: 27.40050231658701			(Index: 343)
[POT. OUTLIER] Potential Multi-Dimensional Outlier Detected! Distance: 12.049343616294522			(Index: 343)
[POT. OUTLIER] Potential Multi-Dimensional Outlier Detected! Distance: 10.145906069419583			(Index: 344)
[POT. OUTLIER] Potential MAX Outlier Detected! pickup_latitude: 40.86755752563477			(Index: 424)
[POT. OUTLIER] Potential MAX Outlier Detected! manhattan: 0.3475189208984233			(Index: 440)
[POT. OUTLIER] Potential MAX Outlier Detected! euclidean: 0.2458510235458707			(Index: 440)
[POT. OUTLIER] Potential MAX Outlier Detected! haversine: 24.07603490

KeyboardInterrupt: 