In [1]:
!pip install piexif -q

In [2]:
import os
import csv
import logging
import argparse
from pathlib import Path
import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.applications import EfficientNetV2B0
from tensorflow.keras.applications.efficientnet_v2 import preprocess_input

from tqdm import tqdm
from PIL import Image, UnidentifiedImageError
import piexif
import shutil
from types import SimpleNamespace

In [None]:
# Mount Google Drive under /content/drive so the project folder configured in
# the next cell is reachable. force_remount=True is passed so that re-running
# this cell requests a fresh mount instead of reusing an existing one.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
from pathlib import Path

# --- Project base path configuration ---
# This must point to the root folder of the project (on the mounted Drive).
GDRIVE_PROJECT_PATH = Path("/content/drive/MyDrive/ProgettoClassificazioneLuoghi")
print(f"Percorso base del progetto impostato a: {GDRIVE_PROJECT_PATH}")

# --- GLOBAL PARAMETERS ---
# IMG_SIZE is the (height, width) every image is resized to before the model.
IMG_SIZE = (224, 224)
BATCH_SIZE = 32
EPOCHS_HEAD = 10       # Epochs used to train only the classification head
EPOCHS_FINE = 8        # Epochs used to fine-tune the whole model
# Passed to extract_frames() as `interval_sec`: seconds between sampled frames.
FRAME_INTERVAL = 1
LEARN_HEAD_LR = 1e-3
LEARN_FULL_LR = 1e-5   # A very low learning rate is safer for fine-tuning

# Parameters for video frame selection:
# frames whose Laplacian-variance sharpness is below the threshold are discarded,
# and at most FRAMES_PER_PLACE_PER_VIDEO of the sharpest ones are kept per video.
SHARPNESS_THRESHOLD = 80.0
FRAMES_PER_PLACE_PER_VIDEO = 5

# --- DEFAULT PATHS DERIVED FROM THE PROJECT ROOT ---
DEFAULT_TRAIN_DIR = GDRIVE_PROJECT_PATH / "dataset" / "train"
DEFAULT_VAL_DIR = GDRIVE_PROJECT_PATH / "dataset" / "val"
DEFAULT_IMG_DIR = GDRIVE_PROJECT_PATH / "dataset" / "inputIMG"
DEFAULT_VID_DIR = GDRIVE_PROJECT_PATH / "dataset" / "inputVID"

DEFAULT_OUTPUT = GDRIVE_PROJECT_PATH / "output"
DEFAULT_CSV = DEFAULT_OUTPUT / "results.csv"
PROCESSED_LOG = DEFAULT_OUTPUT / "processed.log"
BEST_FRAMES_OUTPUT_DIR = DEFAULT_OUTPUT / "best_frames"
TEMP_FRAME_EXTRACT_DIR = DEFAULT_OUTPUT / "temp_frames"

# Create the output directories that the rest of the notebook writes into
DEFAULT_OUTPUT.mkdir(parents=True, exist_ok=True)
BEST_FRAMES_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
TEMP_FRAME_EXTRACT_DIR.mkdir(parents=True, exist_ok=True)

# Print the resolved paths as a sanity check
print(f"\n--- Percorsi Configurati ---")
print(f"Training Data: {DEFAULT_TRAIN_DIR}")
print(f"Validation Data: {DEFAULT_VAL_DIR}")
print(f"Input Immagini (Inferenza): {DEFAULT_IMG_DIR}")
print(f"Input Video (Inferenza): {DEFAULT_VID_DIR}")
print(f"Output Generale: {DEFAULT_OUTPUT}")
print(f"-----------------------------")

