In [0]:
/Volumes/workspace/default/rawdata/steam/application_categories.csv

In [0]:
app_devs_df = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/application_developers.csv")
)

developers_df = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/developers.csv")
)


In [0]:
display(app_devs_df.printSchema())
app_devs_df.head(5)


root
 |-- appid: string (nullable = true)
 |-- developer_id: string (nullable = true)



[Row(appid='10', developer_id='59202'),
 Row(appid='20', developer_id='59202'),
 Row(appid='30', developer_id='59202'),
 Row(appid='40', developer_id='59202'),
 Row(appid='50', developer_id='21806')]

In [0]:
display(developers_df.printSchema())
developers_df.head(5)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



[Row(id='1', name='Skala Entertainment'),
 Row(id='2', name='Flicksync'),
 Row(id='3', name='SN Mobile Technology'),
 Row(id='4', name='Chestnut Team'),
 Row(id='5', name='SQYD.studio')]

In [0]:
from pyspark.sql.functions import collect_set, concat_ws, col

app_developers_agg_df = (
    app_devs_df
    .join(
        developers_df,
        app_devs_df.developer_id == developers_df.id,
        how="left"
    )
    .groupBy(app_devs_df.appid)
    .agg(
        concat_ws(", ", collect_set(col("name"))).alias("developers")
    )
)


In [0]:
display(app_developers_agg_df.limit(10))


appid,developers
1000010,NEXT Studios
1000080,IndieLeague Studio
1000160,SinVR
1000210,SinVR
1000280,Villain Role
1000400,Midgar Studio
1000420,MyACG Studio
1000640,Team Clam
1000712,Fishing Planet LLC
1000714,Fishing Planet LLC


In [0]:
app_publishers_df = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/application_publishers.csv")
)

publishers_df = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/publishers.csv")
)


In [0]:
# Schema check
app_publishers_df.printSchema()
publishers_df.printSchema()

# Sample rows
display(app_publishers_df.limit(5))
display(publishers_df.limit(5))

# Row counts (sanity)
print("application_publishers rows:", app_publishers_df.count())
print("publishers rows:", publishers_df.count())


root
 |-- appid: string (nullable = true)
 |-- publisher_id: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



appid,publisher_id
10,50182
20,50182
30,50182
40,50182
50,50182


id,name
1,Skala Entertainment
2,Playdigious Originals
3,SN Mobile Technology
4,SQYD.studio
5,Susewind Games


application_publishers rows: 223048
publishers rows: 85699


In [0]:
from pyspark.sql.functions import collect_set, concat_ws, col

app_publishers_agg_df = (
    app_publishers_df
    .join(
        publishers_df,
        app_publishers_df.publisher_id == publishers_df.id,
        how="left"
    )
    .groupBy(app_publishers_df.appid)
    .agg(
        concat_ws(", ", collect_set(col("name"))).alias("publishers")
    )
)


In [0]:
# Schema check
app_publishers_agg_df.printSchema()

# Sample rows
display(app_publishers_agg_df.limit(10))

# Row count sanity
print("Aggregated publishers rows:", app_publishers_agg_df.count())

# Null check (should be low / zero)
from pyspark.sql.functions import count, when

app_publishers_agg_df.select(
    count(when(col("publishers").isNull(), 1)).alias("null_publishers")
).show()


root
 |-- appid: string (nullable = true)
 |-- publishers: string (nullable = false)



appid,publishers
1000010,"Team17, NEXT Studios"
1000080,2P Games
1000110,重庆环游者网络科技
1000160,SinVR
1000210,SinVR
1000280,Villain Role
1000400,Dear Villagers
1000640,Team Clam
1000712,Fishing Planet LLC
1000770,WASD Games


Aggregated publishers rows: 214372
+---------------+
|null_publishers|
+---------------+
|              0|
+---------------+



In [0]:
app_genres_df = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/application_genres.csv")
)

genres_df = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/genres.csv")
)


In [0]:
# Schema check
app_genres_df.printSchema()
genres_df.printSchema()

# Sample rows
display(app_genres_df.limit(5))
display(genres_df.limit(5))

# Row counts
print("application_genres rows:", app_genres_df.count())
print("genres rows:", genres_df.count())


root
 |-- appid: string (nullable = true)
 |-- genre_id: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



