In [None]:
import datetime as dt
import json
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from dotenv import load_dotenv
load_dotenv(override=True)

# Runtime settings read from the process environment; each is None when the
# corresponding variable is not set.
PATH = os.getenv("PATH_RAW_DATA")
TICKER = os.getenv("TICKER")


In [None]:
from keras.models import Sequential
from keras.layers import LSTM, Dense
import keras

## Load the data

The raw price data has already been downloaded with `src/utils/fetchStockData`.

In [None]:
# Locations of the raw JSON download and of the CSV cache derived from it.
# NOTE(review): both paths are pinned to BTC regardless of TICKER — confirm
# that this is intended.
PATH_RAW_DATA = "data/data-BTC.json"
PATH_RAW_DATA_CSV = "data/data-BTC.csv"
print(f"Looking for {TICKER}")

if not os.path.exists(PATH_RAW_DATA_CSV):
    # No CSV cache yet: parse the raw JSON and build the frame from it.
    # The context manager guarantees the file handle is closed.
    with open(PATH_RAW_DATA) as raw_file:
        data = json.load(raw_file)['Data']["Data"]

    # Collect all rows first and build the DataFrame once. Appending row by
    # row is quadratic and leaves every column with object dtype.
    records = [
        {
            # "time" is a Unix timestamp; render it as a UTC calendar date.
            'Date': dt.datetime.fromtimestamp(v["time"], tz=dt.timezone.utc).strftime('%Y-%m-%d'),
            'Low': float(v['low']),
            'High': float(v['high']),
            'Close': float(v['close']),
            'Open': float(v['open']),
        }
        for v in data
    ]
    # Rows keep the order of the JSON payload; the index is a plain 0..n-1 range.
    df = pd.DataFrame(records, columns=['Date','Low','High','Close','Open'])

    # Report success only after the file has actually been written.
    df.to_csv(PATH_RAW_DATA_CSV)
    print('Data saved to : %s'%PATH_RAW_DATA_CSV)

# If the data is already there, just load it from the CSV
else:
    print('File already exists. Loading data from CSV')
    df = pd.read_csv(PATH_RAW_DATA_CSV)

## Preprocessing

Split the data into training and test sets, then scale the closing prices.

In [None]:
# Row index of the split point: the first 80% of the rows (in file order, no
# shuffling) go to training, the remaining rows to testing.
train_test_split = int(len(df) * 0.8)
train_data, test_data = df.iloc[:train_test_split], df.iloc[train_test_split:]

In [None]:
# Scaler instance reused by the normalization cells that follow.
scaler = MinMaxScaler()

# Keep only the closing price, as an (n, 1) column, which is the 2-D layout the
# scaler works on. The dtype is forced to float so the result does not depend
# on the dtype pandas happened to give the column: an object-dtype array would
# stay object after the in-place scaling below and cannot be converted to a
# tensor by Keras. np.array copies, so `df` is not modified by later in-place
# writes.
train_data = np.array(train_data.loc[:, "Close"], dtype=float).reshape(-1, 1)
test_data = np.array(test_data.loc[:, "Close"], dtype=float).reshape(-1, 1)

In [None]:
# Normalize the training data window by window: each block of
# `smoothing_window_size` rows is min-max scaled independently, in place.
# NOTE: this cell mutates `train_data`, so re-running it rescales data that
# is already scaled.
smoothing_window_size = 365
n_train_rows = train_data.shape[0]
# Count of complete windows. The loop must step through row offsets
# 0, 365, 730, ... for each of them; using the window *count* as the range
# stop (with a row-sized step) would only ever visit the first window.
n_full_windows = n_train_rows // smoothing_window_size

for window_idx in range(n_full_windows):
    start = window_idx * smoothing_window_size
    stop = start + smoothing_window_size
    train_data[start:stop, :] = scaler.fit_transform(train_data[start:stop, :])

# Normalize the remaining rows that do not fill a whole window. Skipped when
# the training length is an exact multiple of the window size, because fitting
# on an empty slice raises. Also covers the case of fewer rows than one window.
tail_start = n_full_windows * smoothing_window_size
if tail_start < n_train_rows:
    train_data[tail_start:, :] = scaler.fit_transform(train_data[tail_start:, :])

# `scaler` is left fitted on the last chunk processed above.

In [None]:
# Flatten the (n, 1) training column into a 1-D series.
train_data = train_data.ravel()

# Scale the test column with `scaler` in whatever state the previous cells
# left it, then flatten it the same way.
test_data = scaler.transform(test_data).ravel()

# Training series followed by test series, as one 1-D array.
all_close_data = np.concatenate((train_data, test_data))

