In [15]:
# PySpark - Types
from pyspark.sql.types import IntegerType, FloatType, StringType

# PySpark - Window
from pyspark.sql.window import Window

# PySpark - Functions
from pyspark.sql.functions import (
    col, when, substring, monotonically_increasing_id,
    lag, lit, concat_ws, lpad, sum as spark_sum, expr
)

# PySpark - Funções com alias
from pyspark.sql import functions as F

from pyspark.sql.functions import substring, when, col


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 17, Finished, Available, Finished)

In [2]:
# 1. Ler s_dim_tempo (camada Silver)
df_tempo = spark.read.format("delta").table("s_dim_tempo").select("mes", "ano").distinct()

# 2. Gerar id_data como primeiro dia do mês (YYYYMMDD)
df_id_data = df_tempo.withColumn(
    "id_data",
    concat_ws("", col("ano").cast("string"), lpad(col("mes").cast("string"), 2, "0"), lit("01")).cast("int")
).select("id_data")

# 3. Receita mensal da Alucar
df_receita = (
    spark.read.format("delta").table("s_fato_vendas_alucar")
    .join(spark.read.format("delta").table("s_dim_tempo"), on="id_data")
    .withColumn(
        "id_data",
        concat_ws("", col("ano").cast("string"), lpad(col("mes").cast("string"), 2, "0"), lit("01")).cast("int")
    )
    .groupBy("id_data")
    .agg(spark_sum("valor_venda").alias("receita_total"))
)

# 4. Despesas mensais da Alucar
df_despesas = (
    spark.read.format("delta").table("s_fato_despesas")
    .filter(col("origem") == "Alucar")
    .join(spark.read.format("delta").table("s_dim_tempo"), on="id_data")
    .withColumn(
        "id_data",
        concat_ws("", col("ano").cast("string"), lpad(col("mes").cast("string"), 2, "0"), lit("01")).cast("int")
    )
    .groupBy("id_data")
    .agg(spark_sum("valor").alias("despesas_total"))
)

# 5. Criar base g_fato_financas_mensal_alucar
df_financas = (
    df_id_data
    .join(df_receita, on="id_data", how="left")
    .join(df_despesas, on="id_data", how="left")
    .fillna(0)
    .withColumn("lucro", col("receita_total") - col("despesas_total"))
    .withColumn("margem_lucro", expr("lucro / receita_total"))
)

# 6. Evolução do lucro (variação percentual mês a mês)

window_spec = Window.orderBy("id_data")
df_financas = df_financas.withColumn("lucro_anterior", lag("lucro").over(window_spec))
df_financas = df_financas.withColumn(
    "evolucao_lucro",
    when(col("lucro_anterior").isNotNull(), (col("lucro") - col("lucro_anterior")) / col("lucro_anterior"))
    .otherwise(lit(0))
)

# 7. Selecionar colunas finais e tipos corretos
df_final = df_financas.select(
    col("id_data").cast(IntegerType()),
    col("receita_total").cast(FloatType()),
    col("despesas_total").cast(FloatType()),
    col("lucro").cast(FloatType()),
    col("margem_lucro").cast(FloatType()),
    col("evolucao_lucro").cast(FloatType())
)

# 8. Salvar na camada Gold (Delta Table)
df_final.write.mode("overwrite").format("delta").saveAsTable("g_fato_financas_mensal_alucar")



StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 4, Finished, Available, Finished)

In [3]:
# Cria base com todas as datas até a data máxima em s_dim_pagamentos_programados
max_id_data = spark.table("s_dim_pagamentos_programados") \
    .agg(F.max("id_data").alias("max_id_data")) \
    .collect()[0]["max_id_data"]

df_tempo = spark.table("s_dim_tempo") \
    .withColumn("id_data", F.expr("CAST(CONCAT(ano, LPAD(mes, 2, '0'), '01') AS BIGINT)")) \
    .filter(F.col("id_data") <= max_id_data) \
    .select("id_data") \
    .distinct() \
    .orderBy("id_data")

# Receita mensal Consigcar
df_receita = (
    spark.table("s_fato_vendas_consigcar")
    .join(spark.table("s_dim_tempo"), F.col("data_primeira_parcela") == F.col("s_dim_tempo.id_data"))
    .withColumn("id_data", F.expr("CAST(CONCAT(ano, LPAD(mes, 2, '0'), '01') AS BIGINT)"))
    .groupBy("id_data")
    .agg(F.sum("valor_total").alias("receita_total"))
)

