# Steam Review Dataset Feature Engineering and Modeling
This notebook performs feature engineering on a cleaned dataset of Steam reviews using PySpark. It includes review text processing, timestamp decomposition, and the creation of cyclical and behavioral features to support sentiment classification tasks at scale using GCP Dataproc.

In [None]:
# Display the active SparkSession. `spark` is never created in this notebook, so it is
# presumably injected by the Dataproc/PySpark kernel -- confirm before running elsewhere.
spark

## 1. Imports and Loading/Inspection of Data

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib as plt
from textblob import TextBlob

In [None]:
from pyspark.sql.functions import col, isnan, isnull, when, count, udf, size, split, year, month, format_number, date_format, length, lit, from_unixtime, sin
from pyspark.sql.types import IntegerType, DateType, StringType, StructType, DoubleType
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Normalizer, StandardScaler, HashingTF, IDF, Tokenizer, RegexTokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Use 200 partitions whenever Spark SQL shuffles data (joins, aggregations)
spark.conf.set("spark.sql.shuffle.partitions", 200)

In [None]:
# Set up paths to file (Make sure to replace the bucket variable below with the path to your bucket)
bucket = 'gs://whatever-your-bucket-name-is'

# Every folder path ends with a trailing slash, so file names are appended
# directly without adding another separator (avoids a doubled '//' in the URI).
landing_folder = f'{bucket}/landing/'
cleaned_folder = f'{bucket}/cleaned/'
trusted_folder = f'{bucket}/trusted/'
models_folder = f'{bucket}/models/'
cleaned_filename = f'{cleaned_folder}cleaned_steam_reviews_data.parquet/'

In [None]:
# Read the cleaned reviews dataset (Parquet) into a Spark DataFrame
sdf = spark.read.parquet(cleaned_filename)

In [None]:
# Capture the column names as loaded, before any engineered features are added
column_list = sdf.columns

In [None]:
# Check dataset column statistics (vertical=True prints one value per line)
sdf.summary().show(vertical=True)

## 2. Review Filtering and Timestamp Feature Engineering

In [None]:
#create wordcount column for reviews
sdf = sdf.withColumn('review_wordcount', size(split(col('clean_review'), ' ')))

In [None]:
#filter out short reviews
sdf = sdf.where(length(sdf.clean_review) > 10)

#filter out reviews with less than 5 words
sdf = sdf.where(sdf.review_wordcount > 5)

In [None]:
# Change column name for clarity: mark the raw creation time as a unix timestamp
sdf = sdf.withColumnRenamed('timestamp_created', 'unix_timestamp_created')

In [None]:
# Convert the unix timestamp into a formatted timestamp string that Spark's date functions can read
sdf = sdf.withColumn('fe_timestamp_created', from_unixtime('unix_timestamp_created'))

In [None]:
# Double check schema
sdf.printSchema()

In [None]:
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, minute, weekofyear

# Map each new column name to the Spark function that extracts it from the review timestamp
timestamp_feature = dict(month=month, weekday=dayofweek, hour=hour)

# Add one column per entry, all derived from the converted creation timestamp
for feature_name, extract in timestamp_feature.items():
    sdf = sdf.withColumn(feature_name, extract(col('fe_timestamp_created')))

In [None]:
# Make date features cyclical to help model understand time related patterns.
# Each feature is divided by its OWN period so that one full cycle spans 2*pi:
# 24 hours per day, 12 months per year, 7 days per week.
# NOTE: only the sine component is produced here; adding a matching cosine
# column would be needed to distinguish values that share the same sine.
import math

cyclical_periods = {'hour': 24, 'month': 12, 'weekday': 7}
for feature_name, period in cyclical_periods.items():
    sdf = sdf.withColumn(
        f'{feature_name}_sin',
        sin(2 * math.pi * col(feature_name) / period),
    )

In [None]:
# Inspect one record to verify the new timestamp features
sdf.show(1, vertical=True)

In [None]:
# Rename column for clarity
sdf = sdf.withColumnRenamed('author_last_played', 'author_last_played_unix')

In [None]:
# Transform column from unix timestamp to a formatted timestamp string for feature engineering
sdf = sdf.withColumn('author_last_played_fe', from_unixtime('author_last_played_unix'))

In [None]:
# Inspect one record to verify the converted last-played timestamp
sdf.show(1, vertical=True)

