In [1]:
# Detect whether the notebook runs inside Google Colab: the `google.colab`
# package is only importable there, so a failed import means "not Colab".
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

print("In Colab:", IN_COLAB)

# Show the contents of the current working directory.
%ls
if IN_COLAB:
    # NOTE(review): the Drive mount below is commented out, so the `%cd`
    # presumably relies on Drive already being mounted — confirm.
    # from google.colab import drive
    # drive.mount('/content/drive')
    # Switch to the project folder so relative imports and paths resolve.
    %cd /content/drive/MyDrive/StockClassification

In Colab: True
[0m[01;34mdrive[0m/  [01;34msample_data[0m/
/content/drive/MyDrive/StockClassification


In [2]:
import pandas as pd
import numpy as np
import os
import random
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from utils import preprecessingData, calculatePriceBB, calculate_yellow_box, calculateTamountBB, makeLabel, sliding_window, EarlyStopping, adjust_learning_rate, setup_logger, makeDataset
from model.dlinear import Model as DlinearModel
from model.timesnet import Model as TimesnetModel
from model.non_transformer import Model as NontransformerModel
from time_feature import time_features
from torch.optim.lr_scheduler import ReduceLROnPlateau
from matplotlib import pyplot as plt

import warnings
warnings.filterwarnings("ignore")

In [3]:
def seed_everything(seed=42):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy's global generator and PyTorch (CPU and
    all CUDA devices), exports the seed as ``PYTHONHASHSEED`` and switches
    cuDNN to a deterministic configuration (cuDNN itself is disabled).

    Args:
        seed: Integer seed shared by all generators.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Every seeding function below takes the seed as its only argument.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # cuDNN: no autotuning, deterministic kernels only, and backend disabled.
    torch.backends.cudnn.enabled = False
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
    # torch.use_deterministic_algorithms(True)


# Seed once up front so every later cell starts from a known state.
seed_everything(42)

### 모델 학습

In [6]:
def makeDataloader(data_X, data_y, BATCH_SIZE=32):
    """Build training and validation DataLoaders from array-like data.

    Both inputs are converted to float32 tensors, split 80 % / 20 % with
    ``train_test_split`` and wrapped in ``TensorDataset`` / ``DataLoader``.
    Only the training loader shuffles its samples.

    Args:
        data_X: Array-like of input samples.
        data_y: Array-like of targets, aligned with ``data_X`` on axis 0.
        BATCH_SIZE: Batch size used by both loaders.

    Returns:
        Tuple ``(train_loader, val_loader)``.
    """
    features = torch.tensor(data_X, dtype=torch.float32)
    targets = torch.tensor(data_y, dtype=torch.float32)

    print(f'X_tensor shape : {features.shape}')
    print(f'y_tensor shape : {targets.shape}')

    # 80 % of the samples go to training, 20 % to validation.
    train_X, val_X, train_y, val_y = train_test_split(features, targets, test_size=0.2)

    train_loader = DataLoader(
        TensorDataset(train_X, train_y), batch_size=BATCH_SIZE, shuffle=True
    )
    val_loader = DataLoader(
        TensorDataset(val_X, val_y), batch_size=BATCH_SIZE, shuffle=False
    )
    return train_loader, val_loader

In [7]:
def calculate_accuracy(outputs, labels):
    """Return the fraction of elements whose predicted class equals the label.

    Logits are passed through a sigmoid and rounded with ``torch.round`` to
    obtain a 0/1 prediction, which is compared element-wise with ``labels``.

    Args:
        outputs: Tensor of raw logits.
        labels: Tensor of 0/1 targets with the same shape as ``outputs``.

    Returns:
        Accuracy as a Python float.
    """
    predicted = torch.sigmoid(outputs).round()
    num_correct = predicted.eq(labels).float().sum()
    return (num_correct / labels.numel()).item()

def calculate_accuracy_for_positives(outputs, labels):
    """Return the share of label-1 samples that were predicted as 1.

    Predictions are ``torch.round(torch.sigmoid(outputs))``; only positions
    where ``labels == 1`` are scored.

    Args:
        outputs: Tensor of raw logits.
        labels: Tensor of 0/1 targets with the same shape as ``outputs``.

    Returns:
        The hit rate on positive samples as a Python float, or ``0.0`` when
        ``labels`` contains no positive sample.
    """
    predicted = torch.sigmoid(outputs).round()

    # Restrict the comparison to the samples whose true label is 1.
    is_positive = labels == 1
    hits = (predicted[is_positive] == labels[is_positive]).float().sum()
    n_positive = is_positive.float().sum()

    # Without any positive sample the ratio is undefined; report 0 instead.
    if n_positive == 0:
        return 0.0

    return (hits / n_positive).item()

In [8]:
## Focal loss function

class FocalLoss(nn.Module):
    """Binary focal loss computed from raw logits.

    For every element the binary cross-entropy ``bce`` is computed, the
    probability assigned to the true class is recovered as
    ``pt = exp(-bce)``, and the loss is ``alpha * (1 - pt) ** gamma * bce``.
    Well-classified elements (``pt`` close to 1) are therefore down-weighted.

    Args:
        alpha: Constant scale applied to every element (the same factor is
            used for both classes).
        gamma: Focusing exponent; larger values down-weight easy elements
            more strongly.
        reduction: ``'mean'`` or ``'sum'`` to aggregate the element-wise loss;
            any other value returns the un-reduced tensor.
    """

    def __init__(self, alpha=0.25, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha  # Scale factor
        self.gamma = gamma  # Focusing parameter
        self.reduction = reduction
        # The cross-entropy must stay un-reduced: the focal weight depends on
        # each element's own probability. With the default 'mean' reduction
        # the weight would be derived from the batch average instead, and the
        # 'sum' / 'none' reductions below would only ever see a scalar.
        self.ce_loss = nn.BCEWithLogitsLoss(reduction='none')

    def forward(self, inputs, targets):
        """Return the focal loss of logits ``inputs`` against ``targets``."""
        bce = self.ce_loss(inputs, targets)  # element-wise cross-entropy
        pt = torch.exp(-bce)  # probability assigned to the true class
        focal = self.alpha * (1 - pt) ** self.gamma * bce

        if self.reduction == 'mean':
            return torch.mean(focal)
        if self.reduction == 'sum':
            return torch.sum(focal)
        return focal


In [9]:
## Model definitions

class LinearModel(nn.Module):
    """Two linear layers applied along the time axis of each feature.

    Args:
        input_size: Length of the axis ``fc1`` is applied to (the sequence
            length of the input).
        hidden_size: Width of the hidden representation.
        output_size: Number of outputs produced by ``fc2``.
        drop_out: Dropout probability between the two layers.
    """

    def __init__(self, input_size, hidden_size, output_size, drop_out=0.5):
        super(LinearModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.dropout = nn.Dropout(drop_out)
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x, x_mark_enc=None):
        """Run the model.

        Args:
            x: Tensor of shape [batch_size, sequence_len, feature_len].
            x_mark_enc: Ignored. Accepted so this model can be called with
                the same two arguments as the other models in this notebook.

        Returns:
            Tensor of shape [batch_size, output_size].
        """
        # Swap to [batch, feature, sequence] so the linear layers act on the
        # time axis, then swap back after each layer.
        x = self.fc1(x.permute(0, 2, 1)).permute(0, 2, 1)
        x = self.dropout(x)
        out = self.fc2(x.permute(0, 2, 1)).permute(0, 2, 1)
        # Keep only the feature channel at index 1.
        # NOTE(review): presumably the channel the label is derived from — confirm.
        out = out[:, :, 1]

        return out

class LinearModelFlatten(nn.Module):
    """Two linear layers applied to the flattened input window.

    Args:
        input_size: Number of values per sample after flattening
            (sequence_len * feature_len).
        hidden_size: Width of the hidden representation.
        output_size: Number of outputs.
        drop_out: Dropout probability between the two layers.
    """

    def __init__(self, input_size, hidden_size, output_size, drop_out=0.5):
        super(LinearModelFlatten, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.dropout = nn.Dropout(drop_out)
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x, x_mark_enc=None):
        """Run the model.

        Args:
            x: Tensor of shape [batch_size, sequence_len, feature_len].
            x_mark_enc: Ignored. Accepted so this model can be called with
                the same two arguments as the other models in this notebook.

        Returns:
            Tensor of shape [batch_size, output_size].
        """
        batch_size = x.size(0)
        # reshape (not view): the input may be a non-contiguous slice of a
        # wider tensor, which view() cannot flatten.
        x = x.reshape(batch_size, -1)
        x = self.fc1(x)
        x = self.dropout(x)
        out = self.fc2(x)

        return out


class LSTMModel(nn.Module):
    """LSTM encoder followed by a linear head on the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        output_size: Number of outputs of the linear head.
        num_layers: Number of stacked LSTM layers.
        drop_out: Dropout probability between stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1, drop_out=0.5):
        super(LSTMModel, self).__init__()
        # nn.LSTM applies dropout only between stacked layers; with a single
        # layer a non-zero value has no effect and triggers a UserWarning.
        lstm_dropout = drop_out if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, dropout=lstm_dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, x_mark_enc=None):
        """Run the model.

        Args:
            x: Batch-first input tensor ([batch, sequence, feature]).
            x_mark_enc: Ignored. Accepted so this model can be called with
                the same two arguments as the other models in this notebook.

        Returns:
            Tensor of shape [batch, output_size].
        """
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])  # use the output of the last time step
        return out

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [10]:
def train_model(args, epochs=10):
    """Train and validate one model configuration, logging every epoch.

    The dataset is loaded with ``makeDataset``, split by ``makeDataloader``,
    and the model / loss selected by ``args.model_name`` / ``args.loss_func``
    is trained with Adam for ``epochs`` epochs.

    Args:
        args: Configuration object. ``model_name``, ``loss_func``,
            ``target_feature`` and ``target_time`` are read; ``enc_in`` and
            ``dec_in`` are overwritten with the number of model input
            features.
        epochs: Number of training epochs.

    Returns:
        Tuple ``(train_loss_list, valid_loss_list, accuracy_list,
        setting_str)``: per-epoch training loss, validation loss and
        validation hit rate on positive samples, plus the string that
        identifies this run in log and checkpoint names.
    """
    data_X, data_y = makeDataset(LOAD=True, target="신고가",
                                 target_feature=args.target_feature,
                                 target_time=args.target_time)
    train_loader, val_loader = makeDataloader(data_X, data_y, BATCH_SIZE=32)

    ## Model selection
    # The last 4 columns of every sample are split off and handed to the
    # model separately as `x_mark_enc`; the rest are the model inputs.
    input_size = data_X.shape[2] - 4
    args.enc_in = input_size
    args.dec_in = input_size
    hidden_size = 64
    output_size = 1
    num_layers = 2
    drop_out = 0.3
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Fallback when `args.model_name` matches none of the names below.
    # NOTE(review): this instance is built (and consumes random numbers for
    # its initialisation) even when another model is chosen; kept so seeded
    # runs stay comparable with earlier results.
    model = LSTMModel(input_size, hidden_size, output_size, num_layers, drop_out=drop_out)
    if args.model_name == 'LSTM':
        model = LSTMModel(input_size, hidden_size, output_size, num_layers, drop_out=drop_out)
    elif args.model_name == 'Linear':
        model = LinearModel(data_X.shape[1], hidden_size, output_size, drop_out=drop_out)
    elif args.model_name == 'LinearFlatten':
        # Sized from `input_size`, because the batches fed to the model below
        # only contain the first `input_size` feature columns.
        model = LinearModelFlatten(data_X.shape[1] * input_size, hidden_size, output_size, drop_out=drop_out)
    elif args.model_name == 'Dlinear-S':
        model = DlinearModel(args, individual=False)
    elif args.model_name == 'Dlinear-I':
        model = DlinearModel(args, individual=True)
    elif args.model_name == 'NontransformerModel':
        model = NontransformerModel(args)
    elif args.model_name == 'TimesnetModel':
        model = TimesnetModel(args)
    model.to(device)

    ## Loss function ('BCE' and any unrecognised name use BCE-with-logits)
    criterion = nn.BCEWithLogitsLoss()
    if args.loss_func == 'Focal':
        criterion = FocalLoss()
    elif args.loss_func == 'MSE':
        criterion = nn.MSELoss()
    elif args.loss_func == 'MAE':
        criterion = nn.L1Loss()

    ## Optimizer and scheduler
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
    # scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5, verbose=True)
    # scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer = optimizer,
    #                                         steps_per_epoch = len(train_loader),
    #                                         pct_start = 0.3,
    #                                         epochs = epochs,
    #                                         max_lr = 0.001)
    early_stopping = EarlyStopping(patience=20, verbose=True)

    ## Log file
    # Built from `args` only, so the function does not depend on notebook
    # globals that happen to carry the same values.
    setting_str = f'{args.target_time}_{args.model_name}_{args.loss_func}_{epochs}epochs_{"_".join(args.target_feature)}'
    logger = setup_logger(log_file=f'{setting_str}_log.txt')

    ## Training ==============================================
    train_loss_list = []
    valid_loss_list = []
    accuracy_list = []

    try:
        for epoch in range(epochs):

            logger.info("=" * 50)
            logger.info(f'Epoch: {epoch + 1}/{epochs}')

            # Training phase
            model.train()
            total_train_loss = 0
            for X_batch, y_batch in train_loader:
                # Every tensor handed to the model must live on `device`.
                x_mark_enc = X_batch[:, :, -4:].to(device)
                X_batch, y_batch = X_batch[:, :, :input_size].to(device), y_batch.to(device)

                optimizer.zero_grad()
                output = model(X_batch, x_mark_enc)
                # reshape to the target's shape rather than squeeze(): a
                # plain squeeze() also drops the batch axis when a batch
                # holds a single sample, which the loss then rejects.
                loss = criterion(output.reshape(y_batch.shape), y_batch)
                loss.backward()
                optimizer.step()
                total_train_loss += loss.item()

            avg_train_loss = total_train_loss / len(train_loader)
            train_loss_list.append(avg_train_loss)
            logger.info(f'Train Loss: {avg_train_loss:.4f}')

            # Validation phase
            model.eval()  # Set model to evaluation mode
            total_val_loss = 0
            total_accuracy = 0
            with torch.no_grad():
                for X_batch, y_batch in val_loader:
                    x_mark_enc = X_batch[:, :, -4:].to(device)
                    X_batch, y_batch = X_batch[:, :, :input_size].to(device), y_batch.to(device)
                    output = model(X_batch, x_mark_enc).reshape(y_batch.shape)
                    loss = criterion(output, y_batch)
                    total_val_loss += loss.item()
                    # Hit rate on the positive samples of this batch.
                    # NOTE(review): batches without positives contribute 0.0,
                    # which pulls the per-epoch average down — confirm this
                    # is the intended metric.
                    accuracy = calculate_accuracy_for_positives(output, y_batch)
                    total_accuracy += accuracy

            avg_val_loss = total_val_loss / len(val_loader)
            avg_accuracy = total_accuracy / len(val_loader)
            accuracy_list.append(avg_accuracy)

            valid_loss_list.append(avg_val_loss)

            # Early stopping tracks the negated accuracy (lower is better).
            # NOTE(review): the result of this call is never checked, so
            # training always runs for all `epochs` — confirm whether a
            # break was intended.
            early_stopping(-avg_accuracy, model, f'./checkpoints/{setting_str}_')
            # Learning-rate scheduler
            # scheduler.step(avg_val_loss)

            logger.info(f'Validation Loss: {avg_val_loss:.4f}, Accuracy: {avg_accuracy:.4f}')

        ## Record the results (accuracy); nothing to summarise for 0 epochs
        if accuracy_list:
            recent = accuracy_list[-10:]
            last_10_accuracy = sum(recent) / len(recent)
            final_accuracy = accuracy_list[-1]
            best_accuracy = max(accuracy_list)
            logger.info("\n--- Summary ---")
            logger.info(f"Best Accuracy: {best_accuracy:.4f}")
            logger.info(f"Last 10 Epochs Average Accuracy: {last_10_accuracy:.4f}")
            logger.info(f"Final Epoch Accuracy: {final_accuracy:.4f}")
    finally:
        # Detach the handlers even when training is interrupted, so the next
        # run does not log through stale handlers. Iterate over a copy:
        # removing from the live list while looping over it skips entries.
        for handler in list(logger.handlers):
            handler.close()
            logger.removeHandler(handler)

    return train_loss_list, valid_loss_list, accuracy_list, setting_str

In [11]:
class Args():
    """Run configuration passed to the training routine and the model classes.

    Args:
        model_name: Name of the model to train.
        loss_func: Name of the loss function to use.
        target_time: Time-of-day selector forwarded to the dataset builder.
        target_feature: List of feature names forwarded to the dataset builder.
    """

    def __init__(self, model_name, loss_func, target_time, target_feature):
        self.model_name = model_name
        self.loss_func = loss_func
        self.target_time = target_time
        self.target_feature = target_feature

        self.task_name = 'classification'
        self.seq_len = 10
        self.pred_len = 0
        self.label_len = 1
        self.moving_avg = 3
        self.num_class = 1
        # Number of features to predict
        self.c_out = 13

        ## Embedding-related arguments
        # Number of encoder / decoder input features (overwritten by
        # train_model with the actual feature count of the loaded data)
        self.enc_in = 13
        self.dec_in = 13
        # Hidden size of the embeddings and layers
        self.d_model = 4
        # Embedding type
        self.embed = 'fixed'
        # Frequency code of the input data
        self.freq = 't'
        self.dropout = 0.3

        ## Attention, encoder / decoder
        self.factor = 1
        self.output_attention = False
        # Number of attention heads
        self.n_heads = 4
        # Number of encoder / decoder layers
        self.e_layers = 3
        self.d_layers = 1
        # Feed-forward network dimension, conventionally 4x d_model. Derived
        # from d_model (not n_heads) so it stays correct if either changes.
        self.d_ff = 4 * self.d_model
        # Activation function: 'relu' or 'gelu'
        self.activation = 'gelu'

        # Projector hidden-layer dimensions and number of layers
        self.p_hidden_dims = [128, 128]
        self.p_hidden_layers = 2

        self.top_k = 5
        self.num_kernels = 6

        self.time_feature = True

In [12]:
# Feature subsets to compare; the second entry is the first minus one feature.
target_feature_list = [['종가', '거래대금', '거래량볼밴', '주가볼밴', '이등분선'],
                       ['종가', '거래대금', '거래량볼밴', '이등분선']]
# Time-of-day selectors to compare: ["all", "AM"]
target_time_list = ["all", "AM"]
# Loss functions to compare: ['BCE', 'Focal']
loss_func_list = ['BCE', 'Focal']
# Models to compare. Other names used in this notebook:
# ['LSTM', 'Dlinear-S', 'Dlinear-I', 'NontransformerModel', 'TimesnetModel']
model_list = ['NontransformerModel', 'TimesnetModel']


def _save_curve_plot(curves, ylabel, out_path):
    """Plot per-epoch curves, save the figure as PNG, show it and close it.

    Args:
        curves: Iterable of ``(label, values)`` pairs, one per line.
        ylabel: Label of the y axis.
        out_path: Destination path of the PNG file.
    """
    plt.figure(figsize=(16, 10), dpi=300)
    for label, values in curves:
        plt.plot(values, label=label)
    plt.xlabel('Epoch')
    plt.ylabel(ylabel)
    plt.grid(True, axis='y')
    plt.legend()
    plt.savefig(out_path, format='png')
    plt.show()
    # Close explicitly: one figure is created per run and per metric.
    plt.close()


# savefig does not create missing directories.
os.makedirs('./image', exist_ok=True)

# Train every combination of feature set, time selector, loss and model.
for target_feature in target_feature_list:
    for target_time in target_time_list:
        for loss_func in loss_func_list:
            for model_name in model_list:

                args = Args(model_name, loss_func, target_time, target_feature)

                # Re-seed so every configuration starts from the same state.
                seed_everything(42)

                # Train the model
                print(f'target_feature : {target_feature}')
                print(f'target_time : {target_time}')
                print(f'loss_func : {loss_func}')
                print(f'model_name : {model_name}')
                print("=" * 50)

                train_loss_list, valid_loss_list, accuracy_list, setting_str = train_model(args, epochs=100)

                _save_curve_plot(
                    [('Train Loss', train_loss_list), ('Validation Loss', valid_loss_list)],
                    'Loss',
                    f'./image/{setting_str}_train_loss.png',
                )
                _save_curve_plot(
                    [('Valid Accuracy', accuracy_list)],
                    'Accuracy',
                    f'./image/{setting_str}_train_accuracy.png',
                )

target_feature : ['종가', '거래대금', '거래량볼밴', '주가볼밴', '이등분선']
target_time : all
loss_func : BCE
model_name : NontransformerModel
X_tensor shape : torch.Size([166311, 10, 17])
y_tensor shape : torch.Size([166311])


2024-11-05 09:36:10,744 - INFO - Epoch: 1/100
INFO:train_logger:Epoch: 1/100


KeyboardInterrupt: 