## Camada Silver

- Aqui irei criar as tabelas fato e dimensão, com maior preocupação com a ingestão e a tipagem dos dados em si

### Dimensões

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, to_date, col
from pyspark.sql.types import (
    StructType,
    StructField,
    LongType,
    DateType,
    StringType,
    FloatType,
    IntegerType
)

In [0]:
# Target layout of silver.dim_brand; CE_BRAND_FLVR is the key column.
schema_dim_brand = StructType([
    StructField("CE_BRAND_FLVR", StringType(), nullable=False),
    StructField("BRAND_NM", StringType(), True)
])

# Distinct brand rows from bronze. Every column is cast to the type declared in
# the schema above; this replaces the `createDataFrame(df.rdd, schema)` round
# trip, which pushed each row through the RDD API only to apply types.
dim_brand = (
    spark.table("ab_inbev.bronze.beverage_sales")
    .select(*[col(f.name).cast(f.dataType).alias(f.name) for f in schema_dim_brand.fields])
    .distinct()
)

# A cast does not enforce nullability, so reject NULLs in the non-nullable
# fields explicitly before anything is written.
dim_brand_required = [f.name for f in schema_dim_brand.fields if not f.nullable]
dim_brand_null_filter = " OR ".join(f"`{name}` IS NULL" for name in dim_brand_required)
if dim_brand.filter(dim_brand_null_filter).limit(1).count() > 0:
    raise ValueError(f"dim_brand: NULL found in non-nullable column(s) {dim_brand_required}")

# Write
dim_brand.write.mode("overwrite").saveAsTable("ab_inbev.silver.dim_brand")

# Primary key. The key column must be NOT NULL before the constraint is added.
spark.sql("ALTER TABLE ab_inbev.silver.dim_brand ALTER COLUMN CE_BRAND_FLVR SET NOT NULL")
# The overwrite above may leave a primary key from a previous run in place, in
# which case ADD PRIMARY KEY would fail; dropping it first keeps the cell
# re-runnable. CASCADE also removes foreign keys that reference this key, so
# re-run the fact cell afterwards to re-create them.
spark.sql("ALTER TABLE ab_inbev.silver.dim_brand DROP PRIMARY KEY IF EXISTS CASCADE")
spark.sql("ALTER TABLE ab_inbev.silver.dim_brand ADD PRIMARY KEY (CE_BRAND_FLVR)")

spark.table("ab_inbev.silver.dim_brand").display()

In [0]:
# Target layout of silver.dim_channels; TRADE_CHNL_DESC is the key column.
schema_dim_channels = StructType([
    StructField("TRADE_CHNL_DESC", StringType(), nullable=False),
    StructField("TRADE_GROUP_DESC", StringType(), True),
    StructField("TRADE_TYPE_DESC", StringType(), True)
])

# Distinct channel rows from bronze. Every column is cast to the type declared
# in the schema above; this replaces the `createDataFrame(df.rdd, schema)` round
# trip, which pushed each row through the RDD API only to apply types.
# NOTE(review): the distinct is taken over all three columns, so TRADE_CHNL_DESC
# is unique only if each channel maps to a single group/type pair - confirm
# against the data, since the primary key below may not be enforced by the engine.
dim_channels = (
    spark.table("ab_inbev.bronze.beverage_sales_and_channels")
    .select(*[col(f.name).cast(f.dataType).alias(f.name) for f in schema_dim_channels.fields])
    .distinct()
)

# A cast does not enforce nullability, so reject NULLs in the non-nullable
# fields explicitly before anything is written.
dim_channels_required = [f.name for f in schema_dim_channels.fields if not f.nullable]
dim_channels_null_filter = " OR ".join(f"`{name}` IS NULL" for name in dim_channels_required)
if dim_channels.filter(dim_channels_null_filter).limit(1).count() > 0:
    raise ValueError(f"dim_channels: NULL found in non-nullable column(s) {dim_channels_required}")

# Write
dim_channels.write.mode("overwrite").saveAsTable("ab_inbev.silver.dim_channels")

# Primary key. The key column must be NOT NULL before the constraint is added.
spark.sql("ALTER TABLE ab_inbev.silver.dim_channels ALTER COLUMN TRADE_CHNL_DESC SET NOT NULL")
# The overwrite above may leave a primary key from a previous run in place, in
# which case ADD PRIMARY KEY would fail; dropping it first keeps the cell
# re-runnable. CASCADE also removes foreign keys that reference this key, so
# re-run the fact cell afterwards to re-create them.
spark.sql("ALTER TABLE ab_inbev.silver.dim_channels DROP PRIMARY KEY IF EXISTS CASCADE")
spark.sql("ALTER TABLE ab_inbev.silver.dim_channels ADD PRIMARY KEY (TRADE_CHNL_DESC)")

