In [None]:
from data_processing import Sparkinit
from data_setup.configuracoes import formatar_sql
from pyspark.sql.types import StructType, StructField, TimestampType, LongType, StringType, DecimalType, BinaryType
spark_start = Sparkinit()

spark = spark_start.buscar_sessao_spark()

print(f"WebUI SparkJobs: {spark.sparkContext.uiWebUrl}")
spark.getActiveSession()

# Vendas

In [None]:
import os
# Carregando o dataframe para verificar a estrutura das colunas e tipos de dados
dados = spark.read.format("parquet").load(os.path.abspath(r"C:\Users\gustavo.lopes\Documentos\GitHub\desafio_panvel-data_engineer\datalake\transient\VENDAS"))

dados.printSchema()

dados.show(n=1, vertical=True)
dados.columns

In [None]:
# Criando a tabela temporária e realizando a consulta que depois usaremos para transformar a camada raw
dados.createOrReplaceTempView("vendas_tmp")

query = formatar_sql(""" -- 'Tabela Vendas parquet'
SELECT 
    COALESCE(date_format(vt.d_dt_vd, 'yyyy-MM-dd HH:mm:ss'), '') AS data_emissao,
    CAST(vt.n_id_fil AS BIGINT) AS codigo_filial,
    CAST(vt.n_id_vd_fil AS BIGINT) AS id_venda_filial,
    COALESCE(CAST(vt.v_cli_cod AS STRING), '') AS codigo_cliente,
    CAST(vt.n_vlr_tot_vd AS DECIMAL(38, 2)) AS valor_total_venda,
    CAST(vt.n_vlr_tot_desc AS DECIMAL(38, 2)) AS valor_total_desconto,
    CASE 
        WHEN vt.v_cpn_eml  = 'SIM' THEN True
        ELSE False
    END AS enviado_email,
    COALESCE(CAST(vt.tp_pgt AS STRING), '') AS tipo_pagamento
FROM vendas_tmp as vt""")

spark.sql(query).show()

# Pedidos

In [None]:

# Carregando o dataframe para verificar a estrutura das colunas e tipos de dados
dados = spark.read.format("parquet").load(os.path.abspath(r"C:\Users\gustavo.lopes\Documentos\GitHub\desafio_panvel-data_engineer\datalake\transient\PEDIDOS"))

dados.printSchema()

dados.show(n=1, vertical=True)
dados.columns

In [None]:
# Criando a tabela temporária e realizando a consulta que depois usaremos para transformar a camada raw
dados.createOrReplaceTempView("pedidos_tmp")

query = formatar_sql("""SELECT 
    CAST(pd.n_id_pdd AS BIGINT) AS id_pedido,
    COALESCE(CAST(pd.d_dt_eft_pdd AS DATE), '') AS data_realizacao_pedido,
    COALESCE(date_format(pd.d_dt_entr_pdd, 'yyyy-MM-dd HH:mm:ss'), '') AS data_entrega,
    CASE 
        WHEN pd.v_cnl_orig_pdd = 'L' THEN 'Loja'
        WHEN pd.v_cnl_orig_pdd = 'A' THEN 'App'
        WHEN pd.v_cnl_orig_pdd = 'S' THEN 'Site'
    END AS canal_origem_pedido,
    COALESCE(CAST(pd.v_uf_entr_pdd AS STRING), '') AS UF_pedido,
    COALESCE(CAST(pd.v_lc_ent_pdd AS STRING), '') AS cidade_entrega,
    CAST(pd.n_vlr_tot_pdd AS DECIMAL(38,2)) AS valor_total_pedido
FROM pedidos_tmp as pd""")

spark.sql(query).show()

# Pedidos Vendas

In [None]:

# Carregando o dataframe para verificar a estrutura das colunas e tipos de dados
dados = spark.read.format("parquet").load(os.path.abspath(r"C:\Users\gustavo.lopes\Documentos\GitHub\desafio_panvel-data_engineer\datalake\transient\PEDIDO_VENDA"))

dados.printSchema()

dados.show(n=1, vertical=True)
dados.columns

In [None]:
# Criando a tabela temporária e realizando a consulta que depois usaremos para transformar a camada raw
dados.createOrReplaceTempView("pedido_venda_tmp")

