In [3]:
import cv2
import numpy as np
import logging
import os
from collections import deque

def setup_logging(output_dir):
    """Configure root logging to write to a file in ``output_dir`` and to the console.

    The log file is ``<output_dir>/motion_smoothing.log``. The directory is
    created when missing. Handlers already attached to the root logger are
    removed first, so calling this function again (e.g. re-running a notebook
    cell) actually reconfigures logging instead of being silently ignored.

    Args:
        output_dir: Directory in which the log file is created.
    """
    # FileHandler fails if the directory does not exist; an empty string means
    # "current directory" and must not be passed to makedirs.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    log_file = os.path.join(output_dir, "motion_smoothing.log")

    # basicConfig() is a no-op when the root logger already has handlers, so
    # drop (and close) the existing ones first. This mirrors what
    # basicConfig(force=True) does on Python 3.8+ without requiring it.
    root_logger = logging.getLogger()
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)
        handler.close()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            # Explicit encoding: log messages contain non-ASCII text.
            logging.FileHandler(log_file, encoding="utf-8"),  # log to file
            logging.StreamHandler(),                          # log to console
        ],
    )
    logging.info("Logging configurato. Log salvati in: %s", log_file)

def add_transparent_overlay(frame, boxes, color=(0, 0, 255), alpha=0.3):
    """Blend filled, semi-transparent rectangles onto ``frame`` in place.

    Each entry of ``boxes`` is an ``(x, y, w, h)`` tuple. The rectangles are
    painted solid on a copy of the frame, and that copy is then alpha-blended
    back into ``frame`` (weight ``alpha`` for the painted copy, ``1 - alpha``
    for the original). Nothing is returned; ``frame`` itself is modified.
    """
    painted = frame.copy()
    for left, top, box_w, box_h in boxes:
        top_left = (left, top)
        bottom_right = (left + box_w, top + box_h)
        # A negative thickness asks OpenCV for a filled rectangle.
        cv2.rectangle(painted, top_left, bottom_right, color, -1)
    original_weight = 1 - alpha
    # The last argument is the destination: the blend is written into ``frame``.
    cv2.addWeighted(painted, alpha, frame, original_weight, 0, frame)

def _flow_gray(frame, scale_factor):
    """Return the grayscale, rescaled copy of ``frame`` used for optical flow."""
    gray_full = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    return cv2.resize(gray_full, (0, 0), fx=scale_factor, fy=scale_factor)


def _padded_motion_boxes(mask, margin, width, height, min_side=5):
    """Return padded ``(x, y, w, h)`` boxes around the external contours of ``mask``."""
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    boxes = []
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        # Clamp both corners to the frame and derive the size from them, so a
        # box touching the left/top edge is not over-extended on the other side.
        x0 = max(0, x - margin)
        y0 = max(0, y - margin)
        x1 = min(width, x + w + margin)
        y1 = min(height, y + h + margin)
        if x1 - x0 > min_side and y1 - y0 > min_side:
            boxes.append((x0, y0, x1 - x0, y1 - y0))
    return boxes