In [None]:
# convert an array of values into a dataset matrix
def create_dataset(dataset, time_step=1):
    """Turn a sequence into (window, next value) supervised samples.

    Sample ``i`` pairs the window ``dataset[i:i + time_step]`` with the value
    that immediately follows it, ``dataset[i + time_step]``.

    Args:
        dataset: Indexable, sliceable sequence of values (e.g. a 1-D array).
        time_step: Number of consecutive values in each input window.

    Returns:
        Tuple ``(X, Y)`` of numpy arrays. ``X`` holds one window per row and
        ``Y`` the matching next values. Every position that has a following
        value is used, so there are ``len(dataset) - time_step`` samples;
        both arrays are empty when the sequence is too short for one sample.
    """
    # The last usable window ends one element before the end of the data, so
    # the sample count is len - time_step. Subtracting a further 1 would
    # silently drop the final valid sample.
    n_samples = max(len(dataset) - time_step, 0)
    dataX = [dataset[i:i + time_step] for i in range(n_samples)]
    dataY = [dataset[i + time_step] for i in range(n_samples)]
    return np.array(dataX), np.array(dataY)

In [None]:
# split a univariate sequence into samples
def split_sequence(sequence, n_steps_in, n_steps_out):
    """Slice a sequence into overlapping (input window, output window) pairs.

    For each start offset, the input is the next ``n_steps_in`` values and
    the output is the ``n_steps_out`` values that directly follow it. Offsets
    advance one element at a time and stop as soon as the output window would
    run past the end of the sequence.

    Args:
        sequence: Sliceable sequence of values (e.g. a 1-D array or list).
        n_steps_in: Length of each input window.
        n_steps_out: Length of each output window.

    Returns:
        Tuple ``(X, y)`` of numpy arrays with one row per sample; both are
        empty when the sequence is too short to hold a single pair.
    """
    total = len(sequence)
    span = n_steps_in + n_steps_out
    # Offsets whose combined input + output span still fits in the sequence.
    starts = [begin for begin in range(total) if begin + span <= total]
    inputs = [sequence[begin:begin + n_steps_in] for begin in starts]
    targets = [sequence[begin + n_steps_in:begin + span] for begin in starts]
    return np.array(inputs), np.array(targets)
 
# Window sizes: the model reads 90 consecutive values and predicts the next 7.
n_steps_in = 90
n_steps_out = 7

# Cut the training series into (input window, output window) samples.
X, y = split_sequence(train_data, n_steps_in, n_steps_out)

# The LSTM expects [samples, timesteps, features]; the series is univariate.
n_features = 1
X = X.reshape((X.shape[0], X.shape[1], n_features))

# Two stacked LSTM layers followed by a dense layer with one output per
# predicted step. The first LSTM returns the full sequence so that the
# second LSTM receives one vector per timestep.
model = Sequential([
    LSTM(100, activation='tanh', return_sequences=True, input_shape=(n_steps_in, n_features)),
    LSTM(100, activation='tanh'),
    Dense(n_steps_out),
])
model.compile(optimizer='adam', loss='mse')

# Train; the returned history object carries the per-epoch loss.
results = model.fit(X, y, epochs=50, verbose=1)

In [None]:
# demonstrate prediction
# Feed the model the n_steps_in values that come right before the final
# n_steps_out values of the test series, so the forecast lines up with the
# last n_steps_out known values. The bounds are derived from the window sizes
# rather than hard-coded, so they stay correct if those sizes change.
input_stop = len(test_data) - n_steps_out
input_start = input_stop - n_steps_in
if input_start < 0:
    raise ValueError(
        f"test_data has {len(test_data)} values but at least "
        f"{n_steps_in + n_steps_out} are needed to build the prediction input"
    )
x_input = test_data[input_start:input_stop]
# Single sample in the [samples, timesteps, features] layout.
x_input = x_input.reshape((1, n_steps_in, n_features))
yhat = model.predict(x_input, verbose=0)

In [None]:
# Plot the per-epoch training loss recorded by model.fit. The model was fitted
# without validation data, so only the training loss exists.
loss = results.history['loss']
epochs = range(len(loss))
plt.plot(epochs, loss, 'r', label='Training loss')
plt.title('Training loss')
plt.xlabel('Epoch')
plt.ylabel('Loss (MSE)')
plt.legend(loc=0)
# No plt.figure() here: calling it before show() would open a second, empty figure.
plt.show()

In [None]:
# Compare the forecast with the true (scaled) test series. The prediction
# curve is the true series with its last values replaced by the forecast.
forecast = yhat.reshape(-1)
# Number of trailing values to replace, taken from the forecast itself rather
# than a hard-coded constant.
horizon = forecast.shape[0]
known_part = test_data[:len(test_data) - horizon]
x_positions = range(test_data.shape[0])

plt.figure(figsize = (18,9))
plt.plot(x_positions,test_data,color='b',label='True')
plt.plot(x_positions,np.concatenate([known_part,forecast]),color='orange', label='Prediction')
plt.xlabel('Date')
plt.ylabel('Close Price')
plt.legend(fontsize=18)
plt.show()