# STEP 3: Inferenza Locale (Mac/Local)

Versione locale senza dipendenze Colab.

**Pipeline:**
1. STEP 1 - Occlusione: Verifica se l'immagine è occlusa → se sì, ritorna "OCCLUSION"
2. STEP 2 - Anomaly Detection: Se visibile, verifica se è anomalo → se errore > threshold → "KO", altrimenti "OK"

**Nota:** L'inferenza è veloce anche su CPU (~0.1-0.5 sec per immagine)

In [None]:
# Setup locale (Mac/Local)
import os
from pathlib import Path

# Assicurati di essere nella directory del progetto
# Se esegui da Jupyter, imposta il path corretto
PROJECT_ROOT = Path.cwd()  # Modifica se necessario
print(f"Project root: {PROJECT_ROOT}")

import torch
import torch.nn as nn
from torchvision import transforms
from PIL import Image
import numpy as np

# Device: CPU è sufficiente per l'inferenza (veloce anche senza GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")
if device.type == 'cpu':
    print("ℹ️  Usando CPU - l'inferenza è veloce (~0.1-0.5 sec per immagine)")

## Definizione Classi Modelli

Definiamo le classi dei modelli per poterli caricare (step3 è autonomo).


In [None]:
# Definizione classe OcclusionCNN (da step1)
class OcclusionCNN(nn.Module):
    """
    CNN semplice per classificazione binaria OCCLUSION vs VISIBLE.
    
    Architettura:
    - 3 layer convoluzionali con pooling
    - 2 layer fully connected
    - Output: 2 classi (OCCLUSION, VISIBLE)
    """
    
    def __init__(self):
        super(OcclusionCNN, self).__init__()
        
        # Encoder convoluzionale
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(2, 2)  # 128x128 -> 64x64
        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(2, 2)  # 64x64 -> 32x32
        
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(2, 2)  # 32x32 -> 16x16
        
        # Fully connected
        self.fc1 = nn.Linear(128 * 16 * 16, 512)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 2)  # 2 classi: OCCLUSION, VISIBLE
        
        self.relu = nn.ReLU()
    
    def forward(self, x):
        # Convoluzioni
        x = self.pool1(self.relu(self.bn1(self.conv1(x))))
        x = self.pool2(self.relu(self.bn2(self.conv2(x))))
        x = self.pool3(self.relu(self.bn3(self.conv3(x))))
        
        # Flatten
        x = x.view(x.size(0), -1)
        
        # Fully connected
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        
        return x


