In [8]:
# basic
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import h5py

# feature extraction
from pyts.transformation import BagOfPatterns, BOSS, WEASEL

# classifiers
from knnClassifier import kNNClassifier
from logisticRegression import LogisticRegression as logisticRegression

# sklearn
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix, precision_score, recall_score
from sklearn.model_selection import train_test_split
from sklearn.feature_selection import SelectKBest, VarianceThreshold
from sklearn.feature_selection import chi2, f_classif
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA


# utils

In [2]:
def custom_f1_score(actual_y, pred_y):
    tp = np.sum((actual_y + pred_y) == 2)
    fp = np.sum((actual_y - pred_y) == -1)
    fn = np.sum((actual_y - pred_y) == 1)

    print("True Positives (tp):", tp)
    print("False Positives (fp):", fp)
    print("False Negatives (fn):", fn)

    try:
        precision = tp / (tp + fp)
        recall = tp / (tp + fn)
    except Exception as e:
        precision = 0.
        recall = 0.

    print("precision: ", precision)
    print("recall: ", recall)

    try:
        f1 =  2 * precision * recall / (precision + recall)
    except Exception as e:
        f1 = 0.
    return f1


# data

In [3]:
path = 'D:/Documents-D/Downloads/'

# train data
with h5py.File(path + 'train.h5', 'r') as f:
   print(f.keys())
   x_train_raw = f['x']
   y_train_raw = f['y']

   x_train = np.array(x_train_raw)
   y_train = np.array(y_train_raw)

# test data
with h5py.File(path + 'test.h5', 'r') as f:
   print(f.keys())
   x_test_raw = f['x']

   x_test_ = np.array(x_test_raw)

<KeysViewHDF5 ['x', 'y']>
<KeysViewHDF5 ['x']>


In [4]:
x_train = x_train.reshape(x_train.shape[0], x_train.shape[2])
x_test_ = x_test_.reshape(x_test_.shape[0], x_test_.shape[2])
y_train = y_train.reshape(y_train.shape[0], 1)

x_train, x_test, y_train, y_test = train_test_split(
    x_train, y_train, test_size=0.3, random_state=42
)

