In [13]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import librosa
import librosa.display
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.metrics.pairwise import cosine_similarity
import json
from tqdm import tqdm

# Device configuration: run on the GPU when CUDA is available, otherwise on the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Fix the NumPy and PyTorch RNG seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)

# Ask cuDNN for deterministic kernels and turn off its auto-tuner
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Define the conv layer and ResBlock
def conv(ni, nf, ks=3, stride=1, bias=False):
    """Build a 2-D convolution padded by ``ks // 2`` on every side.

    Args:
        ni: Number of input channels.
        nf: Number of output channels (filters).
        ks: Kernel size.
        stride: Convolution stride.
        bias: Whether the convolution has a learnable bias term.

    Returns:
        The configured ``nn.Conv2d`` module.
    """
    half_kernel = ks // 2
    return nn.Conv2d(
        in_channels=ni,
        out_channels=nf,
        kernel_size=ks,
        stride=stride,
        padding=half_kernel,
        bias=bias,
    )

def conv_layer(ni, nf, ks=3, stride=1, act=True):
    """Stack a convolution, an optional in-place ReLU and a batch norm.

    The module order is convolution -> ReLU (only when ``act`` is true) ->
    BatchNorm2d; the positions inside the ``nn.Sequential`` determine the
    parameter names in the state dict, so the order is kept as is.

    Args:
        ni: Number of input channels.
        nf: Number of output channels.
        ks: Kernel size passed to ``conv``.
        stride: Stride passed to ``conv``.
        act: Whether to insert the ReLU between convolution and batch norm.

    Returns:
        An ``nn.Sequential`` holding the two or three modules.
    """
    norm = nn.BatchNorm2d(nf)
    convolution = conv(ni, nf, ks, stride)
    if act:
        return nn.Sequential(convolution, nn.ReLU(inplace=True), norm)
    return nn.Sequential(convolution, norm)

class ResBlock(nn.Module):
    """Residual block: two ``conv_layer`` stages plus an identity skip connection.

    Both stages map ``nf`` channels to ``nf`` channels; the second one is built
    without the ReLU (``act=False``).
    """

    def __init__(self, nf):
        super().__init__()
        # Attribute names are part of the state-dict keys; do not rename them.
        self.conv1 = conv_layer(nf, nf)
        self.conv2 = conv_layer(nf, nf, act=False)

    def forward(self, x):
        """Return ``x`` plus the output of the two stacked conv layers."""
        branch = self.conv1(x)
        branch = self.conv2(branch)
        return x + branch

def conv_layer_averpl(ni, nf):
    """Return a ``conv_layer(ni, nf)`` followed by 2x2 average pooling with stride 2."""
    features = conv_layer(ni, nf)
    pooling = nn.AvgPool2d(2, 2)
    return nn.Sequential(features, pooling)

# Load the recommendation model
def load_recommendation_model(model_path='best_model.pt'):
    """Build the CNN, load its weights from a checkpoint and return it in eval mode.

    The network is five (conv_layer_averpl, ResBlock) stages with channel widths
    1 -> 64 -> 64 -> 128 -> 256 -> 512, followed by global average pooling,
    flatten and a 512 -> 40 linear layer. It is moved to the module-level
    ``device``.

    Args:
        model_path: Path of the checkpoint holding the state dict. Defaults to
            ``'best_model.pt'``, the path that was previously hard-coded.

    Returns:
        The ``nn.Sequential`` model on ``device`` with ``eval()`` applied.
    """
    channel_plan = [(1, 64), (64, 64), (64, 128), (128, 256), (256, 512)]
    stages = []
    for in_channels, out_channels in channel_plan:
        # Same module order (and therefore the same state-dict indices 0..9)
        # as listing the ten modules explicitly.
        stages.append(conv_layer_averpl(in_channels, out_channels))
        stages.append(ResBlock(out_channels))

    model = nn.Sequential(
        *stages,
        nn.AdaptiveAvgPool2d((1, 1)),
        nn.Flatten(),
        nn.Linear(512, 40)
    )

    # weights_only=True restricts unpickling to tensors and plain containers,
    # which is all a state dict needs, and avoids executing arbitrary pickled
    # code from the checkpoint file.
    state_dict = torch.load(model_path, map_location=device, weights_only=True)

    # strict=False tolerates key mismatches, but silently ignoring them can
    # leave layers at their random initialization. Report what did not match.
    load_result = model.load_state_dict(state_dict, strict=False)
    if load_result.missing_keys or load_result.unexpected_keys:
        print(
            f"Warning: checkpoint '{model_path}' does not fully match the model "
            f"({len(load_result.missing_keys)} missing keys, "
            f"{len(load_result.unexpected_keys)} unexpected keys); "
            f"unmatched layers keep their initial weights."
        )

    model = model.to(device)
    model.eval()
    return model

