Setup

In [1]:
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd
from google.colab import drive
from pyspark.sql import SparkSession
import os
from pyspark.ml.feature import CountVectorizer
import numpy as np
import sys
import sqlite3
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql import functions as F

In [4]:
# Mount Google Drive inside the Colab filesystem at /content/drive.
# The dataset paths used by later cells all live under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


Load dataset

In [2]:
# Reuse the active SparkSession if one exists, otherwise create one,
# registered under the application name "NestedJSONProcessing".
spark = SparkSession.builder.appName("NestedJSONProcessing").getOrCreate()

def process_nested_folders(base_path, spark_session=None):
    """Load every ``*.json`` file found two directory levels below ``base_path``.

    The expected layout is ``base_path/<folder>/<subfolder>/*.json``. Each
    subfolder is read as one chunk and the chunks are merged into a single
    DataFrame. Plain files at either directory level are ignored.

    Loading is best-effort: a subfolder that cannot be read is reported on
    stdout and skipped, and the remaining subfolders are still processed.

    Args:
        base_path: Directory that contains the first-level folders.
        spark_session: Session used to read the JSON files. Defaults to the
            notebook-level ``spark`` session.

    Returns:
        The merged DataFrame of all chunks that loaded successfully, or
        ``None`` when no chunk could be loaded.
    """
    session = spark if spark_session is None else spark_session
    final_df = None

    # os.listdir returns entries in arbitrary order; sorting keeps both the
    # log output and the row order of the merged result reproducible.
    for folder in sorted(os.listdir(base_path)):
        folder_path = os.path.join(base_path, folder)
        if not os.path.isdir(folder_path):
            continue
        print(f"Processing folder: {folder_path}")

        for name in sorted(os.listdir(folder_path)):
            subfolder = os.path.join(folder_path, name)
            if not os.path.isdir(subfolder):
                continue
            print(f"Processing subfolder: {subfolder}")
            json_files = os.path.join(subfolder, "*.json")

            try:
                chunk_df = session.read.json(json_files)
                # Count BEFORE merging: if reading the chunk fails, the
                # exception is raised here and the broken chunk never
                # becomes part of the result.
                row_count = chunk_df.count()
                # Merge by column name rather than by position so that a
                # chunk whose inferred column order differs cannot silently
                # put values under the wrong column.
                final_df = chunk_df if final_df is None else final_df.unionByName(chunk_df)
                print(f"Loaded {row_count} rows from {subfolder}")
            except Exception as e:
                # Deliberate best-effort: report the failure and move on.
                print(f"Error processing subfolder {subfolder}: {e}")

    return final_df

In [None]:
# Load the JSON files stored under the "A" folder of the dataset into one Spark DataFrame.
# NOTE(review): process_nested_folders returns None when no subfolder could be
# read, in which case the `data.drop(...)` call in the preprocessing cell fails.
data = process_nested_folders("/content/drive/MyDrive/lastfm_subset/lastfm_subset/A")

Preprocessing

In [5]:
# Start the feature pipeline from the loaded data minus its `similars` column.
df = data.drop('similars')
def extract_tags(tags_column):
    """Return the tag names from a track's raw ``tags`` value.

    Every entry of ``tags_column`` is expected to be a sequence whose first
    element is the tag name; any further elements are ignored.

    Args:
        tags_column: Sequence of tag entries, or ``None`` when the track has
            no ``tags`` value.

    Returns:
        list: The first element of every non-empty entry, in input order.
        An empty list when ``tags_column`` is ``None`` or empty.
    """
    # A missing value must yield an empty list rather than raise, otherwise a
    # single track without tags aborts the whole UDF evaluation.
    if not tags_column:
        return []
    # Skip empty / missing entries instead of failing on `tag[0]`.
    return [tag[0] for tag in tags_column if tag]

# Wrap extract_tags as a Spark UDF that returns array<string>, then overwrite
# the `tags` column with the UDF's result.
extract_tags_udf = udf(extract_tags, ArrayType(StringType()))
df = df.withColumn("tags", extract_tags_udf(col("tags")))

Create feature vectors

In [6]:
# Fit a CountVectorizer (default parameters) on the `tags` arrays and add its
# output as the `tags_vector` column. `vectorizer_model` keeps the fitted model.
count_vectorizer = CountVectorizer(inputCol="tags", outputCol="tags_vector")
vectorizer_model = count_vectorizer.fit(df)
df = vectorizer_model.transform(df)

