# Stage 2: Audio Encoder Training & Embedding Extraction

이 노트북은 **Stage 2 오디오 파이프라인**을 구현합니다.
1. **소형 멜-CNN 오디오 인코더**를 학습 (Contrastive Learning)
2. 학습된 인코더로 곡별 오디오 임베딩을 추출하여 `.npz`로 저장

**주요 흐름:**
- Item2Vec 모델과 Mel Spectrogram 파일의 교집합 곡 리스트 생성
- 학습용 서브셋(최대 5만 곡) 샘플링
- Contrastive Learning (InfoNCE Loss) 수행
- 학습된 모델로 임베딩 추출

In [None]:
# [Cell 1] 환경 설정 & 기본 상수 정의

import os
import sys
import json
import random
import glob
import logging
import tarfile
import shutil
from typing import List, Dict, Tuple, Optional, Set

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
from tqdm.auto import tqdm

# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def set_seed(seed: int = 42):
    """재현성을 위한 난수 시드 고정"""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

# --- 상수 및 하이퍼파라미터 정의 ---

# Colab 여부 확인
IS_COLAB = 'google.colab' in sys.modules

# 경로 설정
if IS_COLAB:
    # Colab 환경 경로
    DRIVE_MOUNT_PATH = "/content/drive"
    # 사용자가 말한 "Cola Notebooks\meon-dataset"에 대응
    DRIVE_TAR_DIR = "/content/drive/MyDrive/Cola Notebooks/meon-dataset" 
    LOCAL_MEL_DIR = "/content/mel_data"  # 압축 풀 로컬 경로
    ITEM2VEC_MODEL_PATH = "/content/drive/MyDrive/path/to/v2_item2vec.model" # TODO: 실제 드라이브 경로로 수정 필요
    MELON_TAR_MAP_PATH = "/content/drive/MyDrive/path/to/melon_tar_map.json" # TODO: 실제 드라이브 경로로 수정 필요
    STAGE2_EMB_OUTPUT_PATH = "/content/drive/MyDrive/output/audio_embeddings_stage2.npz"
else:
    # 로컬(Windows) 환경 경로
    ITEM2VEC_MODEL_PATH = r"C:\Users\ASUS\music_recommend\work\models\v2_item2vec.model"
    MELON_TAR_MAP_PATH = r"C:\Users\ASUS\music_recommend\work\melon_tar_map.json"
    LOCAL_MEL_DIR = r"C:\Users\ASUS\data\arena_mel" # 이미 압축 풀려있는 곳 가정
    STAGE2_EMB_OUTPUT_PATH = r"C:\Users\ASUS\music_recommend\work\models\audio_embeddings_stage2.npz"

# 오디오 처리 관련
FIXED_T = 256  # 멜 스펙트로그램 시간축 고정 길이

# 학습 하이퍼파라미터
BATCH_SIZE = 64
NUM_EPOCHS = 10
LEARNING_RATE = 1e-3
EMBED_DIM = 128
TEMPERATURE = 0.1
MAX_TRAIN_SONGS = 50000 

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")
logger.info(f"Running in Colab: {IS_COLAB}")
set_seed(42)

In [None]:
# [Cell 2] 데이터 준비: Tar 매핑 로드 & 학습 데이터셋 준비 (Colab용 압축 해제)

from gensim.models import Word2Vec

