In [0]:
import requests
import pandas as pd
from datetime import timezone
from pyspark.sql import SparkSession, functions as F, types as T

# Reuse the already-active Spark session if there is one (e.g. the notebook's), otherwise create one.
# The loader functions in this notebook read this module-level `spark` global for catalog/table access.
spark = SparkSession.builder.getOrCreate()

# --- Connection helper ---
def connect_supabase(base_url: str, api_key: str):
    """
    Open an authenticated HTTP session for the Supabase REST API.

    The session carries the ``apikey`` and ``Authorization: Bearer`` headers plus
    ``Prefer: count=exact`` (asks PostgREST to report the total row count in the
    ``Content-Range`` response header). Reachability is probed with a GET on
    ``{base_url}/rest/v1``.

    Args:
        base_url: Supabase project URL; a trailing ``/`` is tolerated.
        api_key: Supabase API key, sent in both auth headers.

    Returns:
        The ready ``requests.Session`` when the probe answers 200 or 404; otherwise
        ``None``. On failure the session is closed and the reason is printed.
    """
    session = requests.Session()
    session.headers.update({
        "apikey": api_key,
        "Authorization": f"Bearer {api_key}",
        "Prefer": "count=exact",
    })
    try:
        resp = session.get(f"{base_url.rstrip('/')}/rest/v1", timeout=5)
    except requests.exceptions.RequestException as exc:
        # Network-level failure (DNS, refused connection, timeout, ...): report why
        # instead of silently swallowing it; callers still just see None.
        print(f"⚠️ Erro de conexão com o Supabase: {type(exc).__name__}: {exc}")
    else:
        # A 404 still proves the server is reachable (no table lives at the REST root).
        if resp.status_code in (200, 404):
            return session
        print(f"⚠️ Supabase respondeu com status {resp.status_code}: {resp.text[:200]}")
    # Release the connection pool of a session we are not handing out.
    session.close()
    return None


# --- Main loaders ---
def _content_range_total(content_range):
    """
    Return the total row count carried by a PostgREST ``Content-Range`` header, or None.

    With ``Prefer: count=exact`` the header looks like ``0-999/5000`` (or ``*/5000``
    for an unsatisfiable range); when no count was computed the total is ``*``,
    and a missing/odd header also yields None.
    """
    if not content_range or "/" not in content_range:
        return None
    total = content_range.rsplit("/", 1)[1].strip()
    return int(total) if total.isdigit() else None


def _fetch_supabase_rows(session, url: str, params: dict, page_size: int) -> list:
    """
    Fetch every row matching ``params`` from a PostgREST endpoint, page by page.

    Pages are requested with the ``Range`` header. The offset advances by the number
    of rows actually returned, not by ``page_size``: the server may cap a response
    below ``page_size`` (Supabase's "Max Rows" API setting), and treating such a short
    page as the last one would silently drop the remaining rows.

    Stops when a page is empty, when the offset reaches the total reported in
    ``Content-Range``, when the server answers 416 (offset past the end), or on any
    other non-2xx status (a warning is printed and the rows fetched so far are returned).
    """
    rows = []
    start = 0
    while True:
        headers = {"Range": f"{start}-{start + page_size - 1}"}
        resp = session.get(url, params=params, headers=headers, timeout=60)

        if resp.status_code == 416:
            # Range Not Satisfiable: the requested offset is past the last row.
            break
        if resp.status_code not in (200, 206):
            print(f"⚠️ Erro {resp.status_code}: {resp.text[:200]}")
            break

        batch = resp.json()
        if not batch:
            break
        rows.extend(batch)
        start += len(batch)

        total = _content_range_total(resp.headers.get("Content-Range"))
        if total is not None and start >= total:
            break
    return rows