In [26]:
def tune_bop(window_sizes, word_sizes, feature_selection_strategy, k_best, k_neighbors, x_train, x_test, y_train, y_test, filename):
    y_train = y_train.flatten()
    y_test = y_test.flatten()
    data = []
    for window_size in window_sizes:
        for word_size in word_sizes:
            bop = BagOfPatterns(
                window_size=window_size,
                word_size=word_size,
                n_bins=4,
                strategy='quantile'
            )

            x_train_bop_raw = bop.fit_transform(x_train).toarray()
            x_test_bop_raw = bop.transform(x_test).toarray()

            if feature_selection_strategy == 'pca':
                scaler = StandardScaler()
                x_train_bop_std = scaler.fit_transform(x_train_bop_raw)
                x_test_bop_std = scaler.transform(x_test_bop_raw)

                for n_comp in k_best:
                    pca = PCA(n_components=n_comp)
                    x_train_bop = pca.fit_transform(x_train_bop_std)
                    x_test_bop = pca.transform(x_test_bop_std)

                    knn = kNNClassifier(d=n_comp)
                    knn.insert_data(x_train_bop, y_train)

                    for kn in k_neighbors:
                        y_pred = knn.predict(x_test_bop, k=kn).flatten()
                        f1 = f1_score(y_test, y_pred, zero_division=0.)
                        accuracy = accuracy_score(y_test, y_pred)
                        precision = precision_score(y_test, y_pred, zero_division=0.)
                        recall = recall_score(y_test, y_pred, zero_division=0.)

                        row = {
                            'window_size': window_size,
                            'word_size': word_size,
                            'feature_selection_strategy': feature_selection_strategy,
                            'k_best': n_comp,
                            'k_neighbors': kn,
                            'f1': f1, 'accuracy': accuracy, 'precision': precision, 'recall': recall
                        }
                        data.append(row)
            elif feature_selection_strategy == 'kbest':
                for k in k_best:
                    if k >= x_train.shape[1]:
                        continue
                    selector = SelectKBest(chi2, k=k)
                    x_train_bop = selector.fit_transform(x_train_bop_raw, y_train)
                    x_test_bop = selector.transform(x_test_bop_raw)

                    # print("k: ", k)
                    # print("x_train_bop.shape: ", x_train_bop.shape)
                    knn = kNNClassifier(d=k)
                    knn.insert_data(x_train_bop, y_train)
                    for kn in k_neighbors:
                        y_pred = knn.predict(x_test_bop, k=kn).flatten()
                        f1 = f1_score(y_test, y_pred, zero_division=0.)
                        accuracy = accuracy_score(y_test, y_pred)
                        precision = precision_score(y_test, y_pred, zero_division=0.)
                        recall = precision_score(y_test, y_pred, zero_division=0.)

                        row = {
                            'window_size': window_size,
                            'word_size': word_size,
                            'feature_selection_strategy': feature_selection_strategy,
                            'k_best': k,
                            'k_neighbors': kn,
                            'f1': f1, 'accuracy': accuracy, 'precision': precision, 'recall': recall
                        }
                        data.append(row)
            else: # no feature selection
                if x_train_bop.shape[1] <= 100:
                    knn = kNNClassifier(d=x_train_bop_raw.shape[1])
                    knn.insert_data(x_train_bop_raw, y_train)

                    for kn in k_neighbors:
                        y_pred = knn.predict(x_test_bop_raw, k=kn).flatten()
                        f1 = f1_score(y_test, y_pred, zero_division=0.)
                        accuracy = accuracy_score(y_test, y_pred)
                        precision = precision_score(y_test, y_pred, zero_division=0.)
                        recall = precision_score(y_test, y_pred, zero_division=0.)

                        row = {
                            'window_size': window_size,
                            'word_size': word_size,
                            'feature_selection_strategy': feature_selection_strategy,
                            'k_best': k,
                            'k_neighbors': kn,
                            'f1': f1, 'accuracy': accuracy, 'precision': precision, 'recall': recall
                        }
                        data.append(row)
    df = pd.DataFrame.from_records(data)
    df.to_csv(filename, index=False)

In [None]:
# K BEST BOP
window_sizes=[20, 30, 40, 50]
word_sizes=[0.25, 0.33, 0.5, 0.66]
k_best=[5, 10, 20, 40, 50, 80, 100]
k_neighbors=[5, 15]
feature_selection_strategy='kbest'
filename=f'bop_{feature_selection_strategy}.csv'
tune_bop(window_sizes, word_sizes, feature_selection_strategy, k_best, k_neighbors, x_train, x_test, y_train, y_test, filename)

In [21]:
df = pd.read_csv(filename)
df[df['f1']>=0.5]

Unnamed: 0,window_size,word_size,feature_selection_strategy,k_best,k_neighbors,f1,accuracy,precision,recall
106,30,0.66,kbest,50,5,0.5,0.806452,0.75,0.75
114,40,0.25,kbest,10,5,0.5,0.774194,0.583333,0.583333


In [29]:
# PCA BOP
window_sizes=[20, 30, 40, 50]
word_sizes=[0.2, 0.25, 0.33, 0.5, 0.66]
k_best=[2, 3, 4]
k_neighbors=[5, 15]
feature_selection_strategy='pca'
filename=f'bop_{feature_selection_strategy}2.csv'
tune_bop(window_sizes, word_sizes, feature_selection_strategy, k_best, k_neighbors, x_train, x_test, y_train, y_test, filename)


In [32]:
df = pd.read_csv('bop_pca2.csv')
df[df['f1'] >= 0.5]

Unnamed: 0,window_size,word_size,feature_selection_strategy,k_best,k_neighbors,f1,accuracy,precision,recall
38,30,0.25,pca,3,5,0.5,0.709677,0.45,0.5625


# weasel

