# Sistema de Recomendação por Imagens

Este notebook documenta e demonstra o projeto de recomendação por similaridade visual. O sistema utiliza uma CNN pré-treinada (ResNet) para extrair embeddings das imagens e medir similaridade via cosseno.

Pipeline: coleta de imagens → extração de embeddings ResNet → normalização L2 → similaridade por cosseno → top-K resultados.

Principais componentes:
- `ImageEmbedder`: extrai embeddings da imagem.
- `index_images`: percorre um diretório, gera embeddings e salva o índice (`index.npz`).
- `recommend`: consulta o índice e retorna os itens mais similares.
- `write_jsonl`: persiste a saída em JSONL.

Consulte o README para instruções completas de CLI e testes.


## Instalação

Instale dependências (se necessário):

```
pip install torch torchvision pillow numpy python-dotenv
```

Opcional (Jupytext):

```
jupytext --version
```


### Importação de Bibliotecas


In [None]:
# Importações da biblioteca padrão
import os
import sys
import argparse
import json
import time
import tempfile
import shutil
from typing import List, Tuple

# Importações de terceiros para manipulação de arrays e imagens
import numpy as np
from PIL import Image
import subprocess

In [None]:
try:
    # Tenta carregar variáveis de ambiente de um arquivo .env, se existir
    from dotenv import load_dotenv
    load_dotenv()
except Exception:
    # Ignora se a biblioteca dotenv não estiver instalada ou falhar
    pass

### Importações de Deep Learning


In [None]:
# Importações do PyTorch e Torchvision para modelos de visão computacional
import torch
import torchvision.transforms as T
from torchvision import models

### Utilitário de Cronometragem


In [None]:
class Timer:
    """Classe utilitária para medir tempo de execução."""
    def __init__(self):
        # Marca o tempo inicial na instanciação
        self.t = time.time()
    def tick(self) -> float:
        # Calcula a diferença de tempo desde a última chamada ou inicialização
        now = time.time()
        d = now - self.t
        self.t = now
        return d

### Configurações Globais


In [None]:
# Define as extensões de imagem suportadas pelo sistema
SUPPORTED_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}

### Configurações Globais


In [None]:
# Define as extensões de imagem suportadas pelo sistema
SUPPORTED_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}

### Pré-processamento de Imagens


