# Modules 

In [1]:
import atd2020
import numpy as np
import math
import multiprocessing  
import random
import pandas as pd
import data_prep
import read_data
import modeling
from functools import partial
from sklearn.gaussian_process.kernels import (RBF,Matern,RationalQuadratic,ExpSineSquared,ConstantKernel)

# Read the data

In [2]:
data, data_obs = read_data.load_in_data()

Accessing data file from /Users/qinghe/Documents/project/atd2021/atd2021/data/City4_all.parquet.brotli.


In [3]:
# Detrend the data to make it stationary (constant mean and variance)
data_detrended = atd2020.detrend.detrend(data)
groups_detrended = data_detrended.groupby(["ID", "Weekday", "Hour"])
data_obs_detrended = atd2020.detrend.detrend(data_obs)
groups_obs_detrended = data_obs_detrended.groupby(["ID", "Weekday", "Hour"])

# Standardize the data

In [4]:
# normalize the data first
def standardize_self2(df, ycol="TotalFlow"):
    y = df[ycol]
    m = np.mean(y)
    std = np.std(y)
    df["TotalFlow_norm"] = (y - m) / std
    return df

In [5]:
def normalize_ID_Hour_Wkdcat(data):
    Weekdays_lst = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
    data_copy = data.copy()
    data_copy["Weekday_cat"] = np.where(data_copy.Weekday.isin(Weekdays_lst), 'wkd_c1', 'wkd_c2')
    data_norm = data_copy.groupby(["ID","Weekday_cat","Hour"]).apply(standardize_self2)
    data_norm.rename(
        columns={"TotalFlow": "TotalFlow_detrended", "TotalFlow_norm": "TotalFlow"},
        inplace=True,
    )
    return data_norm


In [6]:
data_obs_detrended_norm = normalize_ID_Hour_Wkdcat(data_obs_detrended)
data_detrended_norm = normalize_ID_Hour_Wkdcat(data_detrended)

In [8]:
# define the kernels
kernels = [1.0 * RBF(length_scale=1.0, length_scale_bounds=(1e-1, 10.0)),
           1.0* RationalQuadratic(length_scale=1.0,alpha=0.1,length_scale_bounds=(0.01, 10.0),alpha_bounds=(0.01, 10),),
           1.0* ExpSineSquared(length_scale=1.0,periodicity=3.0,length_scale_bounds=(0.01, 100.0),periodicity_bounds=(1.0, 100.0),),
            1.0 * Matern(length_scale=1.0, length_scale_bounds=(1e-1, 10.0), nu=1.5),
]

# Run the model

In [9]:

def get_fraction_slice(fraction_groups, fraction):
    fraction_groups_lst = list(fraction_groups.groups)
    res = []
    for i in fraction_groups_lst:
        if i[3] == fraction:
            res.append(i[0:3])
    return res
def train_val_test(seed, train_size, validation_size, slice_list):
    # sample the training, validation, and test dataset
    random.seed(seed)
    training = random.sample(slice_list, train_size)
    temp = [elem for elem in slice_list if not elem in training]
    validation = random.sample(temp, validation_size)
    # test = random.sample(temp2, test_size)
    return training, validation

