## Ingest gamelist.json file

In [0]:
%run "../2.utils/0.configuration"

#### 1 - Read the json file using the spark dataframe reader

###### 1.1 import functions

In [0]:
from pyspark.sql.functions import current_timestamp, lit, to_date, regexp_replace, to_timestamp, current_date
from delta.tables import DeltaTable

###### 1.2 read gamelist.json as multi-line JSON (schema is inferred by Spark)

In [0]:
# Load the raw game list. The multiLine option lets Spark parse JSON records
# that span several lines instead of expecting one record per line.
gamelist_path = f"{raw_path}/gamelist.json"
games_df = spark.read.option("multiLine", True).json(gamelist_path)

###### 1.3 check dataframe output

In [0]:
# Render the raw dataframe for a visual check of what was parsed from the JSON file.
display(games_df)

#### 2 - Select wanted columns / fix release date and add create/update timestamps

In [0]:
# Columns carried over from the raw game list into the processed table.
wanted_columns = [
    "appid",
    "name",
    "release_date",
    "price",
    "currency",
    "short_description",
    "header_image",
    "windows",
    "mac",
    "linux",
    "metacritic_score",
    "developers",
    "publishers",
    "categories",
    "genres",
]

selected_games_df = (
    games_df.select(*wanted_columns)
    # Remove commas first so the text can match the 'd MMM yyyy' pattern below.
    .withColumn("release_date", regexp_replace("release_date", ",", ""))
    # NOTE(review): values that do not match the pattern presumably become NULL
    # (non-ANSI mode) and are removed by the clean step — confirm.
    .withColumn("release_date", to_date("release_date", format="d MMM yyyy"))
    .withColumn("create_timestamp", current_timestamp())
    # Typed NULL rather than an empty string: the merge step assigns
    # current_timestamp() to this column, so it must be a timestamp column
    # like create_timestamp instead of a string column.
    .withColumn("update_timestamp", lit(None).cast("timestamp"))
)

###### 2.1 check dataframe output for selected columns

In [0]:
# Render the projected dataframe to check the parsed release_date and the added timestamp columns.
display(selected_games_df)

#### 3 - Clean Data

In [0]:
# Keep only rows that have a release_date; rows where it is NULL are discarded.
final_df = selected_games_df.dropna(subset=["release_date"])

###### 3.1 check final dataframe output

In [0]:
# Render the cleaned dataframe that will be written to the Delta table.
display(final_df)

#### 4 - Write dataframe to delta table.
* Check if table exists
* If the table exists, merge the new data into it, matching rows on appid
* Update the update timestamp if the appid already exists
* Insert the full row if the appid does not exist yet

In [0]:
# Upsert the cleaned games into the Delta table, creating the table on the first run.
target_table = "steam_processed.games"

if spark._jsparkSession.catalog().tableExists(target_table):
    # Resolve the table through the metastore, i.e. by the same name it is
    # created and checked under, instead of assuming that its files live at
    # f"{processed_path}/games".
    deltaTable = DeltaTable.forName(spark, target_table)
    (
        deltaTable.alias("tgt")
        .merge(final_df.alias("src"), "tgt.appid = src.appid")
        # appid already present: only refresh update_timestamp, keep all other columns.
        .whenMatchedUpdate(set={"update_timestamp": current_timestamp()})
        # appid not present yet: insert the complete source row.
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: the table does not exist yet, so create it from the dataframe.
    final_df.write.mode("overwrite").format("delta").saveAsTable(target_table)

###### 4.1 Test read of delta table

In [0]:
%sql
-- Spot check: games with a release date on or after 2022-01-01, newest first.
SELECT *
FROM steam_processed.games
WHERE release_date >= '2022-01-01'
ORDER BY release_date DESC;

In [0]:
# Exit the notebook with a status string as its exit value.
# NOTE(review): the verification cells below sit after this exit, so they
# presumably only run when executed manually — confirm that this is intended.
dbutils.notebook.exit("ended successfully")

###### 4.2 small data tests and data verification

In [0]:
%sql
-- Number of rows in the processed games table that have a non-NULL appid.
SELECT COUNT(appid)
FROM steam_processed.games;

In [0]:
%sql
-- NOTE(review): same count query as the preceding cell; consider removing one of them.
SELECT COUNT(appid)
FROM steam_processed.games;

In [0]:
%sql
-- Uniqueness check: list every appid that occurs more than once in the table.
-- An empty result means appid is unique.
SELECT
  appid,
  COUNT(1) AS counter
FROM steam_processed.games
GROUP BY appid
HAVING counter > 1;