Import packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, TimestampType
from pyspark.sql.functions import col, from_unixtime, when, trim, lit, to_date

Start Spark session

In [2]:
# Create (or reuse, via getOrCreate) the notebook's Spark session.
# It runs against the 'local[*]' master with explicit driver/executor memory settings.
spark_session = (
    SparkSession.builder
    .appName('test app')
    .master('local[*]')
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

Load the data from CSV

In [3]:
# Explicit schema for the reviews CSV, expressed as (column name, Spark type) pairs.
# Every field is declared nullable. Flag-like columns (voted_up, steam_purchase, ...)
# are read as integers here and cast to booleans in a later cell.
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("recommendationid", IntegerType),
        ("appid", IntegerType),
        ("game", StringType),
        ("author_steamid", StringType),
        ("author_num_games_owned", IntegerType),
        ("author_num_reviews", IntegerType),
        ("author_playtime_forever", IntegerType),
        ("author_playtime_last_two_weeks", IntegerType),
        ("author_playtime_at_review", IntegerType),
        ("author_last_played", IntegerType),
        ("language", StringType),
        ("review", StringType),
        ("timestamp_created", IntegerType),
        ("timestamp_updated", IntegerType),
        ("voted_up", IntegerType),
        ("votes_up", IntegerType),
        ("votes_funny", IntegerType),
        ("weighted_vote_score", FloatType),
        ("comment_count", IntegerType),
        ("steam_purchase", IntegerType),
        ("received_for_free", IntegerType),
        ("written_during_early_access", IntegerType),
        ("hidden_in_steam_china", IntegerType),
        ("steam_china_location", StringType),
    )
])

In [4]:
# Read the reviews CSV using the explicit schema; the first line of the file is
# treated as a header row.
# NOTE(review): this relies on Spark's default CSV quoting/escaping and on
# single-line records. If review text can contain embedded newlines or doubled
# quotes, confirm whether the multiLine / escape reader options are needed.
df = (
    spark_session.read
    .schema(schema)
    .option("header", True)
    .csv("reviews/reviews.csv")
)

# Show the column names and types of the loaded frame.
df.printSchema()

