# Silver Layer - Clean and Structure the Data

🎯 **Goal**: Make the data clean, consistent, and usable for downstream tasks

🔧 **Tasks a data engineer performs**:
- Read from the bronze layer
- Flatten nested structures (e.g. explode arrays)
- Drop invalid or duplicate rows
- Convert data types to the correct formats (e.g., height as float)
- Rename columns for clarity or consistency
- Store the result as a new dataset
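
As a rough illustration of the tasks listed above, the sketch below applies them to a handful of in-memory rows in plain Python rather than Spark. The sample rows, the `clean_rows` helper, and the `pokemon_name` target column are made up for this example. In PySpark the same steps would typically map to `dropDuplicates`, `filter`, `cast`, and `withColumnRenamed`.

```python
def clean_rows(rows):
    """Drop invalid and duplicate rows, then cast and rename fields."""
    seen_ids = set()
    cleaned = []
    for row in rows:
        # Invalid: the row has no id.
        if row.get("id") is None:
            continue
        # Invalid: the height is missing or cannot be read as a number.
        try:
            height = float(row["height"])
        except (KeyError, TypeError, ValueError):
            continue
        # Duplicate: keep only the first row seen for each id.
        if row["id"] in seen_ids:
            continue
        seen_ids.add(row["id"])
        # Rename "name" to "pokemon_name" (hypothetical) and store height as float.
        cleaned.append(
            {"id": row["id"], "pokemon_name": row.get("name"), "height": height}
        )
    return cleaned


# Illustrative sample rows, not taken from the bronze table.
sample_rows = [
    {"id": 1, "name": "bulbasaur", "height": "7"},
    {"id": 1, "name": "bulbasaur", "height": "7"},   # duplicate id
    {"id": 2, "name": "ivysaur", "height": None},    # invalid height
    {"id": None, "name": "unknown", "height": "3"},  # missing id
    {"id": 4, "name": "charmander", "height": 6},
]
cleaned_rows = clean_rows(sample_rows)
```

Only the rows with ids 1 and 4 survive: the duplicate, the row without a usable height, and the row without an id are dropped.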

### 🧹 This notebook: Silver Layer – Cleaned & Structured Pokémon Data

In this notebook, we:
- Read the raw Pokémon data from the bronze Delta table
- Extract selected fields from the nested `raw_json` column
- Explode arrays such as `types` into individual rows
- Enforce consistent data types through an explicit schema (e.g. `height` and `weight` as integers)
- Rename or restructure columns for clarity
- Save the cleaned data as separate Delta tables (`base`, `types`, `abilities`, `stats`, `moves`) in the silver layer under `../data/silver/pokemon`

This layer prepares the data for analysis and ML by enforcing structure and consistency.
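
The cells in this notebook parse `height` and `weight` as integers. If downstream consumers need floating-point values, one option is a unit conversion. The sketch below assumes the bronze data comes from PokeAPI, which reports height in decimetres and weight in hectograms. The `to_metric` helper is hypothetical and written in plain Python for clarity.

```python
def to_metric(height_dm, weight_hg):
    """Convert decimetres to metres and hectograms to kilograms as floats.

    None is passed through unchanged, since both fields are nullable.
    """
    height_m = None if height_dm is None else height_dm / 10.0
    weight_kg = None if weight_hg is None else weight_hg / 10.0
    return height_m, weight_kg


# Illustrative values: 7 dm and 69 hg.
height_m, weight_kg = to_metric(7, 69)
```

In PySpark, a comparable conversion could be written as a column expression such as `(col("height") / 10).cast("double")`.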


# Step 1: Import Spark and set up the session


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import (
    ArrayType, BooleanType, IntegerType, StringType, StructField, StructType
)

spark = SparkSession.builder \
    .appName("DemoPipe - Silver Layer") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Confirm Spark is running
spark.version

# Step 2: Define paths to bronze and silver tables

In [None]:
bronze_path = "../data/bronze/pokemon"
silver_path = "../data/silver/pokemon"

# For Databricks:
# bronze_path = "dbfs:/tmp/bronze/pokemon"
# silver_path = "dbfs:/tmp/silver/pokemon"

# For Microsoft Fabric:
# bronze_path = "Tables/bronze_pokemon"
# silver_path = "Tables/silver_pokemon"
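
Instead of commenting path pairs in and out, the choice of environment can be made explicit. The following is a minimal sketch: the path strings are copied from the cell above, while the environment labels and the `resolve_paths` helper are assumptions introduced for illustration.

```python
# Path pairs copied from the cell above; the dictionary keys are hypothetical labels.
LAYER_PATHS = {
    "local": ("../data/bronze/pokemon", "../data/silver/pokemon"),
    "databricks": ("dbfs:/tmp/bronze/pokemon", "dbfs:/tmp/silver/pokemon"),
    "fabric": ("Tables/bronze_pokemon", "Tables/silver_pokemon"),
}


def resolve_paths(environment="local"):
    """Return the (bronze_path, silver_path) pair for the given environment."""
    if environment not in LAYER_PATHS:
        known = ", ".join(sorted(LAYER_PATHS))
        raise ValueError(f"Unknown environment {environment!r}; expected one of: {known}")
    return LAYER_PATHS[environment]


bronze_path, silver_path = resolve_paths("local")
```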

# Step 3: Load the raw bronze Delta table

In [None]:
bronze_df = spark.read.format("delta").load(bronze_path)
bronze_df.printSchema()
bronze_df.show(1, truncate=False)

# Step 4: Define the schema for parsing the JSON string

In [None]:
parsed_schema = StructType([
    StructField("height", IntegerType(), True),
    StructField("weight", IntegerType(), True),
    StructField("base_experience", IntegerType(), True),
    StructField("types", ArrayType(
        StructType([
            StructField("slot", IntegerType(), True),
            StructField("type", StructType([
                StructField("name", StringType(), True),
                StructField("url", StringType(), True)
            ]), True)
        ])
    ), True),
    StructField("stats", ArrayType(
        StructType([
            StructField("base_stat", IntegerType(), True),
            StructField("effort", IntegerType(), True),
            StructField("stat", StructType([
                StructField("name", StringType(), True),
                StructField("url", StringType(), True)
            ]), True)
        ])
    ), True),
    StructField("abilities", ArrayType(
        StructType([
            StructField("is_hidden", BooleanType(), True),
            StructField("slot", IntegerType(), True),
            StructField("ability", StructType([
                StructField("name", StringType(), True),
                StructField("url", StringType(), True)
            ]), True)
        ])
    ), True),
    StructField("moves", ArrayType(
        StructType([
            StructField("move", StructType([
                StructField("name", StringType(), True),
                StructField("url", StringType(), True)
            ]), True)
        ])
    ), True)
])
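
To make the schema easier to picture, the sketch below builds a heavily trimmed payload with the same nesting and checks its top-level fields in plain Python. The sample values and the `find_schema_mismatches` helper are illustrative assumptions, and the `url` fields are omitted for brevity. A check like this can be useful on a sample of `raw_json` strings, because values that do not match the declared type can come back as null from `from_json` rather than raising an error.

```python
import json

# Hypothetical payload shaped like parsed_schema (url fields omitted).
sample_raw_json = json.dumps({
    "height": 7,
    "weight": 69,
    "base_experience": 64,
    "types": [
        {"slot": 1, "type": {"name": "grass"}},
        {"slot": 2, "type": {"name": "poison"}},
    ],
    "stats": [{"base_stat": 45, "effort": 0, "stat": {"name": "hp"}}],
    "abilities": [{"is_hidden": False, "slot": 1, "ability": {"name": "overgrow"}}],
    "moves": [{"move": {"name": "razor-wind"}}],
})

# Python types expected for each top-level field of the schema.
EXPECTED_TOP_LEVEL = {
    "height": int,
    "weight": int,
    "base_experience": int,
    "types": list,
    "stats": list,
    "abilities": list,
    "moves": list,
}


def find_schema_mismatches(raw_json):
    """Return the top-level fields whose value has an unexpected type."""
    record = json.loads(raw_json)
    mismatches = []
    for field, expected_type in EXPECTED_TOP_LEVEL.items():
        value = record.get(field)
        # Every field in the schema is nullable, so a missing value is allowed.
        if value is not None and not isinstance(value, expected_type):
            mismatches.append(field)
    return mismatches


# The nested path types[0].type.name resolves like this in plain Python.
record = json.loads(sample_raw_json)
first_type_name = record["types"][0]["type"]["name"]
```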


# Step 5: Parse the raw JSON string into a struct

In [None]:
# Apply parsed_schema to each raw_json string; the result is a struct column named "parsed"
df_parsed = bronze_df.withColumn("parsed", from_json(col("raw_json"), parsed_schema))
df_parsed.printSchema()
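
In its default mode, `from_json` does not raise on strings it cannot parse. Such rows surface as nulls instead (a null struct or null fields, depending on the Spark version), so they are easy to miss. As a hedged sketch, a sample of `raw_json` strings could be pre-checked in plain Python. The `split_parsable` helper and the sample strings are hypothetical.

```python
import json


def split_parsable(raw_strings):
    """Split raw strings into parsed records and values that fail to parse."""
    parsed, rejected = [], []
    for raw in raw_strings:
        try:
            parsed.append(json.loads(raw))
        except (TypeError, json.JSONDecodeError):
            # TypeError covers None; JSONDecodeError covers malformed JSON.
            rejected.append(raw)
    return parsed, rejected


# Illustrative input: one valid string, one truncated string, one null.
parsed_records, rejected = split_parsable(['{"height": 7}', '{"height": ', None])
```

A non-empty `rejected` list suggests looking at the bronze ingestion before trusting the silver tables.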

# Step 6: Extract fields from the struct


In [None]:
base_df = df_parsed.select(
    col("id"),
    col("name"),
    col("parsed.height").alias("height"),
    col("parsed.weight").alias("weight"),
    col("parsed.base_experience").alias("base_experience")
)

# explode() emits one row per array element; rows whose array is null or empty are dropped
types_df = df_parsed.select("id", "name", explode("parsed.types").alias("type")) \
    .select("id", "name", col("type.type.name").alias("type_name"))

abilities_df = df_parsed.select("id", "name", explode("parsed.abilities").alias("ability")) \
    .select("id", "name", col("ability.ability.name").alias("ability_name"), col("ability.is_hidden"))

stats_df = df_parsed.select("id", "name", explode("parsed.stats").alias("stat")) \
    .select("id", "name", col("stat.stat.name").alias("stat_name"), col("stat.base_stat").alias("stat_value"))

moves_df = df_parsed.select("id", "name", explode("parsed.moves").alias("move")) \
    .select("id", "name", col("move.move.name").alias("move_name"))
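
The effect of `explode` on row counts can be easy to overlook. The plain-Python sketch below mimics it on a few made-up rows: each array element becomes its own row, and rows with a null or empty array disappear. Setting `keep_empty=True` mirrors Spark's `explode_outer`, which keeps such rows with a null element. The `explode_rows` helper and the sample rows are illustrative only.

```python
def explode_rows(rows, array_field, keep_empty=False):
    """Return one output row per array element, similar to Spark's explode.

    With keep_empty=True, rows whose array is None or empty are kept with a
    None element, similar to explode_outer.
    """
    exploded = []
    for row in rows:
        elements = row.get(array_field) or []
        if not elements and keep_empty:
            elements = [None]
        for element in elements:
            exploded.append({"id": row["id"], "name": row["name"], "element": element})
    return exploded


# Made-up rows; the last one has an empty array.
sample = [
    {"id": 1, "name": "bulbasaur", "types": ["grass", "poison"]},
    {"id": 4, "name": "charmander", "types": ["fire"]},
    {"id": 999, "name": "placeholder", "types": []},
]
inner = explode_rows(sample, "types")                   # 3 rows, id 999 is gone
outer = explode_rows(sample, "types", keep_empty=True)  # 4 rows, id 999 kept
```

If every Pokémon must appear in each exploded table, swapping `explode` for `explode_outer` in the cell above is one way to keep rows with missing arrays.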

In [None]:
base_df.printSchema()
types_df.printSchema()
abilities_df.printSchema()
stats_df.printSchema()
moves_df.printSchema()

In [None]:
stats_df.show(20, truncate=False)

# Step 7: Save exploded tables separately for accurate multi-valued representation

In [None]:
# Write each table to its own Delta path; overwrite mode replaces the output of any previous run
base_df.write.format("delta").mode("overwrite").save(f"{silver_path}/base")
types_df.write.format("delta").mode("overwrite").save(f"{silver_path}/types")
abilities_df.write.format("delta").mode("overwrite").save(f"{silver_path}/abilities")
stats_df.write.format("delta").mode("overwrite").save(f"{silver_path}/stats")
moves_df.write.format("delta").mode("overwrite").save(f"{silver_path}/moves")

# Optional: Read and verify

In [None]:
spark.read.format("delta").load(f"{silver_path}/base").show(5)
spark.read.format("delta").load(f"{silver_path}/types").show(5)
spark.read.format("delta").load(f"{silver_path}/abilities").show(5)
spark.read.format("delta").load(f"{silver_path}/stats").show(5)
spark.read.format("delta").load(f"{silver_path}/moves").show(5)
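
Beyond eyeballing a few rows, a simple consistency check is to compare the ids in `base` against the ids in each exploded table. The sketch below does this on plain Python lists. The `check_child_ids` helper is hypothetical, and the id lists are assumed to have been collected first, for example with `[row["id"] for row in df.select("id").distinct().collect()]`.

```python
def check_child_ids(base_ids, child_ids):
    """Compare the ids of the base table with those of an exploded table."""
    base_set, child_set = set(base_ids), set(child_ids)
    return {
        # Ids in the exploded table that have no matching base row.
        "orphans": sorted(child_set - base_set),
        # Base ids with no exploded rows, e.g. dropped by explode().
        "missing_children": sorted(base_set - child_set),
    }


# Made-up ids: 7 has no base row, 2 has no exploded rows.
report = check_child_ids([1, 2, 3], [1, 1, 3, 7])
```

Empty lists for both keys indicate that the exploded table lines up with `base`.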