In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import math
from sklearn.metrics import r2_score


In [2]:

# ──────────────────────────────────────────────────────────────────────────────
# 1) 데이터셋 로드 & 분할
# ──────────────────────────────────────────────────────────────────────────────
raw = torch.load('/content/drive/MyDrive/DEAM_dataset_cleaned.pt')

# dict with keys 'spectrograms', 'annotations', 'song_ids'
clean_specs = raw['spectrograms']   # Tensor list or Tensor of shape (N,1,128,60)
clean_anns  = raw['annotations']    # Tensor list or Tensor of shape (N,60,2)
clean_ids   = raw['song_ids']       # list or Tensor of shape (N,)

# make sure they're all the same length N
N = len(clean_ids)

# build unified list of dicts
data_list = []
for i in range(N):
    song_id = int(clean_ids[i])

    spec    = clean_specs[i]   # Tensor[1,128,60]
    ann     = clean_anns[i]    # Tensor[60,2]
    data_list.append({'song_id': song_id, 'spec': spec, 'ann': ann})


In [21]:

# ──────────────────────────────────────────────────────────────────────────────
# 2) Train/Val/Test 데이터 분할 (정규화 포함)
# ──────────────────────────────────────────────────────────────────────────────

# Step 1: song_id를 기준으로 데이터 아이템을 train/val과 test용으로 먼저 분리합니다.
train_val_ids = {d['song_id'] for d in data_list if d['song_id'] <= 2000}
test_ids      = {d['song_id'] for d in data_list if d['song_id']  > 2000}

train_val_items = [d for d in data_list if d['song_id'] in train_val_ids]
test_items      = [d for d in data_list if d['song_id'] in test_ids]

# Step 2: train_val_items를 train과 validation용으로 다시 분할합니다.
#         random_split을 사용하기 위해 임시 Dataset을 만듭니다.
class TempDataset(Dataset):
    def __init__(self, items): self.items = items
    def __len__(self): return len(self.items)
    def __getitem__(self, idx): return self.items[idx]

val_ratio = 0.1
temp_ds = TempDataset(train_val_items)
val_size = int(len(temp_ds) * val_ratio)
train_size = len(temp_ds) - val_size

train_subset, val_subset = random_split(
    temp_ds, [train_size, val_size], generator=torch.Generator().manual_seed(42)
)

# 분할된 subset에서 실제 데이터 아이템 리스트를 다시 추출합니다.
train_items = [item for item in train_subset]
val_items   = [item for item in val_subset]


# Step 3: ★★★ 오직 Train 데이터셋의 스펙트로그램으로만 평균과 표준편차를 계산합니다. ★★★
print("학습 데이터셋에서 정규화 통계치 계산 중...")
train_specs = torch.cat([d['spec'] for d in train_items], dim=0) # 모든 스펙트로그램을 배치 차원으로 연결
mean = train_specs.mean()
std = train_specs.std()

print(f"  - 계산된 Mean: {mean:.4f}")
print(f"  - 계산된 Std Dev: {std:.4f}")
# 이 값을 저장해두면 나중에 추론 시에도 사용할 수 있습니다.
torch.save({'mean': mean, 'std': std}, 'norm_stats.pt')


# Step 4: 계산된 통계치(mean, std)를 모든 데이터에 적용합니다.
#         (메모리 절약을 위해 in-place로 연산)
def normalize_items(items, mean, std):
    for item in items:
        item['spec'] = (item['spec'] - mean) / (std + 1e-8) # 0으로 나누는 것을 방지
    return items

train_items = normalize_items(train_items, mean, std)
val_items   = normalize_items(val_items, mean, std)
test_items  = normalize_items(test_items, mean, std)
print("모든 데이터셋(Train/Val/Test)에 정규화를 적용했습니다.")


# Step 5: 최종 PyTorch Dataset을 생성합니다.
class DEAMDataset(Dataset):
    def __init__(self, items):
        self.samples = items
    def __len__(self):
        return len(self.samples)
    def __getitem__(self, idx):
        # 데이터가 이미 정규화되었으므로 그대로 반환
        return self.samples[idx]['spec'], self.samples[idx]['ann']

train_ds = DEAMDataset(train_items)
val_ds   = DEAMDataset(val_items)
test_ds  = DEAMDataset(test_items)

print(f"\n데이터셋 준비 완료:")
print(f"  - Train: {len(train_ds)} 샘플")
print(f"  - Validation: {len(val_ds)} 샘플")
print(f"  - Test: {len(test_ds)} 샘플")


