# Etapa 3: Emparelhamento e Geometria Epipolar

**Objetivo:** Utilizar as características da Etapa 2 para estabelecer as relações geométricas entre todos os pares de imagens, preparando os dados para a reconstrução 3D (Structure from Motion).

**Pipeline Refinado:**
1.  **Carregar Features:** Ler os arquivos `.npz` da Etapa 2.
2.  **Emparelhamento Total:** Realizar o matching de características entre **todos os pares possíveis de imagens**.
3.  **Filtragem Geométrica Robusta:**
    - Aplicar o **Ratio Test**.
    - Estimar a **Matriz Fundamental (F)** com **RANSAC**.
    - Estimar a **Matriz Intrínseca (K)** a partir dos dados EXIF (ou com heurística melhorada).
    - Calcular a **Matriz Essencial (E)** e recuperar a **Pose (R, t)**.
    - **Validar a Pose** para rejeitar movimentos de câmera degenerados.
    - **Triangular os pontos 3D** e aplicar a **verificação de cheirality** para garantir que os pontos estão na frente de ambas as câmeras.
4.  **Salvar Resultados:** Armazenar todas as informações validadas (`inliers`, F, E, R, t, K, pontos 3D) em arquivos `.npz`.

In [1]:
import os
import glob
import json
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import List, Tuple
from itertools import combinations
from PIL import Image
from PIL.ExifTags import TAGS

try:
    import pillow_heif
    pillow_heif.register_heif_opener()
except ImportError:
    pass

## Funções Auxiliares (Carregamento, Salvamento e Visualização)

In [2]:
def desserializar_keypoints(arr: np.ndarray) -> List[cv2.KeyPoint]:
    """Reconstrói lista de cv2.KeyPoint a partir do array (N,7)."""
    return [cv2.KeyPoint(x=r[0], y=r[1], size=r[2], angle=r[3], response=r[4], octave=int(r[5]), class_id=int(r[6])) for r in arr]

def carregar_features_npz(caminho_npz: str) -> Tuple[List[cv2.KeyPoint], np.ndarray, str]:
    """Lê um .npz salvo pela Etapa 2."""
    with np.load(caminho_npz, allow_pickle=True) as data:
        kps = desserializar_keypoints(data["keypoints"])
        desc = data["descriptors"]
        img_path = str(data["imagem_absoluta"])
    return kps, desc, img_path

def salvar_verificacao_geometrica_npz(caminho_saida: str, i: int, j: int, inlier_matches: List[cv2.DMatch], F: np.ndarray, E: np.ndarray, R: np.ndarray, t: np.ndarray, K: np.ndarray, pontos_3d: np.ndarray):
    """Salva todos os artefatos da verificação geométrica para um par de imagens."""
    matches_data = np.array([[m.queryIdx, m.trainIdx, m.distance] for m in inlier_matches], dtype=np.float32)
    num_inliers = len(inlier_matches)
    np.savez_compressed(
        caminho_saida,
        idx_i=i, idx_j=j,
        matches=matches_data,
        num_inliers=num_inliers,
        F=F if F is not None else np.eye(3),
        E=E if E is not None else np.eye(3),
        R=R if R is not None else np.eye(3),
        t=t if t is not None else np.zeros((3,1)),
        K=K,
        pontos_3d=pontos_3d
    )

def desenhar_matches_custom(img1, kps1, img2, kps2, matches, cor_linha=(255, 0, 255), espessura_linha=2):
    """Desenha matches entre duas imagens com linhas customizáveis."""
    h1, w1 = img1.shape[:2]
    h2, w2 = img2.shape[:2]
    output_img = np.zeros((max(h1, h2), w1 + w2, 3), dtype=np.uint8)
    output_img[:h1, :w1] = img1
    output_img[:h2, w1:] = img2
    if kps1 is None or kps2 is None: return output_img
    for m in matches:
        pt1 = (int(kps1[m.queryIdx].pt[0]), int(kps1[m.queryIdx].pt[1]))
        pt2 = (int(kps2[m.trainIdx].pt[0] + w1), int(kps2[m.trainIdx].pt[1]))
        cv2.line(output_img, pt1, pt2, cor_linha, espessura_linha)
    return output_img


def _pil_to_bgr(pil_img: "Image.Image") -> np.ndarray:
    """Converte PIL.Image para ndarray BGR uint8, aplicando orientação EXIF."""
    from PIL import ImageOps
    pil_img = ImageOps.exif_transpose(pil_img)
    if pil_img.mode not in ("RGB", "RGBA"):
        pil_img = pil_img.convert("RGB")
    arr = np.array(pil_img)  # RGB(A)
    if arr.ndim == 3 and arr.shape[2] == 4:
        arr = arr[:, :, :3]  # descarta alpha
    return cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)


