In [2]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import col
from scipy.spatial.distance import euclidean

# Stop any Spark session left over from a previous run of this cell.
try:
    spark.stop()
except NameError:
    # Fresh kernel: `spark` has not been defined yet, so there is nothing to stop.
    pass
except Exception as exc:
    # Best effort: a session that fails to stop must not block re-initialisation,
    # but report it instead of hiding it (and let KeyboardInterrupt/SystemExit through).
    print(f"Could not stop the existing Spark session: {exc}")

# Resource settings for the Spark application: executor/driver memory,
# cores per executor and CPUs reserved per task.
conf = SparkConf().setAll([
    ("spark.executor.memory", "24g"),
    ("spark.driver.memory", "24g"),
    ("spark.executor.cores", "4"),
    ("spark.task.cpus", "1"),
])

# Build the session from the configuration above (getOrCreate reuses an active one if present).
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("RecommendSystem_SparkSession")
    .getOrCreate()
)
print("Optimized Spark Session initialized successfully.")

# Parquet locations of the clustering results, keyed by embedding type.
DATA_PATHS = dict(
    pca='/kaggle/input/msd-parquet-by-artist/clustering_results/clusters_pca',
    umap_2d='/kaggle/input/msd-parquet-by-artist/clustering_results/clusters_umap2d',
    umap_3d='/kaggle/input/msd-parquet-by-artist/clustering_results/clusters_umap3d',
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/12 16:04:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Optimized Spark Session initialized successfully.


In [13]:
def load_cluster_data(embedding_type):
    """
    Read the clustering results for one embedding type into a Spark DataFrame.

    Args:
        embedding_type: A key of ``DATA_PATHS`` selecting which parquet
            dataset to read.

    Returns:
        The DataFrame produced by ``spark.read.parquet`` for that path. Its
        schema is printed as a side effect on every call.

    Raises:
        ValueError: If ``embedding_type`` is not a key of ``DATA_PATHS``.
    """
    if embedding_type in DATA_PATHS:
        cluster_frame = spark.read.parquet(DATA_PATHS[embedding_type])
        cluster_frame.printSchema()  # shown so the columns can be checked by eye
        return cluster_frame

    raise ValueError("Invalid embedding type. Choose from 'pca', 'umap_2d', 'umap_3d'.")

In [14]:
def get_sample_song_id(embedding_type='pca'):
    """
    Fetch one song ID from the cluster data of the given embedding type.

    No ordering is applied before taking the row, so which song comes back
    is whatever Spark returns first.

    Args:
        embedding_type: Passed through to ``load_cluster_data``.

    Returns:
        The ``song_id`` of the first row, or ``None`` (after printing a
        message) when the dataset has no rows.
    """
    first_rows = (
        load_cluster_data(embedding_type)
        .select('song_id')
        .limit(1)
        .collect()
    )

    if not first_rows:
        print(f"No data found in {embedding_type} embedding.")
        return None

    return first_rows[0]['song_id']

In [17]:
def recommend_songs(song_id, cluster, embedding_type='pca', num_recommendations=5):
    """
    Recommend songs based on proximity in feature space within the same cluster.

    Args:
        song_id: ID of the reference song; it must be present in ``cluster``.
        cluster: Value of the ``cluster`` column the search is restricted to.
        embedding_type: Key understood by ``load_cluster_data``. It also selects
            the compared columns: ``pca_features`` for 'pca', ``UMAP_1``/``UMAP_2``
            (and ``UMAP_3``) for the UMAP embeddings.
        num_recommendations: Maximum number of song IDs to return.

    Returns:
        list: IDs of the nearest songs by Euclidean distance, closest first,
        never including ``song_id`` itself. Empty (after printing a message)
        when the song is not found in the given cluster.

    Raises:
        ValueError: If ``embedding_type`` is unknown to ``load_cluster_data`` or
            has no feature columns configured here.
    """
    # Columns that hold the embedding for each supported type.
    feature_columns = {
        'pca': ['pca_features'],
        'umap_2d': ['UMAP_1', 'UMAP_2'],
        'umap_3d': ['UMAP_1', 'UMAP_2', 'UMAP_3'],
    }

    def _as_vector(value):
        # pca_features may hold Spark ML vectors (which expose toArray) or plain
        # sequences of floats; normalise both to a 1-D float array.
        if hasattr(value, 'toArray'):
            value = value.toArray()
        return np.asarray(value, dtype=float)

    # Load data (raises ValueError for unknown embedding types)
    df = load_cluster_data(embedding_type)
    if embedding_type not in feature_columns:
        raise ValueError(f"No feature columns configured for embedding type {embedding_type!r}.")
    columns = feature_columns[embedding_type]

    # Restrict to the requested cluster and bring only the needed columns to the driver
    cluster_data = (
        df.filter(col('cluster') == cluster)
        .select('song_id', *columns)
        .toPandas()
    )

    # Ensure the target song exists in the cluster (also covers an empty cluster)
    is_target = cluster_data['song_id'] == song_id
    if not is_target.any():
        print(f"Song ID {song_id} not found in cluster {cluster} for {embedding_type}.")
        return []

    # Build one numeric (n_songs, n_dims) matrix so every row is compared as a
    # real vector; the PCA column stores a whole vector per cell and has to be
    # unpacked rather than sliced out of the row.
    if embedding_type == 'pca':
        features = np.vstack([_as_vector(value) for value in cluster_data['pca_features']])
    else:
        features = cluster_data[columns].to_numpy(dtype=float)

    # If the song appears more than once, the first occurrence is the reference
    target_features = features[is_target.to_numpy()][0]
    distances = np.linalg.norm(features - target_features, axis=1)

    # Sort by distance (stable, so ties keep their original order) and exclude the target song
    recommendations = (
        cluster_data.assign(distance=distances)[~is_target]
        .sort_values('distance', kind='stable')
        .head(num_recommendations)
    )

    # Return recommended song IDs
    return recommendations['song_id'].tolist()

In [19]:
# Test the recommendation function
def test_recommendation_function():
    # Get a sample song ID
    sample_song_id = get_sample_song_id('pca')
    if not sample_song_id:
        print("No sample song ID found.")
        return

    # Example parameters
    cluster = 1
    embedding_type = 'pca'
    num_recommendations = 5

    try:
        # Corrected function call
        recommendations = recommend_songs(sample_song_id, cluster, embedding_type, num_recommendations)
        print(f"Recommendations for {embedding_type}: {recommendations}")
    except Exception as e:
        print(f"Error during recommendation: {e}")


In [1]:
# Run the smoke test for the recommendation function.
# NOTE(review): the saved output under this cell lists song titles and artists, which the
# print statement in test_recommendation_function does not produce. It looks stale (this
# cell's execution count is also lower than those of the definitions) - re-run the notebook
# top to bottom to refresh it.
test_recommendation_function()

Recommendations for PCA - Lost in Time by Phil Collins:
  - Echoes of Silence by Phil Collins (ID: SOXHMJM12A58A7A33A)
  - Mellow Waves by Phil Collins (ID: SOYHHHT12A6D4F7F97)
  - City Lights by Phil Collins (ID: SOXFQVY12A58A7B456)
  - Morning Sun by Phil Collins (ID: SOFSRCP12CF5CFD696)
  - Whispering Wind by Benabar (ID: SOMFNPC12A8C13E5FE)
