# Environment Import

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT, SparseVector
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, collect_list, split, explode, udf

# Part 1: Init Spark Session with Ratings

In [2]:
def init_spark_session_with_ratings(filename):
    """
    Builds the Spark session and loads the ratings CSV into a DataFrame.

    The DataFrame's schema summary and its first rows are printed as a quick
    sanity check.

    Args:
    filename (str): The path to the ratings file (CSV with a header row).

    Returns:
    SparkSession, DataFrame: The Spark session and the DataFrame created from the ratings file.
    """

    # Initialize Spark session.
    # NOTE: Spark's own startup log warns that spark.local.dir may be overridden
    # by the cluster manager (SPARK_LOCAL_DIRS / LOCAL_DIRS).
    spark = (
        SparkSession.builder
        .appName("MovieRecommend")
        .config("spark.local.dir", "/home/spark-local-dir")
        .config("spark.executor.memory", "4g")
        .config("spark.executor.cores", "4")
        .config("spark.executor.instances", "4")
        .config("spark.driver.memory", "4G")
        .getOrCreate()
    )

    # Read the file the caller asked for; the path used to be hard-coded here,
    # which silently ignored the `filename` argument.
    ratings_df = spark.read.csv(filename, header=True, inferSchema=True)
    print(ratings_df)   # schema summary, e.g. DataFrame[userId: int, ...]
    ratings_df.show()   # first 20 rows

    return spark, ratings_df


# Create the session and load the ratings; both names are reused by later cells.
spark, ratings_df = init_spark_session_with_ratings("dataset/ratings.csv")

23/11/30 13:27:37 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.102.123 instead (on interface ens192)
23/11/30 13:27:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/30 13:27:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/30 13:27:38 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
                                                                                

DataFrame[userId: int, movieId: int, rating: double, timestamp: int]
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [3]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf, col, explode, split, collect_list
import numpy as np

def feature_engineering(spark, ratings_df, data_dir="dataset"):
    """
    Joins ratings with per-movie genre, tag and genome features and assembles
    them into a single ``features`` vector column.

    Args:
    spark (SparkSession): Session used to read the auxiliary CSV files.
    ratings_df (DataFrame): Ratings; must contain ``movieId`` and ``rating``.
    data_dir (str): Directory containing ``movies.csv``, ``tags.csv``,
        ``genome-scores.csv`` and ``genome-tags.csv``. Defaults to "dataset".

    Returns:
    DataFrame: ``ratings_df`` extended with ``genresVec``, ``tagVectors``,
        ``tagFeatures``, ``genomeFeaturesVec`` and the assembled ``features``
        column. Ratings for movies that lack genre, tag or genome features
        are not included (inner joins plus ``dropna``).
    """

    # Sums a list of ML vectors dimension by dimension into one DenseVector.
    def sum_vectors(vectors):
        length = max(v.size for v in vectors)
        total = np.zeros(length)
        for v in vectors:
            # SparseVector and DenseVector both expose toArray().
            total += v.toArray()
        return Vectors.dense(total)

    # Converts an array column of numbers into a DenseVector.
    # (The parameter is not called `col` so it does not shadow the imported
    # pyspark `col` function.)
    def to_vector(values):
        return Vectors.dense(values)

    sum_vectors_udf = udf(sum_vectors, VectorUDT())
    to_vector_udf = udf(to_vector, VectorUDT())

    # Load the auxiliary datasets.
    movies_df = spark.read.csv(f"{data_dir}/movies.csv", header=True, inferSchema=True)
    tags_df = spark.read.csv(f"{data_dir}/tags.csv", header=True, inferSchema=True)
    genome_scores_df = spark.read.csv(f"{data_dir}/genome-scores.csv", header=True, inferSchema=True)
    genome_tags_df = spark.read.csv(f"{data_dir}/genome-tags.csv", header=True, inferSchema=True)

    # Process genome scores.
    # collect_list gives no ordering guarantee after a shuffle, so collect
    # (tagId, relevance) pairs and sort them by tagId first; otherwise position
    # i of the vector could refer to a different tag for different movies.
    genome_df = genome_scores_df.join(genome_tags_df, "tagId")
    movie_genome_features = (
        genome_df.groupBy("movieId")
        .agg(F.sort_array(F.collect_list(F.struct("tagId", "relevance"))).alias("genomeByTag"))
        .select("movieId", F.col("genomeByTag.relevance").alias("genomeFeatures"))
    )
    # NOTE: an earlier step filled missing genome features with an empty
    # SparseVector, but the column it produced was overwritten by the line
    # below and rows without genome features are removed by dropna() further
    # down, so that step (and the extra count() job it needed) was dead code.
    movie_genome_features = movie_genome_features.withColumn('genomeFeaturesVec', to_vector_udf('genomeFeatures'))

    # Process genres using StringIndexer + OneHotEncoder.
    # Raw string: the pattern is a regex and "\|" is an invalid Python escape.
    movies_df = movies_df.withColumn("split_genres", split(col("genres"), r"\|"))
    movies_exploded = movies_df.withColumn("genre", explode(col("split_genres")))
    genre_indexer = StringIndexer(inputCol="genre", outputCol="genreIndex")
    indexed_genre = genre_indexer.fit(movies_exploded).transform(movies_exploded)
    genre_encoder = OneHotEncoder(inputCol="genreIndex", outputCol="genreVec")
    encoded_genre = genre_encoder.fit(indexed_genre).transform(indexed_genre)
    genre_aggregated = encoded_genre.groupBy("movieId").agg(collect_list("genreVec").alias("genreVecList"))
    genre_aggregated = genre_aggregated.withColumn("genresVec", sum_vectors_udf("genreVecList"))

    # Process tags using StringIndexer + OneHotEncoder.
    # Only user tags that also appear in the genome tag list are kept.
    tags_df = tags_df.join(genome_tags_df, tags_df.tag == genome_tags_df.tag, "inner").select(tags_df["*"])
    tag_indexer = StringIndexer(inputCol="tag", outputCol="tagIndex")
    tag_model = tag_indexer.fit(tags_df)
    indexed_tags = tag_model.transform(tags_df)
    tag_encoder = OneHotEncoder(inputCols=["tagIndex"], outputCols=["tagVec"])
    tags_encoded = tag_encoder.fit(indexed_tags).transform(indexed_tags)
    movie_tags_features = tags_encoded.groupBy('movieId').agg(collect_list('tagVec').alias('tagVectors'))
    movie_tags_features = movie_tags_features.withColumn('tagFeatures', sum_vectors_udf('tagVectors'))

    # Combine movie features with ratings.
    complete_data_df = ratings_df.join(genre_aggregated.select("movieId", "genresVec"), "movieId") \
        .join(movie_tags_features, "movieId") \
        .join(movie_genome_features.select('movieId', 'genomeFeaturesVec'), 'movieId', 'left')

    # Inspect the schema of the combined dataset.
    complete_data_df.printSchema()

    # Drop rows containing nulls (e.g. movies with no genome features after
    # the left join above).
    complete_data_df = complete_data_df.dropna()

    # Assemble features into a single column.
    assembler = VectorAssembler(
        inputCols=["genresVec", "tagFeatures", "genomeFeaturesVec"],
        outputCol="features",
    )
    data_ready = assembler.transform(complete_data_df)

    # Make sure the label column has the expected type.
    data_ready = data_ready.withColumn("rating", col("rating").cast("double"))

    return data_ready

