# Calling and Processing Data

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import numpy as np

In [None]:
aapl = pd.read_csv('AAPL.csv')
aapl

In [None]:
aapl = aapl[(aapl['Date']>= '2013-12-31') & (aapl['Date']<='2015-12-31')]
aapl = aapl.copy()
aapl

In [None]:
aapl['Date'] = pd.to_datetime(aapl['Date'])
aapl['Prev Adj'] = aapl['Adj Close'].shift(1)
aapl['prev close'] = aapl['Close'].shift(1)
aapl['Change'] = (aapl['Close'] - aapl['prev close'])/aapl['prev close']*100
aapl['Sector'] = 'Consumer Goods'
print(aapl)

In [None]:
aapl['movement'] = aapl.apply(
    lambda row: 1 if row['Adj Close'] > row['Prev Adj'] else 0,
    axis = 1
)

In [None]:
aapl = aapl.iloc[1:].copy()
aapl.drop(columns=['Prev Adj', 'prev close'], errors='ignore', inplace=True)


In [None]:
aapl

In [None]:
data = aapl.copy()

# 변수 정규화
scaler = StandardScaler()
data[['Open', 'High', 'Low', 'Volume', 'Adj Close','Close']] = scaler.fit_transform(data[['Open', 'High', 'Low', 'Volume', 'Adj Close','Close']])

# 학습 세트: 2014년 1월 1일 ~ 2015년 8월 1일
train_data = data[(data['Date'] >= '2014-01-01') & (data['Date'] < '2015-10-01')]

# 개발 세트: 2015년 8월 1일 ~ 2015년 10월 1일
# valid_data = data[(data['Date'] >= '2015-08-01') & (data['Date'] < '2015-10-01')]

# 테스트 세트: 2015년 10월 1일 ~ 2016년 1월 1일
test_data = data[(data['Date'] >= '2015-10-01') & (data['Date'] < '2016-01-01')]

class StockDataset(Dataset):
    def __init__(self, data, seq_length):
        self.data = data
        self.seq_length = seq_length
        self.features = data[['Open','High','Low','Volume','Adj Close','Close']].values
        self.labels = data['movement'].values

    def __len__(self):
        return len(self.data) - self.seq_length   #데이터 길이 반환

    def __getitem__(self, idx):
        x = self.features[idx:idx+self.seq_length]  #idx부터 seq만큼의 피처 데이터
        y = self.labels[idx+self.seq_length] #seq 다음 날의 레이블 데이터
        return torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.long)


seq_length = 5 #5일치로 하루 예측

# train_data, test_data = train_test_split(data, test_size=0.2,shuffle=False)

train_dataset = StockDataset(train_data, seq_length)
test_dataset = StockDataset(test_data, seq_length)
# valid_dataset = StockDataset(valid_data, seq_length)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# valid_dataset = DataLoader(valid_dataset, batch_size=32, shuffle=False)



In [None]:
sentiment = pd.read_csv('allsenti_AAPL.csv')
sentiment

In [None]:
sentiment['filename'] = pd.to_datetime(sentiment['filename'])
filtered_sentiment = sentiment[sentiment['filename'].isin(aapl['Date'])]
filtered_sentiment

In [None]:
sentiment_data = filtered_sentiment.copy()

data = aapl.copy()

# 변수 정규화
scaler = StandardScaler()
data[['Open', 'High', 'Low', 'Volume', 'Adj Close','Close']] = scaler.fit_transform(data[['Open', 'High', 'Low', 'Volume', 'Adj Close','Close']])

# 학습 세트: 2014년 1월 1일 ~ 2015년 8월 1일
train_data = data[(data['Date'] >= '2014-01-01') & (data['Date'] < '2015-10-01')]
train_sentiment = sentiment_data[(sentiment_data['filename'] >= '2014-01-01') & (sentiment_data['filename'] < '2015-10-01')]

train_data.reset_index(drop=True, inplace=True)
train_sentiment.reset_index(drop=True, inplace=True)


# 테스트 세트: 2015년 10월 1일 ~ 2016년 1월 1일
test_data = data[(data['Date'] >= '2015-10-01') & (data['Date'] < '2016-01-01')]
test_sentiment = sentiment_data[(sentiment_data['filename'] >= '2015-10-01') & (sentiment_data['filename'] < '2016-01-01')]

