In [None]:
pip install tensorflow

In [None]:
import numpy as np
import pandas as pd
from pandas import concat
from pandas import DataFrame
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, r2_score
from sklearn.preprocessing import MinMaxScaler
from numpy import concatenate
from math import sqrt
from sklearn.metrics import mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense
from keras.layers import LSTM
import matplotlib.pyplot as plt

In [None]:
#Read dataset
df = pd.read_csv("")   

In [None]:
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    n_vars = 1 if type(data) is list else data.shape[1]
    df = DataFrame(data)
    cols, names = list(), list()
    # input sequence (t-n, ... t-1)
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [('var%d(t-%d)' % (j+1, i)) for j in range(n_vars)]
    # forecast sequence (t, t+1, ... t+n)
    for i in range(0, n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += [('var%d(t)' % (j+1)) for j in range(n_vars)]
        else:
            names += [('var%d(t+%d)' % (j+1, i)) for j in range(n_vars)]
# put it all together
    agg = concat(cols, axis=1)
    agg.columns = names
    # drop rows with NaN values
    if dropnan:
        agg.dropna(inplace=True)
    return agg

In [None]:
values = df.values
# ensure all data is float
values = values.astype('float32')
# normalize features
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(values)
# frame as supervised learning
reframed = series_to_supervised(scaled, 1, 1)
reframed.drop(reframed.columns[[8,9,10,11,12,13]], axis=1, inplace=True)
print(reframed.head())

#Recurrent neural network

In [None]:
def run_rnn_split(values, train_ratio, epochs=10, batch_size=32):
    n_train = int(len(values) * train_ratio)
    train = values[:n_train, :]
    test = values[n_train:, :]
    train_X, train_y = train[:, :-1], train[:, -1]
    test_X, test_y = test[:, :-1], test[:, -1]

    # reshape to 3D
    train_X = train_X.reshape((train_X.shape[0], 1, train_X.shape[1]))
    test_X = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
    print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)
    # build model
    model = Sequential()
    model.add(SimpleRNN(10, input_shape=(train_X.shape[1], train_X.shape[2])))
    model.add(Dense(1))
    model.compile(loss='mae', optimizer='adam')

    # train
    model.fit(
        train_X, train_y,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(test_X, test_y),
        verbose=0,
        shuffle=False
    )

    train_yhat = model.predict(train_X)
    train_X_2d = train_X.reshape(train_X.shape[0], train_X.shape[2])

    inv_train_yhat = scaler.inverse_transform(
    np.c_[train_yhat, train_X_2d[:, 1:]]
    )[:, 0]

    inv_train_y = scaler.inverse_transform(
    np.c_[train_y.reshape(-1, 1), train_X_2d[:, 1:]]
    )[:, 0]

    train_rmse = np.sqrt(mean_squared_error(inv_train_y, inv_train_yhat))
    train_f1 = f1_score(np.rint(inv_train_y), np.rint(inv_train_yhat),average='weighted')
    train_r2   = r2_score(inv_train_y, inv_train_yhat)
# ---------- TEST ----------
    test_yhat = model.predict(test_X)
    test_X_2d = test_X.reshape(test_X.shape[0], test_X.shape[2])

    inv_test_yhat = scaler.inverse_transform(
    np.c_[test_yhat, test_X_2d[:, 1:]])[:, 0]

    inv_test_y = scaler.inverse_transform(
    np.c_[test_y.reshape(-1, 1), test_X_2d[:, 1:]])[:, 0]

    test_rmse = np.sqrt(mean_squared_error(inv_test_y, inv_test_yhat))
    test_f1 = f1_score(np.rint(inv_test_y), np.rint(inv_test_yhat),average='weighted')
    test_r2   = r2_score(inv_test_y, inv_test_yhat)
    
    # transitions (on predicted TEST signal)
    test_present_trans = (np.diff(inv_test_y)!=0).sum()
    x_roun = np.round(inv_test_yhat)
    test_detect_trans=(np.diff(x_roun)!=0).sum()
    
    train_pct = round(train_ratio * 100)
    test_pct  = 100 - train_pct
    print("train_pct",train_pct)
    print("test_pct",test_pct)

    return {
        "Train %": train_pct,
        "Test %": test_pct,
        "Train RMSE": train_rmse,
        "Test RMSE": test_rmse,
        "Train R2": train_r2,
        "Test R2": test_r2,
        "Train F1": train_f1,
        "Test F1": test_f1,
        "Transitions present in Test Set": test_present_trans,
        "Transitions Detected in Test Set": test_detect_trans
    }

