In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import os, warnings, random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import tensorflow as tf
import tensorflow.keras.layers as L
from tensorflow.keras import Sequential, Model
from sklearn.preprocessing import MinMaxScaler

# Suppress every Python warning for the remainder of the session.
warnings.filterwarnings(action='ignore')


def _two_decimals(value):
    """Render a float with exactly two digits after the decimal point."""
    return '%.2f' % value


# pandas calls this formatter for each float it displays.
pd.set_option('display.float_format', _two_decimals)

  from .autonotebook import tqdm as notebook_tqdm


In [32]:
# Read only the first 10000 rows of the CSV; later cells select the
# columns they need from this frame.
data = pd.read_csv(
    filepath_or_buffer="smart_home_dataset.csv",
    nrows=10000,
)

In [33]:
#the autoencoder model
class AE(nn.Module):
    """Stacked LSTM encoder-decoder that maps a past window to a future window.

    The network itself is a ``tf.keras`` functional model stored in
    ``self.model``; the ``nn.Module`` base class is only a container (no
    torch parameters or sub-modules are registered by this class).

    Args:
        n_past: number of time steps in each input window.
        n_future: number of time steps the decoder emits.
        n_features: number of features per time step (input and output).
    """

    def __init__(self, n_past, n_future, n_features):
        super().__init__()

        # --- encoder: two stacked LSTMs, both exposing their final states ---
        self.encoder_inputs = tf.keras.layers.Input(shape=(n_past, n_features))
        self.encoder_l1 = tf.keras.layers.LSTM(100, return_sequences=True, return_state=True)
        self.encoder_outputs1 = self.encoder_l1(self.encoder_inputs)
        # With return_state=True the call returns [output, state_h, state_c].
        self.encoder_states1 = self.encoder_outputs1[1:]

        self.encoder_l2 = tf.keras.layers.LSTM(100, return_state=True)
        self.encoder_outputs2 = self.encoder_l2(self.encoder_outputs1[0])
        self.encoder_states2 = self.encoder_outputs2[1:]

        # --- decoder: repeat the encoded vector once per future step ---
        self.decoder_inputs = tf.keras.layers.RepeatVector(n_future)(self.encoder_outputs2[0])

        # Each decoder LSTM is initialised with the states of the matching encoder LSTM.
        self.decoder_l1 = tf.keras.layers.LSTM(100, return_sequences=True)(
            self.decoder_inputs, initial_state=self.encoder_states1
        )
        self.decoder_l2 = tf.keras.layers.LSTM(100, return_sequences=True)(
            self.decoder_l1, initial_state=self.encoder_states2
        )
        # Project every decoded step back to n_features values.
        self.decoder_outputs2 = tf.keras.layers.TimeDistributed(
            tf.keras.layers.Dense(n_features)
        )(self.decoder_l2)
        self.model = tf.keras.models.Model(self.encoder_inputs, self.decoder_outputs2)

    def train_model(self, X_train, Y_train, X_val, Y_val, optimizer, epochs=25, batch_size=128):
        """Compile and fit the Keras model.

        Args:
            X_train, Y_train: training inputs and targets.
            X_val, Y_val: validation inputs and targets.
            optimizer: Keras optimizer passed to ``compile``.
            epochs: number of training epochs (default 25).
            batch_size: mini-batch size (default 128).

        Note:
            A ``LearningRateScheduler`` callback sets the learning rate to
            ``1e-3 * 0.90 ** epoch`` every epoch.
            NOTE(review): this presumably overrides whatever learning rate
            ``optimizer`` was constructed with - confirm that is intended.
        """
        reduce_lr = tf.keras.callbacks.LearningRateScheduler(lambda x: 1e-3 * 0.90 ** x)
        self.model.compile(optimizer=optimizer, loss=tf.keras.losses.Huber(), metrics=['accuracy'])
        self.model.fit(
            X_train,
            Y_train,
            epochs=epochs,
            validation_data=(X_val, Y_val),
            batch_size=batch_size,
            verbose=2,
            callbacks=[reduce_lr],
        )

    def test_model(self, X_test, Y_test):
        """Evaluate the model and return the 'accuracy' metric as a percentage.

        ``evaluate`` returns ``[loss, accuracy]`` because the model is compiled
        with a single metric; index 1 is therefore the accuracy value.
        """
        scores = self.model.evaluate(X_test, Y_test)
        test_accuracy = scores[1] * 100
        return test_accuracy

    def test_anomaly(self, X_test, Y_test, threshold):
        """Return 1 if the accuracy percentage exceeds ``threshold``, else 0.

        Previously this multiplied the whole ``[loss, accuracy]`` list by 100
        (list repetition) and compared the list with ``threshold``, which
        raises ``TypeError``. It now uses the same scalar as ``test_model``.
        """
        test_accuracy = self.test_model(X_test, Y_test)

        print(f'Test Accuracy: {test_accuracy}')
        return 1 if test_accuracy > threshold else 0
    
# Window geometry: input steps per sample and steps to predict.
# NOTE(review): 24 * 60 presumably means one day of per-minute readings - confirm.
n_past = 24 * 60
n_future = 10

# Columns fed to (and reconstructed by) the model.
columns = [
    'temperature',
    'relative_humidity',
    'light_switch',
    'ultrasonic',
    'pir',
    'pressure',
]
n_features = len(columns)

ae = AE(n_past=n_past, n_future=n_future, n_features=n_features)

In [34]:
# Rebuild a frame that holds only the modelled columns.
d = np.array(data[columns])
df = pd.DataFrame(d, columns=columns)

# `data` is rebound to the same object as `df`; writes below affect both names.
data = df

# Scale each column independently to [-1, 1], keeping every fitted scaler
# under the key 'scaler_<column>'.
# NOTE(review): the scalers are fit on all rows, i.e. before the
# train/val/test split performed in a later cell - confirm this is intended.
scalers = {}
for col in df.columns:
    col_scaler = MinMaxScaler(feature_range=(-1, 1))
    column_2d = data[col].values.reshape(-1, 1)
    scaled = col_scaler.fit_transform(column_2d).ravel()
    scalers['scaler_' + str(col)] = col_scaler
    data[col] = scaled

In [35]:
# Share of all rows held out for testing, and share of the remaining
# rows held out for validation.
TEST_PROP = 0.3
VAL_PROP = 0.2

# Row counts that mark the partition boundaries.
total_length = len(data)
train_val_length = int(total_length * (1 - TEST_PROP))
train_length = int(train_val_length * (1 - VAL_PROP))

# Contiguous, order-preserving partitions: train | val | test.
train, val, test = (
    data.iloc[start:stop]
    for start, stop in (
        (None, train_length),
        (train_length, train_val_length),
        (train_val_length, None),
    )
)

In [36]:
def split_series(series, n_past, n_future):
    """Cut a time-ordered series into overlapping (input, target) windows.

    Window ``i`` uses rows ``[i, i + n_past)`` as input and rows
    ``[i + n_past, i + n_past + n_future)`` as target, so every window
    spans ``n_past + n_future`` consecutive rows and consecutive windows
    are shifted by one row.

    Args:
        series: array-like whose first axis is time (ndarray, DataFrame, list).
        n_past: number of rows in each input window.
        n_future: number of rows in each target window.

    Returns:
        Tuple ``(X, y)`` of arrays shaped ``(n_windows, n_past, ...)`` and
        ``(n_windows, n_future, ...)``. When ``series`` is shorter than one
        full window, ``n_windows`` is 0 and the arrays keep those trailing
        dimensions instead of collapsing to shape ``(0,)``.
    """
    # Convert once so DataFrame and ndarray inputs are sliced the same way.
    values = np.asarray(series)
    window = n_past + n_future
    n_windows = max(len(values) - window + 1, 0)

    if n_windows == 0:
        trailing = values.shape[1:]
        empty_X = np.empty((0, n_past) + trailing, dtype=values.dtype)
        empty_y = np.empty((0, n_future) + trailing, dtype=values.dtype)
        return empty_X, empty_y

    X = np.stack([values[i:i + n_past] for i in range(n_windows)])
    y = np.stack([values[i + n_past:i + window] for i in range(n_windows)])
    return X, y

# Build (input, target) windows for each partition in one pass.
# NOTE(review): with nrows=10000 and the 70/30 then 80/20 split, `val` has
# 1400 rows, fewer than n_past + n_future = 1450, so X_val / Y_val contain
# no windows - confirm the split sizes or window length.
(X_train, Y_train), (X_val, Y_val), (X_test, Y_test) = (
    split_series(partition, n_past, n_future)
    for partition in (train, val, test.values)
)

In [37]:
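# Training is disabled in this notebook; the next cell loads a previously
# saved model instead. Before re-enabling, note that:
#   - `optimizer` is not defined in any visible cell of this notebook;
#   - the save path below ("autoencoder_trained.pt") differs from the path
#     loaded in the next cell ("autoencoder.pt").
# ae.train_model(X_train, Y_train, X_val, Y_val, optimizer)

# PATH = "autoencoder_trained.pt"
# torch.save(ae, PATH)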

In [38]:
PATH = "autoencoder.pt"

# The checkpoint is a fully pickled AE object, not a state_dict, so it needs
# full unpickling. `weights_only=False` is explicit because PyTorch 2.6
# changed the default to True, which refuses arbitrary pickled classes.
# SECURITY: unpickling can execute arbitrary code - only load trusted files.
model = torch.load(PATH, weights_only=False)

# AE registers no torch sub-modules, so eval() only flips the nn.Module
# training flag; its return value (the model) is displayed as the cell output.
model.eval()



AE()

In [40]:
# Score the restored model on the held-out windows; the value is a percentage.
test_accuracy = model.test_model(X_test=X_test, Y_test=Y_test)
print('Test accuracy: ', test_accuracy, '%')

Test accuracy:  97.19535708427429 %
