In [0]:
from pyspark.sql.functions import col, expr
import requests
import json
import time
from datetime import datetime
from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging

In [0]:

logger = logging.getLogger(__name__)

# logger.debug("Teste de log DEBUG")
# logger.info("This is an info message.")
# logger.warning("This is a warning message.")
# logger.error("This is an error message.")
# logger.critical("This is a critical message.")

In [0]:

try:
    logger.info("1/4 - Buscando segredos do Databricks Secrets...")
    scope_name = "guilherme_ferreira_checkpoint2_lh"
    user = dbutils.secrets.get(scope=scope_name, key="db_user")
    password = dbutils.secrets.get(scope=scope_name, key="db_pass")
    host = dbutils.secrets.get(scope=scope_name, key="db_host")
    port = dbutils.secrets.get(scope=scope_name, key="db_port")
    logger.info("Segredos obtidos com sucesso.")

    logger.info("\n2/4 - Configurando a conexão JDBC...")
    jdbc_url = f"jdbc:sqlserver://{host}:{port};databaseName=AdventureWorks"
    connection_props = {
      "user": user,
      "password": password,
      "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "encrypt": "true",
      "trustServerCertificate": "true"
    }
    logger.info("Configuração da conexão finalizada.")

    spark.sql("USE CATALOG ted_dev")
    spark.sql("USE SCHEMA dev_guilherme_sobrinho")
    logger.info("Configuração de CATALOG e SCHEMA finalizada.")


except Exception as e:
    logger.error(f"\nOCORREU UM ERRO DURANTE A EXECUÇÃO:")
    logger.error(e)



In [0]:
# Função para ingestão com schema dinâmico do banco e sanitização de colunas


def sanitizar_colunas(df):
    colunas_corrigidas = [col(c).alias(c.replace(" ", "_")) for c in df.columns]
    return df.select(*colunas_corrigidas)

def ingerir_tabela_completa(nome_tabela_completo):
    nome_logico = nome_tabela_completo.split(".")[-1].lower()
    nome_delta = f"raw_database_{nome_logico}"
    logger.info(f"Ingerindo: {nome_tabela_completo} como {nome_delta}")
    query = f"(SELECT * FROM {nome_tabela_completo}) AS temp"

    # Lê dados do banco
    df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_props)

    df = sanitizar_colunas(df)

    # Grava a tabela no Delta Lake com o prefixo raw_database_<nome-da-tabela>
    df.write.format("delta").mode("overwrite").saveAsTable(nome_delta)

    logger.info(f"{nome_delta} salva com sucesso.")


# Lê o catálogo de tabelas disponíveis no banco
df_tabelas = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE') AS temp",
    properties=connection_props
)

# Gera a coluna com o nome completo da tabela (schema.table)
df_tabelas_completo = (
    df_tabelas
    .withColumn("nome_completo", expr("concat(TABLE_SCHEMA, '.', TABLE_NAME)"))
    .selectExpr("nome_completo as tabela")
)

# Limita a 1000 tabelas para evitar sobrecarga no driver, o total de tabelas ingeridas é cerca de 70.
tabelas_para_ingestao = df_tabelas_completo.limit(1000).collect()
logger.info(f"Total de tabelas para ingestão: {len(tabelas_para_ingestao)}")

# Loop para ingerir as tabelas usando a função definida acima
for row in tabelas_para_ingestao:
    tabela_nome = row["tabela"]
    try:
        ingerir_tabela_completa(tabela_nome)
    except Exception as e:
        logger.error(f" Erro ao ingerir {tabela_nome}: {e}")



In [0]:
# Retry configurável por timeout
MAX_RETRIES = 3
LIMITS_REDUZIDOS = [100, 50, 20]  # Tentativas progressivas com limites menores

