# Instala bibliotecas necess√°rias e importa m√≥dulos essenciais.



In [None]:
!pip install -q unsloth[colab-new] faiss-cpu sentence-transformers datasets accelerate trl peft transformers
!pip install --no-deps xformers "trl<0.9.0" peft accelerate bitsandbytes
!pip install transformers datasets

In [None]:
import json
import re
import os
import unicodedata
import faiss
import numpy as np
import torch
import random
import pandas as pd
import sqlite3

from unsloth import FastLanguageModel, is_bfloat16_supported
from transformers import pipeline
from transformers import TrainingArguments
from sentence_transformers import SentenceTransformer
from datasets import load_dataset
from datasets import Dataset
from trl import SFTTrainer
from dotenv import load_dotenv
from huggingface_hub import login
from datetime import date, timedelta

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Download e explora√ß√£o inicial dos dados

In [None]:
!git clone https://github.com/pubmedqa/pubmedqa.git

file_path = 'pubmedqa/data/ori_pqal.json'

with open(file_path, 'r') as f:
    data = json.load(f)

sample_key = list(data.keys())[0]
print(f"\nCampos dispon√≠veis: {list(data[sample_key].keys())}\n")

print("=" * 60)
print("Explora√ß√£o de dados - PubMedQA")
print("=" * 60)

for i, key in enumerate(list(data.keys())[:3]):
    item = data[key]

    print(f"\nExemplo {i+1} | ID: {key}")
    print("-" * 60)
    print(f"Question: {item.get('QUESTION', 'N/A')}")

    context = " ".join(item.get('CONTEXTS', []))
    print(f"Context: {context[:300]}...")

    print(f"Labels: {item.get('LABELS', 'N/A')}")
    print(f"Decision: {item.get('final_decision', 'N/A')}")
    print(f"Answer: {item.get('LONG_ANSWER', 'N/A')[:200]}...")
    print(f"Meshes: {item.get('MESHES', 'N/A')}")
    print(f"Year: {item.get('YEAR', 'N/A')}")
    print(f"Reasoning required pred: {item.get('reasoning_required_pred', 'N/A')}")
    print(f"Reasoning free pred: {item.get('reasoning_free_pred', 'N/A')}")

print(f"\n\nTotal de registros: {len(data)}")

# Pr√©-processamento - Limpeza, Normaliza√ß√£o e Anonimiza√ß√£o dos Textos

In [None]:
MAP_DECISION = {
  "yes": "SIM",
  "no": "N√ÉO",
  "maybe": "TALVEZ"
}


def anonymize_text(text):
    """Remove dados sens√≠veis (LGPD/HIPAA compliance)"""
    if not isinstance(text, str):
        return ""

    text = re.sub(r'(Dr\.|Dra\.|Doctor|Prof\.|MD)\s+[A-Z][a-z]+(\s+[A-Z][a-z]+)?', '[NOME]', text)
    text = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '[EMAIL]', text)
    locations = r'(Israel|Denmark|Chile|Texas|France|United Kingdom|UK|USA|Pakistan|Karachi|Jordan|Japan|Australia|North Carolina|Washington)'
    text = re.sub(locations, '[LOCAL]', text, flags=re.IGNORECASE)
    text = re.sub(r'\b\d{6,}\b', '[ID]', text)
    text = re.sub(r'\b(19|20)\d{2}\b', '[ANO]', text)
    text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '[URL]', text)
    return text

def clean_text(text):
  if not text:
    return ""
  text = unicodedata.normalize("NFKC", text)
  text = re.sub(r"\s+", " ", text).strip()

  text = anonymize_text(text)

  return text


processed_docs = []

for item in data.values():
  question = clean_text(item.get("QUESTION"))
  context = clean_text(" ".join(item.get("CONTEXTS", [])))
  answer = clean_text(item.get("LONG_ANSWER", ""))
  decision = MAP_DECISION.get(item.get("final_decision", ""), "")


  if question and context:
    processed_docs.append({
      "question": question,
      "context": context,
      "answer": answer,
      "decision": decision,
      "year": item.get("YEAR")
    })


print(len(processed_docs))

# Preparando conjunto de dados em portugu√™s para treino de tradu√ß√£o

In [None]:
!git clone https://github.com/diegosdomingos/tech-challenge-3.git

file_path = 'tech-challenge-3/data/language_alignment_pt.jsonl'

dataset = load_dataset(
    "json",
    data_files="/content/drive/MyDrive/rag/language_alignment_pt.json",  #Alterar quando o dataset j√° estiver na main
    split="train"
)

print(dataset.column_names)

def messages_to_text(example):
    text = ""
    for msg in example["messages"]:
        if msg["role"] == "system":
            text += f"<<SYS>>\n{msg['content']}\n<</SYS>>\n\n"
        elif msg["role"] == "user":
            text += f"[INST] {msg['content']} [/INST]\n"
        elif msg["role"] == "assistant":
            text += msg["content"]
    return {"text": text}

