# Data Cleaning

In [1]:
import findspark

from delta import *
from pyspark import SparkContext, SparkConf, RDD
import csv
from io import StringIO
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DoubleType,
    BooleanType,
)
from delta import configure_spark_with_delta_pip


findspark.init()


CSV_FOLDER = "hdfs:///steam_reviews/csvs"
TMP_FOLDER = "hdfs:///steam_reviews/steam_reviews_temp"
TMP_TABLE = "hdfs:///steam_reviews/steam_reviews_collected"
DELTA_TABLE = "hdfs:///steam_reviews/steam_reviews_processed"

# AUTHOR_DELTA = "hdfs:///steam_reviews/steam_reviews_author"
# GAME_DELTA = "hdfs:///steam_reviews/steam_reviews_game"

In [2]:
conf = (
    SparkConf()
    .setAppName("steam_reviews_csvs")
    .set("spark.driver.memory", "6g")
    .set("spark.executor.memory", "6g")
)
sc = SparkContext(conf=conf)

# Preprocessing data

Due to the feedback received during the presentation, we re-preprocessed the data and this time used spark

The preprocessing takes the filename and the text/csv data from the rdd and updates to csv with game name and appid extracted from the filename.
In addition newlines are removed and from the the rows.

In [3]:
header = "appid,game,index,recommendationid,language,review,timestamp_created,timestamp_updated,voted_up,votes_up,votes_funny,weighted_vote_score,comment_count,steam_purchase,received_for_free,written_during_early_access,author_steamid,author_num_games_owned,author_num_reviews,author_playtime_forever,author_playtime_last_two_weeks,author_playtime_at_review,author_last_played"

In [4]:
def preprocess_csv(data_init):
    try:
        file_name, data = data_init
        appid, game = tuple(file_name.split("/")[-1].split("_"))
        game = game.replace(".csv", "")

        file = list(csv.reader(StringIO(data)))

        header = ["appid", "game", "index"] + file[0][1:]

        output = StringIO()
        writer = csv.writer(output)

        # writer.writerow(header)
        for row in file[1:]:
            writer.writerow(
                [
                    str(c).replace("\r\n", " ").replace("\n", " ").replace(",", " - ")
                    for c in [appid, game] + row
                ]
            )

        data = output.getvalue()
        output.close()
        return data
    except Exception as e:
        print(data)
        print(e)
        return None

# Upload csvs to hadoop

Due to the size of the data, there is not enough memory to use collect() or just saveAsTextFile(). 
The map job has to be split up into partitions, saved and re-read once the job is complete.
It is therefore saved to a temporary folder.