In [63]:
def tune_bop(window_sizes, word_sizes, feature_selection_strategy, k_best, k_neighbors, x_train, x_test, y_train, y_test, filename):
    y_train = y_train.flatten()
    y_test = y_test.flatten()
    data = []
    for window_size in window_sizes:
        for word_size in word_sizes:
            if word_size >= window_size[0]:
                continue
            bop = WEASEL(
                window_sizes=np.arange(window_size[0], window_size[1]),
                word_size=word_size
            )

            x_train_bop_raw = bop.fit_transform(x_train, y_train).toarray()
            x_test_bop_raw = bop.transform(x_test).toarray()

            if feature_selection_strategy == 'pca':
                scaler = StandardScaler()
                x_train_bop_std = scaler.fit_transform(x_train_bop_raw)
                x_test_bop_std = scaler.transform(x_test_bop_raw)

                for n_comp in k_best:
                    pca = PCA(n_components=n_comp)
                    x_train_bop = pca.fit_transform(x_train_bop_std)
                    x_test_bop = pca.transform(x_test_bop_std)

                    knn = kNNClassifier(d=n_comp)
                    knn.insert_data(x_train_bop, y_train)

                    for kn in k_neighbors:
                        y_pred = knn.predict(x_test_bop, k=kn).flatten()
                        f1 = f1_score(y_test, y_pred, zero_division=0.)
                        accuracy = accuracy_score(y_test, y_pred)
                        precision = precision_score(y_test, y_pred, zero_division=0.)
                        recall = recall_score(y_test, y_pred, zero_division=0.)

                        row = {
                            'window_size': window_size,
                            'word_size': word_size,
                            'feature_selection_strategy': feature_selection_strategy,
                            'k_best': n_comp,
                            'k_neighbors': kn,
                            'f1': f1, 'accuracy': accuracy, 'precision': precision, 'recall': recall
                        }
                        data.append(row)
            elif feature_selection_strategy == 'kbest':
                for k in k_best:
                    if k >= x_train.shape[1]:
                        continue
                    selector = SelectKBest(f_classif, k=k)
                    x_train_bop = selector.fit_transform(x_train_bop_raw, y_train)
                    x_test_bop = selector.transform(x_test_bop_raw)

                    # print("k: ", k)
                    # print("x_train_bop.shape: ", x_train_bop.shape)
                    knn = kNNClassifier(d=k)
                    knn.insert_data(x_train_bop, y_train)
                    for kn in k_neighbors:
                        y_pred = knn.predict(x_test_bop, k=kn).flatten()
                        f1 = f1_score(y_test, y_pred, zero_division=0.)
                        accuracy = accuracy_score(y_test, y_pred)
                        precision = precision_score(y_test, y_pred, zero_division=0.)
                        recall = recall_score(y_test, y_pred, zero_division=0.)

                        row = {
                            'window_size': window_size,
                            'word_size': word_size,
                            'feature_selection_strategy': feature_selection_strategy,
                            'k_best': k,
                            'k_neighbors': kn,
                            'f1': f1, 'accuracy': accuracy, 'precision': precision, 'recall': recall
                        }
                        data.append(row)
            else: # no feature selection
                if x_train_bop.shape[1] <= 100:
                    knn = kNNClassifier(d=x_train_bop_raw.shape[1])
                    knn.insert_data(x_train_bop_raw, y_train)

                    for kn in k_neighbors:
                        y_pred = knn.predict(x_test_bop_raw, k=kn).flatten()
                        f1 = f1_score(y_test, y_pred, zero_division=0.)
                        accuracy = accuracy_score(y_test, y_pred)
                        precision = precision_score(y_test, y_pred, zero_division=0.)
                        recall = recall_score(y_test, y_pred, zero_division=0.)

                        row = {
                            'window_size': window_size,
                            'word_size': word_size,
                            'feature_selection_strategy': feature_selection_strategy,
                            'k_best': k,
                            'k_neighbors': kn,
                            'f1': f1, 'accuracy': accuracy, 'precision': precision, 'recall': recall
                        }
                        data.append(row)
    df = pd.DataFrame.from_records(data)
    df.to_csv(filename, index=False)

