In [1]:
!pip install --ignore-installed -q pyspark==3.2.1

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.4/281.4 MB 5.0 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.0/199.0 kB 8.0 MB/s eta 0:00:00
  Building wheel for pyspark (setup.py) ... done


In [2]:
# Mount Google Drive so the input CSVs under MyDrive become readable from the notebook.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.sql.functions import col, split, array, collect_list,when,explode
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.classification import LogisticRegression, LinearSVC
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, split
from pyspark.ml.feature import StringIndexer, VectorAssembler, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [4]:
# Start (or reuse) the Spark session used by every following cell
spark = SparkSession.builder.appName("MovieLensPredictionWithTFIDF").getOrCreate()

# Input files on the mounted Google Drive
RATINGS_CSV = "/content/drive/MyDrive/ratingsmall.csv"
MOVIES_CSV = "/content/drive/MyDrive/moviesmall.csv"

# Both files are read with a header row and inferred column types
csv_options = {"header": True, "inferSchema": True}
ratings = spark.read.csv(RATINGS_CSV, **csv_options)
movies = spark.read.csv(MOVIES_CSV, **csv_options)

# 1. Binary target: label = 1 when rating >= 4.0, otherwise 0
is_high_rating = col("rating") >= 4.0
ratings = ratings.withColumn("label", when(is_high_rating, 1).otherwise(0))

In [5]:
# 2. Encode userId / movieId as numeric index columns (userIndex / movieIndex).
#    Both indexers are fitted on the same, not-yet-transformed ratings frame,
#    then applied one after the other.
user_indexer = StringIndexer(inputCol="userId", outputCol="userIndex").fit(ratings)
movie_indexer = StringIndexer(inputCol="movieId", outputCol="movieIndex").fit(ratings)

for fitted_indexer in (user_indexer, movie_indexer):
    ratings = fitted_indexer.transform(ratings)

In [7]:
# Turn the raw string columns into token arrays for HashingTF:
# genres are "|"-separated, titles are split on single spaces.
# r"\|" is the same regex as before but written as a raw string, avoiding the
# invalid-escape warning ("\|" is a SyntaxWarning on Python 3.12+).
# NOTE: this overwrites the string columns with arrays, so the cell is not
# re-runnable on its own — re-run the data-loading cell first.
movies = movies.withColumn("genres", split(col("genres"), r"\|"))
movies = movies.withColumn("title", split(col("title"), " "))


In [8]:
# 3./4. TF-IDF vectors for the genre and title token arrays.
# Length of each hashed term-frequency vector; with only 20 buckets, many distinct
# tokens share a bucket (hash collisions), so raise it for richer features.
NUM_HASH_FEATURES = 20

# Remove outputs left by a previous run of this cell: the transformers would fail on
# an already-existing output column. drop() silently ignores columns that are absent.
movies = movies.drop("rawGenreFeatures", "genreFeatures", "rawTitleFeatures", "titleFeatures")

# 3. genres -> hashed term frequencies -> IDF-weighted vector
hashingTF_genres = HashingTF(inputCol="genres", outputCol="rawGenreFeatures", numFeatures=NUM_HASH_FEATURES)
movies = hashingTF_genres.transform(movies)

idf_genres = IDF(inputCol="rawGenreFeatures", outputCol="genreFeatures")
idf_model_genres = idf_genres.fit(movies)
movies = idf_model_genres.transform(movies)

# 4. title -> hashed term frequencies -> IDF-weighted vector
hashingTF_titles = HashingTF(inputCol="title", outputCol="rawTitleFeatures", numFeatures=NUM_HASH_FEATURES)
movies = hashingTF_titles.transform(movies)

idf_titles = IDF(inputCol="rawTitleFeatures", outputCol="titleFeatures")
idf_model_titles = idf_titles.fit(movies)
movies = idf_model_titles.transform(movies)

In [10]:
# 5. Attach each movie's TF-IDF vectors to every rating of that movie.
#    A left join keeps all ratings; a movie missing from the movies table yields null vectors.
movie_tfidf = movies.select("movieId", "genreFeatures", "titleFeatures")
ratings_with_features = ratings.join(movie_tfidf, on="movieId", how="left")
ratings_with_features.show()

