In [1]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from sklearn.metrics import r2_score

In [2]:
rnn_df = pd.read_csv('coin_Bitcoin.csv')

In [3]:
x = rnn_df[['High', 'Low', 'Open']].values
y = rnn_df['Close'].values

In [4]:
scaler_x = StandardScaler()
scaler_y = StandardScaler()

x = scaler_x.fit_transform(x)
y = scaler_y.fit_transform(y.reshape(-1, 1))

In [5]:
train_x, test_x, train_y, test_y = train_test_split(x, y, test_size = 0.2, random_state = 50)

In [6]:
train_x = torch.tensor(train_x, dtype = torch.float32)
train_y = torch.tensor(train_y, dtype = torch.float32)
test_x = torch.tensor(test_x, dtype = torch.float32)

In [7]:
train_x = train_x.unsqueeze(1)
seq_len = train_x[0].shape[0]

In [8]:
class BitCoinDataSet(Dataset):
    def __init__(self, train_x, train_y):
        super(BitCoinDataSet, self).__init__()
        self.data = train_x
        self.labels = train_y

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

In [9]:
hidden_size = 64
num_layers = 2
learning_rate = 0.001
batch_size = 32
epoch_size = 10

In [10]:
train_dataset = BitCoinDataSet(train_x, train_y)
test_dataset = BitCoinDataSet(test_x, test_y)
train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
test_loader = DataLoader(test_dataset, batch_size = batch_size, shuffle = False)

In [11]:
class RNN(nn.Module):
    def __init__(self, input_feature_size, hidden_size, num_layers):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_feature_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )
    
    def forward(self, x):
        x = x.reshape(x.size(0), 1, -1)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        output, _ = self.lstm(x, (h0, c0))
        output = output[:, -1, :]
        output = self.fc(output)
        return output

In [12]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
rnn = RNN(input_feature_size = 3, hidden_size = hidden_size, num_layers = num_layers).to(device)
criteria = nn.MSELoss()
optimizer = torch.optim.Adam(rnn.parameters(), lr = learning_rate)

In [13]:
rnn.train()
for epoch in range(epoch_size):

    loss = 0.0

    for batch_idx, data in enumerate(train_loader):
        inputs, targets = data
        inputs.to(device)
        targets.to(device)

        optimizer.zero_grad()

        outputs = rnn(inputs)
        loss = criteria(outputs, targets)
        loss.backward()
        optimizer.step()

        loss += loss.item()
        if batch_idx % 100 == 99:
            print(f'[{epoch + 1}, {batch_idx + 1:5d}] loss: {loss / 100:.3f}')
            loss = 0.0

print('Finished Training')

Finished Training


In [14]:
prediction = []
ground_truth = []
rnn.eval()
with torch.no_grad():
    for data in test_loader:
        inputs, targets = data
        inputs = inputs.to(device)

        ground_truth += targets.flatten().tolist()
        out = rnn(inputs).detach().cpu().flatten().tolist()
        prediction += out

In [15]:
prediction = scaler_y.inverse_transform(np.array(prediction).reshape(-1, 1))
ground_truth = scaler_y.inverse_transform(np.array(ground_truth).reshape(-1, 1))

In [16]:
r2score = r2_score(prediction,ground_truth)
print(r2score)

0.998452298013993