dataset = dataset.map(
    messages_to_text,
    batched=False,
    remove_columns=["messages"]
)

print(dataset.column_names)
print(dataset[0])

# Estrutura√ß√£o de Documentos para RAG (Recupera√ß√£o + Gera√ß√£o)
## - Cria textos formatados unificando quest√£o, contexto e resposta para busca sem√¢ntica.

In [None]:
rag_documents = []

for d in processed_docs:
  text = f"""
  Pergunta cient√≠fica:
  {d['question']}


  Evid√™ncia:
  {d['context']}


  Conclus√£o:
  {d['answer']}
  """
  rag_documents.append(text.strip())


len(rag_documents)

# Gera√ß√£o de Embeddings e Constru√ß√£o de √çndice FAISS

In [None]:
embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
embeddings = embedder.encode(rag_documents, show_progress_bar=True)

index = faiss.IndexFlatL2(embeddings.shape[1])
index.add(np.array(embeddings))

# Create the directory if it doesn't exist
os.makedirs('/content/drive/MyDrive/rag', exist_ok=True)

faiss.write_index(index, "/content/drive/MyDrive/rag/medical_index.faiss")

with open('/content/drive/MyDrive/rag/medical_docs.json', 'w') as f:
  json.dump(rag_documents, f)

# Configura√ß√£o do Modelo para Treinamento com LoRA
## - Carrega modelo base e aplica adapta√ß√£o LoRA para reduzir custo de treino.

In [None]:
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/llama-3-8b-bnb-4bit",
    max_seq_length = 2048,
    dtype = None,
    load_in_4bit = True,
    device_map="auto"
)

model = FastLanguageModel.get_peft_model(
    model,
    r = 16,
    target_modules = ["q_proj", "k_proj", "v_proj", "o_proj"],
    lora_alpha = 16,
    lora_dropout = 0.05,
)

# Treinamento Supervisionado do Assistente M√©dico em Portugu√™s

In [None]:
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=2,
    learning_rate=2e-4,
    logging_steps=5,
    save_strategy="epoch",
    fp16=True,
    report_to="none"
)

trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    train_dataset=dataset,
    args=training_args,
    dataset_text_field="text",
    max_seq_length=512,
    packing=False
)

trainer.train()

# Autentica√ß√£o e Upload do Modelo Treinado no HuggingFace

In [None]:

# Carregar env
ENV_PATH = "/content/drive/MyDrive/token-hf/env"
load_dotenv(ENV_PATH)

HF_TOKEN = os.getenv("HF_TOKEN")
login(token=HF_TOKEN)

HF_REPO = f"{HF_USER_REPO}/assistente-medico-lora"

model.push_to_hub(HF_REPO)
tokenizer.push_to_hub(HF_REPO)

# Configura√ß√£o da Pipeline de Gera√ß√£o + Prompt do Assistente M√©dico

In [None]:
SYSTEM_PROMPT = """
Voc√™ √© um assistente m√©dico virtual.
Responda sempre em portugu√™s, com linguagem clara, emp√°tica e baseada em evid√™ncias cient√≠ficas.


Regras:
- N√£o invente informa√ß√µes.
- Se n√£o houver evid√™ncia suficiente, diga isso explicitamente.
- N√£o prescreva medicamentos nem indique tratamentos espec√≠ficos.
- N√£o indique rem√©dios ou tratamentos.
- Quando perguntarem por algum rem√©dio, voc√™ deve responder: N√£o estou autorizado a prescrever medicamentos, por favor, consulte um m√©dico. <FIM>
- Sempre cite a fonte da informa√ß√£o cient√≠fica

Responda de forma objetiva e finalize sempre a primeira resposta objetiva com o texto: <FIM>
"""

with open('/content/drive/MyDrive/rag/medical_docs.json') as f:
  docs = json.load(f)

index = faiss.read_index('/content/drive/MyDrive/rag/medical_index.faiss')


llm = pipeline(
  "text-generation",
  model=model,
  tokenizer=tokenizer,
  max_new_tokens=512,
  temperature=0.0,
  do_sample=False,
  repetition_penalty=1.1,
  return_full_text=False,
  eos_token_id=tokenizer.eos_token_id
  )

def retrieve_context(question, k=3):
  q_emb = embedder.encode([question])
  _, idx = index.search(q_emb, k)
  return "\n\n".join([docs[i] for i in idx[0]])

def medical_chat(question):

  # Adiciona <FIM> ao final da string `question` se n√£o estiver presente
  if "<FIM>" not in question:
    question += " <FIM>"

  context = retrieve_context(question)

  prompt = f"""
{SYSTEM_PROMPT}


Contexto cient√≠fico relevante:
{context}


Pergunta: {question}
Resposta:
"""
  output = llm(prompt)[0]["generated_text"]

  # üîí corta tudo depois do token de fim
  output = output.split("<FIM>")[0]

  return output.strip()

# Cria√ß√£o de dataset de prontu√°rio (Fict√≠cio) com SQLite

In [None]:
DB_PATH = "prontuarios.db"

conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

cursor.execute("""
CREATE TABLE IF NOT EXISTS pacientes (
    patient_id TEXT PRIMARY KEY,
    nome TEXT,
    data_nascimento TEXT,
    idade INTEGER,
    sexo TEXT,
    alergias TEXT,
    comorbidades TEXT
)
""")

cursor.execute("""
CREATE TABLE IF NOT EXISTS atendimentos (
    atendimento_id INTEGER PRIMARY KEY AUTOINCREMENT,
    patient_id TEXT,
    data_atendimento TEXT,
    queixa_principal TEXT,
    anamnese TEXT,
    diagnostico TEXT,
    conduta TEXT,
    tratamentos_em_andamento TEXT,
    exames_solicitados TEXT,
    observacoes TEXT,
    FOREIGN KEY(patient_id) REFERENCES pacientes(patient_id)
)
""")

conn.commit()
conn.close()

# Populando os dados fict√≠cios

In [None]:
nomes = [
    "Ana Paula Souza", "Ana Carolina Lima", "Bruno Silva", "Carlos Eduardo Rocha",
    "Daniela Martins", "Eduardo Nogueira", "Fernanda Alves", "Gabriel Pacheco",
    "Helena Ribeiro", "Igor Farias", "Juliana Torres", "Lucas Fernandes",
    "Mariana Araujo", "Natalia Pacheco", "Otavio Nunes", "Paula Guedes",
    "Rafael Moreira", "Sabrina Lopes", "Thiago Barros", "Vanessa Farias",
    "William Teixeira", "Ana Beatriz Costa"
]

diagnosticos = [
    "Hipertens√£o arterial sist√™mica",
    "Diabetes mellitus tipo 2",
    "Asma br√¥nquica",
    "Infec√ß√£o do trato urin√°rio",
    "Pneumonia adquirida na comunidade",
    "Transtorno de ansiedade generalizada",
    "Gastrite cr√¥nica",
    "Enxaqueca cr√¥nica"
]

alergias_lista = [
    "Dipirona", "Penicilina", "Sulfa", "Nenhuma conhecida"
]

comorbidades_lista = [
    "Hipertens√£o", "Diabetes", "Dislipidemia", "Obesidade", "Nenhuma"
]

def gerar_data_nascimento(idade):
    hoje = date.today()
    return hoje - timedelta(days=idade * 365)

conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()

for i, nome in enumerate(nomes, start=1):
    idade = random.randint(18, 85)
    patient_id = f"PAT{i:04d}"

    cursor.execute("""
        INSERT OR IGNORE INTO pacientes
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, (
        patient_id,
        nome,
        gerar_data_nascimento(idade).isoformat(),
        idade,
        random.choice(["F", "M"]),
        random.choice(alergias_lista),
        random.choice(comorbidades_lista)
    ))

    for _ in range(random.randint(1, 4)):
        cursor.execute("""
            INSERT INTO atendimentos (
                patient_id,
                data_atendimento,
                queixa_principal,
                anamnese,
                diagnostico,
                conduta,
                tratamentos_em_andamento,
                exames_solicitados,
                observacoes
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            patient_id,
            (date.today() - timedelta(days=random.randint(1, 1200))).isoformat(),
            "Dor, mal-estar e sintomas gerais",
            "Paciente relata in√≠cio dos sintomas h√° alguns dias, sem fatores agravantes claros.",
            random.choice(diagnosticos),
            "Conduta expectante e acompanhamento ambulatorial",
            "Uso cont√≠nuo de medica√ß√£o conforme prescri√ß√£o",
            "Hemograma completo, glicemia, PCR",
            "Paciente orientado quanto aos sinais de alarme"
        ))

conn.commit()
conn.close()

# Regras para consultar prontu√°rios

In [None]:
def buscar_pacientes_por_nome(nome_parcial: str):
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    cursor.execute("""
        SELECT patient_id, nome, idade
        FROM pacientes
        WHERE LOWER(nome) LIKE LOWER(?)
    """, (f"%{nome_parcial}%",))

    resultados = cursor.fetchall()
    conn.close()
    return resultados


def carregar_prontuario(patient_id: str):
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    cursor.execute("""
        SELECT patient_id, nome, data_nascimento, idade, sexo, alergias, comorbidades
        FROM pacientes
        WHERE patient_id = ?
    """, (patient_id,))
    paciente = cursor.fetchone()

    cursor.execute("""
        SELECT data_atendimento, queixa_principal, anamnese, diagnostico,
               conduta, tratamentos_em_andamento, exames_solicitados, observacoes
        FROM atendimentos
        WHERE patient_id = ?
        ORDER BY data_atendimento DESC
    """, (patient_id,))
    atendimentos = cursor.fetchall()

    conn.close()

    if not paciente:
        return "Paciente n√£o encontrado."

    texto = f"""
PACIENTE
ID: {paciente[0]}
Nome: {paciente[1]}
Data de nascimento: {paciente[2]}
Idade: {paciente[3]}
Sexo: {paciente[4]}
Alergias: {paciente[5]}
Comorbidades: {paciente[6]}

ATENDIMENTOS:
"""

    for a in atendimentos:
        texto += f"""
Data: {a[0]}
Queixa principal: {a[1]}
Anamnese: {a[2]}
Diagn√≥stico: {a[3]}
Conduta: {a[4]}
Tratamentos em andamento: {a[5]}
Exames solicitados: {a[6]}
Observa√ß√µes: {a[7]}
---
"""

    return texto


# LLM para entender a pergunta do prontu√°rio (Utiliza√ß√£o da mesma llm LLaMA)

In [None]:
def extrair_intencao_e_nome(pergunta: str):
    prompt = f"""
Extraia informa√ß√µes estruturadas da pergunta abaixo.

Pergunta:
"{pergunta}"

Responda APENAS em JSON, sem nenhum texto extra:

{{
  "intencao": "consultar_prontuario" | "outra",
  "nome_paciente": string | null
}}

Responda de forma objetiva e finalize sempre a primeira resposta objetiva com o texto: <FIM>

"""
    resposta = medical_chat(prompt)

    try:
        json_str = re.search(r"\{.*\}", resposta, re.DOTALL).group()
        return json.loads(json_str)
    except:
        return {"intencao": "outra", "nome_paciente": None}


# Router (Consulta prontu√°rio ou Dados de pubmedqa?)

In [None]:
estado = {
    "aguardando_escolha": False,
    "opcoes": []
}

def router_chat(pergunta: str):

    # 1Ô∏è‚É£ Se o usu√°rio precisa escolher um paciente
    if estado["aguardando_escolha"]:
        escolha = pergunta.strip().lower()

        for pid, nome, idade in estado["opcoes"]:
            if escolha == pid.lower() or escolha == nome.lower():
                estado["aguardando_escolha"] = False
                estado["opcoes"] = []
                prontuario = carregar_prontuario(pid)
                return medical_chat(prontuario)

        return medical_chat(
            "Ainda existem mais de um paciente poss√≠vel. "
            "Pode informar o ID ou o nome completo?"
        )

    # 2Ô∏è‚É£ Entender a pergunta
    parsed = extrair_intencao_e_nome(pergunta)

    if parsed["intencao"] != "consultar_prontuario":
        return medical_chat(pergunta)

    nome = parsed["nome_paciente"]

    if not nome:
        return medical_chat(
            "Claro üôÇ Qual √© o nome do paciente que voc√™ deseja consultar?"
        )

    pacientes = buscar_pacientes_por_nome(nome)

    if len(pacientes) == 0:
        return medical_chat(
            "N√£o encontrei nenhum paciente com esse nome. "
            "Voc√™ pode informar o nome completo?"
        )

    if len(pacientes) > 1:
        estado["aguardando_escolha"] = True
        estado["opcoes"] = pacientes

        lista = "\n".join(
            [f"- {nome} (ID: {pid}, Idade: {idade})"
             for pid, nome, idade in pacientes]
        )

        return medical_chat(
            "Encontrei mais de um paciente com esse nome:\n\n"
            f"{lista}\n\n"
            "Qual deles voc√™ deseja consultar?"
        )

    prontuario = carregar_prontuario(pacientes[0][0])
    return medical_chat(prontuario)


# Testes de Consulta ao Assistente M√©dico com Exemplos


In [None]:
# Teste normal de consulta
question = "O que a literatura indica sobre o uso de aspirina em preven√ß√£o prim√°ria?"
print(f"Resposta 1 (normal): {medical_chat(question)}")

# Testa restri√ß√£o
question = "Qual medicamento √© eficaz para pedra nos rins?"
print(f"Resposta 2 (restri√ß√£o): {medical_chat(question)}")

# CHAT Assistente

In [None]:
print("Assistente M√©dico iniciado")
print("Digite 'sair' para encerrar\n")

while True:
    user_input = input("Voc√™: ").strip()

    if user_input.lower() in ["sair", "exit", "quit"]:
        print("Assistente: Sess√£o encerrada.")
        break

    resposta = router_chat(user_input)
    print(f"Assistente: {resposta}\n")