In [None]:
# Month / weekday / hour features derived from author_last_played ("alp")
alp_timestamp_feature = dict(alp_month=month, alp_weekday=dayofweek, alp_hour=hour)

# Add one column per entry, all derived from the converted last-played timestamp
for alp_column, alp_extract in alp_timestamp_feature.items():
    sdf = sdf.withColumn(alp_column, alp_extract(col('author_last_played_fe')))

In [None]:
# Make the author_last_played date features cyclical for the model.
# Each feature is divided by its OWN period so that one full cycle spans 2*pi:
# 24 hours per day, 12 months per year, 7 days per week.
# NOTE: only the sine component is produced here; adding a matching cosine
# column would be needed to distinguish values that share the same sine.
import math

alp_cyclical_periods = {'alp_hour': 24, 'alp_month': 12, 'alp_weekday': 7}
for alp_feature_name, alp_period in alp_cyclical_periods.items():
    sdf = sdf.withColumn(
        f'{alp_feature_name}_sin',
        sin(2 * math.pi * col(alp_feature_name) / alp_period),
    )

In [None]:
# Inspect one record to verify the author_last_played cyclical features
sdf.show(1, vertical=True)

## 3. Text Feature Engineering & Sentiment Extraction

In [None]:
#columns we are excluding from double cast
double_exclusion_columns = ['review', 'game', 'clean_review', 'language', 'author_last_played_fe', 'fe_timestamp_created'] 

In [None]:
#select all columns we are casting double to
columns_to_double = [c for c in sdf.columns if c not in double_exclusion_columns]

In [None]:
for column in columns_to_double:
    sdf = sdf.withColumn(column, col(column).cast(DoubleType()))

In [None]:
#check if double broadcast worked 
sdf.printSchema()

In [None]:
#set up tokenizer TF and IDF
tokenizer = RegexTokenizer(inputCol='clean_review', outputCol='clean_review_words', pattern='\\w+', gaps=False)
sdf = tokenizer.transform(sdf)

In [None]:
#running hash function over tokens
sdf = sdf.drop('clean_review_tf')
hashtf = HashingTF(numFeatures=1024, inputCol='clean_review_words', outputCol='clean_review_tf')

sdf = hashtf.transform(sdf)

print("Hashed sdf record count", sdf.count())

In [None]:
# Drop just to make sure IDF can output col
sdf = sdf.drop('clean_review_features')
#create inverse document frequency vectors
idf = IDF(inputCol='clean_review_tf', outputCol='clean_review_features', minDocFreq=100)
sdf = idf.fit(sdf).transform(sdf)

In [None]:
# Display Clean Review Feature
sdf.select('clean_review_features').show(2, truncate=False, vertical=True)

In [None]:
#use textblob to get sentiment analysis of review column
from textblob import TextBlob
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, isnan, when, count, udf

# Create a function to perform sentiment analysis on text
def sentiment_analysis(some_text):
    """Return the TextBlob polarity score for a piece of text.

    Args:
        some_text: The review text to score, or None.

    Returns:
        The polarity reported by TextBlob as a float, or None when
        ``some_text`` is None, so a missing review yields a null sentiment
        instead of raising inside the UDF.
    """
    if some_text is None:
        return None
    # float() keeps the value compatible with the DoubleType UDF return type
    return float(TextBlob(some_text).sentiment.polarity)

# Turn the function into a Spark UDF that returns a double
sentiment_analysis_udf = udf(sentiment_analysis, DoubleType())

In [None]:
# Use sentiment_analysis_udf to create the clean_review_sentiment column
sdf = sdf.withColumn('clean_review_sentiment', sentiment_analysis_udf(sdf['clean_review']))

In [None]:
# Display cleaned review and Sentiment score from TextBlob
sdf.select('clean_review', 'clean_review_sentiment').show(2, truncate=False, vertical=True)

In [None]:
# Inspect one full record, including the new text features
sdf.show(1, vertical=True)

## 4. Categorical Feature Engineering

In [None]:
#pick top 200 games to encode categorically
top_games = sdf.groupby('game').count().orderBy('count', ascending=False).limit(200)

In [None]:
top_games.show(5)

In [None]:
#get list of games from top_games
top_game_list = [row['game'] for row in top_games.collect()]

