In [None]:
from datetime import datetime
import time
from re import sub
import pandas as pd
import numpy as np
from keras import Sequential
from keras.layers import Bidirectional
from keras.layers import LSTM, Dense, Dropout, GRU
from sklearn.preprocessing import MinMaxScaler
import keras

In [None]:
def mdy_to_ymd(d):
    return time.mktime(datetime.strptime(d, '%d-%b-%y').timetuple())


def threeD_dataset(X, y, time_steps=1):
    Xs, ys = [], []

    for i in range(len(X) - time_steps):
        v = X[i:i + time_steps, :]
        Xs.append(v)
        ys.append(y[i + time_steps])

    return np.array(Xs), np.array(ys)


class Beautify:

    @staticmethod
    def beautify(path):
        dataset = pd.read_csv(path)
        dataset.columns = ["date", "price", "open", "high", "low", "vol", "change"]
        dataset["date"] = dataset.apply(lambda row: mdy_to_ymd(row['date']), axis=1)
        for col in ["price", "open", "high", "low", "vol", "change"]:
            dataset[col] = dataset.apply(
                lambda row: (lambda x: 0 if x == '-' else float(x))(sub(r'[^\d.-]', '', row[col])),
                axis=1)
        dataset = dataset.sort_values(["date"])
        return dataset


pd.set_option('display.float_format', lambda x: '%.0f' % x)

In [None]:
dataset = Beautify.beautify("dataset.csv")
dataset = dataset.reset_index(drop=True)
dataset["next_price"] = dataset["price"].shift(-1, fill_value=0)
dataset.drop(dataset.tail(1).index, inplace=True)


In [None]:
TRAIN = time.mktime(datetime.strptime("20-01-02", '%y-%m-%d').timetuple())
train = dataset[dataset.date < TRAIN]
test = dataset[dataset.date >= TRAIN]
X_train = train.drop('next_price', axis=1)
y_train = train.loc[:, ['next_price']]
X_test = test.drop('next_price', axis=1)
y_test = test.loc[:, ['next_price']]
X_test.drop(["date", "open", "high", "low", "vol", "change"], axis=1, inplace=True)
X_train.drop(["date", "open", "high", "low", "vol", "change"], axis=1, inplace=True)


In [None]:
scaler_x = MinMaxScaler(feature_range=(-1, 1))
scaler_y = MinMaxScaler(feature_range=(-1, 1))
input_scaler = scaler_x.fit(X_train)
output_scaler = scaler_y.fit(y_train)
train_y_norm = output_scaler.transform(y_train)
train_x_norm = input_scaler.transform(X_train)
test_y_norm = output_scaler.transform(y_test)
test_x_norm = input_scaler.transform(X_test)
TIME_STEPS = 10
X_test, y_test = threeD_dataset(test_x_norm, test_y_norm, TIME_STEPS)
X_train, y_train = threeD_dataset(train_x_norm, train_y_norm, TIME_STEPS)

In [None]:
def create_model_bilstm(units):
    model = Sequential()
    model.add(Bidirectional(LSTM(units=units, return_sequences=True),
                            input_shape=(X_train.shape[1], X_train.shape[2])))
    model.add(Bidirectional(LSTM(units=units)))
    model.add(Dense(1))
    model.compile(loss='mse', optimizer='adam')
    return model


def create_model(units, m):
    model = Sequential()
    model.add(m(units=units, return_sequences=True,
                input_shape=[X_train.shape[1], X_train.shape[2]]))
    model.add(Dropout(0.2))
    model.add(m(units=units))
    model.add(Dropout(0.2))
    model.add(Dense(units=1))
    model.compile(loss='mse', optimizer='adam')
    return model


model_bilstm = create_model_bilstm(64)
model_gru = create_model(64, GRU)
model_lstm = create_model(64, LSTM)

In [None]:
def fit_model(model):
    early_stop = keras.callbacks.EarlyStopping(monitor='val_loss',
                                               patience=10)
    history = model.fit(X_train, y_train, epochs=100, validation_split=0.2,
                        batch_size=32, shuffle=False, callbacks=[early_stop])
    return history


history_bilstm = fit_model(model_bilstm)
history_lstm = fit_model(model_lstm)
history_gru = fit_model(model_gru)

In [None]:
y_test = scaler_y.inverse_transform(y_test)
y_train = scaler_y.inverse_transform(y_train)


def prediction(model):
    prediction = model.predict(X_test)
    prediction = scaler_y.inverse_transform(prediction)
    return prediction


prediction_bilstm = prediction(model_bilstm)
prediction_lstm = prediction(model_lstm)
prediction_gru = prediction(model_gru)

In [None]:
def evaluate_prediction(pred, real, model_name):
    errors = pred - real
    mse = np.square(errors).mean()
    res = pd.DataFrame({"index": np.arange(len(real)), "real": real.reshape(1, -1)[0], "pred": pred.reshape(1, -1)[0]})
    print(model_name)
    print("Accuracy: ",
          res[(res.pred * 0.95 <= res.real) & (res.pred * 1.05 >= res.real)].shape[0] / res.shape[0] * 100)
    print('Mean Absolute Error: {:.4f}'.format(mse))


evaluate_prediction(prediction_bilstm, y_test, 'Bidirectional LSTM')
evaluate_prediction(prediction_lstm, y_test, 'LSTM')
evaluate_prediction(prediction_gru, y_test, 'GRU')