In [None]:
import pandas as pd
from pyspark.sql import functions as F

# Raw Steam dataset; later cells rely on a top-level "id" column and a nested "data" struct.
STEAM_DATA_PATH = 's3://full-stack-bigdata-datasets/Big_Data/Project_Steam/steam_game_output.json'
steam = spark.read.json(STEAM_DATA_PATH)

# First look at the raw data

In [None]:
# Preview the first rows of the raw dataset (PySpark defaults spelled out explicitly).
steam.show(n=20, truncate=True)

In [None]:
# Fetch a single nested "data" record to inspect its full structure.
steam.select('data').head(1)

In [None]:
# Compare the number of rows with the number of distinct ids (top-level and nested),
# then count the rows where the two ids disagree.
n_rows = steam.count()
n_distinct_ids = steam.select('id').distinct().count()
n_distinct_appids = steam.select('data.appid').distinct().count()
n_mismatched_ids = steam.where(F.col('data.appid') != F.col('id')).count()

for value in (n_rows, n_distinct_ids, n_distinct_appids, n_mismatched_ids):
    print(value)

##### The top-level "id" column holds the same information as data["appid"] (the last count above checks for mismatches), so we can go one level down into the nested "data" struct and run the analysis on it.

In [None]:
# Keep only the nested "data" struct for the rest of the analysis, and inspect its schema.
steam_2 = steam.select(F.col('data'))
steam_2.printSchema()

In [None]:
# Preview the nested records (20 rows, the PySpark default).
steam_2.show(n=20)

## Macro level analysis

In [None]:
#1 Which publisher has released the most games on Steam?
#  Group by publisher and count the games of each one (top 10, most prolific first).

publisher_counts = steam_2.groupBy('data.publisher').count()
publisher_counts.orderBy(F.desc('count')).show(10)

In [None]:
#2 What are the best rated games?
#  Absolute count of positive ratings per game.
#  The next cell has to cast `positive` to int, which suggests it is stored as a
#  string; ordering the raw column would then be lexicographic ("9999" > "100000").
#  Sorting on the int cast makes the ordering numeric either way.

steam_2 \
    .orderBy(F.col("data.positive").cast("int"), ascending=False) \
    .select("data.name", "data.positive") \
    .show(10)

In [None]:
#  Proportionally most liked games (among games rated more than 100,000 times)
#  prop_rating = positive / (positive + negative)

total_votes = F.col("int_positive") + F.col("int_negative")

preprocess_rating = (
    steam_2
    .withColumn("int_positive", F.col("data.positive").cast("int"))
    .withColumn("int_negative", F.col("data.negative").cast("int"))
    .withColumn("prop_rating", F.col("int_positive") / total_votes)
)
preprocess_rating.show(5)

MIN_VOTES = 100_000
prop_rating = (
    preprocess_rating
    .where(total_votes > MIN_VOTES)
    .orderBy(F.desc("prop_rating"))
    .select("data.name", "prop_rating")
)
prop_rating.show(10)

In [None]:
#3 Which years have the most releases? How many games were released during COVID (2020)?
#  Extract the year from the release date and count occurrences.
#  NOTE(review): substring(1, 4) assumes release_date starts with the year
#  (e.g. "2020/..."); confirm against the raw data.

release_year = steam_2 \
    .withColumn("release_year", F.substring(F.col("data.release_date"), 1, 4)) \
    .select("release_year")
# show() prints the rows itself and returns None, so it is not wrapped in print().
release_year.show(5)

releases_per_year = release_year.groupBy('release_year').count()
releases_per_year.orderBy('count', ascending=False).show(20)

# Direct answer for the COVID year.
releases_per_year.filter(F.col("release_year") == "2020").show()


In [None]:
#4 How are prices distributed? Are there many games with a discount?
#  Group by price and count occurrences; count games with a non-zero discount.
#  NOTE(review): the unit of `price` (e.g. cents vs. currency units) is not visible here — confirm.

