## All reviews from all games

In [1]:
import sys
import pyspark
import os
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql.functions import input_file_name
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
import re
from pyspark.sql.types import StringType
from google.api_core import page_iterator
from google.cloud import storage

# Display the path of the Python interpreter this kernel is running on.
sys.executable

'/opt/conda/miniconda3/bin/python'

In [9]:
#CREDENTIALS = '/home/vyago/.google/credentials/google_credentials.json'
#GCS_JAR = "./lib/gcs-connector-hadoop3-latest.jar"

# BigQuery connector jar and destination table coordinates.
BQ_JAR = "gs://spark-lib/bigquery/spark-3.1-bigquery-0.27.0-preview.jar"
BQ_PROJECT_ID = "steam-data-engineering-gcp"
BQ_DATASET = "steam_raw"
BQ_TABLE = "reviews"

# GCS buckets: staging bucket for the BigQuery write, and the bucket/sub-directory
# holding the review JSON files that are read below.
TEMP_BUCKET = "steam-datalake-dataset"
BUCKET = "steam-datalake-reviews"
BUCKET_SUBDIR = "proc"

# Spark configuration: application name, the BigQuery connector jar, and
# service-account authentication for the Hadoop GCS layer.
conf = (
    SparkConf()
    .setAppName("steam-gcp-dataproc")
    .set("spark.jars", BQ_JAR)
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
)

#   .setMaster('local[*]') \
#   .set("spark.jars", f"{GCS_JAR}, {BQ_JAR}") \


In [10]:
# Use getOrCreate so that re-running this cell in a live kernel reuses the
# existing context; calling the SparkContext constructor a second time raises
# "Cannot run multiple SparkContexts at once".
# NOTE: when a context already exists it is returned as-is and `conf` is ignored;
# restart the kernel to apply configuration changes.
sc = SparkContext.getOrCreate(conf=conf)

#hadoop_conf = sc._jsc.hadoopConfiguration()
#hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
#hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
#hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", CREDENTIALS)
#hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/26 17:12:37 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/09/26 17:12:37 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/09/26 17:12:37 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/09/26 17:12:37 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [11]:
# Create (or reuse) the SparkSession, configured from the running SparkContext.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

Connect to GCS

In [12]:
# https://stackoverflow.com/questions/49538327/pyspark-string-pattern-from-columns-values-and-regexp-expression

def flatten_df(nested_df):
    """Flatten the top-level struct columns of a DataFrame by one level.

    Non-struct columns are kept as they are and come first in the result.
    Each field ``f`` of a struct column ``s`` becomes a column named ``s_f``.
    Only one level is expanded: a struct nested inside a struct stays a struct.

    Args:
        nested_df: DataFrame exposing ``dtypes`` as (name, type-string) pairs.

    Returns:
        The DataFrame produced by selecting the flat columns followed by the
        expanded struct fields.
    """
    plain_names = []
    struct_names = []
    for col_name, col_type in nested_df.dtypes:
        if col_type.startswith('struct'):
            struct_names.append(col_name)
        else:
            plain_names.append(col_name)

    # One aliased column per field of every struct column: "<struct>_<field>".
    expanded_fields = [
        F.col(struct_name + '.' + field_name).alias(struct_name + '_' + field_name)
        for struct_name in struct_names
        for field_name in nested_df.select(struct_name + '.*').columns
    ]

    return nested_df.select(plain_names + expanded_fields)


def proc_json(raw_df, field):
    """Return the value stored under ``field`` in ``raw_df``.

    Args:
        raw_df: Any object supporting ``obj[field]`` lookup (used below with
            the rows of the input RDD).
        field: Key / column name to read.

    Returns:
        ``raw_df[field]`` unchanged.
    """
    return raw_df[field]

def get_previous_word(text):
    """Extract the numeric id that precedes the first '-' of a path's last matching segment.

    The pattern looks for ``/<digits>-`` and, because of the greedy leading
    ``.*``, picks the right-most such occurrence.

    Args:
        text: Path-like string, e.g. ``"gs://bucket/proc/730-reviews.json"``.

    Returns:
        The digits as a string, or ``None`` when ``text`` is ``None`` or does
        not contain a ``/<digits>-`` segment. Returning ``None`` (instead of
        raising ``AttributeError`` on a failed match) lets the Spark UDF that
        wraps this function emit a null rather than abort the whole job.
    """
    if text is None:
        return None
    # Raw string: '\d' in a normal literal is an invalid escape sequence.
    matches = re.search(r'.*/(\d+)-.*', text)
    if matches is None:
        return None
    return matches.group(1)

# Spark UDF wrapper around get_previous_word; yields the extracted id as a string column.
extract_game_id = F.udf(get_previous_word, StringType())


Read data

