In [None]:
# ✅ Cell 1: module imports and path setup
import os
import random
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from PIL import Image

# Every sub-directory of the data root is treated as one song.
data_root = "./data"
song_ids = [
    name
    for name in sorted(os.listdir(data_root))
    if os.path.isdir(os.path.join(data_root, name))
]

# Shuffle with a fixed seed so the train/validation split is reproducible.
random.seed(42)
random.shuffle(song_ids)

# The first 80% of the shuffled songs are used for training, the rest for validation.
split_ratio = 0.8
split_index = int(len(song_ids) * split_ratio)
train_ids, val_ids = song_ids[:split_index], song_ids[split_index:]


In [None]:
# ✅ Cell 2: EfficientNet embedding model definition
class EfficientNetEmbedding(nn.Module):
    """EfficientNet-B0 backbone with frozen weights and a trainable embedding head.

    The backbone's feature map is average-pooled, flattened, projected to
    ``embedding_size`` dimensions by a linear layer and L2-normalized, so
    every output row has unit length.

    Args:
        embedding_size: Dimensionality of the returned embedding vectors.
    """

    def __init__(self, embedding_size=128):
        super().__init__()
        # NOTE(review): recent torchvision releases prefer the ``weights=``
        # argument over ``pretrained=True``; kept as-is so the installed
        # version keeps working -- confirm before upgrading torchvision.
        self.base_model = models.efficientnet_b0(pretrained=True)
        # Freeze the backbone: only the embedding layer below is trained.
        for param in self.base_model.parameters():
            param.requires_grad = False
        self.features = self.base_model.features
        self.pool = nn.AdaptiveAvgPool2d(1)
        # 1280 is the channel count of the pooled backbone feature map.
        self.embedding = nn.Linear(1280, embedding_size)
        self.l2_norm = nn.functional.normalize
        # Keep the frozen backbone in inference mode from the start.
        self.base_model.eval()

    def train(self, mode=True):
        """Set the training mode, keeping the frozen backbone in eval mode.

        ``requires_grad = False`` freezes the weights only. Normalization
        layers would still update their running statistics (and dropout
        would still be active) in training mode, so the backbone is pinned
        to eval mode regardless of ``mode``.

        Returns:
            This module, as ``nn.Module.train`` does.
        """
        super().train(mode)
        self.base_model.eval()
        return self

    def forward(self, x):
        """Return L2-normalized embeddings of shape ``(batch, embedding_size)``."""
        x = self.features(x)
        x = self.pool(x)
        x = torch.flatten(x, 1)
        x = self.embedding(x)
        x = self.l2_norm(x, dim=1)
        return x


In [None]:
# ✅ Cell 3: Triplet dataset class definition
class TripletDataset(Dataset):
    """Randomly sample (anchor, positive, negative) image triplets.

    Each sub-directory of ``root_dir`` is one song. Anchor and positive are
    two different images of the same song; the negative is an image of a
    different song. Songs with fewer than two images are skipped because
    they cannot provide an anchor/positive pair.

    Args:
        root_dir: Directory that contains one sub-directory per song.
        song_ids: Optional list of sub-directory names to use, in the given
            order. When ``None``, every sub-directory of ``root_dir`` is used.
        transform: Optional callable applied to each loaded PIL image.
    """

    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, root_dir, song_ids=None, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        if song_ids is None:
            # Sorted so the song order does not depend on the filesystem.
            self.song_dirs = [
                os.path.join(root_dir, d)
                for d in sorted(os.listdir(root_dir))
                if os.path.isdir(os.path.join(root_dir, d))
            ]
        else:
            self.song_dirs = [os.path.join(root_dir, d) for d in song_ids]
        # List of (song directory, image file names) for every usable song.
        self.data = []
        for song_dir in self.song_dirs:
            images = sorted(
                f for f in os.listdir(song_dir)
                if f.lower().endswith(self.IMAGE_EXTENSIONS)
            )
            if len(images) >= 2:
                self.data.append((song_dir, images))

    def __len__(self):
        """Return the total number of images across all usable songs."""
        return sum(len(images) for _, images in self.data)

    @staticmethod
    def _load_rgb(path):
        """Load an image as RGB and close the underlying file right away."""
        with Image.open(path) as img:
            return img.convert('RGB')

    def __getitem__(self, idx):
        """Return one randomly drawn (anchor, positive, negative) triplet.

        ``idx`` is ignored: every call draws a fresh random triplet, so the
        dataset length only controls how many triplets make up an epoch.

        Raises:
            ValueError: If fewer than two usable songs are available, since a
                negative from a different song could never be drawn.
        """
        if len(self.data) < 2:
            raise ValueError(
                "TripletDataset needs at least 2 songs with 2 or more images each, "
                f"found {len(self.data)}"
            )
        # Two distinct songs: the first is the anchor's, the second the negative's.
        anchor_song_idx, negative_song_idx = random.sample(range(len(self.data)), 2)
        anchor_song_dir, anchor_images = self.data[anchor_song_idx]
        negative_song_dir, negative_images = self.data[negative_song_idx]
        # Two distinct images of the anchor song.
        anchor_img_name, positive_img_name = random.sample(anchor_images, 2)
        negative_img_name = random.choice(negative_images)

        anchor_img = self._load_rgb(os.path.join(anchor_song_dir, anchor_img_name))
        positive_img = self._load_rgb(os.path.join(anchor_song_dir, positive_img_name))
        negative_img = self._load_rgb(os.path.join(negative_song_dir, negative_img_name))
        if self.transform:
            anchor_img = self.transform(anchor_img)
            positive_img = self.transform(positive_img)
            negative_img = self.transform(negative_img)
        return anchor_img, positive_img, negative_img


