In [None]:
import numpy as np
import pandas as pd

# Collecting track ids, album ids, artist ids and genre ids

In [None]:
def _read_leading_ids(path):
    """Return the integer id found at the start of every usable line of a file.

    Each line is split on '|' and the text before the first '|' is converted
    with int(). Lines whose first field is empty (blank lines, or lines that
    start with '|') are skipped. Lines without any '|' are converted whole, so
    the same helper serves both the '|'-delimited files and the files that
    hold one id per line.

    Parameters:
    path (str): Path of the text file to read.

    Returns:
    list: The ids in file order, as Python ints.

    Raises:
    ValueError: If a non-empty first field is not a valid integer.
    """
    ids = []
    with open(path, 'r') as file:
        for line in file:
            # Drop only the line terminator so the parsed text matches a split on '\n'
            first_field = line.rstrip('\n').split('|')[0]
            if first_field:
                ids.append(int(first_field))
    return ids


# One id list per item type; these lists are used later to flag each rated item
track_ids = _read_leading_ids('../data/landed/trackData1.txt')
album_ids = _read_leading_ids('../data/landed/albumData1.txt')
artist_ids = _read_leading_ids('../data/landed/artistData1.txt')
genre_ids = _read_leading_ids('../data/landed/genreData1.txt')

# Preprocessing User data

In [None]:
# File paths for the first preprocessing run: the file that is read and the
# CSV that is written (the training split, judging by the file names)
input_file_path = '../data/landed/trainIdx1.txt'
output_file_path = '../data/curated/song_ratings_train.csv'

# Preprocess the file to associate user IDs correctly
def preprocess_file(input_path, output_path):
    """Flatten a per-user ratings file into a ``user_id,item_id,ratings`` CSV.

    The input is read line by line. A line containing '|' starts a new user:
    the text before the first '|' becomes the current user id. Every other
    non-blank line is split on whitespace and its first two fields are written
    as the item id and the rating of the current user; any further fields on
    the line are ignored.

    Blank lines (for example a trailing empty line at the end of the file) are
    skipped instead of raising an IndexError.

    Parameters:
    input_path (str): Path of the raw ratings file to read.
    output_path (str): Path of the CSV file to write (opened in 'w' mode, so an
        existing file is replaced).

    Raises:
    IndexError: If a non-blank rating line has fewer than two fields.
    """
    with open(input_path, 'r') as infile, open(output_path, 'w') as outfile:
        outfile.write("user_id,item_id,ratings\n")
        current_user_id = None
        for line in infile:
            line = line.strip()
            if not line:
                # Nothing to parse; a blank line carries neither a user nor a rating
                continue
            if "|" in line:
                # Header line: remember the user for the rating lines that follow
                current_user_id = line.split("|", 1)[0]
            else:
                # Rating line: keep only the item id and the rating
                parts = line.split()
                outfile.write(f"{current_user_id},{parts[0]},{parts[1]}\n")

# Preprocess the input file currently named by input_file_path / output_file_path
preprocess_file(input_file_path, output_file_path)

# File paths: the same two variables are reassigned for the second run
# (the validation split, judging by the file names)
input_file_path = '../data/landed/validationIdx1.txt'
output_file_path = '../data/curated/song_ratings_validation.csv'

# Preprocess the second input file with the same function
preprocess_file(input_file_path, output_file_path)

# Combining the training and validation data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Start (or reuse) a Spark session with enlarged driver and executor memory
spark = (
    SparkSession.builder
    .appName("CombineCSV")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)

# The two curated CSV files to combine
data1_path = "../data/curated/song_ratings_train.csv"
data2_path = "../data/curated/song_ratings_validation.csv"

# Read both files, using the first line as header and inferring column types
df1_spark = spark.read.csv(data1_path, header=True, inferSchema=True)
df2_spark = spark.read.csv(data2_path, header=True, inferSchema=True)

# Tag each row with where it came from and with an increasing id, so the
# combined rows can be sorted back into train-then-validation order per user
df1_spark = (
    df1_spark
    .withColumn("source", F.lit("train"))
    .withColumn("sequence", F.monotonically_increasing_id())
)
df2_spark = (
    df2_spark
    .withColumn("source", F.lit("validation"))
    .withColumn("sequence", F.monotonically_increasing_id())
)

# Stack the two DataFrames by column name
merged_df = df1_spark.unionByName(df2_spark)

# "train" sorts before "validation", so within a user the training rows come first
merged_df = merged_df.orderBy(["user_id", "source", "sequence"])

# The helper columns were only needed for the sort
merged_df = merged_df.drop("source", "sequence")

# Add one 0/1 indicator per item type, set when item_id appears in that type's id list
for flag_column, known_ids in (
    ('istrack', track_ids),
    ('isalbum', album_ids),
    ('isartist', artist_ids),
    ('isgenre', genre_ids),
):
    merged_df = merged_df.withColumn(flag_column, F.col('item_id').isin(known_ids).cast('integer'))

