In [None]:
!pip install mne

Collecting mne
  Downloading mne-1.9.0-py3-none-any.whl.metadata (20 kB)
Downloading mne-1.9.0-py3-none-any.whl (7.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m71.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mne
Successfully installed mne-1.9.0


### 데이터 만들기 끝

## pkl 파일 로드

In [None]:
import torch
import pickle
from sklearn.model_selection import train_test_split

data = torch.load('/content/drive/MyDrive/Colab Notebooks/sleep_classification_checkpoint.pkl', map_location='cpu')


FileNotFoundError: [Errno 2] No such file or directory: '/content/drive/MyDrive/Colab Notebooks/sleep_classification_checkpoint.pkl'

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 여기서부터 진짜 시작

In [None]:
!pip install scikit-learn

import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

import pickle



## 모델 정의

In [None]:
import torch
import torch.nn as nn
from einops import rearrange, repeat

# DSConv Block
class DSConvBlock(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.depthwise = nn.Conv1d(in_ch, in_ch, kernel_size=3, padding=1, groups=in_ch)
        self.pointwise = nn.Conv1d(in_ch, out_ch, kernel_size=1)
        self.bn = nn.BatchNorm1d(out_ch)
        self.act = nn.ReLU()

    def forward(self, x):
        x = self.depthwise(x)
        x = self.pointwise(x)
        return self.act(self.bn(x))

class CNNFeatureExtractor(nn.Module):
    def __init__(self, in_ch=1, base_ch=32):
        super().__init__()
        self.layers = nn.Sequential(
            DSConvBlock(in_ch, base_ch),
            DSConvBlock(base_ch, base_ch * 2),
            DSConvBlock(base_ch * 2, base_ch * 4)
        )

    def forward(self, x):
        return self.layers(x)  # (B, C, L)

# Lite Transformer Encoder
class LiteTransformer(nn.Module):
    def __init__(self, d_model=64, heads=4):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim=d_model, num_heads=heads, batch_first=True)
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x):
        pos_embed = self._positional_encoding(x.size(1), x.size(2), x.device)
        x = x + pos_embed
        attn_out, _ = self.attn(x, x, x)
        return self.norm(x + attn_out)

    def _positional_encoding(self, seq_len, dim, device):
        pos = torch.arange(seq_len, dtype=torch.float32, device=device).unsqueeze(1)
        i = torch.arange(0, dim, 2, dtype=torch.float32, device=device)
        angle_rates = 1 / torch.pow(10000, (i / dim))
        angle_rads = pos * angle_rates

        pe = torch.zeros(seq_len, dim, device=device)
        pe[:, 0::2] = torch.sin(angle_rads)
        pe[:, 1::2] = torch.cos(angle_rads)
        return pe.unsqueeze(0)  # (1, seq_len, dim)


# Cross-Modal Attention
class CrossModalTransformer(nn.Module):
    def __init__(self, dim=64, heads=4):
        super().__init__()
        self.cross_attn = nn.MultiheadAttention(embed_dim=dim, num_heads=heads, batch_first=True)
        self.norm = nn.LayerNorm(dim)

    def forward(self, query, context):
        out, _ = self.cross_attn(query, context, context)
        return self.norm(out + query)