# Definizione classe ConvAE (da step2)
class ConvAE(nn.Module):
    """
    Autoencoder convoluzionale per anomaly detection.
    
    Encoder: 3 conv2d con stride=2 (3→16→32→64 canali)
    Decoder: 3 convtranspose2d simmetriche (64→32→16→3 canali)
    """
    
    def __init__(self):
        super(ConvAE, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            # 128x128x3 -> 64x64x16
            nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            
            # 64x64x16 -> 32x32x32
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            
            # 32x32x32 -> 16x16x64
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            # 16x16x64 -> 32x32x32
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            
            # 32x32x32 -> 64x64x16
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            
            # 64x64x16 -> 128x128x3
            nn.ConvTranspose2d(16, 3, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Sigmoid()  # Output in [0, 1]
        )
    
    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded


# Funzioni helper per caricare i modelli
def load_occlusion_model(device=None):
    """Carica il modello classificatore OCCLUSION."""
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    model = OcclusionCNN().to(device)
    model_path = Path("models/occlusion_cnn.pth")
    
    if not model_path.exists():
        raise FileNotFoundError(f"Modello non trovato: {model_path}")
    
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    return model


def load_ae_and_threshold(device=None):
    """Carica l'autoencoder e il threshold."""
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    model = ConvAE().to(device)
    model_path = Path("models/ae_conv.pth")
    
    if not model_path.exists():
        raise FileNotFoundError(f"Modello non trovato: {model_path}")
    
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    
    threshold_path = Path("models/ae_threshold.npy")
    if not threshold_path.exists():
        raise FileNotFoundError(f"Threshold non trovato: {threshold_path}")
    
    threshold = np.load(threshold_path)
    return model, threshold


## Funzioni di Preprocessing


In [None]:
def preprocess_image(image_path, device=None):
    """
    Preprocessa un'immagine per l'inferenza (classificatore OCCLUSION).
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Trasformazioni (stesse del training)
    transform = transforms.Compose([
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    # Carica e preprocessa
    image = Image.open(image_path).convert('RGB')
    image_tensor = transform(image)
    image_tensor = image_tensor.unsqueeze(0)  # Aggiungi dimensione batch
    image_tensor = image_tensor.to(device)
    
    return image_tensor


def preprocess_image_for_ae(image_path, device=None):
    """
    Preprocessa un'immagine per l'autoencoder (senza normalizzazione ImageNet).
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Trasformazioni (stesse del training AE)
    transform = transforms.Compose([
        transforms.Resize((128, 128)),
        transforms.ToTensor(),  # Già in [0, 1]
    ])
    
    # Carica e preprocessa
    image = Image.open(image_path).convert('RGB')
    image_tensor = transform(image)
    image_tensor = image_tensor.unsqueeze(0)  # Aggiungi dimensione batch
    image_tensor = image_tensor.to(device)
    
    return image_tensor


## Funzione di Classificazione Principale


In [None]:
def classify_connector(image_path, device=None):
    """
    Classifica un connettore come "OK", "KO" o "OCCLUSION".
    
    Pipeline:
    1. STEP 1 - Occlusione: Verifica se l'immagine è occlusa
       - Se occlusa → ritorna "OCCLUSION"
    2. STEP 2 - Anomaly Detection: Se visibile, verifica se è anomalo
       - Se errore ricostruzione > threshold → ritorna "KO"
       - Altrimenti → ritorna "OK"
    
    Args:
        image_path: Path all'immagine del connettore
        device: Device (cuda/cpu)
    
    Returns:
        str: "OK", "KO" o "OCCLUSION"
        
    Note:
        - "OCCLUSION" = immagine non leggibile (cavi o altro coprono la zona critica)
        - "OK" = connettore visibile e simile ai campioni OK di training
        - "KO" = connettore visibile ma anomalo rispetto ai campioni OK
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Carica modelli (caricati una volta e riutilizzati)
    try:
        occ_model = load_occlusion_model(device)
        ae_model, threshold = load_ae_and_threshold(device)
    except (FileNotFoundError, NameError) as e:
        raise FileNotFoundError(
            f"Modelli non trovati o classi non definite. "
            f"Assicurati di aver eseguito step1 e step2. Errore: {e}"
        )
    
    # STEP 1: Verifica occlusione
    x_occ = preprocess_image(image_path, device)
    
    with torch.no_grad():
        logits = occ_model(x_occ)
        pred_vis = torch.argmax(logits, dim=1).item()
    
    # Se pred_vis == 0 → OCCLUSION
    if pred_vis == 0:
        return "OCCLUSION"
    
    # STEP 2: Anomaly detection (solo se visibile)
    x_ae = preprocess_image_for_ae(image_path, device)
    
    with torch.no_grad():
        reconstructed = ae_model(x_ae)
        # Calcola errore MSE medio su [C, H, W]
        mse = nn.MSELoss(reduction='mean')
        error = mse(reconstructed, x_ae).item()
    
    # Se errore > threshold → KO (anomalo)
    if error > threshold:
        return "KO"
    else:
        return "OK"


In [None]:
# I modelli vengono caricati automaticamente da classify_connector()
# Ma se vuoi caricarli manualmente per testare:

# Nota: Le classi OcclusionCNN e ConvAE devono essere in memoria
# (eseguite da step1 e step2) oppure importate dai file Python

# Esempio di caricamento manuale:
# occ_model = load_occlusion_model(device)
# ae_model, threshold = load_ae_and_threshold(device)
# print("Modelli caricati con successo!")


## Test Locale

In [None]:
# Test su immagini locali
# Modifica questi path secondo la tua struttura

test_images = [
    "Data/connectors/conn1/20251106110559_TOP.png",
    "Data/connectors/conn2/20251106110559_TOP.png",
    "Data/connectors/conn3/20251106110559_TOP.png",
]

print("Test classificazione locale:\n")
for img_path in test_images:
    img_path_obj = Path(img_path)
    if not img_path_obj.is_absolute():
        img_path_obj = PROJECT_ROOT / img_path_obj
    
    if img_path_obj.exists():
        try:
            result = classify_connector(str(img_path_obj), device=device)
            print(f"  {img_path_obj.name} ({img_path_obj.parent.name}): {result}")
        except Exception as e:
            print(f"  {img_path_obj.name}: ❌ ERRORE - {e}")
    else:
        print(f"  {img_path}: ⚠️  Immagine non trovata")