In [7]:
# Encode the `artist` string column as a numeric label index in `artist_index`.
artist_indexer = StringIndexer(inputCol="artist", outputCol="artist_index")
df = artist_indexer.fit(df).transform(df)

In [8]:
# Concatenate the tag vector and the artist index into one `features` vector per track.
# NOTE(review): `artist_index` is a label id produced by StringIndexer, but here it
# enters the vector as a single numeric feature, so artists with nearby ids look
# numerically close. Confirm this is intended (a one-hot encoding may be wanted).
assembler = VectorAssembler(
    inputCols=["tags_vector", "artist_index"],
    outputCol="features"
)
df = assembler.transform(df)

In [9]:
# Fit a MinMaxScaler (default output range) on `features` and store the
# rescaled vectors in `scaled_features`. `scaler_model` keeps the fitted model.
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)

In [10]:
# Collect track ids, titles and scaled feature vectors to the driver as a pandas DataFrame.
pandas_df = df.select("track_id", "title", "scaled_features").toPandas()
# One row per track, in the same row order as `pandas_df`.
features_matrix = np.array(pandas_df["scaled_features"].tolist())

# Pairwise cosine similarity between all tracks: row i / column j compares track i with track j.
# NOTE(review): the result has one row and one column per track, so its size
# grows quadratically with the number of tracks.
similarity_matrix = cosine_similarity(features_matrix)

Query recommendations based on song id