query = formatar_sql("""SELECT 
    CAST(pv.n_id_fil AS BIGINT) AS codigo_filial,
    CAST(pv.n_id_vd_fil AS BIGINT) AS id_venda_filial,
    CAST(pv.n_id_pdd AS BIGINT) AS id_pedido
FROM pedido_venda_tmp as pv""")

spark.sql(query).show()

# Itens Vendas

In [None]:

# Carregando o dataframe para verificar a estrutura das colunas e tipos de dados
dados = spark.read.format("parquet").load(os.path.abspath(r"C:\Users\gustavo.lopes\Documentos\GitHub\desafio_panvel-data_engineer\datalake\transient\ITENS_VENDAS"))

dados.printSchema()

dados.show(n=1, vertical=True)
dados.columns


In [None]:
# Criando a tabela temporária e realizando a consulta que depois usaremos para transformar a camada raw
dados.createOrReplaceTempView("itens_vendas_tmp")

query = formatar_sql("""SELECT 
    CAST(iv.n_id_fil AS BIGINT) AS codigo_filial,
    CAST(iv.n_id_vd_fil AS BIGINT) AS id_venda_filial,
    CAST(iv.n_id_it AS BIGINT) AS codigo_item_venda,
    CASE 
        WHEN iv.v_rc_elt  = 'SIM' THEN True
        ELSE False
    END AS com_receita_eletronica,
    CASE 
        WHEN iv.v_it_vd_conv = 'SIM' THEN 'Convênio'
        WHEN iv.v_it_vd_conv = 'NAO' AND iv.n_vlr_desc > 0 THEN 'Promoção'
        ELSE 'Sem Desconto'
    END AS tipo_desconto,
    CAST(iv.n_vlr_pis AS DECIMAL(38,2)) AS valor_pis_item,
    CAST(iv.n_vlr_vd AS DECIMAL(38,2)) AS valor_final_item,
    CAST(iv.n_vlr_desc AS DECIMAL(38,2)) AS valor_desconto_item,
    CAST(iv.n_qtd AS BIGINT) AS quantidade_itens
FROM itens_vendas_tmp as iv""")

spark.sql(query).show()

# Endereços Clientes

In [None]:

# Carregando o dataframe para verificar a estrutura das colunas e tipos de dados
dados = spark.read.format("parquet").load(os.path.abspath(r"C:\Users\gustavo.lopes\Documentos\GitHub\desafio_panvel-data_engineer\datalake\transient\ENDERECOS_CLIENTES"))

dados.printSchema()

dados.show(n=1, vertical=True)
dados.columns


In [None]:
# Criando a tabela temporária e realizando a consulta que depois usaremos para transformar a camada raw
dados.createOrReplaceTempView("enderecos_clientes_tmp")

query = formatar_sql(""" -- 'Tabela Endereços Clientes parquet'
SELECT 
    CAST(ec.v_id_cli AS STRING) AS codigo_cliente,
    CAST(ec.n_sq_end AS BIGINT) AS sequencia_endereco_cliente,
    COALESCE(date_format(ec.d_dt_exc, 'yyyy-MM-dd HH:mm:ss'), '') AS data_exclusao_endereco,
    CAST(ec.v_lcl AS STRING) AS cidade_endereco,
    CAST(ec.v_uf AS STRING) AS UF_endereco
FROM enderecos_clientes_tmp as ec""")

spark.sql(query).show()

# Clientes Opt

In [None]:


schema = StructType([
    StructField("v_id_cli", StringType(), True),
    StructField("b_push", StringType(), True),
    StructField("b_sms", StringType(), True),
    StructField("b_email", StringType(), True),
    StructField("b_call", StringType(), True)
])

# Carregando o dataframe para verificar a estrutura das colunas e tipos de dados
dados = spark.read.option("multiline","true").format("json").load(os.path.abspath(r"C:\Users\gustavo.lopes\Documentos\GitHub\desafio_panvel-data_engineer\datalake\transient\CLIENTES_OPT"))

dados.printSchema()

dados.show(n=2, vertical=True)
dados.columns


