In [0]:
# 1. EXTRACTION OF THE SALES_ORDER_DETAIL, SALES_ORDER_HEADER, PURCHASE_ORDER_DETAIL AND PURCHASE_ORDER_HEADER APIs
# ========================================X=======================================

import requests
from requests.auth import HTTPBasicAuth
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
import pandas as pd
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

# Spark session for this extraction: 8 shuffle partitions and Arrow enabled
# for the pandas -> Spark conversion done in process_endpoint.
spark = (
    SparkSession.builder
    .appName("LargeAPIExtraction")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

# dbutils handle bound to the session above; used below to read the API secrets.
dbutils = DBUtils(spark)

def fetch_paginated_data(endpoint, auth, limit=1000,
                         base_url="http://18.209.218.63:8080",
                         max_retries=3, timeout=120):
    """Download every record of an endpoint using offset/limit pagination.

    Each page is requested with ``GET {base_url}{endpoint}?offset=..&limit=..``
    and the response body is expected to be a JSON object whose ``"data"`` key
    holds the list of records for that page.

    Pagination stops when a page comes back empty or shorter than ``limit``.

    Args:
        endpoint: Path appended to ``base_url`` (e.g. ``"/SalesOrderHeader"``).
        auth: Authentication object forwarded to ``requests.get``.
        limit: Page size sent to the API; must be >= 1.
        base_url: Scheme, host and port of the API.
        max_retries: Attempts per page before giving up (values < 1 are
            treated as 1).
        timeout: Per-request timeout in seconds.

    Returns:
        A list with the records of all pages, in the order they were received.

    Raises:
        ValueError: If ``limit`` is smaller than 1.
        RuntimeError: If a page still fails after ``max_retries`` attempts.
            Raising (instead of returning what was collected so far) prevents
            the caller from overwriting the destination table with a
            truncated extract.
    """
    # Local import: this cell's import block does not bring `time` into scope.
    import time

    if limit < 1:
        raise ValueError(f"limit deve ser >= 1 (recebido: {limit})")

    attempts = max(1, int(max_retries))
    # NOTE(review): the default base_url is plain HTTP while `auth` carries
    # credentials -- confirm whether the API can be reached over HTTPS.
    url = f"{base_url}{endpoint}"
    offset = 0
    all_data = []

    while True:
        for attempt in range(1, attempts + 1):
            try:
                response = requests.get(
                    url,
                    auth=auth,
                    params={"offset": offset, "limit": limit},
                    timeout=timeout,
                )
                response.raise_for_status()
                payload = response.json()
                break
            except (requests.RequestException, ValueError) as exc:
                # ValueError covers an invalid JSON body on older `requests`.
                print(f"Erro no offset {offset} (tentativa {attempt}/{attempts}): {str(exc)}")
                if attempt == attempts:
                    raise RuntimeError(
                        f"Falha ao extrair {endpoint} no offset {offset} "
                        f"após {attempts} tentativa(s)"
                    ) from exc
                # Exponential backoff between attempts: 1s, 2s, 4s, ...
                time.sleep(2 ** (attempt - 1))

        records = payload.get("data")
        if not records:
            print("Fim dos dados alcançado.")
            break

        all_data.extend(records)
        print(f"Offset {offset}: +{len(records)} registros (Total: {len(all_data)})")

        # A short page means the API has no more records after this one.
        if len(records) < limit:
            break

        offset += limit  # advance to the next page

    return all_data

# 2. Processing with schema evolution
def process_endpoint(table_suffix, endpoint, auth, schema):
    """Extract one endpoint and overwrite its raw Delta table.

    Args:
        table_suffix: Suffix of the destination table; the table is named
            ``{schema}.raw_api_{table_suffix}``.
        endpoint: API path handed to ``fetch_paginated_data``.
        auth: Authentication object handed to ``fetch_paginated_data``.
        schema: Qualifier placed in front of the table name.

    Returns:
        None. When the extraction yields no records nothing is written.

    Raises:
        Exception: Any error raised during extraction or writing is logged
            and re-raised unchanged.
    """
    try:
        print(f"Iniciando extração para {table_suffix}")

        records = fetch_paginated_data(endpoint, auth)
        if not records:
            print(f"Nenhum dado retornado para {table_suffix}")
            return

        # pandas is the bridge between the fetched records and a Spark DataFrame.
        spark_df = spark.createDataFrame(pd.DataFrame(records))
        target_table = f"{schema}.raw_api_{table_suffix}"

        # Full overwrite in Delta format with the schema-related options set.
        write_options = {
            "mergeSchema": "true",       # allow schema evolution
            "overwriteSchema": "true",   # replace the schema when needed
            "optimizeWrite": "true",
        }
        writer = spark_df.write.mode("overwrite").format("delta")
        for option_name, option_value in write_options.items():
            writer = writer.option(option_name, option_value)
        writer.saveAsTable(target_table)

        # Post-write check: read the table back and report its size and schema.
        written = spark.read.table(target_table)
        print(f"Concluído: {target_table} com {written.count()} registros")
        print("Schema final:")
        written.printSchema()

    except Exception as exc:
        print(f"Falha crítica no processamento de {table_suffix}: {str(exc)}")
        raise  # optional: drop this to keep going after errors

# Main settings: API credentials are read through dbutils.secrets.
auth = HTTPBasicAuth(
    dbutils.secrets.get(scope="sqlserver_scope", key="api_user"),
    dbutils.secrets.get(scope="sqlserver_scope", key="api_pass"),
)

# Destination table suffix -> API path.
endpoints = {
    "sales_order_detail": "/SalesOrderDetail",
    "sales_order_header": "/SalesOrderHeader",
    "purchase_order_detail": "/PurchaseOrderDetail",
    "purchase_order_header": "/PurchaseOrderHeader",
}

schema = "ted_dev.dev_andre_silva"

# 3. Execution with bounded parallelism
max_workers = 2  # number of endpoints processed at the same time

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Map every submitted job back to the table it is loading.
    futures = {}
    for table_suffix, endpoint in endpoints.items():
        job = executor.submit(process_endpoint, table_suffix, endpoint, auth, schema)
        futures[job] = table_suffix

    # Report failures as jobs finish; one failing endpoint does not stop the others.
    for future in as_completed(futures):
        table_suffix = futures[future]
        try:
            future.result()
        except Exception as exc:
            print(f"Erro não tratado em {table_suffix}: {str(exc)}")

Iniciando extração para sales_order_detail
Iniciando extração para sales_order_header
Offset 0: +1000 registros (Total: 1000)
Offset 0: +1000 registros (Total: 1000)
Offset 1000: +1000 registros (Total: 2000)
Offset 1000: +1000 registros (Total: 2000)
Offset 2000: +1000 registros (Total: 3000)
Offset 3000: +1000 registros (Total: 4000)
Offset 4000: +1000 registros (Total: 5000)
Offset 5000: +1000 registros (Total: 6000)
Offset 6000: +1000 registros (Total: 7000)
Offset 7000: +1000 registros (Total: 8000)
Offset 8000: +1000 registros (Total: 9000)
Offset 9000: +1000 registros (Total: 10000)
Offset 10000: +1000 registros (Total: 11000)
Offset 11000: +1000 registros (Total: 12000)
Offset 12000: +1000 registros (Total: 13000)
Offset 13000: +1000 registros (Total: 14000)
Offset 14000: +1000 registros (Total: 15000)
Offset 15000: +1000 registros (Total: 16000)
Offset 16000: +1000 registros (Total: 17000)
Offset 17000: +1000 registros (Total: 18000)
Offset 18000: +1000 registros (Total: 19000

In [0]:
# 2. EXTRAÇÃO APENAS DO SALES_ORDER_DETAIL
# ==========================X=======================

import requests
from requests.auth import HTTPBasicAuth
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
import pandas as pd
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Spark session for this extraction: 8 shuffle partitions and Arrow enabled
# for the pandas -> Spark conversion done in process_endpoint.
spark = (
    SparkSession.builder
    .appName("LargeAPIExtraction")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

# dbutils handle bound to the session above; used below to read the API secrets.
dbutils = DBUtils(spark)

# Recursive extraction that splits a block into smaller ones when it keeps failing
def fetch_data_recursive(endpoint, auth, offset, limit, max_retries=3, min_limit=100):
    """Fetch ``limit`` records starting at ``offset``, halving the block on failure.

    The block is requested up to ``max_retries`` times with exponential
    backoff between attempts. If every attempt fails and ``limit`` is still
    above ``min_limit``, the block is split in two halves that are fetched
    recursively and concatenated.

    Args:
        endpoint: Path appended to the API base URL.
        auth: Authentication object forwarded to ``requests.get``.
        offset: Index of the first record of the block.
        limit: Number of records requested for the block.
        max_retries: Attempts per block before splitting or giving up.
        min_limit: Blocks of this size or smaller are never split.

    Returns:
        The list found under the ``"data"`` key of the response, or ``[]``
        when the API answers successfully with no data (end of data).

    Raises:
        RuntimeError: If the block cannot be fetched and cannot be split any
            further. An unrecoverable failure used to return ``[]``, which
            callers cannot tell apart from end-of-data and which led to a
            silently truncated extract.
    """
    # NOTE(review): plain HTTP while `auth` carries credentials -- confirm
    # whether the API can be reached over HTTPS.
    base_url = "http://18.209.218.63:8080"
    url = f"{base_url}{endpoint}"
    last_error = None

    for attempt in range(max_retries):
        try:
            print(f" Offset {offset} | Limit {limit} | Tentativa {attempt + 1}")
            response = requests.get(
                url,
                auth=auth,
                params={"offset": offset, "limit": limit},
                timeout=120
            )
            response.raise_for_status()
            data = response.json()
            return data.get("data") or []
        except (requests.RequestException, ValueError) as e:
            # ValueError covers an invalid JSON body on older `requests`.
            last_error = e
            print(f" Erro no offset {offset}, tentativa {attempt + 1}: {str(e)}")
            # Back off only when another attempt will actually follow.
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)

    # All attempts failed: try again with two smaller blocks. The `limit > 1`
    # guard keeps the halving from recursing forever on a one-record block.
    if limit > min_limit and limit > 1:
        print(f" Dividindo bloco de offset {offset} | Limit {limit}")
        mid = limit // 2
        first_half = fetch_data_recursive(endpoint, auth, offset, mid, max_retries, min_limit)
        second_half = fetch_data_recursive(endpoint, auth, offset + mid, limit - mid, max_retries, min_limit)
        return first_half + second_half

    print(f" Falha crítica em offset {offset} mesmo com limite mínimo ({min_limit}).")
    raise RuntimeError(
        f"Falha ao extrair {endpoint} no offset {offset} (limit {limit})"
    ) from last_error

# Main pagination routine
def fetch_paginated_data(endpoint, auth, initial_limit=150000):
    """Collect all records of an endpoint in blocks of ``initial_limit``.

    Blocks are fetched with ``fetch_data_recursive`` at offsets 0,
    ``initial_limit``, ``2 * initial_limit`` and so on. The loop ends on the
    first block that is empty or shorter than ``initial_limit``.

    Args:
        endpoint: API path handed to ``fetch_data_recursive``.
        auth: Authentication object handed to ``fetch_data_recursive``.
        initial_limit: Number of records requested per block.

    Returns:
        A list with the records of every block, in fetch order.
    """
    collected = []
    position = 0

    block = fetch_data_recursive(endpoint, auth, position, initial_limit)
    while block:
        collected.extend(block)
        print(f" Offset {position}: +{len(block)} registros (Total acumulado: {len(collected)})")

        # A short block means there is nothing left after it.
        if len(block) < initial_limit:
            break

        position += initial_limit
        block = fetch_data_recursive(endpoint, auth, position, initial_limit)

    return collected

# Processing and Delta write
def process_endpoint(table_suffix, endpoint, auth, schema):
    """Extract one endpoint and overwrite its raw Delta table.

    Args:
        table_suffix: Suffix of the destination table; the table is named
            ``{schema}.raw_api_{table_suffix}``.
        endpoint: API path handed to ``fetch_paginated_data``.
        auth: Authentication object handed to ``fetch_paginated_data``.
        schema: Qualifier placed in front of the table name.

    Returns:
        None. When the extraction yields no records nothing is written.

    Raises:
        Exception: Any error raised during extraction or writing is logged
            and re-raised unchanged.
    """
    try:
        print(f"\n🔄 Iniciando extração para {table_suffix}...\n")

        records = fetch_paginated_data(endpoint, auth)
        if not records:
            print(f"⚠️ Nenhum dado retornado para {table_suffix}")
            return

        # pandas is the bridge between the fetched records and a Spark DataFrame.
        spark_df = spark.createDataFrame(pd.DataFrame(records))
        target_table = f"{schema}.raw_api_{table_suffix}"

        # Full overwrite in Delta format with the schema-related options set.
        writer = spark_df.write.mode("overwrite").format("delta")
        for option_name in ("mergeSchema", "overwriteSchema", "optimizeWrite"):
            writer = writer.option(option_name, "true")
        writer.saveAsTable(target_table)

        # Read the table back to report the stored row count and schema.
        written = spark.read.table(target_table)
        print(f"✅ Concluído: {target_table} com {written.count()} registros")
        written.printSchema()

    except Exception as exc:
        print(f"❌ Falha crítica no processamento de {table_suffix}: {str(exc)}")
        raise

# Authentication: API credentials are read through dbutils.secrets.
auth = HTTPBasicAuth(
    dbutils.secrets.get(scope="sqlserver_scope", key="api_user"),
    dbutils.secrets.get(scope="sqlserver_scope", key="api_pass"),
)

# Only this endpoint is extracted by this cell (table suffix -> API path).
endpoints = {
    "sales_order_detail": "/SalesOrderDetail",
}

schema = "ted_dev.dev_andre_silva"

# Run through a thread pool: with a single endpoint it is not required, but
# it keeps the cell ready to scale to more endpoints.
max_workers = 2

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Map every submitted job back to the table it is loading.
    futures = {}
    for table_suffix, endpoint in endpoints.items():
        job = executor.submit(process_endpoint, table_suffix, endpoint, auth, schema)
        futures[job] = table_suffix

    # Report failures as jobs finish instead of aborting the cell.
    for future in as_completed(futures):
        table_suffix = futures[future]
        try:
            future.result()
        except Exception as exc:
            print(f"❗ Erro não tratado em {table_suffix}: {str(exc)}")




🔄 Iniciando extração para sales_order_detail...

 Offset 0 | Limit 150000 | Tentativa 1
 Erro no offset 0, tentativa 1: HTTPConnectionPool(host='18.209.218.63', port=8080): Read timed out. (read timeout=120)
 Offset 0 | Limit 150000 | Tentativa 2
 Erro no offset 0, tentativa 2: HTTPConnectionPool(host='18.209.218.63', port=8080): Read timed out. (read timeout=120)
 Offset 0 | Limit 150000 | Tentativa 3
 Offset 0: +121317 registros (Total acumulado: 121317)
✅ Concluído: ted_dev.dev_andre_silva.raw_api_sales_order_detail com 121317 registros
root
 |-- SalesOrderID: long (nullable = true)
 |-- SalesOrderDetailID: long (nullable = true)
 |-- OrderQty: long (nullable = true)
 |-- ProductID: long (nullable = true)
 |-- SpecialOfferID: long (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- UnitPriceDiscount: double (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)
 |-- CarrierTrackingNum

In [0]:
%sql
-- 3. Inspect the table structure (VALIDATION TEST)
-- Example: return up to 15 rows of the raw sales order detail table.
-- There is no ORDER BY, so which rows come back is not specified.
SELECT * FROM ted_dev.dev_andre_silva.raw_api_sales_order_detail LIMIT 15;

SalesOrderID,SalesOrderDetailID,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate,CarrierTrackingNumber
43659,1,1,776,1,2024.994,0.0,2024.994,b207c96d-d9e6-402b-8470-2cc176c42283,2011-05-31T00:00:00,4911-403C-98
43659,2,3,777,1,2024.994,0.0,6074.982,7abb600d-1e77-41be-9fe5-b9142cfc08fa,2011-05-31T00:00:00,4911-403C-98
43659,3,1,778,1,2024.994,0.0,2024.994,475cf8c6-49f6-486e-b0ad-afc6a50cdd2f,2011-05-31T00:00:00,4911-403C-98
43659,4,1,771,1,2039.994,0.0,2039.994,04c4de91-5815-45d6-8670-f462719fbce3,2011-05-31T00:00:00,4911-403C-98
43659,5,1,772,1,2039.994,0.0,2039.994,5a74c7d2-e641-438e-a7ac-37bf23280301,2011-05-31T00:00:00,4911-403C-98
43659,6,2,773,1,2039.994,0.0,4079.988,ce472532-a4c0-45ba-816e-eefd3fd848b3,2011-05-31T00:00:00,4911-403C-98
43659,7,1,774,1,2039.994,0.0,2039.994,80667840-f962-4ee3-96e0-aeca108e0d4f,2011-05-31T00:00:00,4911-403C-98
43659,8,3,714,1,28.8404,0.0,86.5212,e9d54907-e7b7-4969-80d9-76ba69f8a836,2011-05-31T00:00:00,4911-403C-98
43659,9,1,716,1,28.8404,0.0,28.8404,aa542630-bdcd-4ce5-89a0-c1bf82747725,2011-05-31T00:00:00,4911-403C-98
43659,10,6,709,1,5.7,0.0,34.2,ac769034-3c2f-495c-a5a7-3b71cdb25d4e,2011-05-31T00:00:00,4911-403C-98


In [0]:
# 4. Print the row count of each raw table so it can be compared with the
#    expected totals (VALIDATION TEST)
# ================================X=================================
raw_tables = (
    "sales_order_detail",
    "sales_order_header",
    "purchase_order_detail",
    "purchase_order_header",
)

for table in raw_tables:
    df = spark.table("ted_dev.dev_andre_silva.raw_api_" + table)
    total = df.count()
    print(f"{table}: {total:,} registros")

sales_order_detail: 121,317 registros
sales_order_header: 31,465 registros
purchase_order_detail: 8,845 registros
purchase_order_header: 4,012 registros
