In [41]:
import pyspark
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

# Load the Steam reviews CSV using its first line as the header. No schema is
# supplied or inferred here, so the columns come back as strings.
reader = spark.read.option("header", "true")
df = reader.csv("data/steam_reviews.csv")
df.show()



+---------------------------+----------+--------------------+---------+--------+------------------------------------+-----------------+-----------------+-----------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+
|                        _c0|    app_id|            app_name|review_id|language|                              review|timestamp_created|timestamp_updated|recommended|votes_helpful|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|   author.steamid|author.num_games_owned|author.num_reviews|author.playtime_forever|author.playtime_last_two_weeks|author.playtime_at_review|author.last_played|
+---------------------------+----------+--------------------+---------+--------+--------------------------

24/02/01 20:09:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , app_id, app_name, review_id, language, review, timestamp_created, timestamp_updated, recommended, votes_helpful, votes_funny, weighted_vote_score, comment_count, steam_purchase, received_for_free, written_during_early_access, author.steamid, author.num_games_owned, author.num_reviews, author.playtime_forever, author.playtime_last_two_weeks, author.playtime_at_review, author.last_played
 Schema: _c0, app_id, app_name, review_id, language, review, timestamp_created, timestamp_updated, recommended, votes_helpful, votes_funny, weighted_vote_score, comment_count, steam_purchase, received_for_free, written_during_early_access, author.steamid, author.num_games_owned, author.num_reviews, author.playtime_forever, author.playtime_last_two_weeks, author.playtime_at_review, author.last_played
Expected: _c0 but found: 
CSV file: file:///Users/alexandergunawan/data_engineer/data/steam_reviews.csv


In [42]:
# Display the schema (column names and types) of the freshly loaded DataFrame.
df.schema

