In [1]:
%pip install -qqqr requirements.txt

Note: you may need to restart the kernel to use updated packages.


#### IMPORTS

In [2]:
from os import getenv
from os.path import exists
from pandas import read_csv, read_sql, DataFrame
import sqlalchemy as sqlalc
from dotenv import load_dotenv
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field
from langchain_google_genai import ChatGoogleGenerativeAI
from magic import from_file
import motor_ocr_otimizado as ocr
import streamlit as st

class SemResposta(Exception):
    pass

  from .autonotebook import tqdm as notebook_tqdm


#### <b>AGENTE 1: Aquisi√ß√£o de Documentos</b>
<b>Responsabilidade:</b> Obter e pr√©-processar documentos fiscais<br/><br/>
<b>Funcionalidades:</b>
<ul><li>Interface para upload manual de arquivos (PDF, imagens)</li></ul>
<ul><li>Integra√ß√£o com APIs de √≥rg√£os governamentais (SEFAZ)</li></ul>
<ul><li>Valida√ß√£o inicial de formato e integridade dos documentos</li></ul>
<ul><li>Organiza√ß√£o e cataloga√ß√£o dos arquivos recebidos</li></ul>

In [None]:
def agente1(): # FRONTEND

    print("Executando o agente 1...")
    
    st.set_page_config(page_title="Agente NFe", layout="centered")
    st.title("ü§ñ Agente Inteligente para Notas Fiscais")

    uploaded_file = st.file_uploader("üìÇ Envie um arquivo CSV, PDF ou PNG da NFe", type=["csv","pdf","png"])
    
    pergunta = st.text_input("üìù Digite sua pergunta sobre os dados:")
    
    if st.button("üîç Consultar"):
        if not uploaded_file:
            st.error("Voc√™ precisa fazer o upload de um arquivo CSV, PDF ou de uma imagem PNG.")
            
        elif not pergunta.strip():
            st.error("Digite uma pergunta v√°lida.")
            
        else:
            with st.spinner("Analisando os dados com IA..."):
                try:
                    resultado_df = agente3(pergunta, uploaded_file) # INTERA√á√ÉO COM O USU√ÅRIO

                    if (isinstance(resultado_df,str) and resultado_df == "SemResposta") or (isinstance(resultado_df, DataFrame) and resultado_df.empty):
                        st.warning("Consulta realizada, mas nenhum dado foi encontrado.")                  
                    
                    elif isinstance(resultado_df, DataFrame) and not resultado_df.empty:
                        st.success("‚úÖ Resultado encontrado:")
                        st.dataframe(resultado_df)                                        
                        
                except Exception as e:
                    st.error(f"Erro ao processar: {e}")

#### <b>AGENTE 2: Extra√ß√£o e Aprendizado</b>
<b>Responsabilidade:</b> Processar documentos e extrair dados relevantes<br/><br/>
<b>Funcionalidades:</b>
<ul><li>OCR avan√ßado para digitaliza√ß√£o de documentos</li></ul>
<ul><li>NLP para identifica√ß√£o e extra√ß√£o de campos espec√≠ficos</li></ul>
<ul><li>IA para adapta√ß√£o a novos layouts</li></ul>
<ul><li>Valida√ß√£o cruzada de dados extra√≠dos</li></ul>