def carregar_imagem(path: str) -> np.ndarray:
    """
    Lê imagem em BGR (uint8). Suporta formatos do OpenCV e HEIC/HEIF via Pillow.
    """
    ext = os.path.splitext(path)[1].lower()
    if ext in (".heic", ".heif"):
        try:
            from PIL import Image
            pil_img = Image.open(path)
            return _pil_to_bgr(pil_img)
        except Exception as e:
            print(f"[aviso] Falha ao carregar HEIC com Pillow: {e}")
            return None
    else:
        img = cv2.imread(path, cv2.IMREAD_COLOR)
        if img is None:
            print(f"[aviso] não consegui abrir via OpenCV: {path}")
        return img

## Funções de Emparelhamento e Geometria 

In [3]:
def estimar_K_da_imagem(caminho_imagem: str) -> np.ndarray:
    """Tenta extrair K do EXIF ou usa heurística melhorada."""
    try:
        img = Image.open(caminho_imagem)
        exif = img._getexif()
        if exif:
            focal_length_mm = None
            for tag, value in exif.items():
                if TAGS.get(tag) == 'FocalLength':
                    focal_length_mm = float(value)
                    break
            if focal_length_mm is not None:
                # Simplificação comum: assume sensor de 35mm (36mm de largura)
                sensor_width_mm = 36.0
                w, h = img.size
                fx = fy = (focal_length_mm / sensor_width_mm) * w
                cx, cy = w/2, h/2
                return np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]], dtype=np.float32)
    except Exception:
        pass # Silenciosamente falha e usa o fallback
    
    # Fallback para heurística melhorada se EXIF falhar ou não existir
    img_cv = cv2.imread(caminho_imagem)
    if img_cv is None: # Se o OpenCV também falhar, usa um K genérico
      h, w = 1080, 1920
    else:
      h, w = img_cv.shape[:2]
    fx = fy = max(w, h) * 1.2
    return np.array([[fx, 0, w/2], [0, fy, h/2], [0, 0, 1]], dtype=np.float32)


def criar_matcher(nome_matcher: str, desc_dtype):
    """Cria um objeto matcher (BF ou FLANN) configurado corretamente."""
    if nome_matcher.lower() == 'bf':
        norm_type = cv2.NORM_L2 if desc_dtype == np.float32 else cv2.NORM_HAMMING
        return cv2.BFMatcher(norm_type)
    elif nome_matcher.lower() == 'flann':
        if desc_dtype == np.float32:
            index_params = dict(algorithm=1, trees=5)
        else:
            index_params = dict(algorithm=6, table_number=6,
                                key_size=12, multi_probe_level=1)
        search_params = dict(checks=50)
        return cv2.FlannBasedMatcher(index_params, search_params)
    else:
        raise ValueError(f"Matcher '{nome_matcher}' não reconhecido.")


def filtrar_matches_ratio_test(matches_brutos, ratio=0.75):
    """Aplica o Ratio Test de David Lowe."""
    matches_filtrados = []
    for match_pair in matches_brutos:
        if len(match_pair) == 2:
            m, n = match_pair
            if m.distance < ratio * n.distance:
                matches_filtrados.append(m)
    return matches_filtrados

def validar_qualidade_pose(R, t, min_angle=1.0, min_translation=1e-3):
    """Valida se a pose recuperada é geometricamente plausível."""
    if R is None or t is None: return False
    # Verificar se a rotação não é muito pequena
    angle = np.arccos(np.clip((np.trace(R) - 1) / 2, -1, 1)) * 180 / np.pi
    if angle < min_angle: return False
    # Verificar se a translação não é nula
    if np.linalg.norm(t) < min_translation: return False
    return True

def verificar_geometria_par_robusta(kps1, kps2, matches, K, ransac_thresh=0.99, conf=0.999):
    """Versão mais robusta com validações adicionais."""
    null_return = None, None, None, None, [], np.zeros((0,3))
    if len(matches) < 8: return null_return

    pts1 = np.float32([kps1[m.queryIdx].pt for m in matches])
    pts2 = np.float32([kps2[m.trainIdx].pt for m in matches])

    F, mask = cv2.findFundamentalMat(pts1, pts2, cv2.FM_RANSAC, ransac_thresh, conf)
    if F is None or mask is None: return null_return

    inlier_mask = mask.ravel().astype(bool)
    inlier_matches = [m for i, m in enumerate(matches) if inlier_mask[i]]
    if len(inlier_matches) < 8: return null_return

    pts1_inliers = pts1[inlier_mask]
    pts2_inliers = pts2[inlier_mask]
    
    E = K.T @ F @ K
    _, R, t, _ = cv2.recoverPose(E, pts1_inliers, pts2_inliers, K)

    if not validar_qualidade_pose(R, t): return F, E, None, None, inlier_matches, np.zeros((0,3))

    proj_mat1 = K @ np.hstack((np.eye(3), np.zeros((3, 1))))
    proj_mat2 = K @ np.hstack((R, t))
    pontos_4d_hom = cv2.triangulatePoints(proj_mat1, proj_mat2, pts1_inliers.T, pts2_inliers.T)
    pontos_3d_hom = pontos_4d_hom / pontos_4d_hom[3]

    # Verificação de Cheirality
    pontos_3d_cam1 = pontos_3d_hom[:3].T
    pontos_3d_cam2 = (R @ pontos_3d_cam1.T + t).T
    cheirality_mask = (pontos_3d_cam1[:, 2] > 0) & (pontos_3d_cam2[:, 2] > 0)

    # Filtra os resultados com base na cheirality
    inlier_matches = [m for i, m in enumerate(inlier_matches) if cheirality_mask[i]]
    pontos_3d_validos = pontos_3d_cam1[cheirality_mask]

    if len(inlier_matches) < 8: return F, E, R, t, [], np.zeros((0,3))

    return F, E, R, t, inlier_matches, pontos_3d_validos