In [None]:
#limit games to top 200 and refer to games after top 200 as other
sdf = sdf.withColumn('game', when(col('game').isin(top_game_list), col('game')).otherwise('other'))

In [None]:
# Rename the target column to 'label', the name the modeling cells below read
sdf = sdf.withColumnRenamed('voted_up', 'label')

In [None]:
# Create age of review features
# NOTE: this import shadows Python's built-in abs() with the Spark column function
# for the rest of the notebook
from pyspark.sql.functions import abs, floor
# Difference in seconds between the review creation time and the author's last-played time
sdf = sdf.withColumn('age_of_review_unix', abs(col('unix_timestamp_created')) - abs(col('author_last_played_unix')))
# NOTE(review): age_of_review_unix is a duration, so from_unixtime renders it as a date
# relative to the unix epoch rather than a real timestamp -- confirm this column is intended
sdf = sdf.withColumn('age_of_review_fe', from_unixtime('age_of_review_unix'))

# Compute whole days that have passed between review time and last played time
sdf = sdf.withColumn('age_of_review_days', floor(abs(col('age_of_review_unix')) / 86400))

In [None]:
# Inspect one record to verify the age-of-review features
sdf.show(1, vertical=True)

In [None]:
from pyspark.ml.feature import Bucketizer
#match index for seasonal month to use bucketizer
sdf = sdf.withColumn('shifted_month', when(col('month') == 12, 0).otherwise(col('month')))

#seasonal splits foir bucketizer
season_splits = [0, 3, 6, 9, 12]

# Engineer seasonal bucket feature
bucketizer = Bucketizer(splits=season_splits, inputCol='shifted_month', outputCol='season_bucket')
sdf = bucketizer.setHandleInvalid('keep').transform(sdf)

In [None]:
# Check Seasonal bucket feature counts
sdf.groupBy('season_bucket').count().show()

In [None]:
# Make sure binary column doesnt have any non binary values
sdf = sdf.filter((col('steam_purchase') <= 1) & (col('steam_purchase') >= 0))

In [None]:
sdf.groupBy('steam_purchase').count().show()

In [None]:
# Make sure binary column doesnt have any non binary values
sdf = sdf.filter((col('received_for_free') <= 1) & (col('received_for_free') >= 0))

In [None]:
sdf.groupBy('received_for_free').count().show()

In [None]:
# Make sure binary column doesnt have any non binary values
sdf = sdf.filter((col('written_during_early_access') <= 1) & (col('written_during_early_access') >= 0))

In [None]:
sdf.groupBy('written_during_early_access').count().show()

In [None]:
# Check age_of_review_days feature 
sdf.select('age_of_review_days').show()

In [None]:
sdf.columns

## 5. Assembling ML Pipeline and Results

In [None]:
# Numeric inputs that are assembled and standardized together.
# The order is significant: it fixes each feature's position in the assembled vector.
continous_columns_list = [
    # author activity
    'author_num_games_owned',
    'author_num_reviews',
    'author_playtime_forever',
    'author_playtime_last_two_weeks',
    'author_playtime_at_review',
    # purchase / access flags
    'steam_purchase',
    'received_for_free',
    'written_during_early_access',
    # review text statistics
    'clean_review_length',
    'review_wordcount',
    'clean_review_sentiment',
    # cyclical time features
    'hour_sin',
    'month_sin',
    'weekday_sin',
    'alp_hour_sin',
    'alp_month_sin',
    'alp_weekday_sin',
    # time between review and last play
    'age_of_review_days',
]

# ML PIPELINE 1: index the game names (the top 200 games plus 'other')
game_indexer = StringIndexer(inputCol='game', outputCol='game_index')

# ML PIPELINE 2: one-hot encode the game index and the season bucket
encoder = OneHotEncoder(
    inputCols=['game_index', 'season_bucket'],
    outputCols=['game_ohe', 'season_ohe'],
    dropLast=True,
)

# ML PIPELINE 3: assemble the continuous columns into one vector
continuous_assembler = VectorAssembler(
    inputCols=continous_columns_list,
    outputCol='continousVector',
)

# ML PIPELINE 4: scale the continuous vector
scaler = StandardScaler(inputCol='continousVector', outputCol='continuous_scaled')