In [None]:
def agente2(pergunta,llm,engine,arquivo):

    print('\nExecutando agente 2...')
    
    # CATALOGANDO OS ARQUIVOS NO BD
    inspector = sqlalc.inspect(engine) # INSPECTOR PARA LISTAR AS TABELAS DO BANCO DE DADOS

    tipo = from_file(arquivo, mime=True)
        
    """
        CSV -> text/plain
        PDF -> application/pdf
        PNG -> image/png
    """
    print(f"\nArquivo: {arquivo}, Tipo MIME detectado: {tipo}")
    
    # ESTOU AQUI
    if tipo != 'text/plain':
        imagem_proc = ocr.preprocessar_imagem(ocr.carregar_imagem(arquivo))
        texto = ocr.extrair_texto(imagem_proc)
        print("Texto extra√≠do:\n")
        print(texto)       
        
    else:
        # SER√Å CRIADO UM DATAFRAME PARA CADA ARQUIVO
        df = read_csv(arquivo)

        # INSERINDO COLUNA COM O NOME DO ARQUIVO NO DATAFRAME
        df['ARQUIVO'] = arquivo
    
    """
        Utilizando a LLM para identificar se os campos e registros da base de documentos, s√£o capazes de responder a pergunta
        do usu√°rio.

        Se sim, os arquivos s√£o persistidos no banco de dados, caso contr√°rio, o arquivo √© descartado.
    """
    # FORMATANDO A SA√çDA DA LLM COM JsonOutputParser
    class Resposta(BaseModel):
        resposta: str = Field(description="Responda Sim ou N√£o")

    parseador = JsonOutputParser(pydantic_object=Resposta)

    # CRIANDO O PROMPT PARA A LLM COM A SAIDA FORMATADA
    template = """√â poss√≠vel responder a pergunta {pergunta} do usu√°rio baseado no dataframe {df} ? {resposta}"""

    prompt_template = PromptTemplate(
                                        template=template,
                                        input_variables=["pergunta","df"],
                                        partial_variables={"resposta" : parseador.get_format_instructions()}
                                    )

    # CRIANDO A CADEIA DE EXECU√á√ÉO PARA A LLM
    chain = prompt_template | llm | parseador
    
    # INVOCANDO A LLM
    resposta = chain.invoke(input={"pergunta":pergunta, "df": df})['resposta']
        
    if resposta == "Sim":
        
        # PERSISTINDO OS DADOS NO BANCO DE DADOS
        print('Sim para o arquivo: ',arquivo)

        # PRECISA VERIFICAR SE A TABELA COM O NOME DO ARQUIVO J√Å EXISTE NO BANCO DE DADOS
        if arquivo in inspector.get_table_names():
            dftable = read_sql(arquivo, con=engine)

            # CUIDANDO DE DUPLICIDADE
            df = df[~df['CHAVE DE ACESSO'].isin(dftable['CHAVE DE ACESSO'])]

            # INSERINDO NO BANCO DE DADOS
            df.to_sql(name=arquivo, con=engine, if_exists='append', index=False)
        
        else:            
            df.to_sql(name=arquivo, con=engine, if_exists='replace', index=False)            
                    
        
        # FORMATANDO A SA√çDA DA LLM COM JsonOutputParser
        class Query(BaseModel):
            query: str = Field(description='Esta √© a query com DISTINCT aonde o nome de cada coluna e das tabelas devem ficar entre "')

        parseador = JsonOutputParser(pydantic_object=Query)

        # CRIANDO O PROMPT PARA A LLM COM A SAIDA FORMATADA
        template_query = """Qual query deve ser executada na tabela {nome_arquivo} com as colunas {colunas} para responder
        a pergunta {pergunta}? Se a query envolver as tabelas, deve ser feito um JOIN entre elas utlizando a coluna "CHAVE DE ACESSO" como chave. {formatacao_saida}"""

        prompt_template_query = PromptTemplate(
                                                template=template_query,
                                                input_variables=["pergunta","nome_arquivo","colunas"],
                                                partial_variables={"formatacao_saida" : parseador.get_format_instructions()}
                                              )

        # CRIANDO A CADEIA DE EXECU√á√ÉO PARA A LLM
        chain = prompt_template_query | llm | parseador

        with engine.connect() as con:
            query1 = sqlalc.text(f'PRAGMA table_info("{arquivo}")')
            rs = con.execute(query1)
            rows = rs.fetchall()
            colunas_query = sorted([col[1] for col in rows])
            
        query = chain.invoke(input={"pergunta":pergunta, "nome_arquivo":arquivo, "colunas":colunas_query})['query']

        print('\nQuery: ',query)

        # # OBTEN√á√ÉO DO RESULTADO DA QUERY
        with engine.connect() as con:
            df = read_sql(query, con)
            resposta = df

        return resposta
    
    elif resposta == "N√£o":
        return resposta

