## Informações Gerais
 | Informações | Detalhes |
 |------------|-------------|
 |Nome Tabela | silver.vendas |
 |Origem | Bronze Azure  |

## Histórico de Atualizações
 | Data | Desenvolvido por | Motivo |
 |:----:|--------------|--------|
 |20/07/2025 | Tulio Augusto  | Criação do notebook |
 |20/07/2025 | Tulio Augusto  | Inclusão coluna valor |

In [0]:
%run /Workspace/Repos/AGRICOPEL/Agricopel-Databricks/FINANCEIRO/CONTAS_RECEBER/00.global/functions

In [0]:
from pyspark.sql.functions import current_date, current_timestamp, expr
from pyspark.sql.functions import monotonically_increasing_id


In [0]:
df_vendas = spark.read.parquet("abfss://bronze@storageagrucopel.dfs.core.windows.net/vendas")

In [0]:
database = "silver.financeiro"
tabela = "vendas"

In [0]:
df_vendas.createOrReplaceTempView("vw_dados")

In [0]:
df_vendas = spark.sql("""
    select
        distinct
        produto as nm_produto,
        p.nr_preco_unitario,
        pais as nm_pais,
        c.nm_tipo_cliente,
        cast(quantidade as decimal(10,2)) as nr_quantidade,
        cast(trim(replace(replace(replace(valor_total, "R$ ",""), ".", ""),",",".")) as decimal(10,2)) as nr_valor_total,
        to_date(data_venda, "dd/MM/yyyy") as dt_data_venda,
        mes as nm_mes,
        cast(ano as int) as nr_ano,
        caminho_arquivo as nm_caminho_arquivo,
        current_timestamp() as dt_data_extracao
    from vw_dados 
    inner join silver.sistema_cadastros.clientes c on vw_dados.pais = c.nm_pais
    inner join silver.sistema_cadastros.produtos p on vw_dados.produto = p.nm_produto
""")

df_vendas = df_vendas.withColumn("id_venda", monotonically_increasing_id())


In [0]:
df_vendas = df_vendas.withColumn("data_carga", current_date())
df_vendas = df_vendas.withColumn("data_hora_carga", expr("current_timestamp() - INTERVAL 3 HOURS"))

In [0]:
comentario_tabela = 'Esta tabela é uma entidade corporativa da área financeira que contém as informações das vendas feitas'

lista_comentario_colunas = {
'id_venda' : 'Identificador único da venda.',
'nm_produto' : 'Nome do produto.',
'nr_preco_unitario' :  'Preço do produto por unidade.',
'nm_pais' : 'Nome do país.',
'nm_tipo_cliente' : 'Nome do tipo de cliente.',
'nr_quantidade' : 'Quantidade de produtos vendidos.',
'nr_valor_total' : 'Valor total da venda',
'dt_data_venda' : 'Data da venda.',
'nm_mes' : 'Nome do mês da venda.',
'nr_ano' : 'Ano da venda.',
'nm_caminho_arquivo' : 'Caminho do arquivo que contém os dados',
'dt_data_extracao' : 'data extração bronze.',
'data_carga' : 'Data que o registro foi carregado',
'data_hora_carga' : 'Data e hora que o registro foi carregado'
}

In [0]:
df_vendas.write \
          .format('delta') \
          .mode('overwrite') \
          .clusterBy("id_venda") \
          .option('overwriteSchema', 'true') \
          .saveAsTable(f'{database}.{tabela}')
adicionaComentariosTabela(database, tabela, comentario_tabela, lista_comentario_colunas)
print("Dados gravados com sucesso!")

In [0]:
spark.sql(f"OPTIMIZE {database}.{tabela}")
print(f"Processo de otimização finalizado!.")

In [0]:
%sql
select * from silver.financeiro.vendas