In [2]:
from pyspark.sql import SparkSession


In [3]:
cleaned_data_path = 'gs://steam-reviews-bucket/cleaned/fixed_steam_reviews_cleaned.parquet'

df = spark.read.parquet(cleaned_data_path)

df.printSchema()
df.show(5)

                                                                                

root
 |-- app_id: long (nullable = true)
 |-- app_name: string (nullable = true)
 |-- review_id: long (nullable = true)
 |-- language: long (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: timestamp (nullable = true)
 |-- timestamp_updated: timestamp (nullable = true)
 |-- recommended: long (nullable = true)
 |-- votes_helpful: long (nullable = true)
 |-- votes_funny: long (nullable = true)
 |-- weighted_vote_score: double (nullable = true)
 |-- comment_count: long (nullable = true)
 |-- steam_purchase: long (nullable = true)
 |-- received_for_free: long (nullable = true)
 |-- written_during_early_access: boolean (nullable = true)
 |-- author.steamid: long (nullable = true)
 |-- author.num_games_owned: long (nullable = true)
 |-- author.num_reviews: long (nullable = true)
 |-- author.playtime_forever: double (nullable = true)
 |-- author.playtime_last_two_weeks: double (nullable = true)
 |-- author.playtime_at_review: double (nullable = true)
 |-- author.

                                                                                

+------+--------------------+---------+--------+----------------------------------+-------------------+-------------------+-----------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+-------------------+-----------------+-----------------+
|app_id|            app_name|review_id|language|                            review|  timestamp_created|  timestamp_updated|recommended|votes_helpful|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|   author.steamid|author.num_games_owned|author.num_reviews|author.playtime_forever|author.playtime_last_two_weeks|author.playtime_at_review| author.last_played|review_word_count|review_month_year|
+------+--------------------+---------+--------+----------------------------------+---------

In [4]:
from pyspark.sql.functions import col

df = df.withColumn("author_num_games_owned", col("`author.num_games_owned`")) \
       .withColumn("author_num_reviews", col("`author.num_reviews`")) \
       .withColumn("author_playtime_forever", col("`author.playtime_forever`")) \
       .withColumn("author_playtime_last_two_weeks", col("`author.playtime_last_two_weeks`")) \
       .withColumn("author_playtime_at_review", col("`author.playtime_at_review`")) \
       .drop("author")

df.printSchema()

root
 |-- app_id: long (nullable = true)
 |-- app_name: string (nullable = true)
 |-- review_id: long (nullable = true)
 |-- language: long (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: timestamp (nullable = true)
 |-- timestamp_updated: timestamp (nullable = true)
 |-- recommended: long (nullable = true)
 |-- votes_helpful: long (nullable = true)
 |-- votes_funny: long (nullable = true)
 |-- weighted_vote_score: double (nullable = true)
 |-- comment_count: long (nullable = true)
 |-- steam_purchase: long (nullable = true)
 |-- received_for_free: long (nullable = true)
 |-- written_during_early_access: boolean (nullable = true)
 |-- author.steamid: long (nullable = true)
 |-- author.num_games_owned: long (nullable = true)
 |-- author.num_reviews: long (nullable = true)
 |-- author.playtime_forever: double (nullable = true)
 |-- author.playtime_last_two_weeks: double (nullable = true)
 |-- author.playtime_at_review: double (nullable = true)
 |-- author.

In [5]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline

categorical_columns = [
    'language', 'steam_purchase', 'received_for_free'
]


indexers = [
    StringIndexer(inputCol=col, outputCol=f"{col}_indexed", handleInvalid="skip") 
    for col in categorical_columns
]

encoders = [
    OneHotEncoder(inputCol=f"{col}_indexed", outputCol=f"{col}_encoded")
    for col in categorical_columns
]

In [6]:
numeric_columns = [
    'author_num_games_owned', 'author_num_reviews', 'author_playtime_forever',
    'author_playtime_last_two_weeks', 'author_playtime_at_review', 
    'votes_helpful', 'votes_funny', 'comment_count', 'weighted_vote_score'
]

numeric_assembler = VectorAssembler(inputCols=numeric_columns, outputCol="numeric_features")
scaler = MinMaxScaler(inputCol="numeric_features", outputCol="scaled_numeric_features")


In [7]:
feature_assembler = VectorAssembler(
    inputCols=['scaled_numeric_features', *[f"{col}_encoded" for col in categorical_columns]],
    outputCol="features"
)

pipeline = Pipeline(stages=indexers + encoders + [numeric_assembler, scaler, feature_assembler])

pipeline_model = pipeline.fit(df)
df_transformed = pipeline_model.transform(df)

df_transformed.select("features").show(5, truncate=False)


[Stage 14:>                                                         (0 + 1) / 1]

+-----------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                       |
+-----------------------------------------------------------------------------------------------------------------------------------------------+
|(38,[0,1,2,3,4,10,36,37],[0.010619469026548672,0.018518518518518517,0.054697428991016026,0.5220407177338798,0.06679485384942173,1.0,1.0,1.0])  |
|(38,[0,1,2,3,4,10,36,37],[0.05309734513274336,0.16666666666666666,0.07925956228267501,0.9889210557624533,0.09363207191392155,1.0,1.0,1.0])     |
|(38,[0,2,3,4,10,36,37],[0.008849557522123894,0.030336389188294604,0.38251740436163434,0.037010803997447446,1.0,1.0,1.0])                       |
|(38,[0,1,2,3,4,9,36,37],[0.008849557522123894,0.037037037037037035,0.16035769360706484,1.0,0.19361386470323455,1.0,1.0,1.0]


                                                                                

In [8]:
train_data, test_data = df_transformed.randomSplit([0.7, 0.3], seed=42)


In [9]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="recommended", maxIter=10)
lr_model = lr.fit(train_data)

print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)


24/12/07 21:46:14 WARN BlockManager: Asked to remove block broadcast_38_piece0, which does not exist


Coefficients: [0.09366920492897707,-0.7926007563826885,5.290529311050524,0.3877093588135467,1.9887471048376835,1.2080351600547317,-3.3729514154944615,-0.6963868494032718,-3.1472955273490824,-0.27392397421042675,-0.7063881490338667,0.4735130228609157,0.5080968660055106,1.0318339840236668,0.6470838669932164,1.0383780509036957,0.013702173718120226,-1.6355359728384895,-0.08462805994031344,-0.4446060111745185,1.089298284874528,1.3972983178494753,-0.17874072165842494,0.11246222582377381,1.7120326124845993,-0.9168143617295654,1.7461100329838966,0.12393718555621337,-0.23958287809088186,-0.8514081449384815,-1.5548799420857855,-0.825294808064331,-2.0071506705443904,-0.5227547468702936,-0.2513832118127253,-0.32705258383460634,-0.07608677120175589,-0.4244706472161419]
Intercept: 5.415204475423947


In [10]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

test_results = lr_model.transform(test_data)

evaluator = BinaryClassificationEvaluator(labelCol="recommended", metricName="areaUnderROC")

roc_auc = evaluator.evaluate(test_results)
print(f"ROC AUC: {roc_auc}")


                                                                                

ROC AUC: 0.9265262249572869




                                                                                

In [13]:
confusion_matrix = test_results.groupBy("recommended").pivot("prediction").count().fillna(0)
confusion_matrix.show()

matrix = confusion_matrix.collect()
tn = matrix[0][1]
fp = matrix[0][2]
fn = matrix[1][1]
tp = matrix[1][2]

accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1_score = 2 * (precision * recall) / (precision + recall)

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1_score}")

                                                                                

