<a href="https://colab.research.google.com/github/CidClayQuirino/api_als/blob/main/api_als.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:

import os
import time
import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List

# ============================================
# CONFIGURAÇÕES DA API
# ============================================

LOGIN_URL = "https://integration.s360web.com/api/login"
API_BASE = "https://integration.s360web.com/api/v1"

# Use variáveis de ambiente se preferir não hardcodar credenciais
USER = os.getenv("S360_USER", "integracao.komatsu")
PASSWORD = os.getenv("S360_PASSWORD", "o8sqfl5h")

# Pasta local onde os CSV serão salvos (Ex.: Google Colab usa /content/)
PASTA_SAIDA = "/content/s360_export/"

# Período de coleta: últimos N dias (1 ano = 365)
PERIODO_DIAS = 365

# Timeout (conexão, leitura) e tentativas
TIMEOUT = (10, 60)     # (connect, read) em segundos
MAX_RETRIES = 3
RETRY_BACKOFF = 2      # base para backoff exponencial: 2, 4, 8...

# Rate limit entre chamadas (se necessário pela API)
RATE_LIMIT_SLEEP = 2   # segundos

# Configuração de CSV (mantenha ',' se preferir padrão internacional)
CSV_SEPARADOR = ","    # em pt-BR, alguns preferem ';'
CSV_ENCODING = "utf-8-sig"  # BOM para Excel (Windows)


# ============================================
# UTILITÁRIOS
# ============================================

def log_resp(r: requests.Response, prefix: str = "") -> None:
    """Loga informações úteis da resposta HTTP para diagnóstico."""
    ct = r.headers.get("Content-Type", "")
    print(f"{prefix}Status: {r.status_code} {r.reason} | Content-Type: {ct}")
    body = r.text or ""
    print(f"{prefix}Body (até 1000 chars): {body[:1000]}")
    # Cabeçalhos comumente relevantes
    importantes = ['date', 'server', 'x-request-id', 'x-correlation-id']
    print(f"{prefix}Headers: { {k: v for k, v in r.headers.items() if k.lower() in importantes} }")


def ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)


# ============================================
# AUTENTICAÇÃO
# ============================================

def autenticar(session: Optional[requests.Session] = None) -> str:
    """
    Autentica e retorna o token. Faz retries para status transitórios (429/5xx)
    e detalha qualquer falha. Tenta chaves alternativas no payload se 400/401.
    """
    sess = session or requests.Session()

    payload = {"login": USER, "password": PASSWORD}
    alt_payloads = [
        {"username": USER, "password": PASSWORD},
        {"user": USER, "password": PASSWORD},
        {"email": USER, "password": PASSWORD},
    ]

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            r = sess.post(LOGIN_URL, json=payload, timeout=TIMEOUT)
        except requests.exceptions.RequestException as e:
            print(f"[LOGIN] Falha de rede na tentativa {attempt}/{MAX_RETRIES}: {e}")
            time.sleep(RETRY_BACKOFF ** attempt)
            continue

        if r.status_code == 200:
            try:
                data = r.json()
            except ValueError:
                log_resp(r, prefix="[LOGIN] ")
                raise RuntimeError("[LOGIN] Resposta não-JSON, impossível obter token.")

            token = data.get("token") or data.get("access_token") or data.get("accessToken") or data.get("jwt")
            if not token:
                print("[LOGIN] Token não encontrado nas chaves conhecidas. Chaves recebidas:", list(data.keys()))
                log_resp(r, prefix="[LOGIN] ")
                raise RuntimeError("[LOGIN] Token ausente na resposta.")
            return token

        log_resp(r, prefix=f"[LOGIN] Tentativa {attempt}/{MAX_RETRIES} ")
        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(RETRY_BACKOFF ** attempt)
            continue

        if r.status_code in (400, 401) and attempt == 1:
            print("[LOGIN] Tentando variações de payload (username/email)...")
            for alt in alt_payloads:
                try:
                    r2 = sess.post(LOGIN_URL, json=alt, timeout=TIMEOUT)
                except requests.exceptions.RequestException as e:
                    print(f"[LOGIN] Falha de rede com payload alternativo: {e}")
                    continue
                if r2.status_code == 200:
                    try:
                        data2 = r2.json()
                    except ValueError:
                        log_resp(r2, prefix="[LOGIN/ALT] ")
                        raise RuntimeError("[LOGIN/ALT] Resposta não-JSON.")
                    token = data2.get("token") or data2.get("access_token") or data2.get("accessToken") or data2.get("jwt")
                    if token:
                        print("[LOGIN] Autenticado com payload alternativo:", alt.keys())
                        return token
                    else:
                        log_resp(r2, prefix="[LOGIN/ALT] ")
                        raise RuntimeError("[LOGIN/ALT] Token ausente.")
                else:
                    log_resp(r2, prefix="[LOGIN/ALT] ")

        raise RuntimeError(f"[LOGIN] Erro ao autenticar. Status={r.status_code}. Veja logs acima.")

    raise RuntimeError("[LOGIN] Não foi possível autenticar após múltiplas tentativas.")


# ============================================
# CHAMADA GENÉRICA DE API COM RETRY
# ============================================

def chamar_api(method: str, endpoint: str, token: str,
               params: Optional[Dict[str, Any]] = None,
               body: Optional[Dict[str, Any]] = None,
               session: Optional[requests.Session] = None) -> Optional[Dict[str, Any]]:
    """
    Faz chamada GET/POST com token, timeout, retry para erros transitórios,
    e logs detalhados de falha.
    """
    sess = session or requests.Session()
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{API_BASE.rstrip('/')}/{endpoint.lstrip('/')}"

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            if method.upper() == "GET":
                r = sess.get(url, headers=headers, params=params, timeout=TIMEOUT)
            else:
                r = sess.post(url, headers=headers, json=body, timeout=TIMEOUT)
        except requests.exceptions.RequestException as e:
            print(f"[API] Falha de rede em {method} {url} (tentativa {attempt}/{MAX_RETRIES}): {e}")
            time.sleep(RETRY_BACKOFF ** attempt)
            continue

        if r.status_code == 200:
            try:
                data = r.json()
            except ValueError:
                log_resp(r, prefix="[API] ")
                print("[API] Resposta não-JSON; retornando None.")
                return None
            time.sleep(RATE_LIMIT_SLEEP)
            return data

        log_resp(r, prefix=f"[API] {method} {endpoint} tentativa {attempt}/{MAX_RETRIES} ")

        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(RETRY_BACKOFF ** attempt)
            continue

        return None

    print(f"[API] Desistindo de {method} {endpoint} após {MAX_RETRIES} tentativas.")
    return None


# ============================================
# COLETA DE EQUIPAMENTOS
# ============================================

def coletar_equipamentos(token: str, session: Optional[requests.Session] = None) -> Optional[List[Dict[str, Any]]]:
    """
    Retorna lista de equipamentos ativos. Se a API devolver objeto, tenta extrair 'items'.
    """
    params = {"ativo": "true"}
    dados = chamar_api("GET", "equipamento/list", token, params=params, session=session)
    if not dados:
        return None

    if isinstance(dados, list):
        return dados

    if isinstance(dados, dict) and "items" in dados:
        return dados["items"]

    return [dados]


# ============================================
# COLETA DE AMOSTRAS (sampleResult/search) COM PERÍODO E PAGINAÇÃO
# ============================================