StructType([StructField('_c0', StringType(), True), StructField('app_id', StringType(), True), StructField('app_name', StringType(), True), StructField('review_id', StringType(), True), StructField('language', StringType(), True), StructField('review', StringType(), True), StructField('timestamp_created', StringType(), True), StructField('timestamp_updated', StringType(), True), StructField('recommended', StringType(), True), StructField('votes_helpful', StringType(), True), StructField('votes_funny', StringType(), True), StructField('weighted_vote_score', StringType(), True), StructField('comment_count', StringType(), True), StructField('steam_purchase', StringType(), True), StructField('received_for_free', StringType(), True), StructField('written_during_early_access', StringType(), True), StructField('author.steamid', StringType(), True), StructField('author.num_games_owned', StringType(), True), StructField('author.num_reviews', StringType(), True), StructField('author.playtime_for

In [44]:
# We replace all the dots with _ in the column names
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

# Build the replacement names by swapping every "." for "_", then rebuild the
# DataFrame with those names applied positionally.
new_columns = list(map(lambda name: name.replace(".", "_"), df.columns))

df = df.toDF(*new_columns)
df.schema


StructType([StructField('_c0', StringType(), True), StructField('app_id', StringType(), True), StructField('app_name', StringType(), True), StructField('review_id', StringType(), True), StructField('language', StringType(), True), StructField('review', StringType(), True), StructField('timestamp_created', StringType(), True), StructField('timestamp_updated', StringType(), True), StructField('recommended', StringType(), True), StructField('votes_helpful', StringType(), True), StructField('votes_funny', StringType(), True), StructField('weighted_vote_score', StringType(), True), StructField('comment_count', StringType(), True), StructField('steam_purchase', StringType(), True), StructField('received_for_free', StringType(), True), StructField('written_during_early_access', StringType(), True), StructField('author_steamid', StringType(), True), StructField('author_num_games_owned', StringType(), True), StructField('author_num_reviews', StringType(), True), StructField('author_playtime_for

In [55]:
# Add author_playtime_forever_hrs: author_playtime_forever cast to an integer,
# divided by 60 and rounded.
# NOTE(review): dividing by 60 presumes the source column is in minutes — confirm.
#
# Spark's round is imported under an alias on purpose: importing it as plain
# `round` would replace Python's built-in round() for every later cell.
from pyspark.sql.functions import round as spark_round

# The column is read from the CSV as a string (e.g. '1909.0'), so cast it to a
# number before doing arithmetic on it.
playtime_as_int = df["author_playtime_forever"].cast(IntegerType())
df = df.withColumn("author_playtime_forever_hrs", spark_round(playtime_as_int / 60))



In [56]:
# Preview the first five rows as Row objects to check the renamed columns and
# the new author_playtime_forever_hrs value.
df.head(5)

24/02/01 20:14:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , app_id, app_name, review_id, language, review, timestamp_created, timestamp_updated, recommended, votes_helpful, votes_funny, weighted_vote_score, comment_count, steam_purchase, received_for_free, written_during_early_access, author.steamid, author.num_games_owned, author.num_reviews, author.playtime_forever, author.playtime_last_two_weeks, author.playtime_at_review, author.last_played
 Schema: _c0, app_id, app_name, review_id, language, review, timestamp_created, timestamp_updated, recommended, votes_helpful, votes_funny, weighted_vote_score, comment_count, steam_purchase, received_for_free, written_during_early_access, author.steamid, author.num_games_owned, author.num_reviews, author.playtime_forever, author.playtime_last_two_weeks, author.playtime_at_review, author.last_played
Expected: _c0 but found: 
CSV file: file:///Users/alexandergunawan/data_engineer/data/steam_reviews.csv


[Row(_c0='0', app_id='292030', app_name='The Witcher 3: Wild Hunt', review_id='85185598', language='schinese', review='不玩此生遗憾，RPG游戏里的天花板，太吸引人了', timestamp_created='1611381629', timestamp_updated='1611381629', recommended='True', votes_helpful='0', votes_funny='0', weighted_vote_score='0.0', comment_count='0', steam_purchase='True', received_for_free='False', written_during_early_access='False', author_steamid='76561199095369542', author_num_games_owned='6', author_num_reviews='2', author_playtime_forever='1909.0', author_playtime_last_two_weeks='1448.0', author_playtime_at_review='1909.0', author_last_played='1611343383.0', author_playtime_forever_hrs=32.0),
 Row(_c0='1', app_id='292030', app_name='The Witcher 3: Wild Hunt', review_id='85185250', language='schinese', review='拔DIAO无情打桩机--杰洛特!!!', timestamp_created='1611381030', timestamp_updated='1611381030', recommended='True', votes_helpful='0', votes_funny='0', weighted_vote_score='0.0', comment_count='0', steam_purchase='True', rece

In [58]:
# Find the playtime (in hours) at the 95th percentile, i.e. the threshold that
# only the most active ~5% of reviewers exceed.
# NOTE(review): this comment used to say "top 1%", but the query uses 0.95 —
# change the fraction to 0.99 if the 99th percentile is what is wanted.

# Register df as a temporary view named steam_reviews so it can be queried with SQL.
df.createOrReplaceTempView("steam_reviews")
result = spark.sql("SELECT percentile_approx(author_playtime_forever_hrs, 0.95) FROM steam_reviews")
result.show(truncate=False)

# Result: the approximate 95th percentile of playtime is 1436 hours.

[Stage 38:>                                                         (0 + 1) / 1]

+-----------------------------------------------------------+
|percentile_approx(author_playtime_forever_hrs, 0.95, 10000)|
+-----------------------------------------------------------+
|1436.0                                                     |
+-----------------------------------------------------------+



                                                                                

In [63]:
# Show five English-language reviews whose playtime is above 1436 hours (the
# 95th-percentile value reported by the percentile query) and below 100000 hours.
playtime_hrs = df["author_playtime_forever_hrs"]
above_threshold = playtime_hrs > 1436
# NOTE(review): the 100000-hour ceiling presumably screens out implausible or
# corrupt values — confirm.
below_ceiling = playtime_hrs < 100000
in_english = df["language"] == "english"

df.where(above_threshold & below_ceiling & in_english).show(5)

24/02/01 20:18:13 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , app_id, app_name, review_id, language, review, timestamp_created, timestamp_updated, recommended, votes_helpful, votes_funny, weighted_vote_score, comment_count, steam_purchase, received_for_free, written_during_early_access, author.steamid, author.num_games_owned, author.num_reviews, author.playtime_forever, author.playtime_last_two_weeks, author.playtime_at_review, author.last_played
 Schema: _c0, app_id, app_name, review_id, language, review, timestamp_created, timestamp_updated, recommended, votes_helpful, votes_funny, weighted_vote_score, comment_count, steam_purchase, received_for_free, written_during_early_access, author.steamid, author.num_games_owned, author.num_reviews, author.playtime_forever, author.playtime_last_two_weeks, author.playtime_at_review, author.last_played
Expected: _c0 but found: 
CSV file: file:///Users/alexandergunawan/data_engineer/data/steam_reviews.csv


+-----+------+--------------------+---------+--------+--------------------+-----------------+-----------------+-----------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+---------------------------+
|  _c0|app_id|            app_name|review_id|language|              review|timestamp_created|timestamp_updated|recommended|votes_helpful|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|author_playtime_forever_hrs|
+-----+------+--------------------+---------+--------+--------------------+-----------------+-----------------+-----------+-----------

                                                                                