In [35]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader

import lightning as L

import pandas as pd
import numpy as np

from sklearn.preprocessing import StandardScaler, MinMaxScaler

In [36]:
device = torch.device('mps:0' if torch.backends.mps.is_available() else 'cpu')
print(f"{device} 사용 가능합니다")

data = pd.read_csv('./NVDA_110721_Final_1_fdr.csv', index_col='Date', usecols=['Close', 'Date'])
data.Close.to_numpy()

mps:0 사용 가능합니다


array([  3.675   ,   3.75    ,   3.69    , ..., 181.610001, 187.797501,
       186.119995])

In [40]:
class LSTMbyHand(nn.Module):

    def __init__(self):
        super().__init__()

        mean = torch.tensor(0.0)
        std = torch.tensor(1.0)

        self.wlr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wlr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.blr1 = nn.Parameter(torch.tensor(0.), requires_grad=True)
        
        self.wpr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wpr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bpr1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

        self.wp1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wp2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bp1 = nn.Parameter(torch.tensor(0.), requires_grad=True)
        
        self.wo1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wo2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bo1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

        self.final_bias = nn.Parameter(torch.tensor(1.), requires_grad=True)
        
    def lstm_unit(self, input_value, long_memory, short_memory):
        long_remember_percent = torch.sigmoid((short_memory * self.wlr1) + 
                                              (input_value * self.wlr2) +
                                              self.blr1)
        
        potential_remember_percent = torch.sigmoid((short_memory * self.wpr1) + 
                                              (input_value * self.wpr2) +
                                              self.bpr1)
        
        potential_memory = torch.tanh((short_memory * self.wp1) +
                                      (input_value * self.wp2) +
                                      self.bp1)
        
        updated_long_memory = ((long_memory * long_remember_percent) +
                               (potential_remember_percent * potential_memory))

        output_percent = torch.sigmoid((short_memory * self.wo1 +
                                        (input_value * self.wo2 +
                                         self.bo1)))
        
        updated_short_memory = updated_long_memory * output_percent + self.final_bias

        return ([updated_long_memory, updated_short_memory])

    def forward(self, input):
        long_memory = 0
        short_memory = 0
        
        for i in range(len(data) - 1):
            temp_day = data.Close.to_numpy()[i]
            long_memory, short_memory = self.lstm_unit(temp_day, long_memory, short_memory)

        print(short_memory)
        return short_memory
    
    def configure_optimizers(self):
        return Adam(self.parameters())
    
    def training_step(self, batch, batch_id):
        input, label = batch
        output = self.forward(input[0])
        loss = (output - label) ** 2

        self.log("train_loss", loss)
        self.log("out", output)

        return loss

In [38]:
ms_x, ms_y = MinMaxScaler(), MinMaxScaler()
ss = StandardScaler()

split = len(data) - 1
X_train = data[:split]
X_test = data[split:]
y_train = data[['Close']][:split]
y_test = data[['Close']][split:]

ms_x.fit(X_train)
X_train = ms_x.transform(X_train)
X_test = ms_x.transform(X_test)

ms_y.fit(y_train)
y_train = ms_y.transform(y_train)
y_test = ms_y.transform(y_test)

In [41]:
model = LSTMbyHand()
print("Observed = 0, Predicted=", model(torch.tensor(X_train)).detach())

tensor(2., grad_fn=<AddBackward0>)
Observed = 0, Predicted= tensor(2.)
