In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset, Dataset
from sklearn.preprocessing import StandardScaler

In [None]:
# Mount Google Drive at /content/drive so the CSV files read in the next cell
# are reachable. Requires a Google Colab runtime (google.colab is Colab-only).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Load the train/test tables from the mounted Drive folder.
# Both frames must contain the target column "관측미세먼지": the preprocessing
# cell below separates it from the features in each of them.
train_seoul = pd.read_csv('/content/drive/MyDrive/train_seoul.csv', encoding='utf-8')
test_seoul = pd.read_csv('/content/drive/MyDrive/test_seoul.csv', encoding='utf-8')

In [None]:
# NOTE(review): `X` (re.VERBOSE) is not used anywhere in the visible notebook;
# this looks like an accidental editor auto-import. Kept only in case a later
# cell relies on the name -- confirm and delete.
from re import X

# Name of the column holding the value to predict.
target_var = "관측미세먼지"

# Derive the column groups by NAME, once, from the training frame.
# Relative to the feature columns (everything except the target):
#   * the 3rd- and 2nd-to-last features are passed through unscaled
#     (presumably the elapsed-day / elapsed-time columns that create_sequences
#     reads as raw minutes -- TODO confirm against the CSV schema),
#   * every other feature is standardised.
# The previous version rebuilt the column labels from hard-coded positions,
# which is only correct when the target is the second-to-last column of the
# CSV; deriving labels and data from the same lists removes that assumption.
feature_columns = [col for col in train_seoul.columns if col != target_var]
passthrough_columns = feature_columns[-3:-1]
scaled_columns = feature_columns[:-3] + feature_columns[-1:]
train_columns = scaled_columns + passthrough_columns

# Fail early if the test table does not share the training schema; selecting
# by name below also protects against a different column order in the test CSV.
missing_in_test = [col for col in train_columns + [target_var] if col not in test_seoul.columns]
if missing_in_test:
    raise KeyError(f"test_seoul is missing columns present in train_seoul: {missing_in_test}")

Y_train = train_seoul[target_var].values
Y_test = test_seoul[target_var].values

# Fit the scaler on the training split only and reuse its statistics for test.
scaler = StandardScaler()
columns_to_scale_train = train_seoul[scaled_columns].values
columns_to_scale_test = test_seoul[scaled_columns].values
X_train_scaled = scaler.fit_transform(columns_to_scale_train)
X_test_scaled = scaler.transform(columns_to_scale_test)

# Final feature matrices: scaled block first, then the two unscaled columns.
X_train = np.hstack((X_train_scaled, train_seoul[passthrough_columns].values))
X_test = np.hstack((X_test_scaled, test_seoul[passthrough_columns].values))

# Re-wrap as DataFrames (column order == train_columns) with the target appended.
scaled_train_seoul = pd.DataFrame(X_train, columns=train_columns)
scaled_train_seoul[target_var] = Y_train
scaled_test_seoul = pd.DataFrame(X_test, columns=train_columns)
scaled_test_seoul[target_var] = Y_test

