# 🔌 Ingestão de Dados: Origem -> Databricks (Deploy)

## 🎯 Objetivo
Ingestão dos dados transacionais para o Data Lake em **Produção**.

## ⚙️ Processo
1.  **Conexão Segura**: Key Vault.
2.  **Leitura JDBC**: Driver SQL.
3.  **Escrita Otimizada**: Delta Lake.

---

# **0 Configurações Iniciais:**

## **0.1 Packages:** Importação das bibliotecas utilizadas no projeto

In [0]:
# Enable IPython autoreload so edits to imported local modules are picked up without restarting
%load_ext autoreload
%autoreload 2

# Standard libraries
import json
import logging
import requests
import pandas as pd
import sys
import os

# Add the current working directory to sys.path so the 'src' module can be found
sys.path.append(os.getcwd())
from src.ingestion.connectors import connect_jdbc, url_jdbc

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, avg, max, lit, udf, when, log, to_timestamp
)
from pyspark.sql.window import Window
from pyspark.sql.types import (
    DoubleType, StringType, IntegerType, FloatType,
    StructType, StructField, ArrayType, TimestampType
)
from datetime import datetime
from datetime import timedelta

# Spark session settings, applied once before any table is read or written:
# Parquet datetime rebase mode on write and the legacy time-parser policy.
for _conf_key, _conf_value in (
    ("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED"),
    ("spark.sql.legacy.timeParserPolicy", "LEGACY"),
):
    spark.conf.set(_conf_key, _conf_value)

In [0]:
# Reference day for the extraction window: 90 days before today. It is computed
# once so that DATA_INICIAL and ANO are always derived from the same instant
# (two separate today() calls could straddle midnight).
_reference_day = datetime.today().date() - timedelta(days=90)

# First day of the reference day's month, as 'YYYY-MM-01'; used as the lower
# bound of the sales extractions.
DATA_INICIAL = _reference_day.strftime("%Y-%m-01")

# Year before the reference day's year, kept as a string because it is
# interpolated into SQL text.
ANO = str(_reference_day.year - 1)

## **0.2 Conexão:** Conectar via JDBC

In [0]:
# --- GLOBAL DELTA LAKE WRITE SETTINGS ---
# Session-wide flags enabling Delta optimized writes and auto compaction;
# they apply to every saveAsTable call that follows.
_DELTA_WRITE_FLAGS = {
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
}
for _flag, _setting in _DELTA_WRITE_FLAGS.items():
    spark.conf.set(_flag, _setting)

In [0]:
# --- 1. CREDENTIALS AND CONNECTION SETUP ---
# Every secret below is read from this Databricks secret scope.
# NOTE(review): the notebook header says it targets "Produção", yet the scope
# name ends in "-dev" — confirm this is the intended scope for the deploy.
KEYVAULT_SCOPE = "ss-keyvault-dev"

try:
    tenant_id = dbutils.secrets.get(scope=KEYVAULT_SCOPE, key="FBC-API-TENANT-ID")
    client_id = dbutils.secrets.get(scope=KEYVAULT_SCOPE, key="FBC-API-CLIENT-ID")
    client_secret = dbutils.secrets.get(scope=KEYVAULT_SCOPE, key="FBC-API-CLIENT-SECRET")
except Exception:
    # There is no manual fallback for these values. Carrying on would either
    # fail just below with a NameError or silently reuse stale credentials left
    # in the notebook session by an earlier run, so report and stop here.
    print("‚ö†Ô∏è Segredos n√£o encontrados.")
    raise
print("‚úÖ Credenciais recuperadas do Key Vault com sucesso.")

# Scope string handed to connect_jdbc together with the credentials.
scope = "https://database.windows.net/.default"

# Build the JDBC connection properties and one JDBC URL per source database
# ("BIP" and "DSP"); the host names also come from the secret scope.
connection_properties = connect_jdbc(tenant_id, client_id, client_secret, scope)
jdbc_url = url_jdbc(dbutils.secrets.get(scope=KEYVAULT_SCOPE, key="FBC-BCO-HOSTNAME-BIP"), "BIP")
jdbc_url_ds = url_jdbc(dbutils.secrets.get(scope=KEYVAULT_SCOPE, key="FBC-BCO-HOSTNAME-DSP"), "DSP")

# **1 Tabelas:**

## **1.1 - Lojas:**

