In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
from torch.autograd import Variable
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm, trange
import time

In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
# NOTE(review): none of the visible cells moves the model or the tensors onto
# `device` -- confirm whether that is intentional.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device

In [None]:
# Load the price history, drop rows with missing values, and parse the
# day/month/year 'Date' strings into datetimes. Written as one chain so `df`
# is always derived straight from the CSV.
df = (
    pd.read_csv('btc.csv')
    .dropna()
    .assign(Date=lambda frame: pd.to_datetime(frame['Date'], format='%d/%m/%Y'))
)

dates = df['Date'].values
close_prices = df['Close'].values

# Plot the closing prices over time.
fig, ax = plt.subplots(figsize=(25, 5))
ax.set_title('Bitcoin Close Prices')
ax.set_xlabel('Date')
ax.set_ylabel('Close')
ax.plot(dates, close_prices)
# plt.show() instead of fig.show(): Figure.show() needs a GUI event loop and
# only emits a warning on the non-GUI backends notebooks normally use.
plt.show()

In [None]:
# Turn the price series into a single-column 2-D array (one row per sample),
# which is the layout later handed to the scaler inside make_data().
close_prices = np.reshape(close_prices, (-1, 1))
close_prices.shape

In [None]:
class LSTM(nn.Module):
    """Stacked LSTM whose final hidden state is fed through a linear layer.

    Args:
        num_classes: Number of values predicted per input sequence.
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        window_size: Stored on the instance only; neither the layers nor
            forward() read it.
    """

    def __init__(self, num_classes, input_size, hidden_size, num_layers, window_size):
        super().__init__()
        self.num_classes = num_classes
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.window_size = window_size
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Predict one output vector per input sequence.

        Args:
            x: Tensor laid out as (batch, seq_len, input_size), matching
                batch_first=True on the LSTM.

        Returns:
            Tensor of shape (batch, num_classes).
        """
        # No explicit (h_0, c_0): nn.LSTM then starts from zeros allocated on
        # x's device and dtype, so the model also works after .to('cuda').
        _, (h_n, _) = self.lstm(x)
        # h_n is (num_layers, batch, hidden_size). Use the top layer only;
        # flattening with view(-1, hidden_size) would fold the layer axis into
        # the batch axis whenever num_layers > 1.
        return self.fc(h_n[-1])

In [None]:
# Scaler that maps values into the range (-1, 1).
scaler = MinMaxScaler(feature_range=(-1, 1))

# Model: one input feature, one predicted value, a single LSTM layer.
# Hidden sizes noted as candidates: 32, 64, 128, 256.
lstm = LSTM(
    num_classes=1,
    input_size=1,
    hidden_size=32,
    num_layers=1,
    window_size=3,
)

# Mean-squared-error criterion, optimised with Adam.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(lstm.parameters(), lr=0.03)
# The SGD alternative below refers to `learning_rate`, which is not defined in
# the visible cells.
#optimizer = torch.optim.SGD(lstm.parameters(), lr=learning_rate)

In [None]:
def window_data(data, window_size):
    """Cut a sequence into overlapping windows paired with the item that follows each.

    Args:
        data: Sequence supporting len(), slicing and integer indexing.
        window_size: Number of consecutive items in each window.

    Returns:
        Tuple (X, y) of equal-length lists where X[k] is
        data[k:k + window_size] and y[k] is data[k + window_size].
    """
    # One window per start position that still leaves an item after it;
    # a non-positive count simply yields empty lists.
    n_windows = len(data) - window_size
    X = [data[start:start + window_size] for start in range(n_windows)]
    y = [data[start + window_size] for start in range(n_windows)]
    assert len(X) == len(y)
    return X, y

In [None]:
def make_data(data, ratio, window_size):
    """Scale a series, cut it into windows, and split the windows chronologically.

    Relies on the notebook-level `scaler` and on window_data().

    Args:
        data: Array with one row per time step, in the 2-D layout the scaler
            accepts.
        ratio: Fraction of the windows (taken from the start) used for training.
        window_size: Number of time steps in each input window.

    Returns:
        Tuple (X, Y, trainX, trainY, testX, testY, train_size): float32 tensors
        holding all windows/targets, the leading `train_size` of them, the
        remainder, and `train_size` itself.
    """
    # Same count window_data() produces for a non-negative window_size.
    n_windows = max(len(data) - window_size, 0)
    train_size = int(n_windows * ratio)

    # Fit the scaler only on rows that feed training windows or targets;
    # fitting on the whole series would leak the held-out tail's min/max into
    # the training inputs. The fitted scaler then transforms the full series,
    # so held-out values may fall outside the fitted range.
    scaler.fit(data[:train_size + window_size])
    scaled = scaler.transform(data)

    x, y = window_data(scaled, window_size)

    # Build the full tensors once and slice them for the two splits.
    X = torch.tensor(np.array(x), dtype=torch.float32)
    Y = torch.tensor(np.array(y), dtype=torch.float32)
    trainX, trainY = X[:train_size], Y[:train_size]
    testX, testY = X[train_size:], Y[train_size:]
    return X, Y, trainX, trainY, testX, testY, train_size

In [None]:
def train(lstm, trainX, trainY, nums_epoches=1000):
    """Train `lstm` on the whole training set each epoch and plot the loss curve.

    Uses the notebook-level `criterion` and `optimizer`. The quantity being
    minimised is sqrt(criterion(outputs, trainY)).

    Args:
        lstm: Model to train; switched to training mode.
        trainX: Input tensor passed to the model in a single batch.
        trainY: Target tensor compared against the model output.
        nums_epoches: Number of optimisation steps to run.
    """
    train_loss = []
    lstm.train()
    for epoch in trange(nums_epoches):
        optimizer.zero_grad()
        outputs = lstm(trainX)
        loss = torch.sqrt(criterion(outputs, trainY))
        loss.backward()
        optimizer.step()

        # .item() yields a plain Python float and works for tensors on any
        # device, whereas .detach().numpy() raises for CUDA tensors.
        epoch_loss = loss.item()
        train_loss.append(epoch_loss)

        if (epoch + 1) % 100 == 0:
            print(f"Epoch: {epoch + 1}, loss: {epoch_loss:1.5f}")

    fig, ax = plt.subplots(figsize=(25, 15))
    ax.set_title('Training loss')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.plot(train_loss)
    # plt.show() instead of fig.show(): Figure.show() only warns on the
    # non-GUI backends notebooks normally use.
    plt.show()

In [None]:
def test(X, Y, train_size):
    """Plot the model's predictions for X against the targets Y in original units.

    Uses the notebook-level `lstm` and `scaler`.

    Args:
        X: Input tensor passed to the model in a single batch.
        Y: Target tensor, in the scaler's transformed units.
        train_size: x-position at which a dashed vertical line is drawn.
    """
    lstm.eval()
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        predicted = lstm(X)

    # Move to host memory before converting so tensors on any device work,
    # then undo the scaling.
    predicted_prices = scaler.inverse_transform(predicted.cpu().numpy())
    actual_prices = scaler.inverse_transform(Y.detach().cpu().numpy())

    # Ground truth vs. prediction, with a dashed marker at train_size.
    fig, ax = plt.subplots(figsize=(25, 10))
    ax.set_title('Bitcoin Close Prices')
    ax.set_xlabel('Sample index')
    ax.set_ylabel('Close')
    ax.axvline(x=train_size, c='r', linestyle='--')
    ax.plot(actual_prices, label='Groundtruth')
    ax.plot(predicted_prices, label='Prediction')
    ax.legend()
    # plt.show() instead of fig.show(): Figure.show() only warns on the
    # non-GUI backends notebooks normally use.
    plt.show()

In [None]:
# Build the scaled windows from the closing prices: 90% of the windows go to
# training and each window spans 3 time steps.
(
    X, Y,
    trainX, trainY,
    testX, testY,
    train_size,
) = make_data(close_prices, 0.9, 3)

In [None]:
# Fit the model on the training windows for 1000 epochs.
train(lstm, trainX, trainY, nums_epoches=1000)

In [None]:
# Plot predictions against the ground truth for every window; the dashed line
# is drawn at train_size.
test(X=X, Y=Y, train_size=train_size)