In [None]:
results_rnn = []

for r in np.arange(0.1, 1.0, 0.1):
    res = run_rnn_split(reframed.values, train_ratio=r)
    results_rnn.append(res)

results_df_rnn = pd.DataFrame(results_rnn)
results_df_rnn

#Long-short term memory

In [None]:
def run_lstm_split(values, train_ratio, epochs=10, batch_size=32):
    n_train = int(len(values) * train_ratio)
    train = values[:n_train, :]
    test = values[n_train:, :]
    train_X, train_y = train[:, :-1], train[:, -1]
    test_X, test_y = test[:, :-1], test[:, -1]

    # reshape to 3D
    train_X = train_X.reshape((train_X.shape[0], 1, train_X.shape[1]))
    test_X = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
    print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)
    # build model
    model = Sequential()
    model.add(LSTM(10, input_shape=(train_X.shape[1], train_X.shape[2])))
    model.add(Dense(1))
    model.compile(loss='mae', optimizer='adam')

    # train
    model.fit(
        train_X, train_y,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(test_X, test_y),
        verbose=0,
        shuffle=False
    )

    train_yhat = model.predict(train_X)
    train_X_2d = train_X.reshape(train_X.shape[0], train_X.shape[2])

    inv_train_yhat = scaler.inverse_transform(
    np.c_[train_yhat, train_X_2d[:, 1:]]
    )[:, 0]

    inv_train_y = scaler.inverse_transform(
    np.c_[train_y.reshape(-1, 1), train_X_2d[:, 1:]]
    )[:, 0]

    train_rmse = np.sqrt(mean_squared_error(inv_train_y, inv_train_yhat))
    train_f1 = f1_score(np.rint(inv_train_y), np.rint(inv_train_yhat),average='weighted')
    train_r2   = r2_score(inv_train_y, inv_train_yhat)
# ---------- TEST ----------
    test_yhat = model.predict(test_X)
    test_X_2d = test_X.reshape(test_X.shape[0], test_X.shape[2])

    inv_test_yhat = scaler.inverse_transform(
    np.c_[test_yhat, test_X_2d[:, 1:]])[:, 0]

    inv_test_y = scaler.inverse_transform(
    np.c_[test_y.reshape(-1, 1), test_X_2d[:, 1:]])[:, 0]

    test_rmse = np.sqrt(mean_squared_error(inv_test_y, inv_test_yhat))
    test_f1 = f1_score(np.rint(inv_test_y), np.rint(inv_test_yhat),average='weighted')
    test_r2   = r2_score(inv_test_y, inv_test_yhat)
    
    # transitions (on predicted TEST signal)
    test_present_trans = (np.diff(inv_test_y)!=0).sum()
    x_roun = np.round(inv_test_yhat)
    test_detect_trans=(np.diff(x_roun)!=0).sum()
    
    train_pct = round(train_ratio * 100)
    test_pct  = 100 - train_pct
    
    return {
        "Train %": train_pct,
        "Test %": test_pct,
        "Train RMSE": train_rmse,
        "Test RMSE": test_rmse,
        "Train R2": train_r2,
        "Test R2": test_r2,
        "Train F1": train_f1,
        "Test F1": test_f1,
        "Transitions present in Test Set": test_present_trans,
        "Transitions Detected in Test Set": test_detect_trans

    }

In [None]:
results_lstm = []

for r in np.arange(0.1, 1.0, 0.1):
    res = run_lstm_split(reframed.values, train_ratio=r)
    results_lstm.append(res)

results_df_lstm = pd.DataFrame(results_lstm)
results_df_lstm