def prepare_data_and_get_ids(item2vec_path: str, tar_map_path: str, max_songs: int) -> Tuple[List[str], Dict]:
    # 1. Item2Vec 모델 로드
    logger.info(f"Loading Item2Vec model from {item2vec_path}...")
    if not os.path.exists(item2vec_path):
        logger.error(f"Item2Vec model not found at {item2vec_path}")
        return [], {}
        
    try:
        model = Word2Vec.load(item2vec_path)
        cf_song_ids = set(model.wv.key_to_index.keys())
        logger.info(f"Item2Vec vocab size: {len(cf_song_ids)}")
    except Exception as e:
        logger.error(f"Failed to load Item2Vec model: {e}")
        return [], {}

    # 2. Tar Map 로드
    logger.info(f"Loading Tar Map from {tar_map_path}...")
    if not os.path.exists(tar_map_path):
        logger.error(f"Tar map not found at {tar_map_path}")
        return [], {}
        
    with open(tar_map_path, 'r') as f:
        tar_map = json.load(f)
    
    # 3. 학습용 곡 샘플링
    # CF 모델에 있는 곡들 중 실제로 멜론 데이터셋 범위(0~707988)에 들어가는지 확인
    valid_song_ids = []
    for sid in cf_song_ids:
        try:
            folder_id = str(int(sid) // 1000)
            if folder_id in tar_map:
                valid_song_ids.append(sid)
        except:
            continue
            
    logger.info(f"Valid intersection songs: {len(valid_song_ids)}")
    
    valid_song_ids.sort()
    if len(valid_song_ids) > max_songs:
        random.seed(42)
        train_song_ids = random.sample(valid_song_ids, max_songs)
        logger.info(f"Sampled {max_songs} songs for training.")
    else:
        train_song_ids = valid_song_ids
        logger.info(f"Using all {len(train_song_ids)} songs.")

    # 4. (Colab 전용) 필요한 Tar 파일만 복사 및 압축 해제
    if IS_COLAB:
        # 필요한 Tar 파일 식별
        required_tars = set()
        for sid in train_song_ids:
            folder_id = str(int(sid) // 1000)
            if folder_id in tar_map:
                required_tars.add(tar_map[folder_id])
        
        logger.info(f"Required Tar files ({len(required_tars)}): {list(required_tars)[:5]} ...")
        
        # 드라이브 마운트
        if not os.path.exists(DRIVE_MOUNT_PATH):
            from google.colab import drive
            drive.mount(DRIVE_MOUNT_PATH)
            
        os.makedirs(LOCAL_MEL_DIR, exist_ok=True)
        
        for tar_name in tqdm(required_tars, desc="Extracting Tars"):
            # 이미 압축 풀려있는지 체크 (폴더 존재 여부로 간단 확인)
            folder_name = tar_name.replace('.tar', '')
            expected_path = os.path.join(LOCAL_MEL_DIR, folder_name)
            
            if os.path.exists(expected_path):
                continue
                
            tar_path = os.path.join(DRIVE_TAR_DIR, tar_name)
            if os.path.exists(tar_path):
                try:
                    with tarfile.open(tar_path, 'r') as tar:
                        # Tar 파일명으로 폴더를 만들어 그 안에 압축 해제 (구조 유지를 위해)
                        extract_path = os.path.join(LOCAL_MEL_DIR, folder_name)
                        os.makedirs(extract_path, exist_ok=True)
                        tar.extractall(path=extract_path)
                except Exception as e:
                    logger.error(f"Failed to extract {tar_name}: {e}")
            else:
                logger.warning(f"Tar file not found: {tar_path}")

    return train_song_ids, tar_map

# 실행
train_song_ids, tar_map = prepare_data_and_get_ids(ITEM2VEC_MODEL_PATH, MELON_TAR_MAP_PATH, MAX_TRAIN_SONGS)
logger.info(f"Final training song count: {len(train_song_ids)}")

In [None]:
# [Cell 3] 멜 스펙 로더 (디렉토리 구조 반영)

def load_mel_spectrogram(song_id: str, base_dir: str = LOCAL_MEL_DIR, tar_map: Dict = tar_map) -> Optional[np.ndarray]:
    """
    song_id에 해당하는 mel npy 파일을 로드합니다.
    구조: {base_dir}/{tar_folder_name}/arena_mel/{folder_id}/{song_id}.npy
    예: base_dir/arena_mel_0/arena_mel/0/0.npy
    """
    try:
        folder_id = str(int(song_id) // 1000)
        if folder_id not in tar_map:
            return None
            
        tar_name = tar_map[folder_id]
        tar_folder_name = tar_name.replace('.tar', '') # arena_mel_0
        
        # 경로 조립
        path = os.path.join(base_dir, tar_folder_name, "arena_mel", folder_id, f"{song_id}.npy")
        
        if not os.path.exists(path):
            return None
            
        mel = np.load(path)
        
        # (48, T) 확인 및 Transpose
        if mel.shape[0] != 48 and mel.shape[1] == 48:
            mel = mel.T
        return mel
        
    except Exception as e:
        return None

def random_crop(mel: np.ndarray, fixed_t: int = FIXED_T) -> np.ndarray:
    """멜 스펙트로그램을 fixed_t 길이로 랜덤 크롭하거나 패딩합니다."""
    n_freq, n_time = mel.shape
    
    if n_time >= fixed_t:
        start = random.randint(0, n_time - fixed_t)
        return mel[:, start:start+fixed_t]
    else:
        # 패딩 (중앙 정렬)
        pad_total = fixed_t - n_time
        pad_left = pad_total // 2
        pad_right = pad_total - pad_left
        return np.pad(mel, ((0, 0), (pad_left, pad_right)), mode='constant', constant_values=0)

def augment_mel(mel: np.ndarray) -> np.ndarray:
    """간단한 데이터 증강"""
    aug_mel = mel.copy()
    # Noise
    if random.random() < 0.5:
        noise = np.random.normal(0, 0.005, aug_mel.shape)
        aug_mel += noise
    # Time Shift
    if random.random() < 0.5:
        shift = random.randint(-5, 5)
        if shift != 0:
            aug_mel = np.roll(aug_mel, shift, axis=1)
    return aug_mel

def mel_to_tensor(mel: np.ndarray) -> torch.Tensor:
    return torch.from_numpy(mel).unsqueeze(0).float()

In [None]:
# [Cell 4] Contrastive Dataset & DataLoader 정의

class ContrastiveMelDataset(data.Dataset):
    def __init__(self, song_ids: List[str], mel_dir: str = LOCAL_MEL_DIR, tar_map: Dict = None):
        self.song_ids = song_ids
        self.mel_dir = mel_dir
        self.tar_map = tar_map if tar_map is not None else {}
        
    def __len__(self):
        return len(self.song_ids)
    
    def __getitem__(self, index):
        song_id = self.song_ids[index]
        
        mel = load_mel_spectrogram(song_id, self.mel_dir, self.tar_map)
        
        if mel is None:
            mel = np.zeros((48, FIXED_T))
            
        mel1 = random_crop(mel, FIXED_T)
        mel2 = random_crop(mel, FIXED_T)
        
        mel1 = augment_mel(mel1)
        mel2 = augment_mel(mel2)
        
        return {
            "song_id": song_id,
            "anchor": mel_to_tensor(mel1),
            "positive": mel_to_tensor(mel2)
        }

def create_dataloader(song_ids: List[str], tar_map: Dict, batch_size: int = BATCH_SIZE):
    dataset = ContrastiveMelDataset(song_ids, tar_map=tar_map)
    return data.DataLoader(
        dataset, 
        batch_size=batch_size, 
        shuffle=True, 
        num_workers=0, 
        drop_last=True
    )

In [None]:
# [Cell 5] 소형 멜-CNN 인코더 모델 정의

class MelCNNEncoder(nn.Module):
    def __init__(self, embed_dim: int = EMBED_DIM):
        super().__init__()
        
        # Input: (batch, 1, 48, T)
        self.features = nn.Sequential(
            # Block 1
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2), # (32, 24, T/2)
            
            # Block 2
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2), # (64, 12, T/4)
            
            # Block 3
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2)  # (128, 6, T/8)
        )
        
        # Global Average Pooling -> (batch, 128, 1, 1)
        self.gap = nn.AdaptiveAvgPool2d((1, 1))
        
        # Projection Head
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128, embed_dim),
            nn.LayerNorm(embed_dim)
        )
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.features(x)
        x = self.gap(x)
        x = self.fc(x)
        # L2 Normalize (Contrastive Learning에 필수)
        x = F.normalize(x, p=2, dim=-1)
        return x