학습 데이터셋에서 정규화 통계치 계산 중...
  - 계산된 Mean: -0.0000
  - 계산된 Std Dev: 1.0000
모든 데이터셋(Train/Val/Test)에 정규화를 적용했습니다.

데이터셋 준비 완료:
  - Train: 1570 샘플
  - Validation: 174 샘플
  - Test: 58 샘플


In [13]:
# ──────────────────────────────────────────────────────────────────────────────
# 3) DataLoader
# ──────────────────────────────────────────────────────────────────────────────
batch_size  = 16
num_workers = 2

train_loader = DataLoader(train_ds,   batch_size=batch_size, shuffle=True,  num_workers=num_workers)
val_loader   = DataLoader(val_ds,     batch_size=batch_size, shuffle=False, num_workers=num_workers)
test_loader  = DataLoader(test_ds,    batch_size=batch_size, shuffle=False, num_workers=num_workers)


In [14]:
def safe_collate(batch):
    # batch: list of (spec,ann)
    specs, anns = zip(*batch)
    # 각 원소를 반드시 clone() 하여 독립 스토리지 확보
    specs = [torch.as_tensor(s, dtype=torch.float32).clone() for s in specs]
    anns  = [torch.as_tensor(a, dtype=torch.float32).clone() for a in anns]
    return torch.stack(specs, 0), torch.stack(anns, 0)

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,
                          num_workers=4, collate_fn=safe_collate)



In [15]:

# ──────────────────────────────────────────────────────────────────────────────
# 1) Concordance Correlation Coefficient 정의
# ──────────────────────────────────────────────────────────────────────────────
def concordance_correlation_coefficient(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    true_mean = np.mean(y_true)
    pred_mean = np.mean(y_pred)
    covariance = np.mean((y_true - true_mean) * (y_pred - pred_mean))
    true_var = np.var(y_true)
    pred_var = np.var(y_pred)
    return 2 * covariance / (true_var + pred_var + (true_mean - pred_mean)**2 + 1e-8)

In [16]:
# ──────────────────────────────────────────────────────────────────────────────
# 4) 모델 정의 (CNN + Transformer)
# ──────────────────────────────────────────────────────────────────────────────
class DetailedEmotionCNN(nn.Module):
    def __init__(self, dropout_cfg=(0.3,0.3,0.4,0.4), negative_slope=0.02):
        super().__init__()
        d1,d2,d3,d4 = dropout_cfg
        nl = negative_slope

        # Block1
        self.block1 = nn.Sequential(
            nn.Conv2d(1,16,3,padding=1), nn.BatchNorm2d(16),
            nn.LeakyReLU(nl,inplace=True),
            nn.Conv2d(16,16,3,padding=1), nn.BatchNorm2d(16),
            nn.LeakyReLU(nl,inplace=True),
            nn.Dropout(d1),
            nn.MaxPool2d((2,1),stride=(2,1)),
        )
        # Block2
        self.block2 = nn.Sequential(
            nn.Conv2d(16,32,3,padding=1), nn.BatchNorm2d(32),
            nn.LeakyReLU(nl,inplace=True),
            nn.Conv2d(32,32,3,padding=1), nn.BatchNorm2d(32),
            nn.LeakyReLU(nl,inplace=True),
            nn.Dropout(d2),
            nn.MaxPool2d((2,1),stride=(2,1)),
        )
        # Block3
        self.block3 = nn.Sequential(
            nn.Conv2d(32,64,3,padding=1), nn.BatchNorm2d(64),
            nn.LeakyReLU(nl,inplace=True),
            nn.Conv2d(64,64,3,padding=1), nn.BatchNorm2d(64),
            nn.LeakyReLU(nl,inplace=True),
            nn.Dropout(d3),
            nn.MaxPool2d((2,1),stride=(2,1)),
        )
        # Block4 + extra conv+pool
        self.block4 = nn.Sequential(
            nn.Conv2d(64,128,3,padding=1), nn.BatchNorm2d(128),
            nn.LeakyReLU(nl,inplace=True),
            nn.Conv2d(128,128,3,padding=1), nn.BatchNorm2d(128),
            nn.LeakyReLU(nl,inplace=True),
            nn.Dropout(d4),
            nn.MaxPool2d((2,1),stride=(2,1)),
            nn.Conv2d(128,128,3,padding=1), nn.BatchNorm2d(128),
            nn.LeakyReLU(nl,inplace=True),
            nn.MaxPool2d((8,1),stride=(8,1)),
        )
        # 마지막 채널 2개로
        self.conv_out = nn.Conv2d(128,2,1)

    def forward(self, x):
        # x: [B,1,128,60]
        x = self.block1(x)   # → [B,16,64,60]
        x = self.block2(x)   # → [B,32,32,60]
        x = self.block3(x)   # → [B,64,16,60]
        x = self.block4(x)   # → [B,128,1,60]
        x = self.conv_out(x) # → [B,2,1,60]
        x = x.squeeze(2)     # → [B,2,60]
        return x.permute(0,2,1)  # → [B,60,2]  (valence/arousal)


class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0,max_len).unsqueeze(1).float()
        div = torch.exp(torch.arange(0,d_model,2).float() * (-math.log(10000.0)/d_model))
        pe[:,0::2] = torch.sin(pos*div)
        pe[:,1::2] = torch.cos(pos*div)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        # x: [B,T,d_model]
        return x + self.pe[:,:x.size(1),:]