def ingerir_api(nome_tabela_logico, endpoint, limit=100):
    """
    Função para ingerir dados paginados da API e salvar no Delta Lake.
    """
    logger.info(f"\n Iniciando ingestão da API: {endpoint} → raw_api_{nome_tabela_logico}")
    url_base = f"http://18.209.218.63:8080/{endpoint}"
    api_user = dbutils.secrets.get(scope="guilherme_ferreira_checkpoint2_lh", key="api_user")
    api_password = dbutils.secrets.get(scope="guilherme_ferreira_checkpoint2_lh", key="api_password")
    auth = (api_user, api_password)
    total_registros = 0
    todos_dados = []

    for tentativa, limit_atual in enumerate(LIMITS_REDUZIDOS, start=1):
        logger.info(f"\n Tentativa {tentativa} com limit={limit_atual}")
        offset = 0
        todos_dados.clear()
        total_registros = 0

        try:
            while True:
                url = f"{url_base}?offset={offset}&limit={limit_atual}"
                try:
                    res = requests.get(url, auth=auth, timeout=10)
                    logger.info(f"Status da resposta: {res.status_code}, conteúdo: {len(res.content)} bytes")
                except requests.exceptions.Timeout:
                    logger.error(f"Timeout ao acessar {url}")
                    break 
                except requests.exceptions.RequestException as e:
                    logger.error(f"Erro na requisição: {e}")
                    break

                if res.status_code != 200:
                    logger.info(f"API respondeu com status {res.status_code}: {res.text}")
                    break

                json_data = res.json()
                dados = json_data.get("data", [])

                if not dados:
                    logger.warning("Nenhum dado retornado, encerrando a paginação.")
                    break

                todos_dados.extend(dados)
                total_registros += len(dados)
                logger.info(f"Recebidos {len(dados)} registros, total acumulado: {total_registros}")

                offset += limit_atual
                total_api = json_data.get("total", None)
                if total_api is not None and offset >= total_api:
                    break

                time.sleep(0.2)

            if total_registros > 0:
                break

        except Exception as e:
            logger.error(f" Erro ao processar tentativa {tentativa} do endpoint {endpoint}: {e}")

    if total_registros == 0:
        logger.warning("Nenhum dado foi obtido da API após todas as tentativas.")
        return

    try:
        spark = SparkSession.builder.getOrCreate()
        df = spark.createDataFrame(todos_dados)

        # df = spark.read.json(spark.sparkContext.parallelize([json.dumps(todos_dados)]))
        # df = df.selectExpr("explode(value) as row").select("row.*")

        nome_delta = f"ted_dev.dev_guilherme_sobrinho.raw_api_{nome_tabela_logico}"
        spark.sql("USE CATALOG ted_dev")
        spark.sql("USE SCHEMA dev_guilherme_sobrinho")
        logger.info("Configuração de CATALOG e SCHEMA finalizada.")
        df.write.format("delta").mode("overwrite").saveAsTable(nome_delta)
        logger.info(f" {nome_delta} salva com sucesso com {total_registros} registros.")

    except Exception as e:
        logger.error(f" Erro ao salvar dados no Delta Lake para {endpoint}: {e}")

    logger.info(f" Finalizado endpoint {endpoint} com {total_registros} registros.")


def ingerir_varios_endpoints(endpoints, max_workers=1):
    """
    Executa ingestão paralela com número controlado de workers para vários endpoints.
    """
    logger.info(f"\n Iniciando ingestão paralela de {len(endpoints)} endpoints com max_workers={max_workers}")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(ingerir_api, nome, endpoint): nome for nome, endpoint in endpoints
        }

        for future in as_completed(futures):
            nome = futures[future]
            try:
                future.result()
            except Exception as e:
                logger.error(f" Erro na tarefa {nome}: {e}")

    logger.info("Ingestão paralela finalizada.")


# Lista de endpoints (com nomes lógicos e nomes reais)
endpoints = [
    ("salesorderdetail", "SalesOrderDetail"),
    ("salesorderheader", "SalesOrderHeader"),
    ("purchaseorderdetail", "PurchaseOrderDetail"),
    ("purchaseorderheader", "PurchaseOrderHeader"),
]

# Executar com paralelismo ajustável (pode aumentar conforme o comportamento da API)
ingerir_varios_endpoints(endpoints, max_workers=2)  # cuidado pra não aumentar demais e causar timeout


In [0]:
# # Teste se a API está funcionando
# url = "http://18.209.218.63:8080/SalesOrderHeader?offset=0&limit=10"
# api_user = dbutils.secrets.get(scope="guilherme_ferreira_checkpoint2_lh", key="api_user")
# api_password = dbutils.secrets.get(scope="guilherme_ferreira_checkpoint2_lh", key="api_password")
# res = requests.get(url, auth=(api_user, api_password))
# logger.info(res.status_code)
# logger.info(res.text)


In [0]:
# Trecho para excluir tudo que eu ingeri


# schema = "dev_guilherme_sobrinho"

# # Listar todas as tabelas do schema
# tabelas = spark.sql(f"SHOW TABLES IN {schema}").filter("isTemporary = false").select("tableName").collect()

# for t in tabelas:
#     nome_tabela = t["tableName"]
#     full_table_name = f"{schema}.{nome_tabela}"
#     try:
#         logger.info(f"Apagando tabela {full_table_name} ...")
#         spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")
#         logger.info(f"Tabela {full_table_name} apagada.")
#     except Exception as e:
#         logger.error(f"Erro ao apagar {full_table_name}: {e}")