model = MelCNNEncoder(EMBED_DIM).to(device)
logger.info(model)

In [None]:
# [Cell 6] InfoNCE / SimCLR 스타일 Contrastive Loss 구현

def info_nce_loss(z_i: torch.Tensor, z_j: torch.Tensor, temperature: float = TEMPERATURE) -> torch.Tensor:
    """
    z_i, z_j: (batch_size, embed_dim) 형태의 L2 normalized 임베딩.
    같은 인덱스끼리 positive 쌍, 나머지는 negative로 보는 InfoNCE loss.
    """
    batch_size = z_i.shape[0]
    
    # (2N, D) 형태로 결합
    z = torch.cat([z_i, z_j], dim=0)
    
    # 유사도 행렬 (2N, 2N)
    sim = torch.matmul(z, z.T) / temperature
    
    # 자기 자신과의 유사도(diagonal)는 마스킹 (매우 작은 값으로)
    sim_i_j = torch.diag(sim, batch_size)
    sim_j_i = torch.diag(sim, -batch_size)
    
    # Positive 쌍: (i, i+batch_size) 및 (i+batch_size, i)
    # 이를 위해 라벨을 생성
    # 기준: 각 행(anchor)에 대해 정답 열(positive)의 인덱스
    # 0~N-1 행의 정답은 N~2N-1
    # N~2N-1 행의 정답은 0~N-1
    
    labels = torch.cat([
        torch.arange(batch_size, 2 * batch_size, device=z.device),
        torch.arange(0, batch_size, device=z.device)
    ], dim=0)
    
    # 자기 자신 마스킹 (diagonal에 -inf)
    mask = torch.eye(2 * batch_size, device=z.device).bool()
    sim.masked_fill_(mask, -9e15)
    
    # Cross Entropy Loss
    loss = F.cross_entropy(sim, labels)
    return loss

In [None]:
# [Cell 7] 학습 루프 (Training Loop)

