In [None]:
from games_analytics.twitch_api import TwitchDataSource
from games_analytics.twitch_tracker_api import TwitchTrackerDataSource
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

# Obtain the Spark session for this job (getOrCreate reuses an active one).
spark = (
    SparkSession.builder
    .appName("TwitchDataSourceApp")
    .getOrCreate()
)

# Twitch credentials are read from the "twitch" secret scope.
# NOTE(review): `dbutils` is not imported in this file — presumably it is
# injected by the Databricks notebook runtime; confirm before running elsewhere.
client_id = dbutils.secrets.get("twitch", "client_id")
client_secret = dbutils.secrets.get("twitch", "client_secret")

# Register both custom data sources with the session so they can be used
# through spark.read.format(...) below.
for _source_cls in (TwitchDataSource, TwitchTrackerDataSource):
    spark.dataSource.register(_source_cls)

# Read from the "twitch_stream" source, handing it the API credentials
# as reader options.
twitch_df = (
    spark.read
    .format("twitch_stream")
    .options(client_id=client_id, client_secret=client_secret)
    .load()
)

# Pull the distinct game ids onto the driver so they can be passed to the
# tracker source as a single comma-separated option value.
# Null ids are skipped and each id is coerced to str when joining, because
# str.join raises TypeError on any element that is not a string.
game_ids = [
    row["id"]
    for row in twitch_df.select("id").distinct().collect()
    if row["id"] is not None
]
game_ids_str = ",".join(str(game_id) for game_id in game_ids)

# Read from the "twitch_tracker" source, restricted to the game ids
# collected above (passed as one comma-separated string option).
twitch_tracker_df = (
    spark.read
    .format("twitch_tracker")
    .option("game_ids", game_ids_str)
    .load()
)

# Left-join the tracker data onto the Twitch rows by game id, so every
# Twitch row is kept even when the tracker source has no match for it.
# The projection stamps each row with the ingestion time, then takes the
# descriptive columns from the Twitch side and the metric columns from the
# tracker side.
games_data_df = twitch_df.join(
    twitch_tracker_df,
    on=twitch_df["id"] == twitch_tracker_df["id"],
    how="left",
).select(
    current_timestamp().alias("ingestion_timestamp"),
    *[
        twitch_df[column_name]
        for column_name in ("id", "name", "box_art_url", "igdb_id")
    ],
    *[
        twitch_tracker_df[column_name]
        for column_name in ("avg_viewers", "avg_channels", "rank", "hours_watched")
    ],
)

# Persist the joined result as a Delta table; "overwrite" mode replaces
# whatever the table held before this run.
(
    games_data_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("games_analytics.dev.twitch_games_data")
)