In [11]:
def final_model(data, data_obs, n_IDs, p_training, p_validation, kernel, cutoff):
    random.seed(1234)
    # random select IDs from 0 to 399
    sampled_IDs = random.sample(range(400), n_IDs)
    # print(sampled_IDs)
    ################################################### Data prep ###################################################
    # combine the data parallelly on IDs
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()-3) # change the # of cpu here
        temp = pool.map(partial(data_prep.data_prep_comb, data_obs_for_combine=data_obs, comb_hours=False, comb_wkd=True,hour_bef_aft=True),sampled_IDs)
        pool.close()
    # save the combined data as data_obs_new
    data_obs_new = pd.DataFrame(columns=["ID", "Date", "Hour", "TotalFlow"])
    for i in temp:
        data_obs_new = pd.concat([data_obs_new, i])
    data_obs_new["Weekday"] = [i.strftime("%A") for i in data_obs_new.Date]

    print("Data prep finished.")
    print("Peek the combined data:")
    print(data_obs_new.head())

    ################################################### Training and validation ###################################################
    # run the fraction-wise model parallelly on each fraction
    fraction_ind = [0.01,0.02,0.05,0.1]
    fraction_groups = data_obs[data_obs.ID.isin(sampled_IDs)].groupby(
        ["ID", "Weekday", "Hour", "Fraction_Observed"]
    )
    training_lst=[]
    validation_lst=[]
    for fraction in fraction_ind:
        slice_p = get_fraction_slice(fraction_groups, fraction)
        training, validation = train_val_test(seed=123,train_size=math.floor(len(slice_p) * p_training),validation_size=math.floor(len(slice_p) * p_validation),
        slice_list=slice_p)
        training_lst.append(training)
        validation_lst.append(validation)

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()-3) # change the # of cpu here
        res_training = pool.map(partial(modeling.run_train_fraction, data=data, data_obs_new=data_obs_new, data_obs=data_obs, kernel=kernel,cutoff=cutoff),training_lst)
        pool.close()
    
    iter_obj=[]
    for i in range(4):
        temp = []
        temp.append(res_training[i])
        temp.append(validation_lst[i])
        iter_obj.append(temp)
    # print(iter_obj)
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()-3) # change the # of cpu here
        res_validation = pool.map(partial(modeling.run_valid_fraction, data_obs_new=data_obs_new, data_obs=data_obs, kernel=kernel),iter_obj)
        pool.close()
    print("1% training f1:", res_training[0]['training_F1'], "1% validation f1:", res_validation[0]['validation_F1'])
    print("2% training f1:", res_training[1]['training_F1'], "2% validation f1:", res_validation[1]['validation_F1'])
    print("5% training f1:", res_training[2]['training_F1'], "5% validation f1:", res_validation[2]['validation_F1'])
    print("10% training f1:", res_training[3]['training_F1'], "10% validation f1:", res_validation[3]['validation_F1'])
    # return the models and F1 scores
    return (iter_obj)

# Model on the detrended normalized data

In [13]:
data_detrended_norm_clean = data_detrended_norm.drop(['Year', 'Month','Day','Latitude','Longitude','Weekday_cat'], axis=1)
data_obs_detrended_norm_clean = data_obs_detrended_norm.drop(['Year', 'Month','Day','Latitude','Longitude','Weekday_cat'], axis=1)
result = final_model(data=data_detrended_norm_clean, data_obs=data_obs_detrended_norm_clean, n_IDs=400, p_training=0.5, p_validation=0.5,
kernel=kernels[1], cutoff=[x / 100.0 for x in range(10, 100, 1)])

Data prep finished.
Peek the combined data:
    ID        Date Hour  TotalFlow    Weekday
0  265  2011-01-03   22   -1.56472     Monday
1  265  2011-01-05   22   -1.56472  Wednesday
2  265  2011-01-06   22   -1.56472   Thursday
3  265  2011-01-07   22   -1.56472     Friday
4  265  2011-01-04   21   -1.56472    Tuesday
1% training f1: 0.5913621262458473 1% validation f1: 0.6106870229007634
2% training f1: 0.6823266219239374 2% validation f1: 0.6932835820895523
5% training f1: 0.758674585808065 5% validation f1: 0.7574233327924712
10% training f1: 0.822360248447205 10% validation f1: 0.8267756770567194


In [None]:
# import pickle
# with open("/Users/qinghe/Documents/project/atd2021/results/sprint4/result.pkl", "wb") as fp:   #Pickling
#     pickle.dump(result, fp)

# Baseline model

In [7]:
random.seed(1234)
# random select IDs from 0 to 399
sampled_IDs = random.sample(range(400), 200)
validation_IDs = [i for i in range(400) if i not in sampled_IDs]

In [17]:
validation_detrended = data_obs_detrended[data_obs_detrended.ID.isin(validation_IDs)].copy()
validation_detrended.head()

