In [1]:
# List the tables visible in the current namespace to confirm the bronze inputs
# are present; truncate=False prints the full namespace/table names.
spark.sql("SHOW TABLES").show(truncate=False)

StatementMeta(, d7ab54d9-0bef-4496-a9ba-751df390a25f, 3, Finished, Available, Finished)

+-------------------------------------------+-------------------+-----------+
|namespace                                  |tableName          |isTemporary|
+-------------------------------------------+-------------------+-----------+
|ws_retail_analytics.lh_retail_analytics.dbo|bronze_features_raw|false      |
|ws_retail_analytics.lh_retail_analytics.dbo|bronze_sales_raw   |false      |
|ws_retail_analytics.lh_retail_analytics.dbo|bronze_stores_raw  |false      |
+-------------------------------------------+-------------------+-----------+



In [45]:
# Silver layer - sales.
# Reads the bronze Delta table, keeps the useful columns, casts them to proper
# types, applies the technical rules and persists the result as `silver_sales`.
from pyspark.sql.functions import col, to_date, trim, upper, when
from pyspark.sql.types import DecimalType, IntegerType

# Load the bronze table and inspect its schema (every column arrives as string).
df_sales_bronze = spark.read.table("bronze_sales_raw")
df_sales_bronze.printSchema()

# Keep only the useful columns and rename them to snake_case.
df_sales_selected = df_sales_bronze.select(
    col("Store").alias("store_id"),
    col("Dept").alias("department_id"),
    col("Date").alias("sales_date"),
    col("Weekly_Sales").alias("weekly_sales"),
    col("IsHoliday").alias("is_holiday"),
)

# Type conversion.
# The holiday flag is normalised (trim + upper) before comparison so values such
# as "True" or " TRUE" are not silently mapped to NULL; anything that is neither
# TRUE nor FALSE still becomes NULL.
holiday_flag = upper(trim(col("is_holiday")))

df_sales_typed = (
    df_sales_selected
    .withColumn("store_id", col("store_id").cast(IntegerType()))
    .withColumn("department_id", col("department_id").cast(IntegerType()))
    .withColumn("weekly_sales", col("weekly_sales").cast(DecimalType(12, 2)))
    .withColumn(
        "is_holiday",
        when(holiday_flag == "TRUE", True)
        .when(holiday_flag == "FALSE", False)
        .otherwise(None),
    )
    # TODO confirm the date format used in the source file; this assumes dd/MM/yyyy.
    .withColumn("sales_date", to_date(col("sales_date"), "dd/MM/yyyy"))
)

# Technical rules: the key columns must be present after the casts.
df_sales_clean = df_sales_typed.filter(
    col("store_id").isNotNull()
    & col("department_id").isNotNull()
    & col("sales_date").isNotNull()
)

# Create (or replace) the silver table.
(
    df_sales_clean
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_sales")
)

# Validation: read the table back once and inspect schema and a sample.
df_silver_sales = spark.read.table("silver_sales")
df_silver_sales.printSchema()
df_silver_sales.show(5)

StatementMeta(, 793c2255-ce12-40f3-a069-e5fbbbd49603, 47, Finished, Available, Finished)