# Despesas mensais Consigcar
df_despesas = (
    spark.table("s_fato_despesas")
    .filter(F.col("origem") == "Consigcar")
    .join(spark.table("s_dim_tempo"), "id_data")
    .withColumn("id_data", F.expr("CAST(CONCAT(ano, LPAD(mes, 2, '0'), '01') AS BIGINT)"))
    .groupBy("id_data")
    .agg(F.sum("valor").alias("despesas_total"))
)

# Parcelamentos a receber
df_parcelamentos = (
    spark.table("s_dim_pagamentos_programados")
    .join(spark.table("s_dim_tempo"), "id_data")
    .withColumn("id_data", F.expr("CAST(CONCAT(ano, LPAD(mes, 2, '0'), '01') AS BIGINT)"))
    .groupBy("id_data")
    .agg(F.sum("valor").alias("parcelamentos_receber"))
)

# Faturamento PagSeguro
df_pagseguro = spark.table("s_fato_faturamento_pagseguro") \
    .select("id_data", F.col("valor_faturado").alias("faturamento_pagseguro"))

# Junta tudo
df = df_tempo \
    .join(df_receita, "id_data", "left") \
    .join(df_despesas, "id_data", "left") \
    .join(df_parcelamentos, "id_data", "left") \
    .join(df_pagseguro, "id_data", "left")

# Cálculos
df = df.withColumn("lucro", F.col("receita_total") - F.col("despesas_total"))
df = df.withColumn("margem_lucro", F.col("lucro") / F.col("receita_total"))

window_spec = Window.orderBy("id_data")
df = df.withColumn("evolucao_lucro", F.round(F.col("lucro") / F.lag("lucro", 1).over(window_spec) - 1, 6))

df = df.withColumn("faturamento_pagseguro", F.coalesce(F.col("faturamento_pagseguro"), F.lit(0)))
df = df.withColumn("lucro_pag", F.col("faturamento_pagseguro") - F.col("despesas_total"))
df = df.withColumn("margem_lucro_pag", F.col("lucro_pag") / F.col("faturamento_pagseguro"))
df = df.withColumn("evolucao_lucro_pag", F.round(F.col("lucro_pag") / F.lag("lucro_pag", 1).over(window_spec) - 1, 6))

# Substitui nulos por 0
df = df.fillna(0)

# Grava como tabela Delta
df.write.format("delta").mode("overwrite").saveAsTable("g_fato_financas_mensal_consigcar")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 5, Finished, Available, Finished)

In [4]:
# Lê as tabelas Delta de Consigcar e Alucar
df_consigcar = spark.table("g_fato_financas_mensal_consigcar")
df_alucar = spark.table("g_fato_financas_mensal_alucar")

# Faz o outer join para pegar todos os meses
df_merged = df_consigcar.alias("consigcar") \
    .join(df_alucar.alias("alucar"), on="id_data", how="outer") \
    .select(
        F.coalesce(F.col("consigcar.id_data"), F.col("alucar.id_data")).alias("id_data"),
        F.col("consigcar.receita_total").alias("receita_total_consigcar"),
        F.col("consigcar.despesas_total").alias("despesas_total_consigcar"),
        F.col("consigcar.lucro").alias("lucro_consigcar"),
        F.col("consigcar.margem_lucro").alias("margem_lucro_consigcar"),
        F.col("consigcar.evolucao_lucro").alias("evolucao_lucro_consigcar"),
        F.col("alucar.receita_total").alias("receita_total_alucar"),
        F.col("alucar.despesas_total").alias("despesas_total_alucar"),
        F.col("alucar.lucro").alias("lucro_alucar"),
        F.col("alucar.margem_lucro").alias("margem_lucro_alucar"),
        F.col("alucar.evolucao_lucro").alias("evolucao_lucro_alucar"),
    )
# Substitui valores nulos por 0
df_merged = df_merged.fillna(0)