In [None]:
def create_sequences(data, sequence_length=1440, target_col="관측미세먼지", threshold=80):
    """Cut a time-ordered table into fixed-size windows with binary labels.

    Each row's timestamp is ``경과일 * 1440 + 경과시간`` (minutes). Starting at
    the first row, a window collects consecutive rows while their timestamp is
    strictly less than ``window_start + sequence_length``; the next window
    starts at the first row that did not fit. Rows are consumed in the order
    given (the scan is positional, the frame is not sorted here).

    Args:
        data: DataFrame containing the columns ``경과일``, ``경과시간`` and
            ``target_col``; every remaining column is used as a feature and
            must be numeric. The frame is NOT modified.
        sequence_length: Window span in minutes and also the number of
            timesteps of every returned sequence. Must be a positive integer.
        target_col: Name of the column the label is derived from.
        threshold: A window is labelled 1 when the target value of its last
            row is strictly greater than this value, else 0.

    Returns:
        ``(sequences, labels)`` where ``sequences`` is a float64 array of shape
        ``(n_windows, sequence_length, n_features)`` and ``labels`` is an int64
        array of shape ``(n_windows,)``. Windows with fewer rows than
        ``sequence_length`` are zero-padded at the end; windows with more rows
        keep only the first ``sequence_length`` rows (the label still comes
        from the last row of the untruncated window).

    Raises:
        ValueError: If ``sequence_length`` is not positive, or if any timestamp
            is NaN/infinite (either would previously stall the window scan
            forever).
    """
    if sequence_length <= 0:
        raise ValueError("sequence_length must be a positive number of minutes")

    # Timestamps in minutes, computed on a local array so the caller's frame
    # does not gain a 'total_minutes' column as a side effect.
    total_minutes = (data['경과일'] * 1440 + data['경과시간']).to_numpy(dtype=np.float64)
    if not np.isfinite(total_minutes).all():
        raise ValueError("경과일/경과시간 contain NaN or infinite values")

    targets = data[target_col].to_numpy()
    # 'total_minutes' is dropped too (if present) so frames that were already
    # processed by an older, mutating version of this function still work.
    features = data.drop(
        columns=['경과일', '경과시간', 'total_minutes', target_col], errors='ignore'
    ).to_numpy(dtype=np.float64)

    n_rows, n_features = features.shape
    sequences = []
    labels = []

    start_index = 0
    while start_index < n_rows:
        end_time = total_minutes[start_index] + sequence_length

        # The first row always belongs to its own window, so the scan is
        # guaranteed to advance by at least one row per iteration.
        end_index = start_index + 1
        while end_index < n_rows and total_minutes[end_index] < end_time:
            end_index += 1

        # Truncate overly dense windows and zero-pad sparse ones in one step.
        kept = min(end_index - start_index, sequence_length)
        window = np.zeros((sequence_length, n_features))
        window[:kept] = features[start_index:start_index + kept]
        sequences.append(window)

        # Binary label from the target of the window's last row.
        labels.append(1 if targets[end_index - 1] > threshold else 0)

        start_index = end_index

    if not sequences:
        # Keep a well-formed 3-D shape even when there is nothing to window.
        return np.empty((0, sequence_length, n_features)), np.empty((0,), dtype=np.int64)

    return np.stack(sequences), np.array(labels, dtype=np.int64)

In [None]:
# Build the windowed time-series arrays (features + binary labels) for each split.
X_train_seq, y_train_seq = create_sequences(data=scaled_train_seoul)
X_test_seq, y_test_seq = create_sequences(data=scaled_test_seoul)

In [None]:
class TimeSeriesDataset(Dataset):
    """Map-style dataset that pairs each sequence with its label.

    The containers are stored as given; conversion to tensors happens lazily,
    one item at a time, in ``__getitem__``.
    """

    def __init__(self, sequences, labels):
        # Indexable containers of equal length (e.g. NumPy arrays).
        self.sequences, self.labels = sequences, labels

    def __len__(self):
        # Number of samples is defined by the sequence container.
        return len(self.sequences)

    def __getitem__(self, idx):
        # Features become float32, the label becomes int64 (torch.long),
        # which is what nn.CrossEntropyLoss expects for class indices.
        features = torch.tensor(self.sequences[idx], dtype=torch.float32)
        target = torch.tensor(self.labels[idx], dtype=torch.long)
        return features, target

# Wrap the windowed arrays as datasets.
train_dataset = TimeSeriesDataset(sequences=X_train_seq, labels=y_train_seq)
test_dataset = TimeSeriesDataset(sequences=X_test_seq, labels=y_test_seq)

# Mini-batch iterators: training batches are reshuffled every epoch,
# evaluation batches keep the original order.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