# Write the result as a CSV directory, replacing any previous output
merged_df.write.csv('../data/curated/combined_song_ratings', header=True, mode='overwrite')

# Release the Spark session
spark.stop()

# Save as a single CSV file

In [None]:
from pyspark.sql import SparkSession
import os

# Directory containing the CSV part files written by the previous step
data_directory = "../data/curated/combined_song_ratings"

# Collect the CSV files in the directory, sorted by file name
file_paths = sorted(
    os.path.join(data_directory, f)
    for f in os.listdir(data_directory)
    if f.endswith('.csv')
)

# Fail early with a clear message: without this check combined_df would stay
# None and the later .coalesce() call would raise an unrelated AttributeError
if not file_paths:
    raise FileNotFoundError(f"No CSV part files found in {data_directory!r}")

# Initialize Spark session
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Combine CSV")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)

# Read each part file and append it to the running union, in sorted file order
combined_df = None
for file_path in file_paths:
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    combined_df = df if combined_df is None else combined_df.unionByName(df)

# A single partition makes Spark write a single part file
combined_df = combined_df.coalesce(1)

# Save the combined DataFrame; the target is a directory despite the .csv suffix
combined_df.write.csv('../data/development/combined_song_ratings.csv', header=True, mode='overwrite')

# Stop the Spark session
spark.stop()

import os

# Directory written by Spark in the previous step (a directory, despite the .csv suffix)
directory = '../data/development/combined_song_ratings.csv'

# List all files in the directory
files = os.listdir(directory)

# Keep only the CSV files; sorted so the choice below does not depend on
# the arbitrary order returned by os.listdir
csv_file = sorted(f for f in files if f.endswith('.csv'))

# Without a CSV file there is nothing to rename; say so instead of failing
# with a bare IndexError on csv_file[0]
if not csv_file:
    raise FileNotFoundError(f"No CSV file found in {directory!r}")

old_name = os.path.join(directory, csv_file[0])
new_name = os.path.join(directory, 'data.csv')

# Rename the file to a fixed name so later cells can read it by path
os.rename(old_name, new_name)


# Add Timestep

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import matplotlib.pyplot as plt
import numpy as np


In [None]:
def rename_and_remove_header(df: DataFrame, original_cols: list) -> DataFrame:
    """
    Give positionally named columns their real names and drop the header row.

    Column ``_c<i>`` is renamed to ``original_cols[i]`` for every column
    position of the incoming DataFrame. Afterwards only rows whose 'user_id'
    differs from the literal string "user_id" are kept, which removes a header
    line that was read as data.

    Parameters:
    df (DataFrame): The input Spark DataFrame with default column names.
    original_cols (list): Target column names, indexed by column position.

    Returns:
    DataFrame: The DataFrame with renamed columns and without the header row.
    """
    renamed = df
    # The number of positions is taken from the incoming DataFrame
    for position in range(len(df.columns)):
        renamed = renamed.withColumnRenamed("_c" + str(position), original_cols[position])

    # A header line read as data carries the column name as its own value
    not_header = renamed["user_id"] != "user_id"
    return renamed.filter(not_header)

def cast_columns_to_int(df: DataFrame) -> DataFrame:
    """
    Convert the rating column and the four item-type flags to IntegerType.

    Parameters:
    df (DataFrame): Spark DataFrame containing the columns 'ratings',
        'istrack', 'isalbum', 'isartist' and 'isgenre'.

    Returns:
    DataFrame: The DataFrame with those five columns cast to integers; all
    other columns are left as they are.
    """
    int_columns = ("ratings", "istrack", "isalbum", "isartist", "isgenre")

    result = df
    for name in int_columns:
        # Replace each column by its integer-cast version, keeping the name
        result = result.withColumn(name, F.col(name).cast(IntegerType()))

    return result

def discretize_ratings(df: DataFrame, t1: int, t2: int) -> DataFrame:
    """
    Bucket the 'ratings' column into three categories using thresholds t1 and t2.

    Category mapping:
    - 0  <= rating <  t1  -> 0
    - t1 <= rating <  t2  -> 1
    - t2 <= rating <= 100 -> 2
    Ratings matching none of the ranges get a null category.

    Parameters:
    df (DataFrame): The input Spark DataFrame containing the 'ratings' column.
    t1 (int): Lower threshold separating category 0 from category 1.
    t2 (int): Upper threshold separating category 1 from category 2.

    Returns:
    DataFrame: A new DataFrame with an additional column 'ratings_discretized'.
    """
    rating = F.col("ratings")

    # One boolean condition per bucket, checked in this order
    is_low = (rating >= 0) & (rating < t1)
    is_mid = (rating >= t1) & (rating < t2)
    is_high = (rating >= t2) & (rating <= 100)

    # Anything outside 0-100 matches no bucket and falls through to null
    bucket = F.when(is_low, 0).when(is_mid, 1).when(is_high, 2).otherwise(None)

    return df.withColumn("ratings_discretized", bucket)


