<a href="https://colab.research.google.com/github/Tyred/TimeSeries_OCC-PUL/blob/main/Notebooks/runners/OC_CAE_UCR_runner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1> One-Class Classification Using Convolutional Autoencoders </h1>

The main idea is to train a CAE (Convolutional Autoencoder) on data from the positive class only and to derive a reconstruction-error threshold T from the reconstruction errors measured on that training data (in this notebook, T = mean + 2 × standard deviation of the training errors). <br/>
Then we perform the One-Class Classification (OCC) as follows:

- For each data sample in the test dataset, do:
    - Reconstruct the data with the CAE and calculate its reconstruction error E.
    - if E <= T the sample is classified as a member of the positive class.
    - else (if E > T) the sample is classified as not a member of the positive class.  

- Evaluate the model's Accuracy, Precision, Recall and F1-Score.

## Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score,
                            roc_auc_score)
from tensorflow import keras
from sklearn.metrics import mean_squared_error, mean_absolute_error
%matplotlib inline

# Autoencoders

## Convolutional Using MaxPooling
- Subclassing

In [None]:
# Convolutional Autoencoder with MaxPooling:
class ConvAutoencoder(tf.keras.Model):
    """1D convolutional autoencoder using max-pooling / up-sampling.

    The encoder halves the time axis three times (overall factor 8) and the
    decoder doubles it three times, so the series length fed to the
    convolution stack must be a multiple of 8. Inputs whose length is not a
    multiple of 8 are zero-padded on the left before encoding and cropped by
    the same amount before the final output convolution.

    Args:
        serie_length: Number of time steps of the series the model is built
            for. Used by ``model()`` to create the symbolic input and as the
            initial guess for the padding remainder.
    """

    def __init__(self, serie_length):
        super().__init__()
        # Keep the length on the instance so model() does not depend on a
        # module-level global of the same name.
        self.serie_length = serie_length
        # Remainder of the series length modulo the total pooling factor (8).
        # call() refreshes it from the actual input; initialising it here lets
        # encoder()/decoder() be used directly without an AttributeError.
        self.padding = serie_length % 8

        # encoder layers
        self.conv_1 = keras.layers.Conv1D(32, 3, activation='swish', padding='causal')
        self.max_1 = keras.layers.MaxPooling1D(2)
        self.conv_2 = keras.layers.Conv1D(64, 3, activation='swish', padding='causal')
        self.max_2 = keras.layers.MaxPooling1D(2)
        self.conv_3 = keras.layers.Conv1D(128, 3, activation='swish', padding='causal')

        # encoded representation
        self.encoded = keras.layers.MaxPooling1D(2)

        # decoder layers
        self.conv_4 = keras.layers.Conv1D(128, 3, activation='swish', padding='causal')
        self.up_1 = keras.layers.UpSampling1D(2)
        self.conv_5 = keras.layers.Conv1D(64, 3, activation='swish', padding='causal')
        self.up_2 = keras.layers.UpSampling1D(2)
        self.conv_6 = keras.layers.Conv1D(32, 3, activation='swish', padding='causal')
        self.up_3 = keras.layers.UpSampling1D(2)

        # decoded output: a single linear channel
        self.decoded = keras.layers.Conv1D(1, 3, activation='linear', padding='causal')

    def encoder(self, inputs):
        """Return the encoded representation of ``inputs``.

        When the series length is not a multiple of 8, ``16 - remainder``
        zeros are prepended so the padded length is divisible by 8.
        """
        if self.padding != 0:
            inputs = keras.layers.ZeroPadding1D(padding=(8 + 8 - self.padding, 0))(inputs)
        x = self.conv_1(inputs)
        x = self.max_1(x)
        x = self.conv_2(x)
        x = self.max_2(x)
        x = self.conv_3(x)
        return self.encoded(x)

    def decoder(self, inputs):
        """Reconstruct a series from the encoded representation ``inputs``."""
        x = self.conv_4(inputs)
        x = self.up_1(x)
        x = self.conv_5(x)
        x = self.up_2(x)
        x = self.conv_6(x)
        x = self.up_3(x)

        # Remove the left padding added by encoder() so the output length
        # matches the original input length.
        if self.padding != 0:
            x = keras.layers.Cropping1D(cropping=(8 + 8 - self.padding, 0))(x)

        return self.decoded(x)

    def call(self, inputs):
        """Encode then decode ``inputs`` and return the reconstruction."""
        # Derive the padding remainder from the actual input length.
        self.padding = inputs.shape[1] % 8
        _encoded = self.encoder(inputs)
        _decoded = self.decoder(_encoded)

        return _decoded

    def model(self):
        """Return a functional ``tf.keras.Model`` wrapping this autoencoder."""
        x = keras.layers.Input(shape=(self.serie_length, 1))
        return tf.keras.Model(inputs=[x], outputs=self.call(x))

# OCC Functions

In [None]:
def predict(model, data, threshold):
    """Flag which samples are reconstructed with an error within ``threshold``.

    Args:
        model: Callable autoencoder; ``model(data)`` must return a tensor with
            a ``.numpy()`` method.
        data: Batch of series with a trailing channel axis of size 1
            (as produced by ``expand_dims(..., axis=-1)``); a tensor or a
            NumPy array.
        threshold: Maximum reconstruction MSE for a sample to be accepted as
            a member of the positive class.

    Returns:
        Boolean tensor with one entry per sample: True where the sample's
        reconstruction MSE is less than or equal to ``threshold``.
    """
    # Drop only the trailing channel axis. A bare squeeze() would also remove
    # the batch axis of a single-sample batch and collapse the per-sample
    # errors into a single scalar.
    reconstructions = np.squeeze(model(data).numpy(), axis=-1)
    data = np.squeeze(np.asarray(data), axis=-1)
    loss = tf.keras.losses.mse(reconstructions, data)
    return tf.math.less_equal(loss, threshold)

