In [1]:
from pyspark import SparkContext
from pyspark.sql import *
import os
import glob
from time import time
from delta import *

In [2]:
# Build a Delta-enabled Spark session: the two config entries register the
# Delta SQL extension and the Delta catalog implementation, and
# configure_spark_with_delta_pip() finalises the builder before the session
# is created (or an existing one is reused).
builder = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
# Wall-clock reference point (seconds since the epoch); the first checkpoint
# subtracts this value to report how long the landing-zone read took.
start_time = time()

**Read Data from Landing Zone**

In [4]:
# Load every generation JSON file from the landing-zone snapshot into a single
# DataFrame. multiLine is enabled so that a JSON document spanning several
# lines is parsed as one record.
all_generations_df_json = (
    spark.read
    .option("multiLine", "true")
    .json("landing-zone/2022-10-03 09-43-29/json/items/generations/*.json")
)

In [5]:
# First checkpoint: seconds elapsed since start_time was recorded.
checkpoint_1_time = time()
print(f"Time taken to read All Landing Data JSON: {checkpoint_1_time - start_time}")

Time taken to read All Landing Data JSON: 4.303964614868164


In [6]:
# Read generation-i on its own to inspect the schema Spark infers from a
# single file.
generation_1_df_json = (
    spark.read
    .option("multiLine", "true")
    .json("landing-zone/2022-10-03 09-43-29/json/items/generations/generation-i.json")
)

# printSchema() writes the schema tree to stdout itself and returns None, so
# wrapping it in print() only appended a stray "None" line to the output.
generation_1_df_json.printSchema()

# Report the size of the combined generations DataFrame. count() is a Spark
# action over the data; the column count is taken from the DataFrame's columns.
row_count_all_generations_df_json = all_generations_df_json.count()
col_count_all_generations_df_json = len(all_generations_df_json.columns)
print(f"Rows: {row_count_all_generations_df_json}, Columns: {col_count_all_generations_df_json}")

root
 |-- abilities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
 |-- main_region: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- moves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- name: string (nullable = true)
 |-- names: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- language: struct (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- pokemon_species: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- types: array (nullable = true)
 |    |-- element: struct (containsNul

In [7]:
# Display the DataFrame object itself, which renders its column names and types.
display(all_generations_df_json)

# show() prints the first rows as a text table and returns None; passing that
# return value to display() rendered a trailing "None", so call show() directly.
all_generations_df_json.show()

DataFrame[abilities: array<struct<name:string,url:string>>, id: bigint, main_region: struct<name:string,url:string>, moves: array<struct<name:string,url:string>>, name: string, names: array<struct<language:struct<name:string,url:string>,name:string>>, pokemon_species: array<struct<name:string,url:string>>, types: array<struct<name:string,url:string>>, version_groups: array<struct<name:string,url:string>>]

+--------------------+---+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+
|           abilities| id|         main_region|               moves|           name|               names|     pokemon_species|               types|      version_groups|
+--------------------+---+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+
|[{pickpocket, htt...|  5|{unova, https://p...|[{hone-claws, htt...|   generation-v|[{{ja-Hrkt, https...|[{victini, https:...|                  []|[{black-white, ht...|
|[{stench, https:/...|  3|{hoenn, https://p...|[{fake-out, https...| generation-iii|[{{ja-Hrkt, https...|[{treecko, https:...|[{shadow, https:/...|[{ruby-sapphire, ...|
|                  []|  1|{kanto, https://p...|[{pound, https://...|   generation-i|[{{ja-Hrkt, https...|[{bulbasaur, http...|[{normal, https:/...|[{red-bl

None

**Write Delta in Bronze Zone**

In [8]:
# Persist the generations DataFrame as a Delta table in the bronze zone,
# replacing whatever is already stored at that path.
(
    all_generations_df_json.write
    .format("delta")
    .mode("overwrite")
    .save("bronze-zone/items/generations.delta")
)

In [10]:
# Open the Delta table that was just written to the bronze zone.
deltaTable = DeltaTable.forPath(spark, "bronze-zone/items/generations.delta")

# vacuum() is called without arguments, so the table's default retention
# period applies (see the Delta Lake documentation for the exact semantics).
deltaTable.vacuum()

# Print the table's commit history.
table_history = deltaTable.history()
table_history.show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      0|2022-10-04 11:19:...|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|       null|  Serializable|        false|{numFiles -> 8, n...|        null|Apache-Spark/3.3....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+



**Process the Rest of the Entities into the Bronze Zone**

In [11]:
# Land the remaining entities in the bronze zone. Every entity follows the
# same steps (read all multi-line JSON files from the landing-zone snapshot,
# overwrite the matching Delta table), so the entity name is the only thing
# that varies. The order matches the original cell, and df_json is still bound
# to the last entity ("versions") once the loop finishes.
for entity in ("pokemons", "stats", "types", "version_groups", "versions"):
    df_json = (
        spark.read
        .option("multiLine", "true")
        .json(f"landing-zone/2022-10-03 09-43-29/json/items/{entity}/*.json")
    )
    df_json.write.format("delta").mode("overwrite").save(f"bronze-zone/items/{entity}.delta")

In [None]:
# Second checkpoint. The interval is measured from checkpoint_1_time, so it
# covers every cell executed between the two checkpoints, not only the
# bronze-zone writes.
checkpoint_2_time = time()
print(f"Time taken to write Data in Bronze Zone JSON: {checkpoint_2_time - checkpoint_1_time}")