def add_timestep(df):
    """
    Number each user's rows 1, 2, 3, ... in a new 'timestep' column.

    Rows are grouped by 'user_id' and numbered within each group in the order
    given by monotonically_increasing_id().
    NOTE(review): this treats the generated ids as a stand-in for the original
    row order of the input; confirm that holds for how df was read.

    Parameters:
    df (DataFrame): The input Spark DataFrame containing user ratings.

    Returns:
    DataFrame: A new DataFrame with an additional 'timestep' column.
    """
    # One window per user, ordered by the generated row id
    per_user_in_row_order = Window.partitionBy("user_id").orderBy(F.monotonically_increasing_id())

    # row_number() starts at 1 inside every window
    return df.withColumn("timestep", F.row_number().over(per_user_in_row_order))


In [None]:
# Start (or reuse) a Spark session for the timestep and filtering steps
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .getOrCreate()
)

# Read the single-file CSV without a header option: columns get positional
# default names and the header line arrives as an ordinary row (both are
# handled by rename_and_remove_header). This DataFrame is the one saved later.
df_save = spark.read.csv("../data/development/combined_song_ratings.csv/data.csv")


In [None]:
# Column names of the CSV, in file order
original_cols = ["user_id", "item_id", "ratings", "istrack", "isalbum", "isartist", "isgenre"]

# Transformations, applied as one chain:
# 1. rename the positional columns and drop the header row
# 2. cast the rating and flag columns to integers
# 3. discretize ratings with thresholds 30 and 70
# 4. add the per-user timestep
# Note: nothing is cached here; every action recomputes the chain.
df_save = (rename_and_remove_header(df_save, original_cols)
           .transform(cast_columns_to_int)
           .transform(lambda df: discretize_ratings(df, 30, 70))
           .transform(add_timestep)
           )

# add_timestep may return users in arbitrary order; sort so that users appear
# as 0, 1, 2, ... with their timesteps ascending. user_id is still a string
# column at this point (the CSV was read without schema inference and only the
# rating/flag columns are cast), so sort on its numeric value: a plain string
# sort would give 0, 1, 10, 100, ... The column itself is left unchanged.
df_save = df_save.orderBy(F.col("user_id").cast("long"), "timestep")

df_save.show()

In [None]:
# Coalesce the DataFrame to a single partition
# NOTE(review): df_save is reused by the cells below, so their filters and
# aggregations are planned on top of this single partition; presumably the
# intent is a single output file when saving — confirm it is needed this early.
df_save = df_save.coalesce(1)


In [None]:
data = df_save

# Number of track ratings per user
track_lengths = (
    data
    .filter(F.col("istrack") == 1)
    .groupBy("user_id")
    .count()
)

# Number of genre ratings per user
genre_lengths = (
    data
    .filter(F.col("isgenre") == 1)
    .groupBy("user_id")
    .count()
)

# Number of album ratings per user
album_lengths = (
    data
    .filter(F.col("isalbum") == 1)
    .groupBy("user_id")
    .count()
)

# Number of artist ratings per user; unlike the other three, the rows are
# repartitioned by user_id into 500 partitions before counting
artist_lengths = (
    data
    .filter(F.col("isartist") == 1)
    .repartition(500, "user_id")
    .groupBy("user_id")
    .count()
)


def calculate_entropy_udf(track_count, album_count, artist_count, genre_count):
    """
    Shannon entropy (natural log) of a user's mix of item types.

    The four counts are normalised to proportions; if they sum to zero the
    proportions are all zero and the entropy is 0. A small constant (1e-10) is
    added inside the logarithm so that zero proportions do not produce log(0).

    Returns:
    float: The entropy of the four-way distribution.
    """
    counts = np.array([track_count, album_count, artist_count, genre_count])
    total = counts.sum()

    if total > 0:
        probs = counts / total
    else:
        # No ratings at all: use an all-zero distribution
        probs = np.zeros_like(counts)

    log_probs = np.log(probs + 1e-10)
    return float(-np.sum(probs * log_probs))

# Wrap the entropy function as a Spark UDF that returns a double. The return
# type is given as a DDL type string because only IntegerType is imported from
# pyspark.sql.types in this notebook; referring to DoubleType raised a NameError.
entropy_udf = F.udf(calculate_entropy_udf, "double")