Unnamed: 0,Timestamp,ID,TotalFlow_original,Year,Month,Day,Hour,Weekday,Latitude,Longitude,Anomaly,Fraction_Observed,Observed,Date,TotalFlow
87646,2011-01-01 08:00:00,1,1531.0,2011,1,1,8,Saturday,0.510311,0.836739,False,0.02,True,2011-01-01,1682.995664
87714,2011-01-04 04:00:00,1,345.0,2011,1,4,4,Tuesday,0.510311,0.836739,False,0.02,True,2011-01-04,496.753393
87772,2011-01-06 14:00:00,1,4392.0,2011,1,6,14,Thursday,0.510311,0.836739,False,0.02,True,2011-01-06,4543.546751
87988,2011-01-15 14:00:00,1,4707.0,2011,1,15,14,Saturday,0.510311,0.836739,False,0.02,True,2011-01-15,4857.777185
88083,2011-01-19 13:00:00,1,3441.0,2011,1,19,13,Wednesday,0.510311,0.836739,False,0.02,True,2011-01-19,3591.438719


In [18]:
validation_detrended.reset_index(drop=True, inplace=True)
# Returns a dataframe with predicted anomalies in the 'Anomaly' column
anomalies_baseline = atd2020.detector.BaselineDetector().fit_predict(validation_detrended)
anomalies_baseline.head()

Unnamed: 0,Timestamp,ID,TotalFlow_original,Year,Month,Day,Hour,Weekday,Latitude,Longitude,...,Fraction_Observed,Observed,Date,TotalFlow,mean,std,count,lower_thresh,upper_thresh,Fallback
0,2011-01-01 08:00:00,1,1531.0,2011,1,1,8,Saturday,0.510311,0.836739,...,0.02,True,2011-01-01,1682.995664,2940.146196,674.180771,17,917.603882,4962.68851,False
1,2011-01-04 04:00:00,1,345.0,2011,1,4,4,Tuesday,0.510311,0.836739,...,0.02,True,2011-01-04,496.753393,726.139492,330.526472,19,-265.439923,1717.718908,False
2,2011-01-06 14:00:00,1,4392.0,2011,1,6,14,Thursday,0.510311,0.836739,...,0.02,True,2011-01-06,4543.546751,4085.400645,414.486975,10,2841.939721,5328.861569,False
3,2011-01-15 14:00:00,1,4707.0,2011,1,15,14,Saturday,0.510311,0.836739,...,0.02,True,2011-01-15,4857.777185,4446.508129,282.681089,11,3598.46486,5294.551397,False
4,2011-01-19 13:00:00,1,3441.0,2011,1,19,13,Wednesday,0.510311,0.836739,...,0.02,True,2011-01-19,3591.438719,3869.423685,404.793823,11,2655.042217,5083.805153,False


In [20]:
metrics_baseline = atd2020.metrics.metrics(
    validation_detrended,
    anomalies_baseline,
    groupby="Fraction_Observed",
)
metrics_baseline

Unnamed: 0_level_0,True Positive,False Positive,True Negative,False Negative,Precision,Recall,F1,Accuracy,Support
Fraction_Observed,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
overall,6054,2065,767349,4567,0.745658,0.570003,0.646105,0.991498,10621
0.01,0,2,41584,510,0.0,0.0,0.0,0.987837,510
0.02,149,69,88070,1115,0.683486,0.11788,0.20108,0.986757,1264
0.05,1768,834,232579,1447,0.679477,0.549922,0.607873,0.99036,3215
0.1,4137,1160,405116,1495,0.781008,0.734553,0.757068,0.993554,5632


# GBM 1

In [None]:
import modeling_GBM1