def train_model():
    if not train_song_ids:
        logger.warning("No training songs found. Skipping training.")
        return

    set_seed(42)
    
    # DataLoader 생성 (tar_map 전달)
    train_loader = create_dataloader(train_song_ids, tar_map, BATCH_SIZE)
    logger.info(f"Starting training for {NUM_EPOCHS} epochs...")
    
    # Optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
    
    for epoch in range(NUM_EPOCHS):
        model.train()
        total_loss = 0.0
        
        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS}")
        for batch in pbar:
            anchor = batch['anchor'].to(device)
            positive = batch['positive'].to(device)
            
            # Forward
            z_i = model(anchor)
            z_j = model(positive)
            
            # Loss
            loss = info_nce_loss(z_i, z_j, TEMPERATURE)
            
            # Backward
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            pbar.set_postfix(loss=loss.item())
            
        avg_loss = total_loss / len(train_loader)
        logger.info(f"Epoch {epoch+1} done. Avg Loss: {avg_loss:.4f}")
        
        # (선택) 체크포인트 저장
        # torch.save(model.state_dict(), f"mel_cnn_encoder_epoch{epoch+1}.pt")

# 학습 실행
train_model()

In [None]:
# [Cell 8] 곡 임베딩 추출 함수 정의 (chunk 단위 처리 가능하게)

def extract_embeddings_for_song_ids(
    model: MelCNNEncoder,
    song_ids: List[str],
    output_path: str,
    tar_map: Dict,
    mel_dir: str = LOCAL_MEL_DIR
) -> None:
    """
    주어진 song_id 리스트에 대해 멜 스펙을 로드하고,
    학습된 인코더로 임베딩을 추출해 .npz로 저장합니다.
    """
    model.eval()
    emb_dict: Dict[str, np.ndarray] = {}
    
    logger.info(f"Extracting embeddings for {len(song_ids)} songs...")
    
    with torch.no_grad():
        for song_id in tqdm(song_ids):
            mel = load_mel_spectrogram(song_id, mel_dir, tar_map)
            if mel is None:
                continue
            
            mel_crop = random_crop(mel, FIXED_T)
            
            tensor = mel_to_tensor(mel_crop).to(device)
            
            # (1, 1, 48, T) -> (1, D)
            z = model(tensor.unsqueeze(0))
            
            # CPU numpy로 변환
            emb_dict[song_id] = z.squeeze(0).cpu().numpy()
            
    # 저장
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    np.savez_compressed(output_path, **emb_dict)
    logger.info(f"Saved embeddings to {output_path} (Count: {len(emb_dict)})")

In [None]:
# [Cell 9] 임베딩 추출 함수 예시 호출 (train_song_ids 대상으로)

if train_song_ids:
    extract_embeddings_for_song_ids(
        model=model,
        song_ids=train_song_ids,
        output_path=STAGE2_EMB_OUTPUT_PATH,
        tar_map=tar_map
    )

In [None]:
# [Cell 10] 간단 Sanity Check (이웃 곡 확인)

def sanity_check_neighbors(emb_path: str, meta_path: str = r"C:\Users\ASUS\music_recommend\work\song_meta.json"):
    if not os.path.exists(emb_path):
        logger.warning("Embedding file not found. Skipping sanity check.")
        return
        
    # 1. 임베딩 로드
    data = np.load(emb_path)
    keys = list(data.keys())
    vectors = np.array([data[k] for k in keys]) # (N, D)
    
    logger.info(f"Loaded {len(keys)} embeddings for sanity check.")
    
    # 2. 메타데이터 로드 (Colab 환경 고려)
    if IS_COLAB:
        meta_path = "/content/drive/MyDrive/path/to/song_meta.json" # TODO: 수정 필요
        
    if not os.path.exists(meta_path):
        logger.warning(f"Meta file not found at {meta_path}")
        return

    try:
        with open(meta_path, 'r', encoding='utf-8') as f:
            song_meta = json.load(f)
        # id -> meta dict
        meta_dict = {str(s['id']): s for s in song_meta}
    except Exception as e:
        logger.warning(f"Failed to load song_meta: {e}")
        meta_dict = {}

    # 3. 랜덤 시드 곡 몇 개 선정
    n_seeds = 3
    seeds = random.sample(keys, min(len(keys), n_seeds))
    
    for seed_id in seeds:
        seed_vec = data[seed_id] # (D,)
        
        # 코사인 유사도 (이미 L2 정규화 되어있으므로 dot product)
        sims = np.dot(vectors, seed_vec)
        top_k_idx = np.argsort(sims)[::-1][:6] # 자기자신 포함 상위 6개
        
        print(f"\n[Seed Song] {meta_dict.get(seed_id, {}).get('song_name', seed_id)} / {meta_dict.get(seed_id, {}).get('artist_name_basket', [])}")
        print("-" * 40)
        
        for idx in top_k_idx:
            t_id = keys[idx]
            t_score = sims[idx]
            t_meta = meta_dict.get(t_id, {})
            print(f"{t_score:.4f} | {t_meta.get('song_name', t_id)} | {t_meta.get('artist_name_basket', [])} | {t_meta.get('gnr_basket', [])}")

# 실행
sanity_check_neighbors(STAGE2_EMB_OUTPUT_PATH)