In [0]:
# Store dimension. Points of sale from GLD.CMC_aPONTOS are left-joined with
# GLD.CMC_aSAP_LOJAS (rows with BIC_CHKATIVA = 1), with the ponto -> store code
# mapping from GLD.CMC_aRMR_LOJAS and with the territorial codes in
# BRZ.IBG_gSETORES_LOJAS. Text attributes are upper-cased at the source.
query = '''
SELECT DISTINCT
       CONVERT(VARCHAR, CD.codigo_loja)                                  AS codigo_loja
      ,CONVERT(VARCHAR, P.BIC_KNA1_KUNNR)                                AS codigo_ponto_loja
      ,UPPER(P.[KNA1_NOME])                                              AS nome_loja
      ,P.BIC_KNA1_ZZDTABER_LOJA                                          AS data_abertura_ponto_loja
      ,P.BIC_KNA1_ZZDTFECH_LOJA                                          AS data_fechamento_ponto_loja
      ,UPPER(P.KNA1_REGIO)                                               AS estado_loja
      ,UPPER(P.KNA1_ORT01)                                               AS cidade_ponto_loja
      ,UPPER(P.KNA1_ORT02)                                               AS bairro_ponto_loja
      ,P.KNA1_ZLATITUDE                                                  AS latitude_loja
      ,P.KNA1_ZLONGITUDE                                                 AS longitude_loja
      ,IIF(P.VIP = 'VIP', 'SIM', 'NAO')                                  AS ponto_vip_loja
      ,UPPER(P.ZD_KNA1_NOME)                                             AS regional_global_loja
      ,UPPER(P.VDR_aCARTEIRAS_TITLE)                                     AS carteira_vd_loja
      ,UPPER(P.VDR_aREGIONAIS_TITLE)                                     AS regional_vd_loja
      ,UPPER(L.[BIC_KNA1_ZZDTABER_LOJA])                                 AS data_abertura_loja
      ,UPPER(L.[BIC_KNA1_ZZDTFECH_LOJA])                                 AS data_fechamento_loja
      ,UPPER(L.[BIC_CLASSE])                                             AS classe_loja
      ,UPPER(L.[BIC_STATUS])                                             AS status_loja
      ,UPPER(L.[ZSDT_STATUS_LOJA_DESCRICAO])                             AS status_micro_loja
      ,UPPER(L.[BIC_TIPO_GRUPO])                                         AS tipo_grupo_loja
      ,UPPER(TNLST_BEZEI)                                                AS tipo_loja
      ,UPPER(L.[TVK1T_VTEXT])                                            AS modelo_loja
      ,UPPER(L.[TVK2T_VTEXT])                                            AS versao_layout_loja
      ,UPPER(L.[BIC_CLUSTER_OPERACAO])                                   AS cluster_loja
      ,i.CD_SIT                                                          AS codigo_situacao_territorial
      ,UPPER(i.NM_SIT)                                                   AS descricao_situacao_territorial
      ,i.CD_UF                                                           AS codigo_uf
      ,UPPER(i.NM_UF)                                                    AS nome_uf
      ,UPPER(i.SIGLA_UF)                                                 AS sigla_uf
      ,i.CD_MUN                                                          AS codigo_municipio
      ,UPPER(i.NM_MUN)                                                   AS nome_municipio
      ,i.CD_DIST                                                         AS codigo_distrito
      ,UPPER(i.NM_DIST)                                                  AS nome_distrito
      ,i.CD_SUBDIST                                                      AS codigo_subdistrito
      ,UPPER(i.NM_SUBDIST)                                               AS nome_subdistrito
  FROM GLD.CMC_aPONTOS P
  LEFT JOIN GLD.CMC_aSAP_LOJAS L 
      ON BIC_CHKATIVA = 1 AND L.BIC_KNVP_KUNN2 = P.BIC_KNA1_KUNNR
  LEFT JOIN (
      SELECT 
          CONVERT(INT, COD_LOJA)                                         AS codigo_loja,
          CONVERT(INT, BIC_PTO)                                          AS codigo_ponto_loja
      FROM GLD.CMC_aRMR_LOJAS) CD 
      ON CD.codigo_ponto_loja = P.BIC_KNA1_KUNNR
  LEFT JOIN BRZ.IBG_gSETORES_LOJAS i
      ON CD.codigo_loja = i.COD_LOJA
'''

# Run the query on the source database (wrapped as a derived table) and
# replace the Delta table, schema included.
lojas_df = spark.read.jdbc(
    url=jdbc_url,
    table=f"({query}) as tablename",
    properties=connection_properties,
)
(
    lojas_df.write
    .mode("overwrite")
    .format("delta")
    .option("delta.columnMapping.mode", "name")
    .option("overwriteSchema", "true")
    .saveAsTable("ds_dev.cvc_pred.lojas")
)

## **1.2 - Faturamento:**

### **1.2.1 - Faturamento Global:**

In [0]:
# Debug output: prints the SQL text currently bound to `query`. When the cells
# run in order, that is still the store query assigned in section 1.1.
# NOTE(review): looks like a leftover inspection cell — confirm it is still needed.
print(query)