def temporal_smoothing_flow(
    video_path,
    output_dir,
    flow_threshold=0.5,       # Flow magnitude above which a pixel counts as moving
    alpha_fraction=0.2,       # Fraction of the window in which a pixel must be "on"
    window_size=30,           # N: length of the temporal window, in frames
    morph_kernel=3,           # Size of the (small) morphological kernel
    save_name="flow_smoothing_output.mp4",
    mask_save_name="movement_mask.mp4",  # File name of the binary mask video
    margin=10,                # Padding around each box, in pixels
    scale_factor=0.5,         # Downscale factor applied before computing optical flow
    skip_frames=0             # Number of frames to skip between analyses (0 = none)
):
    """Detect motion with Farneback optical flow and write two videos.

    Outputs, both written into ``output_dir``:
      - ``save_name``: the input frames with translucent red rectangles over
        the detected motion areas;
      - ``mask_save_name``: a single-channel video (0/255) with the same
        rectangles filled in.

    Per analysed frame, the flow magnitude is thresholded, the resulting mask
    is pushed into a window of the last ``window_size`` masks, and a pixel is
    kept when it was "on" in at least ``alpha_fraction`` of the window. The
    smoothed mask is cleaned with close/open morphology and converted into
    padded bounding boxes.

    Both outputs contain exactly one frame per input frame: the first frame
    (for which no flow exists) is written with an empty mask, and frames that
    are skipped via ``skip_frames`` reuse the most recent boxes. This keeps
    the mask video frame-aligned with the source video.

    Args:
        video_path: Path of the input video.
        output_dir: Directory where both output videos are written.
        flow_threshold: Magnitude threshold for a pixel to count as moving.
        alpha_fraction: Minimum fraction of window frames with motion.
        window_size: Number of masks kept in the temporal window.
        morph_kernel: Side of the elliptical structuring element.
        save_name: File name of the overlay video.
        mask_save_name: File name of the mask video.
        margin: Padding added around every bounding box, in pixels.
        scale_factor: Resize factor applied before computing the flow.
        skip_frames: Analyse only one frame every ``skip_frames + 1``.

    Returns:
        None. Errors opening the input or creating the outputs are logged.
    """
    logging.info("=== Inizio Optical Flow con Buffer Temporale ===")
    logging.info(f"Video input: {video_path}")
    logging.info(f"Finestra temporale: N={window_size}, soglia %={alpha_fraction}, scale_factor={scale_factor}")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        logging.error("Impossibile aprire il video.")
        cap.release()
        return

    out_overlay = None
    out_mask = None
    window_shown = False
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        logging.info(f"FPS: {fps} | Dimensioni: {width}x{height}")
        if not fps > 0:  # also true for NaN
            # Some containers report no frame rate; a writer cannot use 0 fps.
            logging.warning("FPS non valido (%s): uso 30.0 come valore predefinito.", fps)
            fps = 30.0

        fourcc = cv2.VideoWriter_fourcc(*'avc1')

        # Writer for the overlay video
        output_path = os.path.join(output_dir, save_name)
        out_overlay = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        # Writer for the binary mask (isColor=False -> single channel frames)
        mask_output_path = os.path.join(output_dir, mask_save_name)
        out_mask = cv2.VideoWriter(mask_output_path, fourcc, fps, (width, height), isColor=False)

        if not out_overlay.isOpened() or not out_mask.isOpened():
            # Without this check write() silently produces no output.
            logging.error("Impossibile creare i video di output in: %s", output_dir)
            return

        ret, first_frame = cap.read()
        if not ret:
            logging.error("Non riesco a leggere il primo frame.")
            return

        prev_gray = _flow_gray(first_frame, scale_factor)

        # No flow can be computed for the first frame: write it untouched with
        # an empty mask so that output frame i always matches input frame i.
        out_overlay.write(first_frame)
        out_mask.write(np.zeros((height, width), dtype=np.uint8))

        # Masks of the last N analysed frames
        mask_queue = deque(maxlen=window_size)
        frame_count = 1

        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (morph_kernel, morph_kernel))

        # Most recent boxes; reused for frames that are not analysed.
        boxes = []

        while True:
            ret, frame = cap.read()
            if not ret:
                logging.info("Fine del video.")
                break

            gray = _flow_gray(frame, scale_factor)

            analyse = skip_frames <= 0 or frame_count % (skip_frames + 1) == 0
            if analyse:
                flow = cv2.calcOpticalFlowFarneback(
                    prev_gray, gray, None,
                    0.5,    # pyr_scale
                    2,      # levels
                    9,      # winsize
                    2,      # iterations
                    5,      # poly_n
                    1.2,    # poly_sigma
                    0       # flags
                )

                # Only the magnitude is used; the angle is discarded.
                mag, _ang = cv2.cartToPolar(flow[..., 0], flow[..., 1], angleInDegrees=False)
                mask_small = (mag > flow_threshold).astype(np.uint8) * 255
                # Back to the original resolution; nearest keeps values in {0, 255}.
                mask_current = cv2.resize(mask_small, (width, height), interpolation=cv2.INTER_NEAREST)

                mask_queue.append(mask_current)

                # Temporal vote: per-pixel sum of the 0/255 masks in the window.
                cumulative_mask = np.sum(np.array(mask_queue), axis=0)
                votes_needed = alpha_fraction * len(mask_queue) * 255
                mask_smoothed = (cumulative_mask >= votes_needed).astype(np.uint8) * 255

                # Close small holes, then remove isolated specks.
                mask_smoothed = cv2.morphologyEx(mask_smoothed, cv2.MORPH_CLOSE, kernel)
                mask_smoothed = cv2.morphologyEx(mask_smoothed, cv2.MORPH_OPEN, kernel)

                boxes = _padded_motion_boxes(mask_smoothed, margin, width, height)

                if frame_count % 10 == 0:
                    logging.info(f"Frame {frame_count} - {len(boxes)} aree di movimento")

            # Overlay video
            if boxes:
                add_transparent_overlay(frame, boxes, (0, 0, 255), alpha=0.3)
            out_overlay.write(frame)

            # Mask video: filled rectangles on a black background
            mask_rect = np.zeros((height, width), dtype=np.uint8)
            for (x, y, w, h) in boxes:
                cv2.rectangle(mask_rect, (x, y), (x + w, y + h), 255, -1)
            out_mask.write(mask_rect)

            # Live preview
            cv2.imshow("Temporal Smoothing Flow", frame)
            window_shown = True

            # ``gray`` is a fresh array every iteration, so no copy is needed.
            prev_gray = gray
            frame_count += 1

            if cv2.waitKey(1) & 0xFF == ord('q'):
                logging.info("Interruzione manuale.")
                break
    finally:
        # Always release native resources, including on early return or error.
        cap.release()
        if out_overlay is not None:
            out_overlay.release()
        if out_mask is not None:
            out_mask.release()
        if window_shown:
            cv2.destroyAllWindows()

    logging.info(f"Video overlay salvato in: {output_path}")
    logging.info(f"Video maschera salvato in: {mask_output_path}")

