# MovieLens: Spark-based Big Data Recommendation Analysis

## Task 4 - CONTRASTING EXPERIMENT: COMPARISON OF TWO PREDICTION METHODS

## Model 2 - Part 2 - Decision Tree Model Training
---

# 1. Session Creation and Data Loading

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [2]:
# Obtain the SparkSession for this notebook under the application name
# "Test_decisiontree". getOrCreate() hands back an already-running session
# when one exists instead of starting a second one.
session_builder = SparkSession.builder.appName("Test_decisiontree")
spark = session_builder.getOrCreate()

23/12/08 05:08:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
import time

# Load the prepared dataset from the gs:// bucket. The first row supplies the
# column names and Spark infers each column's type from the data.
final_csv_uri = "gs://dataproc-staging-asia-southeast2-933547737015-zijhgarf/final.csv"
df_1 = spark.read.csv(final_csv_uri, header=True, inferSchema=True)

                                                                                

In [4]:
# Print the column names and inferred types of the freshly loaded table.
df_1.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- user_avg_rating: double (nullable = true)
 |-- user_avg_rating_squared: double (nullable = true)
 |-- user_min_rating: double (nullable = true)
 |-- user_max_rating: double (nullable = true)
 |-- user_median_rating: double (nullable = true)
 |-- user_rating_range: double (nullable = true)
 |-- user_rating_std: double (nullable = true)
 |-- user_num_movies_rated: integer (nullable = true)
 |-- user_num_movies_rated_squared: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- Adventure: double (nullable = true)
 |-- Animation: double (nullable = true)
 |-- Children: double (nullable = true)
 |-- Comedy: double (nullable = true)
 |-- Fantasy: double (nullable = true)
 |-- Romance: double (nullable = true)
 |-- Drama: double (nullable = true)
 |-- Action: double (nullable 

In [5]:
# Discard the `_c0` column (presumably a row index saved along with the CSV --
# confirm), then print the schema again to check that it is gone.
unwanted_column = '_c0'
df_1 = df_1.drop(unwanted_column)
df_1.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- user_avg_rating: double (nullable = true)
 |-- user_avg_rating_squared: double (nullable = true)
 |-- user_min_rating: double (nullable = true)
 |-- user_max_rating: double (nullable = true)
 |-- user_median_rating: double (nullable = true)
 |-- user_rating_range: double (nullable = true)
 |-- user_rating_std: double (nullable = true)
 |-- user_num_movies_rated: integer (nullable = true)
 |-- user_num_movies_rated_squared: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- Adventure: double (nullable = true)
 |-- Animation: double (nullable = true)
 |-- Children: double (nullable = true)
 |-- Comedy: double (nullable = true)
 |-- Fantasy: double (nullable = true)
 |-- Romance: double (nullable = true)
 |-- Drama: double (nullable = true)
 |-- Action: double (nullable = true)
 |-- Crime: double (nullable

# 2. Feature Selection

In [7]:
from pyspark.sql.functions import col, concat_ws
from pyspark.ml.feature import VectorAssembler

# Feature column selection.
# The target ('rating') comes first, followed by the per-user rating statistics
# that serve as model inputs.
selected_cols = [
    'rating',
    'user_avg_rating',
    'user_min_rating',
    'user_max_rating',
    'user_median_rating',
    'user_rating_range',
    'user_rating_std',
    'user_num_movies_rated',
]

# Everything after the leading target column is a predictor.
feature_cols = selected_cols[1:]

# NOTE(review): nulls are not filled before assembly; a later cell calls
# na.fill(0) on the already-assembled frame.

# Pack the predictor columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Keep only the target and the assembled feature vector for modelling.
selected_col = ['rating', 'features']
data = assembler.transform(df_1.select(selected_cols)).select(selected_col)

In [8]:
# Peek at the first two rows to confirm the (rating, features) layout.
sample_rows = data.head(2)
print(sample_rows)

[Stage 2:>                                                          (0 + 1) / 1]

[Row(rating=5.0, features=DenseVector([3.8143, 0.5, 5.0, 4.0, 4.5, 1.0042, 70.0])), Row(rating=3.5, features=DenseVector([3.8143, 0.5, 5.0, 4.0, 4.5, 1.0042, 70.0]))]


                                                                                

In [9]:
# Rename the target column from "rating" to "label", the column name the
# training cell passes to the evaluator as labelCol, then show the schema.
data = data.withColumnRenamed(existing="rating", new="label")

data.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [10]:
# Peek at the first two rows again to confirm the column is now called "label".
sample_rows = data.head(2)
print(sample_rows)

[Stage 3:>                                                          (0 + 1) / 1]

[Row(label=5.0, features=DenseVector([3.8143, 0.5, 5.0, 4.0, 4.5, 1.0042, 70.0])), Row(label=3.5, features=DenseVector([3.8143, 0.5, 5.0, 4.0, 4.5, 1.0042, 70.0]))]


                                                                                

In [11]:
# Replace null values with 0 in the columns that accept a numeric fill value.
# NOTE(review): this runs after the features were assembled into a vector
# column, so presumably only `label` can be affected here -- confirm that
# zero-filling missing ratings is intended.
data = data.fillna(0)

# 3. Model Training

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

import time

# Time the whole index / train / predict / evaluate sequence.
# perf_counter() is a monotonic, high-resolution clock, so the elapsed value
# cannot be distorted by system clock adjustments during the run
# (time.time() is a wall clock and can jump).
start = time.perf_counter()

# Automatically identify categorical features, and index them.
# maxCategories=4: features with more than 4 distinct values are treated as continuous.
# Note that the indexer is fitted on the full dataset, before the train/test split.
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(data)

# Split the data into training and test sets (30% held out for testing).
# A fixed seed is passed so the split can be repeated.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=0)

# Decision tree regressor that reads the indexed feature vector.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline.
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train the model. The indexer stage was already fitted above, so only the
# tree is learned from the training split here.
model = pipeline.fit(trainingData)

# Make predictions on the held-out split.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Compare predictions against the true labels and compute the test error.
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Stage 1 of the fitted pipeline is the trained tree; print its summary only.
treeModel = model.stages[1]
print(treeModel)

end = time.perf_counter()

# Elapsed seconds for everything in this cell since `start`.
execution = end - start
print(f"Cluster模式下代码执行时间为：{execution}秒")

                                                                                

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|1.4513215859030837|  0.5|[0.66666666666666...|
|1.4513215859030837|  0.5|[0.66666666666666...|
|1.4513215859030837|  0.5|[0.66666666666666...|
|1.4513215859030837|  0.5|[1.05293440736478...|
|1.4513215859030837|  0.5|[1.05293440736478...|
+------------------+-----+--------------------+
only showing top 5 rows





Root Mean Squared Error (RMSE) on test data = 0.947499
DecisionTreeRegressionModel: uid=DecisionTreeRegressor_03ad6ab44f83, depth=5, numNodes=63, numFeatures=7
Cluster模式下代码执行时间为：442.3557200431824秒


                                                                                

In [13]:
# Shut down the Spark session now that the experiment has finished.
spark.stop()