In [0]:
# Daily sales per store and sales-channel group, from DATA_INICIAL onwards.
# Channel ids are translated to their group description through
# SLV.RMR_cGRUPO_CANAL_VDA.
#
# The GROUP BY uses the channel *description* only (not V.CANAL), matching the
# item-level extraction: grouping by the raw channel id as well would emit
# several rows with the same (codigo_loja, data, descricao_canal_venda) key
# whenever more than one id maps to the same group, and those duplicates would
# fan out every later join on that key.
query = f'''
SELECT 
    V.COD_PONTO_RCKY                      AS codigo_loja,
    V.DATA_VDA                            AS data,
    UPPER(NC.descricao_canal)             AS descricao_canal_venda,
    SUM(V.VLR_BRUT)                       AS valor_venda_bruta,
    SUM(V.VLR_LIQU)                       AS valor_venda_liquida
FROM GLD.CMC_vSELL_OUT AS V
LEFT JOIN 
(SELECT DISTINCT ID_CNVD             AS id_canal,
                BIC_GRUPO_CNVD       AS descricao_canal FROM SLV.RMR_cGRUPO_CANAL_VDA) AS NC 
ON NC.id_canal=V.CANAL
WHERE V.DATA_VDA  >= '{DATA_INICIAL}'
GROUP BY V.COD_PONTO_RCKY, V.DATA_VDA, NC.descricao_canal
'''

# Run the aggregation on the source database and replace the Delta table,
# schema included.
spark.read.jdbc(url=jdbc_url, table=f'''({query}) as tablename''', properties=connection_properties)\
    .write.mode("overwrite") \
    .format("delta") \
    .option("delta.columnMapping.mode", "name")\
    .option("overwriteSchema", "true") \
    .saveAsTable("ds_dev.cvc_pred.faturamento")