root
 |-- recommendationid: integer (nullable = true)
 |-- appid: integer (nullable = true)
 |-- game: string (nullable = true)
 |-- author_steamid: string (nullable = true)
 |-- author_num_games_owned: integer (nullable = true)
 |-- author_num_reviews: integer (nullable = true)
 |-- author_playtime_forever: integer (nullable = true)
 |-- author_playtime_last_two_weeks: integer (nullable = true)
 |-- author_playtime_at_review: integer (nullable = true)
 |-- author_last_played: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- review: string (nullable = true)
 |-- timestamp_created: integer (nullable = true)
 |-- timestamp_updated: integer (nullable = true)
 |-- voted_up: integer (nullable = true)
 |-- votes_up: integer (nullable = true)
 |-- votes_funny: integer (nullable = true)
 |-- weighted_vote_score: float (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- steam_purchase: integer (nullable = true)
 |-- received_for_free: integer (nullable = t

In [14]:
# Preview the raw frame; n=20 and truncate=True are show()'s defaults, spelled out.
df.show(n=20, truncate=True)

+----------------+-----+--------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+---------+----------------------------------+-----------------+-----------------+----------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+---------------------+--------------------+
|recommendationid|appid|          game|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played| language|                            review|timestamp_created|timestamp_updated|  voted_up|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|hidden_in_steam_china|steam_china_location|
+----------------+-----+--------------+-----------------+----------------------+------------------

In [5]:
# ---------------------------------------------------------------------------
# Canonical (cleaned) column names used throughout the notebook, plus the
# mapping from the raw CSV headers to those names.
# ---------------------------------------------------------------------------

# Identifiers
COL_RECOMMENDATION_ID = "recommendation_id"
COL_APP_ID = "app_id"
COL_AUTHOR_STEAM_ID = "author_steam_id"

# Game
COL_GAME_TITLE = "game_title"

# Author
COL_AUTHOR_NUM_GAMES = "author_num_games_owned"
COL_AUTHOR_NUM_REVIEWS = "author_num_reviews"
# Lifetime playtime tracked for the game (originally in minutes, later converted to hours).
COL_AUTHOR_PLAYTIME_LIFETIME = "author_playtime_lifetime"
# Playtime tracked in the past two weeks for this game (considered not useful; dropped later).
COL_AUTHOR_PLAYTIME_LAST_TWO_WEEKS = "author_playtime_last_two_weeks"
# Playtime at the moment the review was written (originally in minutes, later converted to hours).
COL_AUTHOR_PLAYTIME_AT_REVIEW = "author_playtime_at_review"
COL_AUTHOR_LAST_PLAYED = "author_last_played"

AUTHOR_COLUMNS = [
    COL_AUTHOR_NUM_GAMES,
    COL_AUTHOR_NUM_REVIEWS,
    COL_AUTHOR_PLAYTIME_LIFETIME,
    COL_AUTHOR_PLAYTIME_AT_REVIEW,
    COL_AUTHOR_LAST_PLAYED,
]

# Review details
COL_LANGUAGE = "language"
# Review text.
COL_REVIEW = "review"
COL_CREATED_AT = "created_at"
COL_UPDATED_AT = "updated_at"
# Whether the review was positive or negative.
COL_IS_VOTED_UP = "is_voted_up"

REVIEW_DETAILS_COLUMNS = [
    COL_LANGUAGE,
    COL_REVIEW,
    COL_CREATED_AT,
    COL_UPDATED_AT,
    COL_IS_VOTED_UP,
]

# Other players' response to the review
# Number of people who voted the review up.
COL_NUM_VOTES_UP = "num_votes_up"
# Number of people who voted the review funny.
COL_NUM_VOTES_FUNNY = "num_votes_funny"
# Helpfulness score (generated by Steam).
COL_WEIGHTED_VOTE_SCORE = "weighted_vote_score"
# Number of comments on the review.
COL_NUM_COMMENTS = "num_comment"

PLAYERS_RESPONSE_COLUMNS = [
    COL_NUM_VOTES_UP,
    COL_NUM_VOTES_FUNNY,
    COL_WEIGHTED_VOTE_SCORE,
    COL_NUM_COMMENTS,
]

# Purchase / game state at the time of the review
# Whether the user purchased the game on Steam.
COL_IS_PURCHASED = "is_purchased"
# True if the user ticked the box saying they got the app for free.
COL_IS_RECEIVED_FOR_FREE = "is_received_for_free"
# Whether the review was posted while the game was in Early Access.
COL_IS_WRITTEN_DURING_EARLY_ACCESS = "is_written_during_early_access"

PURCHASE_STATE_COLUMNS = [
    COL_IS_PURCHASED,
    COL_IS_RECEIVED_FOR_FREE,
    COL_IS_WRITTEN_DURING_EARLY_ACCESS,
]

# Steam China
COL_IS_HIDDEN_IN_CHINA = "is_hidden_in_china"
# Meaning unclear; per the original author's note only about 1 in 0.5 million rows
# has a value, so the column is dropped later.
COL_CHINA_LOCATION = "china_location"

# Raw CSV header -> cleaned column name (insertion order follows the schema).
rename_dict = dict(
    recommendationid=COL_RECOMMENDATION_ID,
    appid=COL_APP_ID,
    game=COL_GAME_TITLE,
    author_steamid=COL_AUTHOR_STEAM_ID,
    author_num_games_owned=COL_AUTHOR_NUM_GAMES,
    author_num_reviews=COL_AUTHOR_NUM_REVIEWS,
    author_playtime_forever=COL_AUTHOR_PLAYTIME_LIFETIME,
    author_playtime_last_two_weeks=COL_AUTHOR_PLAYTIME_LAST_TWO_WEEKS,
    author_playtime_at_review=COL_AUTHOR_PLAYTIME_AT_REVIEW,
    author_last_played=COL_AUTHOR_LAST_PLAYED,
    language=COL_LANGUAGE,
    review=COL_REVIEW,
    timestamp_created=COL_CREATED_AT,
    timestamp_updated=COL_UPDATED_AT,
    voted_up=COL_IS_VOTED_UP,
    votes_up=COL_NUM_VOTES_UP,
    votes_funny=COL_NUM_VOTES_FUNNY,
    weighted_vote_score=COL_WEIGHTED_VOTE_SCORE,
    comment_count=COL_NUM_COMMENTS,
    steam_purchase=COL_IS_PURCHASED,
    received_for_free=COL_IS_RECEIVED_FOR_FREE,
    written_during_early_access=COL_IS_WRITTEN_DURING_EARLY_ACCESS,
    hidden_in_steam_china=COL_IS_HIDDEN_IN_CHINA,
    steam_china_location=COL_CHINA_LOCATION,
)

Rename and drop

In [6]:
# Rename each raw CSV column to its cleaned name, one column at a time.
for old_name in rename_dict:
    new_name = rename_dict[old_name]
    df = df.withColumnRenamed(old_name, new_name)

# Discard the Steam China columns and the two-week playtime column.
df = df.drop(*[
    COL_IS_HIDDEN_IN_CHINA,
    COL_CHINA_LOCATION,
    COL_AUTHOR_PLAYTIME_LAST_TWO_WEEKS,
])

Cast to correct data types

In [7]:
# The flag columns were read as integers; cast each of them to BooleanType in place.
for flag_col in (
    COL_IS_PURCHASED,
    COL_IS_RECEIVED_FOR_FREE,
    COL_IS_WRITTEN_DURING_EARLY_ACCESS,
    COL_IS_VOTED_UP,
):
    df = df.withColumn(flag_col, col(flag_col).cast(BooleanType()))

Convert minutes to hours

In [8]:
# Playtime columns that are divided by 60 (minutes -> hours) below.
playtime_columns = [COL_AUTHOR_PLAYTIME_LIFETIME, COL_AUTHOR_PLAYTIME_AT_REVIEW]

for playtime_col in playtime_columns:
    # Divide first, then narrow the result to a float column under the same name.
    playtime_in_hours = col(playtime_col) / 60
    df = df.withColumn(playtime_col, playtime_in_hours.cast(FloatType()))

Convert Unix timestamps to timestamp type

In [9]:
# Columns that were read as integer Unix epoch seconds and must become timestamps.
timestamp_columns = [
    COL_AUTHOR_LAST_PLAYED, 
    COL_UPDATED_AT, 
    COL_CREATED_AT,
]

# Cast the integer epoch seconds straight to TimestampType (Spark interprets an
# integral value as seconds since the epoch). The previous form went through
# from_unixtime(), which renders a string in the session time zone that is then
# parsed back; that round trip is ambiguous for local times repeated at a DST
# fall-back and adds a needless format/parse step.
for timestamp_col in timestamp_columns:
    df = df.withColumn(timestamp_col, col(timestamp_col).cast(TimestampType()))

In [20]:
# Preview the renamed/cast frame; n=20 and truncate=True are show()'s defaults, spelled out.
df.show(n=20, truncate=True)

+-----------------+------+--------------+-----------------+----------------------+------------------+------------------------+-------------------------+-------------------+---------+----------------------------------+-------------------+-------------------+-----------+------------+---------------+-------------------+-----------+------------+--------------------+------------------------------+
|recommendation_id|app_id|    game_title|  author_steam_id|author_num_games_owned|author_num_reviews|author_playtime_lifetime|author_playtime_at_review| author_last_played| language|                            review|         created_at|         updated_at|is_voted_up|num_votes_up|num_votes_funny|weighted_vote_score|num_comment|is_purchased|is_received_for_free|is_written_during_early_access|
+-----------------+------+--------------+-----------------+----------------------+------------------+------------------------+-------------------------+-------------------+---------+--------------------------

Filter out unreasonable data

In [10]:
# Rows whose "funny" vote count exceeds ten million are treated as implausible.
unreasonably_funny_comments = col(COL_NUM_VOTES_FUNNY) > 10000000

# Keep a row only when it was not created after its last update AND its funny-vote
# count is plausible (rows where either condition evaluates to NULL are dropped too).
created_not_after_updated = col(COL_CREATED_AT) <= col(COL_UPDATED_AT)
df = df.filter(created_not_after_updated & ~unreasonably_funny_comments)

# Normalise the review text: trim whitespace, and replace a missing review with "".
df = df.withColumn(
    COL_REVIEW,
    when(col(COL_REVIEW).isNotNull(), trim(col(COL_REVIEW))).otherwise(""),
)

Filter out dates prior to the Steam release

In [None]:
# Date used as the lower bound for plausible timestamps (Steam's release date).
steam_release_date = to_date(lit("2003-09-12"))

# A "last played" value earlier than that date is treated as missing.
missing_values_for_last_played = col(COL_AUTHOR_LAST_PLAYED) < steam_release_date

# Missing "last played" although some playtime was recorded.
played_the_game_missing_values = missing_values_for_last_played & (col(COL_AUTHOR_PLAYTIME_LIFETIME) > 0)

# Missing "last played" and no playtime recorded: the author never played.
never_played_the_game = missing_values_for_last_played & (col(COL_AUTHOR_PLAYTIME_LIFETIME) <= 0)

df = (
    df
    # Drop rows that have playtime but a missing "last played"; this is the
    # De Morgan form of "not missing OR lifetime playtime <= 0".
    .filter(~played_the_game_missing_values)
    # For authors who never played, replace the placeholder "last played" with NULL.
    .withColumn(
        COL_AUTHOR_LAST_PLAYED,
        when(never_played_the_game, None).otherwise(col(COL_AUTHOR_LAST_PLAYED)),
    )
    # Drop reviews created before the Steam release date.
    .filter(col(COL_CREATED_AT) >= steam_release_date)
)

Drop rows with a NULL game title

In [12]:
# Remove rows that have no game title ("any" is dropna's default mode, spelled out).
df = df.dropna(how="any", subset=[COL_GAME_TITLE])

Save to parquet

In [13]:
# Destination of the cleaned data set (relative path).
output_path = "reviews.parquet"

# Persist the cleaned frame as Parquet, replacing any previous output at that path.
df.write.parquet(output_path, mode="overwrite")