In [1]:
# Import necessary libraries
import pandas as pd
import os
import shutil
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
import joblib
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

### Collaborative Filtering using ALS

In [2]:
# Define the path where the model will be saved
model_path = "../models/als_model"

# Check if the directory exists
if os.path.exists(model_path):
    # Remove the directory and its contents
    shutil.rmtree(model_path)


# Initialize Spark session
spark = SparkSession.builder.appName("KuaiRec").getOrCreate()

# Load the interactions_train dataset
interactions_train = spark.read.csv("../data/processed/interactions_train.csv", header=True, inferSchema=True)

# Indexer for user_id and video_id
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_index")
video_indexer = StringIndexer(inputCol="video_id", outputCol="video_index")

# Fit and transform the data
interactions_train = user_indexer.fit(interactions_train).transform(interactions_train)
interactions_train = video_indexer.fit(interactions_train).transform(interactions_train)

# Select relevant columns
interactions_train = interactions_train.select("user_index", "video_index", "watch_ratio")

# Split the data into training and validation sets
(training, validation) = interactions_train.randomSplit([0.8, 0.2])

# Initialize the ALS model
als = ALS(maxIter=10, regParam=0.1, userCol="user_index", itemCol="video_index", ratingCol="watch_ratio", coldStartStrategy="drop")

# Fit the model
als_model = als.fit(training)

# Make predictions
predictions = als_model.transform(validation)

# Evaluate the model
evaluator = RegressionEvaluator(metricName="rmse", labelCol="watch_ratio", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

# Save the model
als_model.save(model_path)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/02 17:12:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/02 17:12:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/05/02 17:12:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/05/02 17:12:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

Root-mean-square error = 1.548697855411912


25/05/02 17:13:08 WARN MemoryManager: Total allocation exceeds 95,00% (1 020 054 720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers


### Content-Based Filtering using TF-IDF and Cosine Similarity

In [3]:
# Load the video_metadata and user_features_engineered datasets
video_metadata = pd.read_csv("../data/processed/video_metadata.csv")
user_features = pd.read_csv("../data/processed/user_features_engineered.csv")

# Merge the datasets on video_id
merged_data = pd.merge(interactions_train.toPandas(), video_metadata, on="video_id")
merged_data = pd.merge(merged_data, user_features, on="user_id")

# Select relevant features
features = ["total_likes", "avg_play_duration", "video_tags", "total_videos_watched", "avg_watch_ratio", "preferred_category"]

# Fill missing values
merged_data = merged_data.fillna(0)

# Encode categorical features
merged_data = pd.get_dummies(merged_data, columns=["video_tags", "preferred_category"])

# Split the data into training and validation sets
X = merged_data[features]
y = merged_data["watch_ratio"]

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train a content-based filtering model
model = LinearRegression()
model.fit(X_train, y_train)

# Make predictions
y_pred = model.predict(X_val)

# Evaluate the model
rmse = mean_squared_error(y_val, y_pred, squared=False)
print(f"Root-mean-square error = {rmse}")

# Save the model
joblib.dump(model, "../models/content_based_model.pkl")

Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.status.AppStatusStore.activeStages(AppStatusStore.scala:170)
	at org.apache.spark.ui.ConsoleProgressBar.org$apache$spark$ui$ConsoleProgressBar$$refresh(ConsoleProgressBar.scala:64)
	at org.apache.spark.ui.ConsoleProgressBar$$anon$1.run(ConsoleProgressBar.scala:52)
	at java.base/java.util.TimerThread.mainLoop(Timer.java:566)
	at java.base/java.util.TimerThread.run(Timer.java:516)
25/05/02 17:13:17 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.invoke.DirectMethodHandle.allocateInstance(DirectMethodHandle.java:520)
	at java.base/java.lang.invoke.DirectMethodHandle$Holder.newInvokeSpecial(DirectMethodHandle$Holder)
	at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
	at org.apache.spark.ContextCleaner.$anonfun$keepC

Py4JJavaError: An error occurred while calling o116.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