In [0]:
# Gap-filled daily net sales per store and channel (Spark SQL over the Delta
# tables written above).
#
# * CALENDARIO: one row per day from DATA_INICIAL to the current date.
# * FAT_SUBDISTRITO / FAT_DISTRITO / FAT_BAIRRO / FAT_CIDADE / FAT_UF: median
#   net sales (negative medians replaced by 0) per date, store type and
#   channel at each geographic level, using only sales whose store has a
#   tipo_loja.
# * Final SELECT: calendar x stores x the three fixed channels. The store's
#   own value is used when present (join limited to YEAR(f.data) >= ANO);
#   otherwise the first non-null median in the order subdistrict, district,
#   neighbourhood, city, state. The value stays NULL when none exists.
query = f'''
WITH CALENDARIO AS (
  SELECT EXPLODE(SEQUENCE(TO_DATE('{DATA_INICIAL}'), CURRENT_DATE(), 
  INTERVAL 1 DAY)) AS data
),
FAT_SUBDISTRITO AS (SELECT
    f.data                                                                           AS data,
    l.tipo_loja                                                                      AS tipo_loja,
    l.codigo_subdistrito                                                             AS codigo_subdistrito,
    f.descricao_canal_venda                                                          AS descricao_canal_venda, 
    CASE WHEN MEDIAN(f.valor_venda_liquida)<0 THEN 0 ELSE MEDIAN(f.valor_venda_liquida)END  AS valor_venda_liquida
FROM ds_dev.cvc_pred.faturamento f
LEFT JOIN ds_dev.cvc_pred.lojas l
    ON l.codigo_loja=f.codigo_loja 
WHERE l.tipo_loja IS NOT NULL
GROUP BY
    l.tipo_loja,
    l.codigo_subdistrito,
    f.descricao_canal_venda,
    f.data),
FAT_DISTRITO AS (SELECT
    f.data                                                                           AS data,
    l.tipo_loja                                                                      AS tipo_loja,
    l.codigo_distrito                                                                AS codigo_distrito,
    f.descricao_canal_venda                                                          AS descricao_canal_venda, 
    CASE WHEN MEDIAN(f.valor_venda_liquida)<0 THEN 0 ELSE MEDIAN(f.valor_venda_liquida) END AS valor_venda_liquida
FROM ds_dev.cvc_pred.faturamento f
LEFT JOIN ds_dev.cvc_pred.lojas l
    ON l.codigo_loja=f.codigo_loja 
WHERE l.tipo_loja IS NOT NULL
GROUP BY 
    l.tipo_loja,
    l.codigo_distrito,
    f.descricao_canal_venda,
    f.data),
FAT_BAIRRO AS (SELECT
    f.data                                                                                  AS data,
    l.tipo_loja                                                                             AS tipo_loja,
    l.bairro_ponto_loja                                                                     AS bairro_ponto_loja,
    f.descricao_canal_venda                                                                 AS descricao_canal_venda, 
    CASE WHEN MEDIAN(f.valor_venda_liquida)<0 THEN 0 ELSE MEDIAN(f.valor_venda_liquida) END AS valor_venda_liquida
FROM ds_dev.cvc_pred.faturamento f
LEFT JOIN ds_dev.cvc_pred.lojas l
    ON l.codigo_loja=f.codigo_loja 
WHERE l.tipo_loja IS NOT NULL
GROUP BY 
    l.tipo_loja,
    l.bairro_ponto_loja,
    f.descricao_canal_venda,
    f.data),
 FAT_CIDADE AS (SELECT
    f.data                                                                                  AS data,
    l.tipo_loja                                                                             AS tipo_loja,
    l.cidade_ponto_loja                                                                     AS cidade_ponto_loja,
    f.descricao_canal_venda                                                                 AS descricao_canal_venda, 
    CASE WHEN MEDIAN(f.valor_venda_liquida)<0 THEN 0 ELSE MEDIAN(f.valor_venda_liquida) END AS valor_venda_liquida
FROM ds_dev.cvc_pred.faturamento f
LEFT JOIN ds_dev.cvc_pred.lojas l
    ON l.codigo_loja=f.codigo_loja 
WHERE l.tipo_loja IS NOT NULL
GROUP BY 
    l.tipo_loja,
    l.cidade_ponto_loja,
    f.descricao_canal_venda,
    f.data),   
FAT_UF AS (SELECT
    f.data                                                                                  AS data,
    l.tipo_loja                                                                             AS tipo_loja,
    l.estado_loja                                                                           AS estado_loja,
    f.descricao_canal_venda                                                                 AS descricao_canal_venda, 
    CASE WHEN MEDIAN(f.valor_venda_liquida)<0 THEN 0 ELSE MEDIAN(f.valor_venda_liquida) END AS valor_venda_liquida
FROM ds_dev.cvc_pred.faturamento f
LEFT JOIN ds_dev.cvc_pred.lojas l
    ON l.codigo_loja=f.codigo_loja 
WHERE l.tipo_loja IS NOT NULL
GROUP BY 
    l.tipo_loja,
    l.estado_loja,
    f.descricao_canal_venda,
    f.data)

SELECT
   d.data
  ,l.codigo_loja                                                                     AS codigo_loja
  ,c.descricao_canal_venda
  ,CASE WHEN f.valor_venda_liquida IS NOT NULL THEN f.valor_venda_liquida
        WHEN fsd.valor_venda_liquida IS NOT NULL THEN fsd.valor_venda_liquida
        WHEN fd.valor_venda_liquida IS NOT NULL THEN fd.valor_venda_liquida
        WHEN fb.valor_venda_liquida IS NOT NULL THEN fb.valor_venda_liquida
        WHEN fc.valor_venda_liquida IS NOT NULL THEN fc.valor_venda_liquida
        WHEN fu.valor_venda_liquida IS NOT NULL THEN fu.valor_venda_liquida END      AS valor_venda_liquida
FROM
  CALENDARIO d  
CROSS JOIN ds_dev.cvc_pred.lojas l 
CROSS JOIN (SELECT * FROM VALUES('E-COMMERCE'),('VENDA DIRETA'),('LOJA') AS (descricao_canal_venda)) c
LEFT JOIN ds_dev.cvc_pred.faturamento f  
    ON f.data=d.data 
    AND f.codigo_loja=l.codigo_loja 
    AND f.descricao_canal_venda=c.descricao_canal_venda 
    AND YEAR(f.data)>={ANO}
LEFT JOIN FAT_SUBDISTRITO fsd 
    ON fsd.data=d.data 
    AND fsd.tipo_loja=l.tipo_loja 
    AND fsd.codigo_subdistrito=l.codigo_subdistrito 
    AND fsd.descricao_canal_venda=c.descricao_canal_venda
LEFT JOIN FAT_DISTRITO fd 
    ON fd.data=d.data 
    AND fd.tipo_loja=l.tipo_loja
    AND fd.codigo_distrito=l.codigo_distrito 
    AND fd.descricao_canal_venda=c.descricao_canal_venda
LEFT JOIN FAT_BAIRRO fb 
    ON fb.data=d.data 
    AND fb.tipo_loja=l.tipo_loja 
    AND fb.bairro_ponto_loja=l.bairro_ponto_loja 
    AND fb.descricao_canal_venda=c.descricao_canal_venda
LEFT JOIN FAT_CIDADE fc 
    ON fc.data=d.data 
    AND fc.tipo_loja=l.tipo_loja 
    AND fc.cidade_ponto_loja=l.cidade_ponto_loja 
    AND fc.descricao_canal_venda=c.descricao_canal_venda
LEFT JOIN FAT_UF fu 
    ON fu.data=d.data 
    AND fu.tipo_loja=l.tipo_loja
    AND fu.estado_loja=l.estado_loja
    AND fu.descricao_canal_venda=c.descricao_canal_venda
WHERE l.codigo_loja is not null
ORDER BY 
  d.data, l.codigo_loja;
'''