In [None]:
def setup_logging(output_dir: Path):
    """Configure root logging to write to ``<output_dir>/run.log`` and the console.

    Any handler already attached to the root logger is removed *and closed*
    first, so calling this function repeatedly (it is invoked by both the
    training and the inference entry points) does not leak open file handles
    or duplicate log lines.

    Args:
        output_dir: Directory that will contain ``run.log``. It is created
            (including parents) if it does not exist yet.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    log_file = output_dir / "run.log"

    # Iterate over a copy: removeHandler mutates logging.root.handlers.
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
        # Removing alone keeps a FileHandler's stream open; close it explicitly.
        handler.close()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s: %(message)s",
        datefmt='%Y-%m-%d %H:%M:%S',
        handlers=[logging.FileHandler(log_file, encoding='utf-8'), logging.StreamHandler()]
    )
    logging.info(f"Logging configurato. Log salvati in: {log_file}")

def is_already_processed(key: str) -> bool:
    """Tell whether ``key`` is recorded in the processed-items log.

    The log (``PROCESSED_LOG``) is read line by line and each line is compared,
    after stripping surrounding whitespace, with ``key``.

    Args:
        key: Identifier of the item to look up.

    Returns:
        True if a line equal to ``key`` is found; False if the log is missing,
        the key is absent, or the log cannot be read (the error is logged).
    """
    try:
        if PROCESSED_LOG.exists():
            with open(PROCESSED_LOG, 'r', encoding='utf-8') as log_file:
                for entry in log_file:
                    if entry.strip() == key:
                        return True
        return False
    except Exception as e:
        logging.error(f"Errore leggendo {PROCESSED_LOG}: {e}")
        return False

def mark_processed(key: str):
    """Append ``key`` to the processed-items log, one key per line.

    The entry is terminated with a real newline character so that
    ``is_already_processed`` (which compares whole stripped lines) can find it
    again. Write errors are logged and otherwise ignored (best effort).

    Args:
        key: Identifier of the item that has just been processed.
    """
    try:
        with open(PROCESSED_LOG, "a", encoding='utf-8') as f:
            # Must be "\n" (newline), not the two characters backslash + "n":
            # otherwise all keys end up on a single line and never match.
            f.write(key + "\n")
    except Exception as e:
        logging.error(f"Errore scrivendo su {PROCESSED_LOG}: {e}")

def extract_frames(video_path: Path, frame_output_base_dir: Path, interval_sec: int) -> list:
    """Sample frames from a video at a fixed time interval and save them as JPEGs.

    Frames are written to ``frame_output_base_dir / video_path.stem`` as
    ``<stem>_frame<NNNNN>.jpg`` (JPEG quality 90).

    Args:
        video_path: Video file to read.
        frame_output_base_dir: Base directory under which a per-video folder
            is created.
        interval_sec: Seconds between two sampled frames. The sampling step is
            ``fps * interval_sec`` frames, with a minimum of 1.

    Returns:
        The list of saved frame paths, in extraction order. An empty list is
        returned when the video cannot be opened. If an error occurs midway,
        it is logged and the frames saved so far are returned.
    """
    saved_frames_paths = []
    video_frame_dir = frame_output_base_dir / video_path.stem
    video_frame_dir.mkdir(parents=True, exist_ok=True)
    cap = None
    try:
        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            logging.error(f"Impossibile aprire il video: {video_path}")
            return []

        fps = cap.get(cv2.CAP_PROP_FPS)
        # Fall back to 30 fps when the container reports 0 or NaN (fps != fps).
        if not fps or fps != fps:
            fps = 30.0
        step = max(1, int(fps * interval_sec))

        frame_count, saved_frame_index = 0, 0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Some containers report no (or a bogus) frame count; let tqdm run
        # without a total in that case instead of showing a wrong bar.
        progress_total = total_frames if total_frames > 0 else None
        with tqdm(total=progress_total, desc=f"Extracting frames from {video_path.name}") as pbar:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % step == 0:
                    out_file_path = video_frame_dir / f"{video_path.stem}_frame{saved_frame_index:05d}.jpg"
                    if cv2.imwrite(str(out_file_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 90]):
                        saved_frames_paths.append(out_file_path)
                        saved_frame_index += 1
                frame_count += 1
                pbar.update(1)
        logging.info(f"Estratti {len(saved_frames_paths)} frame da {video_path.name}")
    except Exception as e:
        logging.error(f"Errore durante estrazione frame da {video_path}: {e}")
    finally:
        # Always release the capture, including on errors and the early return.
        if cap is not None:
            cap.release()
    return saved_frames_paths

def update_image_exif(img_path: Path, label: str):
    """Store ``label`` in the EXIF ImageDescription tag of an image, in place.

    Existing EXIF data is preserved when present; the embedded thumbnail entry
    is cleared before the EXIF block is re-serialised. The image is re-saved
    to the same path with quality 95. Any failure is logged as a warning and
    swallowed (best effort: a missing tag must not abort the pipeline).

    Args:
        img_path: Path of the image file to update.
        label: Text written (UTF-8 encoded) into ImageDescription.
    """
    try:
        # The context manager guarantees the file handle is released even if
        # piexif or the save step raises.
        with Image.open(img_path) as img:
            exif_data = img.info.get('exif', b"")
            exif_dict = piexif.load(exif_data) if exif_data else {'0th': {}}
            exif_dict.setdefault('0th', {})
            exif_dict['0th'][piexif.ImageIFD.ImageDescription] = label.encode('utf-8')
            exif_dict['thumbnail'] = None
            exif_bytes = piexif.dump(exif_dict)
            img.save(img_path, exif=exif_bytes, quality=95)
    except Exception as e:
        logging.warning(f"UPDATE_EXIF: Errore per {img_path}: {e}")

def calculate_sharpness(image_cv2):
    """Score image sharpness as the variance of its Laplacian.

    The image is converted from BGR to grayscale, the Laplacian is computed
    with 64-bit float output, and its variance is returned.

    Args:
        image_cv2: Image array as loaded by OpenCV, or None.

    Returns:
        The Laplacian variance, or 0.0 when the image is None, empty, or any
        error occurs during the computation (the error is logged).
    """
    try:
        has_pixels = image_cv2 is not None and image_cv2.size != 0
        if has_pixels:
            grayscale = cv2.cvtColor(image_cv2, cv2.COLOR_BGR2GRAY)
            laplacian = cv2.Laplacian(grayscale, cv2.CV_64F)
            return laplacian.var()
        return 0.0
    except Exception as e:
        logging.error(f"CALCULATE_SHARPNESS: Errore: {e}")
        return 0.0

print("Funzioni di Utilità definite.")

In [None]:
# --- Funzione per costruire il modello EfficientNetV2 ---
def build_model(num_classes: int) -> models.Model:
    """Build a classifier on top of an ImageNet-pretrained EfficientNetV2B0.

    The backbone is frozen so that only the new head (global average pooling,
    dropout, softmax dense layer) is trained initially. The returned model is
    compiled with Adam at ``LEARN_HEAD_LR`` and categorical cross-entropy.

    Note that the model is assembled from ``backbone.input`` / ``backbone.output``,
    so the backbone's layers appear directly in ``model.layers`` rather than as
    a single nested sub-model.

    Args:
        num_classes: Number of output classes (size of the softmax layer).

    Returns:
        The compiled Keras model, ready for head-only training.
    """
    # To try a larger model, swap EfficientNetV2B0 here
    # (e.g. EfficientNetV2B1, EfficientNetV2B2).
    #
    # NOTE: the public EfficientNetV2B0 constructor does not take a
    # `drop_connect_rate` keyword (passing it raises TypeError). The underlying
    # builder already applies stochastic depth with its own default rate
    # (0.2 in the Keras source — confirm for the installed version).
    backbone = EfficientNetV2B0(
        weights="imagenet",
        include_top=False,
        input_shape=(*IMG_SIZE, 3),
    )

    backbone.trainable = False  # Freeze the backbone while training the head

    # Classification head
    x = layers.GlobalAveragePooling2D(name="gap")(backbone.output)
    x = layers.Dropout(0.3, name="dropout")(x)  # Dropout for regularisation
    outputs = layers.Dense(num_classes, activation="softmax", name="predictions")(x)
    model = models.Model(backbone.input, outputs, name="efficientnetv2b0_finetuned")

    model.compile(optimizer=optimizers.Adam(learning_rate=LEARN_HEAD_LR),
                  loss="categorical_crossentropy",
                  metrics=["accuracy"])

    logging.info("Modello EfficientNetV2B0 costruito e compilato per il training della testa.")
    return model

# --- Data augmentation pipeline ---
# Random transformations applied to training batches only: make_dataset() maps
# this pipeline over the dataset (with training=True) when augment=True, before
# the model-specific preprocessing step.
data_augmentation_pipeline = tf.keras.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.15),
    layers.RandomZoom(0.15),
    layers.RandomContrast(0.1),
    layers.RandomBrightness(0.1),
], name="data_augmentation")

# --- Funzione per creare i dataset ---
def make_dataset(dirpath: Path, shuffle: bool, subset: str = None, validation_split: float = None, augment: bool = False):
    """Create a batched, preprocessed image dataset from a class-per-folder directory.

    Labels are inferred from sub-directory names and one-hot encoded. Images
    are resized to ``IMG_SIZE`` and batched by ``BATCH_SIZE``. Optionally the
    augmentation pipeline is applied, then ``preprocess_input`` is always
    applied, and the result is prefetched.

    Args:
        dirpath: Root directory containing one sub-directory per class.
        shuffle: Forwarded to the Keras directory loader.
        subset: Forwarded to the loader together with ``validation_split``.
        validation_split: Fraction of data reserved for validation, forwarded
            to the loader.
        augment: When True, apply ``data_augmentation_pipeline`` to each batch
            (intended for the training set only).

    Returns:
        ``(dataset, class_names)`` on success; ``(None, None)`` if the
        directory does not exist or the dataset cannot be created (the error
        is logged).
    """
    if not dirpath.is_dir():
        logging.error(f"MAKE_DATASET: Directory non trovata: {dirpath}")
        return None, None

    def _augment_batch(imgs, labs):
        # Force training=True so the random layers are active inside map().
        return data_augmentation_pipeline(imgs, training=True), labs

    def _preprocess_batch(imgs, labs):
        return preprocess_input(imgs), labs

    try:
        loader_options = dict(
            labels="inferred",
            label_mode="categorical",
            batch_size=BATCH_SIZE,
            image_size=IMG_SIZE,
            shuffle=shuffle,
            seed=123,
            validation_split=validation_split,
            subset=subset,
            interpolation='bilinear',
        )
        dataset = tf.keras.preprocessing.image_dataset_from_directory(dirpath, **loader_options)
        # Read class names before map(): the mapped dataset no longer exposes them.
        class_names = dataset.class_names

        # Augmentation (optional) runs first, model preprocessing always last.
        stages = [_preprocess_batch]
        if augment:
            stages.insert(0, _augment_batch)
        for stage in stages:
            dataset = dataset.map(stage, num_parallel_calls=tf.data.AUTOTUNE)

        return dataset.prefetch(tf.data.AUTOTUNE), class_names
    except Exception as e:
        logging.error(f"MAKE_DATASET: Errore durante creazione dataset da {dirpath}: {e}")
        return None, None

print("Funzioni per Modello e Dataset definite.")

In [None]:
def train(args):
    """Run the full two-phase training cycle and save the model and class map.

    Phase 1 trains only the classification head on a frozen backbone; the best
    head checkpoint (by validation accuracy) is then reloaded. Phase 2
    unfreezes the network and fine-tunes it at a low learning rate with early
    stopping. Finally the model is saved as ``place_model.keras`` and the class
    name/index mapping as ``class_indices.csv`` in the output directory.

    Args:
        args: Namespace-like object with the attributes ``train_dir``,
            ``val_dir`` (may be empty/None), ``output``, ``epochs_head``,
            ``epochs_fine``, ``learn_full_lr`` and, optionally,
            ``learn_head_lr``.

    Returns:
        None. If the datasets cannot be created, the error is logged and the
        function returns without training.
    """
    output_dir = Path(args.output)
    setup_logging(output_dir)
    logging.info("--- Inizio Processo di Training ---")

    # Training / validation datasets
    train_dir_path = Path(args.train_dir)
    val_dir_path = Path(args.val_dir) if args.val_dir else None

    if val_dir_path and val_dir_path.is_dir():
        logging.info(f"Uso directory di validazione separata: {val_dir_path}")
        train_ds, class_names = make_dataset(train_dir_path, shuffle=True, augment=True)
        val_ds, _ = make_dataset(val_dir_path, shuffle=False)
    else:
        logging.info(f"Uso 20% split da {train_dir_path} per validazione.")
        train_ds, class_names = make_dataset(train_dir_path, shuffle=True, subset='training', validation_split=0.2, augment=True)
        # Both subsets MUST be indexed with the same shuffle/seed settings: the
        # loader orders the file list first and then cuts it. Mixing
        # shuffle=True (training) with shuffle=False (validation) makes the two
        # subsets overlap and skews validation towards the last classes.
        val_ds, _ = make_dataset(train_dir_path, shuffle=True, subset='validation', validation_split=0.2)

    if train_ds is None or val_ds is None or not class_names:
        logging.error("Creazione dataset fallita. Training interrotto.")
        return

    num_classes = len(class_names)
    logging.info(f"Trovate {num_classes} classi: {class_names}")

    # Build the model (compiled by build_model for head-only training)
    model = build_model(num_classes)

    # Honour a caller-supplied head learning rate instead of silently ignoring it.
    head_lr = getattr(args, "learn_head_lr", None)
    if head_lr is not None:
        model.compile(optimizer=optimizers.Adam(learning_rate=head_lr),
                      loss="categorical_crossentropy",
                      metrics=["accuracy"])

    # Callbacks
    head_checkpoint_path = output_dir / "model_head_best.keras"
    checkpoint_head = tf.keras.callbacks.ModelCheckpoint(
        filepath=str(head_checkpoint_path), monitor='val_accuracy',
        save_best_only=True, mode='max', verbose=1)
    checkpoint_fine = tf.keras.callbacks.ModelCheckpoint(
        filepath=str(output_dir / "model_fine_tuned_best.keras"), monitor='val_accuracy',
        save_best_only=True, mode='max', verbose=1)
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)

    # Phase 1: train the head only
    logging.info(f"--- FASE 1: Training testa per {args.epochs_head} epoche ---")
    model.fit(train_ds, validation_data=val_ds, epochs=args.epochs_head, callbacks=[checkpoint_head])

    # Reload the best phase-1 model (if a checkpoint was actually written)
    if head_checkpoint_path.exists():
        logging.info("Caricamento del miglior modello dal training della testa.")
        model = models.load_model(str(head_checkpoint_path))
    else:
        logging.warning("Checkpoint della testa non trovato: si prosegue con i pesi correnti.")

    # Phase 2: fine-tuning.
    # The backbone layers live directly in model.layers (layers[0] is just the
    # InputLayer), so every layer has to be unfrozen individually.
    # BatchNormalization layers are kept frozen so they keep running in
    # inference mode and their ImageNet statistics are not destroyed, as
    # recommended by the Keras transfer-learning guide.
    for layer in model.layers:
        layer.trainable = not isinstance(layer, layers.BatchNormalization)

    # Recompile so the new trainable flags take effect.
    model.compile(optimizer=optimizers.Adam(learning_rate=args.learn_full_lr),
                  loss="categorical_crossentropy",
                  metrics=["accuracy"])
    logging.info(f"Modello ricompilato per fine-tuning (LR={args.learn_full_lr})")

    logging.info(f"--- FASE 2: Fine-tuning per max {args.epochs_fine} epoche ---")
    model.fit(train_ds, validation_data=val_ds, epochs=args.epochs_fine, callbacks=[checkpoint_fine, early_stopping])

    # Save the final model and the class mapping
    final_model_path = output_dir / "place_model.keras"
    logging.info(f"Salvataggio modello finale in: {final_model_path}")
    model.save(final_model_path)

    class_indices_path = output_dir / "class_indices.csv"
    pd.DataFrame({"class_name": class_names, "index": list(range(num_classes))}).to_csv(class_indices_path, index=False)
    logging.info(f"Mappatura classi salvata in: {class_indices_path}")
    logging.info("--- Processo di Training Concluso ---")

print("Funzione di Training definita.")

In [None]:
# Before starting a new training run you may want to delete the old model
# files and the processed-items log in order to start from scratch.
# Uncomment the following lines to perform that cleanup.

# print("Pulizia dei vecchi file di modello e log in corso...")
# old_model_files = ["model_head_best.keras", "model_fine_tuned_best.keras", "place_model.keras"]
# for f_name in old_model_files:
#     if (DEFAULT_OUTPUT / f_name).exists():
#         (DEFAULT_OUTPUT / f_name).unlink()
# if PROCESSED_LOG.exists():
#     PROCESSED_LOG.unlink()
# print("Pulizia completata.")


# Training configuration: bundles the defaults from the configuration cell
# into the namespace object that train() reads its settings from.
args_train_config = SimpleNamespace(
    train_dir=str(DEFAULT_TRAIN_DIR),
    val_dir=str(DEFAULT_VAL_DIR),
    output=DEFAULT_OUTPUT,
    epochs_head=EPOCHS_HEAD,
    epochs_fine=EPOCHS_FINE,
    learn_head_lr=LEARN_HEAD_LR,
    learn_full_lr=LEARN_FULL_LR
)

# RUN THE TRAINING
# Any unexpected exception is printed together with its traceback instead of
# propagating out of the cell.
try:
    train(args_train_config)
except Exception as e:
    print(f"ERRORE INASPETTATO DURANTE IL TRAINING: {e}")
    import traceback
    traceback.print_exc()

In [None]:
def _list_video_files(vid_dir: Path) -> list:
    """Return the .mp4/.mov/.avi files in ``vid_dir`` (any letter case), sorted."""
    video_suffixes = {".mp4", ".mov", ".avi"}
    return sorted(p for p in vid_dir.iterdir() if p.is_file() and p.suffix.lower() in video_suffixes)


def _load_frame_for_inference(path_str: str):
    """Read, decode, resize and preprocess one image; return None on failure."""
    try:
        img = tf.io.read_file(path_str)
        img = tf.image.decode_image(img, channels=3, expand_animations=False)
        img = tf.image.resize(img, IMG_SIZE)
        return preprocess_input(img)
    except Exception as e:
        logging.warning(f"Errore caricamento immagine {path_str}: {e}")
        return None


def _save_best_frames(vid_path: Path, video_label: str, best_frames: list) -> list:
    """Copy the selected frames to the best-frames folder, tag them via EXIF, return their paths."""
    saved_paths = []
    if not best_frames:
        return saved_paths
    video_best_frames_dir = BEST_FRAMES_OUTPUT_DIR / video_label / vid_path.stem
    video_best_frames_dir.mkdir(parents=True, exist_ok=True)
    for sharpness, frame_p, _conf in best_frames:
        out_filename = f"{vid_path.stem}__{video_label}__frame{frame_p.stem.split('frame')[-1]}_sharp{sharpness:.0f}.jpg"
        out_path = video_best_frames_dir / out_filename
        shutil.copy(str(frame_p), str(out_path))
        update_image_exif(out_path, video_label)
        saved_paths.append(str(out_path.resolve()))
    return saved_paths


def _classify_video(vid_path: Path, model, class_names: list, args):
    """Classify one video by majority vote over its sampled frames.

    Returns the CSV row (a dict) for the video, or None when no frame could be
    extracted or loaded.
    """
    frame_paths = extract_frames(vid_path, TEMP_FRAME_EXTRACT_DIR, args.frame_interval)
    if not frame_paths:
        return None

    # Keep each tensor paired with its path: frames that fail to load are
    # dropped from BOTH lists, so prediction i always refers to valid_paths[i].
    loaded = [(p, _load_frame_for_inference(str(p))) for p in frame_paths]
    loaded = [(p, tensor) for p, tensor in loaded if tensor is not None]
    if not loaded:
        return None
    valid_paths = [p for p, _ in loaded]

    predictions = model.predict(tf.stack([tensor for _, tensor in loaded]), batch_size=BATCH_SIZE)
    pred_indices = np.argmax(predictions, axis=1)
    pred_confidences = np.max(predictions, axis=1)

    # Video-level result: most frequent class, with the mean confidence of the
    # frames that voted for it.
    majority_idx = int(np.bincount(pred_indices).argmax())
    video_label = class_names[majority_idx]
    video_confidence = float(np.mean(pred_confidences[pred_indices == majority_idx]))

    logging.info(f"Video: {vid_path.name} -> Classe: {video_label} (Conf: {video_confidence:.3f})")

    # Among frames agreeing with the majority class, keep the sharpest ones.
    candidate_frames_info = []
    for frame_path, frame_idx, frame_conf in zip(valid_paths, pred_indices, pred_confidences):
        if frame_idx != majority_idx:
            continue
        sharpness = calculate_sharpness(cv2.imread(str(frame_path)))
        if sharpness >= args.sharpness_threshold:
            candidate_frames_info.append((sharpness, frame_path, frame_conf))

    candidate_frames_info.sort(key=lambda x: x[0], reverse=True)
    best_frames_to_save = candidate_frames_info[:args.frames_per_place]
    saved_best_frame_paths = _save_best_frames(vid_path, video_label, best_frames_to_save)

    return {
        "SourceType": "Video", "PredictedPlace": video_label, "OriginalPath": str(vid_path.resolve()),
        "Confidence": f"{video_confidence:.4f}", "IsVideo": 1,
        "BestFrameSavedPath": ";".join(saved_best_frame_paths) if saved_best_frame_paths else "N/A",
        "Sharpness": f"{best_frames_to_save[0][0]:.1f}" if best_frames_to_save else "N/A"
    }


def infer(args):
    """Run inference on the videos in ``args.vid_dir`` and append results to a CSV.

    For each not-yet-processed video: frames are extracted, classified, and the
    video is labelled by majority vote; the sharpest frames of the winning
    class are copied to the best-frames folder and tagged via EXIF; one row is
    appended to ``args.csv_path`` and the video is recorded as processed.
    Temporary frames are always removed afterwards.

    Only videos are processed here: ``args.img_dir`` is not read by this
    function.

    Args:
        args: Namespace-like object with the attributes ``output``,
            ``csv_path``, ``vid_dir``, ``frame_interval``,
            ``sharpness_threshold`` and ``frames_per_place``.

    Returns:
        None. If the saved model or the class-index file is missing, the error
        is logged and the function returns without doing anything.
    """
    output_dir = Path(args.output)
    setup_logging(output_dir)
    logging.info("--- Inizio Processo di Inferenza ---")

    # Load the model and the class mapping
    model_path = output_dir / "place_model.keras"
    class_indices_path = output_dir / "class_indices.csv"
    if not model_path.exists() or not class_indices_path.exists():
        logging.error(f"Modello ({model_path}) o indici classi ({class_indices_path}) non trovati.")
        return

    model = models.load_model(model_path)
    class_names = pd.read_csv(class_indices_path).sort_values("index")["class_name"].tolist()
    logging.info(f"Modello e {len(class_names)} classi caricate.")

    # Results are appended; the header is written only for a new/empty file.
    with open(args.csv_path, "a", newline="", encoding='utf-8') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=["SourceType", "PredictedPlace", "OriginalPath", "Confidence", "IsVideo", "BestFrameSavedPath", "Sharpness"])
        if csv_file.tell() == 0:
            writer.writeheader()

        # VIDEO processing
        logging.info(f"Inferenza video in: {args.vid_dir}")
        vid_dir = Path(args.vid_dir)
        video_files = _list_video_files(vid_dir) if vid_dir.is_dir() else []
        for vid_path in video_files:
            key = f"VID::{vid_path.resolve()}"
            if is_already_processed(key):
                logging.info(f"Video già processato, saltato: {vid_path.name}")
                continue

            logging.info(f"Processando video: {vid_path.name}")
            try:
                row = _classify_video(vid_path, model, class_names, args)
            finally:
                # Remove temp frames on every path (success, skip, or error).
                shutil.rmtree(TEMP_FRAME_EXTRACT_DIR / vid_path.stem, ignore_errors=True)
            if row is None:
                continue

            writer.writerow(row)
            # Flush before marking as processed so the CSV never lags the log.
            csv_file.flush()
            mark_processed(key)

    logging.info("--- Processo di Inferenza Concluso ---")

print("Funzione di Inferenza definita.")

In [None]:
# Inference configuration: bundles the defaults from the configuration cell
# into the namespace object that infer() reads its settings from.
args_infer_config = SimpleNamespace(
    img_dir=str(DEFAULT_IMG_DIR),
    vid_dir=str(DEFAULT_VID_DIR),
    output=DEFAULT_OUTPUT,
    csv_path=DEFAULT_CSV,
    frame_interval=FRAME_INTERVAL,
    sharpness_threshold=SHARPNESS_THRESHOLD,
    frames_per_place=FRAMES_PER_PLACE_PER_VIDEO,
)

# Echo the main parameters before running
print("--- Parametri per l'Inferenza ---")
print(f"Directory Video Input: {args_infer_config.vid_dir}")
print(f"Directory Output: {args_infer_config.output}")
print("---------------------------------")

# RUN THE INFERENCE
# Any unexpected exception is printed together with its traceback instead of
# propagating out of the cell.
try:
    infer(args_infer_config)
except Exception as e:
    print(f"ERRORE INASPETTATO DURANTE L'INFERENZA: {e}")
    import traceback
    traceback.print_exc()