class CNNTransformerEmotion(nn.Module):
    def __init__(self,
                 cnn: nn.Module,
                 d_model: int = 128,
                 nhead: int = 8,
                 num_layers: int = 4,
                 dim_feedforward: int = 512,
                 dropout: float = 0.1,
                 seq_len: int = 60):
        super().__init__()
        self.cnn = cnn
        # CNN에서 2채널 → d_model로 매핑
        self.channel_mapper = nn.Conv2d(2, d_model, 1)
        self.pos_encoder   = PositionalEncoding(d_model, max_len=seq_len)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout, activation='relu'
        )
        self.transformer = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.regressor   = nn.Linear(d_model, 2)

    def forward(self, x):
        # x: [B,1,128,60]
        feat = self.cnn(x)          # → [B,60,2]
        feat = feat.permute(0,2,1).unsqueeze(-1)   # → [B,2,60,1]
        feat = self.channel_mapper(feat)           # → [B,d_model,60,1]
        seq  = feat.squeeze(-1).permute(0,2,1)     # → [B,60,d_model]
        seq  = self.pos_encoder(seq)               # → [B,60,d_model]
        seq  = self.transformer(seq)               # → [B,60,d_model]
        return torch.tanh(self.regressor(seq))     # → [B,60,2]

In [17]:

# device 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [18]:
# 모델 인스턴스
base_cnn = DetailedEmotionCNN()
model    = CNNTransformerEmotion(base_cnn).to(device)

total_params = sum(p.numel() for p in model.parameters())
trainable   = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total: {total_params:,}개, Trainable: {trainable:,}개")


# ──────────────────────────────────────────────────────────────────────────────
# 5) Optimizer / Scheduler / Loss 설정
# ──────────────────────────────────────────────────────────────────────────────
lr, weight_decay = 1e-3, 1e-5
optimizer   = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
scheduler   = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=4, factor=0.5)
criterion   = nn.MSELoss()
early_stop_patience = 5




Total: 1,236,020개, Trainable: 1,236,020개


In [19]:
# ──────────────────────────────────────────────────────────────────────────────
# 6) TRAIN / VAL / TEST 루프
# ──────────────────────────────────────────────────────────────────────────────
best_val_loss      = float('inf')
early_stop_counter = 0

for epoch in range(1, 51):
    # -- TRAIN --
    model.train()
    train_loss = 0.0
    for spec, ann in train_loader:
        spec, ann = spec.to(device), ann.to(device)
        optimizer.zero_grad()
        out = model(spec)        # [B,60,2]
        loss = criterion(out, ann)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * spec.size(0)
    train_loss /= len(train_loader.dataset)

    # -- VALIDATION --
    model.eval()
    val_loss = 0.0
    all_preds, all_trues = [], []
    with torch.no_grad():
        for spec, ann in val_loader:
            spec, ann = spec.to(device), ann.to(device)
            out = model(spec)
            val_loss += criterion(out, ann).item() * spec.size(0)
            all_preds.append(out.cpu().numpy().reshape(-1,2))
            all_trues.append(ann.cpu().numpy().reshape(-1,2))
    val_loss /= len(val_loader.dataset)

    preds = np.vstack(all_preds)
    trues = np.vstack(all_trues)
    # 지표 계산
    mse     = np.mean((preds-trues)**2)
    mae     = np.mean(np.abs(preds-trues))
    r_val   = np.corrcoef(trues[:,0],preds[:,0])[0,1]
    r_aro   = np.corrcoef(trues[:,1],preds[:,1])[0,1]
    r2_val  = r2_score(trues[:,0],preds[:,0])
    r2_aro  = r2_score(trues[:,1],preds[:,1])
    ccc_val = concordance_correlation_coefficient(trues[:,0],preds[:,0])
    ccc_aro = concordance_correlation_coefficient(trues[:,1],preds[:,1])

    # Scheduler & EarlyStopping
    scheduler.step(val_loss)
    if val_loss < best_val_loss:
        best_val_loss, best_epoch = val_loss, epoch
        early_stop_counter = 0
        torch.save(model.state_dict(), 'best_model.pt')
    else:
        early_stop_counter += 1

    lr_current = optimizer.param_groups[0]['lr']
    print(f"[{epoch:02d}] Train:{train_loss:.4f} | Val:{val_loss:.4f} | LR:{lr_current:.2e}")
    print(f"    MSE:{mse:.4f}, MAE:{mae:.4f}, Corr(V/A):{r_val:.4f}/{r_aro:.4f}, "
          f"R2(V/A):{r2_val:.4f}/{r2_aro:.4f}, CCC(V/A):{ccc_val:.4f}/{ccc_aro:.4f}")

    if early_stop_counter >= early_stop_patience:
        print(f"Early stopping @ epoch {epoch} (Best Val {best_val_loss:.4f} @ {best_epoch})")
        break