root
 |-- Store: string (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Weekly_Sales: string (nullable = true)
 |-- IsHoliday: string (nullable = true)
 |-- ingestion_date: string (nullable = true)
 |-- ingestion_timestamp: string (nullable = true)
 |-- source_file: string (nullable = true)



In [15]:
# Silver layer - features.
# Reads the bronze features table, renames the columns, converts them to the
# required types (so they can be joined later) and persists `silver_features`.
# regexp_replace is no longer used in this cell; the import is kept in case
# later cells rely on it being in the notebook namespace.
from pyspark.sql.functions import col, regexp_replace, to_date, trim, upper, when
from pyspark.sql.types import DecimalType, IntegerType

df_features_bronze = spark.read.table("bronze_features_raw")
df_features_bronze.printSchema()

# Select the columns and rename them to snake_case.
df_features_selected = df_features_bronze.select(
    col("Store").alias("store_id"),
    col("Date").alias("feature_date"),
    col("Temperature").alias("temperature"),
    col("Fuel_Price").alias("fuel_price"),
    col("MarkDown1").alias("markdown_1"),
    col("MarkDown2").alias("markdown_2"),
    col("MarkDown3").alias("markdown_3"),
    col("MarkDown4").alias("markdown_4"),
    col("MarkDown5").alias("markdown_5"),
    col("CPI").alias("cpi"),
    col("Unemployment").alias("unemployment"),
    col("IsHoliday").alias("is_holiday"),
)

# Target decimal type of every numeric feature. The dict order matches the
# column order of df_features_selected so the silver schema keeps that order.
FEATURE_DECIMAL_TYPES = {
    "temperature": DecimalType(6, 2),
    "fuel_price": DecimalType(6, 3),
    "markdown_1": DecimalType(12, 2),
    "markdown_2": DecimalType(12, 2),
    "markdown_3": DecimalType(12, 2),
    "markdown_4": DecimalType(12, 2),
    "markdown_5": DecimalType(12, 2),
    "cpi": DecimalType(8, 3),
    "unemployment": DecimalType(5, 2),
}


def _na_to_null(column_name):
    """Return the named column with the "NA" sentinel replaced by NULL.

    The sentinel is matched as the whole (trimmed) value and mapped to a real
    NULL, so the later cast does not depend on an empty string failing to
    convert, and digits inside a value are never rewritten.
    """
    value = col(column_name)
    return when(trim(value) == "NA", None).otherwise(value)


# The file stores the flag as the strings TRUE / FALSE; normalise case and
# whitespace before comparing, and map anything else to NULL.
holiday_flag = upper(trim(col("is_holiday")))

df_features_typed = df_features_selected.select(
    col("store_id").cast(IntegerType()).alias("store_id"),
    # TODO confirm the date format used in the source file; this assumes dd/MM/yyyy.
    to_date(col("feature_date"), "dd/MM/yyyy").alias("feature_date"),
    *[
        _na_to_null(name).cast(dtype).alias(name)
        for name, dtype in FEATURE_DECIMAL_TYPES.items()
    ],
    when(holiday_flag == "TRUE", True)
    .when(holiday_flag == "FALSE", False)
    .otherwise(None)
    .alias("is_holiday"),
)

# Quick check of the parsed dates.
df_features_typed.select("feature_date").show(10)

# Technical rules: keep only rows whose key columns are not null.
df_features_clean = df_features_typed.filter(
    col("store_id").isNotNull()
    & col("feature_date").isNotNull()
)

# Create (or replace) the silver table.
(
    df_features_clean
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_features")
)

# Validation: read the table back once and inspect schema and a sample.
df_silver_features = spark.read.table("silver_features")
df_silver_features.printSchema()
df_silver_features.show(5)

StatementMeta(, 793c2255-ce12-40f3-a069-e5fbbbd49603, 17, Finished, Available, Finished)

In [30]:
# Silver layer - stores.
# Bronze -> rename -> cast -> drop null/duplicate store ids -> `silver_stores`.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

df_stores_bronze = spark.read.table("bronze_stores_raw")
df_stores_bronze.printSchema()

# Pick the three business columns, then rename them positionally.
df_stores_selected = (
    df_stores_bronze
    .select("Store", "Type", "Size")
    .toDF("store_id", "store_type", "store_size")
)

# Cast the identifier and the size to integers; store_type stays a string.
df_stores_typed = df_stores_selected.select(
    col("store_id").cast(IntegerType()).alias("store_id"),
    col("store_type"),
    col("store_size").cast(IntegerType()).alias("store_size"),
)

df_stores_typed.printSchema()
df_stores_typed.show(truncate=False)

# Discard rows without a store id, then keep one row per store id.
df_stores_clean = (
    df_stores_typed
    .where(col("store_id").isNotNull())
    .dropDuplicates(subset=["store_id"])
)

# Persist as a Delta table, replacing any previous contents.
df_stores_clean.write.saveAsTable("silver_stores", format="delta", mode="overwrite")

# Validation of the written table.
spark.table("silver_stores").printSchema()
spark.table("silver_stores").show(10)



StatementMeta(, 793c2255-ce12-40f3-a069-e5fbbbd49603, 32, Finished, Available, Finished)

root
 |-- Store: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- ingestion_date: string (nullable = true)
 |-- ingestion_timestamp: string (nullable = true)
 |-- source_file: string (nullable = true)