* hdfs dfs -mkdir /steam_reviews
* hdfs dfs -mkdir /steam_reviews/csvs
* hdfs dfs -put steam_reviews_csvs/*.csv /steam_reviews/csvs

In [5]:
reviews = sc.wholeTextFiles(CSV_FOLDER)
processed_reviews = reviews.mapPartitions(
    lambda partition: (preprocess_csv(record) for record in partition)
).filter(lambda row: row if row and row != header else False)

processed_reviews.saveAsTextFile(TMP_FOLDER)

                                                                                

# Uploading to delta table
Now that the data is preprocessed with the bear necessities to have all necessary field, the data can be written to a deltatable

In [6]:
builder = (
    SparkSession.builder.appName("Preprocess")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [7]:
schema = StructType(
    [
        StructField("appid", StringType()),
        StructField("game", StringType()),
        StructField("index", StringType()),
        StructField("recommendationid", StringType()),
        StructField("language", StringType()),
        StructField("review", StringType()),
        StructField("timestamp_created", IntegerType()),
        StructField("timestamp_updated", IntegerType()),
        StructField("voted_up", BooleanType()),
        StructField("votes_up", IntegerType()),
        StructField("votes_funny", IntegerType()),
        StructField("weighted_vote_score", DoubleType()),
        StructField("comment_count", IntegerType()),
        StructField("steam_purchase", BooleanType()),
        StructField("received_for_free", BooleanType()),
        StructField("written_during_early_access", BooleanType()),
        StructField("author_steamid", StringType()),
        StructField("author_num_games_owned", IntegerType()),
        StructField("author_num_reviews", IntegerType()),
        StructField("author_playtime_forever", IntegerType()),
        StructField("author_playtime_last_two_weeks", IntegerType()),
        StructField("author_playtime_at_review", IntegerType()),
        StructField("author_last_played", IntegerType()),
    ]
)

In [8]:
reviews = spark.read.csv(TMP_FOLDER, schema=schema)

# df = df.toDF(*header.split(","))
reviews.show()

                                                                                

+-------+--------------------+-----+----------------+--------+--------------------+-----------------+-----------------+--------+--------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+
|  appid|                game|index|recommendationid|language|              review|timestamp_created|timestamp_updated|voted_up|votes_up|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|
+-------+--------------------+-----+----------------+--------+--------------------+-----------------+-----------------+--------+--------+-----------+-------------------+-------------+-------

# Process Further and optimize

Now that the data is stored as a deltatable we further process it by removing unncesessary columns, fixing game names, ...

In [9]:
reviews.count()

                                                                                

4592067

In [10]:
rdd = reviews.rdd

# Fix the game names

Get the distinct game names, then ask chatGPT to fix the names and use the appid as the key to update the names.

In [11]:
distinct_game_names = (
    rdd.map(lambda row: ((row["appid"], row["game"]), 1))
    .reduceByKey(lambda acc, _: acc)
    .map(lambda game: game[0])
    .collect()
)
distinct_game_names

                                                                                

[('1085660', 'Destiny2'),
 ('284160', 'BeamNGdrive'),
 ('325610', 'TotalWarATTILA'),
 ('440900', 'ConanExiles'),
 ('493340', 'PlanetCoaster'),
 ('644930', 'TheyAreBillions'),
 ('666140', 'MyTimeAtPortia'),
 ('962130', 'Grounded'),
 ('221100', 'DayZ'),
 ('236850', 'EuropaUniversalisIV'),
 ('377160', 'Fallout4'),
 ('383980', 'RivalsofAether'),
 ('393380', 'Squad'),
 ('200210', 'RealmoftheMadGodExalt'),
 ('218620', 'PAYDAY2'),
 ('233860', 'Kenshi'),
 ('311690', 'EntertheGungeon'),
 ('322170', 'GeometryDash'),
 ('335300', 'DARKSOULSIIScholaroftheFirstSin'),
 ('418530', 'Spelunky2'),
 ('212680', 'FTLFasterThanLight'),
 ('234140', 'MadMax'),
 ('244210', 'AssettoCorsa'),
 ('361420', 'ASTRONEER'),
 ('477160', 'HumanFallFlat'),
 ('489830', 'TheElderScrollsVSkyrimSpecialEdition'),
 ('812140', 'AssassinsCreedOdyssey'),
 ('863550', 'HITMAN2'),
 ('945360', 'AmongUs'),
 ('997070', 'MarvelsAvengers'),
 ('105600', 'Terraria'),
 ('1097150', 'FallGuys'),
 ('1151340', 'Fallout76'),
 ('1238860', 'Battlefi

In [12]:
# Asked chatGPT to "fix" the game names by adding spaces, special characters and titleling correctly and so on
# fmt: off
game_names = {"377160": "Fallout 4", "211820": "Starbound", "221100": "DayZ", "294100": "RimWorld", "244450": "Men of War: Assault Squad 2", "250900": "The Binding of Isaac: Rebirth", "107410": "Arma 3", "1087280": "Out of the Park Baseball 21", "1097150": "Fall Guys", "264710": "Subnautica", "270880": "American Truck Simulator", "1066780": "Transport Fever 2", "644930": "They Are Billions", "739630": "Phasmophobia", "782330": "DOOM Eternal", "32470": "STAR WARS Empire at War: Gold Pack", "431730": "Aseprite", "493340": "Planet Coaster", "550": "Left 4 Dead 2", "552500": "Warhammer: Vermintide 2", "393380": "Squad", "4000": "Garry's Mod", "1030840": "Mafia: Definitive Edition", "1100620": "Football Manager 2020 Touch", "640820": "Pathfinder: Kingmaker", "812140": "Assassin's Creed Odyssey", "361420": "ASTRONEER", "49520": "Borderlands 2", "548430": "Deep Rock Galactic", "570940": "DARK SOULS: REMASTERED", "379430": "Kingdom Come: Deliverance", "34460": "Sid Meier's Civilization IV: Beyond the Sword", "367520": "Hollow Knight", "1307550": "Craftopia", "218620": "PAYDAY 2", "289070": "Sid Meier's Civilization VI", "311690": "Enter the Gungeon", "594570": "Total War: WARHAMMER II", "613100": "House Flipper", "105450": "Age of Empires III (2007)", "1118200": "People Playground", "997070": "Marvel's Avengers", "374320": "DARK SOULS III", "1190460": "DEATH STRANDING", "1250410": "Microsoft Flight Simulator", "314160": "Microsoft Flight Simulator X: Steam Edition", "1151340": "Fallout 76", "976310": "Mortal Kombat 11", "262060": "Darkest Dungeon", "1217060": "Gunfire Reborn", "637090": "BATTLETECH", "646570": "Slay the Spire", "666140": "My Time At Portia", "703080": "Planet Zoo", "719040": "Wasteland 3", "1225330": "NBA 2K21", "212680": "FTL: Faster Than Light", "239140": "Dying Light", "244850": "Space Engineers", "346110": "ARK: Survival Evolved", "457140": "Oxygen Not Included", "47890": "The Sims 3", "526870": "Satisfactory", "418530": "Spelunky 2", "1172620": "Sea of Thieves", "200210": "Realm of the Mad God Exalt", "286160": "Tabletop Simulator", "304390": "For Honor", "242760": "The Forest", "582010": "Monster Hunter: World", "594650": "Hunt: Showdown", "1089350": "NBA 2K20", "813780": "Age of Empires II: Definitive Edition", "227300": "Euro Truck Simulator 2", "648800": "Raft", "686810": "Hell Let Loose", "310950": "Street Fighter V", "325610": "Total War: ATTILA", "381210": "Dead by Daylight", "4760": "Rome: Total War", "268500": "XCOM 2", "1094520": "Sands of Salzaar", "244210": "Assetto Corsa", "251570": "7 Days to Die", "678950": "DRAGON BALL FighterZ", "365360": "Battle Brothers", "383120": "Empyrion: Galactic Survival", "427520": "Factorio", "466560": "Northgard", "489830": "The Elder Scrolls V: Skyrim Special Edition", "508440": "Totally Accurate Battle Simulator", "552520": "Far Cry 5", "418460": "Rising Storm 2: Vietnam", "1259970": "eFootball PES 2021 SEASON UPDATE", "1283970": "YoloMouse", "220200": "Kerbal Space Program", "311210": "Call of Duty: Black Ops III", "322170": "Geometry Dash", "1030830": "Mafia II: Definitive Edition", "10500": "Total War: EMPIRE Definitive Edition", "10": "Counter-Strike", "824270": "KovaaK's 2.0", "834910": "ATLAS", "334230": "Town of Salem", "359320": "Elite Dangerous", "219990": "Grim Dawn", "281990": "Stellaris", "322330": "Don't Starve Together", "578080": "PLAYERUNKNOWN'S BATTLEGROUNDS", "105600": "Terraria", "1113000": "Persona 4 Golden", "815370": "Green Hell", "872790": "Football Manager 2019", "976730": "Halo: The Master Chief Collection", "1238860": "Battlefield 4", "787860": "Farming Simulator 19", "387990": "Scrap Mechanic", "440900": "Conan Exiles", "524220": "NieR: Automata", "383980": "Rivals of Aether", "261550": "Mount & Blade II: Bannerlord", "232090": "Killing Floor 2", "234140": "Mad Max", "779340": "Total War: THREE KINGDOMS", "287700": "METAL GEAR SOLID V: THE PHANTOM PAIN", "573090": "Stormworks: Build and Rescue", "389730": "TEKKEN 7", "394360": "Hearts of Iron IV", "397540": "Borderlands 3", "376210": "The Isle", "306130": "The Elder Scrolls Online", "620980": "Beat Saber", "620": "Portal 2", "1080110": "F1 2020", "814380": "Sekiro: Shadows Die Twice", "900883": "The Elder Scrolls IV: Oblivion", "945360": "Among Us", "960090": "Bloons TD 6", "236850": "Europa Universalis IV", "252490": "Rust", "629760": "MORDHAU", "632360": "Risk of Rain 2", "1085660": "Destiny 2", "1100600": "Football Manager 2020", "962130": "Grounded", "996470": "eFootball PES 2020", "274920": "FaceRig", "1213210": "Command & Conquer Remastered Collection", "1222670": "The Sims 4", "221380": "Age of Empires II (2013)", "72850": "The Elder Scrolls V: Skyrim", "805550": "Assetto Corsa Competizione", "284160": "BeamNG.drive", "380600": "Fishing Planet", "266840": "Age of Mythology: Extended Edition", "275850": "No Man's Sky", "1102190": "Monster Train", "1174180": "Red Dead Redemption 2", "645630": "Car Mechanic Simulator 2018", "719890": "Beasts of Bermuda", "257420": "Serious Sam 4", "367500": "Dragon's Dogma: Dark Arisen", "412020": "Metro Exodus", "431960": "Wallpaper Engine", "4700": "Total War: MEDIEVAL II Definitive Edition", "477160": "Human: Fall Flat", "48700": "Mount & Blade: Warband", "513710": "SCUM", "382310": "Eco", "39210": "FINAL FANTASY XIV Online", "413150": "Stardew Valley", "335300": "DARK SOULS II: Scholar of the First Sin", "34030": "Total War: NAPOLEON Definitive Edition", "356190": "Middle-earth: Shadow of War", "359550": "Tom Clancy's Rainbow Six Siege", "1151640": "Horizon Zero Dawn", "201270": "Total War: SHOGUN 2", "208650": "Batman: Arkham Knight", "214950": "Total War: ROME II Emperor Edition", "233860": "Kenshi", "240": "Counter-Strike: Source", "581320": "Insurgency: Sandstorm", "582660": "Black Desert Online", "1129580": "Medieval Dynasty", "863550": "HITMAN 2"}
# fmt: on

# Table

In [13]:
def rename_game_column(data: tuple[RDD, str]):
    row, appid = data
    new_row = {}
    for col, val in row.asDict().items():
        if col == "game":
            val = game_names[appid]
        new_row[col] = val
    return Row(**new_row)

In [14]:
drop = [
    "index",
    "timestamp_created",
    "timestamp_updated",
    "votes_up",
    "votes_funny",
    "steam_purchase",
    "received_for_free",
    "author_num_games_owned",
    "author_last_played",
    "weighted_vote_score",
    "written_during_early_access",
    "comment_count",
    "author_playtime_last_two_weeks",
    "author_playtime_at_review",
]

In [15]:
def remove_columns(row: RDD):
    return Row(**{k: v for k, v in row.asDict().items() if k not in drop})

# Preprocess

In [16]:
rdd.take(10)

[Row(appid='1030830', game='MafiaIIDefinitiveEdition', index='0', recommendationid='77191065', language='english', review='Just fucking God knows -  why this game weight 44 GB while original game only 8.  I dont see that improvements for extra 36GD -  only a stupid car from 3rd Mafia -  which you cannot even upgrade properly.', timestamp_created=1602094246, timestamp_updated=1602094246, voted_up=False, votes_up=0, votes_funny=0, weighted_vote_score=0.0, comment_count=0, steam_purchase=True, received_for_free=False, written_during_early_access=False, author_steamid='76561198329320604', author_num_games_owned=55, author_num_reviews=5, author_playtime_forever=83, author_playtime_last_two_weeks=83, author_playtime_at_review=83, author_last_played=1602093984),
 Row(appid='1030830', game='MafiaIIDefinitiveEdition', index='1', recommendationid='77189401', language='english', review="Another masterpiece doesn't let down the series", timestamp_created=1602092025, timestamp_updated=1602092025, v

num rows with missing values before dropping columns

In [17]:
num_rows_with_missing_values = (
    rdd.filter(lambda row: None in row.asDict().values())
    .map(lambda row: 1)
    .reduce(lambda x, y: x + y)
)
num_rows_with_missing_values

                                                                                

190869

In [18]:
# Removie columns that we decided we won't use.
rdd = rdd.map(remove_columns)

num rows missing values after removing columns

In [19]:
num_rows_with_missing_values = (
    rdd.filter(lambda row: None in row.asDict().values())
    .map(lambda row: 1)
    .reduce(lambda x, y: x + y)
)
num_rows_with_missing_values

                                                                                

10063

In [20]:
# Remove rows with missing valuse
rdd = rdd.filter(lambda row: None not in row.asDict().values())

In [21]:
# Fix the names in the game column.
rdd = rdd.map(lambda row: (row, row["appid"])).map(rename_game_column)

In [22]:
df = rdd.toDF()
df.show()

+-------+--------------------+----------------+--------+--------------------+--------+-----------------+------------------+-----------------------+
|  appid|                game|recommendationid|language|              review|voted_up|   author_steamid|author_num_reviews|author_playtime_forever|
+-------+--------------------+----------------+--------+--------------------+--------+-----------------+------------------+-----------------------+
|1030830|Mafia II: Definit...|        77191065| english|Just fucking God ...|   false|76561198329320604|                 5|                     83|
|1030830|Mafia II: Definit...|        77189401| english|Another masterpie...|    true|76561198271082576|                24|                    686|
|1030830|Mafia II: Definit...|        77187242| english|Best in series up...|    true|76561198049220968|                 2|                    540|
|1030830|Mafia II: Definit...|        77168878| english|               good |    true|76561198842120161|        

In [23]:
df.count()

                                                                                

4582004

In [24]:
rdd = df.rdd

In [25]:
total_num_authors = (
    rdd.map(lambda row: (row["author_steamid"], 1))
    .reduceByKey(lambda acc, id: acc)
    .map(lambda x: 1)
    .reduce(lambda x, y: x + y)
)
total_num_authors

                                                                                

3091493

In [26]:
distinct_authorids = (
    rdd.map(lambda row: (row["author_steamid"], 1))
    .reduceByKey(lambda acc, id: acc)
    .map(lambda row: row[0])
    .collect()
)
distinct_authorids

                                                                                

['76561198065128919',
 '76561198865948243',
 '76561198857009362',
 '76561198846517279',
 '76561198306705878',
 '76561198236015050',
 '76561198093500364',
 '76561198090765334',
 '76561198056608663',
 '76561198129285788',
 '76561197968519809',
 '76561198079070333',
 '76561197995424812',
 '76561199019236878',
 '76561198410918961',
 '76561197963369971',
 '76561198193252390',
 '76561198263712087',
 '76561198296875526',
 '76561198140898545',
 '76561198155919173',
 '76561198061168308',
 '76561198116645838',
 '76561198247972198',
 '76561197972187848',
 '76561198133427380',
 '76561198020036381',
 '76561198123928333',
 '76561198004459340',
 '76561197991516314',
 '76561198175910673',
 '76561198105653015',
 '76561198103522668',
 '76561198271241824',
 '76561198205535341',
 '76561198020577507',
 '76561198076923010',
 '76561198376490412',
 '76561198875363553',
 '76561198150194184',
 '76561198350282091',
 '76561198355429650',
 '76561198047267614',
 '76561198041764881',
 '76561197998732196',
 '76561198

In [27]:
# Due to issues with converting the author_steamids to integers for the large numbers, we remap to smaller ids
ids = [i for i in range(1000000, 9999999)]

author_map = {}
for i, author in enumerate(distinct_authorids):
    author_map[author] = ids[i]

In [28]:
def recreate_userid(data: RDD):
    return Row(
        **{
            k: author_map[v] if k == "author_steamid" else v
            for k, v in data.asDict().items()
        }
    )

In [29]:
rdd = rdd.map(recreate_userid)

In [30]:
df = rdd.toDF()
df.write.format("delta").mode("overwrite").save(DELTA_TABLE)

                                                                                

In [31]:
df = spark.read.format("delta").load(DELTA_TABLE)
df.show()

+------+-------------+----------------+--------+--------------------+--------+--------------+------------------+-----------------------+
| appid|         game|recommendationid|language|              review|voted_up|author_steamid|author_num_reviews|author_playtime_forever|
+------+-------------+----------------+--------+--------------------+--------+--------------+------------------+-----------------------+
|334230|Town of Salem|        33616217| english|Good for people w...|    true|       3398771|                 5|                   4201|
|334230|Town of Salem|        33615630| english|              Fun af|    true|       3398772|                 2|                    917|
|334230|Town of Salem|        33615126| english|People throw game...|   false|       2580799|                 8|                    155|
|334230|Town of Salem|        33611378| english|this game is addi...|    true|       2632514|                 1|                   5871|
|334230|Town of Salem|        33594117| e