In [None]:
!pip install -q fastapi uvicorn python-multipart aiofiles pyngrok langchain langchain_community langchain-google-genai nest_asyncio

In [None]:
import os
from google.colab import userdata
os.environ["GOOGLE_API_KEY"] = userdata.get("GOOGLE_API_KEY_1")

In [None]:
from pyngrok import ngrok
ngrok.set_auth_token("*********************")

In [None]:
# ✅ Imports principais
# =====================================
import shutil
import sqlite3
import zipfile
from pathlib import Path
import logging
import pandas as pd
import nest_asyncio
import uvicorn
import threading
from fastapi import FastAPI, UploadFile, File, HTTPException
import ast
import re

from langchain.agents import create_sql_agent, Tool, initialize_agent, AgentType
from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit
from langchain_community.utilities.sql_database import SQLDatabase
from langchain_google_genai import ChatGoogleGenerativeAI

In [None]:
# ✅ Configura o logger do Uvicorn e outros
# =====================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("uvicorn")

In [None]:
# ✅ Inicializar modelo Gemini-flash
# =====================================
llm = ChatGoogleGenerativeAI(
    model="models/gemini-2.0-flash",
    temperature=0,
    google_api_key=secret_value_0
)

In [None]:
# ✅ Diretórios e banco de dados
# =====================================
DATA_DIR = Path("/tmp/data")
INPUT_DIR = DATA_DIR / "input"
TEMP_DIR = DATA_DIR / "temp"
DB_PATH = DATA_DIR / "notas.db"

for d in [INPUT_DIR, TEMP_DIR]:
    d.mkdir(parents=True, exist_ok=True)

In [None]:
# ✅ ETL do arquivo ZIP para SQLite
# =====================================
def run_etl_pipeline(zip_path: Path) -> bool:
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(TEMP_DIR)

        csv_files = list(TEMP_DIR.glob("*.csv"))
        cabecalho_file = [f for f in csv_files if "cabecalho" in f.name.lower()]
        itens_file = [f for f in csv_files if "itens" in f.name.lower()]

        if not cabecalho_file or not itens_file:
            cabecalho_file, itens_file = csv_files[:2]

        cab = pd.read_csv(cabecalho_file[0])
        itens = pd.read_csv(itens_file[0])

        cab.columns = cab.columns.str.lower().str.replace(" ", "_").str.replace("/", "_")
        itens.columns = itens.columns.str.lower().str.replace(" ", "_").str.replace("/", "_")

        chave = next(
            (k for k in ["chave", "chave_de_acesso", "numero_nf", "id_nota", "id"]
             if k in cab.columns and k in itens.columns), None)

        if not chave:
            print("❌ Nenhuma chave comum.")
            return False

        merged = pd.merge(cab, itens, on=chave, how="left")
        merged["processed_at"] = pd.Timestamp.now().isoformat()

        conn = sqlite3.connect(DB_PATH)
        merged.to_sql("notas_fiscais", conn, if_exists="replace", index=False)
        conn.close()

        shutil.rmtree(TEMP_DIR)
        TEMP_DIR.mkdir(exist_ok=True)

        return True
    except Exception as e:
        print("❌ Erro no ETL:", e)
        return False

In [None]:
# ✅ App FastAPI
# =====================================
app = FastAPI()

@app.get("/")
def root():
    logger.info("✅ Rota / acessada com sucesso")
    return {"mensagem": "API rodando com sucesso"}

In [None]:
# ✅ Endpoint: Upload ZIP
# =====================================
@app.post("/upload/")
async def upload(file: UploadFile = File(...)):
    logger.info(f"📁 Arquivo recebido: {file.filename}")
    zip_path = INPUT_DIR / file.filename
    with open(zip_path, "wb") as f:
        f.write(await file.read())

    if run_etl_pipeline(zip_path):
        return {"status": "ok", "message": "Arquivo processado com sucesso!"}
    return {"status": "erro", "message": "Falha ao processar ZIP."}