def main():
    """Run motion detection on the sample video with the notebook's settings.

    Creates the output directory, configures logging into it and calls
    ``temporal_smoothing_flow`` with a 9-frame window and a 20 px box margin.
    """
    video_path = "../Dataset/input/test2.mp4"
    output_dir = "../Dataset/output/"
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(output_dir, exist_ok=True)

    setup_logging(output_dir)

    temporal_smoothing_flow(
        video_path=video_path,
        output_dir=output_dir,
        flow_threshold=0.5,
        alpha_fraction=0.2,
        window_size=9,
        morph_kernel=3,
        save_name="flow_smoothing_output.mp4",
        mask_save_name="movement_mask.mp4",
        margin=20,
        scale_factor=0.5,
        skip_frames=0
    )

# Entry point: run main() only when this code is executed directly
# (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()

2025-01-18 10:19:36,241 - INFO - Logging configurato. Log salvati in: ../Dataset/output/motion_smoothing.log
2025-01-18 10:19:36,242 - INFO - === Inizio Optical Flow con Buffer Temporale ===
2025-01-18 10:19:36,243 - INFO - Video input: ../Dataset/input/test2.mp4
2025-01-18 10:19:36,243 - INFO - Finestra temporale: N=9, soglia %=0.2, scale_factor=0.5
2025-01-18 10:19:36,278 - INFO - FPS: 60.05879882402352 | Dimensioni: 1920x1080
2025-01-18 10:19:37,141 - INFO - Frame 10 - 107 aree di movimento
2025-01-18 10:19:37,980 - INFO - Frame 20 - 50 aree di movimento
2025-01-18 10:19:38,831 - INFO - Frame 30 - 50 aree di movimento
2025-01-18 10:19:39,663 - INFO - Frame 40 - 52 aree di movimento
2025-01-18 10:19:40,517 - INFO - Frame 50 - 59 aree di movimento
2025-01-18 10:19:41,393 - INFO - Frame 60 - 111 aree di movimento
2025-01-18 10:19:42,333 - INFO - Frame 70 - 55 aree di movimento
2025-01-18 10:19:43,220 - INFO - Frame 80 - 63 aree di movimento
2025-01-18 10:19:44,075 - INFO - Frame 90 - 4

In [1]:
import cv2
import numpy as np
import logging
import os

def setup_logging(output_dir):
    """Configure root logging to write to a file in ``output_dir`` and to the console.

    The log file is ``<output_dir>/compression.log``. The directory is created
    when missing. Handlers already attached to the root logger are removed
    first, so calling this function again (e.g. re-running a notebook cell, or
    after another cell configured logging) actually takes effect instead of
    being silently ignored.

    Args:
        output_dir: Directory in which the log file is created.
    """
    # FileHandler fails if the directory does not exist; an empty string means
    # "current directory" and must not be passed to makedirs.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    log_file = os.path.join(output_dir, "compression.log")

    # basicConfig() is a no-op when the root logger already has handlers, so
    # drop (and close) the existing ones first. This mirrors what
    # basicConfig(force=True) does on Python 3.8+ without requiring it.
    root_logger = logging.getLogger()
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)
        handler.close()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            # Explicit encoding: log messages contain non-ASCII text.
            logging.FileHandler(log_file, encoding="utf-8"),  # log to file
            logging.StreamHandler(),                          # log to console
        ],
    )
    logging.info("Logging configurato. Log salvati in: %s", log_file)