def coletar_amostras(token: str, session: Optional[requests.Session] = None) -> List[Dict[str, Any]]:
    """
    Coleta amostras dos últimos PERIODO_DIAS até a data atual, varrendo todas as páginas.
    Retorna uma lista consolidada de itens.
    """
    data_fim = datetime.now().strftime("%Y-%m-%d")
    data_inicio = (datetime.now() - timedelta(days=PERIODO_DIAS)).strftime("%Y-%m-%d")

    print(f"[Amostras] Coletando dados de {data_inicio} até {data_fim}")

    page = 1
    page_size = 500
    todos_itens: List[Dict[str, Any]] = []

    while True:
        body = {
            "readingStatus": None,   # busca tudo
            "markRead": True,        # marcar como lido
            "startDate": data_inicio,
            "endDate": data_fim,
            "page": page,
            "pageSize": page_size
        }

        dados = chamar_api("POST", "sampleResult/search", token, body=body, session=session)
        if not dados:
            # Sem dados ou erro não transitório
            break

        # Normalmente a API retorna {items: [...], total: X, page: Y}
        if isinstance(dados, dict) and isinstance(dados.get("items"), list):
            itens = dados["items"]
        elif isinstance(dados, list):
            itens = dados
        else:
            itens = [dados]

        todos_itens.extend(itens)
        print(f"[Amostras] Página {page}: coletados {len(itens)} itens (total acumulado: {len(todos_itens)})")

        # Heurística de término: se retornou menos que page_size, provavelmente acabou
        if len(itens) < page_size:
            break

        page += 1
        time.sleep(1)  # pequena pausa entre páginas

    return todos_itens


# ============================================
# SALVAR CSV
# ============================================

def salvar_csv(dados: Any, nome: str) -> None:
    """
    Salva lista/dict em CSV com UTF-8 na pasta configurada.
    """
    ensure_dir(PASTA_SAIDA)
    caminho = os.path.join(PASTA_SAIDA, nome)

    if isinstance(dados, list):
        df = pd.DataFrame(dados)
    elif isinstance(dados, dict):
        df = pd.DataFrame([dados])
    else:
        raise ValueError("Formato de dados não suportado para CSV.")

    # Exporta CSV (excel-friendly)
    df.to_csv(caminho, index=False, encoding=CSV_ENCODING, sep=CSV_SEPARADOR)
    print(f"Arquivo salvo: {caminho}")


# ============================================
# EXECUÇÃO PRINCIPAL
# ============================================

def executar_rotina():
    print("Iniciando coleta:", datetime.now())

    session = requests.Session()
    token = autenticar(session=session)

    # Equipamentos
    equipamentos = coletar_equipamentos(token, session=session)
    if equipamentos:
        salvar_csv(equipamentos, f"equipamentos_{datetime.now().date()}.csv")
    else:
        print("[Equipamentos] Nenhum dado retornado.")

    # Amostras (todas as páginas dos últimos PERIODO_DIAS)
    amostras = coletar_amostras(token, session=session)
    if amostras:
        salvar_csv(amostras, f"amostras_{datetime.now().date()}.csv")
    else:
        print("[Amostras] Nenhum dado retornado.")

    print("Finalizado:", datetime.now())


# ============================================
# MAIN
# ============================================

if __name__ == "__main__":
    ensure_dir(PASTA_SAIDA)
    try:
        executar_rotina()
    except Exception as e:
        print("Falha na execução:", e)


Iniciando coleta: 2026-01-13 16:36:45.787751
[LOGIN] Tentativa 1/3 Status: 400 Bad Request | Content-Type: 
[LOGIN] Tentativa 1/3 Body (até 1000 chars): 
[LOGIN] Tentativa 1/3 Headers: {'Date': 'Tue, 13 Jan 2026 16:36:46 GMT'}
[LOGIN] Tentando variações de payload (username/email)...
[LOGIN] Autenticado com payload alternativo: dict_keys(['username', 'password'])
[API] GET equipamento/list tentativa 1/3 Status: 405 Method Not Allowed | Content-Type: application/xml;charset=UTF-8
[API] GET equipamento/list tentativa 1/3 Body (até 1000 chars): <Map><timestamp>1768322207225</timestamp><status>405</status><error>Method Not Allowed</error><path>/api/v1/equipamento/list</path></Map>
[API] GET equipamento/list tentativa 1/3 Headers: {'Date': 'Tue, 13 Jan 2026 16:36:46 GMT'}
[Equipamentos] Nenhum dado retornado.
[Amostras] Coletando dados de 2025-01-13 até 2026-01-13
[Amostras] Página 1: coletados 1 itens (total acumulado: 1)
Arquivo salvo: /content/s360_export/amostras_2026-01-13.csv
Finaliza

In [4]:

import os
import time
import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Tuple

# ============================================
# CONFIGURAÇÕES DA API
# ============================================

LOGIN_URL = "https://integration.s360web.com/api/login"
API_BASE = "https://integration.s360web.com/api/v1"

USER = os.getenv("S360_USER", "integracao.komatsu")
PASSWORD = os.getenv("S360_PASSWORD", "o8sqfl5h")

PASTA_SAIDA = "/content/s360_export/"
PERIODO_DIAS = 365         # backfill de 1 ano
CHUNK_DIAS = 31            # janelas mensais para cobrir 100%
PAGE_SIZE = 500            # tamanhos grandes (ver limite no portal)
TIMEOUT = (10, 60)         # (connect, read)
MAX_RETRIES = 3
RETRY_BACKOFF = 2          # base exponencial
SLEEP_ENTRE_CHAMADAS = 25  # orientação S360

CSV_SEPARADOR = ","
CSV_ENCODING = "utf-8-sig"

# ============================================
# UTILITÁRIOS
# ============================================

def ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)

def log_resp(r: requests.Response, prefix: str = "") -> None:
    ct = r.headers.get("Content-Type", "")
    print(f"{prefix}Status: {r.status_code} {r.reason} | Content-Type: {ct}")
    body = r.text or ""
    print(f"{prefix}Body (até 1000 chars): {body[:1000]}")
    importantes = ['date', 'server', 'x-request-id', 'x-correlation-id']
    print(f"{prefix}Headers: { {k: v for k, v in r.headers.items() if k.lower() in importantes} }")

# ============================================
# AUTENTICAÇÃO
# ============================================

def autenticar(session: Optional[requests.Session] = None) -> str:
    sess = session or requests.Session()
    payloads = [
        {"login": USER, "password": PASSWORD},    # já funciona no seu ambiente
        {"username": USER, "password": PASSWORD}, # variação vista no dev portal
        {"email": USER, "password": PASSWORD},
    ]
    for attempt in range(1, MAX_RETRIES + 1):
        for payload in payloads:
            try:
                r = sess.post(LOGIN_URL, json=payload, timeout=TIMEOUT)
            except requests.exceptions.RequestException as e:
                print(f"[LOGIN] Falha de rede {attempt}/{MAX_RETRIES}: {e}")
                time.sleep(RETRY_BACKOFF ** attempt)
                continue

            if r.status_code == 200:
                try:
                    data = r.json()
                except ValueError:
                    log_resp(r, prefix="[LOGIN] ")
                    raise RuntimeError("[LOGIN] Resposta não-JSON.")
                token = data.get("token") or data.get("access_token") or data.get("accessToken") or data.get("jwt")
                if token:
                    print("[LOGIN] Autenticado com payload:", list(payload.keys()))
                    return token
                log_resp(r, prefix="[LOGIN] ")
                raise RuntimeError("[LOGIN] Token ausente na resposta.")
            else:
                log_resp(r, prefix="[LOGIN] ")

        # 429/5xx: backoff
        time.sleep(RETRY_BACKOFF ** attempt)

    raise RuntimeError("[LOGIN] Não foi possível autenticar após múltiplas tentativas.")

# ============================================
# CHAMADAS DE API (GET/POST) COM RETRY
# ============================================