test_data.reset_index(drop=True, inplace=True)
test_sentiment.reset_index(drop=True, inplace=True)

class AddStockDataset(Dataset):
    def __init__(self, data, seq_length, sentiment_data):
        self.data = data
        self.seq_length = seq_length
        self.sentiment_data = sentiment_data
        self.features = data[['Open','High','Low','Volume','Adj Close','Close']].values
        self.labels = data['movement'].values
        self.sentiment_data = sentiment_data[['positive','neutral','negative']].values

    def __len__(self):
        return len(self.data) - self.seq_length   #데이터 길이 반환

    def __getitem__(self, idx):
        x = self.features[idx:idx+self.seq_length]  #idx부터 seq만큼의 피처 데이터
        y = self.labels[idx+self.seq_length] #seq 다음 날의 레이블 데이터
        sentiment = self.sentiment_data[idx+self.seq_length] #해당 인덱스의 감정 분석 결과

        # print(x.shape)
        # print(sentiment.shape)

        x = torch.tensor(x, dtype=torch.float32)
        y = torch.tensor(y, dtype=torch.long)
        sentiment = torch.tensor(sentiment, dtype=torch.float32)

        return x,y,sentiment


seq_length = 5 #5일치로 하루 예측

# train_data, test_data = train_test_split(data, test_size=0.2,shuffle=False)

add_train_dataset = AddStockDataset(train_data, seq_length,train_sentiment)
add_test_dataset = AddStockDataset(test_data, seq_length, test_sentiment)
# valid_dataset = StockDataset(valid_data, seq_length, sentiment_data)

add_train_loader = DataLoader(add_train_dataset, batch_size=32, shuffle=False)
add_test_loader = DataLoader(add_test_dataset, batch_size=32, shuffle=False)
# valid_dataset = DataLoader(valid_dataset, batch_size=32, shuffle=False)


In [None]:
# 학습 세트: 2014년 1월 1일 ~ 2015년 8월 1일 사이의 20,339개 변동.
# 개발 세트: 2015년 8월 1일 ~ 2015년 10월 1일 사이의 2,555개 변동.
# 테스트 세트: 2015년 10월 1일 ~ 2016년 1월 1일 사이의 3,720개 변동.


### Base