# Soma os valores das duas empresas
df_merged = df_merged.withColumn("receita_total", (F.col("receita_total_consigcar") + F.col("receita_total_alucar")).cast("int"))
df_merged = df_merged.withColumn("despesas_total", (F.col("despesas_total_consigcar") + F.col("despesas_total_alucar")).cast("int"))
df_merged = df_merged.withColumn("lucro", (F.col("lucro_consigcar") + F.col("lucro_alucar")).cast("int"))

# Margem de lucro em percentual
df_merged = df_merged.withColumn("margem_lucro", (F.col("lucro") / F.col("receita_total")) * 100)

# Evolução do lucro (porcentagem)
window_spec = Window.orderBy("id_data")
df_merged = df_merged.withColumn("evolucao_lucro", F.col("lucro") / F.lag("lucro", 1).over(window_spec) * 100 - 100)

# Seleciona somente as colunas desejadas
df_final = df_merged.select(
    "id_data",
    "receita_total",
    "despesas_total",
    "lucro",
    "margem_lucro",
    "evolucao_lucro"
)

# Salva como Delta Table no Lakehouse
df_final.write.format("delta").mode("overwrite").saveAsTable("g_fato_financas_mensal")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 6, Finished, Available, Finished)

In [5]:
# Lê a tabela mensal da Alucar no formato Delta
df_alucar = spark.table("g_fato_financas_mensal_alucar")

# Extrai o ano de id_data (assumindo formato YYYYMMDD como inteiro)
df_alucar = df_alucar.withColumn("ano", F.col("id_data").cast("string").substr(1, 4))

# Agrupa por ano e faz a soma
df_alucar_anual = df_alucar.groupBy("ano").agg(
    F.sum("receita_total").alias("receita_total"),
    F.sum("despesas_total").alias("despesas_total"),
    F.sum("lucro").alias("lucro")
)

# Cria a coluna id_data no formato ano + "0101" e converte para inteiro
df_alucar_anual = df_alucar_anual.withColumn("id_data", F.concat(F.col("ano"), F.lit("0101")).cast("int"))

# Calcula a margem de lucro (como float)
df_alucar_anual = df_alucar_anual.withColumn("margem_lucro", (F.col("lucro") / F.col("receita_total")) * 100)

# Ordena por id_data e calcula a evolução do lucro
window_spec = Window.orderBy("id_data")
df_alucar_anual = df_alucar_anual.withColumn("evolucao_lucro", (F.col("lucro") / F.lag("lucro", 1).over(window_spec)) * 100 - 100)

# Seleciona as colunas desejadas
df_final = df_alucar_anual.select(
    "id_data",
    "receita_total",
    "despesas_total",
    "lucro",
    "margem_lucro",
    "evolucao_lucro"
)

# Salva como tabela Delta no Lakehouse
df_final.write.format("delta").mode("overwrite").saveAsTable("g_fato_financas_anual_alucar")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 7, Finished, Available, Finished)

In [6]:
# Lê a tabela delta g_fato_financas_mensal_consigcar do Lakehouse
df_consigcar = spark.read.format("delta").table("g_fato_financas_mensal_consigcar")

# Extrai o ano da coluna id_data (convertendo para string e pegando os 4 primeiros caracteres)
df_consigcar = df_consigcar.withColumn("ano", F.substring(F.col("id_data").cast("string"), 1, 4))

# Agrupa por ano e soma as colunas de interesse
df_consigcar_anual = df_consigcar.groupBy("ano").agg(
    F.sum("receita_total").alias("receita_total"),
    F.sum("despesas_total").alias("despesas_total"),
    F.sum("lucro").alias("lucro")
)

# Cria a coluna id_data concatenando ano + "0101" e converte para inteiro (se desejar)
df_consigcar_anual = df_consigcar_anual.withColumn("id_data", (F.col("ano") + F.lit("0101")).cast("int"))

# Calcula margem de lucro
df_consigcar_anual = df_consigcar_anual.withColumn(
    "margem_lucro", 
    (F.col("lucro") / F.col("receita_total")) * 100
)

# Ordena por id_data para calcular evolução do lucro
window_spec = Window.orderBy("id_data")

df_consigcar_anual = df_consigcar_anual.withColumn(
    "evolucao_lucro",
    (F.col("lucro") / F.lag("lucro").over(window_spec) - 1) * 100
)

# Seleciona e ordena as colunas finais
df_consigcar_anual = df_consigcar_anual.select(
    "id_data",
    "receita_total",
    "despesas_total",
    "lucro",
    "margem_lucro",
    "evolucao_lucro"
)

