# 1. Import Dependencies

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, count, countDistinct, when

In [0]:
# Load the Silver-layer breweries table; the transformations below start from it.
df_silver = spark.read.table("silver.tbl_breweries")

# 2. Transform data

Transform the data to derive the region and compute the requested aggregation.

In [0]:
# Normalise the country label so the region rules below only need to match "USA".
df_silver_corrected = df_silver.withColumn(
    "country",
    when(col("country") == "United States", "USA").otherwise(col("country")),
)

# Region lookup tables: ordered (states, region) pairs; the first match wins.
# NOTE(review): any USA state that is not listed falls through to "West"
# (e.g. Delaware, Vermont) -- confirm that this default is intended.
_USA_REGIONS = [
    (["Washington", "Oregon", "Idaho", "Montana", "Wyoming"], "Northwest"),
    (["Minnesota", "Wisconsin", "Michigan", "Illinois", "Indiana"], "North-Central"),
    (["New York", "New Jersey", "Massachusetts", "Connecticut", "Rhode Island"], "Northeast"),
    (["Ohio", "Pennsylvania", "Virginia", "North Carolina", "South Carolina"], "Southeast"),
    (["Texas", "Oklahoma", "Arizona", "New Mexico", "Colorado"], "Southwest"),
]
# Any Irish state_province that is not listed falls through to "Leinster".
_IRELAND_REGIONS = [
    (["Laois"], "Laois"),
    (["Dublin"], "Dublin"),
    (["Limerick", "Cork", "Waterford"], "Munster"),
    (["Galway", "Sligo", "Leitrim"], "Connacht"),
    (["Belfast", "Derry"], "Ulster"),
]


def _region_from_state(groups, default):
    """Build a CASE WHEN expression that maps ``state_province`` to a region.

    Args:
        groups: Non-empty list of ``(states, region)`` pairs, checked in order.
        default: Region used when ``state_province`` matches none of the groups.

    Returns:
        A Column holding the region label.
    """
    state = col("state_province")
    (first_states, first_region), *remaining = groups
    expr = when(state.isin(first_states), first_region)
    for states, region in remaining:
        expr = expr.when(state.isin(states), region)
    return expr.otherwise(default)


# Add the region column derived from country + state_province, then a combined
# "<country>_<region>" label.
df_with_region = (
    df_silver_corrected
    .withColumn(
        "region",
        when(col("country") == "USA", _region_from_state(_USA_REGIONS, "West"))
        .when(col("country") == "Ireland", _region_from_state(_IRELAND_REGIONS, "Leinster"))
        .otherwise("Unknown"),
    )
    # `+` between Columns is arithmetic addition, not string concatenation,
    # so the label has to be built with concat_ws.
    .withColumn("country_region", concat_ws("_", col("country"), col("region")))
)

# Count rows and distinct brewery ids per country, state, region and brewery type.
# country_region is not part of the grouping, so it is not in the aggregated output.
aggregated_df = (
    df_with_region
    .groupBy("country", "state_province", "region", "brewery_type")
    .agg(
        count("*").alias("quantity"),
        countDistinct("brewery_id").alias("breweries"),
    )
    .orderBy("country", "state_province", "region", "brewery_type")
)

# Show the results.
aggregated_df.display()

country,state_province,region,brewery_type,quantity,breweries
Ireland,Laois,Laois,micro,1,1
USA,Arizona,Southwest,micro,3,3
USA,California,West,closed,1,1
USA,California,West,large,1,1
USA,California,West,micro,2,2
USA,Colorado,Southwest,brewpub,1,1
USA,Colorado,Southwest,large,1,1
USA,Colorado,Southwest,micro,1,1
USA,Colorado,Southwest,proprietor,1,1
USA,Delaware,West,micro,1,1


In [0]:
%sql
-- Create the gold schema if it is missing; the aggregated table is saved into it.
CREATE SCHEMA IF NOT EXISTS gold;

In [0]:
# Persist the aggregated DataFrame as a Delta table in the Gold layer,
# partitioned by region as requested.
gold_writer = (
    aggregated_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("region")
)
gold_writer.saveAsTable("gold.tbl_g_breweries")

In [0]:
%sql

-- Read the Gold table back to inspect its contents.
SELECT *
FROM gold.tbl_g_breweries

country,state_province,region,brewery_type,quantity,breweries
USA,California,West,closed,1,1
USA,California,West,large,1,1
USA,California,West,micro,2,2
USA,Delaware,West,micro,1,1
USA,Iowa,West,micro,1,1
USA,Maryland,West,contract,1,1
USA,Mississippi,West,micro,1,1
USA,Nevada,West,micro,1,1
USA,Vermont,West,micro,1,1
USA,Arizona,Southwest,micro,3,3