# The columns are named *_float, so cast to float: an int cast would truncate any
# fractional value and could merge distinct prices into one group.
preprocess_price = steam_2 \
    .withColumn("price_float", F.col("data.price").cast("float")) \
    .withColumn("discount_float", F.col("data.discount").cast("float")) \
    .select("data.name", "price_float", "discount_float")
preprocess_price.show(3)

# show() prints its own table and returns None; wrapping it in print() only added a stray "None".
preprocess_price.groupBy('price_float').count().orderBy('count', ascending=False).show(20)

# Number of games currently discounted.
print(preprocess_price.filter(F.col("discount_float") > 0).count())


In [None]:
#5 What are the most represented languages?
#  Split each game's language list, explode it to one row per language, then count.

language_count = steam_2.select(
    "data.name",
    F.split("data.languages", ", ").alias("languages_array"),
)
language_count.show(5)

exploded_languages = language_count.withColumn("language", F.explode("languages_array"))
(exploded_languages
    .groupBy("language")
    .count()
    .orderBy(F.desc("count"))
    .show(20))

In [None]:
#6 How many games are prohibited for children under 16/18?
#  Extract the numeric part of required_age, cast it to int and keep ages >= 16.

# Raw distribution, including non-numeric labels such as "21+", "7+" or "MA 15+".
steam_2.groupBy('data.required_age').count().orderBy('count', ascending=False).show(25)

# Pull the digits out of the label instead of dropping non-numeric values:
# "21+" -> 21, "MA 15+" -> 15, "7+" -> 7. Dropping "21+" before casting excluded
# games that require age 21, which are prohibited for under-16/18 players as well.
# Labels without any digit give "" and are mapped to null instead of being cast.
age_digits = F.regexp_extract(F.col("data.required_age").cast("string"), r"(\d+)", 1)

prep_prohibited_games = steam_2 \
    .withColumn("int_required_age", F.when(age_digits != "", age_digits.cast("int"))) \
    .filter(F.col("int_required_age") >= 16) \
    .select("int_required_age")

# Direct answer: number of games requiring an age of at least 16.
print(prep_prohibited_games.count())

# Breakdown by required age. Keep the DataFrame; show() itself returns None.
prohibited_games = prep_prohibited_games \
    .groupBy('int_required_age') \
    .count() \
    .orderBy('count', ascending=False)
prohibited_games.show(10)

## Genres analysis

In [None]:
# What are the most represented genres?

In [17]:
from typing import TYPE_CHECKING, Any, Dict, Generator, Union

if TYPE_CHECKING:  # only needed for the annotations; avoids a hard pyspark import at runtime
    from pyspark.sql.types import StructField, StructType


def walkSchema(schema: "Union[StructType, StructField]") -> Generator[str, None, None]:
    """Yield the dotted path of every leaf column in a PySpark schema.

    Nested structs are flattened into ``parent.child`` names (e.g. ``"data.appid"``),
    the same form this notebook passes to ``select``. Array and map fields are
    yielded as leaves under their own name; their element types are not expanded.

    Parameters
    ----------
    schema : StructType | StructField
        Any object exposing ``jsonValue()``, such as ``steam.schema``.

    Yields
    ------
    str
        The full dotted name of each leaf field, in schema order.
    """

    def _walk(node: Dict[str, Any], prefix: str = "") -> Generator[str, None, None]:
        # A struct type: recurse into each of its fields, keeping the current prefix.
        if node["type"] == "struct":
            for field in node["fields"]:
                yield from _walk(field, prefix)
            return

        # Otherwise `node` describes a single field: {"name": ..., "type": ...}.
        name = node.get("name", "")
        full_name = f"{prefix}.{name}" if prefix else name
        field_type = node["type"]

        if isinstance(field_type, dict) and field_type.get("type") == "struct":
            # Nested struct: descend one level, extending the dotted prefix.
            yield from _walk(field_type, prefix=full_name)
        elif name:
            # Primitive, array or map field. Recursing into an array/map type dict
            # (which has no "name") would silently drop the column, so yield it here.
            yield full_name

    yield from _walk(schema.jsonValue())

In [18]:
# col_names = walkSchema(steam.schema)

# for col_name in walkSchema(steam.schema):
#   print(col_name)