In [22]:
from tqdm import tqdm
from pathlib import Path
import pandas as pd
import copy

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split

device = "cuda" if torch.cuda.is_available() else "cpu"

In [23]:
# Default paths
ROOT = Path("dataset") # Root dataset directory

PATH_NO_LABEL = ROOT / "spotify_songs.csv"
PATH_INT_LABEL = ROOT / "spotify_songs_with_genre_int.csv"
# PATH_ONE_HOT_LABEL = ROOT / "filename.csv"

In [24]:
int_label_df = pd.read_csv(PATH_INT_LABEL)
int_label_df.head()

Unnamed: 0,track_id,track_name,track_artist,track_popularity,track_album_id,track_album_name,track_album_release_date,playlist_name,playlist_id,playlist_genre,...,loudness,mode,speechiness,acousticness,instrumentalness,liveness,valence,tempo,duration_ms,genre_int
0,6f807x0ima9a1j3VPbc7VN,I Don't Care (with Justin Bieber) - Loud Luxur...,Ed Sheeran,66,2oCs0DGTsRO98Gh5ZSl2Cx,I Don't Care (with Justin Bieber) [Loud Luxury...,2019-06-14,Pop Remix,37i9dQZF1DXcZDD7cfEKhW,pop,...,-2.634,1,0.0583,0.102,0.0,0.0653,0.518,122.036,194754,0
1,0r7CVbZTWZgbTCYdfa2P31,Memories - Dillon Francis Remix,Maroon 5,67,63rPSO264uRjW1X5E6cWv6,Memories (Dillon Francis Remix),2019-12-13,Pop Remix,37i9dQZF1DXcZDD7cfEKhW,pop,...,-4.969,1,0.0373,0.0724,0.00421,0.357,0.693,99.972,162600,0
2,1z1Hg7Vb0AhHDiEmnDE79l,All the Time - Don Diablo Remix,Zara Larsson,70,1HoSmj2eLcsrR0vE9gThr4,All the Time (Don Diablo Remix),2019-07-05,Pop Remix,37i9dQZF1DXcZDD7cfEKhW,pop,...,-3.432,0,0.0742,0.0794,2.3e-05,0.11,0.613,124.008,176616,0
3,75FpbthrwQmzHlBJLuGdC7,Call You Mine - Keanu Silva Remix,The Chainsmokers,60,1nqYsOef1yKKuGOVchbsk6,Call You Mine - The Remixes,2019-07-19,Pop Remix,37i9dQZF1DXcZDD7cfEKhW,pop,...,-3.778,1,0.102,0.0287,9e-06,0.204,0.277,121.956,169093,0
4,1e8PAfcKUYoKkxPhrHqw4x,Someone You Loved - Future Humans Remix,Lewis Capaldi,69,7m7vv9wlQ4i0LFuJiE2zsQ,Someone You Loved (Future Humans Remix),2019-03-05,Pop Remix,37i9dQZF1DXcZDD7cfEKhW,pop,...,-4.672,1,0.0359,0.0803,0.0,0.0833,0.725,123.976,189052,0


In [25]:
# from sklearn.preprocessing import StandardScaler
# import numpy as np

# # feature_col 변수가 아직 정의되지 않았을 때도 안전하게 동작하도록 컬럼 리스트를 직접 사용
# features = ["track_popularity", "danceability", "energy", "key", "loudness", "mode", "speechiness", "acousticness", "instrumentalness", "liveness", "valence", "tempo", "duration_ms"]

# # 1) 라벨 진단 및 0-based 재매핑 (CrossEntropy는 0-based 정수 라벨 필요)
# print('genre_int dtype, min, max:', int_label_df['genre_int'].dtype, int_label_df['genre_int'].min(), int_label_df['genre_int'].max())
# print('Label distribution:\n', int_label_df['genre_int'].value_counts())
# if int_label_df['genre_int'].min() != 0 or int_label_df['genre_int'].nunique() != (int_label_df['genre_int'].max() + 1):
#     int_label_df['genre_int'], uniques = pd.factorize(int_label_df['genre_int'])
#     print('Remapped labels to 0..C-1. #original labels:', len(uniques))

# # 2) Feature 스케일링: duration_ms 같은 큰 값 때문에 초기 logits/손실이 매우 큼
# scaler = StandardScaler()
# int_label_df[features] = scaler.fit_transform(int_label_df[features])
# print('Feature means (post-scale):', np.round(int_label_df[features].mean().values,3))
# print('Feature stds  (post-scale):', np.round(int_label_df[features].std().values,3))

In [26]:
from sklearn.preprocessing import StandardScaler
import numpy as np

features = ["track_popularity","danceability","energy","key","loudness","mode",
            "speechiness","acousticness","instrumentalness","liveness","valence",
            "tempo","duration_ms"]

# --- (A) log1p 적용: 비음수 & 고왜도 컬럼 ---
log_cols = ["duration_ms", "instrumentalness", "liveness", "speechiness", "acousticness"]
for c in log_cols:
    # 결측/음수 방어 (이 컬럼들은 보통 0~1 or 양수지만 안전망)
    x = int_label_df[c].to_numpy()
    x = np.where(x < 0, 0.0, x)       # 음수면 0으로 클립
    int_label_df[c] = np.log1p(x)     # log(1+x)

# (참고) loudness는 dB(음수 포함) → log 변환 하지 말고 그냥 스케일만
# key(0~11), mode(0/1)는 범주/이진 → 그대로 두거나 필요시 원-핫 (지금은 그대로)

# --- (B) 라벨 0-based 보장 ---
print('genre_int dtype, min, max:', int_label_df['genre_int'].dtype, int_label_df['genre_int'].min(), int_label_df['genre_int'].max())
print('Label distribution:\n', int_label_df['genre_int'].value_counts())
if int_label_df['genre_int'].min() != 0 or int_label_df['genre_int'].nunique() != int_label_df['genre_int'].max()+1:
    int_label_df['genre_int'], uniques = pd.factorize(int_label_df['genre_int'])
    print('Remapped labels to 0..C-1. #original labels:', len(uniques))

# --- (C) 스케일링 ---
scaler = StandardScaler()
int_label_df[features] = scaler.fit_transform(int_label_df[features])
print('Feature means (post-scale):', np.round(int_label_df[features].mean().values, 3))
print('Feature stds  (post-scale):',  np.round(int_label_df[features].std().values, 3))


genre_int dtype, min, max: int64 0 5
Label distribution:
 genre_int
5    6043
1    5746
0    5507
3    5431
4    5155
2    4951
Name: count, dtype: int64
Feature means (post-scale): [-0.  0.  0. -0.  0. -0. -0. -0.  0. -0.  0.  0. -0.]
Feature stds  (post-scale): [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]


In [27]:
feature_col = ["track_popularity", "danceability", "energy", "key", "loudness", "mode", "speechiness", "acousticness", "instrumentalness", "liveness", "valence", "tempo", "duration_ms"]
label_col = ["genre_int"]

In [28]:
from sklearn.metrics import accuracy_score, f1_score

def evaluate(model, dataloader, device="cpu"):
    """
    Evaluate a classification model on a given dataset.

    Args:
        model (torch.nn.Module): The classification model to evaluate.
        dataloader (DataLoader): DataLoader providing batches of {"X": features, "y": labels}.
        device (str, optional): Device to run evaluation on ("cpu" or "cuda"). Default is "cpu".

    Returns:
        dict: A dictionary containing:
            - "accuracy": Overall accuracy of predictions.
            - "f1_macro": Macro-averaged F1 score across all classes.
    """
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch in dataloader:
            X = batch["X"].to(device)
            y = batch["y"].to(device)
            logits = model(X)
            preds = torch.argmax(logits, dim=1)
            all_preds.extend(preds.cpu().tolist())
            all_labels.extend(y.cpu().tolist())

    acc = accuracy_score(all_labels, all_preds)
    f1_macro = f1_score(all_labels, all_preds, average="macro", zero_division=0)

    return {"accuracy": acc, "f1_macro": f1_macro}

In [29]:
# === MLP based Classifier ===

import torch.nn as nn

class Classifier(nn.Module):
    def __init__(self, in_dim, hidden=128, num_classes=6):
        super().__init__()
        self.fc1 = nn.Linear(in_dim, hidden)
        self.bn1 = nn.BatchNorm1d(hidden)
        self.fc2 = nn.Linear(hidden, num_classes)
        # 가중치 초기화로 초기 출력 폭을 줄임
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.zeros_(self.fc1.bias)
        nn.init.xavier_uniform_(self.fc2.weight)
        nn.init.zeros_(self.fc2.bias)
        
    def forward(self, x):
        x = F.relu(self.bn1(self.fc1(x)))
        x = F.dropout(x, p=0.2, training=self.training)
        return self.fc2(x)


In [30]:
class ProductDataset(Dataset):
    def __init__(self, df, feature_cols=feature_col, label_col=label_col):
        self.X = torch.tensor(df[feature_cols].to_numpy(dtype="float32"))
        # label_col은 리스트이므로 2D가 될 수 있음 -> 1D로 변환
        self.y = torch.tensor(df[label_col].to_numpy().ravel(), dtype=torch.long)
        
    def __len__(self): 
        return len(self.y)
    
    def __getitem__(self, idx): 
        return {"X": self.X[idx], "y": self.y[idx]}

In [31]:
# === Prepare test dataset and data loader ===
dataset = ProductDataset(
    df=int_label_df,
    feature_cols=feature_col,
    label_col=label_col,
)

# === Split training dataset into train/validation sets (50:25:25) ===
val_ratio, test_ratio = 0.25, 0.25
val_size  = int(len(dataset) * val_ratio)
test_size  = int(len(dataset) * test_ratio)
train_size = len(dataset) - val_size - test_size

g = torch.Generator().manual_seed(42)
train_split, val_split, test_split = random_split(dataset, [train_size, val_size, test_size], generator=g)

# === Create DataLoaders for training and validation ===
train_loader = DataLoader(train_split, batch_size=32, shuffle=True, drop_last=True)
val_loader   = DataLoader(val_split,   batch_size=64, shuffle=False, drop_last=True)
test_loader  = DataLoader(test_split, batch_size=64, shuffle=False, drop_last=True)

In [32]:
# === Initialize model and optimizer ===
in_dim = next(iter(train_loader))["X"].shape[1]  # 13
num_classes = int(int_label_df['genre_int'].nunique())  # 정확한 라벨 개수
model = Classifier(in_dim=in_dim, hidden=256, num_classes=num_classes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [33]:
def print_eval_result(metrics: dict, stage="val", is_improved=False):
    """
    Print evaluation results (accuracy, F1-macro).
    
    Args:
        metrics: dict with keys 'accuracy' and 'f1_macro'
        stage: string label (e.g., "val", "test")
        is_improved: mark with '*' if results improved
    """
    star = " *" if is_improved else ""
    print(f"[{stage.upper():4}] Acc: {metrics['accuracy']:.4f} | "
          f"F1-macro: {metrics['f1_macro']:.4f}{star}")

In [34]:
# b = next(iter(train_loader))
# print("X:", b["X"].shape, b["X"].mean().item(), b["X"].std().item())
# print("y range:", int(torch.min(b["y"])), int(torch.max(b["y"])))

# c = next(iter(val_loader))
# print("X:", c["X"].shape, c["X"].mean().item(), c["X"].std().item())
# print("y range:", int(torch.min(c["y"])), int(torch.max(c["y"])))

In [35]:
# === Training loop with validation, test evaluation, and early stopping ===

EPOCHS = 100

best_score = 0
wait_time = 6
cnt = 0
best_model_state = copy.deepcopy(model.state_dict())

train_losses, val_acc_list, test_acc_list = [], [], []

for epoch in range(1, EPOCHS + 1):
    # --- Training phase ---
    model.train()
    total_loss = 0.0

    for batch in tqdm(train_loader, desc=f"Epoch {epoch}"):
        X, y = batch["X"].to(device), batch["y"].to(device)
        # if y.dim() == 2 and y.size(1) == 1:
        #     y = y.squeeze(1)          # (B,1) → (B,)
        logits = model(X)
        loss = F.cross_entropy(logits, y, label_smoothing=0.05)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / max(1, len(train_loader))
    train_losses.append(avg_loss)
    print(f"[Epoch {epoch}] Train Loss: {avg_loss:.4f}")

    test_result = evaluate(model, test_loader, device=device)
    test_acc = float(test_result.get("accuracy", 0.0))
    test_acc_list.append(test_acc)
    print_eval_result(test_result, stage="test")
    
    val_result = evaluate(model, val_loader, device=device)
    val_acc = float(val_result.get("accuracy", 0.0))
    val_acc_list.append(val_acc)
    print_eval_result(val_result, stage="validation")
    print()

    if val_acc > best_score:
        best_score = val_acc
        best_model_state = copy.deepcopy(model.state_dict())
        cnt = 0
    else:
        cnt += 1
        if cnt >= wait_time:
            print(f"No imporvement for {cnt} Epochs")
            break

Epoch 1: 100%|██████████| 513/513 [00:02<00:00, 191.13it/s]


[Epoch 1] Train Loss: 1.5095
[TEST] Acc: 0.5033 | F1-macro: 0.4906
[VALIDATION] Acc: 0.5004 | F1-macro: 0.4875



Epoch 2: 100%|██████████| 513/513 [00:02<00:00, 189.40it/s]


[Epoch 2] Train Loss: 1.4209
[TEST] Acc: 0.5127 | F1-macro: 0.5006
[VALIDATION] Acc: 0.5024 | F1-macro: 0.4897



Epoch 3: 100%|██████████| 513/513 [00:02<00:00, 246.26it/s]


[Epoch 3] Train Loss: 1.3858
[TEST] Acc: 0.5142 | F1-macro: 0.5022
[VALIDATION] Acc: 0.5106 | F1-macro: 0.4990



Epoch 4: 100%|██████████| 513/513 [00:02<00:00, 191.77it/s]


[Epoch 4] Train Loss: 1.3731
[TEST] Acc: 0.5170 | F1-macro: 0.5071
[VALIDATION] Acc: 0.5166 | F1-macro: 0.5059



Epoch 5: 100%|██████████| 513/513 [00:01<00:00, 272.40it/s]


[Epoch 5] Train Loss: 1.3610
[TEST] Acc: 0.5089 | F1-macro: 0.5060
[VALIDATION] Acc: 0.5127 | F1-macro: 0.5090



Epoch 6: 100%|██████████| 513/513 [00:01<00:00, 269.95it/s]


[Epoch 6] Train Loss: 1.3559
[TEST] Acc: 0.5194 | F1-macro: 0.5120
[VALIDATION] Acc: 0.5186 | F1-macro: 0.5095



Epoch 7: 100%|██████████| 513/513 [00:02<00:00, 248.38it/s]


[Epoch 7] Train Loss: 1.3536
[TEST] Acc: 0.5199 | F1-macro: 0.5122
[VALIDATION] Acc: 0.5215 | F1-macro: 0.5130



Epoch 8: 100%|██████████| 513/513 [00:01<00:00, 270.30it/s]


[Epoch 8] Train Loss: 1.3502
[TEST] Acc: 0.5225 | F1-macro: 0.5115
[VALIDATION] Acc: 0.5192 | F1-macro: 0.5090



Epoch 9: 100%|██████████| 513/513 [00:02<00:00, 218.40it/s]


[Epoch 9] Train Loss: 1.3418
[TEST] Acc: 0.5253 | F1-macro: 0.5160
[VALIDATION] Acc: 0.5244 | F1-macro: 0.5148



Epoch 10: 100%|██████████| 513/513 [00:01<00:00, 260.51it/s]


[Epoch 10] Train Loss: 1.3457
[TEST] Acc: 0.5209 | F1-macro: 0.5155
[VALIDATION] Acc: 0.5192 | F1-macro: 0.5126



Epoch 11: 100%|██████████| 513/513 [00:02<00:00, 220.03it/s]


[Epoch 11] Train Loss: 1.3404
[TEST] Acc: 0.5227 | F1-macro: 0.5112
[VALIDATION] Acc: 0.5190 | F1-macro: 0.5071



Epoch 12: 100%|██████████| 513/513 [00:02<00:00, 196.32it/s]


[Epoch 12] Train Loss: 1.3423
[TEST] Acc: 0.5281 | F1-macro: 0.5166
[VALIDATION] Acc: 0.5308 | F1-macro: 0.5187



Epoch 13: 100%|██████████| 513/513 [00:02<00:00, 203.15it/s]


[Epoch 13] Train Loss: 1.3340
[TEST] Acc: 0.5289 | F1-macro: 0.5178
[VALIDATION] Acc: 0.5281 | F1-macro: 0.5160



Epoch 14: 100%|██████████| 513/513 [00:01<00:00, 265.32it/s]


[Epoch 14] Train Loss: 1.3350
[TEST] Acc: 0.5284 | F1-macro: 0.5155
[VALIDATION] Acc: 0.5292 | F1-macro: 0.5159



Epoch 15: 100%|██████████| 513/513 [00:02<00:00, 198.38it/s]


[Epoch 15] Train Loss: 1.3299
[TEST] Acc: 0.5281 | F1-macro: 0.5172
[VALIDATION] Acc: 0.5275 | F1-macro: 0.5160



Epoch 16: 100%|██████████| 513/513 [00:02<00:00, 253.39it/s]


[Epoch 16] Train Loss: 1.3305
[TEST] Acc: 0.5245 | F1-macro: 0.5186
[VALIDATION] Acc: 0.5282 | F1-macro: 0.5208



Epoch 17: 100%|██████████| 513/513 [00:02<00:00, 249.26it/s]


[Epoch 17] Train Loss: 1.3290
[TEST] Acc: 0.5250 | F1-macro: 0.5113
[VALIDATION] Acc: 0.5254 | F1-macro: 0.5107



Epoch 18: 100%|██████████| 513/513 [00:02<00:00, 213.05it/s]


[Epoch 18] Train Loss: 1.3304
[TEST] Acc: 0.5330 | F1-macro: 0.5251
[VALIDATION] Acc: 0.5342 | F1-macro: 0.5266



Epoch 19: 100%|██████████| 513/513 [00:02<00:00, 191.88it/s]


[Epoch 19] Train Loss: 1.3272
[TEST] Acc: 0.5253 | F1-macro: 0.5195
[VALIDATION] Acc: 0.5294 | F1-macro: 0.5237



Epoch 20: 100%|██████████| 513/513 [00:01<00:00, 276.20it/s]


[Epoch 20] Train Loss: 1.3249
[TEST] Acc: 0.5264 | F1-macro: 0.5204
[VALIDATION] Acc: 0.5286 | F1-macro: 0.5217



Epoch 21: 100%|██████████| 513/513 [00:02<00:00, 193.42it/s]


[Epoch 21] Train Loss: 1.3247
[TEST] Acc: 0.5287 | F1-macro: 0.5206
[VALIDATION] Acc: 0.5294 | F1-macro: 0.5201



Epoch 22: 100%|██████████| 513/513 [00:02<00:00, 214.40it/s]


[Epoch 22] Train Loss: 1.3260
[TEST] Acc: 0.5312 | F1-macro: 0.5217
[VALIDATION] Acc: 0.5323 | F1-macro: 0.5220



Epoch 23: 100%|██████████| 513/513 [00:02<00:00, 193.31it/s]


[Epoch 23] Train Loss: 1.3209
[TEST] Acc: 0.5244 | F1-macro: 0.5065
[VALIDATION] Acc: 0.5326 | F1-macro: 0.5138



Epoch 24: 100%|██████████| 513/513 [00:02<00:00, 222.22it/s]


[Epoch 24] Train Loss: 1.3196
[TEST] Acc: 0.5277 | F1-macro: 0.5157
[VALIDATION] Acc: 0.5292 | F1-macro: 0.5158

No imporvement for 6 Epochs


In [36]:
# === Load the best model and evaluate on the test set ===
model.load_state_dict(best_model_state) 

final_test_result = evaluate(model, test_loader, device=device)
print_eval_result(final_test_result, stage="final_test")

[FINAL_TEST] Acc: 0.5330 | F1-macro: 0.5251


In [37]:
# 스포티파이 api 이용해서 뭔가 해보려고 했으나 policy update로 인해 무산

# import pprint
# from spotifyyy import *

# genre_dict = {0: 'pop', 1: 'rap', 2: 'rock', 3: 'r&b', 4: 'latin', 5: 'edm'}
# query = input()

# respond = get_track_by_search(query)
# pprint.pprint(respond)

In [38]:
# 그래서 걍 랜덤으로 한 행 뽑아서 테스트 해봄

genre_dict = {0: 'pop', 1: 'rap', 2: 'rock', 3: 'r&b', 4: 'latin', 5: 'edm'}
row = int_label_df.sample(n=1, random_state=1).iloc[0]

song_name   = row["track_name"]
artist_name = row["track_artist"]
answer      = row.get("playlist_genre", None)

# === 스케일러에 feature 이름을 유지해 전달 ===
x_df = pd.DataFrame([row[features]], columns=features)
x_np = scaler.transform(x_df)[0]

# === 모델 추론 ===
model.eval(); model.to(device)
x = torch.tensor(x_np, dtype=torch.float32).unsqueeze(0).to(device)

with torch.no_grad():
    logits = model(x)
    probs  = F.softmax(logits, dim=1)[0]
    pred_id = int(torch.argmax(probs).item())
    pred_conf = float(probs[pred_id].item())

pred_genre = genre_dict[pred_id]

print(f"song: {song_name} | artist: {artist_name}")
if answer is not None:
    print(f"answer: {answer} | pred: {pred_genre} | conf: {pred_conf:.4f}")
else:
    print(f"pred: {pred_genre} | conf: {pred_conf:.4f}")

# 확률 Top-k 장르들 보기
k = 3
top_p, top_i = torch.topk(probs, k)
print("top-k:", [(genre_dict[int(i)], float(p)) for p, i in zip(top_p, top_i)])

song: Poetic Justice | artist: Kendrick Lamar
answer: rap | pred: rap | conf: 1.0000
top-k: [('rap', 1.0), ('r&b', 6.376916217298856e-10), ('edm', 3.5217558686579975e-17)]