# `spark` and `ratings_df` come from the init_spark_session_with_ratings cell above.
# print() on a DataFrame shows only its column names and types, not its rows.
data_ready = feature_engineering(spark, ratings_df)
print(data_ready)

                                                                                

root
 |-- movieId: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- genresVec: vector (nullable = true)
 |-- tagVectors: array (nullable = false)
 |    |-- element: vector (containsNull = false)
 |-- tagFeatures: vector (nullable = true)
 |-- genomeFeaturesVec: vector (nullable = true)



                                                                                

DataFrame[movieId: int, userId: int, rating: double, timestamp: int, genresVec: vector, tagVectors: array<vector>, tagFeatures: vector, genomeFeaturesVec: vector, features: vector]


In [6]:
def train_evaluate_visualize(data_ready, seed=42):
    """
    Train and evaluate a Random Forest rating model, then print the five
    movies with the highest predicted rating on the held-out split.

    Despite the name, no plot is produced; results are written to stdout.

    Args:
    data_ready (DataFrame): DataFrame containing prepared features for training;
        must have ``features``, ``rating`` and ``movieId`` columns.
    seed (int): Seed for the train/test split and the forest, so that repeated
        runs give the same split and model. Defaults to 42.

    Returns:
    None
    """

    # Splitting the dataset (70% train / 30% test), seeded for reproducibility.
    training_features, test_features = data_ready.randomSplit([0.7, 0.3], seed=seed)

    # Training the Random Forest model with deliberately small settings.
    rf = RandomForestRegressor(
        featuresCol="features",
        labelCol="rating",
        numTrees=5,             # few trees to speed up training
        maxDepth=5,             # shallow trees
        maxBins=32,
        featureSubsetStrategy="auto",  # let Spark pick the feature subset size
        subsamplingRate=0.7,    # each tree is trained on 70% of the data
        minInstancesPerNode=1,  # minimum number of instances per node
        seed=seed,
    )
    model = rf.fit(training_features)

    # Evaluate model on test dataset
    predictions_df = model.transform(test_features)
    rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    mae_evaluator = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")

    # Calculate and output evaluation metrics
    rmse = rmse_evaluator.evaluate(predictions_df)
    mae = mae_evaluator.evaluate(predictions_df)
    print(f"Root Mean Square Error (RMSE): {rmse}")
    print(f"Mean Absolute Error (MAE): {mae}")

    # Get top5 movie recommendations.
    # predictions_df has one row per rating, so the same movie can appear many
    # times; collapse to one score per movie before taking the top 5.
    top_predictions = (
        predictions_df.groupBy("movieId")
        .agg(F.avg("prediction").alias("prediction"))
        .orderBy(F.col("prediction").desc())
        .limit(5)
    )

    # Join with the movies dataframe to get movie titles. The session is looked
    # up here rather than read from a notebook-level global variable.
    session = SparkSession.builder.getOrCreate()
    movies_df = session.read.csv("dataset/movies.csv", header=True, inferSchema=True)  # Load movies data
    top_movies_with_titles = (
        top_predictions.join(movies_df, 'movieId')
        .select('title', 'prediction')
        .orderBy(F.col("prediction").desc())  # a join does not preserve row order
    )
    # show() prints the rows; print() on a DataFrame only prints its schema.
    top_movies_with_titles.show(truncate=False)


# Train on the engineered features and print the metrics and top-5 movies.
train_evaluate_visualize(data_ready)

[Stage 56:>                                                         (0 + 1) / 1]

# Part 5: Model Comparison

**ALS model**:
+ Advantages: It scales to large data sets, handles sparse rating data effectively, and is widely used in recommendation systems.
+ Disadvantages: Several hyperparameters need to be tuned, and it is sensitive to the cold-start problem.

**Random forest model**:
+ Advantages: It handles nonlinear relationships well and is less prone to overfitting.
+ Disadvantages: It requires a large amount of feature engineering, and the computational cost is high.