def _upsert_supabase_table(base_url: str, api_key: str, table: str, banner: str,
                           page_size: int = 1000, incremental: bool = True):
    """
    Copy rows of Supabase table ``table`` that are not yet in
    ``lakehouse_imersao_jornada.postgres_public.<table>`` into that table.

    Args:
        base_url: Supabase project URL.
        api_key: Supabase API key.
        table: Source table name; also the target table name.
        banner: Text appended to the "connected" message.
        page_size: Rows requested per HTTP call (must be >= 1).
        incremental: When True and the target exists, only source rows with
            ``importado_em >= max(importado_em)`` of the target are fetched. Rows
            arriving later with an older ``importado_em`` are therefore not picked up.

    Raises:
        ValueError: If ``page_size`` < 1 (an invalid Range could otherwise loop forever).
    """
    if page_size < 1:
        raise ValueError(f"page_size must be >= 1, got {page_size}")

    full_table = f"lakehouse_imersao_jornada.postgres_public.{table}"

    # 1) Connect
    session = connect_supabase(base_url, api_key)
    if session is None:
        print("❌ Falha na conexão com o Supabase.")
        return
    print(f"✅ Conexão estabelecida com o Supabase, carregando {banner}")

    # 2) + 3) Build the query and fetch all pages; `with` closes the HTTP session afterwards.
    with session:
        # Deterministic order keeps offset pagination stable across pages.
        params = {"select": "*", "order": "importado_em.asc,transaction_id.asc"}
        if incremental and spark.catalog.tableExists(full_table):
            last_ts = (
                spark.table(full_table)
                .select(F.max("importado_em").alias("mx"))
                .collect()[0]["mx"]
            )
            if last_ts is not None:
                # NOTE: assumes PySpark hands back a naive local-time datetime;
                # astimezone() interprets naive values as local time before converting.
                # `gte` (not `gt`) re-reads rows at the boundary; the anti-join drops them.
                params["importado_em"] = f"gte.{last_ts.astimezone(timezone.utc).isoformat()}"
        url = f"{base_url.rstrip('/')}/rest/v1/{table}"
        rows = _fetch_supabase_rows(session, url, params, page_size)

    if not rows:
        print("⚙️ Nenhum dado novo retornado do Supabase.")
        return

    # 4) Pandas -> Spark with explicit casts. Offset pagination over a table that changes
    # mid-read can return the same row twice, so keep one row per primary key.
    df = (
        spark.createDataFrame(pd.DataFrame(rows))
        .withColumn("data_hora", F.to_timestamp("data_hora"))
        .withColumn("importado_em", F.to_timestamp("importado_em"))
        .withColumn("quantidade", F.col("quantidade").cast(T.DecimalType(10, 4)))
        .dropDuplicates(["transaction_id"])
    )

    # 5) Insert only rows whose transaction_id is not yet in the target (anti-join on the PK).
    if not spark.catalog.tableExists(full_table):
        df.write.mode("overwrite").saveAsTable(full_table)
        print(f"🆕 Tabela criada e {df.count()} linhas inseridas em {full_table}.")
        return

    existing_ids = spark.table(full_table).select("transaction_id")
    df_new_only = df.join(existing_ids, on="transaction_id", how="left_anti")

    new_count = df_new_only.count()
    if new_count == 0:
        print("✔️ Nenhum novo registro para inserir.")
    else:
        df_new_only.write.mode("append").saveAsTable(full_table)
        print(f"✅ {new_count} novos registros inseridos em {full_table}.")


def upsert_sales_btc_from_supabase(base_url: str, api_key: str, page_size: int = 1000, incremental: bool = True):
    """
    Load new rows of Supabase ``sales_btc`` (paginated REST) into
    ``lakehouse_imersao_jornada.postgres_public.sales_btc``; existing ``transaction_id``s are skipped.
    """
    _upsert_supabase_table(base_url, api_key, "sales_btc", "sales btc!!!!",
                           page_size=page_size, incremental=incremental)


def upsert_sales_commodities_from_supabase(base_url: str, api_key: str, page_size: int = 1000, incremental: bool = True):
    """
    Load new rows of Supabase ``sales_commodities`` (paginated REST) into
    ``lakehouse_imersao_jornada.postgres_public.sales_commodities``; existing ``transaction_id``s
    are skipped. Requires ``connect_supabase(base_url, api_key)`` to be defined.
    """
    _upsert_supabase_table(base_url, api_key, "sales_commodities", "sales commodities!!!",
                           page_size=page_size, incremental=incremental)



In [0]:
# Supabase URL and API key are read from the Databricks secret scope "credencial-supabase".
db_url = dbutils.secrets.get(scope="credencial-supabase", key="url-supabase")
db_api = dbutils.secrets.get(scope="credencial-supabase", key="api-supabase")

# Run both loaders with identical settings: sales_btc first, then sales_commodities.
for loader in (upsert_sales_btc_from_supabase, upsert_sales_commodities_from_supabase):
    loader(base_url=db_url, api_key=db_api, page_size=1000, incremental=True)