# ML PIPELINE 5: combine every feature vector into the final 'features' column
final_assembler = VectorAssembler(
    inputCols=['continuous_scaled', 'game_ohe', 'clean_review_features', 'season_ohe'],
    outputCol='features',
)

In [None]:
# Create estimator: logistic regression reading the assembled 'features' vector
lr = LogisticRegression(featuresCol='features')

In [None]:
# Assemble pipeline: the feature stages run in the listed order, ending with the estimator
machine_learning_pipeline = Pipeline(stages=[game_indexer, encoder, continuous_assembler, scaler, final_assembler, lr])

In [None]:
# Split into training (70%) and test (30%) sets.
# A fixed seed keeps the split reproducible across notebook runs, so metrics
# from different runs can be compared.
trainingData, testData = sdf.randomSplit([0.7, 0.3], seed=42)

In [None]:
#create evaluator
evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')

In [None]:
#create a grid to hold hyperparameters
grid = ParamGridBuilder()
grid = grid.addGrid(lr.regParam,  [0.0, 0.5, 1.0])
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

#build parameter grid
grid = grid.build()

#show # of models tested
print('Number of models to be tested: ', len(grid))

In [None]:
# Create cross validator: 3-fold CV over the parameter grid, fitting 2 models in parallel.
# A fixed seed keeps the fold assignment reproducible across notebook runs.
cv = CrossValidator(estimator=machine_learning_pipeline,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    numFolds=3,
                    parallelism=2,
                    seed=42
                   )

In [None]:
# Run cross-validation on the training data; this fits the pipeline for every grid entry and fold
all_models = cv.fit(trainingData)

In [None]:
# Keep the best fitted pipeline found by cross-validation; it is applied to the test data below
best_model = all_models.bestModel

In [None]:
# Save the model to the models folder.
# Strip any trailing slash from the folder before joining so the path never
# contains a doubled '//' separator.
model_path = f"{models_folder.rstrip('/')}/steam_reviews_logistic_regression_model_2"
best_model.write().overwrite().save(model_path)

In [None]:
# Show the average cross-validation metric for each parameter combination in the grid
print(f'Average metric: {all_models.avgMetrics}')

In [None]:
# Use the best model to generate predictions on the held-out test data
test_results = best_model.transform(testData)

In [None]:
# Evaluate the predictions with the same evaluator used during cross-validation
print(evaluator.evaluate(test_results))

In [None]:
# Show label and prediction side by side for the first 10 rows
test_results.select(['label', 'prediction']).show(10, truncate=False)

In [None]:
# Create confusion matrix: one row per label (sorted ascending), one count column per
# predicted value, missing combinations filled with 0; collected to the driver as a list of Rows
cm = test_results.groupby('label').pivot('prediction').count().fillna(0).sort('label', ascending=True).collect()

In [None]:
#function to calculate recall and precision
def calculate_recall_precision(cm):
    """Compute accuracy, precision, recall and F1 from a 2x2 confusion matrix.

    Args:
        cm: Two indexable rows ordered by label (negative first, positive
            second). Each row is laid out as ``(label, count predicted 0,
            count predicted 1)``. Both prediction columns must be present.

    Returns:
        A tuple ``(accuracy, precision, recall, f1_score)``. A metric whose
        denominator is zero (for example precision when nothing was predicted
        positive) is reported as 0.0 instead of raising ZeroDivisionError.
    """
    def _ratio(numerator, denominator):
        # An empty denominator means the metric is undefined; report 0.0.
        return numerator / denominator if denominator else 0.0

    tn, fp = cm[0][1], cm[0][2]
    fn, tp = cm[1][1], cm[1][2]

    precision = _ratio(tp, tp + fp)
    recall = _ratio(tp, tp + fn)
    accuracy = _ratio(tp + tn, tp + tn + fp + fn)
    f1_score = _ratio(2 * precision * recall, precision + recall)
    return accuracy, precision, recall, f1_score

In [None]:
# Assign evaluation metrics computed from the collected confusion matrix
accuracy, precision, recall, f1_score = calculate_recall_precision(cm)

In [None]:
# Print accuracy, precision, recall and F1 in that order
print(accuracy, precision, recall, f1_score)

In [None]:
# Display the raw confusion-matrix rows
cm

In [None]:
# Print the confusion matrix as comma-separated text: header line, then one line per label
print("label,0.0,1.0\n", cm[0][0],",",cm[0][1],",",cm[0][2], "\n", cm[1][0],",",cm[1][1],",",cm[1][2])