def chamar_api(method: str, endpoint: str, token: str,
               params: Optional[Dict[str, Any]] = None,
               body: Optional[Dict[str, Any]] = None,
               session: Optional[requests.Session] = None) -> Optional[Dict[str, Any]]:
    sess = session or requests.Session()
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{API_BASE.rstrip('/')}/{endpoint.lstrip('/')}"

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            if method.upper() == "GET":
                r = sess.get(url, headers=headers, params=params, timeout=TIMEOUT)
            else:
                r = sess.post(url, headers=headers, json=body, timeout=TIMEOUT)
        except requests.exceptions.RequestException as e:
            print(f"[API] Falha de rede em {method} {url} ({attempt}/{MAX_RETRIES}): {e}")
            time.sleep(RETRY_BACKOFF ** attempt)
            continue

        if r.status_code == 200:
            try:
                data = r.json()
            except ValueError:
                log_resp(r, prefix="[API] ")
                print("[API] Resposta não-JSON; retornando None.")
                return None
            # sleep entre chamadas (orientação S360)
            time.sleep(SLEEP_ENTRE_CHAMADAS)
            return data

        log_resp(r, prefix=f"[API] {method} {endpoint} tentativa {attempt}/{MAX_RETRIES} ")
        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(RETRY_BACKOFF ** attempt)
            continue

        return None

    print(f"[API] Desistindo de {method} {endpoint} após {MAX_RETRIES} tentativas.")
    return None

# ============================================
# EQUIPAMENTOS (ativos)
# ============================================

def coletar_equipamentos(token: str, session: Optional[requests.Session] = None) -> Optional[List[Dict[str, Any]]]:
    params = {"ativo": "true"}
    dados = chamar_api("GET", "equipamento/list", token, params=params, session=session)
    if not dados:
        return None
    if isinstance(dados, list):
        return dados
    if isinstance(dados, dict) and "items" in dados:
        return dados["items"]
    return [dados]

# ============================================
# AMOSTRAS: RESULTADOS (sampleResult/search) com CHUNK + paginação
# ============================================

def gerar_janelas_data(periodo_dias: int, chunk_dias: int) -> List[Tuple[str, str]]:
    """Gera janelas [startDate, endDate] em ISO YYYY-MM-DD, do passado até hoje."""
    hoje = datetime.now().date()
    inicio = hoje - timedelta(days=periodo_dias)
    janelas = []
    cursor = inicio
    while cursor <= hoje:
        fim_chunk = min(cursor + timedelta(days=chunk_dias - 1), hoje)
        janelas.append((cursor.strftime("%Y-%m-%d"), fim_chunk.strftime("%Y-%m-%d")))
        cursor = fim_chunk + timedelta(days=1)
    return janelas

def coletar_resultados(token: str, periodo_dias: int = PERIODO_DIAS,
                       chunk_dias: int = CHUNK_DIAS,
                       session: Optional[requests.Session] = None) -> List[Dict[str, Any]]:
    """
    Coleta 100% dos resultados em janelas de 'chunk_dias', paginando até o fim.
    Usa readingStatus=null (com filtro de data) e markRead=null (não marca como lidas).
    """
    todos: List[Dict[str, Any]] = []
    janelas = gerar_janelas_data(periodo_dias, chunk_dias)
    print(f"[Resultados] Janelas geradas: {len(janelas)} (de {janelas[0][0]} até {janelas[-1][1]})")

    for (data_inicio, data_fim) in janelas:
        page = 1
        print(f"[Resultados] Janela {data_inicio} → {data_fim}")
        while True:
            body = {
                "readingStatus": None,   # null: todos (com filtro de data)
                "markRead": None,        # null: não marcar como lidas no backfill
                "startDate": data_inicio,
                "endDate": data_fim,
                "page": page,
                "pageSize": PAGE_SIZE,
            }
            dados = chamar_api("POST", "sampleResult/search", token, body=body, session=session)
            if not dados:
                print(f"[Resultados] Sem dados ou falha na janela {data_inicio} → {data_fim}, página {page}.")
                break

            if isinstance(dados, dict) and isinstance(dados.get("items"), list):
                itens = dados["items"]
            elif isinstance(dados, list):
                itens = dados
            else:
                itens = [dados]

            qtd = len(itens)
            todos.extend(itens)
            print(f"[Resultados] {data_inicio} → {data_fim} | página {page} | itens {qtd} | acumulado {len(todos)}")

            if qtd < PAGE_SIZE:
                break  # última página da janela
            page += 1

    return todos

# ============================================
# SALVAR CSV
# ============================================

def salvar_csv(dados: Any, nome: str) -> None:
    ensure_dir(PASTA_SAIDA)
    caminho = os.path.join(PASTA_SAIDA, nome)
    if isinstance(dados, list):
        df = pd.DataFrame(dados)
    elif isinstance(dados, dict):
        df = pd.DataFrame([dados])
    else:
        raise ValueError("Formato de dados não suportado para CSV.")
    df.to_csv(caminho, index=False, encoding=CSV_ENCODING, sep=CSV_SEPARADOR)
    print(f"Arquivo salvo: {caminho}")

# ============================================
# EXECUÇÃO PRINCIPAL
# ============================================

def executar_rotina():
    print("Iniciando coleta:", datetime.now())
    session = requests.Session()
    token = autenticar(session=session)

    # Equipamentos (ativos)
    equipamentos = coletar_equipamentos(token, session=session)
    if equipamentos:
        salvar_csv(equipamentos, f"equipamentos_{datetime.now().date()}.csv")
    else:
        print("[Equipamentos] Nenhum dado retornado.")

    # Resultados de amostra (100% do último ano em janelas + paginação)
    resultados = coletar_resultados(token, periodo_dias=PERIODO_DIAS, chunk_dias=CHUNK_DIAS, session=session)
    if resultados:
        salvar_csv(resultados, f"resultados_{datetime.now().date()}.csv")
    else:
        print("[Resultados] Nenhum dado retornado.")

    print("Finalizado:", datetime.now())

if __name__ == "__main__":
    ensure_dir(PASTA_SAIDA)
    try:
        executar_rotina()
    except Exception as e:
        print("Falha na execução:", e)


Iniciando coleta: 2026-01-13 17:34:22.389842
[LOGIN] Status: 400 Bad Request | Content-Type: 
[LOGIN] Body (até 1000 chars): 
[LOGIN] Headers: {'Date': 'Tue, 13 Jan 2026 17:34:23 GMT'}
[LOGIN] Autenticado com payload: ['username', 'password']
[API] GET equipamento/list tentativa 1/3 Status: 405 Method Not Allowed | Content-Type: application/xml;charset=UTF-8
[API] GET equipamento/list tentativa 1/3 Body (até 1000 chars): <Map><timestamp>1768325663774</timestamp><status>405</status><error>Method Not Allowed</error><path>/api/v1/equipamento/list</path></Map>
[API] GET equipamento/list tentativa 1/3 Headers: {'Date': 'Tue, 13 Jan 2026 17:34:23 GMT'}
[Equipamentos] Nenhum dado retornado.
[Resultados] Janelas geradas: 12 (de 2025-01-13 até 2026-01-13)
[Resultados] Janela 2025-01-13 → 2025-02-12
[Resultados] 2025-01-13 → 2025-02-12 | página 1 | itens 1 | acumulado 1
[Resultados] Janela 2025-02-13 → 2025-03-15
[Resultados] 2025-02-13 → 2025-03-15 | página 1 | itens 1 | acumulado 2
[Resultados

In [1]:

import os
import re
import time
import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Tuple

# ============================================
# CONFIGURAÇÕES DA API
# ============================================

LOGIN_URL = "https://integration.s360web.com/api/login"
API_BASE = "https://integration.s360web.com/api/v1"

USER = os.getenv("S360_USER", "integracao.komatsu")
PASSWORD = os.getenv("S360_PASSWORD", "o8sqfl5h")

PASTA_SAIDA = "/content/s360_export/"
PERIODO_DIAS = 365         # backfill de 1 ano
CHUNK_DIAS = 31            # janelas mensais para cobrir 100%
PAGE_SIZE = 500            # tamanho de página
TIMEOUT = (10, 60)         # (connect, read)
MAX_RETRIES = 3
RETRY_BACKOFF = 2          # base exponencial
SLEEP_ENTRE_CHAMADAS = 25  # orientação S360

CSV_SEPARADOR = ","
CSV_ENCODING = "utf-8-sig"

# ============================================
# UTILITÁRIOS
# ============================================

def ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)

