In [0]:
%fs ls abfss://datalake@datalakegen2storagelele.dfs.core.windows.net

path,name,size,modificationTime
abfss://datalake@datalakegen2storagelele.dfs.core.windows.net/delta/,delta/,0,1697997388000
abfss://datalake@datalakegen2storagelele.dfs.core.windows.net/example/,example/,0,1697901489000


In [0]:
%fs ls /mnt/datalake/example/games

path,name,size,modificationTime
dbfs:/mnt/datalake/example/games/games.csv,games.csv,4835545,1698006177000
dbfs:/mnt/datalake/example/games/games_metadata.json,games_metadata.json,17836350,1698006188000
dbfs:/mnt/datalake/example/games/recommendations.csv,recommendations.csv,1858800401,1698006438000
dbfs:/mnt/datalake/example/games/users.csv,users.csv,183760643,1698006222000


## Annual Game Release Count


In [0]:
%scala
// Load the games catalogue from the mounted data lake. The first CSV row supplies
// the column names; schema inference is not enabled, so columns are read as strings.
val df = spark.read
  .option("header", "true")
  .option("delimiter", ",")
  .csv("dbfs:/mnt/datalake/example/games/games.csv")

// Preview the loaded rows (show() prints at most 20 rows by default).
df.show()

In [0]:
%scala
// Print the column names and types of the games DataFrame.
df.printSchema()

In [0]:
%scala
import org.apache.spark.sql.functions._

In [0]:
%scala
// Number of games released per calendar year, largest count first.
// Rows whose app_id is null or empty are dropped before counting.
// NOTE(review): a date_release value that does not parse as yyyy-MM-dd would end up
// in a null "year" group — confirm whether that bucket is wanted in the output.
val annual_release_count = {
  val hasAppId = col("app_id").isNotNull && (col("app_id") =!= "")
  val releaseYear = year(to_date(col("date_release"), "yyyy-MM-dd")).alias("year")

  df.where(hasAppId)
    .groupBy(releaseYear)
    .agg(count("*").as("count"))
    .orderBy(desc("count"))
}

annual_release_count.show()

In [0]:
%scala
// Persist the yearly counts in Delta format, replacing whatever is already at the path.
annual_release_count.write
  .format("delta")
  .mode("overwrite")
  .save("/mnt/datalake/delta/games/annual_release_count")

## Operating System Breakdown for Game Distribution

In [0]:
%scala
// Re-check the games schema to locate the per-platform columns used in the next cell.
df.printSchema()

In [0]:
%scala
// Single-row summary of how many games are flagged for each operating system.
// The flags are compared against the string "true".
val operating_distribution_count = {
  // Sum of 1 for every row whose platform flag equals "true" and 0 for every other row.
  def supportedOn(platform: String) =
    sum(when(col(platform) === "true", 1).otherwise(0)).as(s"${platform}_count")

  df.agg(supportedOn("win"), supportedOn("mac"), supportedOn("linux"))
}
operating_distribution_count.show()

In [0]:
%scala
// Persist the platform summary in Delta format, replacing whatever is already at the path.
operating_distribution_count.write
  .format("delta")
  .mode("overwrite")
  .save("/mnt/datalake/delta/games/operating_distribution_count")

## Correlation Between Game Playtime and User Recommendations

In [0]:
%scala
// Load the recommendations CSV from the mounted data lake; the header row supplies
// the column names and schema inference is not enabled.
val df_recommendation = spark.read
  .option("header", "true")
  .option("delimiter", ",")
  .csv("dbfs:/mnt/datalake/example/games/recommendations.csv")

In [0]:
%scala
// Preview the recommendations data (show() prints at most 20 rows by default).
df_recommendation.show()

In [0]:
%scala
// Average hours played, rounded to two decimals, for each is_recommended value.
// Rows with a null or empty is_recommended are ignored.
val correlation_recommendation_time = {
  val hasVerdict = col("is_recommended").isNotNull && (col("is_recommended") =!= "")
  val meanHours = round(avg(col("hours")), 2).as("average_hours_played")

  df_recommendation
    .where(hasVerdict)
    .groupBy(col("is_recommended"))
    .agg(meanHours)
}

correlation_recommendation_time.show()

In [0]:
%scala
// Inspect the column types of the aggregated result before it is written out.
correlation_recommendation_time.printSchema()