def compress_frame_with_motion_mask(frame_bgr, mask, blockSize=8, QTY_static=None):
    """Degrade the static areas of a frame with block DCT quantization.

    The frame is converted to YCrCb and its luma channel is processed in
    ``blockSize`` x ``blockSize`` tiles. A tile whose mask region has a
    positive mean (i.e. contains motion) is copied unchanged; every other
    tile goes through DCT -> quantization by ``QTY_static`` -> inverse DCT.
    Chroma channels are left untouched.

    Args:
        frame_bgr: 3-channel BGR image (as read by OpenCV).
        mask: Motion mask covering the same height/width as ``frame_bgr``;
            non-zero means motion. It is only sliced on its first two axes.
        blockSize: Side of the square DCT tiles.
        QTY_static: Quantization table applied to static tiles. Defaults to a
            flat table of 50. Converted to float32 before use.

    Returns:
        The reconstructed BGR frame (uint8).
    """
    if QTY_static is None:
        QTY_static = np.full((blockSize, blockSize), 50, dtype=np.float32)  # aggressive quantization
    else:
        # Keep the whole DCT pipeline in float32 whatever dtype the caller used.
        QTY_static = np.asarray(QTY_static, dtype=np.float32)

    H, W, _ = frame_bgr.shape
    frame_ycrcb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2YCrCb)
    Y, Cr, Cb = cv2.split(frame_ycrcb)

    Y_recon = np.zeros_like(Y, dtype=np.float32)

    for by in range(0, H, blockSize):
        y_end = min(by + blockSize, H)
        for bx in range(0, W, blockSize):
            x_end = min(bx + blockSize, W)
            blockY = Y[by:y_end, bx:x_end]
            blockMask = mask[by:y_end, bx:x_end]

            if np.mean(blockMask) > 0:
                # Motion present: keep the original luma.
                Y_recon[by:y_end, bx:x_end] = blockY
                continue

            # Static tile: zero-pad partial edge tiles up to the full tile size,
            # because the DCT below always works on blockSize x blockSize.
            tile_h = y_end - by
            tile_w = x_end - bx
            paddedY = np.zeros((blockSize, blockSize), dtype=np.float32)
            paddedY[:tile_h, :tile_w] = blockY

            dct_block = cv2.dct(paddedY)
            quant_block = np.round(dct_block / QTY_static)
            idct_block = cv2.idct(quant_block * QTY_static)

            Y_recon[by:y_end, bx:x_end] = idct_block[:tile_h, :tile_w]

    # Quantization makes the inverse DCT over/undershoot the 0..255 range; a
    # bare astype(uint8) would wrap those values around (e.g. -3 -> 253), so
    # round and clip before converting.
    Y_out = np.clip(np.rint(Y_recon), 0, 255).astype(np.uint8)

    recon_frame_ycrcb = cv2.merge([Y_out, Cr, Cb])
    recon_bgr = cv2.cvtColor(recon_frame_ycrcb, cv2.COLOR_YCrCb2BGR)
    return recon_bgr