# Salva o resultado sobrescrevendo a tabela delta g_fato_financas_anual_consigcar
df_consigcar_anual.write.format("delta").mode("overwrite").saveAsTable("g_fato_financas_anual_consigcar")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 8, Finished, Available, Finished)

In [7]:
# Lê a tabela delta s_fato_despesas da camada silver
df_despesas = spark.read.format("delta").table("s_fato_despesas")

# Converte as colunas que são do tipo long para integer, para garantir a compatibilidade
# (em PySpark, o tipo equivalente do pandas int64 é LongType)
cols_to_cast_int = [field.name for field in df_despesas.schema.fields if field.dataType.simpleString() == "bigint"]

for col_name in cols_to_cast_int:
    df_despesas = df_despesas.withColumn(col_name, df_despesas[col_name].cast(IntegerType()))

# Aqui, o valor já deve estar como float, mas se precisar, pode converter explicitamente também:
if 'valor' in df_despesas.columns:
    df_despesas = df_despesas.withColumn("valor", df_despesas["valor"].cast(FloatType()))

# Adiciona id_cliente único
df_despesas = df_despesas.withColumn("id_despesa", monotonically_increasing_id())

# Reordenar as colunas para colocar id_despesa na primeira posição
cols = df_despesas.columns
cols.remove("id_despesa")  # remove para não duplicar
cols = ["id_despesa"] + cols  # id_despesa primeiro

df_despesas = df_despesas.select(cols)

# Salva a tabela no Lakehouse na camada gold, sobrescrevendo a tabela g_dre_despesas
df_despesas.write.format("delta").mode("overwrite").saveAsTable("g_dre_despesas")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 9, Finished, Available, Finished)

In [8]:
# Apaga os registros existentes da tabela g_plr_vendas_vendedor_mensal
# Em Spark não existe DELETE simples em tabela Delta, então sobrescrevemos a tabela com empty DataFrame
spark.sql("DROP TABLE IF EXISTS g_plr_vendas_vendedor_mensal")

# Lê a tabela delta s_fato_vendas_diaria_vendedor da camada silver
df_vendas_diarias = spark.read.format("delta").table("s_fato_vendas_diaria_vendedor")

# Extrai mes_ano dos primeiros 6 caracteres de id_data (convertendo para string)
df_vendas_diarias = df_vendas_diarias.withColumn("mes_ano", F.substring(F.col("id_data").cast("string"), 1, 6))

# Agrupa por id_vendedor e mes_ano somando total_vendas e valor_total
df_vendas_mensais = df_vendas_diarias.groupBy("id_vendedor", "mes_ano").agg(
    F.sum("total_vendas").alias("total_vendas"),
    F.sum("valor_total").alias("valor_total")
)

# Cria id_data adicionando '01' ao mes_ano e convertendo para inteiro
df_vendas_mensais = df_vendas_mensais.withColumn("id_data", (F.col("mes_ano") + F.lit("01")).cast("int"))

# Remove a coluna mes_ano
df_vendas_mensais = df_vendas_mensais.drop("mes_ano")

# Cria janela para rank mensal por id_data ordenando valor_total desc
window_spec = Window.partitionBy("id_data").orderBy(F.desc("valor_total"))

# Calcula ranking denso (dense_rank)
df_vendas_mensais = df_vendas_mensais.withColumn("ranking", F.dense_rank().over(window_spec))

# Renomeia coluna valor_total para valor_parcelas_total
df_vendas_mensais = df_vendas_mensais.withColumnRenamed("valor_total", "valor_parcelas_total")

# Converte colunas para IntegerType (se precisar)
cols_int = ["id_data", "id_vendedor", "total_vendas", "valor_parcelas_total", "ranking"]
for col in cols_int:
    df_vendas_mensais = df_vendas_mensais.withColumn(col, F.col(col).cast(IntegerType()))

# Salva a tabela sobrescrevendo a gold table
df_vendas_mensais.write.format("delta").mode("overwrite").saveAsTable("g_plr_vendas_vendedor_mensal")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 10, Finished, Available, Finished)

In [9]:
# Apaga os registros existentes da tabela g_plr_vendas_ultimos_10_dias
spark.sql("DROP TABLE IF EXISTS g_plr_vendas_ultimos_10_dias")

# Lê a tabela delta s_fato_vendas_diaria_vendedor da camada silver
df_vendas_diarias = spark.read.format("delta").table("s_fato_vendas_diaria_vendedor")

# Converte id_data (int) para string e depois para date
df_vendas_diarias = df_vendas_diarias.withColumn("data", F.to_date(F.col("id_data").cast("string"), "yyyyMMdd"))

# Calcula o último dia do mês para cada data
df_vendas_diarias = df_vendas_diarias.withColumn("ultimo_dia_mes", F.last_day(F.col("data")))

# Filtra para pegar somente os últimos 10 dias do mês (data >= ultimo_dia_mes - 9 dias)
df_ultimos_10_dias = df_vendas_diarias.filter(
    F.col("data") >= F.date_sub(F.col("ultimo_dia_mes"), 9)
)

# Agrupa por id_vendedor e id_data somando total_vendas e valor_total
df_agrupado = df_ultimos_10_dias.groupBy("id_vendedor", "id_data").agg(
    F.sum("total_vendas").alias("total_vendas"),
    F.sum("valor_total").alias("valor_total")
)

# Cria janela para ranking por id_data ordenando valor_total desc
window_spec = Window.partitionBy("id_data").orderBy(F.desc("valor_total"))

# Calcula ranking denso (dense_rank)
df_agrupado = df_agrupado.withColumn("ranking", F.dense_rank().over(window_spec))

# Converte colunas para IntegerType, se necessário
cols_int = ["id_data", "id_vendedor", "total_vendas", "valor_total", "ranking"]
for col in cols_int:
    df_agrupado = df_agrupado.withColumn(col, F.col(col).cast(IntegerType()))

# Salva a tabela sobrescrevendo a gold table
df_agrupado.write.format("delta").mode("overwrite").saveAsTable("g_plr_vendas_ultimos_10_dias")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 11, Finished, Available, Finished)

In [10]:
# Lê a tabela s_fato_vendas_diaria_vendedor da camada Silver
df_vendas_diarias = spark.read.format("delta").table("s_fato_vendas_diaria_vendedor").select(
    "id_data", "id_vendedor", "total_vendas", "valor_total", "valor_parcela"
)

# Lê a tabela s_dim_vendedor da camada Silver
df_vendedores = spark.read.format("delta").table("s_dim_vendedor").select(
    "id_vendedor", "nome_vendedor"
)

# Faz o join para obter o nome do vendedor
df_join = df_vendas_diarias.join(df_vendedores, on="id_vendedor", how="left")

# Converte os tipos de dados para manter coerência (se necessário)
df_final = df_join \
    .withColumn("id_data", df_join["id_data"].cast(IntegerType())) \
    .withColumn("id_vendedor", df_join["id_vendedor"].cast(IntegerType())) \
    .withColumn("total_vendas", df_join["total_vendas"].cast(IntegerType())) \
    .withColumn("valor_total", df_join["valor_total"].cast(IntegerType())) \
    .withColumn("nome_vendedor", df_join["nome_vendedor"].cast(StringType())) \
    .withColumn("valor_parcela", df_join["valor_parcela"].cast(FloatType()))

# Salva a tabela g_plr_vendas_vendedor_diaria na camada gold (sobrescrevendo)
df_final.write.format("delta").mode("overwrite").saveAsTable("g_plr_vendas_vendedor_diaria")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 12, Finished, Available, Finished)

In [11]:
# Carrega a tabela s_fato_metas_consigcar da camada Silver
df_metas = spark.read.format("delta").table("s_fato_metas_consigcar").select(
    "id_data",
    "meta_vendas_1_cum",
    "meta_vendas_2_cum",
    "meta_vendas_1_mes",
    "meta_vendas_2_mes"
)

# Carrega a tabela s_fato_vendas_consigcar da camada Silver
df_vendas = spark.read.format("delta").table("s_fato_vendas_consigcar").select(
    F.col("data_primeira_parcela").alias("id_data"),
    "id_venda_consigcar"
)

# Converte id_data para string
df_metas = df_metas.withColumn("id_data", F.col("id_data").cast("string"))
df_vendas = df_vendas.withColumn("id_data", F.col("id_data").cast("string"))

# Conta quantidade de vendas mensais (vendas_mes)
vendas_mensais = df_vendas.groupBy("id_data").count().withColumnRenamed("count", "vendas_mes")

# Extrai ano e mês em df_vendas para cumulativo
df_vendas = df_vendas.withColumn("ano", F.substring("id_data", 1, 4)) \
                     .withColumn("mes", F.substring("id_data", 5, 2))

# Calcula vendas cumulativas desde janeiro
vendas_cum = df_vendas.groupBy("ano", "mes").count().withColumnRenamed("count", "vendas_count")

# Ordena por ano e mês
window_spec = Window.partitionBy("ano").orderBy("mes")
vendas_cum = vendas_cum.withColumn("vendas_cum", F.sum("vendas_count").over(window_spec))

# Recria id_data com '01' no dia para join
vendas_cum = vendas_cum.withColumn("id_data", F.concat_ws("", "ano", "mes", F.lit("01")))

# Seleciona as colunas necessárias
vendas_cum = vendas_cum.select("id_data", "ano", "vendas_cum")

# Junta vendas_mensais com metas
df_metas = df_metas.join(vendas_mensais, on="id_data", how="left") \
                   .join(vendas_cum, on=["id_data"], how="left")

# Extrai ano de id_data em df_metas para condição (já vem em string)
df_metas = df_metas.withColumn("ano", F.substring("id_data", 1, 4))

# Preenche NAs com 0
df_metas = df_metas.fillna({"vendas_mes": 0, "vendas_cum": 0})

# Define meta_cum_atingida
df_metas = df_metas.withColumn(
    "meta_cum_atingida",
    F.when(
        (F.col("vendas_cum") >= F.col("meta_vendas_1_cum")) & (F.col("vendas_cum") < F.col("meta_vendas_2_cum")),
        1
    ).when(F.col("vendas_cum") >= F.col("meta_vendas_2_cum"), 2).otherwise(0)
)

# Define meta_mes_atingida
df_metas = df_metas.withColumn(
    "meta_mes_atingida",
    F.when(
        (F.col("vendas_mes") >= F.col("meta_vendas_1_mes")) & (F.col("vendas_mes") < F.col("meta_vendas_2_mes")),
        1
    ).when(F.col("vendas_mes") >= F.col("meta_vendas_2_mes"), 2).otherwise(0)
)

# Seleciona e ordena as colunas finais
df_final = df_metas.select(
    "id_data",
    "meta_vendas_1_cum",
    "meta_vendas_2_cum",
    "meta_vendas_1_mes",
    "meta_vendas_2_mes",
    "vendas_mes",
    "vendas_cum",
    "meta_cum_atingida",
    "meta_mes_atingida"
)

# Ajusta os tipos de dados para a tabela gold
df_final = df_final.withColumn("id_data", F.col("id_data").cast(IntegerType())) \
                   .withColumn("meta_vendas_1_cum", F.col("meta_vendas_1_cum").cast(IntegerType())) \
                   .withColumn("meta_vendas_2_cum", F.col("meta_vendas_2_cum").cast(IntegerType())) \
                   .withColumn("meta_vendas_1_mes", F.col("meta_vendas_1_mes").cast(FloatType())) \
                   .withColumn("meta_vendas_2_mes", F.col("meta_vendas_2_mes").cast(FloatType())) \
                   .withColumn("vendas_mes", F.col("vendas_mes").cast(IntegerType())) \
                   .withColumn("vendas_cum", F.col("vendas_cum").cast(IntegerType())) \
                   .withColumn("meta_cum_atingida", F.col("meta_cum_atingida").cast(IntegerType())) \
                   .withColumn("meta_mes_atingida", F.col("meta_mes_atingida").cast(IntegerType()))

# Salva a tabela g_metas_consigcar na camada Gold
df_final.write.format("delta").mode("overwrite").saveAsTable("g_metas_consigcar")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 13, Finished, Available, Finished)

In [12]:
# Carrega a tabela s_fato_metas_alucar da camada Silver
df_metas = spark.read.format("delta").table("s_fato_metas_alucar").select(
    "id_data",
    "meta_vendas_1_cum",
    "meta_vendas_2_cum",
    "meta_vendas_1_mes",
    "meta_vendas_2_mes"
)

# Cria as colunas meta_cum_atingida e meta_mes_atingida zeradas
df_metas = df_metas.withColumn("meta_cum_atingida", F.lit(0)) \
                   .withColumn("meta_mes_atingida", F.lit(0)) \
                   .withColumn("vendas_mes", F.lit(0)) \
                   .withColumn("vendas_cum", F.lit(0))

# Seleciona e ordena colunas conforme tabela gold
df_final = df_metas.select(
    "id_data",
    "meta_vendas_1_cum",
    "meta_vendas_2_cum",
    "meta_vendas_1_mes",
    "meta_vendas_2_mes",
    "vendas_mes",
    "vendas_cum",
    "meta_cum_atingida",
    "meta_mes_atingida"
)

# Ajusta tipos de dados para a camada gold
df_final = df_final.withColumn("id_data", F.col("id_data").cast(IntegerType())) \
                   .withColumn("meta_vendas_1_cum", F.col("meta_vendas_1_cum").cast(IntegerType())) \
                   .withColumn("meta_vendas_2_cum", F.col("meta_vendas_2_cum").cast(IntegerType())) \
                   .withColumn("meta_vendas_1_mes", F.col("meta_vendas_1_mes").cast(FloatType())) \
                   .withColumn("meta_vendas_2_mes", F.col("meta_vendas_2_mes").cast(FloatType())) \
                   .withColumn("vendas_mes", F.col("vendas_mes").cast(IntegerType())) \
                   .withColumn("vendas_cum", F.col("vendas_cum").cast(IntegerType())) \
                   .withColumn("meta_cum_atingida", F.col("meta_cum_atingida").cast(IntegerType())) \
                   .withColumn("meta_mes_atingida", F.col("meta_mes_atingida").cast(IntegerType()))

# Salva a tabela g_metas_alucar na camada Gold
df_final.write.format("delta").mode("overwrite").saveAsTable("g_metas_alucar")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 14, Finished, Available, Finished)

In [13]:
# Carrega a tabela silver s_fato_vendas_clientes_alucar_estimativa
df_estimativa = spark.read.format("delta").table("s_fato_vendas_clientes_alucar_estimativa")

# Seleciona as colunas necessárias
df_estimativa = df_estimativa.select(
    "id_data",
    "nome",
    "valor_receita_estimativa"
)

# Ajusta tipos de dados conforme tabela gold
df_estimativa = df_estimativa.withColumn("id_data", df_estimativa["id_data"].cast(IntegerType())) \
                             .withColumn("valor_receita_estimativa", df_estimativa["valor_receita_estimativa"].cast(FloatType()))

# Grava os dados na tabela g_fato_vendas_alucar_estimativa na camada Gold, substituindo dados existentes
df_estimativa.write.format("delta").mode("overwrite").saveAsTable("g_fato_vendas_alucar_estimativa")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 15, Finished, Available, Finished)

In [16]:
# Carrega a tabela silver s_fato_consigcar_estimativa
df_estimativa_consigcar = spark.read.format("delta").table("s_fato_consigcar_estimativa")

# Seleciona as colunas necessárias
df_estimativa_consigcar = df_estimativa_consigcar.select(
    "id_data",
    "valor_receita_estimativa"
)

# Cria a coluna 'sinal' com base no valor de id_data (considerando os 6 primeiros dígitos)
df_estimativa_consigcar = df_estimativa_consigcar.withColumn(
    "sinal",
    when(substring(col("id_data").cast("string"), 1, 6).cast("int") <= 202505, "realizada")
    .otherwise("estimada")
)

# Ajusta tipos conforme tabela gold
from pyspark.sql.types import IntegerType, FloatType, StringType

df_estimativa_consigcar = df_estimativa_consigcar.withColumn("id_data", col("id_data").cast(IntegerType())) \
                                                 .withColumn("valor_receita_estimativa", col("valor_receita_estimativa").cast(FloatType())) \
                                                 .withColumn("sinal", col("sinal").cast(StringType()))

# Salva na tabela gold g_fato_consigcar_estimativa substituindo dados existentes
df_estimativa_consigcar.write.format("delta").mode("overwrite").saveAsTable("g_fato_consigcar_estimativa")


StatementMeta(, 63762178-8d6d-478e-ae7b-e65baefca7c8, 18, Finished, Available, Finished)