In [None]:
# ✅ Cell 4: preprocessing, datasets and data loaders
# Per-channel normalization statistics shared by both pipelines below.
NORMALIZE_MEAN = [0.485, 0.456, 0.406]
NORMALIZE_STD = [0.229, 0.224, 0.225]

# Training pipeline: random shift/scale augmentation, then resize and normalize.
transform = transforms.Compose([
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=NORMALIZE_MEAN, std=NORMALIZE_STD),
])

# Evaluation pipeline: the same preprocessing without random augmentation,
# so validation images are not perturbed differently on every epoch.
eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=NORMALIZE_MEAN, std=NORMALIZE_STD),
])

train_dataset = TripletDataset(root_dir=data_root, song_ids=train_ids, transform=transform)
val_dataset = TripletDataset(root_dir=data_root, song_ids=val_ids, transform=eval_transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)


In [None]:
# ✅ Cell 5: training and validation functions
def train_triplet(model, data_loader, optimizer, loss_fn, device):
    """Train ``model`` for one pass over ``data_loader``.

    Each batch is an (anchor, positive, negative) triple. All three parts are
    moved to ``device``, embedded by ``model`` and scored with ``loss_fn``,
    followed by one optimizer step.

    Returns:
        The sum of the per-batch losses divided by ``len(data_loader)``.
    """
    model.train()

    def run_step(batch):
        """Do one optimization step and return the batch loss as a float."""
        anchor, positive, negative = (part.to(device) for part in batch)
        optimizer.zero_grad()
        batch_loss = loss_fn(model(anchor), model(positive), model(negative))
        batch_loss.backward()
        optimizer.step()
        return batch_loss.item()

    running_loss = 0
    for batch in data_loader:
        running_loss += run_step(batch)
    return running_loss / len(data_loader)

@torch.no_grad()
def validate_triplet(model, data_loader, loss_fn, device):
    """Evaluate ``model`` on triplet batches without tracking gradients.

    Puts the model in eval mode, embeds the anchor, positive and negative
    part of each batch on ``device`` and scores them with ``loss_fn``.

    Returns:
        The sum of the per-batch losses divided by ``len(data_loader)``.
    """
    model.eval()
    running_loss = 0
    for batch in data_loader:
        anchor, positive, negative = (part.to(device) for part in batch)
        embeddings = (model(anchor), model(positive), model(negative))
        running_loss += loss_fn(*embeddings).item()
    return running_loss / len(data_loader)


In [None]:
# ✅ Cell 6: run training
# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

model = EfficientNetEmbedding(embedding_size=128)
model = model.to(device)
loss_fn = nn.TripletMarginLoss(margin=1.0)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 10
for epoch in range(num_epochs):
    train_loss = train_triplet(
        model=model,
        data_loader=train_loader,
        optimizer=optimizer,
        loss_fn=loss_fn,
        device=device,
    )
    val_loss = validate_triplet(
        model=model,
        data_loader=val_loader,
        loss_fn=loss_fn,
        device=device,
    )
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
    # One checkpoint per epoch, numbered from 1.
    torch.save(model.state_dict(), f"model_epoch_{epoch+1}.pth")


