In [0]:
# Importa o módulo 'functions' da biblioteca 'pyspark.sql' como 'F', 
# permitindo o uso de funções SQL no contexto do Spark DataFrame.
from pyspark.sql import functions as F

In [0]:
# Define o fuso horário da sessão Spark para 'America/Sao_Paulo'
spark.conf.set("spark.sql.session.timeZone", "America/Sao_Paulo")

In [0]:
# Caminhos dos arquivos CSV de vendas para cada país, utilizados como origem dos dados
origem_path_CA = "fomacao_microsoft_power_bi_profisional.bronze.ca_sales"  # Canadá
origem_path_DE = "fomacao_microsoft_power_bi_profisional.bronze.de_sales"  # Alemanha
origem_path_FR = "fomacao_microsoft_power_bi_profisional.bronze.fr_sales"  # França
origem_path_MX = "fomacao_microsoft_power_bi_profisional.bronze.mx_sales"  # México

In [0]:
# Carrega os dados de vendas do Canadá, removendo colunas técnicas de ingestão
df_silver_CA_Sales = (spark.read.table(origem_path_CA)
                           .drop("_ingest_ts_utc", "_ingest_date", "_source_path", "_source_file"))

# Carrega os dados de vendas da Alemanha, removendo colunas técnicas de ingestão
df_silver_DE_Sales = (spark.read.table(origem_path_DE)
                           .drop("_ingest_ts_utc", "_ingest_date", "_source_path", "_source_file")
                           # Converte a coluna 'Zip' para o tipo string
                           .withColumn("Zip", F.col("Zip").cast("string")))

# Carrega os dados de vendas da França, removendo colunas técnicas de ingestão
df_silver_FR_Sales = (spark.read.table(origem_path_FR)
                           .drop("_ingest_ts_utc", "_ingest_date", "_source_path", "_source_file"))

# Carrega os dados de vendas do México, removendo colunas técnicas de ingestão
df_silver_MX_Sales = (spark.read.table(origem_path_MX)
                           .drop("_ingest_ts_utc", "_ingest_date", "_source_path", "_source_file")
                           # Converte a coluna 'Zip' para o tipo string
                           .withColumn("Zip", F.col("Zip").cast("string"))) 

In [0]:
# Realiza a união dos dados de vendas dos quatro países (Canadá, Alemanha, França e México)
# utilizando 'unionByName' para garantir o alinhamento das colunas por nome.
df_sales_union = (df_silver_CA_Sales.unionByName(df_silver_DE_Sales)
                                    .unionByName(df_silver_FR_Sales)
                                    .unionByName(df_silver_MX_Sales))

# # Exibe o DataFrame resultante da união dos dados de vendas
# display(df_sales_union)

In [0]:
# Adiciona a coluna '_ingest_ts_utc' com o timestamp atual e a coluna '_ingest_date' com a data derivada do timestamp,
# para rastrear o momento de ingestão dos dados no DataFrame unificado de vendas.
df_sales_union = (df_sales_union.withColumn('_ingest_ts_utc', F.current_timestamp())
                                .withColumn("_ingest_date", F.to_date(F.col("_ingest_ts_utc"))))

In [0]:
# Define o nome da tabela de destino e o caminho no esquema 'silver'
tbl = "Fato_Vendas"
destino_path = f"fomacao_microsoft_power_bi_profisional.silver.{tbl}"

# Escreve o DataFrame unificado de vendas como uma tabela Delta, sobrescrevendo dados existentes,
# atualizando o schema conforme necessário e particionando pela coluna '_ingest_date'
df_silver = (df_sales_union.write
                           .format('delta')
                           .mode('overwrite')
                           .option('overwriteSchema', 'true')
                           .partitionBy('_ingest_date')
                           .saveAsTable(destino_path))

# Exibe a contagem de registros da tabela criada
print(f'Contagem de registros: {spark.table(destino_path).count()}')