# Evaluate the query in Spark and replace the Delta table, schema included.
faturamento_preenchido_df = spark.sql(query)
(
    faturamento_preenchido_df.write
    .mode("overwrite")
    .format("delta")
    .option("delta.columnMapping.mode", "name")
    .option("overwriteSchema", "true")
    .saveAsTable("ds_dev.cvc_pred.faturamento_preenchido")
)

### **1.2.2 - Faturamento Item:**

In [0]:
# Daily sales per store, product and sales-channel group, from DATA_INICIAL
# onwards. Rows whose product code cannot be cast to INT are discarded, and
# channel ids are translated through SLV.RMR_cGRUPO_CANAL_VDA.
query = f'''
SELECT
    V.COD_PONTO_RCKY                      AS codigo_loja,
    TRY_CAST(V.COD_PROD AS INT)           AS codigo_produto,
    V.DATA_VDA                            AS data,
    UPPER(NC.descricao_canal)             AS descricao_canal_venda,
    SUM(V.VLR_BRUT)                       AS valor_venda_bruta,
    SUM(V.VLR_LIQU)                       AS valor_venda_liquida
FROM GLD.CMC_vSELL_OUT_ITENS V
LEFT JOIN (
    SELECT DISTINCT 
        ID_CNVD                           AS id_canal,
        BIC_GRUPO_CNVD                    AS descricao_canal
    FROM SLV.RMR_cGRUPO_CANAL_VDA
) NC ON NC.id_canal = V.COD_CANAL
WHERE TRY_CAST(V.COD_PROD AS INT) IS NOT NULL
AND V.DATA_VDA >= '{DATA_INICIAL}'
GROUP BY 
    V.COD_PONTO_RCKY, 
    V.DATA_VDA, 
    NC.descricao_canal,
    V.COD_PROD
'''

# Run the aggregation on the source database and replace the Delta table,
# schema included.
faturamento_produto_df = spark.read.jdbc(
    url=jdbc_url,
    table=f"({query}) as tablename",
    properties=connection_properties,
)
(
    faturamento_produto_df.write
    .mode("overwrite")
    .format("delta")
    .option("delta.columnMapping.mode", "name")
    .option("overwriteSchema", "true")
    .saveAsTable("ds_dev.cvc_pred.faturamento_produto")
)

## **1.3 - Campanha:** 

In [0]:
# Campaign calendar: one row per campaign product and per day between the
# campaign start (BIC_DATAINI) and end (BIC_DATAFIM), both parsed with date
# style 112. The days come from an inline numbers table 0..9999, so a campaign
# is expanded to at most 10000 days. Rows with a blank MATNR are skipped.
query='''
SELECT 
        UPPER(CAMP_AGRUP.CAMPANHA_GRUPO)                                     AS descricao_campanha,
        TRY_CONVERT(INT, CAMP.MATNR)                                         AS codigo_produto,
        DATEADD(DAY, Numbers.Number, CONVERT(DATE, CAMP_AGRUP.BIC_DATAINI, 112))    AS data
    FROM [SLV].[SAP_pZSDT_CAMPANHALIN] AS CAMP
    LEFT JOIN GLD.BIC_pCAMPANHA_AGRUPADA AS CAMP_AGRUP
        ON CAMP.CODCAMP = CAMP_AGRUP.CODCAMP
    INNER JOIN (
    SELECT TOP 10000 ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) - 1 AS Number
    FROM (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) AS t1(n)
    CROSS JOIN (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) AS t2(n)
    CROSS JOIN (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) AS t3(n)
    CROSS JOIN (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) AS t4(n)
) AS Numbers
        ON DATEADD(DAY, Numbers.Number, CONVERT(DATE, CAMP_AGRUP.BIC_DATAINI, 112)) <= CONVERT(DATE, CAMP_AGRUP.BIC_DATAFIM, 112)
    WHERE CAMP.MATNR <> ' '
'''

# Run the query on the source database and replace the Delta table, schema
# included.
campanhas_df = spark.read.jdbc(
    url=jdbc_url,
    table=f"({query}) AS tablename",
    properties=connection_properties,
)
(
    campanhas_df.write
    .mode("overwrite")
    .format("delta")
    .option("delta.columnMapping.mode", "name")
    .option("overwriteSchema", "true")
    .saveAsTable("ds_dev.cvc_pred.campanhas")
)

## **1.4 - Promoções:** 