In [14]:
# Read every JSON document under the bucket sub-directory (multi-line JSON)
# and tag each resulting row with the path of the file it was read from.
all_games = (
    spark.read
    .json(f"gs://{BUCKET}/{BUCKET_SUBDIR}/*", multiLine=True)
    .withColumn("filename", input_file_name())
)
all_games.printSchema()

root
 |-- cursor: string (nullable = true)
 |-- query_summary: struct (nullable = true)
 |    |-- num_reviews: long (nullable = true)
 |    |-- review_score: long (nullable = true)
 |    |-- review_score_desc: string (nullable = true)
 |    |-- total_negative: long (nullable = true)
 |    |-- total_positive: long (nullable = true)
 |    |-- total_reviews: long (nullable = true)
 |-- reviews: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author: struct (nullable = true)
 |    |    |    |-- last_played: long (nullable = true)
 |    |    |    |-- num_games_owned: long (nullable = true)
 |    |    |    |-- num_reviews: long (nullable = true)
 |    |    |    |-- playtime_at_review: long (nullable = true)
 |    |    |    |-- playtime_forever: long (nullable = true)
 |    |    |    |-- playtime_last_two_weeks: long (nullable = true)
 |    |    |    |-- steamid: string (nullable = true)
 |    |    |-- comment_count: long (nullable = true)
 |    |    |--

                                                                                

In [15]:
all_games.show() # WARNING: compute-intensive (triggers a Spark job to read the input)