def main_compress_with_motion(input_video, motionMask_video, output_video="COMPRESSION_test2.mp4"):
    """Compress the static areas of a video, guided by a motion-mask video.

    The two videos are read in lockstep. Each mask frame is reduced to a clean
    single-channel 0/255 mask and passed, together with the matching input
    frame, to ``compress_frame_with_motion_mask``. The result is written with
    the 'avc1' (H.264) codec next to the mask video and shown in a preview
    window; pressing ``q`` stops the processing.

    Args:
        input_video: Path of the original video.
        motionMask_video: Path of the mask video (non-zero = motion).
        output_video: File name of the output, created in the directory of
            ``motionMask_video``.

    Returns:
        None. Failures to open the inputs or create the output are logged.
    """
    cap_input = cv2.VideoCapture(input_video)
    cap_mask = cv2.VideoCapture(motionMask_video)
    out = None
    window_shown = False
    try:
        if not cap_input.isOpened():
            logging.error(f"Impossibile aprire il video originale: {input_video}")
            return
        if not cap_mask.isOpened():
            logging.error(f"Impossibile aprire il video maschera: {motionMask_video}")
            return

        fps = cap_input.get(cv2.CAP_PROP_FPS)
        width = int(cap_input.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap_input.get(cv2.CAP_PROP_FRAME_HEIGHT))
        logging.info(f"FPS: {fps} | Dimensioni: {width}x{height}")
        if not fps > 0:  # also true for NaN
            # Some containers report no frame rate; a writer cannot use 0 fps.
            logging.warning("FPS non valido (%s): uso 30.0 come valore predefinito.", fps)
            fps = 30.0

        fourcc = cv2.VideoWriter_fourcc(*'avc1')  # H.264 codec
        output_path = os.path.join(os.path.dirname(motionMask_video), output_video)
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            # Without this check write() silently produces no output.
            logging.error("Impossibile creare il video di output: %s", output_path)
            return

        # Flat, aggressive quantization table for static areas. float32 keeps
        # the DCT arithmetic in the dtype OpenCV's dct/idct work with.
        QTY_static = np.full((8, 8), 200, dtype=np.float32)

        frame_count = 0

        while True:
            ret_in, frame_in = cap_input.read()
            ret_mask, frame_mask = cap_mask.read()
            if not ret_in or not ret_mask:
                if ret_in != ret_mask:
                    # One stream ended before the other: the videos are not
                    # the same length, so the tail is left unprocessed.
                    logging.warning(
                        "I video hanno un numero di frame diverso: elaborazione interrotta dopo %d frame.",
                        frame_count,
                    )
                break

            frame_count += 1

            # The mask comes from a lossy video decoded as 3-channel BGR:
            # collapse it to one channel and re-binarize, otherwise codec
            # noise in "black" regions is mistaken for motion.
            if frame_mask.ndim == 3:
                frame_mask = cv2.cvtColor(frame_mask, cv2.COLOR_BGR2GRAY)
            in_h, in_w = frame_in.shape[:2]
            if frame_mask.shape[:2] != (in_h, in_w):
                frame_mask = cv2.resize(frame_mask, (in_w, in_h), interpolation=cv2.INTER_NEAREST)
            _, frame_mask = cv2.threshold(frame_mask, 127, 255, cv2.THRESH_BINARY)

            compressed_frame = compress_frame_with_motion_mask(
                frame_in,
                frame_mask,
                blockSize=8,
                QTY_static=QTY_static
            )

            out.write(compressed_frame)

            # Live preview
            cv2.imshow("Compressed Frame", compressed_frame)
            window_shown = True
            if cv2.waitKey(1) & 0xFF == ord('q'):
                logging.info("Interruzione manuale.")
                break
    finally:
        # Always release native resources, including on early return or error.
        cap_input.release()
        cap_mask.release()
        if out is not None:
            out.release()
        if window_shown:
            cv2.destroyAllWindows()

    logging.info("Frame elaborati: %d", frame_count)
    logging.info(f"Elaborazione completata. Video compresso salvato in: {output_path}")

def main():
    """Compress the sample video using the mask produced by the motion-detection cell.

    Creates the output directory, configures logging into it and calls
    ``main_compress_with_motion`` on the original video and on
    ``movement_mask.mp4`` from the output directory.
    """
    video_originale = "../Dataset/input/test2.mp4"
    output_dir = "../Dataset/output/"
    mask_video_path = os.path.join(output_dir, "movement_mask.mp4")  # produced by block 1
    video_compresso = "COMPRESSION_test2.mp4"

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(output_dir, exist_ok=True)

    setup_logging(output_dir)

    main_compress_with_motion(
        input_video=video_originale,
        motionMask_video=mask_video_path,
        output_video=video_compresso
    )

# Entry point: run main() only when this code is executed directly
# (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()

2025-01-18 10:33:09,695 - INFO - Logging configurato. Log salvati in: ../Dataset/output/compression.log
2025-01-18 10:33:09,719 - INFO - FPS: 60.05879882402352 | Dimensioni: 1920x1080
2025-01-18 10:33:11.106 Python[14764:367833] +[IMKClient subclass]: chose IMKClient_Modern
2025-01-18 10:33:11.106 Python[14764:367833] +[IMKInputSession subclass]: chose IMKInputSession_Modern