def log_resp(r: requests.Response, prefix: str = "") -> None:
    ct = r.headers.get("Content-Type", "")
    print(f"{prefix}Status: {r.status_code} {r.reason} | Content-Type: {ct}")
    body = r.text or ""
    print(f"{prefix}Body (até 1000 chars): {body[:1000]}")
    importantes = ['date', 'server', 'x-request-id', 'x-correlation-id']
    print(f"{prefix}Headers: { {k: v for k, v in r.headers.items() if k.lower() in importantes} }")

def get(o: Dict[str, Any], path: str, default: Any = None) -> Any:
    """
    Acesso seguro a objetos aninhados usando caminho com pontos.
    Ex.: get(sample, 'equipment.model')
    """
    cur = o
    for p in path.split('.'):
        if cur is None:
            return default
        if isinstance(cur, dict):
            cur = cur.get(p, default)
        else:
            return default
    return cur

def normalize_decimal(value: Any) -> Tuple[Optional[float], str, bool]:
    """
    Tenta converter strings como '24,921660', '11,80', '1,817' em float com ponto.
    Mantém o texto original em 'clean' e informa se é numérico via 'is_num'.
    Não força conversão para tokens como '<1', '23/21/15', '-', 'I.P'.
    """
    if value is None:
        return None, "", False
    if isinstance(value, (int, float)):
        return float(value), str(value), True
    s = str(value).strip()
    # Casos não-numéricos óbvios
    if s in {"-", "", "I.P"} or "/" in s or s.startswith("<") or s.startswith(">"):
        return None, s, False
    # Troca vírgula por ponto e remove espaços
    s2 = s.replace(",", ".")
    # Mantém sinais e ponto; remove outros caracteres não numéricos
    s2 = re.sub(r"[^0-9\.\-eE]", "", s2)
    try:
        num = float(s2)
        return num, s, True
    except ValueError:
        return None, s, False

def join_names(items: List[Dict[str, Any]], field_path: str = "name", sep: str = " | ") -> str:
    """
    Junta nomes de uma lista de objetos (e.g., testPackages) em uma string.
    """
    vals = []
    for it in items or []:
        v = get(it, field_path)
        if v:
            vals.append(str(v))
    return sep.join(vals)

# ============================================
# AUTENTICAÇÃO
# ============================================

def autenticar(session: Optional[requests.Session] = None) -> str:
    sess = session or requests.Session()
    payloads = [
        {"login": USER, "password": PASSWORD},    # funciona no seu ambiente
        {"username": USER, "password": PASSWORD}, # variação vista no dev portal
        {"email": USER, "password": PASSWORD},
    ]
    for attempt in range(1, MAX_RETRIES + 1):
        for payload in payloads:
            try:
                r = sess.post(LOGIN_URL, json=payload, timeout=TIMEOUT)
            except requests.exceptions.RequestException as e:
                print(f"[LOGIN] Falha de rede {attempt}/{MAX_RETRIES}: {e}")
                time.sleep(RETRY_BACKOFF ** attempt)
                continue

            if r.status_code == 200:
                try:
                    data = r.json()
                except ValueError:
                    log_resp(r, prefix="[LOGIN] ")
                    raise RuntimeError("[LOGIN] Resposta não-JSON.")
                token = data.get("token") or data.get("access_token") or data.get("accessToken") or data.get("jwt")
                if token:
                    print("[LOGIN] Autenticado com payload:", list(payload.keys()))
                    return token
                log_resp(r, prefix="[LOGIN] ")
                raise RuntimeError("[LOGIN] Token ausente na resposta.")
            else:
                log_resp(r, prefix="[LOGIN] ")

        time.sleep(RETRY_BACKOFF ** attempt)

    raise RuntimeError("[LOGIN] Não foi possível autenticar após múltiplas tentativas.")

# ============================================
# CHAMADAS DE API (GET/POST) COM RETRY
# ============================================

def chamar_api(method: str, endpoint: str, token: str,
               params: Optional[Dict[str, Any]] = None,
               body: Optional[Dict[str, Any]] = None,
               session: Optional[requests.Session] = None) -> Optional[Dict[str, Any]]:
    sess = session or requests.Session()
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{API_BASE.rstrip('/')}/{endpoint.lstrip('/')}"

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            if method.upper() == "GET":
                r = sess.get(url, headers=headers, params=params, timeout=TIMEOUT)
            else:
                r = sess.post(url, headers=headers, json=body, timeout=TIMEOUT)
        except requests.exceptions.RequestException as e:
            print(f"[API] Falha de rede em {method} {url} ({attempt}/{MAX_RETRIES}): {e}")
            time.sleep(RETRY_BACKOFF ** attempt)
            continue

        if r.status_code == 200:
            try:
                data = r.json()
            except ValueError:
                log_resp(r, prefix="[API] ")
                print("[API] Resposta não-JSON; retornando None.")
                return None
            time.sleep(SLEEP_ENTRE_CHAMADAS)  # orientação S360
            return data

        log_resp(r, prefix=f"[API] {method} {endpoint} tentativa {attempt}/{MAX_RETRIES} ")
        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(RETRY_BACKOFF ** attempt)
            continue

        return None

    print(f"[API] Desistindo de {method} {endpoint} após {MAX_RETRIES} tentativas.")
    return None

# ============================================
# EQUIPAMENTOS (ativos)
# ============================================

def coletar_equipamentos(token: str, session: Optional[requests.Session] = None) -> Optional[List[Dict[str, Any]]]:
    params = {"ativo": "true"}
    dados = chamar_api("GET", "equipamento/list", token, params=params, session=session)
    if not dados:
        return None
    if isinstance(dados, list):
        return dados
    if isinstance(dados, dict) and "items" in dados:
        return dados["items"]
    return [dados]

# ============================================
# RESULTADOS (sampleResult/search) com CHUNK + paginação
# ============================================

def gerar_janelas_data(periodo_dias: int, chunk_dias: int) -> List[Tuple[str, str]]:
    hoje = datetime.now().date()
    inicio = hoje - timedelta(days=periodo_dias)
    janelas = []
    cursor = inicio
    while cursor <= hoje:
        fim_chunk = min(cursor + timedelta(days=chunk_dias - 1), hoje)
        janelas.append((cursor.strftime("%Y-%m-%d"), fim_chunk.strftime("%Y-%m-%d")))
        cursor = fim_chunk + timedelta(days=1)
    return janelas

def coletar_resultados(token: str, periodo_dias: int = PERIODO_DIAS,
                       chunk_dias: int = CHUNK_DIAS,
                       session: Optional[requests.Session] = None) -> List[Dict[str, Any]]:
    todos: List[Dict[str, Any]] = []
    janelas = gerar_janelas_data(periodo_dias, chunk_dias)
    print(f"[Resultados] Janelas geradas: {len(janelas)} (de {janelas[0][0]} até {janelas[-1][1]})")

    for (data_inicio, data_fim) in janelas:
        page = 1
        print(f"[Resultados] Janela {data_inicio} → {data_fim}")
        while True:
            body = {
                "readingStatus": None,   # null: todos (com filtro de data)
                "markRead": None,        # null: não marcar como lidas no backfill
                "startDate": data_inicio,
                "endDate": data_fim,
                "page": page,
                "pageSize": PAGE_SIZE,
            }
            dados = chamar_api("POST", "sampleResult/search", token, body=body, session=session)
            if not dados:
                print(f"[Resultados] Sem dados ou falha na janela {data_inicio} → {data_fim}, página {page}.")
                break

            if isinstance(dados, dict) and isinstance(dados.get("items"), list):
                itens = dados["items"]
            elif isinstance(dados, list):
                itens = dados
            else:
                itens = [dados]

            qtd = len(itens)
            todos.extend(itens)
            print(f"[Resultados] {data_inicio} → {data_fim} | página {page} | itens {qtd} | acumulado {len(todos)}")

            if qtd < PAGE_SIZE:
                break
            page += 1

    return todos