In [36]:
word_sizes = [4, 5, 10]
window_sizes = [(5, 20), (5, 40), (20, 50), (10, 50), (20, 40), (20, 50)]
k_neighbors = [5, 15]
k_best = [2, 3, 4, 5, 10, 15, 20, 30, 50, 75, 100]
feature_selection_strategy='pca'
filename=f'weasel_{feature_selection_strategy}.csv'
tune_bop(window_sizes, word_sizes, feature_selection_strategy, k_best, k_neighbors, x_train, x_test, y_train, y_test, filename)

In [40]:
df = pd.read_csv(filename)
df[df['f1'] >= 0.53]

Unnamed: 0,window_size,word_size,feature_selection_strategy,k_best,k_neighbors,f1,accuracy,precision,recall
30,"(5, 40)",4,pca,10,5,0.538462,0.806452,0.7,0.4375
132,"(10, 50)",5,pca,2,5,0.538462,0.806452,0.7,0.4375
135,"(10, 50)",5,pca,3,15,0.533333,0.774194,0.571429,0.5
155,"(20, 40)",4,pca,2,15,0.533333,0.774194,0.571429,0.5
229,"(20, 50)",4,pca,10,15,0.56,0.822581,0.777778,0.4375


In [64]:
word_sizes = [4, 5, 10]
window_sizes = [(5, 20), (5, 40), (20, 50), (10, 50), (20, 40), (20, 50)]
k_neighbors = [5, 15]
k_best = [2, 3, 4, 5, 10, 15, 20, 30, 50, 75, 100]
feature_selection_strategy='kbest'
filename=f'weasel_{feature_selection_strategy}3.csv'
tune_bop(window_sizes, word_sizes, feature_selection_strategy, k_best, k_neighbors, x_train, x_test, y_train, y_test, filename)

In [46]:
df = pd.read_csv(filename)
df[df['f1'] >= 0.55]

Unnamed: 0,window_size,word_size,feature_selection_strategy,k_best,k_neighbors,f1,accuracy,precision,recall
89,"(20, 50)",10,kbest,2,15,0.571429,0.758065,0.526316,0.526316
105,"(20, 50)",10,kbest,50,15,0.5625,0.774194,0.5625,0.5625
199,"(20, 40)",10,kbest,2,15,0.571429,0.758065,0.526316,0.526316
265,"(20, 50)",10,kbest,2,15,0.571429,0.758065,0.526316,0.526316
281,"(20, 50)",10,kbest,50,15,0.5625,0.774194,0.5625,0.5625


# tsfel

In [47]:
import tsfel

In [59]:
def tsfel_pca(n_components, k_neighbors, x_train, x_test, y_train, y_test, filename):
    y_train = y_train.flatten()
    y_test = y_test.flatten()
    data = []

    cfg = tsfel.get_features_by_domain('temporal')

    features_list = []
    for i in range(len(x_train)):
        time_series = x_train[i]
        features = tsfel.time_series_features_extractor(cfg, time_series, fs=100)
        features_list.append(features)

    features = pd.concat(features_list)
    x_train = features.to_numpy()

    features_list_test = []
    for i in range(len(x_test)):
        time_series = x_test[i]
        features = tsfel.time_series_features_extractor(cfg, time_series, fs=100)
        features_list_test.append(features)

    features_test = pd.concat(features_list_test)
    x_test = features_test.to_numpy()

    scaler = StandardScaler()
    x_train = scaler.fit_transform(x_train)
    x_test = scaler.fit_transform(x_test)

    for n_comp in n_components:
        if n_comp >= x_train.shape[1]:
            continue
        pca = PCA(n_components=n_comp)
        x_train_pca = pca.fit_transform(x_train)
        x_test_pca = pca.transform(x_test)

        knn = kNNClassifier(d=n_comp)
        knn.insert_data(x_train_pca, y_train)

        for k in k_neighbors:
            y_pred = knn.predict(x_test_pca,k = k).flatten()

            f1 = f1_score(y_test, y_pred, zero_division=0.)
            accuracy = accuracy_score(y_test, y_pred)
            precision = precision_score(y_test, y_pred, zero_division=0.)
            recall = recall_score(y_test, y_pred, zero_division=0.)

            row = {
                'feature_selection_strategy': 'pca',
                'k_best': n_comp,
                'k_neighbors': k,
                'f1': f1, 'accuracy': accuracy, 'precision': precision, 'recall': recall
            }
            data.append(row)
    df = pd.DataFrame.from_records(data)
    df.to_csv(filename, index=False)