## Nota sobre Eficiência e Escala

A abordagem de emparelhar "todos com todos" (`all-pairs`) garante que nenhuma conexão visual seja perdida, o que é ideal para a máxima robustez. Contudo, sua complexidade é quadrática (O(n²)), o que pode tornar o processamento muito lento para conjuntos com centenas de imagens. 

Para este projeto (40-120 imagens), esta abordagem é viável. Em um cenário de produção, seriam necessárias otimizações como:
- **Pré-filtragem de Pares:** Usar técnicas mais baratas (como similaridade de histograma ou *bag of visual words*) para descartar pares de imagens que claramente não têm sobreposição, antes de executar o custoso matching de features.
- **Matching Sequencial com Janela:** Para vídeos ou capturas em sequência, limitar o matching a uma janela de imagens próximas (ex: cada imagem com as 5 seguintes).
- **Paralelização:** Distribuir o processamento dos pares entre múltiplos núcleos de CPU.

In [16]:
def analisar_geometria_conjunto(caminho_base_etapa2: str, matchers_a_usar: List[str]):
    """Pipeline da Etapa 3: Carrega features, realiza matching, verifica geometria e salva os resultados."""
    nome_conjunto = os.path.basename(caminho_base_etapa2)
    print(f"\n{'='*65}\nAnalisando Geometria para o conjunto: '{nome_conjunto}'\n{'='*65}")
    
    detectores = [d for d in os.listdir(caminho_base_etapa2) if os.path.isdir(os.path.join(caminho_base_etapa2, d)) and d.lower() != 'akaze']
    for nome_detector in detectores:
        print(f"\n--- Detector: {nome_detector.upper()} ---")
        caminho_detector_etapa2 = os.path.join(caminho_base_etapa2, nome_detector)
        arquivos_npz = sorted(glob.glob(os.path.join(caminho_detector_etapa2, "*_features.npz")))
        if len(arquivos_npz) < 2: continue

        pares_indices = list(combinations(range(len(arquivos_npz)), 2))

        # Estima K para cada imagem uma vez para economizar tempo
        matrizes_K = [estimar_K_da_imagem(carregar_features_npz(npz)[2]) for npz in arquivos_npz]

        for nome_matcher in matchers_a_usar:
            print(f"  - Matcher: {nome_matcher.upper()}")
            dir_saida = os.path.join("resultados_etapa3", nome_conjunto, nome_detector, nome_matcher)
            os.makedirs(dir_saida, exist_ok=True)

            # Contadores para estatísticas
            total_inliers = 0
            pares_validos = 0
            total_pares = len(pares_indices)

            for i, j in pares_indices:
                kps1, desc1, path1 = carregar_features_npz(arquivos_npz[i])
                kps2, desc2, path2 = carregar_features_npz(arquivos_npz[j])
                
                if not kps1 or not kps2 or desc1 is None or desc2 is None or desc1.shape[0] == 0 or desc2.shape[0] == 0: continue
                
                matcher = criar_matcher(nome_matcher, desc1.dtype)
                matches_brutos = matcher.knnMatch(desc1, desc2, k=2)
                matches_filtrados = filtrar_matches_ratio_test(matches_brutos)

                K1 = matrizes_K[i] # Usa a matriz K da primeira imagem do par
                F, E, R, t, inliers, pts3d = verificar_geometria_par_robusta(kps1, kps2, matches_filtrados, K1)
                if len(inliers) != 0:
                    # Atualizar contadores (sem print individual)
                    pares_validos += 1
                    total_inliers += len(inliers)

                salvar_verificacao_geometrica_npz(os.path.join(dir_saida, f"geometria_{i:03d}_{j:03d}.npz"), i, j, inliers, F, E, R, t, K1, pts3d)
                
                if len(inliers) > 20:
                    # Corrigir os caminhos das imagens incluindo o nome do dataset
                    img1_path = os.path.join(
                        "resultados_etapa2", nome_conjunto, nome_detector, f"img_{i:03d}_keypoints.jpg")
                    img2_path = os.path.join(
                        "resultados_etapa2", nome_conjunto, nome_detector, f"img_{j:03d}_keypoints.jpg")

                    # Verificar se os arquivos existem antes de tentar carregar
                    if os.path.exists(img1_path) and os.path.exists(img2_path):
                        img1 = cv2.imread(img1_path)
                        img2 = cv2.imread(img2_path)

                        if img1 is not None and img2 is not None:
                            preview_matches = desenhar_matches_custom(
                                img1, kps1, img2, kps2, inliers)
                            cv2.imwrite(os.path.join(
                                dir_saida, f"preview_inliers_{i:03d}_{j:03d}.jpg"), preview_matches)
            
            # Resumo estatístico por matcher
            media_inliers = total_inliers / pares_validos if pares_validos > 0 else 0
            print(f"    Resumo {nome_matcher.upper()}: {pares_validos}/{total_pares} pares válidos, "
                  f"{total_inliers} inliers totais, média {media_inliers:.1f} inliers/par")
        
        print(f"--- Detector {nome_detector.upper()} concluído ---")