# Temporal Attention Pooling 적용
class TemporalAttentionPooling(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.q = nn.Parameter(torch.randn(1, 1, dim))
        self.attn = nn.Linear(dim, 1)

    def forward(self, x):  # x: (B, L, D)
        score = torch.softmax(self.attn(x), dim=1)  # (B, L, 1)
        pooled = (x * score).sum(dim=1)             # (B, D)
        return pooled, score

# 전체 모델 구성
class CrossModalSleepNet(nn.Module):
    def __init__(self, cnn_dim=64, transformer_dim=64, num_classes=5):
        super().__init__()
        self.eeg_cnn = CNNFeatureExtractor(1, cnn_dim)
        self.eog_cnn = CNNFeatureExtractor(1, cnn_dim)

        self.project = nn.Linear(cnn_dim * 4, transformer_dim)

        self.lite_transformer = LiteTransformer(transformer_dim)
        self.cross_transformer = CrossModalTransformer(transformer_dim)
        self.temporal_pool = TemporalAttentionPooling(transformer_dim)

        self.classifier = nn.Linear(transformer_dim, num_classes)

    def forward(self, x):  # x: (B, 2, T)
        eeg, eog = x[:, 0:1, :], x[:, 1:2, :]
        eeg_feat = self.eeg_cnn(eeg)  # (B, C, L)
        eog_feat = self.eog_cnn(eog)

        eeg_feat = rearrange(eeg_feat, 'b c l -> b l c')
        eog_feat = rearrange(eog_feat, 'b c l -> b l c')

        eeg_feat = self.project(eeg_feat)
        eog_feat = self.project(eog_feat)

        eeg_feat = self.lite_transformer(eeg_feat)
        fused_feat = self.cross_transformer(eeg_feat, eog_feat)

        pooled, attn_weights = self.temporal_pool(fused_feat)
        out = self.classifier(pooled)

        return out, attn_weights

## 데이터 합치기

In [None]:
import os
import pickle
import torch
import numpy as np

# 병합할 pkl 경로 설정
pkl_dir = "/content/drive/MyDrive/sleep_segments"
file_list = [os.path.join(pkl_dir, f"sleep_data_part{i}.pkl") for i in range(1, 7)]

all_segments, all_labels = [], []

print("🔄 피클 파일 병합 중...")
for file in file_list:
    with open(file, 'rb') as f:
        segments, labels = pickle.load(f)
        for seg in segments:
            tensor_seg = torch.tensor(seg, dtype=torch.float32)
            if tensor_seg.ndim == 3:  # (2, 60, 50)
                tensor_seg = tensor_seg.view(2, -1)  # (2, 3000)
            all_segments.append(tensor_seg)
        all_labels.extend(labels)

all_segments = torch.stack(all_segments)  # (N, 2, T)
all_labels = torch.tensor(all_labels, dtype=torch.long)

save_path = "/content/drive/MyDrive/sleep_segments/merged_sleep_dataset.pt"
torch.save((all_segments, all_labels), save_path)
print(f"✅ 병합 완료! 저장 위치: {save_path}")

🔄 피클 파일 병합 중...


  tensor_seg = torch.tensor(seg, dtype=torch.float32)


✅ 병합 완료! 저장 위치: /content/drive/MyDrive/sleep_segments/merged_sleep_dataset.pt


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader, Subset
import numpy as np

class SleepTensorDataset(Dataset):
    def __init__(self, data_tensor, label_tensor):
        self.data = data_tensor
        self.labels = label_tensor

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# 불러오기
merged_path = "/content/drive/MyDrive/sleep_segments/merged_sleep_dataset.pt"
print("🔄 병합된 데이터 불러오는 중...")
all_data, all_labels = torch.load(merged_path)
print(f"✅ 불러오기 완료: 총 샘플 = {len(all_data)}")

# Train/Test Split
indices = np.arange(len(all_data))
np.random.seed(42)
np.random.shuffle(indices)
split = int(len(indices) * 0.8)
train_idx, test_idx = indices[:split], indices[split:]

train_dataset = SleepTensorDataset(all_data[train_idx], all_labels[train_idx])
test_dataset  = SleepTensorDataset(all_data[test_idx], all_labels[test_idx])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=32, shuffle=False)

print(f"✅ DataLoader 생성 완료: Train = {len(train_dataset)} / Test = {len(test_dataset)}")

🔄 병합된 데이터 불러오는 중...
✅ 불러오기 완료: 총 샘플 = 326557
✅ DataLoader 생성 완료: Train = 261245 / Test = 65312


## 학습 파이프라인

## 학습/평가 정의

In [None]:
def train(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss, correct, total = 0, 0, 0
    for x, y in tqdm(dataloader, desc="Train"):
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        output, _ = model(x)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()

        preds = output.argmax(1)
        total_loss += loss.item()
        correct += (preds == y).sum().item()
        total += y.size(0)

    acc = correct / total * 100
    return total_loss / len(dataloader), acc

In [None]:
from sklearn.metrics import classification_report

def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss, correct, total = 0, 0, 0
    all_preds, all_labels = [], []

    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            output, _ = model(x)
            loss = criterion(output, y)
            total_loss += loss.item()

            preds = output.argmax(dim=1)
            correct += (preds == y).sum().item()
            total += y.size(0)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(y.cpu().numpy())

    # 전체 성능
    acc = 100. * correct / total
    report = classification_report(all_labels, all_preds, output_dict=True, zero_division=0)

    macro_f1 = report['macro avg']['f1-score']
    precision = report['macro avg']['precision']
    recall = report['macro avg']['recall']

    # REM class = 4 기준 성능
    rem_acc = report.get("4", {}).get("recall", 0.0)
    rem_f1 = report.get("4", {}).get("f1-score", 0.0)

    return total_loss / len(dataloader), acc, macro_f1, precision, recall, rem_acc, rem_f1

In [None]:
#pt 불러오기
x_tensor, y_tensor = torch.load('/content/drive/MyDrive/sleep_segments/merged_sleep_dataset.pt')

from torch.utils.data import TensorDataset, random_split, DataLoader

dataset = TensorDataset(x_tensor, y_tensor)
train_len = int(len(dataset) * 0.8)
train_set, test_set = random_split(dataset, [train_len, len(dataset) - train_len])

train_loader = DataLoader(train_set, batch_size=16, shuffle=True)
test_loader = DataLoader(test_set, batch_size=16, shuffle=False)

print(f"✅ DataLoader 생성 완료: Train = {len(train_set)}개 / Test = {len(test_set)}개")

✅ DataLoader 생성 완료: Train = 261245개 / Test = 65312개


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
from tqdm import tqdm
from sklearn.utils.class_weight import compute_class_weight

In [None]:
# ✅ Class weights 계산 및 손실 함수 정의
y_train = [label.item() for _, label in train_set]
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train), y=y_train)
weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(device)
criterion = nn.CrossEntropyLoss(weight=weights_tensor)