+-----------+---+-----+
|recommended|0.0|  1.0|
+-----------+---+-----+
|          1|167|58641|
|          0|201|  979|
+-----------+---+-----+





Accuracy: 0.019103820764152832
Precision: 0.016420664206642066
Recall: 0.8296610169491525
F1 Score: 0.032203947368421054



                                                                                

In [19]:
lr_model_path = "gs://steam-reviews-bucket/models/logistic_regression_model"
lr_model.write().overwrite().save(lr_model_path)
print(f"Logistic Regression model saved at {lr_model_path}")


                                                                                

Logistic Regression model saved at gs://steam-reviews-bucket/models/logistic_regression_model


In [23]:
trusted_data_path = "gs://steam-reviews-bucket/trusted/steam_reviews_features.parquet"
df_transformed.select("features", "recommended").write.mode("overwrite").parquet(trusted_data_path)
print(f"Transformed dataset with features saved at {trusted_data_path}")


                                                                                

Transformed dataset with features saved at gs://steam-reviews-bucket/trusted/steam_reviews_features.parquet


In [24]:
evaluation_results_path = "gs://steam-reviews-bucket/results/evaluation_metrics.txt"

evaluation_results = f"""
Logistic Regression Evaluation Metrics:
----------------------------------------
Accuracy: {accuracy}
Precision: {precision}
Recall: {recall}
F1 Score: {f1_score}
ROC AUC: {roc_auc}
"""

with open("evaluation_metrics.txt", "w") as file:
    file.write(evaluation_results)

In [25]:
import subprocess
subprocess.run(["gsutil", "cp", "evaluation_metrics.txt", evaluation_results_path], check=True)
print(f"Evaluation metrics saved at {evaluation_results_path}")

import os
os.remove("evaluation_metrics.txt")

Copying file://evaluation_metrics.txt [Content-Type=text/plain]...
/ [0 files][    0.0 B/  231.0 B]                                                
/ [1 files][  231.0 B/  231.0 B]                                                
Operation completed over 1 objects/231.0 B.                                      


Evaluation metrics saved at gs://steam-reviews-bucket/results/evaluation_metrics.txt