def plot_sequence_distribution_with_entropy(
    data,
    album_lengths,
    track_lengths,
    genre_lengths,
    artist_lengths,
    lower_bound,
    upper_bound,
    entropy_threshold,
):
    """
    Select users by total rating count and item-type entropy, plot them, and
    return their rows.

    The four per-type count tables are outer-joined on user_id (missing counts
    become 0), summed into a total count, and scored with entropy_udf. Users
    are kept when lower_bound <= total <= upper_bound and their entropy is
    strictly greater than entropy_threshold. A histogram of the kept users'
    totals is shown and the number of kept users is printed.

    args:
    data (DataFrame): Original dataset.
    album_lengths (spark df): per-user album counts in a 'count' column.
    track_lengths (spark df): per-user track counts in a 'count' column.
    genre_lengths (spark df): per-user genre counts in a 'count' column.
    artist_lengths (spark df): per-user artist counts in a 'count' column.
    lower_bound (int): Minimum number of ratings under consideration (inclusive).
    upper_bound (int): Maximum number of ratings under consideration (inclusive).
    entropy_threshold (float): Users need an entropy above this value.

    Returns:
    spark df: The rows of `data` whose user_id belongs to a kept user.
    """
    # Give every count column a distinct name so the joins do not collide
    tracks = track_lengths.withColumnRenamed("count", "track_count")
    albums = album_lengths.withColumnRenamed("count", "album_count")
    artists = artist_lengths.withColumnRenamed("count", "artist_count")
    genres = genre_lengths.withColumnRenamed("count", "genre_count")

    # Outer joins keep users that are missing from some of the tables
    per_user = tracks.join(albums, "user_id", "outer")
    per_user = per_user.join(artists, "user_id", "outer")
    per_user = per_user.join(genres, "user_id", "outer")
    per_user = per_user.fillna(0)  # a missing count means zero ratings of that type

    # Total sequence length across the four item types
    total_ratings = (
        F.col("track_count") + F.col("album_count") + F.col("artist_count") + F.col("genre_count")
    )
    per_user = per_user.withColumn("count", total_ratings)

    # Entropy of each user's item-type distribution
    per_user = per_user.withColumn(
        "entropy",
        entropy_udf("track_count", "album_count", "artist_count", "genre_count"),
    )

    # Keep users inside the length range whose entropy exceeds the threshold
    length_in_range = (F.col("count") >= lower_bound) & (F.col("count") <= upper_bound)
    diverse_enough = F.col("entropy") > entropy_threshold
    selected_users = per_user.filter(length_in_range & diverse_enough)

    # Only the totals are needed on the driver for plotting
    totals_pdf = selected_users.select("count").toPandas()

    # Histogram of sequence lengths for the kept users
    plt.figure(figsize=(10, 6))
    plt.hist(totals_pdf["count"], bins=50, color='skyblue', alpha=0.7)
    plt.xlabel('Number of Ratings per User')
    plt.ylabel('Number of Users')
    plt.title(f'Distribution of Rating Sequence Lengths (Filtered by Entropy > {entropy_threshold})')
    plt.show()

    # Summary line to accompany the plot
    print(f"Users with sequence lengths between {lower_bound} and {upper_bound} and entropy > {entropy_threshold}: {len(totals_pdf)}")

    # Bring the kept user ids to the driver and keep only their rows of `data`
    filtered_user_ids = [row[0] for row in selected_users.select("user_id").collect()]
    return data.filter(F.col("user_id").isin(filtered_user_ids))


# Keep users with 28 to 170 ratings in total whose item-type entropy exceeds
# 1.3825. With four item types the natural-log entropy is at most ln(4), about
# 1.386, so this threshold keeps only users with a nearly even mix of types.
filtered_data = plot_sequence_distribution_with_entropy(
    data, album_lengths, track_lengths, genre_lengths, artist_lengths, 
    lower_bound=28, upper_bound=170, entropy_threshold=1.3825
)

# Preview the rows of the kept users
filtered_data.show()

# Save function to avoid repetition
def save_and_confirm(df, path, format='parquet'):
    """
    Write a DataFrame to disk and print a confirmation message.

    With format 'csv' the data is written as CSV with a header row and no
    explicit save mode. Any other value writes parquet in overwrite mode.

    Parameters:
    df (spark df): The spark df to be saved.
    path (str): The location to save the DataFrame.
    format (str): 'csv' or 'parquet'. Defaults to 'parquet'; values other than
        'csv' are treated as parquet.
    """
    writer = df.write

    if format == 'csv':
        writer.option("header", "true").csv(path)
    else:
        writer.mode("overwrite").parquet(path)

    print(f"Data has been saved to {path} in {format} format.")

# Save into a dedicated sub-directory. The parquet branch writes in overwrite
# mode, which replaces everything under the target path, and filtered_data is
# still lazily read from ../data/development/combined_song_ratings.csv/data.csv.
# Targeting ../data/development itself would delete that input before the job
# reads it.
save_and_confirm(filtered_data, "../data/development/filtered_song_ratings", format='parquet')