In [None]:
# 모델, 손실 함수, 옵티마이저 정의
model = CrossModalSleepNet().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# 학습 기록
train_losses, val_losses, val_accuracies = [], [], []
all_f1, all_precision, all_recall = [], [], []

# EarlyStopping 설정
best_val_loss = float('inf')
patience = 3
patience_counter = 0
best_model_state = None

for epoch in range(1, 21):  # ⏱️ Epoch 20까지
    print(f"\n🌀 Epoch {epoch}")

    train_loss, train_acc = train(model, train_loader, optimizer, criterion, device)
    print(f"Train Loss: {train_loss:.4f} | Train Accuracy: {train_acc:.2f}%")

    val_loss, val_acc, f1, prec, rec, rem_acc, rem_f1 = evaluate(model, test_loader, criterion, device)
    print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}% | F1: {f1:.3f} | Precision: {prec:.3f} | Recall: {rec:.3f}")
    print(f"REM Class ▶ Accuracy (Recall): {rem_acc:.3f} | F1 Score: {rem_f1:.3f}")


    # 기록 저장
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)
    all_f1.append(f1)
    all_precision.append(prec)
    all_recall.append(rec)

    # 🛑 EarlyStopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        best_model_state = model.state_dict()
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"\n⏹️ Early stopping at epoch {epoch}. Best Val Loss: {best_val_loss:.4f}")
            break

# 가장 성능 좋았던 모델 불러오기
if best_model_state is not None:
    model.load_state_dict(best_model_state)

print("\n📊 최종 모델 평가 요약:")
print(f"Average Validation Accuracy: {np.mean(val_accuracies):.2f}%")
print(f"Average Macro F1 Score: {np.mean(all_f1):.4f}")
print(f"Average Precision: {np.mean(all_precision):.4f}")
print(f"Average Recall: {np.mean(all_recall):.4f}")
print(f"Last Epoch Accuracy: {val_accuracies[-1]:.2f}% | F1: {all_f1[-1]:.4f} | Precision: {all_precision[-1]:.4f} | Recall: {all_recall[-1]:.4f}")
print(f"🔍 REM Class ▶ Accuracy (Recall): {rem_acc:.3f} | F1 Score: {rem_f1:.3f}")


🌀 Epoch 1


Train: 100%|██████████| 16328/16328 [32:01<00:00,  8.50it/s]


Train Loss: 0.7398 | Train Accuracy: 81.72%
Val Loss: 4.0243 | Val Acc: 68.97% | F1: 0.163 | Precision: 0.138 | Recall: 0.200
REM Class ▶ Accuracy (Recall): 0.000 | F1 Score: 0.000

🌀 Epoch 2


Train: 100%|██████████| 16328/16328 [32:00<00:00,  8.50it/s]


Train Loss: 0.6016 | Train Accuracy: 85.82%
Val Loss: 3.8612 | Val Acc: 67.93% | F1: 0.163 | Precision: 0.139 | Recall: 0.197
REM Class ▶ Accuracy (Recall): 0.000 | F1 Score: 0.000

🌀 Epoch 3


Train: 100%|██████████| 16328/16328 [32:00<00:00,  8.50it/s]


Train Loss: 0.5672 | Train Accuracy: 86.66%
Val Loss: 4.5327 | Val Acc: 68.97% | F1: 0.163 | Precision: 0.138 | Recall: 0.200
REM Class ▶ Accuracy (Recall): 0.000 | F1 Score: 0.000

🌀 Epoch 4


Train: 100%|██████████| 16328/16328 [32:00<00:00,  8.50it/s]