In [None]:
# ✅ Endpoint: Consultar agente com prompt rico, SQLDatabaseToolkit, e resposta formatada
# =====================================
@app.get("/query/")
async def query(question: str):
    try:
        logger.info(f"🧠 Pergunta recebida: {question}")

        # Conectar ao banco
        db = SQLDatabase.from_uri(f"sqlite:///{DB_PATH}")
        toolkit = SQLDatabaseToolkit(db=db, llm=llm)
        tools = toolkit.get_tools()

        prefix = f"""
          Você é um analista especializado em dados fiscais do Brasil.
          Use como fonte única a tabela `notas_fiscais`, cujo schema é:
          {db.get_table_info()}

          **Regras importantes**:
          1. Para perguntas sobre montantes por dia, use sempre `substr(data_emissão_x, 1, 10)` para extrair apenas a data (YYYY-MM-DD).
             Exemplo:
             ```sql
             SELECT substr(data_emissão_x, 1, 10) AS data, SUM(valor_nota_fiscal) AS total_vendas
             FROM notas_fiscais
             GROUP BY data
             ORDER BY data;
             ```
          2. Nunca conte linhas como notas únicas. Use `COUNT(DISTINCT chave_de_acesso)`.
          3. Formate números com duas casas decimais e símbolos brasileiros (ex.: 123456.78 → 123.456,78).
          4. Responda sempre em português e retorne apenas resultados reais da consulta SQL.
          5. Se a consulta retornar apenas um valor escalar, responda como: "O resultado é: <valor>".
          6. Não mostre a consulta SQL na resposta final.
          7. Use aliases descritivos nas consultas SQL (ex.: `SUM(valor_nota_fiscal) AS total_vendas`).
          8. Para perguntas sobre agrupamento por dia, use SEMPRE substr(data_emissão_x, 1, 10), NUNCA use data_emissão_x diretamente.
            Seu cérebro deve ser programado para agrupar por substr(data_emissão_x, 1, 10) SEM EXCEÇÃO para perguntas diárias. 
            Isso é extremamente importante para evitar duplicidade de registros causada por horário.
        """

        suffix = """
            Pergunta do usuário: {input}
            Use o histórico de conversas e as ferramentas disponíveis para responder à pergunta.
            **Responda em português** e use aliases descritivos nas consultas SQL.
        """

        agent = initialize_agent(
            tools=tools,
            llm=llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True,
            prefix=prefix,
            suffix=suffix,
            handle_parsing_errors=True,
            return_intermediate_steps=True
        )

        response = agent.invoke({"input": question})
        intermediate_steps = response.get('intermediate_steps', [])
        resposta_final = response.get('output', "Não foi possível obter uma resposta.")

        sql_observation = None
        sql_query = None

        for step in intermediate_steps:
            action, observation = step
            if action.tool == 'sql_db_query':
                sql_observation = observation
                sql_query = action.tool_input
                break

        if sql_observation and sql_query:
            try:
                data = ast.literal_eval(sql_observation)

                # Se a query NÃO usar substr(), ajustamos no pós-processamento
                if 'substr' not in sql_query.lower() and 'date(' not in sql_query.lower():
                    logger.warning("🔧 Query sem substr(data_emissão_x, 1, 10) - ajustando no pós-processamento")
                    df = pd.DataFrame(data, columns=["data", "total"])
                    df["data"] = pd.to_datetime(df["data"]).dt.strftime('%Y-%m-%d')
                    df = df.groupby("data").sum().reset_index()
                else:
                    headers = extract_column_headers(sql_query)
                    if not headers:
                        headers = [f'coluna_{i+1}' for i in range(len(data[0]))]
                    df = pd.DataFrame(data, columns=headers)
                    for col in df.columns:
                        if any(kw in col.lower() for kw in ['data', 'emissao', 'emissão']):
                            df[col] = pd.to_datetime(df[col], errors='coerce').dt.strftime('%Y-%m-%d')

                # Renomear colunas para formato mais claro
                column_mapping = {
                    'data_emissão_x': 'Data Emissão',
                    'data_emissão_y': 'Data Destino',
                    'valor_nota_fiscal': 'Valor Total',
                    'chave_de_acesso': 'Chave de Acesso'
                }

                df.rename(columns=lambda x: column_mapping.get(x.lower(), x.title().replace('_', ' ')), inplace=True)

                # Formatar valores monetários
                for col in df.select_dtypes(include=['number']).columns:
                    df[col] = df[col].apply(lambda x: f"{x:,.2f}".replace(",", "X").replace(".", ",").replace("X", "."))

                # Determinar tipo de resposta
                if len(df) == 1 and len(df.columns) == 1:
                    resposta_final = f"O resultado é: {df.iloc[0, 0]}"
                else:
                    markdown_table = df.to_markdown(index=False, tablefmt="pipe")
                    resposta_final = markdown_table

            except Exception as e:
                logger.error(f"❌ Erro ao processar resultado da query: {e}")
                resposta_final = sql_observation

        logger.info(f"✅ Resposta final:\n{resposta_final}")
        return {"status": "success", "message": resposta_final}

    except Exception as e:
        logger.exception("❌ Erro durante a consulta")
        return {"status": "error", "message": str(e)}


def extract_column_headers(sql_query: str) -> list:
    """Extrai cabeçalhos da parte SELECT da query."""
    try:
        match = re.search(r'SELECT\s+(.*?)\s+FROM', sql_query, re.IGNORECASE | re.DOTALL)
        if not match:
            return []
        cols = match.group(1).strip().split(',')
        headers = []
        for col in cols:
            col = col.strip()
            if ' AS ' in col.upper():
                alias = col.upper().split(' AS ')[1].strip()
                headers.append(alias.lower())
            elif '.' in col:
                headers.append(col.split('.')[-1].strip())
            else:
                headers.append(col)
        return headers
    except:
        return []


In [None]:
# ✅ START do servidor
# =====================================
def start_api():
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")

nest_asyncio.apply()
threading.Thread(target=start_api, daemon=True).start()

# =====================================
# ✅ NGROK
# =====================================
public_url = ngrok.connect(8000)
logger.info(f"🚀 Sua API está disponível publicamente em: {public_url}")