In [None]:

n_components = [2, 3, 4, 5, 10, 15, 20, 30]
k_neighbors = [5, 10, 15]
filename = 'tsfel_temporal_pca.csv'
tsfel_pca(n_components, k_neighbors, x_train, x_test, y_train, y_test, filename)


In [53]:
df = pd.read_csv('tsfel_pca.csv')
df[df['f1'] >= 0.6]

Unnamed: 0,feature_selection_strategy,k_best,k_neighbors,f1,accuracy,precision,recall
5,pca,3,15,0.606061,0.790323,0.588235,0.625


In [66]:
def knn_tsfel_pca(x_train, x_test, y_train, y_test, n_comp, k_neighbor):
    y_train = y_train.flatten()
    y_test = y_test.flatten()
    
    # feature extraction
    cfg = tsfel.get_features_by_domain()

    features_list = []
    for i in range(len(x_train)):
        time_series = x_train[i]
        features = tsfel.time_series_features_extractor(cfg, time_series, fs=100)
        features_list.append(features)

    features = pd.concat(features_list)
    x_train = features.to_numpy()

    features_list_test = []
    for i in range(len(x_test)):
        time_series = x_test[i]
        features = tsfel.time_series_features_extractor(cfg, time_series, fs=100)
        features_list_test.append(features)

    features_test = pd.concat(features_list_test)
    x_test = features_test.to_numpy()

    # standardize before pca
    scaler = StandardScaler()
    x_train = scaler.fit_transform(x_train)
    x_test = scaler.fit_transform(x_test)

    # pca
    pca = PCA(n_components=n_comp)
    x_train_pca = pca.fit_transform(x_train)
    x_test_pca = pca.transform(x_test)

    # train knn
    knn = kNNClassifier(d=n_comp)
    knn.insert_data(x_train_pca, y_train)

    y_pred = knn.predict(x_test_pca,k = k_neighbor).flatten()

    # scores
    f1 = f1_score(y_test, y_pred, zero_division=0.)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, zero_division=0.)
    recall = recall_score(y_test, y_pred, zero_division=0.)

    row = {
        'feature_selection_strategy': 'pca',
        'k_best': n_comp,
        'k_neighbors': k_neighbor,
        'f1': f1, 'accuracy': accuracy, 'precision': precision, 'recall': recall
    }
    return row

In [None]:
result = knn_tsfel_pca (x_train, x_test, y_train, y_test, 3, 15)


In [68]:
print(result)

{'feature_selection_strategy': 'pca', 'k_best': 3, 'k_neighbors': 15, 'f1': 0.6060606060606061, 'accuracy': 0.7903225806451613, 'precision': 0.5882352941176471, 'recall': 0.625}


In [56]:
df = pd.read_csv('tsfel_statistical_pca.csv')
df[df['f1'] >= 0.6]

Unnamed: 0,feature_selection_strategy,k_best,k_neighbors,f1,accuracy,precision,recall


In [62]:
df = pd.read_csv('tsfel_temporal_pca.csv')
df[df['f1'] >= 0.5]

Unnamed: 0,feature_selection_strategy,k_best,k_neighbors,f1,accuracy,precision,recall
13,pca,10,10,0.518519,0.790323,0.636364,0.4375