In [None]:
class TransformerEncoderModel(nn.Module):
    """Transformer-encoder classifier with mean pooling over time.

    The raw feature vector of each timestep is used directly as the token
    embedding (``d_model == input_dim``), so ``input_dim`` must be divisible
    by ``nhead``.

    NOTE(review): no positional encoding is added, and the sequence is reduced
    with a mean, so the prediction does not depend on the order of timesteps.

    Args:
        input_dim: Number of features per timestep (also the model width).
        num_classes: Number of output logits.
        nhead: Number of attention heads.
        num_encoder_layers: Number of stacked encoder layers.
        dim_feedforward: Hidden size of each layer's feed-forward block.
        dropout: Dropout probability inside the encoder layers.
    """

    def __init__(self, input_dim, num_classes, nhead, num_encoder_layers, dim_feedforward, dropout):
        super().__init__()
        encoder_layers = nn.TransformerEncoderLayer(
            d_model=input_dim,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_encoder_layers)
        self.fc = nn.Linear(input_dim, num_classes)

    def forward(self, src, src_key_padding_mask=None):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            src: Tensor of shape ``(batch, seq_len, input_dim)``.
            src_key_padding_mask: Optional ``(batch, seq_len)`` mask, ``True``
                at padded timesteps. When given, padded steps are ignored as
                attention keys and excluded from the mean pooling. Every
                sample must keep at least one unmasked step, otherwise its
                attention weights (and logits) become NaN. When omitted,
                every timestep -- including zero padding -- is attended to
                and averaged, exactly as before.
        """
        src = src.permute(1, 0, 2)  # encoder layers were built with batch_first=False

        if src_key_padding_mask is None:
            encoded = self.transformer_encoder(src)
            pooled = encoded.mean(dim=0)  # plain average over the sequence length
        else:
            pad_mask = src_key_padding_mask.to(torch.bool)
            encoded = self.transformer_encoder(src, src_key_padding_mask=pad_mask)
            # (seq_len, batch, 1) weights: 1 for real steps, 0 for padding.
            valid = (~pad_mask).transpose(0, 1).unsqueeze(-1).to(encoded.dtype)
            pooled = (encoded * valid).sum(dim=0) / valid.sum(dim=0).clamp(min=1.0)

        return self.fc(pooled)

# Model hyper-parameters.
num_classes = 2
nhead = 4
num_encoder_layers = 3
dim_feedforward = 128
dropout = 0.1

# The model width must equal the number of features per timestep, so read it
# from the windowed data instead of hard-coding it; 28 (the previous constant)
# is only used as a fallback when the array is not 3-D (e.g. no windows).
input_dim = X_train_seq.shape[-1] if X_train_seq.ndim == 3 else 28
if input_dim % nhead != 0:
    # Multi-head attention splits the width evenly across heads.
    raise ValueError(f"input_dim ({input_dim}) must be divisible by nhead ({nhead})")

model = TransformerEncoderModel(input_dim, num_classes, nhead, num_encoder_layers, dim_feedforward, dropout)

# Cross-entropy over the two class logits; Adam with a fixed learning rate.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


# Cross-validation

In [None]:
from sklearn.model_selection import KFold

# One seed drives both the fold assignment and, per fold, torch's RNG.
cv_seed = 42
num_epochs = 2

# Splitter for 5-fold cross-validation over the training windows.
# NOTE(review): folds are shuffled and not stratified; with time-ordered,
# possibly imbalanced windows consider a time-aware or stratified split.
kf = KFold(n_splits=5, shuffle=True, random_state=cv_seed)

# Validation accuracy (in percent) of every fold.
accuracies = []

for fold, (train_index, val_index) in enumerate(kf.split(X_train_seq, y_train_seq)):
    print(f'Fold [{fold + 1}/{kf.get_n_splits()}]')

    # Seed torch before anything random happens in this fold (weight init,
    # DataLoader shuffling, dropout) so fold accuracies are reproducible;
    # previously only the KFold split itself was seeded.
    torch.manual_seed(cv_seed + fold)

    # Train / validation arrays for this fold.
    X_fold_train, X_fold_val = X_train_seq[train_index], X_train_seq[val_index]
    y_fold_train, y_fold_val = y_train_seq[train_index], y_train_seq[val_index]

    # Datasets and loaders for this fold.
    fold_train_dataset = TimeSeriesDataset(X_fold_train, y_fold_train)
    fold_val_dataset = TimeSeriesDataset(X_fold_val, y_fold_val)
    fold_train_loader = DataLoader(fold_train_dataset, batch_size=32, shuffle=True)
    fold_val_loader = DataLoader(fold_val_dataset, batch_size=32, shuffle=False)

    # Fresh model and optimizer so no weights or Adam state leak between folds.
    # Both names are rebound at module level: after the loop, `model` is the
    # network trained on the last fold.
    model = TransformerEncoderModel(input_dim, num_classes, nhead, num_encoder_layers, dim_feedforward, dropout)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Training loop.
    model.train()
    for epoch in range(num_epochs):
        for X_batch, y_batch in fold_train_loader:
            optimizer.zero_grad()
            output = model(X_batch)
            loss = criterion(output, y_batch)
            loss.backward()
            optimizer.step()

    # Validation loop: count correct arg-max predictions without tracking gradients.
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for X_batch, y_batch in fold_val_loader:
            predicted = model(X_batch).argmax(dim=1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()

    fold_accuracy = 100 * correct / total
    accuracies.append(fold_accuracy)
    print(f'Fold [{fold + 1}/{kf.get_n_splits()}], Accuracy: {fold_accuracy:.2f}%')

# Mean validation accuracy across all folds.
avg_accuracy = np.mean(accuracies)
print(f'Average Accuracy: {avg_accuracy:.2f}%')