[Stage 3:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+-------+--------------------+
|              cursor|       query_summary|             reviews|success|            filename|
+--------------------+--------------------+--------------------+-------+--------------------+
|AoJ4qo/0rPoCfp6b6QI=|{100, null, null,...|[{{1656197665, 74...|      1|gs://steam-datala...|
|AoJw5pWI/v0CetHvlgM=|{100, null, null,...|[{{1643432700, 14...|      1|gs://steam-datala...|
|AoJ4qaHj9vICfueVkAI=|{100, null, null,...|[{{1596365869, 69...|      1|gs://steam-datala...|
|AoJwrseHlf0Ce/HBiAM=|{100, null, null,...|[{{1640537142, 27...|      1|gs://steam-datala...|
|AoJw9Y+FhIEDe/P3ugM=|{100, null, null,...|[{{1656626369, 36...|      1|gs://steam-datala...|
|AoJ47pP/yPACerap9QE=|{100, null, null,...|[{{1660175584, 10...|      1|gs://steam-datala...|
|AoJwwqSC+MgCdZSsLw==|{100, null, null,...|[{{1590322827, 27...|      1|gs://steam-datala...|
|AoJ49q3f3/ECfNaBhAI=|{100, null, null,...|[{{1646058950, 13

                                                                                

In [16]:
# Drop to the RDD API and spread the rows over 5 partitions.
rdd = all_games.rdd.repartition(5)

In [17]:
# Counts the rows of the RDD; the label assumes one row per input JSON file
# (presumably true for multiLine reads of single-document files — TODO confirm).
print(f"Number of files processed: {rdd.count()}")

[Stage 4:>                                                          (0 + 2) / 2]

Number of files processed: 97


                                                                                

In [18]:
# Sanity check that the repartition above took effect.
print("Nº partitons: ", rdd.getNumPartitions())

Nº partitons:  5


### Reviews 

In [19]:
# Every RDD row carries an array of reviews; flatMap emits each review as its
# own record, and toDF() infers the DataFrame schema from those records.
df_reviews = (
    rdd
    .flatMap(lambda file_row: proc_json(file_row, field="reviews"))
    .toDF()
)
df_reviews.printSchema()

root
 |-- author: struct (nullable = true)
 |    |-- last_played: long (nullable = true)
 |    |-- num_games_owned: long (nullable = true)
 |    |-- num_reviews: long (nullable = true)
 |    |-- playtime_at_review: long (nullable = true)
 |    |-- playtime_forever: long (nullable = true)
 |    |-- playtime_last_two_weeks: long (nullable = true)
 |    |-- steamid: string (nullable = true)
 |-- comment_count: long (nullable = true)
 |-- language: string (nullable = true)
 |-- received_for_free: boolean (nullable = true)
 |-- recommendationid: string (nullable = true)
 |-- review: string (nullable = true)
 |-- steam_purchase: boolean (nullable = true)
 |-- timestamp_created: long (nullable = true)
 |-- timestamp_updated: long (nullable = true)
 |-- voted_up: boolean (nullable = true)
 |-- votes_funny: long (nullable = true)
 |-- votes_up: long (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- written_during_early_access: boolean (nullable = true)



22/09/26 17:14:27 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 7.0 (TID 11) (cluster-11ee-w-1.europe-west1-b.c.steam-data-engineering-gcp.internal executor 2): org.apache.spark.util.TaskCompletionListenerException: refCnt: 0, decrement: 1
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)



### Game ID 

In [20]:
# Emit the source file name once per review of that file (same row count and
# order as flattening the reviews themselves), wrapped in a single-field Row.
df_gameid = (
    rdd
    .flatMap(
        lambda file_row: [
            Row(gameid=proc_json(file_row, field="filename"))
            for _ in proc_json(file_row, field="reviews")
        ]
    )
    .toDF()
)

# Replace the file path by the numeric game id parsed out of it.
df_gameid = df_gameid.withColumn('gameid', extract_game_id('gameid'))
df_gameid.printSchema()

root
 |-- gameid: string (nullable = true)



### Join dataframes

In [21]:
# Attach the game id to each review in a single pass over `all_games`.
#
# The previous approach numbered the rows of two independently computed
# DataFrames with row_number() over a constant ordering and joined them on
# that number. Spark gives no guarantee that both DataFrames are evaluated in
# the same row order (a constant ORDER BY leaves the order arbitrary), so a
# review could silently be paired with the game id of a different file.
# Exploding the `reviews` array next to the `gameid` derived from the same
# input row makes the pairing correct by construction.
df_reviews = (
    all_games
    .withColumn("gameid", extract_game_id("filename"))
    .select(F.explode("reviews").alias("review"), "gameid")
    .select("review.*", "gameid")
)

# `row_num` is kept purely as a surrogate row id so the output schema (and the
# BigQuery table it is appended to) stays the same: row_num, review fields, gameid.
# The numbering itself is arbitrary, and the un-partitioned window still pulls
# all rows into one partition.
w = Window.orderBy(lit('A'))
df_reviews = df_reviews.select(row_number().over(w).alias("row_num"), "*")

In [22]:
# Inspect the schema after the game id has been attached to the reviews.
df_reviews.printSchema()

root
 |-- row_num: integer (nullable = true)
 |-- author: struct (nullable = true)
 |    |-- last_played: long (nullable = true)
 |    |-- num_games_owned: long (nullable = true)
 |    |-- num_reviews: long (nullable = true)
 |    |-- playtime_at_review: long (nullable = true)
 |    |-- playtime_forever: long (nullable = true)
 |    |-- playtime_last_two_weeks: long (nullable = true)
 |    |-- steamid: string (nullable = true)
 |-- comment_count: long (nullable = true)
 |-- language: string (nullable = true)
 |-- received_for_free: boolean (nullable = true)
 |-- recommendationid: string (nullable = true)
 |-- review: string (nullable = true)
 |-- steam_purchase: boolean (nullable = true)
 |-- timestamp_created: long (nullable = true)
 |-- timestamp_updated: long (nullable = true)
 |-- voted_up: boolean (nullable = true)
 |-- votes_funny: long (nullable = true)
 |-- votes_up: long (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- written_during_early_access: bool

In [None]:
df_reviews.count() # WARNING: compute-intensive!

In [None]:
df_reviews.take(1) # WARNING: compute-intensive!

In [23]:
# Expand the top-level struct columns (here: `author`) into `author_<field>` columns.
df_reviews_flat = flatten_df(df_reviews) 
df_reviews_flat.printSchema()

root
 |-- row_num: integer (nullable = true)
 |-- comment_count: long (nullable = true)
 |-- language: string (nullable = true)
 |-- received_for_free: boolean (nullable = true)
 |-- recommendationid: string (nullable = true)
 |-- review: string (nullable = true)
 |-- steam_purchase: boolean (nullable = true)
 |-- timestamp_created: long (nullable = true)
 |-- timestamp_updated: long (nullable = true)
 |-- voted_up: boolean (nullable = true)
 |-- votes_funny: long (nullable = true)
 |-- votes_up: long (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- written_during_early_access: boolean (nullable = true)
 |-- gameid: string (nullable = true)
 |-- author_last_played: long (nullable = true)
 |-- author_num_games_owned: long (nullable = true)
 |-- author_num_reviews: long (nullable = true)
 |-- author_playtime_at_review: long (nullable = true)
 |-- author_playtime_forever: long (nullable = true)
 |-- author_playtime_last_two_weeks: long (nullable = true)
 |-- autho

### Write into BigQuery

- https://github.com/GoogleCloudDataproc/spark-bigquery-connector

In [25]:
# Persist the flattened reviews to BigQuery through the spark-bigquery
# connector. Rows are appended to the target table, and TEMP_BUCKET is passed
# as the connector's temporary GCS bucket.
(
    df_reviews_flat.write
    .format("bigquery")
    .option("table", f"{BQ_PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}")
    .option("temporaryGcsBucket", TEMP_BUCKET)
    .mode("append")
    .save()
)

22/09/26 17:16:07 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/09/26 17:16:07 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/09/26 17:16:08 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/09/26 17:16:08 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/09/26 17:16:08 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc