# Imports

In [60]:
import os
import sys
import faiss
import time
import logging
import numpy as np
import pickle
import json
from typing import List, Dict, Any
import faiss
import torch
import yaml
from transformers import AutoModel, AutoTokenizer, AutoProcessor
from PIL import Image
import re

# Importa vector stores

In [12]:
vs_study = faiss.read_index("artifacts/vector_store/faiss_study.index")  # Carregar o √≠ndice
vs_img = faiss.read_index("artifacts/vector_store/faiss_img.index")  # Carregar o √≠ndice
vs_text = faiss.read_index("artifacts/vector_store/faiss_txt.index")  #

# Fun√ß√µes de busca

In [13]:
def search_relevant_cases(query_emb, vector_store, ids, k=3):
    """"
    Pesquisa os estudos mais relevantes para uma determinada consulta.
    Args:
        - query_emb: embedding do estudo/imagem/texto de consulta
        - vector_store: vector store
        - k: n√∫mero de casos mais relevantes a serem retornados
    Returns:
        - IDs dos casos mais relevantes
        - Casos mais relevantes
    """
    # Preparar o vetor de consulta
    if isinstance(query_emb, np.ndarray):
        # Garantir que √© 2D e float32
        if query_emb.ndim == 1:
            query_embedding = query_emb.reshape(1, -1).astype(np.float32)
        else:
            query_embedding = query_emb.astype(np.float32)
    else:
        query_embedding = np.array(query_emb, dtype=np.float32).reshape(1, -1)
    
    # Normalizar o vetor de consulta
    faiss.normalize_L2(query_embedding)

    # Pesquisar os casos mais relevantes
    D, I = vector_store.search(query_embedding, k)

    # Retornar os IDs dos casos mais relevantes
    results = []
    for idx in I[0]:
        results.append(ids[idx])
        
    return results, I[0] 

## Teste para estudo

In [1]:
from src.f_utils.embedding_utils import load_embeddings

In [6]:
studies_emb = load_embeddings("artifacts/embeddings/e_study.npy")

‚úÖ Embeddings carregados com sucesso!
üìä Formato dos dados: <class 'numpy.ndarray'>
üìä Shape: (227835, 1152)


In [132]:
type(studies_emb)

numpy.ndarray

In [10]:
study_ids = load_embeddings("artifacts/embeddings/study_ids.npy")

‚úÖ Embeddings carregados com sucesso!
üìä Formato dos dados: <class 'numpy.ndarray'>
üìä Shape: (227835,)


In [7]:
ex_estudo = studies_emb[0]  # Exemplo de estudo para teste
ex_estudo.shape

(1152,)

In [18]:
estudos, idx = search_relevant_cases(ex_estudo, vs_study, study_ids, k=3)
print(f"- Estudos mais relevantes: {estudos}")
print(f"- √çndices dos estudos mais relevantes: {idx}")  

- Estudos mais relevantes: ['s50414267', 's53634677', 's52231919']
- √çndices dos estudos mais relevantes: [    0 81535 56599]


## Teste para imagem

In [19]:
images_emb = load_embeddings("artifacts/embeddings/e_img.npy")

‚úÖ Embeddings carregados com sucesso!
üìä Formato dos dados: <class 'numpy.ndarray'>
üìä Shape: (227835, 1152)


In [20]:
ex_img = images_emb[0]  # Exemplo de imagem para teste
ex_img.shape

(1152,)

In [22]:
imagens, idx_img = search_relevant_cases(ex_img, vs_img, study_ids, k=3)
print(f"- Imagens mais relevantes pertencem aos estudos: {imagens}")
print(f"- √çndices das imagens mais relevantes: {idx_img}")

- Imagens mais relevantes pertencem aos estudos: ['s59408044', 's54168089', 's57378102']
- √çndices das imagens mais relevantes: [163390 101213  33199]


## Teste para texto

In [24]:
texts_emb = load_embeddings("artifacts/embeddings/e_text.npy")

‚úÖ Embeddings carregados com sucesso!
üìä Formato dos dados: <class 'numpy.ndarray'>
üìä Shape: (227835, 1152)


In [25]:
ex_txt = texts_emb[0]  # Exemplo de texto para teste
ex_txt.shape

(1152,)

In [26]:
laudos, idx_txt = search_relevant_cases(ex_txt, vs_text, study_ids, k=3)
print(f"- Laudos mais relevantes pertencem aos estudos: {laudos}")
print(f"- √çndices dos laudos mais relevantes: {idx_txt}")

- Laudos mais relevantes pertencem aos estudos: ['s50414267', 's58823460', 's53634677']
- √çndices dos laudos mais relevantes: [     0 221444  81535]


# Fun√ß√£o: retorno de estudo a partir de estudo de entrada

In [61]:
def _extract_findings(self, report_text):
        """
        Extrai o texto entre 'FINDINGS:' e a pr√≥xima se√ß√£o.
        Caso n√£o encontre, retorna o texto completo truncado.
        """
        findings = ""
        try:
            match = re.search(r"FINDINGS:(.*?)(?:IMPRESSION:|CONCLUSION:|$)", report_text, flags=re.S | re.I)
            if match:
                findings = match.group(1).strip()
            else:
                # Normaliza quebras de linha para \n
                text = report_text.replace("\r\n", "\n").replace("\r", "\n").strip()

                # Divide em blocos por linhas em branco (um ou mais \n com espa√ßos possivelmente)
                blocks = re.split(r"\n\s*\n+", text)

                best_block = ""
                best_score = -1

                for block in blocks:
                    b = block.strip()

                    # Remove um poss√≠vel cabe√ßalho "T√çTULO:" no in√≠cio do bloco
                    # (t√≠tulos normalmente em mai√∫sculas, n√∫meros e s√≠mbolos comuns)
                    b_clean = re.sub(r"^\s*[A-Z0-9 ,./()\-]+:\s*", "", b)

                    # Se ficar vazio, volta ao bloco original
                    if not b_clean:
                        b_clean = b

                    # Calcula um "score" para decidir o maior bloco:
                    # - comprimento do texto sem colapsar as quebras (para preservar formato)
                    score = len(b_clean)

                    if score > best_score:
                        best_score = score
                        best_block = b_clean

                findings = best_block.strip()
        except:
            findings = report_text.strip()
        return findings
    

In [None]:
def extract_embeddings_from_text(text,model,tokenizer):
    """
    Extrai embeddings de um texto usando um modelo de linguagem pr√©-treinado.
    Args:
        - text: texto a ser tranformado em embedding (assume que o pr√©-processamento j√° foi feito).
        - tokenizer: tokenizador do modelo.
        - model: modelo de linguagem pr√©-treinado.
    Returns:
        - embedding: embedding do texto.
    """

    # Tokenizar o texto
    tokens = tokenizer(text, truncation=True, padding="max_length", return_tensors="pt")

    # Passar os tokens pelo modelo
    with torch.no_grad():
        emb = model.get_text_features(**tokens)
        emb = emb / emb.norm(dim=-1, keepdim=True)
    
    return emb

In [47]:
with open("configs/configs.yaml", "r") as f:
    config = yaml.safe_load(f)

model = AutoModel.from_pretrained(config['processor']['model'], token=config['processor']['auth_token']).to('cpu')
tokenizer = AutoTokenizer.from_pretrained(config['processor']['model'], token=config['processor']['auth_token'])
processor = AutoProcessor.from_pretrained(config['processor']['model'], token=config['processor']['auth_token'])

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


In [45]:
extract_embeddings_from_text("Teste", model, tokenizer)

tensor([[ 0.0225,  0.0067, -0.0090,  ..., -0.0192, -0.0099, -0.0047]])

In [53]:
def extract_embeddings_from_img(imgs,model,processor):
    """
    Extrai embeddings de uma imagem usando um modelo de linguagem pr√©-treinado.
    Args:
        - imgs: lista de imagens .jpg a ser tranformada em embedding
        - model: modelo de linguagem pr√©-treinado
        - processor: processador do modelo
    Returns:
        - embeddings: lista embeddings das imagens
    """

    emb_imgs = []
    for img in imgs:
        # Carregar a imagem
        img = Image.open(img).convert("RGB")

        # processar a imagem
        inputs = processor(images=img, return_tensors="pt")

        # Passar os tokens pelo modelo
        with torch.no_grad():
            emb = model.get_image_features(**inputs)
            emb = emb / emb.norm(dim=-1, keepdim=True)
        
        emb_imgs.append(emb)

    return emb_imgs

In [58]:
imagem = ["../dados/mimic/files/p10/p10000032/s50414267/02aa804e-bde0afdd-112c0b34-7bc16630-4e384014.jpg"]
extract_embeddings_from_img(imagem,model,processor)

[tensor([[-0.0110,  0.0186,  0.0156,  ..., -0.0484, -0.0240, -0.0201]])]

In [129]:
def extract_embedding_single_study(texto, imagens, model, tokenizer, processor, alpha=0.5):
    """
    Extrai embeddings de texto e imagens a partir de um de estudo.
    Args:
        - texto: texto do estudo (laudo - partindo do pre suposto que j√° passou por extract_findings)
        - imagens: lista de imagens do estudo
        - alpha: peso de ponderacao dos embeddings de texto e imagem  
    """

    # === 1) tokens e embeddings do texto ===
    emb_text = extract_embeddings_from_text(texto, tokenizer=tokenizer, model=model)#.detach().numpy()  # embedding do texto

    # === 2) tokens e embeddings das imagens ===
    emb_images = extract_embeddings_from_img(imagens, model=model, processor=processor)

    # === 3) faz pooling com imagens de entrada
    # Stacking para [N, D]
    emb_images = torch.stack(emb_images)  # [num_imagens, embedding_dim]
    
    # Pooling (m√©dia) ao longo das imagens
    emb_pool = emb_images.mean(dim=0)  # [embedding_dim]
    emb_pool = emb_pool / emb_pool.norm(dim=-1, keepdim=True)
    #print(emb_pool.shape)

    # === 4) fez media dos embeddings para embedding final
    e_study = alpha * emb_text + (1 - alpha) * emb_pool
    e_study = e_study / e_study.norm(dim=-1, keepdim=True)

    return e_study

In [None]:
def load_study_from_path(study_id, root_dir = "..dados/mimic/files"):
    """
    Fun√ß√£o que carrega laudo e imagens a partir de um caminho de pasta de estudo.
    Args:
        - study_id: pasta do estudo
    Returns:
        - dicionario de laudo e imagens
    """

    # Procurar o arquivo em todos os subdiret√≥rios
    text_path = ''
    pastas_p = [nome for nome in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, nome))]
    for dirpath in pastas_p:
        pastas_pacientes = [nome for nome in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, nome))]

        if f"{study_id}.txt" in filenames:
            text_path = os.path.join(dirpath, f'{study_id}.txt')
            print(f"Arquivo encontrado: {text_path}")
            break
    else:
        print("Arquivo n√£o encontrado.")

    # Procurar pasta do e listar arquivos (imagens)
    target_folder = study_id
    imagens = []

    for dirpath, dirnames, filenames in os.walk(root_dir):
        if target_folder in dirnames:
            folder_path = os.path.join(dirpath, target_folder)
            arquivos = os.listdir(folder_path)
            print(f"Pasta encontrada: {folder_path}")
            print("Arquivos:")
            for arquivo in arquivos:
                img = os.path.join(folder_path, arquivo)
                print(img)
                imagens.append(img)
            break
    else:
        print("Pasta n√£o encontrada.")

    estudo = {
        'imgens': imagens,
        'laudo': text_path
    }

    return estudo
    

In [97]:
caminho = '../dados/mimic/files/p10/p10000032/'
estudo = 's50414267'

# Carregar texto do laudo
with open(caminho + estudo + '.txt', 'r') as f:
    texto = f.read()

# Passar pelo m√©todo de extra√ß√£o de findings
findings = _extract_findings(None, texto)  # Se for m√©todo de classe, ajuste para self ou use como fun√ß√£o

exemplo = {
    'laudo': findings,
    'imagens': [caminho+estudo+'/'+f for f in os.listdir(caminho + estudo) if f.lower().endswith(".jpg")]
}
exemplo

{'laudo': 'There is no focal consolidation, pleural effusion or pneumothorax.  Bilateral\n nodular opacities that most likely represent nipple shadows. The\n cardiomediastinal silhouette is normal.  Clips project over the left lung,\n potentially within the breast. The imaged upper abdomen is unremarkable.\n Chronic deformity of the posterior left sixth and seventh ribs are noted.',
 'imagens': ['../dados/mimic/files/p10/p10000032/s50414267/174413ec-4ec4c1f7-34ea26b7-c5f994f8-79ef1962.jpg',
  '../dados/mimic/files/p10/p10000032/s50414267/02aa804e-bde0afdd-112c0b34-7bc16630-4e384014.jpg']}

In [109]:
extract_embeddings_from_img(exemplo['imagens'], model, processor)

[tensor([[-0.0489,  0.0295, -0.0231,  ..., -0.0157, -0.0255, -0.0183]]),
 tensor([[-0.0110,  0.0186,  0.0156,  ..., -0.0484, -0.0240, -0.0201]])]

In [111]:
extract_embeddings_from_text(exemplo['laudo'],model=model,tokenizer=tokenizer)

tensor([[-0.0141,  0.0146, -0.0266,  ..., -0.0346, -0.0764, -0.0155]])

In [130]:
extract_embedding_single_study(
    texto=exemplo['laudo'],
    imagens=exemplo['imagens'],
    model=model,
    tokenizer=tokenizer,
    processor=processor,
    alpha=0.5
).detach().cpu().numpy().shape

(1, 1152)

In [131]:
emb_estudo_exemplo = extract_embedding_single_study(
    texto=exemplo['laudo'],
    imagens=exemplo['imagens'],
    model=model,
    tokenizer=tokenizer,
    processor=processor,
    alpha=0.5
).detach().cpu().numpy()

estudos, idx = search_relevant_cases(emb_estudo_exemplo, vs_study, study_ids, k=3)
print(f"- Estudos mais relevantes: {estudos}")
print(f"- √çndices dos estudos mais relevantes: {idx}")  

- Estudos mais relevantes: ['s50414267', 's58823460', 's58997875']
- √çndices dos estudos mais relevantes: [     0 221444   5166]