In [None]:
def get_similar_songs(song_id, similarity_matrix, pandas_df, top_n=3):
    """Return the ``top_n`` tracks most similar to ``song_id``.

    Args:
        song_id: Value to look up in the ``track_id`` column of ``pandas_df``.
        similarity_matrix: Square matrix whose row ``i`` holds the similarity
            of the ``i``-th row of ``pandas_df`` to every row.
        pandas_df: DataFrame with ``track_id`` and ``title`` columns, in the
            same row order as ``similarity_matrix``.
        top_n: Maximum number of tracks to return.

    Returns:
        list: ``(track_id, title, score)`` tuples ordered by descending
        score. The query track itself is never included.

    Raises:
        IndexError: If ``song_id`` does not occur in ``pandas_df``.
    """
    # Use the row POSITION, not the index label: the matrix is positional, and
    # the two only coincide when the frame has a default RangeIndex.
    positions = (pandas_df["track_id"] == song_id).to_numpy().nonzero()[0]
    if len(positions) == 0:
        raise IndexError(f"track_id {song_id!r} not found in pandas_df")
    song_idx = int(positions[0])

    # Exclude the query track explicitly. Dropping "the first sorted entry"
    # is wrong whenever another track ties with it at the top score.
    ranked = sorted(
        (
            (idx, score)
            for idx, score in enumerate(similarity_matrix[song_idx])
            if idx != song_idx
        ),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return [
        (pandas_df.iloc[idx]["track_id"], pandas_df.iloc[idx]["title"], score)
        for idx, score in ranked[:max(top_n, 0)]
    ]

In [None]:
# Look up the tracks most similar to one track (matched against the `track_id`
# column) and print them as (track_id, title, score) tuples.
song_id = "TRAZJGD128F4289D2A"  # Replace with the song ID you want to query
similar_songs = get_similar_songs(song_id, similarity_matrix, pandas_df)
print(similar_songs)

[('TRAHBNZ128F92FC2A7', 'Uptown', 0.9979169735841514), ('TRAOMQN128F9328043', 'Life Of A Star', 0.9924641032573147), ('TRAIZHG128F934FE42', 'More Than Vocals (MTV)', 0.9913884825412685)]


Query recommendations based on user profile

Load the data

In [12]:
file_path = "/content/drive/MyDrive/train_triplets.txt"
# Read only the first 1000 rows of the tab-separated, header-less file and name its three columns.
pandas_import = pd.read_csv(file_path, delimiter='\t', header=None, names=['user_id', 'song_id', 'play_count'], nrows=1000)
# Keep only users that have at least 20 rows among the rows that were read.
# NOTE(review): the threshold is applied to the 1000-row sample, not to the full file.
user_counts = pandas_import['user_id'].value_counts()
pandas_import = pandas_import[pandas_import['user_id'].isin(user_counts[user_counts >= 20].index)]
# Hand the filtered listening histories over to Spark.
user_histories = spark.createDataFrame(pandas_import)

Filter and join data

In [13]:
# Restrict the listening histories to one hard-coded user.
user_history = user_histories.filter(F.col("user_id") == "b80344d063b5ccb3212f76538f3d9e43d87dca9e")
# Rename `song_id` so it can serve as the join key against the track features.
# NOTE(review): this assumes the ids in train_triplets.txt are the same kind of
# id as `track_id` in the track data. Verify this; if they are different id
# spaces, the inner join below returns no rows.
user_history = user_history.withColumnRenamed("song_id", "track_id")
# Keep only the user's plays that have a matching track, with all feature columns attached.
user_songs = user_history.join(df, on="track_id", how="inner")

Calculate weights

In [15]:
# One row per (played track, tag) pair, so play counts can be summed per tag.
exploded_tags = user_songs.withColumn("tag", F.explode(F.col("tags")))
# Total play count per tag and per artist for this user.
tag_weights = exploded_tags.groupBy("tag").agg(F.sum("play_count").alias("tag_weight"))
artist_weights = user_songs.groupBy("artist").agg(F.sum("play_count").alias("artist_weight"))
# Stack tag rows and artist rows. DataFrame.union matches columns by position and
# keeps the first frame's column names, so the result has columns `feature` and
# `tag_weight`, and the artist sums end up in `tag_weight` as well.
combined_weights = tag_weights.withColumnRenamed("tag", "feature").union(artist_weights.withColumnRenamed("artist", "feature"))

In [None]:
# Sum of all weights. `tag_weight` is the column that holds both the tag rows and
# the artist rows of `combined_weights`.
# NOTE(review): if `combined_weights` has no rows the sum is null, and the
# division below fails on a None `total_weight`.
total_weight = combined_weights.agg(F.sum("tag_weight").alias("total_weight")).collect()[0]["total_weight"]
# Express every weight as a fraction of the total.
normalized_weights = combined_weights.withColumn(
    "normalized_weight", F.col("tag_weight") / total_weight
)

In [23]:
# Wrap each row's `normalized_weight` in a one-element vector column named `user_profile`.
# NOTE(review): this rebinds `assembler`, the name an earlier cell uses for the
# assembler that builds `features`; re-running that earlier cell's dependants
# after this one would pick up this object instead.
assembler = VectorAssembler(inputCols=["normalized_weight"], outputCol="user_profile")
# VectorAssembler is a plain transformer (no fit step); `assembler_model` is a second name for the same object.
assembler_model = assembler
user_profile_vector = assembler_model.transform(normalized_weights)

In [None]:
# Build the user profile in the SAME feature space as `features_matrix`: the
# play-count-weighted mean of the `scaled_features` vectors of the tracks the
# user listened to.
#
# The previous version passed the Spark DataFrame `user_profile_vector` to
# cosine_similarity, and the vector it extracted held a single
# `normalized_weight` value, which cannot be compared with the per-track
# feature vectors (different number of columns).
user_rows = user_songs.select("play_count", "scaled_features").collect()
if not user_rows:
    # Without any matched track there is nothing to build a profile from.
    raise ValueError("user_songs is empty: no played track matches a track in df")

play_counts = np.array([row["play_count"] for row in user_rows], dtype=float)
played_vectors = np.array([row["scaled_features"].toArray() for row in user_rows])

# Shape (1, n_features), as cosine_similarity expects a 2-D array of samples.
user_profile = np.average(played_vectors, axis=0, weights=play_counts).reshape(1, -1)
# One similarity score per row of `features_matrix` / `pandas_df`.
similarity_scores = cosine_similarity(user_profile, features_matrix).flatten()

In [None]:
def get_recommended_songs(similarity_matrix, top_n=3, songs_df=None):
    """Return the ``top_n`` best-scoring tracks for a user profile.

    Args:
        similarity_matrix: One-dimensional sequence with one similarity score
            per track, in the same row order as the track table.
        top_n: Maximum number of tracks to return.
        songs_df: DataFrame with ``track_id`` and ``title`` columns. Defaults
            to the notebook-level ``pandas_df``.

    Returns:
        list: ``(track_id, title, score)`` tuples ordered by descending score.
    """
    catalog = pandas_df if songs_df is None else songs_df
    ranked = sorted(
        enumerate(similarity_matrix), key=lambda pair: pair[1], reverse=True
    )
    # Start at the very first entry. Skipping index 0 is only right for the
    # song-to-song query, where the best match is the query track itself;
    # here it would throw away the best recommendation.
    return [
        (catalog.iloc[idx]["track_id"], catalog.iloc[idx]["title"], score)
        for idx, score in ranked[:max(top_n, 0)]
    ]