In [1]:
from pyspark.ml.feature import Bucketizer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

sdf = spark.read.parquet("gs://my-bucket-an/cleaned/reviews1.parquet")

splits_author_num_reviews = [0, 100, 500, 1000, 5000, 10500]
splits_playtime_forever = [0, 100, 500, 1000, 5000, 10000, 50000, 100000, 1000000, 10000000, float('inf')]
splits_playtime_at_review = [0, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000, 10000000, 50000000, 100000000, 500000000, 1000000000, 5000000000]
splits_comment_count = [0, 10, 20, 50, 100, 200, 300, float("inf")]
splits_votes_up = [0, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 40000, 50000, 60000, float('inf')]

# bucketizer 
bucketizer_author_num_reviews = Bucketizer(splits=splits_author_num_reviews, inputCol="author_num_reviews", outputCol="author_num_reviewsBucket")
bucketizer_playtime_forever = Bucketizer(splits=splits_playtime_forever, inputCol="author_playtime_forever", outputCol="author_playtime_foreverBucket")
bucketizer_playtime_at_review = Bucketizer(splits=splits_playtime_at_review, inputCol="author_playtime_at_review", outputCol="author_playtime_at_reviewBucket")
bucketizer_comment_count = Bucketizer(splits=splits_comment_count, inputCol="comment_count", outputCol="comment_countBucket")
bucketizer_votes_up = Bucketizer(splits=splits_votes_up, inputCol="votes_up", outputCol="votes_upBucket")

# assembler
assembler = VectorAssembler(inputCols=["author_num_reviewsBucket", 
                                       "author_playtime_foreverBucket", 
                                       "author_playtime_at_reviewBucket",
                                       "comment_countBucket",
                                       "votes_upBucket"],
                            outputCol="features")

#pipeline
r_pipeline= Pipeline(stages=[bucketizer_author_num_reviews,
                            bucketizer_playtime_forever,
                            bucketizer_playtime_at_review,
                            bucketizer_comment_count,
                            bucketizer_votes_up,
                            assembler])

pipeline_model = r_pipeline.fit(sdf)

transformed_sdf = pipeline_model.transform(sdf)

print("Transformed features")
transformed_sdf.select("author_num_reviews", 
                       "author_playtime_forever", 
                       "author_playtime_at_review", 
                       "comment_count", 
                       "votes_up", 
                       "features").show(truncate=False)


trainingData, testData = transformed_sdf.randomSplit([0.7, 0.3], seed=42)

# Linear Regression Estimator
linear_reg = LinearRegression(labelCol='weighted_vote_score')

# regression evaluator
evaluator = RegressionEvaluator(labelCol='weighted_vote_score', metricName='rmse')

stagess = [linear_reg]

# pipeline
regression_pipe = Pipeline(stages=stagess)

grid = ParamGridBuilder()
grid = grid.build()

# CrossValidator 
cv = CrossValidator(estimator=regression_pipe, 
                    estimatorParamMaps=grid, 
                    evaluator=evaluator, 
                    numFolds=3)

# Train the models
all_models = cv.fit(trainingData)

# Get the best model 
bestModel = all_models.bestModel

# Use the model 'bestModel' to predict the test set
test_results = bestModel.transform(testData)

# Show the predicted weighted_vote_score
test_results.select('author_num_reviews', 'author_playtime_forever', 'comment_count', 'votes_up', 'weighted_vote_score', 'prediction').show(truncate=False)

# Calculate RMSE
rmse = evaluator.evaluate(test_results, {evaluator.metricName: 'rmse'})
print(f"RMSE: {rmse}")


                                                                                

Transformed features


                                                                                

+------------------+-----------------------+-------------------------+-------------+--------+-------------------+
|author_num_reviews|author_playtime_forever|author_playtime_at_review|comment_count|votes_up|features           |
+------------------+-----------------------+-------------------------+-------------+--------+-------------------+
|3                 |197.0                  |197                      |0            |0       |(5,[1],[1.0])      |
|21                |441.0                  |441                      |0            |0       |(5,[1],[1.0])      |
|1                 |1440.0                 |1313                     |0            |0       |(5,[1,2],[3.0,1.0])|
|4                 |1636.0                 |1612                     |0            |0       |(5,[1,2],[3.0,1.0])|
|2                 |197.0                  |197                      |0            |0       |(5,[1],[1.0])      |
|2                 |1685.0                 |1649                     |0            |0   