Train Loss: 0.5448 | Train Accuracy: 87.07%
Val Loss: 3.2002 | Val Acc: 45.94% | F1: 0.133 | Precision: 0.130 | Recall: 0.143
REM Class ▶ Accuracy (Recall): 0.000 | F1 Score: 0.000

🌀 Epoch 5


Train: 100%|██████████| 16328/16328 [32:00<00:00,  8.50it/s]


Train Loss: 0.5318 | Train Accuracy: 87.41%
Val Loss: 3.6661 | Val Acc: 65.04% | F1: 0.162 | Precision: 0.141 | Recall: 0.193
REM Class ▶ Accuracy (Recall): 0.000 | F1 Score: 0.000

🌀 Epoch 6


Train: 100%|██████████| 16328/16328 [32:00<00:00,  8.50it/s]


Train Loss: 0.5182 | Train Accuracy: 87.73%
Val Loss: 3.7168 | Val Acc: 49.82% | F1: 0.136 | Precision: 0.126 | Recall: 0.149
REM Class ▶ Accuracy (Recall): 0.000 | F1 Score: 0.000

🌀 Epoch 7


Train: 100%|██████████| 16328/16328 [32:01<00:00,  8.50it/s]


Train Loss: 0.5066 | Train Accuracy: 88.09%
Val Loss: 3.9360 | Val Acc: 68.90% | F1: 0.164 | Precision: 0.147 | Recall: 0.200
REM Class ▶ Accuracy (Recall): 0.000 | F1 Score: 0.000

⏹️ Early stopping at epoch 7. Best Val Loss: 3.2002

📊 최종 모델 평가 요약:
Average Validation Accuracy: 62.23%
Average Macro F1 Score: 0.1549
Average Precision: 0.1370
Average Recall: 0.1833
Last Epoch Accuracy: 68.90% | F1: 0.1637 | Precision: 0.1474 | Recall: 0.2000
🔍 REM Class ▶ Accuracy (Recall): 0.000 | F1 Score: 0.000


In [None]:
print("\n🧪 테스트 세트 성능 평가:")
test_loss, test_acc, test_f1, test_prec, test_rec, rem_acc, rem_f1 = evaluate(model, test_loader, criterion, device)

print(f"Test Loss: {test_loss:.4f} | Accuracy: {test_acc:.2f}%")
print(f"Macro F1: {test_f1:.4f} | Precision: {test_prec:.4f} | Recall: {test_rec:.4f}")
print(f"REM Accuracy: {rem_acc:.4f} | REM F1: {rem_f1:.4f}")


🧪 테스트 세트 성능 평가:
Test Loss: 3.9360 | Accuracy: 68.90%
Macro F1: 0.1637 | Precision: 0.1474 | Recall: 0.2000
REM Accuracy: 0.0000 | REM F1: 0.0000


In [None]:
import torch
import pickle

# 모델 학습이 끝난 후 (예: best 모델 기준)
model_path_pt = '/content/drive/MyDrive/best_model_trans.pt'
model_path_pkl = '/content/drive/MyDrive/best_model_trans.pkl'

#저장 - PyTorch 공식 방식 (.pt)
torch.save(model.state_dict(), model_path_pt)

# 저장 - pickle 방식 (.pkl)
with open(model_path_pkl, 'wb') as f:
    pickle.dump(model.state_dict(), f)

print("✅ 모델 저장 완료 (.pt, .pkl)")

✅ 모델 저장 완료 (.pt, .pkl)


##평가

In [None]:
# 평가 모드
model.eval()
y_true = []
y_pred = []

with torch.no_grad():
    for x, y in tqdm(test_loader, desc="Evaluating"):
        x = x.to(device)
        out, _ = model(x)
        preds = out.argmax(dim=1).cpu()
        y_pred.extend(preds.tolist())
        y_true.extend(y.tolist())

# ✅ 평가 지표 출력
print("✅ 평가 결과")
print("Accuracy      :", round(accuracy_score(y_true, y_pred) * 100, 2), "%")
print("Macro F1      :", round(f1_score(y_true, y_pred, average="macro") * 100, 2), "%")
print("Macro Precision:", round(precision_score(y_true, y_pred, average="macro") * 100, 2), "%")
print("Macro Recall  :", round(recall_score(y_true, y_pred, average="macro") * 100, 2), "%")

Evaluating: 100%|██████████| 4082/4082 [00:18<00:00, 223.33it/s]
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


✅ 평가 결과
Accuracy      : 68.8 %
Macro F1      : 16.3 %
Macro Precision: 13.77 %
Macro Recall  : 19.99 %