In [0]:
# Promotion calendar: the most recently updated version of each promotion
# (rank 1 by ULTATU_PROM) joined to the most recently updated version of each
# of its items (rank 1 by ULTATU_ITPR), expanded to one row per day between the
# promotion start and end dates through an inline numbers table 0..9999.
# The WHERE clause requires an item with rank 1, so promotions without items
# do not appear in the result.
query='''
SELECT    
         p.id_promocao
        ,DATEADD(DAY, Numbers.Number, CONVERT(DATE, p.data_inicio_promocao, 112))           AS data
        ,p.nome_promocao                        
        ,p.data_inicio_promocao                 
        ,p.data_fim_promocao
        ,p.tipo_promocao
        ,p.prioridade_promocao
        ,p.tipo_acionamento_promocao
        ,p.tipo_acumulacao_promocao
        ,p.unica_promocao
        ,ip.codigo_produto
        ,ip.quantidade
        ,ip.rateio_promocao
        ,ip.valor_percentual_promocao
  FROM (SELECT ID_PROM                                                                    AS id_promocao
      ,ROW_NUMBER() OVER (PARTITION BY ID_PROM ORDER BY ULTATU_PROM DESC)                 AS ranking_atualizacao_promocao
      ,UPPER(DESC_PROM)                                                                   AS nome_promocao                                      
      ,DATINI_PROM                                                                        AS data_inicio_promocao                     
      ,DATFIN_PROM                                                                        AS data_fim_promocao
      ,TIPO_PROM                                                                          AS tipo_promocao
      ,PRIORIDADE_PROM                                                                    AS prioridade_promocao
      ,TIPOACIONA_PROM                                                                    AS tipo_acionamento_promocao
      ,TIPOACUM_PROM                                                                      AS tipo_acumulacao_promocao
      ,UNICA_PROM                                                                         AS unica_promocao
  FROM BRZ.RMR_pPROMOCOES  ) p
LEFT JOIN (SELECT  
       ID_PROM                                                                            AS id_promocao                 
      ,COD_PROD                                                                           AS codigo_produto
      ,QTDE_ITPR                                                                          AS quantidade
      ,ULTATU_ITPR                                                                        AS ultima_atualizacao
      ,ROW_NUMBER() OVER (PARTITION BY ID_PROM,COD_PROD ORDER BY ULTATU_ITPR DESC)        AS ranking_atualizacao_promocao_item
      ,RATEIO_ITPR                                                                        AS rateio_promocao
      ,DESCPERC_ITPR                                                                      AS valor_percentual_promocao
  FROM BRZ.RMR_pITENS_PROMOCAO
  WHERE COD_PROD IS NOT NULL AND ID_PROM IS NOT NULL) ip
    ON p.id_promocao = ip.id_promocao
INNER JOIN (
    SELECT TOP 10000 ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) - 1 AS Number
    FROM (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) AS t1(n)
    CROSS JOIN (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) AS t2(n)
    CROSS JOIN (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) AS t3(n)
    CROSS JOIN (VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)) AS t4(n)
) AS Numbers
        ON DATEADD(DAY, Numbers.Number, CONVERT(DATE, p.data_inicio_promocao, 112)) <= CONVERT(DATE,p.data_fim_promocao, 112)
    WHERE p.ranking_atualizacao_promocao = 1
  AND ip.ranking_atualizacao_promocao_item = 1
  AND p.id_promocao IS NOT NULL
'''

# Run the query on the source database and replace the Delta table, schema
# included.
promocoes_df = spark.read.jdbc(
    url=jdbc_url,
    table=f"({query}) AS tablename",
    properties=connection_properties,
)
(
    promocoes_df.write
    .mode("overwrite")
    .format("delta")
    .option("delta.columnMapping.mode", "name")
    .option("overwriteSchema", "true")
    .saveAsTable("ds_dev.cvc_pred.promocoes")
)

## **1.5 - Material:**

In [0]:
# Product (material) dimension from GLD.CCS_iMATERIAIS. Only rows whose MATNR
# casts to INT are kept. Text attributes are upper-cased; missing ones become
# 'DESCONHECIDO', except the sugar/lactose/alcohol flags, which become 'NAO'.
query = ''' 
SELECT 
 TRY_CAST(MATNR AS INT)                                            AS  codigo_produto
,COALESCE(UPPER(MAKTX),'DESCONHECIDO')                             AS  descricao_material
,COALESCE(UPPER(LICENCIADOR),'DESCONHECIDO')                       AS  licenciador_material
,COALESCE(UPPER(MARCA),'DESCONHECIDO')                             AS  marca_material
,COALESCE(UPPER(SUBMARCA),'DESCONHECIDO')                          AS  submarca_material
,COALESCE(UPPER(FAMILIA),'DESCONHECIDO')                           AS  familia_material
,COALESCE(UPPER(SUBFAMILIA),'DESCONHECIDO')                        AS  subfamilia_material
,COALESCE(UPPER(SABOR),'DESCONHECIDO')                             AS  sabor_material
,COALESCE(UPPER(ACUCAR),'NAO')                                     AS  acucar_material
,COALESCE(UPPER(LACTOSE),'NAO')                                    AS  lactose_material
,COALESCE(UPPER(ALCOOL),'NAO')                                     AS  alcool_material
,COALESCE(UPPER(TEMA),'DESCONHECIDO')                              AS  tema_material
,COALESCE(UPPER(CATEGORIA),'DESCONHECIDO')                         AS  categoria_material
,COALESCE(UPPER(EMBALAGEM_APRESENTACAO),'DESCONHECIDO')            AS  embalagens_material
    FROM GLD.CCS_iMATERIAIS
WHERE  TRY_CAST(MATNR AS INT) IS NOT NULL
'''