24/04/26 02:22:15 WARN Instrumentation: [f02961c3] regParam is zero, which might cause numerical instability and overfitting.
24/04/26 02:25:26 WARN Instrumentation: [3997de96] regParam is zero, which might cause numerical instability and overfitting.
24/04/26 02:28:41 WARN Instrumentation: [83436211] regParam is zero, which might cause numerical instability and overfitting.
24/04/26 02:30:21 WARN Instrumentation: [cbe871bc] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

+------------------+-----------------------+-------------+--------+-------------------+-------------------+
|author_num_reviews|author_playtime_forever|comment_count|votes_up|weighted_vote_score|prediction         |
+------------------+-----------------------+-------------+--------+-------------------+-------------------+
|92                |76.0                   |0            |0       |0.0                |0.19066581294500634|
|2                 |80062.0                |0            |0       |0.0                |0.15285773734772784|
|8                 |4056.0                 |0            |0       |0.0                |0.14852783978391013|
|3                 |31927.0                |0            |1       |0.528985500335693  |0.1552867607201981 |
|1                 |25901.0                |0            |0       |0.0                |0.1552867607201981 |
|8                 |1664.0                 |0            |2       |0.54356849193573   |0.14852783978391013|
|2                 |11617.0 



RMSE: 0.23467789849773346



                                                                                

In [2]:
rmse = evaluator.evaluate(test_results, {evaluator.metricName:'rmse'})
r2 =evaluator.evaluate(test_results,{evaluator.metricName:'r2'})
print(f"RMSE: {rmse}  R-squared:{r2}")



RMSE: 0.23467789849773346  R-squared:0.12638481977611427



                                                                                

In [3]:
bestModel.save("gs://my-bucket-an/models/model1")

                                                                                

In [4]:
sdf.columns

['recommendationid',
 'appid',
 'game',
 'author_steamid',
 'author_num_reviews',
 'author_playtime_forever',
 'author_playtime_at_review',
 'language',
 'review',
 'voted_up',
 'votes_up',
 'votes_funny',
 'weighted_vote_score',
 'comment_count']

In [12]:
coefficients = bestModel.stages[0].coefficients
print("bestModel coefficients", coefficients)
intercept = bestModel.stages[0].intercept
print("bestModel intercept", intercept)

bestModel coefficients [0.07178270125171159,-0.014045991053698737,0.011616967681228479,-0.13789835393949887,0.2812903282502768]
bestModel intercept 0.19066581294500634


In [15]:
# Print the coefficients for each feature
for i in range(len(coefficients)):
    print(testData.columns[i], coefficients[i])

# Calculate feature importance based on absolute coefficients
feature_importance = sorted(list(zip(testData.columns[:-1], map(abs, coefficients))), key=lambda x: x[1], reverse=True)

# Print feature importance
print("Feature Importance:")
for feature, importance in feature_importance:
    print("  {}: {:.3f}".format(feature, importance))


recommendationid 0.07178270125171159
appid -0.014045991053698737
game 0.011616967681228479
author_steamid -0.13789835393949887
author_num_reviews 0.2812903282502768
Feature Importance:
  author_num_reviews: 0.281
  author_steamid: 0.138
  recommendationid: 0.072
  appid: 0.014
  game: 0.012


In [20]:
# coefficients of the best model
coefficients = bestModel.stages[-1].coefficients

# feature importance for columns
print("Feature Importance:")
for i in range(len(testData.columns)-5):
    print("  {}: {:.3f}".format(testData.columns[i+5], abs(coefficients[i])))


Feature Importance:
  author_playtime_forever: 0.072
  author_playtime_at_review: 0.014
  language: 0.012
  review: 0.138
  voted_up: 0.281


IndexError: index 5 is out of bounds for axis 0 with size 5