In [None]:
class BaseRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(BaseRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Sequential(nn.Linear(hidden_size, output_size), nn.Sigmoid())

    def forward(self, x):
      # h0 초기화 (num_layers,batch_size,hidden_size)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.rnn(x, h0)
        out = self.fc(out[:, -1, :])  #RNN 마지막 타임스텝 출력을 fc layer에 통과
        # print(out)
        return out

num_layers = 1
input_size = 6
hidden_size = 16
output_size = 2  # 1 또는 0

model = BaseRNN(input_size, hidden_size, output_size, num_layers)

# out은 각 확률에 대한 확률 (batch_size,output_size) tensor
# output_size=2 > [p(class 0),p[class 1]]
# 큰 양수 : 해당 클래스 매우 높음, 큰 음수 : 매우 낮음, 0: 확률 0.5
# 입력 데이터 x 형태 (batch_size, seq_length,input_size)


In [None]:
# 내부적으로 소프트맥스 적용하여 로짓-> 확률 변환, 확률과 실제 레이블 간 차이 계산
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 50
train_losses = []

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for features, labels in train_loader:
        outputs = model(features)
        # labels = labels.unsqueeze(1)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_losses.append(loss.item())

    if (epoch+1) % 5 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

print('Training complete')


In [None]:
plt.figure(figsize=(8, 5))
plt.plot(train_losses, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss over Epochs')
plt.legend()
plt.show()


In [None]:
def evaluate_model(loader, model):
    model.eval()  # 평가 모드로 전환
    total = 0
    correct = 0
    tp = tn = fp = fn = 0

    # 평가 단계이므로 파라미터 업데이트X
    with torch.no_grad():
        for features, labels in loader:
            outputs = model(features)
            # out = 각 클래스(오를 확률, 내릴 확률)에 대한 logit
            # max함수를 통해 로짓에서 큰 값을 가진 인덱스 반환
            # 최대값의 인덱스 = 예측된 클래스 레이블
            _, predicted = torch.max(outputs, 1)

            # total += labels.size(0)
            # correct += (predicted == labels).sum().item()

            tp += ((predicted == 1) & (labels == 1)).sum().item()
            tn += ((predicted == 0) & (labels == 0)).sum().item()
            fp += ((predicted == 1) & (labels == 0)).sum().item()
            fn += ((predicted == 0) & (labels == 1)).sum().item()

    accuracy = (tp+tn)/(tp+tn+fp+fn)
    macc = (tp*tn-fp*fn) / np.sqrt( (tp+fp)*(tp+fn)*(tn+fp)*(tn+fn))

    print(f'ACC: {accuracy:.4f}, MACC: {macc:.4f}')
    print(f'TP: {tp}, TN: {tn}, FP: {fp}, FN: {fn}')
    # print(outputs)
# evaluate_model(test_loader, model)

In [None]:
train_acc = evaluate_model(train_loader, model)

In [None]:
test_acc = evaluate_model(test_loader, model)

### Add Sentiment

In [None]:
class StockRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, sentiment_size, num_layers):
        super(StockRNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.sentiment_size = sentiment_size
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Sequential(nn.Linear((hidden_size + sentiment_size), output_size), nn.Sigmoid())

    def forward(self, x, sentiment_data):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, hn = self.rnn(x, h0)
        last_hidden = hn[-1]  # 마지막 hidden state 사용

        sentiment_data = sentiment_data.view(sentiment_data.size(0), -1)
        out = torch.cat((last_hidden, sentiment_data), dim=1)

        output = self.fc(out)
        return output

num_layers = 1
input_size = 6
hidden_size = 16
output_size = 2
sentiment_size = 3

model = StockRNN(input_size, hidden_size, output_size, sentiment_size, num_layers)

# print("Linear layer weights:", model.fc[0].weight.shape)
# print("Linear layer bias:", model.fc[0].bias)


In [None]:
def evaluate_add_model(loader, model):
    model.eval()  # 평가 모드로 전환
    total = 0
    correct = 0
    tp = tn = fp = fn = 0

    # 평가 단계이므로 파라미터 업데이트X
    with torch.no_grad():
        for features, labels, sentiment_data in loader:
            outputs = model(features, sentiment_data)
            # out = 각 클래스(오를 확률, 내릴 확률)에 대한 logit
            # max함수를 통해 로짓에서 큰 값을 가진 인덱스 반환
            _, predicted = torch.max(outputs, 1)

            # total += labels.size(0)
            # correct += (predicted == labels).sum().item()

            tp += ((predicted == 1) & (labels == 1)).sum().item()
            tn += ((predicted == 0) & (labels == 0)).sum().item()
            fp += ((predicted == 1) & (labels == 0)).sum().item()
            fn += ((predicted == 0) & (labels == 1)).sum().item()

    accuracy = (tp+tn)/(tp+tn+fp+fn)
    macc = (tp*tn-fp*fn) / np.sqrt( (tp+fp)*(tp+fn)*(tn+fp)*(tn+fn))

    print(f'ACC: {accuracy:.4f}, MACC: {macc:.4f}')
    print(f'TP: {tp}, TN: {tn}, FP: {fp}, FN: {fn}')

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 50
train_losses = []

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for features, labels, sentiment_data in add_train_loader:

        features = features.view(-1,5,input_size)

        outputs = model(features, sentiment_data)

        # labels = labels.unsqueeze(1)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_losses.append(loss.item())

    if (epoch+1) % 5 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

print('Training complete')

In [None]:
train_acc = evaluate_add_model(add_train_loader, model)

In [None]:
test_acc = evaluate_add_model(add_test_loader, model)

In [None]:
plt.figure(figsize=(8, 5))
plt.plot(train_losses, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss over Epochs')
plt.legend()
plt.show()

# Experiment

In [None]:
def normalize(x):
    x = (x-x.min()) / (x.max()-x.min())
    return x

In [None]:
def preprocess(data):
    d_open = normalize(data['Open'].values)
    d_close = normalize(data['Close'].values)
    d_high = normalize(data['High'].values)
    d_low = normalize(data['Low'].values)
    d_volume = normalize(data['Volume'].values)
    d_adj = normalize(data['Adj Close'])
    d_movement = data['movement']

    x = np.stack([d_open,d_close,d_high,d_low,d_volume,d_adj])

    return x,d_movement

In [None]:
class RNN(nn.Module):
    def __init__(self, input_size, output_size,hidden_dim,n_layers):
        super(RNN,self).__init__()

        self.hidden_dim = hidden_dim
        self.rnn = nn.RNN(input_size,hidden_dim,n_layers,batch_first=True)

        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self,x,hidden):
        # x (batch_size,seq_length,input_size)
        # hidden (n_layers, batch_size, hidden_dim)
        # r_out (batch_size, time_step, hidden_size)
        batch_size = x.size(0)

        r_out, hidden = self.rnn(x, hidden)
        r_out = r_out.view(-1, self.hidden_dim)

        output = self.fc(r_out)

        return output, hidden

In [None]:
# dimension 확인
seq_length = 20
test_rnn = RNN(input_size=1, output_size=1, hidden_dim=10, n_layers=2)

time_steps = np.linspace(0, np.pi, seq_length)
data = np.sin(time_steps)
data.resize((seq_length,1))

test_input = torch.Tensor(data).unsqueeze(0)
print('Input size:', test_input.size())

test_output, test_h = test_rnn(test_input, None)
print('output:', test_output.size())
print('hidden:', test_h.size())

In [None]:
# train
input_size = 1
output_size = 1
hidden_dim = 32
n_layers = 1

rnn = RNN(input_size, output_size, hidden_dim, n_layers)
print(rnn)

In [None]:
# train the RNN
def train(rnn, n_steps, print_every):

    # initialize the hidden state
    hidden = None

    for batch_i, step in enumerate(range(n_steps)):
        # defining the training data
        time_steps = np.linspace(step * np.pi, (step+1)*np.pi, seq_length + 1)
        data = np.sin(time_steps)
        data.resize((seq_length + 1, 1)) # input_size=1

        x = data[:-1]
        y = data[1:]

        # convert data into Tensors
        x_tensor = torch.Tensor(x).unsqueeze(0) # unsqueeze gives a 1, batch_size dimension
        y_tensor = torch.Tensor(y)

        # outputs from the rnn
        prediction, hidden = rnn(x_tensor, hidden)

        ## Representing Memory ##
        # make a new variable for hidden and detach the hidden state from its history
        # this way, we don't backpropagate through the entire history
        hidden = hidden.data

        # Loss and Optimization
        criterion = nn.MSELoss()
        optimizer = torch.optim.Adam(rnn.parameters(), lr=0.01)
        # calculate the loss
        loss = criterion(prediction, y_tensor)
        # zero gradients
        optimizer.zero_grad()
        # perform backprop and update weights
        loss.backward()
        optimizer.step()

        # display loss and predictions
        if batch_i%print_every == 0:
            print('Loss: ', loss.item())
            plt.plot(time_steps[1:], x, 'r.') # input
            plt.plot(time_steps[1:], prediction.data.numpy().flatten(), 'b.') # predictions
            plt.show()

    return rnn

In [None]:
# train the rnn and monitor results
n_steps = 75
print_every = 15

trained_rnn = train(rnn, n_steps, print_every)

### LSTM

In [None]:
class BaseLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(BaseLSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) #drop out, bidirectional=True
        self.fc = nn.Sequential(nn.Linear(hidden_size, output_size), nn.Sigmoid())

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, (hn, cn) = self.lstm(x, (h0, c0))  #LSTM의 output은 Output과 마지막 타임스텝의 (hidden state, cell state)


        output = self.fc(out[:,-1,:])
        return output

num_layers = 1
input_size = 6
hidden_size = 16
output_size = 2
sentiment_size = 3

model = BaseLSTM(input_size, hidden_size, output_size, num_layers)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 50
train_losses = []

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for features, labels in train_loader:
        outputs = model(features)
        # labels = labels.unsqueeze(1)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_losses.append(loss.item())

    if (epoch+1) % 5 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

print('Training complete')

In [None]:
train_acc = evaluate_model(train_loader, model)

In [None]:
test_acc = evaluate_model(test_loader, model)

In [None]:
class StockLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, sentiment_size, num_layers):
        super(StockLSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.sentiment_size = sentiment_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) #drop out, bidirectional=True
        self.fc = nn.Sequential(nn.Linear((hidden_size + sentiment_size), output_size), nn.Sigmoid())

    def forward(self, x, sentiment_data):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, (hn, cn) = self.lstm(x, (h0, c0))  #LSTM의 output은 Output과 마지막 타임스텝의 (hidden state, cell state)
        last_hidden = hn[-1]  # 마지막 hidden state 사용
        # print('hidden',last_hidden)
        sentiment_data = sentiment_data.view(sentiment_data.size(0), -1)
        # print('sentiment', sentiment_data)
        out = torch.cat((last_hidden, sentiment_data), dim=1)

        output = self.fc(out)
        return output

num_layers = 1
input_size = 6
hidden_size = 16
output_size = 2
sentiment_size = 3

model = StockLSTM(input_size, hidden_size, output_size, sentiment_size, num_layers)

# sigmoid를 거친 hidden tensor도 0~1, raw sentiment score도 0~1이므로 추가적 scaling은 필요하지 않을 것으로 판단
# LSTM의 hidden tensor는 (batch_size, hidden_size) 형태, sentiment score tensor는 (batch_size, 3)
# concat을 수행하면 (batch_size, hidden_dim + sentiment_dim) 형태의 텐서

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 50
train_losses = []

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for features, labels, sentiment_data in add_train_loader:

        features = features.view(-1,5,input_size)

        outputs = model(features, sentiment_data)

        # labels = labels.unsqueeze(1)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_losses.append(loss.item())

    if (epoch+1) % 5 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

print('Training complete')

In [None]:
train_acc = evaluate_add_model(add_train_loader, model)

In [None]:
test_acc = evaluate_add_model(add_test_loader, model)

### Calculating Correlation

In [None]:
from scipy.stats import pearsonr

# sentiment 데이터와 AAPL 데이터 병합
merged_data = pd.merge(aapl, filtered_sentiment, left_on='Date', right_on='filename')

# 필요한 열만 선택
correlation_data = merged_data[['positive', 'negative', 'neutral','sentiment_score', 'movement']]

# 상관계수 계산
correlation_matrix = correlation_data.corr()

print(correlation_matrix)


In [None]:
# 데이터 다운로드

In [None]:
import pandas as pd
import os
import zipfile
import requests
from io import BytesIO

# GitHub 저장소의 ZIP 파일 URL
url = 'https://github.com/yumoxu/stocknet-dataset/archive/refs/heads/master.zip'

# ZIP 파일을 다운로드하여 메모리로 읽기
response = requests.get(url)
zip_file = zipfile.ZipFile(BytesIO(response.content))

# ZIP 파일 내의 'stocknet-dataset-master/price/raw/' 폴더 경로
price_folder_path = 'stocknet-dataset-master/price/raw/'

# 모든 CSV 파일을 읽고 결합
data_frames = []
for file_name in zip_file.namelist():
    if file_name.startswith(price_folder_path) and file_name.endswith('.csv'):
        # 주식 이름을 파일명에서 추출 (예: 'raw/prices/appl.csv' -> 'appl')
        stock_name = os.path.splitext(os.path.basename(file_name))[0]

        # CSV 파일을 읽어 데이터프레임으로 변환
        with zip_file.open(file_name) as file:
            df = pd.read_csv(file)
            # 주식 이름 열 추가
            df['Stock'] = stock_name
            data_frames.append(df)

# 모든 데이터프레임을 하나로 결합
combined_df = pd.concat(data_frames, ignore_index=True)

# 결합된 데이터프레임 확인
print(combined_df.head())


In [None]:
print(combined_df)

In [None]:
combined_df = combined_df[(combined_df['Date']>= '2013-12-31') & (combined_df['Date']<='2015-12-31')]
combined_df

# sentiment rnn + price rnn

In [None]:
class SentimentRNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
          super(SentimentRNN, self).__init__()
          self.num_layers = num_layers
          self.hidden_size = hidden_size
          self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
          self.layer_norm = nn.LayerNorm(hidden_size)
    def forward(self, x):
          h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
          out, hn = self.rnn(x,h0)  #out은 모든 time step hidden state(batch_size, seq_length, hidden_size), hn은 마지막 time step hidden state(num_layers, batch_size, hidden_size)
          out = self.layer_norm(out)
          return hn[-1]

class StockSentimentRNN(nn.Module):
    def __init__(self, stock_input_size, sentiment_input_size, hidden_size,output_size, num_layers):
        super(StockSentimentRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.stock_rnn = nn.RNN(stock_input_size, hidden_size, num_layers, batch_first=True)
        self.stock_layer_norm = nn.LayerNorm(hidden_size)
        self.sentiment_rnn = SentimentRNN(sentiment_input_size, hidden_size,num_layers)
        self.fc = nn.Sequential(nn.Linear((hidden_size * 2), output_size), nn.Sigmoid())

    def forward(self, data, sentiment_data):
        h0_stock = torch.zeros(self.stock_rnn.num_layers, data.size(0), self.stock_rnn.hidden_size).to(data.device)
        stock_out, stock_hn = self.stock_rnn(data, h0_stock)
        stock_out = self.stock_layer_norm(stock_out)
        stock_hidden = stock_hn[-1]  # 마지막 hidden state

        # 감정 데이터 RNN 처리
        # seq_length = data.size(1)
        # sentiment_data = sentiment_data.unsqueeze(1).repeat(1, seq_length,1) #시퀀스 길이에 맞게 차원 추가
        # sentiment_data = sentiment_data.view(-1, data.size(1), sentiment_data.size(-1))
        seq_length = data.size(1)
        sentiment_data = sentiment_data.unsqueeze(1).repeat(1, seq_length, 1)  # (batch_size, seq_length, sentiment_input_size)
        sentiment_out = self.sentiment_rnn(sentiment_data) #(batch_size, hidden_size)

        # 두 hidden state를 결합
        # layernorm 거친 두 마지막 hidden state concat
        combined_hidden = torch.cat((stock_hidden, sentiment_out), dim=1)

        # Fully Connected Layer
        output = self.fc(combined_hidden)
        return output




In [None]:
stock_input_size = 6       # 주가 데이터의 feature 수
sentiment_input_size = 3   # 감정 데이터의 feature 수
hidden_size = 16           # RNN의 hidden size
output_size = 2            # 최종 출력 클래스 수
num_layers = 1             # RNN의 layer 수

model = StockSentimentRNN(stock_input_size, sentiment_input_size, hidden_size, output_size, num_layers)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for data, labels, sentiment_data in add_train_loader:
        data = data.view(-1,seq_length,stock_input_size)
        sentiment_data = sentiment_data  # (배치 크기, seq_length, sentiment_input_size)
        # labels = labels.view(-1)

        optimizer.zero_grad()
        outputs = model(data, sentiment_data)
        # labels = labels.unsqueeze(1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    avg_epoch_loss = epoch_loss / len(add_train_loader)
    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}')

print('Training complete')

In [None]:
def evaluate_add_model(loader, model):
    model.eval()  # 평가 모드로 전환
    total = 0
    correct = 0
    tp = tn = fp = fn = 0

    # 평가 단계이므로 파라미터 업데이트X
    with torch.no_grad():
        for data, labels, sentiment_data in loader:
            outputs = model(data, sentiment_data)
            # out = 각 클래스(오를 확률, 내릴 확률)에 대한 logit
            # max함수를 통해 로짓에서 큰 값을 가진 인덱스 반환
            _, predicted = torch.max(outputs, 1)

            # total += labels.size(0)
            # correct += (predicted == labels).sum().item()

            tp += ((predicted == 1) & (labels == 1)).sum().item()
            tn += ((predicted == 0) & (labels == 0)).sum().item()
            fp += ((predicted == 1) & (labels == 0)).sum().item()
            fn += ((predicted == 0) & (labels == 1)).sum().item()

    accuracy = (tp+tn)/(tp+tn+fp+fn)
    macc = (tp*tn-fp*fn) / np.sqrt( (tp+fp)*(tp+fn)*(tn+fp)*(tn+fn))

    print(f'ACC: {accuracy:.4f}, MACC: {macc:.4f}')
    print(f'TP: {tp}, TN: {tn}, FP: {fp}, FN: {fn}')

In [None]:
train_acc = evaluate_add_model(add_train_loader, model)

In [None]:
test_acc = evaluate_add_model(add_test_loader,model)