appid,genre_id
10,122
20,122
30,122
40,122
50,122


id,name
1,Aventură
2,Многопользовательские игры
3,Nezávislé
4,Strategie
5,Strategy


application_genres rows: 587515
genres rows: 154


In [0]:
from pyspark.sql.functions import collect_set, concat_ws, col

app_genres_agg_df = (
    app_genres_df
    .join(
        genres_df,
        app_genres_df.genre_id == genres_df.id,
        how="left"
    )
    .groupBy(app_genres_df.appid)
    .agg(
        concat_ws(", ", collect_set(col("name"))).alias("genres")
    )
)


In [0]:
# Schema check
app_genres_agg_df.printSchema()

# Sample rows
display(app_genres_agg_df.limit(10))

# Row count sanity
print("Aggregated genres rows:", app_genres_agg_df.count())

# Null check
from pyspark.sql.functions import count, when

app_genres_agg_df.select(
    count(when(col("genres").isNull(), 1)).alias("null_genres")
).show()


root
 |-- appid: string (nullable = true)
 |-- genres: string (nullable = false)



appid,genres
1000010,"Adventure, Strategy, Indie, RPG"
1000080,"Indie, Action, Adventure, RPG"
1000210,"Free To Play, Simulation"
1000280,"Indie, RPG"
1000400,"RPG, Indie, Action, Strategy, Adventure"
1000640,"Indie, Adventure"
1000712,"Massively Multiplayer, Sports, Simulation, Free To Play"
1000714,"Simulation, Sports, Massively Multiplayer, Free To Play"
1000715,"Sports, Massively Multiplayer, Free To Play, Simulation"
1000770,"Simulation, Casual, Indie, RPG, Adventure"


Aggregated genres rows: 207942
+-----------+
|null_genres|
+-----------+
|          0|
+-----------+



In [0]:
app_categories_df = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/application_categories.csv")
)

categories_df = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/categories.csv")
)


In [0]:
# Schema check
app_categories_df.printSchema()
categories_df.printSchema()

# Sample rows
display(app_categories_df.limit(5))
display(categories_df.limit(5))

# Row counts
print("application_categories rows:", app_categories_df.count())
print("categories rows:", categories_df.count())


root
 |-- appid: string (nullable = true)
 |-- category_id: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



appid,category_id
10,72
10,90
10,130
10,133
10,193


id,name
1,Remote Play en móvil
2,Flera spelare
3,Tablette Remote Play
4,Multiplayer
5,Таблицы лидеров Steam


application_categories rows: 1077156
categories rows: 462


In [0]:
from pyspark.sql.functions import collect_set, concat_ws, col

app_categories_agg_df = (
    app_categories_df
    .join(
        categories_df,
        app_categories_df.category_id == categories_df.id,
        how="left"
    )
    .groupBy(app_categories_df.appid)
    .agg(
        concat_ws(", ", collect_set(col("name"))).alias("categories")
    )
)


In [0]:
# Schema check
app_categories_agg_df.printSchema()

# Sample rows
display(app_categories_agg_df.limit(10))

# Row count sanity
print("Aggregated categories rows:", app_categories_agg_df.count())

# Null check
from pyspark.sql.functions import count, when

app_categories_agg_df.select(
    count(when(col("categories").isNull(), 1)).alias("null_categories")
).show()


root
 |-- appid: string (nullable = true)
 |-- categories: string (nullable = false)



appid,categories
1000010,"Steam Trading Cards, Steam Achievements, Family Sharing, Partial Controller Support, Single-player, Steam Cloud"
1000080,"Family Sharing, Steam Achievements, Full controller support, Steam Trading Cards, Co-op, Single-player, Multi-player, Online Co-op"
1000210,"Single-player, Downloadable Content, In-App Purchases"
1000280,"Family Sharing, Single-player"
1000400,"Steam Cloud, Family Sharing, Downloadable Content, Stats, Full controller support, Steam Trading Cards, Single-player, Steam Workshop, Steam Achievements"
1000640,"Family Sharing, Steam Achievements, Single-player"
1000712,"Online Co-op, Downloadable Content, Multi-player, MMO, Full controller support, PvP, Online PvP, Single-player, Co-op"
1000714,"PvP, Online PvP, Downloadable Content, MMO, Full controller support, Online Co-op, Co-op, Single-player, Multi-player"
1000715,"Single-player, Downloadable Content, Multi-player, Online PvP, Online Co-op, Full controller support, MMO, Co-op, PvP"
1000770,"Partial Controller Support, Single-player, Family Sharing, Steam Achievements, Steam Trading Cards, Stats"


