In [None]:
# Install TextBlob into the kernel's environment; it is imported in the next cell
# and used for the sentiment-polarity feature. A kernel restart may be required
# after installation (see the pip note printed below).
%pip install textblob

[0mNote: you may need to restart the kernel to use updated packages.


In [None]:
# Importing necessary libraries
import re

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, StringIndexer, OneHotEncoder
from pyspark.sql import SparkSession
# NOTE: `abs` and `round` imported here shadow the Python builtins of the same
# name for the rest of the notebook; they operate on Spark Columns.
from pyspark.sql.functions import col,  udf, abs, round, concat_ws, when
from pyspark.sql.types import StringType, DoubleType
from textblob import TextBlob

# Obtain the Spark session explicitly so the notebook survives
# "Restart Kernel -> Run All" even when the kernel does not pre-define `spark`.
# getOrCreate() returns the already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("Steam Reviews Feature Engineering")
    .getOrCreate()
)

# Define paths for reading and saving data
cleaned_reviews_path = "gs://my-bigdata-project-bl/cleaned/top_300_reviews.parquet"
trusted_data_path = "gs://my-bigdata-project-bl/trusted/"

# Read the filtered reviews data from the cleaned folder
reviews_df = spark.read.parquet(cleaned_reviews_path)

# Show the first few rows to understand the structure
reviews_df.show(5)

                                                                                

+------------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------------------+-----------------+--------+--------+---------+
|              game|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|              review|timestamp_created|voted_up|votes_up|frequency|
+------------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+--------------------+-----------------+--------+--------+---------+
|Grand Theft Auto V|                    88|                 6|                  40651|                             0|                    22829|        1690009614|ROCKSTAR!!!! WHY!...|       1498396971|       0|       1|   520192|
|Grand Theft Auto V|                     0|                 6|                  

In [None]:
# 'author_last_played' and 'timestamp_created' are Unix timestamps in SECONDS,
# so dividing their difference by the number of seconds per day yields days.
SECONDS_PER_DAY = 60 * 60 * 24

# Absolute gap, rounded to whole days, between when the review was written and
# when the author last played. `abs` and `round` are the pyspark.sql.functions
# versions, not the Python builtins.
# NOTE(review): abs() discards whether the last play came before or after the
# review; confirm the direction is not needed downstream.
reviews_df = reviews_df.withColumn(
    "days_since_last_play",
    round(abs((col("timestamp_created") - col("author_last_played")) / SECONDS_PER_DAY)),
)

# Check the new feature
reviews_df.select("game", "author_last_played", "timestamp_created", "days_since_last_play").show(5)

[Stage 2:>                                                          (0 + 1) / 1]

+------------------+------------------+-----------------+--------------------+
|              game|author_last_played|timestamp_created|days_since_last_play|
+------------------+------------------+-----------------+--------------------+
|Grand Theft Auto V|        1690009614|       1498396971|              2218.0|
|Grand Theft Auto V|        1658338170|       1498396964|              1851.0|
|Grand Theft Auto V|        1692710732|       1498396954|              2249.0|
|Grand Theft Auto V|        1627332181|       1498396901|              1492.0|
|Grand Theft Auto V|        1558821250|       1498396630|               699.0|
+------------------+------------------+-----------------+--------------------+
only showing top 5 rows



                                                                                

In [None]:
# Text Preprocessing

# Expand contractions
# (compiled pattern, expansion) pairs, built once instead of on every call:
# the function runs once per review inside a Spark UDF.
# NOTE(review): the apostrophe-less keys "ive" and "id" also match the ordinary
# words "ive"/"id" (e.g. "steam id" -> "steam i would"); confirm this is intended.
_CONTRACTION_PATTERNS = [
    (re.compile(r"\b" + re.escape(contraction) + r"\b", flags=re.IGNORECASE), expanded)
    for contraction, expanded in {
        "isn't": "is not", "aren't": "are not", "can't": "cannot",
        "won't": "will not", "i'm": "i am", "i've": "i have", "i'd": "i would",
        "it's": "it is", "didn't": "did not", "ive": "i have", "id": "i would",
        "you're": "you are", "we're": "we are", "they're": "they are"
    }.items()
]


def expand_contractions(text):
    """Replace known English contractions in ``text`` with their expanded form.

    Matching is case-insensitive and restricted to whole words; the inserted
    expansion is always lowercase. Substitutions are applied one contraction
    at a time, in table order.

    Args:
        text: The string to process. ``None`` and ``""`` are returned unchanged.

    Returns:
        The text with every listed contraction expanded.
    """
    if not text:
        # Nothing to expand; also avoids a TypeError from re on None.
        return text
    for pattern, expanded in _CONTRACTION_PATTERNS:
        text = pattern.sub(expanded, text)
    return text

