#### Silver Layer
Silver Layer: Transform the data to a columnar storage format such as parquet or delta, and partition it by brewery location. Please explain any other transformations you perform

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
from pyspark.sql.utils import AnalysisException


In [0]:
partitions = spark.sql("SHOW PARTITIONS ab_inbev_lake.bronze.breweries_api_data")
last_partition = partitions.agg(max(col("execution_date")).alias("last_partition")).collect()[0]["last_partition"]
df_bronze = spark.read.table("ab_inbev_lake.bronze.breweries_api_data").filter(col("execution_date") == last_partition)

In [0]:
df_bronze = (
    df_bronze
    .withColumnRenamed('id', 'cd_id')\
    .withColumnRenamed('name', 'nm_brewery')\
    .withColumnRenamed('brewery_type', 'ds_brewery_type')\
    .withColumnRenamed('address_1', 'nm_address_1')\
    .withColumnRenamed('address_2', 'nm_address_2')\
    .withColumnRenamed('address_3', 'nm_address_3')\
    .withColumnRenamed('country', 'nm_country')\
    .withColumnRenamed('state', 'nm_state')\
    .withColumnRenamed('city', 'nm_city')\
    .withColumnRenamed('postal_code', 'cd_postal_code')\
    .withColumnRenamed('longitude', 'nr_longitude')\
    .withColumnRenamed('latitude', 'nr_latitude')\
    .withColumnRenamed('phone', 'ds_phone')\
    .withColumnRenamed('website_url', 'url_website')\
    .withColumnRenamed('state_province', 'nm_state_province')\
    .withColumnRenamed('execution_date', 'dt_insert')\
    .withColumnRenamed('street', 'nm_street')\
    .withColumnRenamed('url_website', 'ds_website')\
)

df_bronze = (
    df_bronze
    .withColumn("nr_longitude", col("nr_longitude").cast("double"))
    .withColumn("nr_latitude", col("nr_latitude").cast("double"))
    .withColumn("ds_status", when(col("ds_brewery_type") == 'closed', 'inactive').otherwise('active'))
    .withColumn("dt_update", col("dt_insert"))
)

In [0]:
expected_schema = StructType([
    StructField("cd_id", StringType(), False),
    StructField("nm_brewery", StringType(), True),
    StructField("ds_brewery_type", StringType(), True),
    StructField("nm_address_1", StringType(), True),
    StructField("nm_address_2", StringType(), True),
    StructField("nm_address_3", StringType(), True),
    StructField("nm_country", StringType(), True),
    StructField("nm_state", StringType(), True),
    StructField("nm_city", StringType(), True),
    StructField("cd_postal_code", StringType(), True),
    StructField("nr_longitude", DoubleType(), True),
    StructField("nr_latitude", DoubleType(), True),
    StructField("ds_phone", StringType(), True),
    StructField("ds_website", StringType(), True),
    StructField("nm_state_province", StringType(), True),
    StructField("dt_insert", TimestampType(), True),
    StructField("nm_street", StringType(), True),
    StructField("ds_status", StringType(), True),
    StructField("dt_update", TimestampType(), True)
])

def validate_schema(df: DataFrame, expected_schema: StructType) -> bool:
    """
    Verifica se o schema do DataFrame corresponde exatamente ao schema esperado.

    Parameters
    ----------
    df : DataFrame
        DataFrame do Spark a ser validado.
    expected_schema : StructType
        Schema esperado para validação.

    Returns
    -------
    bool
        True se o schema do DataFrame for igual ao esperado, False caso contrário.
    """
    df_fields = [(f.name, f.dataType) for f in df.schema.fields]
    expected_fields = [(f.name, f.dataType) for f in expected_schema.fields]
    return df_fields == expected_fields

In [0]:
def get_table_schema(table_name: str) -> StructType:
    """
    Returns the schema of the specified table.

    Parameters
    ----------
    table_name : str
        Name of the table to retrieve the schema from.

    Returns
    -------
    StructType or None
        The schema of the table if it exists, otherwise None.
    """
    try:
        return spark.table(table_name).schema
    except AnalysisException:
        return None

In [0]:
df_bronze_casted = df_bronze.select(
    [col(field.name).cast(field.dataType) for field in expected_schema.fields]
)

if validate_schema(df_bronze_casted, expected_schema):

    silver_table = "ab_inbev_lake.silver.breweries"

    table_schema = get_table_schema(silver_table)

    if table_schema is None or table_schema != expected_schema:
        (
            df_bronze_casted
            .write
            .format("delta")
            .mode("overwrite")
            .partitionBy("nm_country")
            .option("overwriteSchema", "true")
            .option("mergeSchema", "true")
            .saveAsTable(silver_table)
        )

    compare_cols = [
        c for c in df_bronze_casted.columns if c not in ["cd_id", "dt_insert", "dt_update"]
    ]
    change_condition = " OR ".join([f"source.{c} <> target.{c}" for c in compare_cols])

    delta_silver = DeltaTable.forName(spark, silver_table)
    (
        delta_silver.alias("target")
        .merge(
            df_bronze_casted.alias("source"),
            "target.cd_id = source.cd_id"
        )
        .whenMatchedUpdate(
            condition=change_condition,
            set={
                **{col: f"source.{col}" for col in compare_cols},
                "dt_update": "source.dt_update"
            }
        )
        .whenNotMatchedInsert(
            values={col: f"source.{col}" for col in df_bronze_casted.columns}
        )
        .execute()
    )

    spark.sql(f"OPTIMIZE {silver_table}")

    spark.sql(f"OPTIMIZE {silver_table} ZORDER BY (nm_city)")
else:
    raise ValueError("Schema mismatch: DataFrame does not match the expected schema.")