# Project Title
### Data Engineering Capstone Project

#### Project Summary
A Steam games data warehouse built with Spark running on an EMR cluster.

The project consists of the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
The scope of this project is to build a Steam game reviews data lake that combines data from two sources so it can be used in data science projects.


#### Describe and Gather Data 
The first dataset is a CSV file of Steam reviews for around 300 different games, containing the review body, the publication date, and other metrics of review quality. The data was obtained through Steam's API, as outlined in the Steamworks documentation (https://partner.steamgames.com/doc/store/getreviews), and the dataset can be found here:
https://www.kaggle.com/datasets/najzeko/steam-reviews-2021

The second dataset is a JSON file containing data on 75,000 games, including their name, publisher, release date, categories, etc. The dataset can be found here: https://www.kaggle.com/datasets/deepann/80000-steam-games-dataset?resource=download
  

```python
sc.install_pypi_package("pandas==0.25.1")
```


SparkSession available as 'spark'.



```
Collecting pandas==0.25.1
  Downloading pandas-0.25.1-cp36-cp36m-manylinux1_x86_64.whl (10.5 MB)
Collecting python-dateutil>=2.6.1
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.2
```

```python
# Do all imports and installs here
import pandas as pd
import os
import numpy as np
import io
from pyspark.sql.types import (
    StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str,
    IntegerType as Int, DateType as Date, BooleanType as Bool, FloatType as Flt,
)
from pyspark.sql.functions import col, isnan, when, count, sum as fsum
# from pyspark.sql.functions import monotonically_increasing_id
```


### Step 2: Explore and Assess the Data
#### Explore and clean the Data 


### First, let's work with the games data

```python
games = spark.read.json('s3://mussa-bucket/games_data.json')
```



```
+--------------------+------------+--------------------+--------------------+--------------------+--------------------+----+--------------------+-----+----------+--------------------+--------------------+
|          categories|        date|           developer|           full_desc|             img_url|                name|pegi|           popu_tags|price| publisher|        requirements|            url_info|
+--------------------+------------+--------------------+--------------------+--------------------+--------------------+----+--------------------+-----+----------+--------------------+--------------------+
|[Online PvPLAN, P...|Nov 16, 2018|          Innersloth|[About This Game ...|https://steamcdn-...|            Among Us|null|[Multiplayer, Onl...|  499|Innersloth|[[,, [,  1 ,  Win...|[945360, app, htt...|
|[Steam Achievemen...|Aug 21, 2012|Valve, Hidden Pat...|[About This Game ...|https://steamcdn-...|Counter-Strike: G...|null|[Shooter, Multipl...| free|     Valve|[[[ nVidia GeForc.
```

```python
pd.set_option('display.max_columns', None)
games.limit(3).toPandas()
```


```
                                          categories          date  \
0  [Online PvPLAN, PvPOnline Co-opLAN, Co-opCross...  Nov 16, 2018
1  [Steam Achievements Full, controller supportSt...  Aug 21, 2012
2  [MMOOnline PvPOnline, Co-opSteam Achievements ...   Aug 3, 2020

                          developer  \
0                        Innersloth
1  Valve, Hidden Path Entertainment
2                        Mediatonic

                                           full_desc  \
0  (About This Game Play with 4-10 player online ...
1  (About This Game Counter-Strike: Global Offens...
2  (About This Game Fall Guys: Ultimate Knockout ...

                                             img_url  \
0  https://steamcdn-a.akamaihd.net/steam/apps/945...
1  https://steamcdn-a.akamaihd.net/steam/apps/730...
2  https://steamcdn-a.akamaihd.net/steam/apps/109...

                               name  pegi  \
0                          Among Us  None
1  Counter-Strike: Glo
```

```python
# Drop duplicate game names, then drop rows whose name is null
games_no_dups = games.dropDuplicates(subset=['name']).dropna(subset=['name'])  # .withColumn('game_id', monotonically_increasing_id())
```


### Now it's time to work with the reviews data