In [None]:
def _default_transforms() -> T.Compose:
    """
    Define as transformações padrão para alimentar a ResNet.
    """
    return T.Compose([
        T.Resize(256),       # Redimensiona a imagem
        T.CenterCrop(224),   # Corta o centro 224x224
        T.ToTensor(),        # Converte para Tensor do PyTorch
        # Normaliza com as médias e desvios padrão do ImageNet
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

### Motor de Extração de Embeddings


In [None]:
class ImageEmbedder:
    """
    Classe responsável por carregar o modelo e extrair vetores de características (embeddings).
    """
    def __init__(self, device: str = "cpu", model_name: str = "resnet50"):
        # Define o dispositivo (CPU ou GPU)
        self.device = torch.device(device if torch.cuda.is_available() or device == "cpu" else "cpu")
        self.model_name = model_name
        # Carrega o modelo e as transformações
        self.model, self.transform = self._load_model_and_transform()
        self.model.eval() # Modo de avaliação (não treina)
        self.model.to(self.device) # Move para o dispositivo correto

    def _load_model_and_transform(self):
        """Carrega a arquitetura ResNet pré-treinada."""
        weights = None
        transform = None
        model = None
        try:
            # Tenta carregar com pesos atualizados da ImageNet
            if self.model_name == "resnet50":
                weights = models.ResNet50_Weights.IMAGENET1K_V2
                model = models.resnet50(weights=weights)
                transform = weights.transforms()
            elif self.model_name == "resnet18":
                weights = models.ResNet18_Weights.IMAGENET1K_V1
                model = models.resnet18(weights=weights)
                transform = weights.transforms()
            else:
                # Fallback para ResNet50
                weights = models.ResNet50_Weights.IMAGENET1K_V2
                model = models.resnet50(weights=weights)
                transform = weights.transforms()
        except Exception:
            # Fallback para pesos None e transforms manuais se falhar (ex: sem internet ou versão antiga)
            if self.model_name == "resnet18":
                model = models.resnet18(weights=None)
            else:
                model = models.resnet50(weights=None)
            transform = _default_transforms()
        
        # Remove a última camada (classificador) para obter apenas as features
        backbone = torch.nn.Sequential(*list(model.children())[:-1])
        return backbone, transform

    def embed(self, image_path: str) -> np.ndarray:
        """Gera o vetor de embedding para uma única imagem."""
        if not os.path.isfile(image_path):
            raise FileNotFoundError(f"Arquivo não encontrado: {image_path}")
        try:
            img = Image.open(image_path).convert("RGB")
        except Exception as e:
            raise RuntimeError(f"Falha ao abrir imagem: {image_path}: {str(e)}")
            
        # Executa a inferência sem calcular gradientes
        with torch.inference_mode():
            x = self.transform(img).unsqueeze(0).to(self.device) # Adiciona dimensão de batch
            feat = self.model(x)
            feat = feat.view(feat.size(0), -1) # Flatten
            v = feat[0].detach().cpu().numpy().astype(np.float32) # Converte para numpy
        return v

### Persistência de Dados


In [None]:
def save_index(embeddings: np.ndarray, paths: List[str], out_path: str):
    """Salva os embeddings e os caminhos dos arquivos em um arquivo comprimido .npz."""
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    # Salva arrays numpy comprimidos
    np.savez_compressed(out_path, embeddings=embeddings, paths=np.array(paths))

### Carregamento de Dados


In [None]:
def load_index(index_path: str) -> Tuple[np.ndarray, List[str]]:
    """Carrega o índice de embeddings do disco."""
    if not os.path.isfile(index_path):
        raise FileNotFoundError(f"Índice não encontrado: {index_path}")
    z = np.load(index_path, allow_pickle=True)
    emb = z["embeddings"].astype(np.float32)
    paths = list(z["paths"].tolist())
    # Validação básica da integridade dos dados
    if emb.ndim != 2 or len(paths) != emb.shape[0]:
        raise RuntimeError("Índice inválido")
    return emb, paths

### Normalização Vetorial


In [None]:
def normalize_rows(mat: np.ndarray) -> np.ndarray:
    """Normaliza as linhas de uma matriz (L2 norm) para cálculo de similaridade de cosseno."""
    # Calcula a norma L2 de cada linha
    n = np.linalg.norm(mat, axis=1, keepdims=True)
    # Evita divisão por zero
    n[n == 0] = 1.0
    return mat / n

### Função Principal de Indexação


In [None]:
def index_images(image_dir: str, out_path: str, device: str = "cpu", model_name: str = "resnet50", batch_size: int = 1) -> Tuple[np.ndarray, List[str]]:
    """Processo completo de indexação: lista imagens -> gera embeddings -> salva."""
    timer = Timer()
    paths = list_images(image_dir)
    embeder = ImageEmbedder(device=device, model_name=model_name)
    vecs = []
    
    # Itera sobre todas as imagens encontradas
    for p in paths:
        try:
            v = embeder.embed(p)
            vecs.append(v)
        except Exception as e:
            # Loga erro mas continua o processo
            print(json.dumps({"erro": str(e), "arquivo": p}))
    
    if not vecs:
        raise RuntimeError("Nenhuma embedding foi gerada")
    
    # Empilha vetores em uma matriz
    E = np.vstack(vecs)
    # Normaliza para permitir busca por produto escalar (cosseno)
    E = normalize_rows(E)
    
    # Salva o resultado
    save_index(E, paths[:E.shape[0]], out_path)
    
    # Imprime estatísticas
    print(json.dumps({"tempo_segundos": round(timer.tick(), 3), "imagens_indexadas": E.shape[0], "dim": int(E.shape[1])}))
    return E, paths[:E.shape[0]]

### Função Principal de Recomendação


In [None]:
def recommend(index_path: str, query_image: str, topk: int = 5, device: str = "cpu", model_name: str = "resnet50") -> List[Tuple[str, float]]:
    """Realiza a recomendação buscando imagens similares no índice."""
    # Carrega o índice existente
    emb, paths = load_index(index_path)
    emb = normalize_rows(emb)
    
    # Gera embedding da imagem de consulta
    embeder = ImageEmbedder(device=device, model_name=model_name)
    q = embeder.embed(query_image)
    q = q.astype(np.float32)
    
    # Normaliza o vetor de consulta
    qn = q / (np.linalg.norm(q) + 1e-12)
    
    # Calcula similaridade (produto escalar de vetores normalizados = similaridade de cosseno)
    scores = emb.dot(qn)
    
    # Ordena decrescentemente pelos scores
    idx = np.argsort(-scores)[:max(1, topk)]
    
    # Retorna os caminhos e scores dos top-k
    return [(paths[i], float(scores[i])) for i in idx]

### Saída de Dados


In [None]:
def write_jsonl(items: List[Tuple[str, float]], out_path: str):
    """Escreve os resultados da recomendação em formato JSON Lines."""
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        for p, s in items:
            f.write(json.dumps({"path": p, "score": round(s, 6)}) + "\n")

### Interface de Linha de Comando (CLI)


In [None]:
def make_parser() -> argparse.ArgumentParser:
    """Configura os argumentos da linha de comando."""
    parser = argparse.ArgumentParser(prog="image_recommender", description="Sistema de recomendação por imagens baseado em embeddings de CNN")
    sub = parser.add_subparsers(dest="cmd", required=False)

    # Subcomando 'index'
    p_index = sub.add_parser("index", help="Indexa diretório de imagens")
    p_index.add_argument("--images", required=False, default=os.getenv("IMAGE_DIR", "images"))
    p_index.add_argument("--out", required=False, default=os.getenv("INDEX_PATH", "index.npz"))
    p_index.add_argument("--device", required=False, default=os.getenv("DEVICE", "cpu"))
    p_index.add_argument("--model", required=False, default=os.getenv("MODEL_NAME", "resnet50"))

    # Subcomando 'recommend'
    p_rec = sub.add_parser("recommend", help="Gera recomendações para uma imagem de consulta")
    p_rec.add_argument("--index", required=False, default=os.getenv("INDEX_PATH", "index.npz"))
    p_rec.add_argument("--query", required=True)
    p_rec.add_argument("--topk", type=int, required=False, default=5)
    p_rec.add_argument("--out", required=False, default="recommendations.jsonl")
    p_rec.add_argument("--device", required=False, default=os.getenv("DEVICE", "cpu"))
    p_rec.add_argument("--model", required=False, default=os.getenv("MODEL_NAME", "resnet50"))

    # Flag para testes
    parser.add_argument("--run-tests", action="store_true", help="Executa testes unitários, integração e aceitação")
    return parser

### Geração de Dados para Testes


In [None]:
def _create_dummy_images(root: str) -> List[str]:
    """Cria imagens sintéticas coloridas para fins de teste."""
    os.makedirs(root, exist_ok=True)
    paths = []
    rng = np.random.default_rng(42)
    colors = [(220, 20, 60), (65, 105, 225), (34, 139, 34), (255, 165, 0), (128, 0, 128)]
    for i, c in enumerate(colors):
        img = Image.new("RGB", (256, 256), c)
        arr = np.array(img)
        # Adiciona ruído aleatório
        noise = rng.integers(0, 20, size=arr.shape, dtype=np.uint8)
        arr = np.clip(arr + noise, 0, 255)
        img = Image.fromarray(arr)
        p = os.path.join(root, f"dummy_{i}.jpg")
        img.save(p)
        paths.append(p)
    return paths

### Testes Unitários


In [None]:
def run_unit_tests() -> List[str]:
    """Executa testes isolados das funções principais."""
    results = []
    tmp = tempfile.mkdtemp(prefix="recommender_unit_")
    try:
        images = _create_dummy_images(tmp)
        # Teste do Embedder
        embeder = ImageEmbedder(device="cpu", model_name=os.getenv("MODEL_NAME", "resnet50"))
        v = embeder.embed(images[0])
        assert isinstance(v, np.ndarray)
        assert v.ndim == 1
        assert v.size >= 256
        results.append("unit_embeddings_shape_ok")
        
        # Teste da Indexação
        E, paths = index_images(tmp, os.path.join(tmp, "index.npz"), device="cpu", model_name=os.getenv("MODEL_NAME", "resnet50"))
        assert E.shape[0] == len(paths)
        assert E.ndim == 2
        results.append("unit_index_build_ok")
    finally:
        shutil.rmtree(tmp, ignore_errors=True)
    return results

### Testes de Integração


In [None]:
def run_integration_tests() -> List[str]:
    """Testa o fluxo combinado de indexação e recomendação."""
    results = []
    tmp = tempfile.mkdtemp(prefix="recommender_integ_")
    try:
        images = _create_dummy_images(tmp)
        idx_path = os.path.join(tmp, "index.npz")
        
        # Executa indexação
        index_images(tmp, idx_path, device="cpu", model_name=os.getenv("MODEL_NAME", "resnet50"))
        
        # Executa recomendação
        recs = recommend(idx_path, images[0], topk=3, device="cpu", model_name=os.getenv("MODEL_NAME", "resnet50"))
        
        assert len(recs) == 3
        assert isinstance(recs[0][0], str) and isinstance(recs[0][1], float)
        results.append("integration_recommendations_ok")
    finally:
        shutil.rmtree(tmp, ignore_errors=True)
    return results

### Testes de Aceitação


In [None]:
def run_acceptance_tests() -> List[str]:
    """Testa a aplicação via linha de comando (CLI) simulando um usuário final."""
    results = []
    tmp = tempfile.mkdtemp(prefix="recommender_accept_")
    try:
        images = _create_dummy_images(tmp)
        idx_path = os.path.join(tmp, "index.npz")
        
        # Chama o script via subprocess para indexar
        cmd_index = [sys.executable, os.path.abspath(__file__), "index", "--images", tmp, "--out", idx_path]
        p1 = subprocess.Popen(cmd_index, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out1, err1 = p1.communicate()
        if p1.returncode != 0:
            raise RuntimeError(f"Falha CLI index: {err1.decode('utf-8', 'ignore')}")
        
        # Chama o script via subprocess para recomendar
        cmd_rec = [sys.executable, os.path.abspath(__file__), "recommend", "--index", idx_path, "--query", images[0], "--topk", "2", "--out", os.path.join(tmp, "recs.jsonl")]
        p2 = subprocess.Popen(cmd_rec, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out2, err2 = p2.communicate()
        if p2.returncode != 0:
            raise RuntimeError(f"Falha CLI recommend: {err2.decode('utf-8', 'ignore')}")
        
        results.append("acceptance_cli_flow_ok")
    finally:
        shutil.rmtree(tmp, ignore_errors=True)
    return results

### Função Principal (Main)


In [None]:
def main():
    """Função orquestradora da aplicação."""
    parser = make_parser()
    args = parser.parse_args()
    
    # Se solicitado, roda os testes
    if args.run_tests:
        all_results = []
        try:
            all_results.extend(run_unit_tests())
            all_results.extend(run_integration_tests())
            all_results.extend(run_acceptance_tests())
        except Exception as e:
            print(json.dumps({"tests": "failed", "erro": str(e)}))
            sys.exit(1)
        print(json.dumps({"tests": "passed", "results": all_results}))
        return
    
    # Verifica se algum comando foi passado
    if not args.cmd:
        parser.print_help()
        sys.exit(0)
    
    # Executa o comando escolhido
    if args.cmd == "index":
        try:
            index_images(args.images, args.out, device=args.device, model_name=args.model)
        except Exception as e:
            print(json.dumps({"erro": str(e)}))
            sys.exit(1)
    elif args.cmd == "recommend":
        try:
            items = recommend(args.index, args.query, topk=int(args.topk), device=args.device, model_name=args.model)
            write_jsonl(items, args.out)
            print(json.dumps({"recomendacoes": len(items), "saida": args.out}))
        except Exception as e:
            print(json.dumps({"erro": str(e)}))
            sys.exit(1)
    else:
        parser.print_help()

### Bloco de Execução


In [None]:
if __name__ == "__main__":
    # Ponto de entrada para execução via script
    main()

## Uso no Notebook

Após executar as células de definição (acima), use as células abaixo para indexar e recomendar.
Extensões suportadas: `.jpg`, `.jpeg`, `.png`, `.bmp`, `.webp`.


In [None]:
# Exemplo: indexar e recomendar dentro do notebook

# Define parâmetros básicos
images_dir = os.getenv("IMAGE_DIR", "images")
index_path = os.getenv("INDEX_PATH", "index.npz")
device = os.getenv("DEVICE", "cpu")
model = os.getenv("MODEL_NAME", "resnet50")

# Etapa 1: Indexação
if os.path.isdir(images_dir):
    E, paths = index_images(images_dir, index_path, device=device, model_name=model)
    print(f"Indexado: {len(paths)} imagens, dim={E.shape[1]}")
else:
    print(f"Diretório de imagens não encontrado: {images_dir}")

# Etapa 2: Recomendação
# Para consultar, defina o caminho de uma imagem existente e descomente:
query_img = "consulta.jpg"
if os.path.isfile(query_img):
    recs = recommend(index_path, query_img, topk=5, device=device, model_name=model)
    for p, s in recs:
        print(p, round(s, 4))
else:
    print(f"Imagem de consulta não encontrada: {query_img}")