# Function to extract features from spectrogram slices
def extract_features(image_tensor, model):
    """Run ``model`` on ``image_tensor`` without tracking gradients.

    The tensor is cast to float32. A 2-D tensor gets two leading singleton
    dimensions and a 3-D tensor gets one, so both reach the model as 4-D;
    tensors of any other rank are passed through unchanged.

    Args:
        image_tensor: Input tensor.
        model: Callable applied to the prepared tensor.

    Returns:
        Whatever ``model`` returns for the prepared tensor.
    """
    batch = image_tensor.to(torch.float32)
    rank = batch.dim()
    if rank == 2:
        batch = batch[None, None]
    elif rank == 3:
        batch = batch[None]

    with torch.no_grad():
        output = model(batch)
    return output

# Function to calculate the average vector for each track
def calculate_average_vector(file_path, model, spec_folder, slice_size=128, duration=30, sr=22050):
    """Compute one averaged model-output vector for an audio file.

    Steps: load the audio, pick a segment by RMS energy, render its mel
    spectrogram as a grayscale JPEG inside ``spec_folder``, cut the image into
    ``slice_size``-pixel-wide slices, run each slice through ``model`` and
    average the per-slice outputs.

    Args:
        file_path: Path of the audio file to process.
        model: Network passed to ``extract_features`` for every slice.
        spec_folder: Directory for the rendered spectrogram image; created if
            it does not exist.
        slice_size: Width (and crop height) in pixels of each image slice.
        duration: Number of seconds loaded from the file; also the RMS window
            length in seconds.
        sr: Sampling rate used for loading and analysis.

    Returns:
        The NumPy mean (``axis=0``) over the per-slice model outputs, or
        ``None`` when no slice could be extracted or any processing step
        raised (the error is printed).
    """
    if not os.path.exists(spec_folder):
        # exist_ok tolerates another process creating the folder in between.
        os.makedirs(spec_folder, exist_ok=True)

    try:
        y, _ = librosa.load(file_path, sr=sr, duration=duration)
        window_size = sr * duration
        hop_length = sr
        # NOTE(review): only the first `duration` seconds are loaded, yet the
        # RMS frame is also `duration` seconds long, so the selected segment
        # can be shorter than `window_size`. Left unchanged on purpose because
        # altering it would change the vectors relative to any stored
        # database -- confirm whether this is intended.
        rms = librosa.feature.rms(y=y, frame_length=window_size, hop_length=hop_length)[0]
        max_rms_index = np.argmax(rms)
        start_sample = max_rms_index * hop_length
        y_segment = y[start_sample:start_sample + window_size]

        mel_spectrogram = librosa.feature.melspectrogram(y=y_segment, sr=sr, n_mels=128, fmax=8000)
        mel_db = librosa.power_to_db(mel_spectrogram)

        title = os.path.splitext(os.path.basename(file_path))[0]
        spec_path = os.path.join(spec_folder, f"{title}.jpg")
        fig = plt.figure(figsize=(mel_db.shape[1] / 100, mel_db.shape[0] / 100))
        try:
            plt.axis('off')
            librosa.display.specshow(mel_db, sr=sr, cmap='gray_r')
            plt.savefig(spec_path, dpi=100, bbox_inches='tight', pad_inches=0)
        finally:
            # Always release the figure, even if drawing or saving fails;
            # pyplot otherwise keeps it alive in its global figure registry.
            plt.close(fig)

        # The context manager closes the file handle; convert() returns an
        # independent in-memory image.
        with Image.open(spec_path) as raw_img:
            img = raw_img.convert('L')
        width = img.size[0]
        num_slices = width // slice_size

        vectors = []
        for i in range(num_slices):
            start = i * slice_size
            img_crop = img.crop((start, 0, start + slice_size, slice_size))
            img_crop = img_crop.resize((128, 128))
            # Scale 8-bit pixel values into [0, 1].
            img_array = np.array(img_crop).astype(np.float32) / 255.0
            image_tensor = torch.from_numpy(img_array).unsqueeze(0).unsqueeze(0).to(device)
            feature_vector = extract_features(image_tensor, model)
            vectors.append(feature_vector.cpu().numpy())

        if len(vectors) == 0:
            print(f"No slices extracted for {file_path}. Check the spectrogram dimensions.")
            return None

        avg_vector = np.mean(vectors, axis=0)
        return avg_vector

    except Exception as e:
        # Deliberate best-effort: report the failure and let the caller skip the file.
        print(f"Error processing {file_path}: {e}")
        return None