# Run the query on the source database and replace the Delta table, schema
# included.
materiais_df = spark.read.jdbc(
    url=jdbc_url,
    table=f"({query}) AS tablename",
    properties=connection_properties,
)
(
    materiais_df.write
    .mode("overwrite")
    .format("delta")
    .option("delta.columnMapping.mode", "name")
    .option("overwriteSchema", "true")
    .saveAsTable("ds_dev.cvc_pred.materiais")
)

## **1.6 - Calendário:**

In [0]:
# Holiday/event calendar from dbo.dCALENDARIO_FNT. Only calendar ids that
# start with 'BR' and are numeric once that prefix is removed are kept; the
# remaining digits are exposed as codigo_municipio.
query = '''
SELECT
        REPLACE(IDCLD, 'BR', '') AS codigo_municipio,
        UPPER(CALENDARIO) AS gentilico,
        DATA AS data,
        TPR AS tipo_feriado,
        UPPER(EVENTO) AS nome_feriado,
        RINIATV AS horario_de_inicio,
        RFIMATV AS horario_de_fim
     FROM dbo.dCALENDARIO_FNT
     WHERE LEFT(IDCLD, 2) = 'BR'
       AND ISNUMERIC(REPLACE(IDCLD, 'BR', '')) = 1
  '''

# This table lives in the second source database, hence jdbc_url_ds. The Delta
# table is replaced, schema included.
calendario_eventos_df = spark.read.jdbc(
    url=jdbc_url_ds,
    table=f"({query}) AS tablename",
    properties=connection_properties,
)
(
    calendario_eventos_df.write
    .mode("overwrite")
    .format("delta")
    .option("delta.columnMapping.mode", "name")
    .option("overwriteSchema", "true")
    .saveAsTable("ds_dev.cvc_pred.calendario_eventos")
)

In [0]:
# Holiday spreadsheet downloaded from the ANBIMA website. Rows containing any
# empty cell are dropped before the columns are renamed.
df_feriado = pd.read_excel(r'https://www.anbima.com.br/feriados/arqs/feriados_nacionais.xls').dropna()
df_feriado = df_feriado.rename(
    columns={
        'Data': 'data',
        'Dia da Semana': 'dia_da_semana',
        'Feriado': 'nome_feriado',
    }
)

# Parse the date column and upper-case the two text columns.
df_feriado['data'] = pd.to_datetime(df_feriado['data'])
for _text_col in ('nome_feriado', 'dia_da_semana'):
    df_feriado[_text_col] = df_feriado[_text_col].str.upper()

# Replace the Delta table, schema included.
feriados_sdf = spark.createDataFrame(df_feriado)
(
    feriados_sdf.write
    .mode("overwrite")
    .format("delta")
    .option("delta.columnMapping.mode", "name")
    .option("overwriteSchema", "true")
    .saveAsTable("ds_dev.cvc_pred.calendario_anbima_feriados")
)

## **1.7 - Dados Públicos:**

In [0]:
# Ipeadata OData endpoint for series metadata.
url="http://ipeadata.gov.br/api/odata4/Metadados"
# Browser-like User-Agent header sent with the request.
headers = {"User-Agent": "Mozilla/5.0"}
# A timeout keeps the job from hanging on an unresponsive server, and
# raise_for_status() turns an HTTP error into an explicit exception instead of
# a confusing JSON decoding failure further down.
response = requests.get(url, headers=headers, timeout=60)
response.raise_for_status()
data = response.json()

# The records are carried under the "value" key of the payload.
valores = data['value']
df_metadados=pd.DataFrame(valores)
df_metadados.columns=df_metadados.columns.str.lower()

# Persist the frame whose column names were lower-cased above. Previously a
# fresh frame was rebuilt from `valores` here, so the renaming was discarded.
spark.createDataFrame(df_metadados)\
    .write.mode("overwrite") \
    .format("delta") \
    .option("delta.columnMapping.mode", "name") \
    .option("overwriteSchema", "true") \
    .saveAsTable("ds_dev.cvc_pred.metadados_ipea")

