<a href="https://colab.research.google.com/github/LeeSaeBom/GTM-Transformer-Jupyter/blob/main/GTM_Step1_Dummy_Only_%EA%B0%95%EC%9D%98%EC%9A%A9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 🚀 GTM Step 1: Temporal Features + Google Trends

## 📚 특강 1단계: 기본 시계열 예측
- **사용 모달리티**: Temporal Features (날짜 정보) + Google Trends
- **목적**: 시계열 데이터만으로 매출 예측의 기초 구현
- **학습 목표**:
  - Transformer 기본 구조 이해
  - 시계열 인코딩 (Positional Encoding)
  - Google Trends 데이터 활용

## 1. 📦 패키지 설치 및 import

In [None]:
# 패키지 설치
!pip install lightning --upgrade --quiet
!pip install transformers scikit-learn pillow --quiet

# Import
import math
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import lightning as L
import pandas as pd
import numpy as np
from tqdm import tqdm
from PIL import Image, ImageFile
from torch.utils.data import DataLoader, TensorDataset
from torchvision.transforms import Resize, ToTensor, Normalize, Compose
from sklearn.preprocessing import MinMaxScaler
from transformers import Adafactor
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

ImageFile.LOAD_TRUNCATED_IMAGES = True

# Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

print(f" PyTorch: {torch.__version__}")
print(f" Lightning: {L.__version__}")
print(f" CUDA 사용 가능: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f" GPU: {torch.cuda.get_device_name(0)}")

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m821.1/821.1 kB[0m [31m26.5 MB/s[0m eta [36m0:00:00[0m
[2K   [91m━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m70.3/363.4 MB[0m [31m115.5 MB/s[0m eta [36m0:00:03[0m

## 2. 🧠 모델 컴포넌트 정의
### 1단계에서는 최소한의 컴포넌트만 사용

In [None]:
# 기본 모듈들
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=52):
        super(PositionalEncoding, self).__init__()
        # 드롭아웃: 위치 임베딩이 추가된 후 과적합을 줄이기 위한 정규화
        self.dropout = nn.Dropout(p=dropout)

        # pe: [max_len, d_model] 크기의 위치 임베딩 테이블(고정값, 학습되지 않음)
        pe = torch.zeros(max_len, d_model)

        # position: [max_len, 1] 형태로 0 ~ max_len-1까지의 정수 위치
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        # div_term: 주파수 스케일(지수적으로 커지는 간격)
        #   - 논문 "Attention Is All You Need"의 사인/코사인 위치 인코딩 공식을 그대로 구현
        #   - d_model의 짝수 인덱스에 대응하는 주파수들만 계산
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # 짝수 채널(0,2,4,...)에는 sin, 홀수 채널(1,3,5,...)에는 cos를 채운다.
        # position * div_term의 브로드캐스팅으로 [max_len, d_model/2]에 해당하는 값을 만든 뒤 할당
        pe[:, 0::2] = torch.sin(position * div_term)  # even indices
        pe[:, 1::2] = torch.cos(position * div_term)  # odd  indices

        # Transformer의 기본 입력 형태(S, N, E = seq_len, batch, embed)에 맞추기 위해
        # pe를 [max_len, 1, d_model]로 바꿔 배치 차원으로 브로드캐스팅되게 만든다.
        pe = pe.unsqueeze(0).transpose(0, 1)  # [1, max_len, d_model] -> [max_len, 1, d_model]

        # register_buffer: 학습 파라미터는 아니지만 모델과 함께 디바이스 이동/저장되도록 등록
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: [seq_len, batch_size, d_model] 형태를 기대 (PyTorch nn.Transformer 표준)
        # 현재 시퀀스 길이(seq_len)에 해당하는 위치 임베딩을 잘라 더한다.
        # self.pe[:x.size(0), :] -> [seq_len, 1, d_model] 이고 배치 차원으로 브로드캐스팅됨
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)  # 위치 인코딩이 더해진 후 드롭아웃 적용

In [None]:
class TimeDistributed(nn.Module):
    def __init__(self, module, batch_first=True):
        super(TimeDistributed, self).__init__()
        # module: 시간축 각 스텝에 똑같이 적용할 하위 모듈
        #   예: nn.Linear, 작은 CNN, MLP 등
        self.module = module
        # batch_first:
        #   True  → 입력 형태가 [batch, time, feature]
        #   False → 입력 형태가 [time, batch, feature]
        self.batch_first = batch_first

    def forward(self, x):
        # (1) 입력이 2차원 이하이면 (시간축이 없으면)
        # 예: [batch, feature] 또는 [time, feature]
        # 그냥 module에 바로 넣어서 처리
        if len(x.size()) <= 2:
            return self.module(x)

        # (2) 시간축이 있는 경우 → 시간축과 배치축을 합쳐서 한 번에 처리
        # 예: batch_first=True  -> [B, T, F] → [B*T, F]
        #     batch_first=False -> [T, B, F] → [T*B, F]
        x_reshape = x.contiguous().view(-1, x.size(-1))
        # contiguous(): 메모리 상에서 연속적으로 만들어줘서 view()가 안전하게 동작하도록 함

        # (3) 평탄화된 데이터를 module에 통과
        # 이렇게 하면 for문 없이 한 번에 모든 시점 데이터 처리 가능
        y = self.module(x_reshape)  # shape: [B*T, F_out] 또는 [T*B, F_out]

        # (4) 원래 모양으로 복원
        if self.batch_first:
            # batch_first=True → [B, T, F_out] 형태로 복원
            y = y.contiguous().view(x.size(0), -1, y.size(-1))
        else:
            # batch_first=False → [T, B, F_out] 형태로 복원
            y = y.view(-1, x.size(1), y.size(-1))

        # (5) 최종 결과 반환
        return y


print(" 기본 모듈 정의 완료")


In [None]:
# 1단계: Dummy (시간) + GTrends 인코더만 사용
class DummyEmbedder(nn.Module):
    """시간 정보(일/주/월/연)를 각각 임베딩한 뒤 하나로 합쳐 단일 임베딩으로 투영"""
    def __init__(self, embedding_dim):
        super().__init__()
        self.embedding_dim = embedding_dim
        # 각 스칼라 시간값(일/주/월/연: 모두 1차원)을 임베딩 차원으로 선형 사상
        self.day_embedding   = nn.Linear(1, embedding_dim)
        self.week_embedding  = nn.Linear(1, embedding_dim)
        self.month_embedding = nn.Linear(1, embedding_dim)
        self.year_embedding  = nn.Linear(1, embedding_dim)
        # 4개 임베딩을 concat하여(embedding_dim*4) 다시 embedding_dim으로 축소
        self.dummy_fusion = nn.Linear(embedding_dim*4, embedding_dim)
        self.dropout = nn.Dropout(0.2)

    def forward(self, temporal_features):
        """
        temporal_features: [B, 4] 가정 (컬럼: day, week, month, year 순서)
          - 각 컬럼은 스칼라이므로 Linear 입력을 위해 [B, 1]로 확장
        반환: temporal_embeddings ∈ ℝ[B, embedding_dim]
        """
        # 각 시간 성분 분리 + [B, 1]로 차원 확장
        d, w, m, y = temporal_features[:, 0].unsqueeze(1), temporal_features[:, 1].unsqueeze(1), \
                     temporal_features[:, 2].unsqueeze(1), temporal_features[:, 3].unsqueeze(1)

        # 각각 선형 임베딩
        d_emb = self.day_embedding(d)     # [B, D]
        w_emb = self.week_embedding(w)    # [B, D]
        m_emb = self.month_embedding(m)   # [B, D]
        y_emb = self.year_embedding(y)    # [B, D]

        # concat 후 차원 축소(융합)
        temporal_embeddings = self.dummy_fusion(torch.cat([d_emb, w_emb, m_emb, y_emb], dim=1))  # [B, D]
        temporal_embeddings = self.dropout(temporal_embeddings)
        return temporal_embeddings

In [None]:
class GTrendEmbedder(nn.Module):
    """Google Trends 데이터 인코딩"""
    def __init__(self, forecast_horizon, embedding_dim, use_mask, trend_len, num_trends, gpu_num):
        super().__init__()
        # 예측 지평(horizon): 마스크 생성 시 블록 크기 결정에 사용
        self.forecast_horizon = forecast_horizon

        # 시점별 입력(길이 num_trends 벡터)을 embedding_dim으로 투영
        # 기대 입력: [B, T, num_trends]  → 출력: [B, T, embedding_dim]
        # (forward에서 permute로 [B, num_trends, T]를 [B, T, num_trends]로 바꾼 뒤 사용)
        self.input_linear = TimeDistributed(nn.Linear(num_trends, embedding_dim))

        # 위치 인코딩: Transformer가 순서를 알 수 있도록 추가
        # PositionalEncoding은 [S, N, E] (seq, batch, embed) 형태를 기대함
        # trend_len은 사용할 최대 시퀀스 길이(≥ 실제 T)로 설정해야 함
        self.pos_embedding = PositionalEncoding(embedding_dim, max_len=trend_len)

        # 시계열 내 상관관계 학습을 위한 Transformer Encoder (2층)
        # d_model=embedding_dim, nhead=4, dropout=0.2
        encoder_layer = nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=4, dropout=0.2)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=2)

        # 마스크 사용 여부(1: 사용, 그 외: 미사용)
        self.use_mask = use_mask

        # (참고) 현재 코드에서 gpu_num은 직접 사용되지 않음
        self.gpu_num = gpu_num

    def _generate_encoder_mask(self, size, forecast_horizon):
        # 어텐션 마스크(추가 가중치 방식, additive mask) 생성
        # - 입력/출력 shape: [size, size] (size = 시퀀스 길이 T)
        # - split = gcd(T, horizon)로 대각선 블록을 만들고 블록 내부만 어텐션 허용
        # - PyTorch 규약: 허용=0.0, 차단=-inf (가중치에 더해져 softmax에서 무시됨)
        mask = torch.zeros((size, size))
        split = math.gcd(size, forecast_horizon)
        for i in range(0, size, split):
            mask[i:i+split, i:i+split] = 1
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def forward(self, gtrends):
        # gtrends 예상 입력: [B, num_trends, T]  (채널 우선)
        # TimeDistributed 적용을 위해 [B, T, num_trends]로 변환
        gtrend_emb = self.input_linear(gtrends.permute(0,2,1))   # → [B, T, D]

        # PositionalEncoding은 [S, N, E] 형태를 기대하므로 [T, B, D]로 변환하여 적용
        gtrend_emb = self.pos_embedding(gtrend_emb.permute(1,0,2))  # → [T, B, D]

        # 시퀀스 길이 T에 맞춰 블록 대각선 마스크 생성 (데이터 누수 방지/주기 반영)
        input_mask = self._generate_encoder_mask(gtrend_emb.shape[0], self.forecast_horizon).to(gtrend_emb.device)

        # 마스크 사용 여부에 따라 Encoder에 전달
        if self.use_mask == 1:
            gtrend_emb = self.encoder(gtrend_emb, input_mask)   # [T, B, D] (마스크 적용)
        else:
            gtrend_emb = self.encoder(gtrend_emb)               # [T, B, D] (전체 어텐션)

        # 반환: 시퀀스 우선 텐서 [T, B, D]
        return gtrend_emb

In [None]:
class TransformerDecoderLayer(nn.Module):
    """커스텀 트랜스포머 디코더 레이어
    - 구성: (1) 디코더 자기어텐션 → (2) 인코더-디코더(크로스) 어텐션 → (3) 위치별 FFN
    - 각 블록마다: 잔차 연결(Residual) + LayerNorm + Dropout
    - 기본 텐서 형상(기본값 batch_first=False 가정):
        tgt    : [T_tgt, B, D]   (디코더 입력/이전 단계 출력)
        memory : [S_src, B, D]   (인코더 출력)
    """
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation="relu"):
        super(TransformerDecoderLayer, self).__init__()

        # (1) 디코더 자기어텐션: 디코더의 현재 토큰들이 자기 자신 시퀀스를 참조
        #  - 오토리그레시브(미래 차단)로 쓰려면 forward에서 tgt_mask(혹은 causal) 전달
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)

        # (2) 인코더-디코더(크로스) 어텐션:
        #  - 디코더가 인코더의 정보를 끌어와 현재 토큰을 더 정확히 예측
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)

        # (3) 위치별 피드포워드 네트워크(FFN): 각 시점의 피처를 비선형 변환
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        # 정규화 및 드롭아웃 (각 서브레이어 뒤에 사용)
        self.norm1 = nn.LayerNorm(d_model)  # self-attn 뒤
        self.norm2 = nn.LayerNorm(d_model)  # cross-attn 뒤
        self.norm3 = nn.LayerNorm(d_model)  # FFN 뒤
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

        # 활성함수 (필요시 GELU 등으로 교체 가능)
        self.activation = F.relu

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None, tgt_key_padding_mask=None,
            memory_key_padding_mask=None, tgt_is_causal=None, memory_is_causal=None):
        """
        인자 설명:
          - tgt : 디코더 입력/이전 디코더 레이어 출력  (shape: [T_tgt, B, D])
          - memory : 인코더 출력(소스 시퀀스 인코딩 결과) (shape: [S_src, B, D])
          - tgt_mask : 디코더 자기어텐션용 attn mask (예: 미래 차단용 causal mask)
          - memory_mask : 인코더-디코더 어텐션용 mask (특정 소스 위치 차단 등)
          - *_key_padding_mask : 패딩 토큰(True=무시) 가리기 위한 마스크 (batch 차원 기준)

        반환:
          - tgt : 현재 디코더 레이어의 출력 (shape: [T_tgt, B, D])
          - attn_weights : 크로스 어텐션 가중치(디버깅/가시화 용)
        """

        # ---------------------------
        # (A) 디코더 자기어텐션 블록
        #  - Query/Key/Value 모두 tgt
        #  - 일반적으로 tgt_mask로 미래 토큰 차단(causal)하거나,
        #    tgt_key_padding_mask로 패딩 위치 무시
        # ---------------------------
        tgt2 = self.self_attn(
            tgt, tgt, tgt,
            attn_mask=tgt_mask,
            key_padding_mask=tgt_key_padding_mask
        )[0]  # 반환: (attn_output, attn_weights); 여기선 출력만 사용
        tgt = tgt + self.dropout1(tgt2)  # 잔차 연결
        tgt = self.norm1(tgt)            # 정규화

        # ---------------------------
        # (B) 인코더-디코더(크로스) 어텐션 블록
        #  - Query=tgt, Key/Value=memory(인코더 출력)
        #  - memory_mask로 특정 소스 위치 차단 가능,
        #    memory_key_padding_mask로 소스 패딩 무시
        # ---------------------------
        tgt2, attn_weights = self.multihead_attn(
            tgt, memory, memory,
            attn_mask=memory_mask,
            key_padding_mask=memory_key_padding_mask
        )
        tgt = tgt + self.dropout2(tgt2)  # 잔차 연결
        tgt = self.norm2(tgt)            # 정규화

        # ---------------------------
        # (C) 위치별 FFN 블록
        #  - 각 시점(feature 벡터)에 독립적으로 적용되는 2층 MLP
        # ---------------------------
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = tgt + self.dropout3(tgt2)  # 잔차 연결
        tgt = self.norm3(tgt)            # 정규화

        # 디코더 레이어 출력과, 크로스 어텐션 가중치 반환
        return tgt, attn_weights

print("1단계 인코더 (Dummy + GTrends) 정의 완료")

## 3. 🎯 GTM Step 1 모델

In [None]:
class GTM_Step1(L.LightningModule):
    """1단계: Temporal Features(시간 특성) + Google Trends 데이터만 사용해서 판매 예측"""

    def __init__(self, embedding_dim, hidden_dim, output_dim, num_heads, num_layers,
                 cat_dict, col_dict, fab_dict, trend_len, num_trends, gpu_num,
                 use_encoder_mask=1, autoregressive=False):
        super().__init__()

        # 주요 하이퍼파라미터 저장
        self.hidden_dim = hidden_dim            # 내부 처리 차원 (은닉 벡터 크기)
        self.embedding_dim = embedding_dim      # 임베딩 차원 (시간 특성 임베딩 크기)
        self.output_len = output_dim            # 예측할 시점 개수
        self.use_encoder_mask = use_encoder_mask# GTrends 인코더 마스크 사용 여부
        self.autoregressive = autoregressive    # 자기회귀 예측 모드 여부
        self.gpu_num = gpu_num                  # 사용 GPU 번호 (데이터 이동 시 사용)
        self.save_hyperparameters()             # Lightning에서 하이퍼파라미터 자동 저장

        # (1) 시간 특성 인코더 — 날짜 관련 스칼라(day/week/month/year)를 벡터로 변환
        self.dummy_encoder = DummyEmbedder(embedding_dim)

        # (2) Google Trends 인코더 — 시계열 데이터를 Transformer 인코더로 변환
        #    - output_dim: 예측 길이
        #    - hidden_dim: 내부 처리 차원
        #    - trend_len: 시계열 길이
        #    - num_trends: 트렌드 채널 수
        self.gtrend_encoder = GTrendEmbedder(output_dim, hidden_dim,
                                             use_encoder_mask, trend_len,
                                             num_trends, gpu_num)

        # (3) 시간 특성과 트렌드 정보를 결합하기 전에,
        #     시간 특성 벡터를 은닉 차원 크기로 변환하는 작은 네트워크
        self.feature_fusion = nn.Sequential(
            nn.BatchNorm1d(embedding_dim),      # 배치 정규화 (학습 안정화)
            nn.Linear(embedding_dim, hidden_dim),# 차원 변환
            nn.ReLU(),                          # 비선형 활성화
            nn.Dropout(0.2)                     # 과적합 방지
        )

        # (4) Transformer 디코더 레이어
        #     - Cross-Attention을 사용해 시간 특성(tgt)와 트렌드 인코딩(memory) 연결
        self.decoder_layer = TransformerDecoderLayer(
            d_model=self.hidden_dim,
            nhead=num_heads,
            dim_feedforward=self.hidden_dim * 4,
            dropout=0.1
        )

        # (5) 자기회귀 모드인 경우, 디코더 입력에 Positional Encoding 추가
        if self.autoregressive:
            self.pos_encoder = PositionalEncoding(hidden_dim, max_len=12)

        # (6) 디코더 출력 → 최종 예측값 변환하는 FC 레이어
        #     - 비자가회귀: 한 번에 output_len 길이 예측
        #     - 자기회귀: 한 번에 1 시점만 예측
        self.decoder_fc = nn.Sequential(
            nn.Linear(hidden_dim,
                      self.output_len if not self.autoregressive else 1),
            nn.Dropout(0.2)
        )

In [None]:
    def _generate_square_subsequent_mask(self, size):
        """
        자기회귀(Autoregressive) 예측에서 미래 시점 정보를 보지 않도록 만드는 마스크 생성 함수
        - size: 시퀀스 길이
        - torch.triu(...): 상삼각행렬을 만들어, 현재 시점 이후(미래) 값은 차단
        - transpose(0,1): Transformer 규칙에 맞게 차원 전환
        - masked_fill: 0인 부분(미래 시점)은 -inf로 채워서 attention에서 무시
                       1인 부분(현재 및 과거 시점)은 0.0으로 채워서 attention 허용
        """
        mask = (torch.triu(torch.ones(size, size)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def forward(self, category, color, fabric, temporal_features, gtrends, images):
        """
        모델 순전파(forward) 과정
        category, color, fabric: (현재 단계에서는 사용 안 함)
        temporal_features: 날짜/시간 관련 특성 (day/week/month/year 등)
        gtrends: Google Trends 시계열 데이터
        images: (현재 단계에서는 사용 안 함)
        """

        # (1) 날짜/시간 특성 → 임베딩
        dummy_encoding = self.dummy_encoder(temporal_features)

        # (2) Google Trends 시계열 데이터 → Transformer 인코딩
        gtrend_encoding = self.gtrend_encoder(gtrends)

        # (3) 시간 특성 임베딩을 은닉 차원 크기로 변환 (BatchNorm → Linear → ReLU → Dropout)
        static_feature_fusion = self.feature_fusion(dummy_encoding)

        # (4) 디코더 입력 준비
        #     - Transformer 디코더는 [시퀀스 길이, 배치 크기, 특성 차원] 형태를 기대
        #     - 여기서는 한 시점 정보만 넣으므로 unsqueeze(0)으로 seq_len = 1 추가
        tgt = static_feature_fusion.unsqueeze(0)

        # (5) 디코더의 'memory'는 인코더에서 나온 Google Trends 인코딩 벡터
        memory = gtrend_encoding

        # (6) Transformer 디코더 레이어 통과
        #     - tgt: 시간 특성
        #     - memory: 트렌드 인코딩 (Cross-Attention)
        decoder_out, attn_weights = self.decoder_layer(tgt, memory)

        # (7) 디코더 출력 → 최종 예측값 변환
        forecast = self.decoder_fc(decoder_out)

        # (8) [seq_len, batch, output_len] → [batch, output_len] 형태로 변환
        return forecast.view(-1, self.output_len), attn_weights

In [None]:
    def configure_optimizers(self):
        """
        학습에 사용할 옵티마이저(Optimizer) 설정
        - Adafactor: Adam의 변형으로, 메모리를 절약하고 학습 속도를 높이는 최적화 알고리즘
        - scale_parameter, relative_step, warmup_init: 학습률 자동 조정 옵션
        - lr=None: 학습률은 Adafactor 내부에서 자동 결정
        """
        optimizer = Adafactor(self.parameters(), scale_parameter=True, relative_step=True, warmup_init=True, lr=None)
        return optimizer

    def training_step(self, batch, batch_idx):
        """
        학습 단계에서 한 배치(batch)를 처리하는 함수
        - batch: (item_sales, category, color, fabric, temporal_features, gtrends, images) 형태
        - batch_idx: 현재 배치 번호
        """

        # 배치에서 각 데이터 꺼내기
        item_sales, category, color, fabric, temporal_features, gtrends, images = batch

        # temporal_features와 gtrends가 학습 중에 기울기를 계산할 수 있도록 설정
        temporal_features = temporal_features.requires_grad_(True)
        gtrends = gtrends.requires_grad_(True)

        # 모델을 통과시켜 예측값(forecasted_sales) 구하기
        forecasted_sales, _ = self.forward(category, color, fabric, temporal_features, gtrends, images)

        # 예측값과 실제 판매량(item_sales)의 차이를 MSE(평균제곱오차)로 계산
        loss = F.mse_loss(item_sales, forecasted_sales.squeeze())

        # 학습 중 loss를 기록(log) → prog_bar=True이면 진행바에 표시됨
        self.log('train_loss', loss, prog_bar=True)

        # Lightning이 이 값을 기반으로 역전파(Backpropagation) 진행
        return loss

    def validation_step(self, batch, batch_idx):
        """
        검증 단계에서 한 배치(batch)를 처리하는 함수
        - 학습과 달리 loss로 가중치를 업데이트하지 않고, 성능만 확인
        """

        # 배치에서 각 데이터 꺼내기
        item_sales, category, color, fabric, temporal_features, gtrends, images = batch

        # forward()로 예측값 계산
        forecasted_sales, _ = self.forward(category, color, fabric, temporal_features, gtrends, images)

        # validation 결과를 리스트에 저장 (나중에 한 번에 계산하기 위함)
        if not hasattr(self, 'validation_step_outputs'):
            self.validation_step_outputs = []
        self.validation_step_outputs.append((item_sales.squeeze(), forecasted_sales.squeeze()))

        # 현재 배치의 실제값과 예측값 반환 (다음 단계에서 사용 가능)
        return item_sales.squeeze(), forecasted_sales.squeeze()

In [None]:
    def on_validation_epoch_end(self):
        """
        한 번의 검증(epoch)이 끝난 후 성능을 계산하고 기록하는 함수
        - validation_step에서 저장한 모든 배치의 결과를 모아 평균적인 성능 지표를 계산
        """

        # validation_step에서 결과를 저장한 리스트가 있는지 확인
        if hasattr(self, 'validation_step_outputs'):
            val_step_outputs = self.validation_step_outputs  # [(실제값, 예측값), ...] 형태

            # 리스트에서 실제 판매량(item_sales)과 예측값(forecasted_sales)만 각각 추출
            item_sales = [x[0] for x in val_step_outputs]
            forecasted_sales = [x[1] for x in val_step_outputs]

            # 리스트를 하나의 텐서로 합치기 (stack)
            item_sales = torch.stack(item_sales)
            forecasted_sales = torch.stack(forecasted_sales)

            # 예측과 실제값을 원래 스케일로 복원 (학습 시 정규화했으므로 1065 곱함)
            rescaled_item_sales = item_sales * 1065
            rescaled_forecasted_sales = forecasted_sales * 1065

            # MSE(평균제곱오차) 계산 - 정규화된 값 기준
            loss = F.mse_loss(item_sales, forecasted_sales.squeeze())

            # MAE(평균절대오차) 계산 - 원래 스케일 기준
            mae = F.l1_loss(rescaled_item_sales, rescaled_forecasted_sales)

            # 성능 지표 기록 (진행바에도 표시)
            self.log('val_mae', mae, prog_bar=True)   # 예측 정확도
            self.log('val_loss', loss, prog_bar=True) # 손실값

            # 다음 epoch을 위해 결과 리스트 초기화
            self.validation_step_outputs.clear()

## 4. 📊 데이터셋 클래스

In [None]:
class ZeroShotDataset():
    """
    판매량 예측을 위한 데이터셋 클래스
    - Google Trends, 카테고리/색상/원단 정보, 이미지 데이터를 묶어서 모델에 전달할 수 있게 전처리함
    """

    def __init__(self, data_df, img_root, gtrends, cat_dict, col_dict, fab_dict, trend_len):
        # 초기 설정 (데이터와 필요한 매핑 정보 저장)
        self.data_df = data_df               # 상품 정보가 담긴 데이터프레임
        self.gtrends = gtrends               # Google Trends 데이터
        self.cat_dict = cat_dict             # 카테고리 → 숫자 매핑
        self.col_dict = col_dict             # 색상 → 숫자 매핑
        self.fab_dict = fab_dict             # 원단 → 숫자 매핑
        self.trend_len = trend_len           # Google Trends 시계열 길이
        self.img_root = img_root             # 이미지 파일이 저장된 폴더 경로

    def __len__(self):
        # 전체 데이터 개수 반환 (len(dataset) 할 때 호출됨)
        return len(self.data_df)

    def __getitem__(self, idx):
        # 특정 인덱스의 데이터 한 줄을 반환
        return self.data_df.iloc[idx, :]

    def preprocess_data(self):
        """
        원본 데이터를 모델이 학습하기 좋은 형태로 변환하는 함수
        1) Google Trends 데이터를 추출 & 정규화
        2) 이미지를 불러와서 텐서로 변환
        3) 카테고리, 색상, 원단을 숫자로 변환
        4) 텐서 형태의 학습용 데이터셋으로 반환
        """

        data = self.data_df
        gtrends, image_features = [], []

        # 이미지 변환 (크기 조정, 텐서 변환, 정규화)
        img_transforms = Compose([
            Resize((256, 256)),
            ToTensor(),
            Normalize(mean=[0.485, 0.456, 0.406],  # 이미지 색상 평균
                      std=[0.229, 0.224, 0.225])  # 이미지 색상 표준편차
        ])

        # 데이터프레임의 각 행(row)을 하나씩 처리
        for (idx, row) in tqdm(data.iterrows(), total=len(data), ascii=True, desc="데이터 전처리"):
            cat, col, fab, fiq_attr, start_date, img_path = \
                row['category'], row['color'], row['fabric'], row['extra'], \
                row['release_date'], row['image_path']

            # Google Trends 데이터: 출시일 기준 1년(52주) 전부터 데이터 가져오기
            gtrend_start = start_date - pd.DateOffset(weeks=52)
            cat_gtrend = self.gtrends.loc[gtrend_start:start_date][cat][-52:].values[:self.trend_len]
            col_gtrend = self.gtrends.loc[gtrend_start:start_date][col][-52:].values[:self.trend_len]
            fab_gtrend = self.gtrends.loc[gtrend_start:start_date][fab][-52:].values[:self.trend_len]

            # Min-Max 정규화 (0~1 범위로)
            cat_gtrend = MinMaxScaler().fit_transform(cat_gtrend.reshape(-1,1)).flatten()
            col_gtrend = MinMaxScaler().fit_transform(col_gtrend.reshape(-1,1)).flatten()
            fab_gtrend = MinMaxScaler().fit_transform(fab_gtrend.reshape(-1,1)).flatten()

            # 3개의 트렌드 데이터를 하나로 합치기
            multitrends = np.vstack([cat_gtrend, col_gtrend, fab_gtrend])

            # 이미지 불러오기 (RGB 변환)
            img = Image.open(os.path.join(self.img_root, img_path)).convert('RGB')

            # 리스트에 저장
            gtrends.append(multitrends)
            image_features.append(img_transforms(img))

        # 리스트를 numpy 배열로 변환
        gtrends = np.array(gtrends)

        # 필요 없는 컬럼 삭제
        data = data.copy()
        data.drop(['external_code', 'season', 'release_date', 'image_path'], axis=1, inplace=True)

        # 텐서 형태로 변환
        item_sales = torch.FloatTensor(data.iloc[:, :12].values)     # 판매량
        temporal_features = torch.FloatTensor(data.iloc[:, 13:17].values)  # 시간 관련 특성

        # 카테고리/색상/원단 → 숫자 ID 변환
        categories = [self.cat_dict[val] for val in data.category.values]
        colors = [self.col_dict[val] for val in data.color.values]
        fabrics = [self.fab_dict[val] for val in data.fabric.values]

        categories = torch.LongTensor(categories)
        colors = torch.LongTensor(colors)
        fabrics = torch.LongTensor(fabrics)

        gtrends = torch.FloatTensor(gtrends)        # Google Trends 데이터
        images = torch.stack(image_features)        # 이미지 데이터

        # 학습에 사용할 TensorDataset 반환
        return TensorDataset(item_sales, categories, colors, fabrics, temporal_features, gtrends, images)

    def get_loader(self, batch_size, train=True):
        """
        DataLoader 생성 함수
        - batch_size 크기로 데이터를 나눠서 모델에 공급
        - 학습 모드일 때는 데이터 순서를 섞음(shuffle=True)
        """
        print(' 1단계 데이터셋 생성 시작...')
        data_with_gtrends = self.preprocess_data()
        if train:
            data_loader = DataLoader(data_with_gtrends, batch_size=batch_size, shuffle=True, num_workers=2)
        else:
            data_loader = DataLoader(data_with_gtrends, batch_size=1, shuffle=False, num_workers=2)
        print(' 1단계 데이터셋 생성 완료')
        return data_loader

print(" 데이터셋 클래스 정의 완료")

## 5. 🚀 1단계 실행 코드
### 데이터 로딩부터 모델 훈련까지

In [None]:
# 데이터셋 경로 설정
dataset_path = Path('/content/drive/MyDrive/GTM-dataset-small/')

# 데이터 로딩
print(" 데이터 로딩 중...")
train_df = pd.read_csv(dataset_path / 'train.csv', parse_dates=['release_date'])
test_df = pd.read_csv(dataset_path / 'test.csv', parse_dates=['release_date'])
gtrends = pd.read_csv(dataset_path / 'gtrends.csv', index_col=[0], parse_dates=True)

cat_dict = torch.load(dataset_path / 'category_labels.pt', weights_only=False)
col_dict = torch.load(dataset_path / 'color_labels.pt', weights_only=False)
fab_dict = torch.load(dataset_path / 'fabric_labels.pt', weights_only=False)

print(f" 훈련 데이터: {len(train_df):,}개")
print(f" 테스트 데이터: {len(test_df):,}개")
print(f" Google Trends: {len(gtrends):,}개 시점")

In [None]:
# 데이터셋 생성
train_dataset = ZeroShotDataset(train_df, dataset_path / 'images', gtrends, cat_dict, col_dict, fab_dict, trend_len=52)
test_dataset = ZeroShotDataset(test_df, dataset_path / 'images', gtrends, cat_dict, col_dict, fab_dict, trend_len=52)

BATCH_SIZE = 8 if torch.cuda.is_available() else 4
train_loader = train_dataset.get_loader(batch_size=BATCH_SIZE, train=True)
test_loader = test_dataset.get_loader(batch_size=1, train=False)

print(f" 배치 크기: {BATCH_SIZE}")
print(f" 훈련 배치 수: {len(train_loader)}")
print(f" 테스트 배치 수: {len(test_loader)}")

In [None]:
# 1단계 모델 생성
print(" GTM Step 1 모델 생성 중...")

model = GTM_Step1(
    embedding_dim=32,
    hidden_dim=64,
    output_dim=12,
    num_heads=4,
    num_layers=1,
    cat_dict=cat_dict,
    col_dict=col_dict,
    fab_dict=fab_dict,
    trend_len=52,
    num_trends=3,
    gpu_num=0,
    use_encoder_mask=1,
    autoregressive=False
)

print(f" Step 1 모델 생성 완료!")
print(f" 모델 파라미터: {sum(p.numel() for p in model.parameters()):,}")
print("\n 사용 모달리티: Temporal Features (시간 정보) + Google Trends")

In [None]:
# Trainer 설정 및 훈련
from lightning.pytorch.callbacks import ModelCheckpoint
from lightning.pytorch.loggers import CSVLogger

EPOCHS = 5
ACCELERATOR = 'gpu' if torch.cuda.is_available() else 'cpu'

checkpoint_callback = ModelCheckpoint(
    dirpath='./checkpoints/',
    filename='gtm-step1-{epoch:02d}-{val_mae:.2f}',
    monitor='val_mae',
    mode='min',
    save_top_k=2
)

csv_logger = CSVLogger(save_dir='./logs/', name='gtm_step1')

trainer = L.Trainer(
    devices=1,
    accelerator=ACCELERATOR,
    max_epochs=EPOCHS,
    logger=csv_logger,
    callbacks=[checkpoint_callback],
    enable_progress_bar=True,
    gradient_clip_val=1.0
)

print("🚀 GTM Step 1 훈련 시작!")
print("=" * 50)

try:
    trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=test_loader)
    print("\n Step 1 훈련 완료!")
    print(f"💾 최고 모델: {checkpoint_callback.best_model_path}")

except Exception as e:
    print(f"\n Step 1 훈련 실패: {e}")
    import traceback
    traceback.print_exc()

## 📋 1단계 요약

### ✅ 구현 완료
- **시간적 특성 임베딩**: 날짜 정보 (일, 주, 월, 년)를 벡터로 변환
- **Google Trends 인코딩**: Transformer Encoder로 시계열 패턴 학습
- **기본 Cross-Attention**: 시간 정보와 트렌드 데이터 간 관계 학습

### 🎯 학습 목표 달성
- Transformer 기본 구조 이해
- 시계열 데이터 인코딩 방법
- Multi-modal 입력 처리 기초

### 🔜 다음 단계 예고
**Step 2**에서는 **이미지 정보**를 추가하여 시각적 특성도 함께 학습합니다!