```python
# Define an explicit schema for the reviews CSV
schema_reviews = R([
    Fld('1st_col', Int()),
    Fld('game_id', Str()),
    Fld('game_name', Str()),
    Fld('review_id', Str()),
    Fld('language', Str()),
    Fld('review', Str()),
    Fld('timestamp_created', Dbl()),
    Fld('timestamp_updated', Dbl()),
    Fld('recommended', Bool()),
    Fld('votes_helpful', Int()),
    Fld('votes_funny', Int()),
    Fld('weighted_vote_score', Flt()),
    Fld('comment_count', Int()),
    Fld('steam_purchase', Bool()),
    Fld('received_for_free', Bool()),
    Fld('written_during_early_access', Bool()),
    Fld('author_steamid', Str()),
    Fld('author_num_games_owned', Int()),
    Fld('author_num_reviews', Int()),
    Fld('author_playtime_forever', Flt()),
    Fld('author_playtime_last_two_weeks', Flt()),
    Fld('author_playtime_at_review', Flt()),
    Fld('author_last_played', Dbl())
])
```


```python
reviews = spark.read.csv('s3://mussa-bucket/steam_reviews.csv', header=True, schema=schema_reviews)
```



```python
# Let's see how big the data is
reviews.count()
```


40848659

#### About 40.8 million rows: not exactly a small dataset!
#### As we clean the data, we will end up with far fewer records.

```python
reviews.limit(3).toPandas()
```


```
   1st_col game_id                 game_name review_id  language  \
0        0  292030  The Witcher 3: Wild Hunt  85185598  schinese
1        1  292030  The Witcher 3: Wild Hunt  85185250  schinese
2        2  292030  The Witcher 3: Wild Hunt  85185111  schinese

                    review  timestamp_created  timestamp_updated  recommended  \
0  不玩此生遗憾，RPG游戏里的天花板，太吸引人了       1.611382e+09       1.611382e+09         True
1       拔DIAO无情打桩机--杰洛特!!!       1.611381e+09       1.611381e+09         True
2                    巫师3NB       1.611381e+09       1.611381e+09         True

   votes_helpful  votes_funny  weighted_vote_score  comment_count  \
0              0            0                  0.0              0
1              0            0                  0.0              0
2              0            0                  0.0              0

   steam_purchase  received_for_free  written_during_early_access  \
0            True              False
```

```python
# Let's discover which languages are present in the data
reviews.groupby('language').count().orderBy('count', ascending=False).show(5)
```


```
+--------+--------+
|language|   count|
+--------+--------+
|    null|14498420|
| english| 9635437|
|schinese| 3764967|
| russian| 2348900|
|       0| 1389226|
+--------+--------+
only showing top 5 rows
```

#### There is a huge number of nulls. Moreover, reviews in languages other than English would not be understandable to me,
#### so we will drop the nulls and keep only the English reviews.
#### If you want a specific language, feel free to add it to the filter that we apply next.

```python
# Keep only English reviews and drop rows that have a null in any column
reviews_english = reviews.filter("language = 'english'").dropna()
```



```python
# Since review_id will be the primary key of our fact table, check whether it has duplicates
# (sorting is unnecessary before counting, so no orderBy here)
reviews_english.groupby('review_id').count().filter('count > 1').count()
```


40009

```python
# Drop the duplicates (Spark keeps one row per review_id, but which one is not guaranteed)
r_eng_no_dups = reviews_english.dropDuplicates(subset=["review_id"])
```


```python
# Check that no duplicate review_id remains
r_eng_no_dups.groupby('review_id').count().filter('count > 1').count()
```


0

### Step 3: Define the Data Model
####  Conceptual Data Model
The data model is a simple star schema: one fact table, reviews, with review_id as its primary key, and two dimension tables:
games, keyed by the game name (the name column, which matches game_name in the reviews table), and authors,
keyed by author_steamid.

#### fact table -> reviews:

| column | description |
| :-- | :-- |
| review_id (string) | primary key of the reviews table |
| game_id (string) | game id |
| game_name (string) | game name |
| review (string) | review body |
| timestamp_created (double) | time the review was published (Unix epoch seconds) |
| recommended (bool) | whether the reviewer recommends the game |
| votes_helpful (integer) | number of users who marked the review as helpful |
| weighted_vote_score (float) | helpfulness score computed by Steam from the review's votes |
| comment_count (integer) | number of comments posted on the review |
| steam_purchase (bool) | whether the reviewer purchased the game on Steam |
| received_for_free (bool) | whether the reviewer received the game for free |
| written_during_early_access (bool) | whether the review was written during early access |
| author_steamid (string) | Steam id of the reviewer |
| author_playtime_at_review (float) | reviewer's playtime of the reviewed game at the time of the review |



#### dimension tables:

##### 1. games:
| column | description |
| ------ | ------ |
| name(string) | title of the game |
| categories(array of string) | categories of the game |
| date(string) | game's date of release |
| developer(string) | game's developer |
| full_desc(struct of string) | full description |
| pegi(struct) | PEGI age rating |
| popu_tags(array of string) | popular user tags |
| price(string) | initial sale price (e.g. `499` or `free` in the sample output above) |
| publisher(string) | publisher of the game |

##### 2. authors:
| column | description |
| ------ | :------: |
| author_steamid(string) | reviewer id |
| author_num_games_owned(integer) | number of games owned by the reviewer |
| author_num_reviews(integer) | number of reviews published by the reviewer |
| author_playtime_forever(float) | reviewer's lifetime playtime of the reviewed game, as reported by the Steam API |


### Step 4: Run Pipelines to Model the Data 
#### Create the data model

```python
games_final = games_no_dups.select('name', 'categories', 'date', 'developer', 'full_desc', 'pegi', 'popu_tags', 'price', 'publisher')
```


```python
reviews_final = r_eng_no_dups.select('review_id', 'game_id', 'game_name', 'review', 'timestamp_created', 'recommended', 'votes_helpful',
                                     'weighted_vote_score', 'comment_count', 'steam_purchase', 'received_for_free', 'written_during_early_access',
                                     'author_steamid', 'author_playtime_at_review')
```


```python
authors = (r_eng_no_dups.dropDuplicates(subset=['author_steamid'])
           .select('author_steamid', 'author_num_games_owned', 'author_num_reviews',
                   'author_playtime_forever'))
```


### Now we write all tables back to S3 as Parquet files

```python
games_final.write.mode('overwrite').parquet('s3://mussa-bucket/data_parquet/games.parquet')
```


```python
reviews_final.write.mode('overwrite').parquet('s3://mussa-bucket/data_parquet/reviews.parquet')
```


```python
authors.write.mode('overwrite').parquet('s3://mussa-bucket/data_parquet/authors.parquet')
```


### Now we reload our Parquet data and run some quality checks

```python
games1 = spark.read.parquet('s3://mussa-bucket/data_parquet/games.parquet')
```


```python
reviews1 = spark.read.parquet('s3://mussa-bucket/data_parquet/reviews.parquet')
```


```python
authors1 = spark.read.parquet('s3://mussa-bucket/data_parquet/authors.parquet')
```


### Data quality checks for primary keys
#### 1) Null check

 

```python
games1.select(count(when(isnan('name') | col('name').isNull(), 'name')).alias('name')).show()
```


```
+----+
|name|
+----+
|   0|
+----+
```

```python
reviews1.select(count(when(isnan('review_id') | col('review_id').isNull(), 'review_id')).alias('review_id')).show()
```


```
+---------+
|review_id|
+---------+
|        0|
+---------+
```

```python
authors1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in authors1.columns]).show()
```


```
+--------------+----------------------+------------------+-----------------------+------------------+
|author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_last_played|
+--------------+----------------------+------------------+-----------------------+------------------+
|             0|                     0|                 0|                      0|                 0|
+--------------+----------------------+------------------+-----------------------+------------------+
```

#### 2) Duplicates check


```python
games1.count() == games1.dropDuplicates().count()
```



True

```python
reviews1.count() == reviews1.dropDuplicates().count()
```



True

```python
authors1.count() == authors1.dropDuplicates().count()
```



True

### Step 5: Project Write Up

### Choice of tools:
#### I chose a Jupyter notebook on EMR because it provides a fast and robust environment: the EMR cluster and the S3 bucket are in the same
#### AWS region (us-west-2).
### How often the data should be updated, and why:
#### Since this data is game-related, I think updating it once or twice a year is adequate; that gives new games enough time
#### to be published, played, and reviewed.

### Possible cases
### 1. The data is increased by 100x
#### --> In this case, we can add more (or larger) nodes to the EMR cluster to handle the extra load.

### 2. The pipelines need to run on a daily basis by 7 am every day
#### --> In this case, we can orchestrate the pipeline with Apache Airflow. The '@daily' schedule triggers the DAG at midnight (UTC by default), which leaves time to finish before 7 am; a cron expression such as '0 5 * * *' can be used instead to start at a different time.

### 3. The database needs to be accessed by 100+ people.
#### --> S3 is highly available and supports many concurrent readers, so letting 100+ people read the Parquet files themselves should not be a problem.
#### However, if many users need to query the data at the same time, we could load it into a Redshift cluster, whose columnar storage suits analytical queries,
#### and even create data marts tailored to users who belong to the same department or have similar needs.