# Function to compare the input vector with all tracks in the database
def compare_with_db(avg_vector, db_csv):
    """Compute the distance from ``avg_vector`` to every row of a vector CSV.

    The CSV is read with pandas; columns whose names start with ``vector_``
    form each row's vector and the ``track_name`` column provides the key.

    Args:
        avg_vector: Query vector compared against every database row.
        db_csv: Path of the CSV file.

    Returns:
        Dict mapping each ``track_name`` to ``{'distance': <value>}``, where the
        value comes from ``calculate_distances``. Rows sharing a track name
        overwrite earlier ones.
    """
    table = pd.read_csv(db_csv)
    feature_cols = [name for name in table.columns if name.startswith('vector_')]

    return {
        record['track_name']: {
            'distance': calculate_distances(
                avg_vector,
                record[feature_cols].values.astype(np.float32),
            )
        }
        for _, record in table.iterrows()
    }

# Function to calculate distances between vectors
def calculate_distances(anchor, db_vector):
    """Return the Euclidean distance between ``anchor`` and ``db_vector``.

    Computed as ``np.linalg.norm`` (default order) of the element-wise
    difference, so the usual NumPy broadcasting rules apply to the operands.
    """
    return np.linalg.norm(anchor - db_vector)

# Function to save the comparison results to a JSON file
def save_results_to_json(distances, video_info_file, output_json):
    """Attach a ``distance`` field to every entry of a JSON list and save it.

    Args:
        distances: Dict mapping a title to a dict with a ``'distance'`` value.
        video_info_file: Path of the UTF-8 JSON file to read; each entry must
            have a ``'title'`` key.
        output_json: Path of the UTF-8 JSON file to write.

    Each entry receives ``distance`` as a float when its title is a key of
    ``distances`` and ``None`` otherwise. A confirmation line is printed once
    the file has been written.
    """
    with open(video_info_file, 'r', encoding='utf-8') as source:
        records = json.load(source)

    for record in records:
        name = record['title']
        record['distance'] = float(distances[name]['distance']) if name in distances else None

    with open(output_json, 'w', encoding='utf-8') as target:
        json.dump(records, target, ensure_ascii=False, indent=4)

    print(f"Updated JSON file saved: '{output_json}'")

# Function to print top 10 recommendations
def print_top_10_recommendations(distances):
    """Print the ten entries of ``distances`` with the smallest distance.

    Args:
        distances: Dict mapping a title to a dict with a ``'distance'`` value.

    Entries are sorted ascending by distance and printed with a 1-based rank
    and the distance formatted to seven decimal places.
    """
    ranked = sorted(distances.items(), key=lambda item: item[1]['distance'])
    closest = ranked[:10]

    print("\nTop 10 Recommendations (Euclidean Distance):")
    for position, (name, info) in enumerate(closest, start=1):
        print(f"Rank {position}: {name}, Euclidean Distance: {info['distance']:.7f}")

# Main logic: embed one query track and rank every database track against it
model = load_recommendation_model()
db_csv = 'db_vectors.csv'
input_file = 'sample1.wav'
spec_folder = 'spec_folder'

# Calculate the averaged vector for sample1.wav (None if processing failed)
avg_input_vector = calculate_average_vector(input_file, model, spec_folder)

if avg_input_vector is not None:
    # Compare the input vector with all tracks in the database
    distances = compare_with_db(avg_input_vector, db_csv)

    # Save the comparison results to a JSON file
    video_info_file = 'video_info.json'
    output_json = 'video_info_climax.json'
    save_results_to_json(distances, video_info_file, output_json)

    # Print the top 10 recommendations by Euclidean distance (smallest first)
    print_top_10_recommendations(distances)

  state_dict = torch.load('best_model.pt', map_location=device)


Updated JSON file saved: 'video_info_climax.json'

Top 10 Recommendations (Euclidean Distance):
Rank 1: 고민중독, Euclidean Distance: 0.0000000
Rank 2: 클락션 (Klaxon), Euclidean Distance: 0.0001589
Rank 3: 파이팅 해야지 (Feat. 이영지), Euclidean Distance: 0.0001669
Rank 4: 위태로울걸, Euclidean Distance: 0.0001745
Rank 5: I AM, Euclidean Distance: 0.0001839
Rank 6: ETA, Euclidean Distance: 0.0001902
Rank 7: Rain, Euclidean Distance: 0.0001921
Rank 8: Do or Die, Euclidean Distance: 0.0002202
Rank 9: 우리 영화, Euclidean Distance: 0.0002424
Rank 10: 1 TO 13, Euclidean Distance: 0.0002837