In [0]:
%scala
// Persist the per-recommendation averages in Delta format, replacing whatever is
// already at the path.
correlation_recommendation_time.write
  .format("delta")
  .mode("overwrite")
  .save("/mnt/datalake/delta/games/correlation_recommendation_time")

## User Feedback On Steam Games

In [0]:
%scala
// Preview the games data again to look at the rating column used in the next cell.
df.show()

In [0]:
%scala
// Number of games per rating value, most common rating first.
// Rows whose app_id is null or empty are excluded.
val rating_count = df
  .where(col("app_id").isNotNull && (col("app_id") =!= ""))
  .groupBy(col("rating"))
  .count()
  .orderBy(desc("count"))

In [0]:
%scala
// Display the per-rating counts (show() prints at most 20 rows by default).
rating_count.show()

In [0]:
%scala
// Persist the rating counts in Delta format, replacing whatever is already at the path.
rating_count.write
  .format("delta")
  .mode("overwrite")
  .save("/mnt/datalake/delta/games/rating_count")

## Steam Game Tag Distribution

In [0]:
%scala
// Load the games metadata file. DataFrameReader.json already selects the JSON
// source, so no separate format(...) call is needed.
val df_game_tag = spark.read.json("dbfs:/mnt/datalake/example/games/games_metadata.json")

df_game_tag.show()

In [0]:
%scala
// Flatten the tags collection: one output row per (app_id, tag) pair.
val explod_df = df_game_tag.select(
  col("app_id"),
  explode(col("tags")).as("tag")
)

explod_df.show()

In [0]:
%scala
// Number of (app_id, tag) rows per tag, most frequent tag first.
val tag_count = explod_df
  .groupBy(col("tag"))
  .count()
  .sort(col("count").desc)

tag_count.show()

In [0]:
%scala
// Persist the tag counts in Delta format, replacing whatever is already at the path.
tag_count.write
  .format("delta")
  .mode("overwrite")
  .save("/mnt/datalake/delta/games/tag_count")

## Top 10 Days with the Most Game Reviews on Steam

In [0]:
%scala
// Preview at most ten rows of the recommendations data.
df_recommendation.limit(10).show()

In [0]:
%scala
// The ten dates with the most rows in the recommendations data.
// Rows with a null or empty app_id or date are ignored.
// NOTE(review): the sort key is the count alone, so if several dates tie at the
// tenth position the ones kept are not pinned down — confirm this is acceptable.
val top_10_day_reviews = {
  val hasAppId = col("app_id").isNotNull && (col("app_id") =!= "")
  val hasDate = col("date").isNotNull && (col("date") =!= "")

  df_recommendation
    .where(hasAppId && hasDate)
    .select(col("app_id"), col("date"))
    .groupBy(col("date"))
    .count()
    .orderBy(col("count").desc)
    .limit(10)
}
top_10_day_reviews.show()

In [0]:
%scala
// Persist the top-ten dates in Delta format, replacing whatever is already at the path.
top_10_day_reviews.write
  .format("delta")
  .mode("overwrite")
  .save("/mnt/datalake/delta/games/top_10_day_reviews")

## Insights into User Responses to Steam Recommendation Reviews

In [0]:
%scala
// Preview at most ten rows of the recommendations data before aggregating the
// helpful/funny columns.
df_recommendation.limit(10).show()

In [0]:
%scala
// Print the column names and types of the recommendations DataFrame.
df_recommendation.printSchema()

In [0]:
%scala
// Recommendations with a non-empty app_id, with the helpful and funny columns
// cast to int so they can be summed numerically.
val convert_df = {
  val hasAppId = col("app_id").isNotNull && (col("app_id") =!= "")
  val voteColumns = Seq("helpful", "funny")

  // Replace each vote column in turn with its int-cast version.
  voteColumns.foldLeft(df_recommendation.where(hasAppId)) { (acc, name) =>
    acc.withColumn(name, col(name).cast("int"))
  }
}


In [0]:
%scala
// Single-row totals of the helpful and funny columns across all retained rows.
val feedback_df = convert_df.agg(
  sum(col("helpful")).as("helpful_count"),
  sum(col("funny")).as("funny_count")
)

In [0]:
%scala
// Display the helpful/funny totals.
feedback_df.show()

In [0]:
%scala
// Persist the feedback totals in Delta format, replacing whatever is already at the path.
feedback_df.write
  .format("delta")
  .mode("overwrite")
  .save("/mnt/datalake/delta/games/feedback_df")