# ============================================
# ETL: FLATTEN (1 linha = 1 teste por amostra)
# ============================================

def flatten_resultados(resultados: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Transforma cada amostra em N linhas (uma por teste), com metadados completos.
    Converte resultValue para número (quando possível), mantendo original.
    """
    linhas: List[Dict[str, Any]] = []

    for s in resultados or []:
        # Metadados de nível amostra
        sample_id         = s.get("id")
        sample_number     = s.get("sampleNumber")
        external_code     = s.get("externalCode")
        result_date       = s.get("resultDate")
        reading_status    = s.get("readingStatus")

        # validResult (status geral, avaliação e ação de inspeção)
        vr_status         = get(s, "validResult.resultStatus")
        vr_eval           = get(s, "validResult.evaluation")
        vr_action         = get(s, "validResult.inspectionAction")

        # Metadados de equipamento/cliente/site/compartimento
        equip_model       = get(s, "equipment.model")
        equip_serial      = get(s, "equipment.serial")
        equip_tag         = get(s, "equipment.tag")
        equip_family      = get(s, "equipment.family.name")
        equip_maker       = get(s, "equipment.maker.name")
        site_name         = get(s, "site.name") or get(s, "equipment.site.name")
        site_ext_code     = get(s, "site.externalCode") or get(s, "equipment.site.externalCode")
        customer_name     = get(s, "customer.name")
        customer_id       = get(s, "customer.id")
        operation_name    = get(s, "operation.name")
        lab_name          = get(s, "laboratory.name")
        payment_terms     = s.get("paymentTerms")

        comp_name         = get(s, "compartment.name") or get(s, "collectionData.compartmentName")
        comp_type         = get(s, "compartment.type.name") or get(s, "collectionData.compartmentType.name")
        comp_volume       = get(s, "compartment.volume")
        coll_date_sampled = get(s, "collectionData.dateSampled")
        coll_reg_date     = get(s, "collectionData.registrationDate")
        coll_time_type    = get(s, "collectionData.timeType")
        coll_fluid_time   = get(s, "collectionData.fluidTime")
        coll_equipt_time  = get(s, "collectionData.equipmentTime")
        oil_viscosity     = get(s, "collectionData.oil.viscosity.name")
        oil_manufacturer  = get(s, "collectionData.oil.manufacturer.name")
        oil_changed       = get(s, "collectionData.oilChanged")

        test_packages     = join_names(s.get("testPackages") or [], "name")

        # Lista de testes por amostra
        tests = s.get("testResults") or []
        if not tests:
            # Ainda assim, registramos uma linha "sem teste" para rastreabilidade
            linhas.append({
                "sampleId": sample_id,
                "sampleNumber": sample_number,
                "externalCode": external_code,
                "resultDate": result_date,
                "readingStatus": reading_status,
                "validResultStatus": vr_status,
                "validResultEvaluation": vr_eval,
                "validResultInspectionAction": vr_action,
                "equipmentModel": equip_model,
                "equipmentSerial": equip_serial,
                "equipmentTag": equip_tag,
                "equipmentFamily": equip_family,
                "equipmentMaker": equip_maker,
                "siteName": site_name,
                "siteExternalCode": site_ext_code,
                "customerName": customer_name,
                "customerId": customer_id,
                "operationName": operation_name,
                "laboratoryName": lab_name,
                "paymentTerms": payment_terms,
                "compartmentName": comp_name,
                "compartmentType": comp_type,
                "compartmentVolume": comp_volume,
                "collectionDateSampled": coll_date_sampled,
                "collectionRegistrationDate": coll_reg_date,
                "collectionTimeType": coll_time_type,
                "collectionFluidTime": coll_fluid_time,
                "collectionEquipmentTime": coll_equipt_time,
                "oilViscosity": oil_viscosity,
                "oilManufacturer": oil_manufacturer,
                "oilChanged": oil_changed,
                "testGroup": None,
                "testName": None,
                "testAbbreviation": None,
                "testUnitOfMeasure": None,
                "testMethod": None,
                "testValueType": None,
                "testOrder": None,
                "resultValue_raw": None,
                "resultValue_num": None,
                "resultValue_is_numeric": False,
                "resultStatus": None,
                "testPackages": test_packages,
            })
            continue

        for tr in tests:
            test_group       = get(tr, "test.testGroup.name")
            test_name        = get(tr, "test.translation.name")
            test_abbrev      = get(tr, "test.translation.abbreviation")
            test_uom         = get(tr, "test.translation.unitOfMeasure")
            test_method      = get(tr, "test.translation.method")
            test_value_type  = get(tr, "test.valueType")
            test_order       = get(tr, "test.order")
            result_status    = tr.get("resultStatus")
            result_value     = tr.get("resultValue")

            num, raw, is_num = normalize_decimal(result_value)

            linhas.append({
                # Amostra
                "sampleId": sample_id,
                "sampleNumber": sample_number,
                "externalCode": external_code,
                "resultDate": result_date,
                "readingStatus": reading_status,
                "validResultStatus": vr_status,
                "validResultEvaluation": vr_eval,
                "validResultInspectionAction": vr_action,

                # Equip/cliente/site
                "equipmentModel": equip_model,
                "equipmentSerial": equip_serial,
                "equipmentTag": equip_tag,
                "equipmentFamily": equip_family,
                "equipmentMaker": equip_maker,
                "siteName": site_name,
                "siteExternalCode": site_ext_code,
                "customerName": customer_name,
                "customerId": customer_id,
                "operationName": operation_name,
                "laboratoryName": lab_name,
                "paymentTerms": payment_terms,

                # Compartimento / coleta / óleo
                "compartmentName": comp_name,
                "compartmentType": comp_type,
                "compartmentVolume": comp_volume,
                "collectionDateSampled": coll_date_sampled,
                "collectionRegistrationDate": coll_reg_date,
                "collectionTimeType": coll_time_type,
                "collectionFluidTime": coll_fluid_time,
                "collectionEquipmentTime": coll_equipt_time,
                "oilViscosity": oil_viscosity,
                "oilManufacturer": oil_manufacturer,
                "oilChanged": oil_changed,

                # Teste
                "testGroup": test_group,
                "testName": test_name,
                "testAbbreviation": test_abbrev,
                "testUnitOfMeasure": test_uom,
                "testMethod": test_method,
                "testValueType": test_value_type,
                "testOrder": test_order,
                "resultValue_raw": raw,
                "resultValue_num": num,
                "resultValue_is_numeric": is_num,
                "resultStatus": result_status,

                # Pacotes de teste (lista)
                "testPackages": test_packages,
            })

    return linhas

# ============================================
# SALVAR CSV
# ============================================

def salvar_csv(dados: Any, nome: str) -> None:
    ensure_dir(PASTA_SAIDA)
    caminho = os.path.join(PASTA_SAIDA, nome)
    if isinstance(dados, list):
        df = pd.DataFrame(dados)
    elif isinstance(dados, dict):
        df = pd.DataFrame([dados])
    else:
        raise ValueError("Formato de dados não suportado para CSV.")
    df.to_csv(caminho, index=False, encoding=CSV_ENCODING, sep=CSV_SEPARADOR)
    print(f"Arquivo salvo: {caminho}")

# ============================================
# EXECUÇÃO PRINCIPAL
# ============================================

def executar_rotina():
    print("Iniciando coleta:", datetime.now())
    session = requests.Session()
    token = autenticar(session=session)

    # Equipamentos (ativos)
    equipamentos = coletar_equipamentos(token, session=session)
    if equipamentos:
        salvar_csv(equipamentos, f"equipamentos_{datetime.now().date()}.csv")
    else:
        print("[Equipamentos] Nenhum dado retornado.")

    # Resultados (100% do último ano em janelas + paginação)
    resultados = coletar_resultados(token, periodo_dias=PERIODO_DIAS, chunk_dias=CHUNK_DIAS, session=session)
    if resultados:
        # Salva o bruto (como referência/origem)
        salvar_csv(resultados, f"resultados_{datetime.now().date()}.csv")

        # ETL: flatten → uma linha por teste (tabular para BI/Excel)
        resultados_flat = flatten_resultados(resultados)
        if resultados_flat:
            salvar_csv(resultados_flat, f"resultados_flat_{datetime.now().date()}.csv")
        else:
            print("[Flatten] Nenhuma linha gerada (lista de testes vazia em todas as amostras).")
    else:
        print("[Resultados] Nenhum dado retornado.")

    print("Finalizado:", datetime.now())

# ============================================
# MAIN
# ============================================

if __name__ == "__main__":
    ensure_dir(PASTA_SAIDA)
    try:
        executar_rotina()
    except Exception as e:
        print("Falha na execução:", e)


Iniciando coleta: 2026-01-13 20:44:04.092111
[LOGIN] Status: 400 Bad Request | Content-Type: 
[LOGIN] Body (até 1000 chars): 
[LOGIN] Headers: {'Date': 'Tue, 13 Jan 2026 20:44:04 GMT'}
[LOGIN] Autenticado com payload: ['username', 'password']
[API] GET equipamento/list tentativa 1/3 Status: 405 Method Not Allowed | Content-Type: application/xml;charset=UTF-8
[API] GET equipamento/list tentativa 1/3 Body (até 1000 chars): <Map><timestamp>1768337045420</timestamp><status>405</status><error>Method Not Allowed</error><path>/api/v1/equipamento/list</path></Map>
[API] GET equipamento/list tentativa 1/3 Headers: {'Date': 'Tue, 13 Jan 2026 20:44:04 GMT'}
[Equipamentos] Nenhum dado retornado.
[Resultados] Janelas geradas: 12 (de 2025-01-13 até 2026-01-13)
[Resultados] Janela 2025-01-13 → 2025-02-12
[Resultados] 2025-01-13 → 2025-02-12 | página 1 | itens 1 | acumulado 1
[Resultados] Janela 2025-02-13 → 2025-03-15
[Resultados] 2025-02-13 → 2025-03-15 | página 1 | itens 1 | acumulado 2
[Resultados

In [2]:

import os
import re
import time
import requests
import pandas as pd
from csv import QUOTE_MINIMAL
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Tuple

# ============================================
# CONFIGURAÇÕES DA API
# ============================================

LOGIN_URL = "https://integration.s360web.com/api/login"
API_BASE = "https://integration.s360web.com/api/v1"

USER = os.getenv("S360_USER", "integracao.komatsu")
PASSWORD = os.getenv("S360_PASSWORD", "o8sqfl5h")

PASTA_SAIDA = "/content/s360_export/"
PERIODO_DIAS = 365         # backfill de 1 ano
CHUNK_DIAS = 31            # janelas mensais para cobrir 100%
PAGE_SIZE = 500            # tamanho de página
TIMEOUT = (10, 60)         # (connect, read)
MAX_RETRIES = 3
RETRY_BACKOFF = 2          # base exponencial
SLEEP_ENTRE_CHAMADAS = 25  # orientação S360

# >>> Ajuste para Excel pt-BR: separador ';'
CSV_SEPARADOR = ";"
CSV_ENCODING = "utf-8-sig"

# Ordem fixa das colunas no CSV flatten
COLUMNS = [
    "sampleId","sampleNumber","externalCode","resultDate","readingStatus",
    "validResultStatus","validResultEvaluation","validResultInspectionAction",
    "equipmentModel","equipmentSerial","equipmentTag","equipmentFamily","equipmentMaker",
    "siteName","siteExternalCode","customerName","customerId","operationName","laboratoryName",
    "paymentTerms","compartmentName","compartmentType","compartmentVolume",
    "collectionDateSampled","collectionRegistrationDate","collectionTimeType","collectionFluidTime",
    "collectionEquipmentTime","oilViscosity","oilManufacturer","oilChanged",
    "testGroup","testName","testAbbreviation","testUnitOfMeasure","testMethod","testValueType",
    "testOrder","resultValue_raw","resultValue_num","resultValue_is_numeric",
    "resultStatus","testPackages"
]

# ============================================
# UTILITÁRIOS
# ============================================

def ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)

def log_resp(r: requests.Response, prefix: str = "") -> None:
    ct = r.headers.get("Content-Type", "")
    print(f"{prefix}Status: {r.status_code} {r.reason} | Content-Type: {ct}")
    body = r.text or ""
    print(f"{prefix}Body (até 1000 chars): {body[:1000]}")
    importantes = ['date', 'server', 'x-request-id', 'x-correlation-id']
    print(f"{prefix}Headers: { {k: v for k, v in r.headers.items() if k.lower() in importantes} }")

def get(o: Dict[str, Any], path: str, default: Any = "") -> Any:
    """
    Acesso seguro a objetos aninhados usando caminho com pontos.
    Retorna '' (string vazia) como default para ficar excel-friendly.
    """
    cur = o
    for p in path.split('.'):
        if cur is None:
            return default
        if isinstance(cur, dict):
            cur = cur.get(p, default)
        else:
            return default
    return cur if cur is not None else default

def normalize_decimal(value: Any) -> Tuple[Optional[float], str, bool]:
    """
    Converte strings como '24,921660' em float (com ponto). Mantém original em 'raw'.
    Não converte tokens como '<1', '23/21/15', '-', 'I.P'.
    """
    if value is None:
        return None, "", False
    if isinstance(value, (int, float)):
        return float(value), str(value), True
    s = str(value).strip()
    # Casos não-numéricos
    if s in {"-", "", "I.P"} or "/" in s or s.startswith("<") or s.startswith(">"):
        return None, s, False
    s2 = s.replace(",", ".")
    s2 = re.sub(r"[^0-9\.\-eE]", "", s2)
    try:
        num = float(s2)
        return num, s, True
    except ValueError:
        return None, s, False

def join_names(items: List[Dict[str, Any]], field_path: str = "name", sep: str = " | ") -> str:
    vals = []
    for it in items or []:
        v = get(it, field_path)
        if v:
            vals.append(str(v))
    return sep.join(vals)

# ============================================
# AUTENTICAÇÃO
# ============================================

def autenticar(session: Optional[requests.Session] = None) -> str:
    sess = session or requests.Session()
    payloads = [
        {"login": USER, "password": PASSWORD},
        {"username": USER, "password": PASSWORD},
        {"email": USER, "password": PASSWORD},
    ]
    for attempt in range(1, MAX_RETRIES + 1):
        for payload in payloads:
            try:
                r = sess.post(LOGIN_URL, json=payload, timeout=TIMEOUT)
            except requests.exceptions.RequestException as e:
                print(f"[LOGIN] Falha de rede {attempt}/{MAX_RETRIES}: {e}")
                time.sleep(RETRY_BACKOFF ** attempt)
                continue

            if r.status_code == 200:
                try:
                    data = r.json()
                except ValueError:
                    log_resp(r, prefix="[LOGIN] ")
                    raise RuntimeError("[LOGIN] Resposta não-JSON.")
                token = data.get("token") or data.get("access_token") or data.get("accessToken") or data.get("jwt")
                if token:
                    print("[LOGIN] Autenticado com payload:", list(payload.keys()))
                    return token
                log_resp(r, prefix="[LOGIN] ")
                raise RuntimeError("[LOGIN] Token ausente na resposta.")
            else:
                log_resp(r, prefix="[LOGIN] ")
        time.sleep(RETRY_BACKOFF ** attempt)
    raise RuntimeError("[LOGIN] Não foi possível autenticar após múltiplas tentativas.")

# ============================================
# CHAMADAS DE API (GET/POST) COM RETRY
# ============================================

def chamar_api(method: str, endpoint: str, token: str,
               params: Optional[Dict[str, Any]] = None,
               body: Optional[Dict[str, Any]] = None,
               session: Optional[requests.Session] = None) -> Optional[Dict[str, Any]]:
    sess = session or requests.Session()
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{API_BASE.rstrip('/')}/{endpoint.lstrip('/')}"

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            if method.upper() == "GET":
                r = sess.get(url, headers=headers, params=params, timeout=TIMEOUT)
            else:
                r = sess.post(url, headers=headers, json=body, timeout=TIMEOUT)
        except requests.exceptions.RequestException as e:
            print(f"[API] Falha de rede em {method} {url} ({attempt}/{MAX_RETRIES}): {e}")
            time.sleep(RETRY_BACKOFF ** attempt)
            continue

        if r.status_code == 200:
            try:
                data = r.json()
            except ValueError:
                log_resp(r, prefix="[API] ")
                print("[API] Resposta não-JSON; retornando None.")
                return None
            time.sleep(SLEEP_ENTRE_CHAMADAS)
            return data

        log_resp(r, prefix=f"[API] {method} {endpoint} tentativa {attempt}/{MAX_RETRIES} ")
        if r.status_code in (429, 500, 502, 503, 504):
            time.sleep(RETRY_BACKOFF ** attempt)
            continue
        return None

    print(f"[API] Desistindo de {method} {endpoint} após {MAX_RETRIES} tentativas.")
    return None

# ============================================
# EQUIPAMENTOS (ativos)
# ============================================

def coletar_equipamentos(token: str, session: Optional[requests.Session] = None) -> Optional[List[Dict[str, Any]]]:
    params = {"ativo": "true"}
    dados = chamar_api("GET", "equipamento/list", token, params=params, session=session)
    if not dados:
        return None
    if isinstance(dados, list):
        return dados
    if isinstance(dados, dict) and "items" in dados:
        return dados["items"]
    return [dados]

# ============================================
# RESULTADOS com CHUNK + paginação
# ============================================

def gerar_janelas_data(periodo_dias: int, chunk_dias: int) -> List[Tuple[str, str]]:
    hoje = datetime.now().date()
    inicio = hoje - timedelta(days=periodo_dias)
    janelas = []
    cursor = inicio
    while cursor <= hoje:
        fim_chunk = min(cursor + timedelta(days=chunk_dias - 1), hoje)
        janelas.append((cursor.strftime("%Y-%m-%d"), fim_chunk.strftime("%Y-%m-%d")))
        cursor = fim_chunk + timedelta(days=1)
    return janelas

def coletar_resultados(token: str, periodo_dias: int = PERIODO_DIAS,
                       chunk_dias: int = CHUNK_DIAS,
                       session: Optional[requests.Session] = None) -> List[Dict[str, Any]]:
    todos: List[Dict[str, Any]] = []
    janelas = gerar_janelas_data(periodo_dias, chunk_dias)
    print(f"[Resultados] Janelas geradas: {len(janelas)} (de {janelas[0][0]} até {janelas[-1][1]})")

    for (data_inicio, data_fim) in janelas:
        page = 1
        print(f"[Resultados] Janela {data_inicio} → {data_fim}")
        while True:
            body = {
                "readingStatus": None,   # null: todos (com filtro de data)
                "markRead": None,        # null: não marcar como lidas no backfill
                "startDate": data_inicio,
                "endDate": data_fim,
                "page": page,
                "pageSize": PAGE_SIZE,
            }
            dados = chamar_api("POST", "sampleResult/search", token, body=body, session=session)
            if not dados:
                print(f"[Resultados] Sem dados ou falha na janela {data_inicio} → {data_fim}, página {page}.")
                break

            if isinstance(dados, dict) and isinstance(dados.get("items"), list):
                itens = dados["items"]
            elif isinstance(dados, list):
                itens = dados
            else:
                itens = [dados]

            qtd = len(itens)
            todos.extend(itens)
            print(f"[Resultados] {data_inicio} → {data_fim} | página {page} | itens {qtd} | acumulado {len(todos)}")

            if qtd < PAGE_SIZE:
                break
            page += 1
    return todos

# ============================================
# ETL: FLATTEN (1 linha = 1 teste por amostra)
# ============================================

def flatten_resultados(resultados: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    linhas: List[Dict[str, Any]] = []

    for s in resultados or []:
        sample_id         = s.get("id", "")
        sample_number     = s.get("sampleNumber", "")
        external_code     = s.get("externalCode", "")
        result_date       = s.get("resultDate", "")
        reading_status    = s.get("readingStatus", "")

        vr_status         = get(s, "validResult.resultStatus")
        vr_eval           = get(s, "validResult.evaluation")
        vr_action         = get(s, "validResult.inspectionAction")

        equip_model       = get(s, "equipment.model")
        equip_serial      = get(s, "equipment.serial")
        equip_tag         = get(s, "equipment.tag")
        equip_family      = get(s, "equipment.family.name")
        equip_maker       = get(s, "equipment.maker.name")

        site_name         = get(s, "site.name") or get(s, "equipment.site.name")
        site_ext_code     = get(s, "site.externalCode") or get(s, "equipment.site.externalCode")

        customer_name     = get(s, "customer.name")
        customer_id       = get(s, "customer.id")
        operation_name    = get(s, "operation.name")
        lab_name          = get(s, "laboratory.name")
        payment_terms     = s.get("paymentTerms", "")

        comp_name         = get(s, "compartment.name") or get(s, "collectionData.compartmentName")
        comp_type         = get(s, "compartment.type.name") or get(s, "collectionData.compartmentType.name")
        comp_volume       = get(s, "compartment.volume")

        coll_date_sampled = get(s, "collectionData.dateSampled")
        coll_reg_date     = get(s, "collectionData.registrationDate")
        coll_time_type    = get(s, "collectionData.timeType")
        coll_fluid_time   = get(s, "collectionData.fluidTime")
        coll_equipt_time  = get(s, "collectionData.equipmentTime")
        oil_viscosity     = get(s, "collectionData.oil.viscosity.name")
        oil_manufacturer  = get(s, "collectionData.oil.manufacturer.name")
        oil_changed       = get(s, "collectionData.oilChanged")

        test_packages     = join_names(s.get("testPackages") or [], "name")

        tests = s.get("testResults") or []
        if not tests:
            linhas.append({
                "sampleId": sample_id, "sampleNumber": sample_number, "externalCode": external_code,
                "resultDate": result_date, "readingStatus": reading_status,
                "validResultStatus": vr_status, "validResultEvaluation": vr_eval, "validResultInspectionAction": vr_action,
                "equipmentModel": equip_model, "equipmentSerial": equip_serial, "equipmentTag": equip_tag,
                "equipmentFamily": equip_family, "equipmentMaker": equip_maker,
                "siteName": site_name, "siteExternalCode": site_ext_code,
                "customerName": customer_name, "customerId": customer_id,
                "operationName": operation_name, "laboratoryName": lab_name,
                "paymentTerms": payment_terms,
                "compartmentName": comp_name, "compartmentType": comp_type, "compartmentVolume": comp_volume,
                "collectionDateSampled": coll_date_sampled, "collectionRegistrationDate": coll_reg_date,
                "collectionTimeType": coll_time_type, "collectionFluidTime": coll_fluid_time,
                "collectionEquipmentTime": coll_equipt_time, "oilViscosity": oil_viscosity,
                "oilManufacturer": oil_manufacturer, "oilChanged": oil_changed,
                "testGroup": "", "testName": "", "testAbbreviation": "", "testUnitOfMeasure": "",
                "testMethod": "", "testValueType": "", "testOrder": "",
                "resultValue_raw": "", "resultValue_num": None, "resultValue_is_numeric": False,
                "resultStatus": "", "testPackages": test_packages
            })
            continue

        for tr in tests:
            test_group       = get(tr, "test.testGroup.name")
            test_name        = get(tr, "test.translation.name")
            test_abbrev      = get(tr, "test.translation.abbreviation")
            test_uom         = get(tr, "test.translation.unitOfMeasure")
            test_method      = get(tr, "test.translation.method")
            test_value_type  = get(tr, "test.valueType")
            test_order       = get(tr, "test.order")
            result_status    = tr.get("resultStatus", "")
            result_value     = tr.get("resultValue", "")

            num, raw, is_num = normalize_decimal(result_value)

            linhas.append({
                "sampleId": sample_id, "sampleNumber": sample_number, "externalCode": external_code,
                "resultDate": result_date, "readingStatus": reading_status,
                "validResultStatus": vr_status, "validResultEvaluation": vr_eval, "validResultInspectionAction": vr_action,
                "equipmentModel": equip_model, "equipmentSerial": equip_serial, "equipmentTag": equip_tag,
                "equipmentFamily": equip_family, "equipmentMaker": equip_maker,
                "siteName": site_name, "siteExternalCode": site_ext_code,
                "customerName": customer_name, "customerId": customer_id,
                "operationName": operation_name, "laboratoryName": lab_name,
                "paymentTerms": payment_terms,
                "compartmentName": comp_name, "compartmentType": comp_type, "compartmentVolume": comp_volume,
                "collectionDateSampled": coll_date_sampled, "collectionRegistrationDate": coll_reg_date,
                "collectionTimeType": coll_time_type, "collectionFluidTime": coll_fluid_time,
                "collectionEquipmentTime": coll_equipt_time, "oilViscosity": oil_viscosity,
                "oilManufacturer": oil_manufacturer, "oilChanged": oil_changed,
                "testGroup": test_group, "testName": test_name, "testAbbreviation": test_abbrev,
                "testUnitOfMeasure": test_uom, "testMethod": test_method, "testValueType": test_value_type,
                "testOrder": test_order,
                "resultValue_raw": raw, "resultValue_num": num, "resultValue_is_numeric": is_num,
                "resultStatus": result_status, "testPackages": test_packages
            })
    return linhas

# ============================================
# SALVAR CSV
# ============================================

def salvar_csv(dados: Any, nome: str) -> None:
    ensure_dir(PASTA_SAIDA)
    caminho = os.path.join(PASTA_SAIDA, nome)

    # Normaliza None -> '' para todas as células (excel-friendly)
    def none_to_empty(x):
        if x is None:
            return ""
        return x

    if isinstance(dados, list):
        df = pd.DataFrame(dados)
    elif isinstance(dados, dict):
        df = pd.DataFrame([dados])
    else:
        raise ValueError("Formato de dados não suportado para CSV.")

    # Se for o flatten, força ordem de colunas
    cols_existentes = [c for c in COLUMNS if c in df.columns]
    if len(cols_existentes) == len(COLUMNS):
        df = df[COLUMNS]

    df = df.applymap(none_to_empty)

    df.to_csv(
        caminho,
        index=False,
        encoding=CSV_ENCODING,
        sep=CSV_SEPARADOR,
        quoting=QUOTE_MINIMAL
    )
    print(f"Arquivo salvo: {caminho}")

# ============================================
# VALIDAÇÃO (preview)
# ============================================

def validar_flat(linhas: List[Dict[str, Any]]) -> None:
    if not linhas:
        print("[Validacao] Flatten sem linhas.")
        return
    df = pd.DataFrame(linhas)
    print("\n[Validacao] Preview (5 linhas):")
    try:
        print(df[COLUMNS].head(5).to_string(index=False))
    except Exception:
        print(df.head(5).to_string(index=False))
    print(f"\n[Validacao] Total de linhas (testes): {len(df)}")
    print(f"[Validacao] Amostras distintas: {df['sampleId'].nunique() if 'sampleId' in df.columns else 'N/A'}")
    if 'testName' in df.columns:
        print(f"[Validacao] Exemplos de testes: {sorted(df['testName'].dropna().unique())[:10]}")

# ============================================
# EXECUÇÃO PRINCIPAL
# ============================================

def executar_rotina():
    print("Iniciando coleta:", datetime.now())
    session = requests.Session()
    token = autenticar(session=session)

    # Equipamentos (ativos)
    equipamentos = coletar_equipamentos(token, session=session)
    if equipamentos:
        salvar_csv(equipamentos, f"equipamentos_{datetime.now().date()}.csv")
    else:
        print("[Equipamentos] Nenhum dado retornado.")

    # Resultados (100% do último ano em janelas + paginação)
    resultados = coletar_resultados(token, periodo_dias=PERIODO_DIAS, chunk_dias=CHUNK_DIAS, session=session)
    if resultados:
        salvar_csv(resultados, f"resultados_{datetime.now().date()}.csv")

        # ETL: flatten → uma linha por teste (tabular para BI/Excel)
        resultados_flat = flatten_resultados(resultados)

        # Validação (preview no log) antes de salvar
        validar_flat(resultados_flat)

        if resultados_flat:
            salvar_csv(resultados_flat, f"resultados_flat_{datetime.now().date()}.csv")
        else:
            print("[Flatten] Nenhuma linha gerada (lista de testes vazia em todas as amostras).")
    else:
        print("[Resultados] Nenhum dado retornado.")

    print("Finalizado:", datetime.now())

# ============================================
# MAIN
# ============================================

if __name__ == "__main__":
    ensure_dir(PASTA_SAIDA)
    try:
        executar_rotina()
    except Exception as e:
        print("Falha na execução:", e)


Iniciando coleta: 2026-01-13 20:53:26.933129
[LOGIN] Status: 400 Bad Request | Content-Type: 
[LOGIN] Body (até 1000 chars): 
[LOGIN] Headers: {'Date': 'Tue, 13 Jan 2026 20:53:27 GMT'}
[LOGIN] Autenticado com payload: ['username', 'password']
[API] GET equipamento/list tentativa 1/3 Status: 405 Method Not Allowed | Content-Type: application/xml;charset=UTF-8
[API] GET equipamento/list tentativa 1/3 Body (até 1000 chars): <Map><timestamp>1768337608303</timestamp><status>405</status><error>Method Not Allowed</error><path>/api/v1/equipamento/list</path></Map>
[API] GET equipamento/list tentativa 1/3 Headers: {'Date': 'Tue, 13 Jan 2026 20:53:27 GMT'}
[Equipamentos] Nenhum dado retornado.
[Resultados] Janelas geradas: 12 (de 2025-01-13 até 2026-01-13)
[Resultados] Janela 2025-01-13 → 2025-02-12
[Resultados] 2025-01-13 → 2025-02-12 | página 1 | itens 1 | acumulado 1
[Resultados] Janela 2025-02-13 → 2025-03-15
[Resultados] 2025-02-13 → 2025-03-15 | página 1 | itens 1 | acumulado 2
[Resultados

  df = df.applymap(none_to_empty)


Arquivo salvo: /content/s360_export/resultados_2026-01-13.csv

[Validacao] Preview (5 linhas):
sampleId sampleNumber externalCode resultDate readingStatus validResultStatus validResultEvaluation validResultInspectionAction equipmentModel equipmentSerial equipmentTag equipmentFamily equipmentMaker siteName siteExternalCode customerName customerId operationName laboratoryName paymentTerms compartmentName compartmentType compartmentVolume collectionDateSampled collectionRegistrationDate collectionTimeType collectionFluidTime collectionEquipmentTime oilViscosity oilManufacturer oilChanged testGroup testName testAbbreviation testUnitOfMeasure testMethod testValueType testOrder resultValue_raw resultValue_num  resultValue_is_numeric resultStatus testPackages
                                                                                                                                                                                                                                             

  df = df.applymap(none_to_empty)