print(f"\n== Training completed. Best Val Loss: {best_val_loss:.4f} @ epoch {best_epoch} ==")





[01] Train:0.1892 | Val:0.0624 | LR:1.00e-03
    MSE:0.0624, MAE:0.2019, Corr(V/A):0.3813/0.7358, R2(V/A):0.0612/0.1688, CCC(V/A):0.0809/0.3137




[02] Train:0.0528 | Val:0.0547 | LR:1.00e-03
    MSE:0.0547, MAE:0.1863, Corr(V/A):0.4041/0.7232, R2(V/A):0.1283/0.3081, CCC(V/A):0.1704/0.4579




[03] Train:0.0519 | Val:0.0612 | LR:1.00e-03
    MSE:0.0612, MAE:0.1905, Corr(V/A):0.4241/0.7502, R2(V/A):0.0240/0.2267, CCC(V/A):0.2203/0.5306




[04] Train:0.0497 | Val:0.0440 | LR:1.00e-03
    MSE:0.0440, MAE:0.1681, Corr(V/A):0.4135/0.7707, R2(V/A):0.1657/0.5435, CCC(V/A):0.2597/0.6701




[05] Train:0.0480 | Val:0.0519 | LR:1.00e-03
    MSE:0.0519, MAE:0.1820, Corr(V/A):0.4208/0.7462, R2(V/A):0.0478/0.4367, CCC(V/A):0.1534/0.5265




[06] Train:0.0470 | Val:0.0494 | LR:1.00e-03
    MSE:0.0494, MAE:0.1768, Corr(V/A):0.4064/0.7420, R2(V/A):0.0683/0.4838, CCC(V/A):0.1595/0.5886




[07] Train:0.0451 | Val:0.0524 | LR:1.00e-03
    MSE:0.0524, MAE:0.1822, Corr(V/A):0.4340/0.7334, R2(V/A):0.1614/0.3403, CCC(V/A):0.2195/0.5088




[08] Train:0.0438 | Val:0.0475 | LR:1.00e-03
    MSE:0.0475, MAE:0.1801, Corr(V/A):0.4320/0.7667, R2(V/A):0.0820/0.5203, CCC(V/A):0.1773/0.6499




[09] Train:0.0437 | Val:0.0490 | LR:5.00e-04
    MSE:0.0490, MAE:0.1716, Corr(V/A):0.4043/0.7525, R2(V/A):0.1208/0.4554, CCC(V/A):0.3038/0.6301
Early stopping @ epoch 9 (Best Val 0.0440 @ 4)

== Training completed. Best Val Loss: 0.0440 @ epoch 4 ==


In [None]:

# -- TEST --
model.load_state_dict(torch.load('best_model.pt'))
model.eval()
all_preds, all_trues = [], []
with torch.no_grad():
    for spec, ann in test_loader:
        spec, ann = spec.to(device), ann.to(device)
        out = model(spec)
        all_preds.append(out.cpu().numpy().reshape(-1,2))
        all_trues.append(ann.cpu().numpy().reshape(-1,2))
preds = np.vstack(all_preds)
trues = np.vstack(all_trues)

