In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window

In [0]:
source = spark.sparkContext
path = "s3://full-stack-bigdata-datasets/Big_Data/Project_Steam/steam_game_output.json"
df = spark.read.json(path)

In [0]:
num_rows = df.count()
print(f"The DataFrame contains {num_rows} rows.")

The DataFrame contains 55691 rows.


In [0]:
df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- appid: long (nullable = true)
 |    |-- categories: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- ccu: long (nullable = true)
 |    |-- developer: string (nullable = true)
 |    |-- discount: string (nullable = true)
 |    |-- genre: string (nullable = true)
 |    |-- header_image: string (nullable = true)
 |    |-- initialprice: string (nullable = true)
 |    |-- languages: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- negative: long (nullable = true)
 |    |-- owners: string (nullable = true)
 |    |-- platforms: struct (nullable = true)
 |    |    |-- linux: boolean (nullable = true)
 |    |    |-- mac: boolean (nullable = true)
 |    |    |-- windows: boolean (nullable = true)
 |    |-- positive: long (nullable = true)
 |    |-- price: string (nullable = true)
 |    |-- publisher: string (nullable = true)
 |    |-- release_date: string (nullable = true)
 |    |-

In [0]:
df.show()

+--------------------+-------+
|                data|     id|
+--------------------+-------+
|{10, [Multi-playe...|     10|
|{1000000, [Single...|1000000|
|{1000010, [Single...|1000010|
|{1000030, [Multi-...|1000030|
|{1000040, [Single...|1000040|
|{1000080, [Multi-...|1000080|
|{1000100, [Single...|1000100|
|{1000110, [Multi-...|1000110|
|{1000130, [Single...|1000130|
|{1000280, [Single...|1000280|
|{1000310, [Multi-...|1000310|
|{1000360, [Multi-...|1000360|
|{1000370, [Single...|1000370|
|{1000380, [Single...|1000380|
|{1000410, [Single...|1000410|
|{1000470, [Single...|1000470|
|{1000480, [Single...|1000480|
|{1000500, [Multi-...|1000500|
|{1000510, [], 0, ...|1000510|
|{1000540, [Multi-...|1000540|
+--------------------+-------+
only showing top 20 rows



In [0]:
df = df.select(df.data)

In [0]:
df = df.select(df.data.dropFields('tags').alias('data'))

In [0]:
df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- appid: long (nullable = true)
 |    |-- categories: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- ccu: long (nullable = true)
 |    |-- developer: string (nullable = true)
 |    |-- discount: string (nullable = true)
 |    |-- genre: string (nullable = true)
 |    |-- header_image: string (nullable = true)
 |    |-- initialprice: string (nullable = true)
 |    |-- languages: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- negative: long (nullable = true)
 |    |-- owners: string (nullable = true)
 |    |-- platforms: struct (nullable = true)
 |    |    |-- linux: boolean (nullable = true)
 |    |    |-- mac: boolean (nullable = true)
 |    |    |-- windows: boolean (nullable = true)
 |    |-- positive: long (nullable = true)
 |    |-- price: string (nullable = true)
 |    |-- publisher: string (nullable = true)
 |    |-- release_date: string (nullable = true)
 |    |-

In [0]:
df_publishe = df.select(df.data.getField("publisher").alias("publisher"))
publisher_counts = df_publishe.groupBy("publisher").count()
publisher_with_most_games = publisher_counts.sort(F.desc("count")).first()
 
print(f"The publisher with the most games is: {publisher_with_most_games['publisher']}")
 
top_15_publishers = publisher_counts.sort(F.desc("count")).limit(15)
print("Top 15 publishers on steam are:")
top_15_publishers = top_15_publishers.withColumn('publisher', 
                   F.when(F.length(F.trim(F.col('publisher'))) == 0, 'no publisher')
                    .otherwise(F.col('publisher')))
display(top_15_publishers)

The publisher with the most games is: Big Fish Games
Top 15 publishers on steam are:


publisher,count
Big Fish Games,422
8floor,202
SEGA,165
Strategy First,151
Square Enix,141
Choice of Games,140
Sekai Project,132
HH-Games,132
no publisher,132
Ubisoft,127


Databricks visualization. Run in Databricks to view.

In [0]:
release_year = df.select(F.year(F.to_date(df["data.release_date"], format="yyyy/MM/d")).alias("release_year"))
games_per_year = release_year.groupBy("release_year").count()
games_per_year_sorted = games_per_year.sort(F.desc("release_year")).limit(10)
display(games_per_year)

release_year,count
2003.0,3
2007.0,98
2018.0,7663
2015.0,2566
2006.0,61
2022.0,7451
2013.0,469
,222
1997.0,2
2014.0,1550


Databricks visualization. Run in Databricks to view.

In [0]:
best_games = df.select(df["data.name"].alias("video_game"),df["data.positive"].alias("positive_ratings"))
top_rated_games = best_games.sort(F.desc("positive_ratings")).limit(15)
display(top_rated_games)

video_game,positive_ratings
Counter-Strike: Global Offensive,5943345
Dota 2,1534895
Grand Theft Auto V,1229265
PUBG: BATTLEGROUNDS,1185361
Terraria,1014711
Tom Clancy's Rainbow Six Siege,942910
Garry's Mod,861240
Team Fortress 2,846407
Rust,732513
Left 4 Dead 2,643836


Databricks visualization. Run in Databricks to view.

In [0]:
release_year = df.select(F.year(F.to_date(df["data.release_date"], format="yyyy/MM/d")).alias("release_year"))
games_per_year = release_year.groupBy("release_year").count()
games_per_year_sorted = games_per_year.sort(F.desc("release_year")).limit(15)
display(games_per_year_sorted)

release_year,count
2022,7451
2021,8805
2020,8287
2019,6949
2018,7663
2017,6006
2016,4176
2015,2566
2014,1550
2013,469


Databricks visualization. Run in Databricks to view.

In [0]:
languages = df.select(F.explode(F.split(df['data.languages'], ', ')).alias('language'))
 
# Count the number of games per language
language_counts = languages.groupBy('language').count()
 
# Show the top 10 most represented languages
display(language_counts.sort('count', ascending=False).limit(10))

language,count
English,55116
German,14019
French,13426
Russian,12922
Simplified Chinese,12782
Spanish - Spain,12233
Japanese,10368
Italian,9304
Portuguese - Brazil,6750
Korean,6599


Databricks visualization. Run in Databricks to view.

In [0]:
genres = df.select(F.explode(F.split(df['data.genre'], ', ')).alias('genre'))
 
# Count the number of games per language
counts = genres.groupBy('genre').count()
 
# Show the top 10 most represented languages
display(counts.sort('count', ascending=False).limit(10))

genre,count
Indie,39681
Action,23759
Casual,22086
Adventure,21431
Strategy,10895
Simulation,10836
RPG,9534
Early Access,6145
Free to Play,3393
Sports,2666


Databricks visualization. Run in Databricks to view.

In [0]:
genres = df.select(df['data.genre'].alias('genre'), df['data.owners'].alias('owners'), df['data.price'].alias('price'))
 
# Split the owners field into two columns
genres = genres.withColumn("owners_start", F.expr("float(regexp_replace(split(owners, ' .. ')[0], ',', ''))"))
genres = genres.withColumn("owners_end", F.expr("float(regexp_replace(split(owners, ' .. ')[1], ',', ''))"))
genres = genres.withColumn("price", genres["price"].cast(T.FloatType())/100.0)
# Compute the average of the owners_start and owners_end, and multiply by the price
genres = genres.withColumn("estimated_gross", (genres["owners_start"] + genres["owners_end"]) / 2 * genres["price"])
genres = genres.withColumn('genre', F.explode(F.split(F.col('genre'), ', ')))
# Calculate the total estimated gross for each genre
genre_gross = genres.groupBy('genre').sum('estimated_gross')
 
# Show the genres with the highest total estimated gross
display(genre_gross.sort('sum(estimated_gross)', ascending=False).limit(10))

genre,sum(estimated_gross)
Action,58756454100.0
Adventure,37245738450.0
Indie,32346577100.0
RPG,27173143100.0
Strategy,20150041050.0
Simulation,18769749550.0
Casual,8080956650.0
Massively Multiplayer,5930157750.0
Early Access,5458661450.0
Sports,3149897350.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Count the number of games available on each platform
platform_counts = df.agg(F.sum(df['data.platforms.windows'].cast("int")).alias('windows_count'),
                         F.sum(df['data.platforms.mac'].cast("int")).alias('mac_count'),
                         F.sum(df['data.platforms.linux'].cast("int")).alias('linux_count'))
 
display(platform_counts)

windows_count,mac_count,linux_count
55676,12770,8458