Aggregated categories rows: 225663
+---------------+
|null_categories|
+---------------+
|              0|
+---------------+



In [0]:
app_platforms_df = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/application_platforms.csv")
)

platforms_df = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/rawdata/steam/platforms.csv")
)


In [0]:
# Schema check
app_platforms_df.printSchema()
platforms_df.printSchema()

# Sample rows
display(app_platforms_df.limit(5))
display(platforms_df.limit(5))

# Row counts
print("application_platforms rows:", app_platforms_df.count())
print("platforms rows:", platforms_df.count())


root
 |-- appid: string (nullable = true)
 |-- platform_id: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



appid,platform_id
10,1
10,2
10,3
20,1
20,2


id,name
1,windows
2,mac
3,linux


application_platforms rows: 334671
platforms rows: 3


In [0]:
from pyspark.sql.functions import collect_set, concat_ws, col

app_platforms_agg_df = (
    app_platforms_df
    .join(
        platforms_df,
        app_platforms_df.platform_id == platforms_df.id,
        how="left"
    )
    .groupBy(app_platforms_df.appid)
    .agg(
        concat_ws(", ", collect_set(col("name"))).alias("platforms")
    )
)


In [0]:
# Schema check
app_platforms_agg_df.printSchema()

# Sample rows
display(app_platforms_agg_df.limit(10))

# Row count sanity
print("Aggregated platforms rows:", app_platforms_agg_df.count())

# Null check
from pyspark.sql.functions import count, when

app_platforms_agg_df.select(
    count(when(col("platforms").isNull(), 1)).alias("null_platforms")
).show()


root
 |-- appid: string (nullable = true)
 |-- platforms: string (nullable = false)



appid,platforms
1000010,windows
1000030,"windows, mac"
1000050,windows
1000060,"mac, linux, windows"
1000080,"windows, mac"
1000110,windows
1000160,windows
1000210,windows
1000280,windows
1000400,windows


Aggregated platforms rows: 239664
+--------------+
|null_platforms|
+--------------+
|             0|
+--------------+



In [0]:
app_developers_agg_df.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", "true") \
    .csv("/Volumes/workspace/default/rawdata/steam/processed/aggregates/app_developers/")


In [0]:
app_publishers_agg_df.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", "true") \
    .csv("/Volumes/workspace/default/rawdata/steam/processed/aggregates/app_publishers/")


In [0]:
app_genres_agg_df.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", "true") \
    .csv("/Volumes/workspace/default/rawdata/steam/processed/aggregates/app_genres/")


In [0]:
app_categories_agg_df.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", "true") \
    .csv("/Volumes/workspace/default/rawdata/steam/processed/aggregates/app_categories/")


In [0]:
app_platforms_agg_df.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", "true") \
    .csv("/Volumes/workspace/default/rawdata/steam/processed/aggregates/app_platforms/")


In [0]:
dbutils.fs.ls("/Volumes/workspace/default/rawdata/steam/processed/aggregates/")


[FileInfo(path='dbfs:/Volumes/workspace/default/rawdata/steam/processed/aggregates/app_categories/', name='app_categories/', size=0, modificationTime=1769069766175),
 FileInfo(path='dbfs:/Volumes/workspace/default/rawdata/steam/processed/aggregates/app_developers/', name='app_developers/', size=0, modificationTime=1769069766175),
 FileInfo(path='dbfs:/Volumes/workspace/default/rawdata/steam/processed/aggregates/app_genres/', name='app_genres/', size=0, modificationTime=1769069766175),
 FileInfo(path='dbfs:/Volumes/workspace/default/rawdata/steam/processed/aggregates/app_platforms/', name='app_platforms/', size=0, modificationTime=1769069766175),
 FileInfo(path='dbfs:/Volumes/workspace/default/rawdata/steam/processed/aggregates/app_publishers/', name='app_publishers/', size=0, modificationTime=1769069766175)]