In [None]:
# ✅ Cell 7: embedding extraction and recommendation functions
def extract_embeddings(model, inputs, device, batch_size=64):
    """Compute embeddings for a tensor of inputs or for a whole DataLoader.

    The model is switched to eval mode and run without gradient tracking.

    Args:
        model: Module that maps an input batch to a batch of embeddings.
        inputs: Either a ``DataLoader`` or a single tensor whose first
            dimension is the sample dimension. Loader batches may be plain
            tensors or tuples/lists such as ``(images, labels)``; for
            tuples/lists only the first element is embedded.
        device: Device on which the forward passes run.
        batch_size: Maximum number of samples per forward pass when
            ``inputs`` is a tensor. Must be positive. A loader keeps its
            own batch size.

    Returns:
        A 2-D ``numpy`` array with one embedding row per input sample, in
        input order.

    Raises:
        ValueError: If a loader yields no batches.
    """
    model.eval()
    if isinstance(inputs, DataLoader):
        batches = inputs
    elif len(inputs) > batch_size:
        # Bound memory use by embedding large tensors in chunks.
        batches = inputs.split(batch_size)
    else:
        batches = (inputs,)

    chunks = []
    with torch.no_grad():
        for batch in batches:
            if isinstance(batch, (list, tuple)):
                # Datasets that return several items per sample collate into
                # a list of tensors; the first one holds the model inputs.
                batch = batch[0]
            chunks.append(model(batch.to(device)).cpu().numpy())
    if not chunks:
        raise ValueError("extract_embeddings received no input batches")
    return np.vstack(chunks)

def recommend_topk(query_embedding, gallery_embeddings, gallery_ids, topk=5):
    """Return the ``topk`` gallery entries most similar to the query.

    Args:
        query_embedding: Embedding of the query; reshaped to a single row.
        gallery_embeddings: 2-D array with one embedding row per gallery entry.
        gallery_ids: Identifiers aligned with the rows of
            ``gallery_embeddings`` (same length, same order).
        topk: Maximum number of results to return.

    Returns:
        A list of ``(gallery_id, cosine_similarity)`` pairs sorted from most
        to least similar, with at most ``topk`` entries.

    Raises:
        ValueError: If ``topk`` is negative, or if ``gallery_ids`` and
            ``gallery_embeddings`` differ in length.
    """
    if topk < 0:
        raise ValueError(f"topk must be non-negative, got {topk}")
    # A length mismatch means ids and rows are misaligned, which would
    # silently attach the wrong id to a score (or fail later with IndexError).
    if len(gallery_ids) != len(gallery_embeddings):
        raise ValueError(
            f"gallery_ids has {len(gallery_ids)} entries but gallery_embeddings "
            f"has {len(gallery_embeddings)} rows"
        )
    sims = cosine_similarity(query_embedding.reshape(1, -1), gallery_embeddings).flatten()
    topk_idx = sims.argsort()[::-1][:topk]
    return [(gallery_ids[i], sims[i]) for i in topk_idx]


In [None]:
# ✅ Cell 8: run inference and recommendation
class GalleryImageDataset(Dataset):
    """Serve gallery images one at a time, in a fixed order.

    Unlike a triplet dataset, item ``idx`` is always the image at
    ``image_paths[idx]``, so embedding rows can be mapped back to their songs.
    """

    def __init__(self, image_paths, transform=None):
        self.image_paths = list(image_paths)
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        with Image.open(self.image_paths[idx]) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image


# Deterministic preprocessing for inference: no random augmentation, so the
# same image always produces the same embedding.
inference_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Collect every gallery image and remember which slice of the image list
# belongs to which song. Songs without any image are left out.
gallery_ids = train_ids + val_ids
gallery_image_paths = []
gallery_song_slices = []
gallery_id_names = []
for song_id in gallery_ids:
    song_dir = os.path.join(data_root, song_id)
    image_names = sorted(
        f for f in os.listdir(song_dir)
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    )
    if not image_names:
        continue
    start = len(gallery_image_paths)
    gallery_image_paths.extend(os.path.join(song_dir, f) for f in image_names)
    gallery_song_slices.append(slice(start, len(gallery_image_paths)))
    gallery_id_names.append(song_id)

gallery_dataset = GalleryImageDataset(gallery_image_paths, transform=inference_transform)
gallery_loader = DataLoader(gallery_dataset, batch_size=64, shuffle=False, num_workers=4)
image_embeddings = extract_embeddings(model, gallery_loader, device)

# One embedding per song: the mean of its image embeddings. Row i of
# gallery_embeddings therefore belongs to gallery_id_names[i].
gallery_embeddings = np.vstack(
    [image_embeddings[song_slice].mean(axis=0) for song_slice in gallery_song_slices]
)
np.save("gallery_embeddings.npy", gallery_embeddings)

test_img_path = "./test_query.png"
with Image.open(test_img_path) as img:
    test_img = img.convert('RGB')
test_img_tensor = inference_transform(test_img).unsqueeze(0)
query_embedding = extract_embeddings(model, test_img_tensor, device)

recommendations = recommend_topk(query_embedding, gallery_embeddings, gallery_id_names, topk=5)

print("🎧 추천 결과:")
for i, (song_id, score) in enumerate(recommendations, 1):
    print(f"{i}. 곡 ID: {song_id} (유사도: {score:.4f})")