## Execução do Pipeline

In [17]:
RAIZ_ETAPA2 = "resultados_etapa2"
CONJUNTOS = ['Imagens4']
MATCHERS = ['bf'] # 'flann'

for conjunto in CONJUNTOS:
    caminho_do_conjunto = f"{RAIZ_ETAPA2}/{conjunto}"
    analisar_geometria_conjunto(caminho_do_conjunto, MATCHERS)


Analisando Geometria para o conjunto: 'Imagens4'

--- Detector: ORB ---
  - Matcher: BF
  - Matcher: BF
    Resumo BF: 1459/4186 pares válidos, 32122 inliers totais, média 22.0 inliers/par
--- Detector ORB concluído ---
    Resumo BF: 1459/4186 pares válidos, 32122 inliers totais, média 22.0 inliers/par
--- Detector ORB concluído ---


## Geração do Relatório Comparativo Final

In [6]:
def gerar_relatorio_final_etapa3(raiz_resultados: str):
    """Gera uma tabela CSV comparando a média de inliers geométricos por combinação."""
    print("\nGerando relatório comparativo final da Etapa 3...")
    dados_tabela = []

    for nome_conjunto in sorted(os.listdir(raiz_resultados)):
        caminho_conjunto = os.path.join(raiz_resultados, nome_conjunto)
        if not os.path.isdir(caminho_conjunto):
            continue
        for nome_detector in sorted(os.listdir(caminho_conjunto)):
            caminho_detector = os.path.join(caminho_conjunto, nome_detector)
            if not os.path.isdir(caminho_detector):
                continue
            for nome_matcher in sorted(os.listdir(caminho_detector)):
                caminho_matcher = os.path.join(caminho_detector, nome_matcher)
                if not os.path.isdir(caminho_matcher):
                    continue

                arquivos_geometria_npz = glob.glob(
                    os.path.join(caminho_matcher, "geometria_*.npz"))
                if not arquivos_geometria_npz:
                    media_inliers = 0.0
                else:
                    contagens = []
                    for npz in arquivos_geometria_npz:
                        with np.load(npz) as data:
                            contagens.append(data['matches'].shape[0])
                    media_inliers = np.mean(contagens)

                dados_tabela.append({
                    "conjunto": nome_conjunto,
                    "detector": nome_detector,
                    "matcher": nome_matcher,
                    "media_inliers_geometricos_por_par": media_inliers
                })

    if not dados_tabela:
        print("Nenhum dado encontrado para gerar o relatório.")
        return

    df_final = pd.DataFrame(dados_tabela)
    df_final['media_inliers_geometricos_por_par'] = df_final['media_inliers_geometricos_por_par'].map(
        '{:,.2f}'.format)

    print(f"\n{'='*80}\nRELATÓRIO COMPARATIVO FINAL - ETAPA 3\n{'='*80}")
    print(df_final.to_string(index=False))
    print(f"{'='*80}")

    caminho_saida_final = os.path.join(
        raiz_resultados, "relatorio_comparativo_geometria.csv")
    df_final.to_csv(caminho_saida_final, index=False, sep=';')
    print(f"\nTabela resumo salva em: '{caminho_saida_final}'")


# Execução
gerar_relatorio_final_etapa3("resultados_etapa3")


Gerando relatório comparativo final da Etapa 3...


FileNotFoundError: [Errno 2] No such file or directory: 'resultados_etapa3'