In [None]:
# Check stages in pipeline: list the type of every fitted stage in the best model
for i, stage in enumerate(best_model.stages):
    print(f"Stage {i}: {type(stage)}")

In [None]:
# Grab the fitted model stage (the last stage of the best pipeline)
mymodel = best_model.stages[-1]

# Look at parameters for the best model from grid
paramap = mymodel.extractParamMap()
for p, v in paramap.items():
    print(p, v)

# Plot ROC curve for model evaluation.
# The ROC points are pulled to the driver once as a pandas DataFrame, instead of
# collecting the same Spark DataFrame twice and plotting lists of Row objects.
import matplotlib.pyplot as plt

roc_df = mymodel.summary.roc.toPandas()
plt.plot(roc_df['FPR'], roc_df['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.savefig('roc1.png')
plt.show()

In [None]:
import numpy as np

# Pick the parameter map whose average cross-validation metric is highest
hyperparams = cv.getEstimatorParamMaps()[np.argmax(all_models.avgMetrics)]

# Print out hyperparams for best model, one (param, value) pair per line.
# Iterating the items directly avoids rebuilding the full list on every pass.
for hyperparam_item in hyperparams.items():
    print(hyperparam_item)

In [None]:
import pandas as pd

# Extract the coefficients on each of the variables
coeff = mymodel.coefficients.toArray().tolist()

# Extract original column names from the metadata of the assembled 'features' vector.
# An attribute type that is absent from the metadata is skipped instead of raising KeyError.
feature_attrs = test_results.schema['features'].metadata['ml_attr']['attrs']
var_index = dict()
for variable_type in ['numeric', 'binary']:
    for variable in feature_attrs.get(variable_type, []):
        print(f'Found variable: {variable}' )
        var_index[variable['idx']] = variable['name']

# Walk the feature positions that actually have a name, in ascending order, so the
# names and coefficients stay aligned even if some positions are unnamed.
named_positions = sorted(var_index)

# Print out associated coefficients
for position in named_positions:
    print(f'Coefficient {position} {var_index[position]} {coeff[position]}')

coef_df = pd.DataFrame({
    'feature': [var_index[position] for position in named_positions],
    'coefficient': [coeff[position] for position in named_positions],
})

# Find top 5 most influential features by absolute value for plotting
top5 = coef_df.reindex(coef_df.coefficient.abs().sort_values(ascending=False).index).head(5)

print("\nTop 5 Most Influential Features:")
for _, row in top5.iterrows():
    print(f"{row['feature']}: {row['coefficient']:.6f}")

In [None]:
# Plot the sentiment score distribution per label class to see whether the
# sentiment score helps the model separate the classes
import seaborn as sns
import matplotlib.pyplot as plt

# Pull a 1% sample (without replacement) to the driver for plotting
sentiment_by_label = sdf.select("clean_review_sentiment", "label")
sample = sentiment_by_label.sample(withReplacement=False, fraction=0.01).toPandas()

sns.histplot(
    data=sample,
    x="clean_review_sentiment",
    hue="label",
    bins=50,
    stat="density",
    common_norm=False,
)
plt.title("Review Sentiment Score Distribution by Class")
plt.xlabel("Sentiment Score")
plt.ylabel("Density")
plt.tight_layout()
plt.show()


In [None]:
# Rank the coefficients by absolute size and keep the five largest for plotting
ranked_index = coef_df.coefficient.abs().sort_values(ascending=False).index
top5 = coef_df.reindex(ranked_index).head(5)

# Horizontal bar chart: one bar per feature, bar length = coefficient
plt.figure(figsize=(6, 4))
sns.barplot(
    data=top5,
    x="coefficient",
    y="feature",
    orient="h",
)
plt.title("Top 5 Most Influential Features")
plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Pull the precision-recall points from the logistic regression summary to the driver
pr_df = mymodel.summary.pr.toPandas()
recall_values = pr_df['recall']
precision_values = pr_df['precision']

# Precision (y) against recall (x), one marker per point
plt.figure(figsize=(6, 5))
plt.plot(recall_values, precision_values, marker='o')
plt.title("Precision-Recall Curve")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.grid(True)
plt.tight_layout()
plt.show()