spark.table("ab_inbev.silver.dim_channels").display()

In [0]:
# Preview the distinct combinations of the packaging columns in the bronze
# sales table.
(
    spark.table("ab_inbev.bronze.beverage_sales")
    .select("PKG_CAT", "Pkg_Cat_Desc", "TSR_PCKG_NM")
    .distinct()
    .display()
)

In [0]:
# Target layout of silver.dim_pkg; TSR_PCKG_NM is the key column.
schema_dim_pkg = StructType([
    StructField("TSR_PCKG_NM", StringType(), nullable=False),
    StructField("PKG_CAT", StringType(), True),
    StructField("Pkg_Cat_Desc", StringType(), True)
])

# Distinct package rows from bronze. Every column is cast to the type declared
# in the schema above; this replaces the `createDataFrame(df.rdd, schema)` round
# trip, which pushed each row through the RDD API only to apply types.
dim_pkg = (
    spark.table("ab_inbev.bronze.beverage_sales")
    .select(*[col(f.name).cast(f.dataType).alias(f.name) for f in schema_dim_pkg.fields])
    .distinct()
)

# A cast does not enforce nullability, so reject NULLs in the non-nullable
# fields explicitly before anything is written.
dim_pkg_required = [f.name for f in schema_dim_pkg.fields if not f.nullable]
dim_pkg_null_filter = " OR ".join(f"`{name}` IS NULL" for name in dim_pkg_required)
if dim_pkg.filter(dim_pkg_null_filter).limit(1).count() > 0:
    raise ValueError(f"dim_pkg: NULL found in non-nullable column(s) {dim_pkg_required}")

# Write
dim_pkg.write.mode("overwrite").saveAsTable("ab_inbev.silver.dim_pkg")

# Primary key. The key column must be NOT NULL before the constraint is added.
spark.sql("ALTER TABLE ab_inbev.silver.dim_pkg ALTER COLUMN TSR_PCKG_NM SET NOT NULL")
# The overwrite above may leave a primary key from a previous run in place, in
# which case ADD PRIMARY KEY would fail; dropping it first keeps the cell
# re-runnable. CASCADE also removes foreign keys that reference this key, so
# re-run the fact cell afterwards to re-create them.
spark.sql("ALTER TABLE ab_inbev.silver.dim_pkg DROP PRIMARY KEY IF EXISTS CASCADE")
spark.sql("ALTER TABLE ab_inbev.silver.dim_pkg ADD PRIMARY KEY (TSR_PCKG_NM)")

spark.table("ab_inbev.silver.dim_pkg").display()

In [0]:
# Target layout of silver.dim_date; DATE is the key column and every field is
# declared non-nullable.
schema_dim_date = StructType(
    [
        StructField("DATE", DateType(), nullable=False),
        StructField("YEAR", IntegerType(), False),
        StructField("MONTH", IntegerType(), False),
        StructField("PERIOD", IntegerType(), False),
    ]
)

# Distinct calendar rows from bronze, typed directly in the projection: DATE is
# parsed with the "M/d/yyyy" pattern and the remaining fields are cast to
# integers. This replaces the `createDataFrame(df.rdd, schema)` round trip,
# which pushed each row through the RDD API only to apply types.
dim_date = (
    spark.table("ab_inbev.bronze.beverage_sales")
    .select(
        to_date(col("DATE"), "M/d/yyyy").alias("DATE"),
        col("YEAR").cast(IntegerType()).alias("YEAR"),
        col("MONTH").cast(IntegerType()).alias("MONTH"),
        col("PERIOD").cast(IntegerType()).alias("PERIOD"),
    )
    .distinct()
)

# Parsing and casting yield NULL for values that cannot be converted, and a
# cast does not enforce nullability, so reject NULLs in the non-nullable fields
# explicitly before anything is written.
dim_date_required = [f.name for f in schema_dim_date.fields if not f.nullable]
dim_date_null_filter = " OR ".join(f"`{name}` IS NULL" for name in dim_date_required)
if dim_date.filter(dim_date_null_filter).limit(1).count() > 0:
    raise ValueError(f"dim_date: NULL found in non-nullable column(s) {dim_date_required}")

# Write
dim_date.write.mode("overwrite").saveAsTable("ab_inbev.silver.dim_date")