In [None]:
# Criando a tabela temporária e realizando a consulta que depois usaremos para transformar a camada raw
dados.createOrReplaceTempView("clientes_opt_tmp")

query = formatar_sql(""" -- 'Tabela Clientes Opt Json'
SELECT 
    CAST(co.v_id_cli AS STRING) AS codigo_cliente,
    CASE 
        WHEN co.b_push  = True THEN 'SIM'
        WHEN co.b_push  = False THEN 'NÃO'
        ELSE ''
    END AS autoriza_notificacao_push,
    CASE 
        WHEN co.b_sms  = True THEN 'SIM'
        WHEN co.b_sms  = False THEN 'NÃO'
        ELSE ''
    END AS autoriza_notificacao_sms,
    CASE 
        WHEN co.b_email  = True THEN 'SIM'
        WHEN co.b_email  = False THEN 'NÃO'
        ELSE ''
    END AS autoriza_notificacao_email,
    CASE 
        WHEN co.b_call  = True THEN 'SIM'
        WHEN co.b_call  = False THEN 'NÃO'
        ELSE ''
    END AS autoriza_notificacao_ligacao
FROM clientes_opt_tmp as co""")

spark.sql(query).show()

# Clientes

In [None]:

# Carregando o dataframe para verificar a estrutura das colunas e tipos de dados
dados = spark.read.format("parquet").load(os.path.abspath(os.path.join(os.getcwd(), "datalake/transient/CLIENTES")))

dados.printSchema()

dados.show(n=1, vertical=True)
dados.columns


In [None]:
# Criando a tabela temporária e realizando a consulta que depois usaremos para transformar a camada raw
dados.createOrReplaceTempView("clientes_tmp")

query = formatar_sql(""" -- 'Tabela Endereços Clientes parquet'
SELECT 
    CAST(c.v_id_cli AS STRING) AS codigo_cliente,
    COALESCE(CAST(c.d_dt_nasc AS DATE), '') AS data_nascimento_cliente,
    CASE 
        WHEN c.v_sx_cli  = 'F' THEN 'Feminino'
        WHEN c.v_sx_cli  = 'M' THEN 'Masculino'
        ELSE '' 
    END AS genero_biologico_cliente,
    CASE 
        WHEN c.n_est_cvl  = 1 THEN 'Solteiro'
        WHEN c.n_est_cvl  = 2 THEN 'Casado'
        WHEN c.n_est_cvl  = 3 THEN 'Viúvo'
        WHEN c.n_est_cvl  = 4 THEN 'Desquitado'
        WHEN c.n_est_cvl  = 5 THEN 'Divorciado'
        WHEN c.n_est_cvl  = 6 THEN 'Outros'
        ELSE ''
    END AS estado_civil_cliente
FROM clientes_tmp as c""")
spark.sql(query).show()

In [None]:
""" -- 'Tabela Endereços Clientes parquet'
SELECT 
    CAST(c.v_id_cli AS STRING) AS codigo_cliente,
    COALESCE(CAST(c.d_dt_nasc AS DATE), '') AS data_nascimento_cliente,
    CASE 
        WHEN c.v_sx_cli  = 'F' THEN 'Feminino'
        WHEN c.v_sx_cli  = 'M' THEN 'Masculino'
        ELSE '' 
    END AS genero_biologico_cliente,
    CASE 
        WHEN c.n_est_cvl  = 1 THEN 'Solteiro'
        WHEN c.n_est_cvl  = 2 THEN 'Casado'
        WHEN c.n_est_cvl  = 3 THEN 'Viúvo'
        WHEN c.n_est_cvl  = 4 THEN 'Desquitado'
        WHEN c.n_est_cvl  = 5 THEN 'Divorciado'
        WHEN c.n_est_cvl  = 6 THEN 'Outros'
        ELSE ''
    END AS estado_civil_cliente
FROM clientes_tmp as c"""

In [None]:
import os
import json
# Mapeia e carrega os parâmetros no dicionário json
file_json = os.path.abspath(os.path.join(os.getcwd(), "data_setup/config/spark_jobs.json"))

with open(file=file_json, mode="+rt") as dict_file:
    config = json.loads(dict_file.read())
    dict_file.close()
config["transient_config_dict"]["clientes"]