#### <b>AGENTE 3: Resposta e Intera√ß√£o</b>
<b>Responsabilidade:</b> Interface inteligente com usu√°rios<br/><br/>
<b>Funcionalidades:</b>
<ul><li>Integra√ß√£o com LLMs para consultas em linguagem natural.</li></ul>

In [None]:
def agente3(pergunta,arquivo):

    if not exists('nfs_data.db'): # CRIA√á√ÉO DO BANCO DE DADOS PARA A PRIMEIRA EXECU√á√ÉO
        print('\nCriando o banco de dados nfs_data...')
        DATABASE_URL = "sqlite:///nfs_data.db" # Define o nome do arquivo do banco de dados
        engine = sqlalc.create_engine(DATABASE_URL)

    else:
        engine = sqlalc.create_engine("sqlite:///nfs_data.db") # Conecta ao banco de dados existente


    # INTEGRA√á√ÉO COM A LLM
    load_dotenv() # CARREGANDO O ARQUIVO COM A API_KEY

    llm = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash",  # ou "gemini-2.5-pro"
        temperature=0.5,
        google_api_key=getenv("GOOGLE_API_KEY")
    )


    try:
            print('\nExecutando agente 3...')

            print('\nPergunta: ',pergunta)

            resposta = agente2(pergunta,engine,arquivo,llm) # A ENGINE N√ÉO √â FECHADA AUTOMATICAMENTE, APENAS AS CONEX√ïES QUANDO USADAS COM WITH

            if (isinstance(resposta,DataFrame)): # VERIFICA SE A LLM RESPONDEU SIM PARA ALGUM ARQUIVO, OU SEJA, SE √â CAPAZ DE RESPONDER A PERGUNTA DO USU√ÅRIO COM O
                                                 # ARQUIVO FORNECIDO
               
               return resposta

            elif resposta == "N√£o":
                raise SemResposta

    except SemResposta:
            resposta = "SemResposta"
            return resposta # RETORNANDO A EXCE√á√ÉO PARA O FRONTEND, AGENTE 1

#### <b>TESTANDO</b>

In [None]:
if __name__ == "__main__":

     # TESTANDO OS TIPOS DE ARQUIVO
     #arquivo = "202401_NFs_Cabecalho.csv"
     #arquivo = "Grupo_01_Proposta_de_Projeto.pdf"
     arquivo = "nota_fiscal_exemplo.png"
     
     #print('Diret√≥rio atual: ',getcwd())
     
#    # EXEMPLOS DE PERGUNTA PARA TESTE. ELAS DEVEM SER OBTIDAS DO FRONTEND
     pergunta = "Qual √© a chave de acesso da nota 3510129 ?"
     #pergunta = "Quem descobriu o Brasil ?"
     #pergunta = "Qual √© a descri√ß√£o dos servi√ßos de nf com n√∫mero 2525 ?"
     #pergunta = "Qual √© a descri√ß√£o dos servi√ßos e a natureza da opera√ß√£o da nf com n√∫mero 2525 ?"

     #resposta = agente3(pergunta, arquivo)  # Chama a fun√ß√£o principal com a pergunta e o diret√≥rio
     #print('\nResposta: \n',resposta)
     
     
     #agente1()  # Executa a fun√ß√£o que inicia o agente
     agente3(pergunta, arquivo)


In [None]:
# EXPORTAR ESSE NOTEBOOK PARA UM SCRIPT PYTHON ANTES
!streamlit run agente_nfs.py

Usage: streamlit run [OPTIONS] TARGET [ARGS]...
Try 'streamlit run --help' for help.

Error: Streamlit requires raw Python (.py) files, not .ipynb.
For more information, please see https://docs.streamlit.io