In [None]:
def GBM1(data, data_obs, n_IDs, p_training, p_validation, kernel):
    random.seed(1234)
    # random select IDs from 0 to 399
    sampled_IDs = random.sample(range(400), n_IDs)
    # print(sampled_IDs)
    ################################################### Data prep ###################################################
    # combine the data parallelly on IDs
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()-3) # change the # of cpu here
        temp = pool.map(partial(data_prep.data_prep_comb, data_obs_for_combine=data_obs, comb_hours=False, comb_wkd=True,hour_bef_aft=True),sampled_IDs)
        pool.close()
    # save the combined data as data_obs_new
    data_obs_new = pd.DataFrame(columns=["ID", "Date", "Hour", "TotalFlow"])
    for i in temp:
        data_obs_new = pd.concat([data_obs_new, i])
    data_obs_new["Weekday"] = [i.strftime("%A") for i in data_obs_new.Date]

    print("Data prep finished.")
    print("Peek the combined data:")
    print(data_obs_new.head())

    ################################################### Training and validation ###################################################
    # run the fraction-wise model parallelly on each fraction
    fraction_ind = [0.01,0.02,0.05,0.1]
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()-4) # change the # of cpu here
        res = pool.map(partial(modeling_GBM2.run_train_fraction2, data=data, data_obs_new=data_obs_new, data_obs=data_obs, kernel=kernel, 
                                sampled_IDs=sampled_IDs, p_training=p_training, p_validation=p_validation),fraction_ind)
        pool.close()
    print("1% training f1:", res[0]['training_F1'], "1% validation f1:", res[0]['validation_F1'])
    print("2% training f1:", res[1]['training_F1'], "2% validation f1:", res[1]['validation_F1'])
    print("5% training f1:", res[2]['training_F1'], "5% validation f1:", res[2]['validation_F1'])
    print("10% training f1:", res[3]['training_F1'], "10% validation f1:", res[3]['validation_F1'])
    # return the models and F1 scores
    return (res)

In [None]:
data_detrended_norm_clean = data_detrended_norm.drop(['Year', 'Month','Day','Latitude','Longitude','Weekday_cat'], axis=1)
data_obs_detrended_norm_clean = data_obs_detrended_norm.drop(['Year', 'Month','Day','Latitude','Longitude','Weekday_cat'], axis=1)
result = GBM1(data=data_detrended_norm_clean, data_obs=data_obs_detrended_norm_clean, n_IDs=400, p_training=0.5, p_validation=0.5,
kernel=kernels[1])

# GBM 2

In [None]:
import modeling_GBM2

In [None]:
def GBM2(data, data_obs, n_IDs, p_training, p_validation, kernel):
    random.seed(1234)
    # random select IDs from 0 to 399
    sampled_IDs = random.sample(range(400), n_IDs)
    # print(sampled_IDs)
    ################################################### Data prep ###################################################
    # combine the data parallelly on IDs
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()-3) # change the # of cpu here
        temp = pool.map(partial(data_prep.data_prep_comb, data_obs_for_combine=data_obs, comb_hours=False, comb_wkd=True,hour_bef_aft=True),sampled_IDs)
        pool.close()
    # save the combined data as data_obs_new
    data_obs_new = pd.DataFrame(columns=["ID", "Date", "Hour", "TotalFlow"])
    for i in temp:
        data_obs_new = pd.concat([data_obs_new, i])
    data_obs_new["Weekday"] = [i.strftime("%A") for i in data_obs_new.Date]

    print("Data prep finished.")
    print("Peek the combined data:")
    print(data_obs_new.head())

    ################################################### Training and validation ###################################################
    # run the fraction-wise model parallelly on each fraction
    fraction_ind = [0.01,0.02,0.05,0.1]
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()-4) # change the # of cpu here
        res = pool.map(partial(modeling_GBM1.run_train_fraction2, data=data, data_obs_new=data_obs_new, data_obs=data_obs, kernel=kernel, 
                                sampled_IDs=sampled_IDs, p_training=p_training, p_validation=p_validation),fraction_ind)
        pool.close()
    print("1% training f1:", res[0]['training_F1'], "1% validation f1:", res[0]['validation_F1'])
    print("2% training f1:", res[1]['training_F1'], "2% validation f1:", res[1]['validation_F1'])
    print("5% training f1:", res[2]['training_F1'], "5% validation f1:", res[2]['validation_F1'])
    print("10% training f1:", res[3]['training_F1'], "10% validation f1:", res[3]['validation_F1'])
    # return the models and F1 scores
    return (res)

In [None]:
data_detrended_norm_clean = data_detrended_norm.drop(['Year', 'Month','Day','Latitude','Longitude','Weekday_cat'], axis=1)
data_obs_detrended_norm_clean = data_obs_detrended_norm.drop(['Year', 'Month','Day','Latitude','Longitude','Weekday_cat'], axis=1)
result = GBM2(data=data_detrended_norm_clean, data_obs=data_obs_detrended_norm_clean, n_IDs=400, p_training=0.5, p_validation=0.5,
kernel=kernels[1])