+-------+------+------+---------+-----+---------+----------+--------------------+--------------------+
|movieId|userId|rating|timestamp|label|userIndex|movieIndex|       genreFeatures|       titleFeatures|
+-------+------+------+---------+-----+---------+----------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    1|    111.0|      11.0|(20,[0,6,10,12,19...|(20,[2,9,17],[1.8...|
|      3|     1|   4.0|964981247|    1|    111.0|     422.0|(20,[8,19],[1.465...|(20,[6,7,9,17],[1...|
|      6|     1|   4.0|964982224|    1|    111.0|     129.0|(20,[0,14,19],[1....|(20,[1,17],[1.750...|
|     47|     1|   5.0|964983815|    1|    111.0|      15.0|(20,[8,14],[1.465...|(20,[4,10,17],[1....|
|     50|     1|   5.0|964982931|    1|    111.0|      14.0|(20,[8,14,19],[1....|(20,[0,1,12,17],[...|
|     70|     1|   3.0|964982400|    0|    111.0|     390.0|(20,[0,14,19],[2....|(20,[1,6,11,15],[...|
|    101|     1|   5.0|964980868|    1|    111.0|    1095.0|(20,[8,10,19]

In [11]:
# 6. Concatenate all inputs into a single "features" vector column.
# NOTE(review): userIndex / movieIndex are label-encoded IDs used as plain numbers,
# which imposes an arbitrary ordering on a linear model; one-hot encoding may fit better.
# handleInvalid="skip": the left join in step 5 leaves null genre/title vectors for
# ratings whose movie is absent from the movies file. The default ("error") would abort
# the job on such rows; here they are filtered out instead.
assembler = VectorAssembler(
    inputCols=["userIndex", "movieIndex", "timestamp", "genreFeatures", "titleFeatures"],
    outputCol="features",
    handleInvalid="skip",
)
# Dropping any stale "features" column lets this cell be re-run (no-op on first run).
ratings_with_features = assembler.transform(ratings_with_features.drop("features"))

In [12]:
# 7. 80/20 train/test split with a fixed seed.
# Cache the assembled frame first. Each split samples the parent DataFrame separately,
# and both models below read train/test several times. Caching keeps that parent stable
# and avoids recomputing the CSV read, join and feature pipeline on every pass.
ratings_with_features = ratings_with_features.cache()
train, test = ratings_with_features.randomSplit([0.8, 0.2], seed=1234)

In [13]:
# 8. Logistic regression on the assembled feature vector
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

# Train on the training split
lr_model = lr.fit(train)

# Score the held-out split
lr_predictions = lr_model.transform(test)

# Overall accuracy on the test split
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression Accuracy with Binary Labels: {lr_accuracy:.4f}")

# Confusion matrix as (label, prediction, count) rows
lr_conf_matrix = lr_predictions.groupBy("label", "prediction").count()
lr_conf_matrix.show()

# Collect the (at most four) rows in a single job. A (label, prediction) pair that never
# occurs — e.g. the model never predicts 1 — is simply missing from the groupBy result,
# so default it to 0 instead of indexing an empty collect() (IndexError).
# row["count"] reads the field; row.count would be the tuple method.
lr_counts = {
    (int(row["label"]), int(row["prediction"])): row["count"]
    for row in lr_conf_matrix.collect()
}
tp_lr = lr_counts.get((1, 1), 0)
tn_lr = lr_counts.get((0, 0), 0)
fp_lr = lr_counts.get((0, 1), 0)
fn_lr = lr_counts.get((1, 0), 0)

print(f"Confusion Matrix for Logistic Regression:\nTP: {tp_lr}, TN: {tn_lr}, FP: {fp_lr}, FN: {fn_lr}")

# TPR (sensitivity/recall) and FPR; an empty denominator (no positives or no
# negatives in the test split) yields 0.0 instead of ZeroDivisionError.
tpr_lr = tp_lr / (tp_lr + fn_lr) if (tp_lr + fn_lr) > 0 else 0.0
fpr_lr = fp_lr / (fp_lr + tn_lr) if (fp_lr + tn_lr) > 0 else 0.0

print(f"Logistic Regression - True Positive Rate (TPR): {tpr_lr:.4f}")
print(f"Logistic Regression - False Positive Rate (FPR): {fpr_lr:.4f}")

Logistic Regression Accuracy with Binary Labels: 0.6016
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 4251|
|    0|       0.0| 6758|
|    1|       1.0| 5268|
|    0|       1.0| 3712|
+-----+----------+-----+

Confusion Matrix for Logistic Regression:
TP: 5268, TN: 6758, FP: 3712, FN: 4251
Logistic Regression - True Positive Rate (TPR): 0.5534
Logistic Regression - False Positive Rate (FPR): 0.3545


In [18]:
# Summary metrics for logistic regression, derived from its confusion-matrix counts.
# Every ratio is guarded so an empty denominator gives 0.0 rather than ZeroDivisionError.
total_lr = tp_lr + tn_lr + fp_lr + fn_lr
accuracy = (tp_lr + tn_lr) / total_lr if total_lr > 0 else 0.0
precision = tp_lr / (tp_lr + fp_lr) if (tp_lr + fp_lr) > 0 else 0.0
recall = tp_lr / (tp_lr + fn_lr) if (tp_lr + fn_lr) > 0 else 0.0
f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")

Accuracy: 0.6016
Precision: 0.5866
Recall: 0.5534
F1 Score: 0.5695


In [14]:
# 9. Linear support vector machine (LinearSVC) on the same features
svm = LinearSVC(featuresCol="features", labelCol="label", maxIter=10)

# Train on the training split
svm_model = svm.fit(train)

# Score the held-out split
svm_predictions = svm_model.transform(test)

# Overall accuracy. This cell builds its own evaluator, so it no longer depends on the
# logistic-regression cell having defined `evaluator` first.
svm_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
svm_accuracy = svm_evaluator.evaluate(svm_predictions)
print(f"SVM Accuracy with Binary Labels: {svm_accuracy:.4f}")

# Confusion matrix as (label, prediction, count) rows
svm_conf_matrix = svm_predictions.groupBy("label", "prediction").count()
svm_conf_matrix.show()

# Collect the (at most four) rows in a single job. A (label, prediction) pair that never
# occurs is missing from the groupBy result, so default it to 0 instead of indexing an
# empty collect() (IndexError). row["count"] reads the field, not the tuple method.
svm_counts = {
    (int(row["label"]), int(row["prediction"])): row["count"]
    for row in svm_conf_matrix.collect()
}
tp_svm = svm_counts.get((1, 1), 0)
tn_svm = svm_counts.get((0, 0), 0)
fp_svm = svm_counts.get((0, 1), 0)
fn_svm = svm_counts.get((1, 0), 0)

print(f"Confusion Matrix for SVM:\nTP: {tp_svm}, TN: {tn_svm}, FP: {fp_svm}, FN: {fn_svm}")

# TPR (sensitivity/recall) and FPR; an empty denominator yields 0.0 instead of ZeroDivisionError.
tpr_svm = tp_svm / (tp_svm + fn_svm) if (tp_svm + fn_svm) > 0 else 0.0
fpr_svm = fp_svm / (fp_svm + tn_svm) if (fp_svm + tn_svm) > 0 else 0.0

print(f"SVM - True Positive Rate (TPR): {tpr_svm:.4f}")
print(f"SVM - False Positive Rate (FPR): {fpr_svm:.4f}")

SVM Accuracy with Binary Labels: 0.5988
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 4350|
|    0|       0.0| 6801|
|    1|       1.0| 5169|
|    0|       1.0| 3669|
+-----+----------+-----+

Confusion Matrix for SVM:
TP: 5169, TN: 6801, FP: 3669, FN: 4350
SVM - True Positive Rate (TPR): 0.5430
SVM - False Positive Rate (FPR): 0.3504


In [17]:
# Summary metrics for the SVM, derived from its confusion-matrix counts.
# Every ratio is guarded so an empty denominator gives 0.0 rather than ZeroDivisionError.
total_svm = tp_svm + tn_svm + fp_svm + fn_svm
accuracy = (tp_svm + tn_svm) / total_svm if total_svm > 0 else 0.0
precision = tp_svm / (tp_svm + fp_svm) if (tp_svm + fp_svm) > 0 else 0.0
recall = tp_svm / (tp_svm + fn_svm) if (tp_svm + fn_svm) > 0 else 0.0
f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")

Accuracy: 0.5988
Precision: 0.5849
Recall: 0.5430
F1 Score: 0.5632


In [None]:
# Stop the Spark session created at the top of the notebook
spark.stop()