def print_stats(predictions, labels):
    """Print accuracy, precision, recall and F1-score as percentages.

    Args:
        predictions: Predicted binary labels.
        labels: Ground-truth binary labels.
    """
    scorers = (
        ("Accuracy", accuracy_score),
        ("Precision", precision_score),
        ("Recall", recall_score),
        ("F1-Score", f1_score),
    )
    for title, scorer in scorers:
        percentage = scorer(labels, predictions) * 100
        print(f"{title} = {percentage:.2f}%")

# UCR Binary Datasets

In [None]:
# Names of the UCR archive datasets processed by this notebook, in run order.
datasets = [
    "Yoga",
    "WormsTwoClass",
    "Wine",
    "Wafer",
    "TwoLeadECG",
    "Strawberry",
    "SemgHandGenderCh2",
    "BeetleFly",
    "BirdChicken",
    "Computers",
    "DistalPhalanxOutlineCorrect",
    "Earthquakes",
    "ECG200",
    "ECGFiveDays",
    "FordA",
    "FordB",
    "HandOutlines",
    "ItalyPowerDemand",
    "MiddlePhalanxOutlineCorrect",
    "Chinatown",
    "FreezerRegularTrain",
    "FreezerSmallTrain",
    "GunPointAgeSpan",
    "GunPointMaleVersusFemale",
    "GunPointOldVersusYoung",
    "PowerCons",
    "Coffee",
    "Ham",
    "Herring",
    "Lightning2",
    "MoteStrain",
    "PhalangesOutlinesCorrect",
    "ProximalPhalanxOutlineCorrect",
    "ShapeletSim",
    "SonyAIBORobotSurface1",
    "SonyAIBORobotSurface2",
    "ToeSegmentation1",
    "ToeSegmentation2",
    "HouseTwenty",
]

# Root folder of the archive; each dataset lives in its own sub-folder.
path = "drive/My Drive/UFSCar/FAPESP/IC/Data/UCRArchive_2018"

In [None]:
# Run the one-class experiment for every dataset and, within each dataset,
# once per class label treated as the positive class.

# Settings shared by every run; defined once instead of on every iteration.
batch_size = 32
epochs = 200
sigma_list = [2]  # chosen empirically

for dataset in datasets:
    # In each row the first column is the class label and the remaining
    # columns are the series values.
    tr_data = np.genfromtxt(f"{path}/{dataset}/{dataset}_TRAIN.tsv", delimiter="\t")
    te_data = np.genfromtxt(f"{path}/{dataset}/{dataset}_TEST.tsv", delimiter="\t")

    labels = te_data[:, 0]
    unique_labels = np.unique(labels)

    # The test series are identical for every positive class, so convert them
    # to a (samples, length, 1) float32 tensor once per dataset.
    test_data = tf.expand_dims(tf.cast(te_data[:, 1:], tf.float32), axis=-1)

    for class_label in unique_labels:
        # A fresh model is built on every iteration; clear Keras' global
        # state so memory does not grow across the whole sweep.
        tf.keras.backend.clear_session()

        # Train only on the series of the current positive class.
        train_data = tr_data[tr_data[:, 0] == class_label, 1:]
        train_data = tf.expand_dims(tf.cast(train_data, tf.float32), axis=-1)

        serie_length = train_data.shape[1]

        # Labeling for OCC: 1 for the positive class, 0 for everything else.
        positive_mask = labels == class_label
        occ_labels = positive_mask.astype(int).tolist()

        positive_test_data = test_data[positive_mask]
        negative_test_data = test_data[~positive_mask]

        # Autoencoder
        autoencoder = ConvAutoencoder(serie_length)
        autoencoder.compile(optimizer='adam', loss='mse')

        # Train
        # NOTE(review): early stopping monitors val_loss on the full test set
        # (negative-class samples included), so test data influences how long
        # the model trains. Consider a held-out split of the positive training
        # data instead; left unchanged here to keep results comparable.
        my_callbacks = [
            tf.keras.callbacks.EarlyStopping(patience=5, monitor='val_loss')
        ]

        history = autoencoder.fit(
            train_data,
            train_data,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(test_data, test_data),
            callbacks=my_callbacks,
            verbose=0,
        )

        # OCC Task: per-sample reconstruction error on the training data.
        # Squeeze only the channel axis so a single training sample still
        # yields a one-element error vector rather than a scalar.
        reconstructions = np.squeeze(autoencoder.predict(train_data), axis=-1)
        train_loss = tf.keras.losses.mse(
            reconstructions, np.squeeze(train_data.numpy(), axis=-1)
        )

        mean = np.mean(train_loss)
        stddev = np.std(train_loss)

        for sigma in sigma_list:
            # Accept a sample as positive when its error is within
            # mean + sigma * stddev of the training errors.
            threshold = mean + sigma * stddev
            print("Dataset:", dataset, "Positive Label:", class_label)
            preds = predict(autoencoder, test_data, threshold)
            print_stats(preds, occ_labels)
            print()