# Primary key. The key column must be NOT NULL before the constraint is added.
spark.sql("ALTER TABLE ab_inbev.silver.dim_date ALTER COLUMN `DATE` SET NOT NULL")
# The overwrite above may leave a primary key from a previous run in place, in
# which case ADD PRIMARY KEY would fail; dropping it first keeps the cell
# re-runnable. CASCADE also removes foreign keys that reference this key, so
# re-run the fact cell afterwards to re-create them.
spark.sql("ALTER TABLE ab_inbev.silver.dim_date DROP PRIMARY KEY IF EXISTS CASCADE")
spark.sql("ALTER TABLE ab_inbev.silver.dim_date ADD PRIMARY KEY (`DATE`)")

spark.table("ab_inbev.silver.dim_date").display()

### Fatos

In [0]:
# Target layout of silver.fato_sales: a generated surrogate key, the four
# dimension keys, and the measures/attributes carried over from bronze.
schema_fato = StructType(
    [
        StructField("sale_id", LongType(), nullable=False),
        StructField("DATE", DateType(), nullable=False),
        StructField("CE_BRAND_FLVR", StringType(), nullable=False),
        StructField("TRADE_CHNL_DESC", StringType(), nullable=False),
        StructField("TSR_PCKG_NM", StringType(), nullable=False),
        StructField("Volume", FloatType(), True),
        StructField("Btlr_Org_LVL_C_Desc", StringType(), True)
    ]
)

# Project the fact rows with their final types directly. This replaces the
# `createDataFrame(df.rdd, schema)` round trip, which pushed each row through
# the RDD API only to apply types.
df_fato = (
    spark.table("ab_inbev.bronze.beverage_sales_and_channels")
    .select(
        # Surrogate key: unique per row within one evaluation, not consecutive.
        monotonically_increasing_id().alias("sale_id"),
        to_date(col("DATE"), "M/d/yyyy").alias("DATE"),
        col("CE_BRAND_FLVR").cast(StringType()).alias("CE_BRAND_FLVR"),
        col("TRADE_CHNL_DESC").cast(StringType()).alias("TRADE_CHNL_DESC"),
        col("TSR_PCKG_NM").cast(StringType()).alias("TSR_PCKG_NM"),
        col("Volume").cast(FloatType()).alias("Volume"),
        col("Btlr_Org_LVL_C_Desc").cast(StringType()).alias("Btlr_Org_LVL_C_Desc"),
    )
)

# Parsing and casting yield NULL for values that cannot be converted, and a
# cast does not enforce nullability, so reject NULLs in the non-nullable fields
# explicitly before anything is written.
fato_required = [f.name for f in schema_fato.fields if not f.nullable]
fato_null_filter = " OR ".join(f"`{name}` IS NULL" for name in fato_required)
if df_fato.filter(fato_null_filter).limit(1).count() > 0:
    raise ValueError(f"fato_sales: NULL found in non-nullable column(s) {fato_required}")

# Write
df_fato.write.mode("overwrite").saveAsTable("ab_inbev.silver.fato_sales")

# Primary key. The key column must be NOT NULL before the constraint is added.
spark.sql("ALTER TABLE ab_inbev.silver.fato_sales ALTER COLUMN sale_id SET NOT NULL")
# The overwrite above may leave constraints from a previous run in place, in
# which case the ADD statements would fail; dropping first keeps the cell
# re-runnable.
spark.sql("ALTER TABLE ab_inbev.silver.fato_sales DROP PRIMARY KEY IF EXISTS")
spark.sql("ALTER TABLE ab_inbev.silver.fato_sales ADD PRIMARY KEY (sale_id)")

# Foreign keys: (fact column, referenced dimension table and column).
fato_foreign_keys = [
    ("`DATE`", "ab_inbev.silver.dim_date(`DATE`)"),
    ("CE_BRAND_FLVR", "ab_inbev.silver.dim_brand(CE_BRAND_FLVR)"),
    ("TRADE_CHNL_DESC", "ab_inbev.silver.dim_channels(TRADE_CHNL_DESC)"),
    ("TSR_PCKG_NM", "ab_inbev.silver.dim_pkg(TSR_PCKG_NM)"),
]
for fk_column, fk_target in fato_foreign_keys:
    spark.sql(f"ALTER TABLE ab_inbev.silver.fato_sales DROP FOREIGN KEY IF EXISTS ({fk_column})")
    spark.sql(f"ALTER TABLE ab_inbev.silver.fato_sales ADD FOREIGN KEY ({fk_column}) REFERENCES {fk_target}")

spark.table("ab_inbev.silver.fato_sales").display()