# Clean text (remove special characters, numbers, and lowercase)
def clean_text(text):
    """Normalise a raw review string for tokenisation.

    Steps, in order: lowercase, expand contractions, delete every character
    that is not a lowercase ASCII letter or whitespace, collapse whitespace
    runs to a single space, and trim both ends.

    Args:
        text: Raw review text, or ``None``.

    Returns:
        The cleaned string, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None
    text = text.lower()  # Convert to lowercase
    text = expand_contractions(text)  # Expand contractions before apostrophes are stripped
    text = re.sub(r"[^a-z\s]", "", text)  # Remove non-alphabetic characters
    # Deleting digits/punctuation leaves consecutive spaces (e.g. "game 12 hours"
    # -> "game  hours"), and newlines/tabs survive the step above. Collapse them
    # so the whitespace tokenizer does not emit empty-string tokens.
    text = re.sub(r"\s+", " ", text)
    return text.strip()

In [None]:
# Wrap the Python cleaner as a Spark UDF and set up the two text transformers
clean_text_udf = udf(clean_text, StringType())
tokenizer = Tokenizer(inputCol="cleaned_review", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# 1) cleaned_review: normalised copy of the raw review text
reviews_df = reviews_df.withColumn("cleaned_review", clean_text_udf(col("review")))

# 2) words: cleaned text split into tokens
# 3) filtered_words: tokens with stop words removed
reviews_df = stop_words_remover.transform(tokenizer.transform(reviews_df))

# 4) filtered_review: remaining tokens joined back into one space-separated string
reviews_df = reviews_df.withColumn(
    "filtered_review",
    concat_ws(" ", col("filtered_words")),
)

# Preview the text columns produced above
preview_columns = ["game", "cleaned_review", "filtered_review", "filtered_words"]
reviews_df.select(*preview_columns).show(5)

[Stage 3:>                                                          (0 + 1) / 1]

+------------------+--------------------+--------------------+--------------------+
|              game|      cleaned_review|     filtered_review|      filtered_words|
+------------------+--------------------+--------------------+--------------------+
|Grand Theft Auto V|rockstar why rest...|rockstar rest pat...|[rockstar, rest, ...|
|Grand Theft Auto V|i have played gta...|played gtav since...|[played, gtav, si...|
|Grand Theft Auto V|i played this gam...|played game  hour...|[played, game, , ...|
|Grand Theft Auto V|         bottle flip|         bottle flip|      [bottle, flip]|
|Grand Theft Auto V|  ruined just ruined|       ruined ruined|    [ruined, ruined]|
+------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [None]:
# Define the sentiment analysis function using TextBlob
def sentiment_analysis(review_text):
    """Return the TextBlob sentiment polarity of ``review_text``.

    Args:
        review_text: Review text to score. Missing, non-string or blank input
            is treated as neutral.

    Returns:
        float: The polarity reported by TextBlob, or ``0.0`` (neutral) when the
        input is missing/blank or TextBlob fails on it.
    """
    # Explicit guard for the expected "no text" cases instead of relying on an
    # exception being raised and swallowed.
    if not isinstance(review_text, str) or not review_text.strip():
        return 0.0
    try:
        return float(TextBlob(review_text).sentiment.polarity)  # Get sentiment polarity
    except Exception:
        # Deliberate best-effort: one unparseable review must not fail the whole
        # Spark job. `Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return 0.0  # Fallback to neutral sentiment

# Register the sentiment analysis function as a UDF
sentiment_analysis_udf = udf(sentiment_analysis, DoubleType())

# Score the cleaned review text rather than the stop-word-filtered text.
# Spark's default English stop-word list removes negations such as "not", so
# "is not good" would reach the scorer as "good" and get the wrong polarity.
reviews_df = reviews_df.withColumn("sentiment_score", sentiment_analysis_udf(col("cleaned_review")))

# Preview a few rows; cap the row count and column width so the cell does not
# dump full raw reviews into the notebook output.
reviews_df.select("game", "review", "sentiment_score").show(5, truncate=100)

[Stage 4:>                                                          (0 + 1) / 1]

+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [None]:
# Print the schema of reviews_df to confirm the columns added by the cells above
# (days_since_last_play, cleaned_review, words, filtered_words, filtered_review,
# sentiment_score) are present with the expected types.
reviews_df.printSchema()

root
 |-- game: string (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: integer (nullable = true)
 |-- author_playtime_at_review: integer (nullable = true)
 |-- author_last_played: long (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: long (nullable = true)
 |-- voted_up: integer (nullable = true)
 |-- votes_up: integer (nullable = true)
 |-- frequency: long (nullable = true)
 |-- days_since_last_play: double (nullable = true)
 |-- cleaned_review: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_review: string (nullable = false)
 |-- sentiment_score: double (nullable = true)



In [None]:
# Remove rows that contain any null value, then remove exact duplicate rows
cleaned_reviews_df = reviews_df.dropna().dropDuplicates()

# Numeric feature columns that are cast to DoubleType
numeric_columns = [
    "author_num_games_owned",
    "author_num_reviews",
    "author_playtime_forever",
    "author_playtime_last_two_weeks",
    "author_playtime_at_review",
    "days_since_last_play",
    "sentiment_score",
]

# Replace each listed column, in place, with its double-typed version
for col_name in numeric_columns:
    as_double = col(col_name).cast(DoubleType())
    cleaned_reviews_df = cleaned_reviews_df.withColumn(col_name, as_double)

# Encode 'game': string -> numeric index -> one-hot vector
game_indexer = StringIndexer(inputCol="game", outputCol="game_index")
game_encoder = OneHotEncoder(inputCol="game_index", outputCol="game_onehot")
pipeline = Pipeline(stages=[game_indexer, game_encoder])

# Fit the encoding pipeline on the cleaned data and apply it to the same data
encoding_model = pipeline.fit(cleaned_reviews_df)
encoded_df = encoding_model.transform(cleaned_reviews_df)

# Keep only the columns needed downstream
final_columns = [
    "game",
    "game_index",
    "game_onehot",
    "author_num_games_owned",
    "author_num_reviews",
    "author_playtime_forever",
    "author_playtime_last_two_weeks",
    "author_playtime_at_review",
    "days_since_last_play",
    "sentiment_score",
    "votes_up",
    "voted_up",
]
final_df = encoded_df.select(*final_columns)

# Display the first rows of the result
final_df.show()

# Display the resulting schema
final_df.printSchema()




+----------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+--------------------+--------------------+----------+
|game_index|      game_onehot|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|days_since_last_play|     sentiment_score|  voted_up|
+----------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+--------------------+--------------------+----------+
|     184.0|(299,[184],[1.0])|                  77.0|               9.0|                  828.0|                           0.0|                    828.0|              1480.0|-0.07500000000000001|         0|
|     184.0|(299,[184],[1.0])|                 193.0|               8.0|                 3136.0|                           0.0|                    714.0|                13.

[Stage 13:>                                                         (0 + 1) / 1]                                                                                

In [None]:
# Assemble features
from pyspark.ml.feature import VectorAssembler

# Columns combined into the single 'features' vector.
# The raw "game" column is intentionally NOT listed: VectorAssembler accepts only
# numeric, boolean and vector columns, and the game is already represented by
# game_index / game_onehot. (Previously a missing comma after "game" silently
# concatenated it with the next literal into the non-existent column
# "gamegame_index".)
# NOTE(review): game_index is an arbitrary ordinal that duplicates game_onehot,
# and votes_up / voted_up look like prediction targets -- confirm they belong in
# the feature vector (including a label would leak it into the model).
feature_columns = [
    "game_index",
    "game_onehot",
    "author_num_games_owned",
    "author_num_reviews",
    "author_playtime_forever",
    "author_playtime_last_two_weeks",
    "author_playtime_at_review",
    "days_since_last_play",
    "sentiment_score",
    "votes_up",
    "voted_up",
]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Append the assembled 'features' vector column to final_df
final_assembled_df = assembler.transform(final_df)

final_assembled_df.show(10)

[Stage 16:>                                                         (0 + 1) / 1]

+----------+---------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+--------------------+--------------------+--------+--------------------+
|game_index|    game_onehot|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|days_since_last_play|     sentiment_score|voted_up|            features|
+----------+---------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+--------------------+--------------------+--------+--------------------+
|       4.0|(299,[4],[1.0])|                   0.0|               9.0|                 4789.0|                          48.0|                   2683.0|              2310.0|                 0.0|       1|(308,[0,5,301,302...|
|       4.0|(299,[4],[1.0])|                 195.0|               5.0|                 8780.0|          

                                                                                

In [None]:
# Destination in GCS for the assembled feature table (Parquet)
trusted2_folder_path = "gs://my-bigdata-project-bl/trusted2/final_assembled.parquet"

# Write the DataFrame as Parquet, replacing any data already at the destination
parquet_writer = final_assembled_df.write.mode("overwrite")
parquet_writer.parquet(trusted2_folder_path)

# Confirm where the output was written
print(f"Reviews With Features saved to {trusted2_folder_path}")

                                                                                

Reviews With Features saved to gs://my-bigdata-project-bl/trusted2/final_assembled.parquet