In [0]:
# Ipeadata series codes (SERCODIGO) whose values are loaded into
# ds_dev.cvc_pred.valores_ipea.
list_cod_font=['SGS12_IBCBR12','SGS12_IBCBRDESSAZ12','PMC12_VNAALIMN12','PMC12_VRAALIMN12','PMC12_VRSUPN12','PMC12_VRSUPNSA12',
'BM12_PIB12','DIMAC_INF1','DIMAC_INF2','DIMAC_INF3','DIMAC_INF4',
'DIMAC_INF5','DIMAC_INF6','IPP12_IPPC10ATIV12','PRECOS12_INPCAB12','PRECOS12_IPCAAB12',
'PNADC12_OCUPALOJ12','PNADC12_TDESOC12','CNC12_PEICRC12','FCESP12_IIC12','ABRAS12_INVNR12',
'GAC12_PPCTAXAC12','CNC12_ICF12','CNC12_ICFAB12','CNC12_ICFAC12','CNC12_ICFAJ12',
'IGP12_IGPMG12','IGP12_IPCMG12','PAN12_IVVRG12','FCESP12_IICF12','ANBIMA12_TJPOUP12']


def load_dados(codigo, timeout=60):
    """Download one Ipeadata series and return it as a Spark DataFrame.

    Args:
        codigo: Series code placed in the ``SERCODIGO`` filter of the request.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A DataFrame with the columns ``data`` (timestamp parsed from
        ``VALDATA``), ``valor`` (``VALVALOR`` as returned by the API) and
        ``sercodigo`` (the requested code).

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    url = f"http://ipeadata.gov.br/api/odata4/ValoresSerie(SERCODIGO='{codigo}')"
    # Browser-like User-Agent header sent with the request.
    headers = {"User-Agent": "Mozilla/5.0"}
    response = requests.get(url, headers=headers, timeout=timeout)
    # Fail here on an HTTP error rather than on an unparseable error body.
    response.raise_for_status()
    # The observations are carried under the "value" key of the payload.
    valores = response.json()['value']
    df = (
        spark.createDataFrame(valores)
        .withColumnsRenamed({'VALDATA': 'data_raw', 'VALVALOR': 'valor'})
        .select('data_raw', 'valor')
    )
    # Parse the ISO-8601 text (with UTC offset) into a timestamp and tag every
    # row with the series code it belongs to.
    return (
        df.withColumn("data", to_timestamp('data_raw', "yyyy-MM-dd'T'HH:mm:ssXXX"))
        .select('data', 'valor')
        .withColumn("sercodigo", lit(codigo))
    )


# Download every series BEFORE emptying the table: if any request fails, the
# exception is raised while the previous contents are still intact.
series_frames = [load_dados(codigo=codigo) for codigo in list_cod_font]

spark.sql('TRUNCATE TABLE ds_dev.cvc_pred.valores_ipea')
for series_df in series_frames:
    series_df\
    .write.mode("append") \
    .format("delta") \
    .option("delta.columnMapping.mode", "name") \
    .option("overwriteSchema", "true") \
    .saveAsTable("ds_dev.cvc_pred.valores_ipea")

In [0]:
# Read back the long-format series table, keeping only the calendar date of
# each observation.
df = spark.sql('''
    SELECT CAST(data AS DATE) AS data, valor, sercodigo
    FROM ds_dev.cvc_pred.valores_ipea
''')
# One row per date and one column per series code; passing the code list fixes
# the set and order of the pivoted columns.
pivoted_df = df.groupBy("data").pivot("sercodigo", list_cod_font).sum("valor")
pivoted_df=pivoted_df.orderBy('data')
# The source table is rebuilt in full on every run, so this frame always holds
# the complete history. Overwrite the target instead of appending; appending
# would store the whole history again on each execution and duplicate rows.
pivoted_df\
    .write.mode("overwrite") \
    .format("delta") \
    .option("delta.columnMapping.mode", "name") \
    .option("overwriteSchema", "true") \
    .saveAsTable("ds_dev.cvc_pred.medidas_selecionadas_mercado")

In [0]:
# query = '''
# SELECT * FROM 
# CVC.BIP_vCORRELACOES_SPEARMAN_POR_LAG_POR_CANAL 
#   '''
# df = spark.read.jdbc(url=jdbc_url_ds, table=f'''({query}) AS tablename''', properties=connection_properties)
# df = df.toDF(*[c.lower() for c in df.columns])
# df.write.mode("append") \
#     .format("delta") \
#     .option("delta.columnMapping.mode", "name") \
#     .option("overwriteSchema", "true") \
#     .saveAsTable("ds_dev.cvc_pred.correlacao_spearman_por_lag_por_canal")

In [0]:
# spark.stop()