# 테스트 지표 출력
print("\n== Test Results ==")
print(f"MSE: {np.mean((preds-trues)**2):.4f}, MAE: {np.mean(np.abs(preds-trues)):.4f}")
print(f"Corr(V): {np.corrcoef(trues[:,0],preds[:,0])[0,1]:.4f}, Corr(A): {np.corrcoef(trues[:,1],preds[:,1])[0,1]:.4f}")
print(f"R2(V/A): {r2_score(trues[:,0],preds[:,0]):.4f}/{r2_score(trues[:,1],preds[:,1]):.4f}")
print(f"CCC(V/A): {concordance_correlation_coefficient(trues[:,0],preds[:,0]):.4f}/"
      f"{concordance_correlation_coefficient(trues[:,1],preds[:,1]):.4f}")

In [26]:
import matplotlib.pyplot as plt
import numpy as np
import torch
from sklearn.metrics import r2_score

# 이전에 정의된 concordance_correlation_coefficient 함수가 필요합니다.
def concordance_correlation_coefficient(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    true_mean = np.mean(y_true)
    pred_mean = np.mean(y_pred)
    covariance = np.mean((y_true - true_mean) * (y_pred - pred_mean))
    true_var = np.var(y_true)
    pred_var = np.var(y_pred)
    return 2 * covariance / (true_var + pred_var + (true_mean - pred_mean)**2 + 1e-8)


# ──────────────────────────────────────────────────────────────────────────────
# 1) 모델 로드 및 평가 준비
# ──────────────────────────────────────────────────────────────────────────────
model.load_state_dict(torch.load('best_model.pt'))
model.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

all_song_preds = []
all_song_trues = []

print("곡 단위 테스트 데이터 예측을 시작합니다...")

# ──────────────────────────────────────────────────────────────────────────────
# 2) DataLoader 대신 test_items 리스트를 직접 순회하며 곡 단위로 예측
# ──────────────────────────────────────────────────────────────────────────────
for song_data in test_items:
    # ★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★
    # [수정된 부분 1]
    # .squeeze(0)를 제거하여 채널 차원을 보존합니다.
    # spec_tensor의 모양은 [1, 128, T_variable]가 됩니다. (1이 채널 차원)
    # ★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★
    spec_tensor = song_data['spec']
    ann_tensor = song_data['ann']

    T = spec_tensor.shape[-1]
    windows = []
    for start in range(0, T, 60):
        # ★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★
        # [수정된 부분 2]
        # 3D 텐서를 슬라이싱하므로 win의 모양은 [1, 128, 60]이 됩니다.
        # ★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★
        win = spec_tensor[:, :, start:start+60]

        if win.size(-1) < 60:
            pad_size = 60 - win.size(-1)
            win = torch.nn.functional.pad(win, (0, pad_size), "constant", 0)

        windows.append(win)

    if not windows:
        continue

    # ★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★
    # [수정된 부분 3]
    # `windows` 리스트에는 [1, 128, 60] 모양의 3D 텐서들이 들어있습니다.
    # torch.cat으로 dim=0에 대해 합치면 [num_windows, 1, 128, 60] 모양의
    # 올바른 4D 텐서가 됩니다.
    # ★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★★
    batch = torch.cat(windows, dim=0).to(device)

    with torch.no_grad():
        out = model(batch)

    preds_padded = out.cpu().numpy().reshape(-1, 2)
    preds_song = preds_padded[:T]

    all_song_preds.append(preds_song)
    all_song_trues.append(ann_tensor.cpu().numpy())


# ──────────────────────────────────────────────────────────────────────────────
# 3) 전체 테스트셋에 대한 평가 지표 계산 및 출력 (이전과 동일)
# ──────────────────────────────────────────────────────────────────────────────
if all_song_preds:
    preds_flat = np.vstack(all_song_preds)
    trues_flat = np.vstack(all_song_trues)

    mse = np.mean((preds_flat - trues_flat) ** 2)
    mae = np.mean(np.abs(preds_flat - trues_flat))
    r2_val = r2_score(trues_flat[:, 0], preds_flat[:, 0])
    r2_aro = r2_score(trues_flat[:, 1], preds_flat[:, 1])
    ccc_val = concordance_correlation_coefficient(trues_flat[:, 0], preds_flat[:, 0])
    ccc_aro = concordance_correlation_coefficient(trues_flat[:, 1], preds_flat[:, 1])

    print("\n== 전체 Test Set 결과 ==")
    print(f"  MSE: {mse:.4f}, MAE: {mae:.4f}")

곡 단위 테스트 데이터 예측을 시작합니다...


RuntimeError: Given groups=1, weight of size [16, 1, 3, 3], expected input[1, 10, 128, 60] to have 1 channels, but got 10 channels instead