In [1]:
import os
import cv2
import torch
import numpy as np
import json
import supervision as sv
from PIL import Image
from sam2.build_sam import build_sam2_video_predictor, build_sam2
from sam2.sam2_image_predictor import SAM2ImagePredictor
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection 
import cv2
import numpy as np
import io
from google import genai
from google.genai.types import Part
from dotenv import load_dotenv


load_dotenv()



class GroundedSam2Tracker:
    """
    A unified class to handle video segmentation, Grounding DINO detection, 
    and SAM 2 video tracking for a hackathon project.
    """
    def __init__(self, sam2_checkpoint_path, sam2_config_file, gdino_model_id="IDEA-Research/grounding-dino-tiny"):
        
        # --- Efficiency Setup (Hackathon Tip: Load once!) ---
        torch.cuda.empty_cache()
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        
        # Enable performance features for newer GPUs
        if torch.cuda.get_device_properties(0).major >= 8:
            torch.backends.cuda.matmul.allow_tf32 = True
            torch.backends.cudnn.allow_tf32 = True
            
        # Initialize SAM 2 image and video predictors
        self.video_predictor = build_sam2_video_predictor(sam2_config_file, sam2_checkpoint_path)
        sam2_image_model = build_sam2(sam2_config_file, sam2_checkpoint_path)
        self.image_predictor = SAM2ImagePredictor(sam2_image_model)

        # Initialize Grounding DINO
        self.gdino_processor = AutoProcessor.from_pretrained(gdino_model_id)
        self.gdino_model = AutoModelForZeroShotObjectDetection.from_pretrained(gdino_model_id).to(self.device)
        print("✅ Models initialized and moved to CUDA.")


    def segment_video(self, video_path, output_folder_frames, target_resolution=(960, 540), max_images=400):
        """
        [PHASE 1] Extract frames from a video, skipping frames to respect max_images limit.
        Stores output folder path for subsequent tracking.
        """
        self.video_path = video_path
        self.output_folder_frames = output_folder_frames
        
        if not os.path.exists(output_folder_frames):
            os.makedirs(output_folder_frames)
        
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error: Unable to open video at {video_path}")
            return False

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frames_per_image = int(np.ceil(total_frames / max_images)) if total_frames > max_images else 1

        print(f"Total frames: {total_frames}. Calculated interval: {frames_per_image}")
        
        frame_count = 0
        image_count = 0
        while True:
            ret, frame = cap.read()
            if not ret: break
            
            if frame_count % frames_per_image == 0 and image_count < max_images:
                resized_frame = cv2.resize(frame, target_resolution, interpolation=cv2.INTER_AREA)
                output_filename = os.path.join(output_folder_frames, f"{image_count:03d}.jpg")
                cv2.imwrite(output_filename, resized_frame)
                image_count += 1
            
            if image_count >= max_images: break
            frame_count += 1
            
        cap.release()
        print(f"Extraction finished. {image_count} frames saved.")
        return True


    def track_object(self, prompt, initial_frame_idx=0):
        """
        [PHASE 2] Track and segment the object defined by 'prompt' across all frames.
        Returns a list of annotated (NumPy array) frames.
        """
        
        # --- Setup paths and state ---
        video_dir = self.output_folder_frames
        
        frame_names = [p for p in os.listdir(video_dir)
                       if os.path.splitext(p)[-1] in [".jpg", ".jpeg", ".JPG", ".JPEG"]]
        frame_names.sort(key=lambda p: int(os.path.splitext(p)[0]))
        
        if not frame_names:
            print("Error: No frames found in the directory.")
            return []

        # Use bfloat16 for computation
        with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
            
            # --- Step 2: Initial Grounding DINO Detection (on the first frame) ---
            img_path = os.path.join(video_dir, frame_names[initial_frame_idx])
            image = Image.open(img_path)

            inputs = self.gdino_processor(images=image, text=prompt, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.gdino_model(**inputs)

            results = self.gdino_processor.post_process_grounded_object_detection(
                outputs,
                inputs.input_ids,
                threshold=0.25,
                text_threshold=0.3,
                target_sizes=[image.size[::-1]]
            )
            
            # --- Step 2b: SAM 2 Image Segmentation (on the first frame) ---
            self.image_predictor.set_image(np.array(image.convert("RGB")))
            
            input_boxes = results[0]["boxes"].cpu().numpy()
            OBJECTS = results[0]["labels"]

            masks, _, _ = self.image_predictor.predict(box=input_boxes, multimask_output=False)
            
            if masks.ndim == 4: masks = masks.squeeze(1)

            # --- Step 3: Register Prompts to Video Predictor ---
            inference_state = self.video_predictor.init_state(video_path=video_dir)
            PROMPT_TYPE_FOR_VIDEO = "box" 
            
            for object_id, (box) in enumerate(input_boxes, start=1):
                self.video_predictor.add_new_points_or_box(
                    inference_state=inference_state,
                    frame_idx=initial_frame_idx,
                    obj_id=object_id,
                    box=box,
                )

            # --- Step 4: Propagate Masks through all frames ---
            video_segments = {}
            print(f"Starting tracking from frame {initial_frame_idx} for '{prompt}'.")
            
            for out_frame_idx, out_obj_ids, out_mask_logits in self.video_predictor.propagate_in_video(inference_state):
                video_segments[out_frame_idx] = {
                    out_obj_id: (out_mask_logits[i] > 0.0).cpu().numpy()
                    for i, out_obj_id in enumerate(out_obj_ids)
                }

        # --- Step 5: Annotate and Collect Results ---
        annotated_frames_list = []
        ID_TO_OBJECTS = {i: obj for i, obj in enumerate(OBJECTS, start=1)}
        
        for frame_idx, segments in video_segments.items():
            img = cv2.imread(os.path.join(video_dir, frame_names[frame_idx]))
            if img is None: continue

            object_ids = list(segments.keys())
            masks = list(segments.values())
            masks = np.concatenate(masks, axis=0)

            detections = sv.Detections(
                xyxy=sv.mask_to_xyxy(masks),
                mask=masks,
                class_id=np.array(object_ids, dtype=np.int32),
            )
            
            # Annotate
            box_annotator = sv.BoxAnnotator()
            annotated_frame = box_annotator.annotate(scene=img.copy(), detections=detections)
            label_annotator = sv.LabelAnnotator()
            annotated_frame = label_annotator.annotate(annotated_frame, detections=detections, 
                labels=[ID_TO_OBJECTS.get(i, "tracked object") for i in object_ids])
            mask_annotator = sv.MaskAnnotator()
            annotated_frame = mask_annotator.annotate(scene=annotated_frame, detections=detections)                
            
            annotated_frames_list.append(annotated_frame)

        print(f"Tracking completed. {len(annotated_frames_list)} frames with tracked objects stored.")
        return annotated_frames_list


# --- MAIN EXECUTION BLOCK (HACKATHON EXAMPLE) ---

# --- CONFIGURATION (Change these paths!) ---
VIDEO_INPUT_PATH = "/home/pepito/Documents/Python/ML/Mit_hackathon/Nike.mp4" 
FRAMES_OUTPUT_DIR = "/home/pepito/Documents/Python/ML/Mit_hackathon/Nike_frames"
SAM2_CHECKPOINT = "/home/pepito/Documents/Python/ML/Mit_hackathon/Grounded-SAM-2/checkpoints/sam2.1_hiera_tiny.pt"
MODEL_CONFIG = "configs/sam2.1/sam2.1_hiera_t.yaml"
TRACKING_PROMPT = "Only sport shoes."
MAX_FRAMES_TO_EXTRACT = 200 # Faster analysis for the hackathon

# 1. Initialize the tracker (Loads models ONCE)
tracker = GroundedSam2Tracker(
    sam2_checkpoint_path=SAM2_CHECKPOINT,
    sam2_config_file=MODEL_CONFIG
)

# 2. Extract frames
tracker.segment_video(
    video_path=VIDEO_INPUT_PATH,
    output_folder_frames=FRAMES_OUTPUT_DIR,
    max_images=MAX_FRAMES_TO_EXTRACT
)

# 3. Track object and get annotated frames list
final_annotated_frames = tracker.track_object(
    prompt=TRACKING_PROMPT
)

# --- Next Steps for Analysis ---
if final_annotated_frames:
    print(f"\nReady for Analysis! The list 'final_annotated_frames' contains {len(final_annotated_frames)} NumPy arrays.")



2025-11-09 13:06:29.421037: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-11-09 13:06:29.461167: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI AVX512_BF16 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-11-09 13:06:30.739444: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


✅ Models initialized and moved to CUDA.
Total frames: 919. Calculated interval: 5
Extraction finished. 184 frames saved.


frame loading (JPEG): 100%|██████████| 307/307 [00:04<00:00, 62.88it/s]

Skipping the post-processing step due to the error above. You can still use SAM 2 and it's OK to ignore the error above, although some post-processing functionality may be limited (which doesn't affect the results in most cases; see https://github.com/facebookresearch/sam2/blob/main/INSTALL.md).


Starting tracking from frame 0 for 'Only sport shoes.'.


propagate in video: 100%|██████████| 307/307 [00:30<00:00,  9.91it/s]


Tracking completed. 307 frames with tracked objects stored.

Ready for Analysis! The list 'final_annotated_frames' contains 307 NumPy arrays.


In [2]:
import io
import json
import os
import re
import cv2
import numpy as np
from google import genai
from google.genai.types import Part
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, as_completed

# --- 1. CONFIGURATION ---
load_dotenv()
try:
    gemini_client = genai.Client()
except Exception as e:
    print(f"Error initializing Gemini client: {e}")

# IMPORTANT: Define brand profiles
BRAND_PROFILES = {
    "Nike": {
        "visual_tone": "Energetic, Dynamic, Performance-focused, High contrast.",
        "flagging_criteria": {"logo_score_min": 0.5, "quality_score_min": 2}
    },
    "Rolex": {
        "visual_tone": "Luxurious, Classic, Minimalist, Elegant lighting.",
        "flagging_criteria": {"logo_score_min": 0.7, "quality_score_min": 4} 
    },
}

# --- 2. PROMPT GENERATION ---
def get_json_template_string():
    """Returns the raw JSON structure as a string with placeholders."""
    return """
    {
      "prompt_id": "BRAND_COHERENCE_V1",
      "target_brand": "{target_brand_name}",
      "frame_status": "OK",
      "metrics": {
        "logo_presence_score": {
          "query": "Is the '{target_brand_name}' logo clearly visible, not obscured, and recognizable in the frame?",
          "score": "[Note between 0.0 (Absent) and 1.0 (Perfect)]",
          "status": "[YES/NO/PARTIAL]"
        },
        "visual_quality_score": {
          "query": "Evaluate the visual quality of the product focus area: clarity, sharpness, and acceptable lighting.",
          "score": "[Note from 1 (Very Poor) to 5 (Excellent)]",
          "justification": "[Describe the visual quality or defect.]"
        },
        "brand_tone_match": {
          "query": "Does the overall mood, composition, and color grading of the frame match the required brand tone: '{brand_visual_tone}'?",
          "match": "[YES/NO]",
          "justification": "[Indicate if the ambiance is coherent with the brand or describe the mismatch.]"
        },
        "product_placement_relevance": {
          "query": "Is the product placed in a context relevant to the brand's identity?",
          "match": "[YES/NO]",
          "justification": "[Describe the product context relevance.]"
        }
      },
      "is_flagged_bad_frame": "[NO]",
      "flag_reason": "None",
      "final_summary": "[One sentence summarizing conformity and the main positive/negative point.]"
    }
    """

def get_single_frame_prompt_custom(target_brand_name):
    """Generates the customized prompt containing the JSON template."""
    
    profile = BRAND_PROFILES.get(target_brand_name, BRAND_PROFILES["Nike"])
    
    prompt_template_string = """
    Your role is to act as a senior Brand Consistency Analyst. Analyze the visual frame provided, which is focused on a product from the brand '{target_brand_name}'.
    
    Your analysis must strictly use the JSON format provided below. Do not add any text, explanations, or headings before or after the JSON block.
    
    {json_template}
    
    You must change "frame_status" to "FLAGGED" and "is_flagged_bad_frame" to "YES" if ANY of these brand-specific conditions are met:
    - Logo Score is less than {logo_min_score}.
    - Visual Quality Score is less than {quality_min_score}.
    - Brand Tone Match is NO.
    """
    
    json_template = get_json_template_string()
    
    final_prompt = prompt_template_string.format(
        json_template=json_template,
        target_brand_name=target_brand_name,
        brand_visual_tone=profile["visual_tone"],
        logo_min_score=profile["flagging_criteria"]["logo_score_min"],
        quality_min_score=profile["flagging_criteria"]["quality_score_min"]
    )
    
    return final_prompt

# --- 3. PARALLEL PROCESSING FUNCTIONS ---

def process_single_frame(idx, frame_np_array, target_brand_name, client, prompt_template):
    """Analyzes a single frame and returns the compiled result dictionary."""
    
    try:
        # 1. Image preparation (encoding NumPy array to JPEG bytes)
        is_success, buffer = cv2.imencode(".jpg", frame_np_array)
        if not is_success:
            raise ValueError("Image encoding failed.")
        image_bytes = buffer.tobytes()
        image_part = Part.from_bytes(data=image_bytes, mime_type='image/jpeg')

        # 2. Call Gemini
        response = client.models.generate_content(
            model='gemini-2.5-flash',
            contents=[image_part, prompt_template]
        )
        
        # 3. Robust JSON Parsing (using the regex fix)
        response_text = response.text.strip()
        match = re.search(r'\{.*\}', response_text, re.DOTALL)
        if not match:
            raise json.JSONDecodeError("JSON block not found in response.", response_text, 0)
            
        json_text = match.group(0)
        frame_data = json.loads(json_text)
        
        # 4. Return data along with index
        return {"status": "SUCCESS", "index": idx, "data": frame_data}
        
    except Exception as e:
        # 5. Return error state
        return {"status": "ERROR", "index": idx, "error": str(e), "raw_response": response.text if 'response' in locals() else 'N/A'}


def analyze_and_compile_report_parallel(final_annotated_frames, target_brand_name, client, max_workers=30):
    """Processes all frames in parallel using ThreadPoolExecutor."""
    
    # Initialize report structure
    global_report = {
        "analysis_summary": {
            "total_frames_analyzed": len(final_annotated_frames),
            "total_flagged_frames": 0,
            "average_logo_score": 0.0,
            "average_quality_score": 0.0
        },
        "flagged_frames_details": [],
        "all_frames_data": []
    }
    
    total_logo_scores = []
    total_quality_scores = []
    prompt_template = get_single_frame_prompt_custom(target_brand_name)
    
    print(f"\n--- Starting Parallel Analysis for {len(final_annotated_frames)} Frames (Max Workers: {max_workers}) ---")
    
    # Execute in parallel
    futures = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for idx, frame_np_array in enumerate(final_annotated_frames):
            future = executor.submit(
                process_single_frame, 
                idx, 
                frame_np_array, 
                target_brand_name, 
                client, 
                prompt_template
            )
            futures.append(future)

        # Retrieve results as they complete
        for future in as_completed(futures):
            result = future.result()
            idx = result["index"]
            
            if result["status"] == "SUCCESS":
                frame_data = result["data"]
                
                # Robust score extraction and conversion
                try:
                    logo_score = float(frame_data['metrics']['logo_presence_score']['score'].replace('[', '').replace(']', ''))
                    quality_score = float(frame_data['metrics']['visual_quality_score']['score'].replace('[', '').replace(']', ''))
                except Exception:
                    logo_score, quality_score = 0.0, 0.0

                total_logo_scores.append(logo_score)
                total_quality_scores.append(quality_score)
                
                # Flagging logic
                profile = BRAND_PROFILES.get(target_brand_name, BRAND_PROFILES["Nike"])
                logo_min = profile["flagging_criteria"]["logo_score_min"]
                quality_min = profile["flagging_criteria"]["quality_score_min"]
                tone_match = frame_data['metrics']['brand_tone_match']['match'].upper()

                if logo_score < logo_min or quality_score < quality_min or tone_match == 'NO':
                     frame_data['is_flagged_bad_frame'] = 'YES'
                     frame_data['frame_status'] = 'FLAGGED'
                
                if frame_data.get('is_flagged_bad_frame', 'NO').upper() == 'YES':
                    global_report["analysis_summary"]["total_flagged_frames"] += 1
                    global_report["flagged_frames_details"].append({
                        "frame_index": idx,
                        "reason": frame_data.get('flag_reason', 'Criteria met for flagging'),
                        "summary": frame_data['final_summary']
                    })
                
                frame_data['frame_index'] = idx
                global_report["all_frames_data"].append(frame_data)
                print(f"✅ Frame {idx + 1} processed successfully.")

            else: # Error handling
                print(f"❌ Error processing Frame {idx + 1}: {result['error']}")
                global_report["all_frames_data"].append({"frame_index": idx, "status": "API_ERROR", "error_message": result['error'], "raw_response": result.get('raw_response', 'N/A')})

    # Calculate Global Averages
    global_report["analysis_summary"]["average_logo_score"] = np.mean(total_logo_scores) if total_logo_scores else 0.0
    global_report["analysis_summary"]["average_quality_score"] = np.mean(total_quality_scores) if total_quality_scores else 0.0
    
    return global_report



In [None]:
# --- 4. EXECUTION EXAMPLE ---

TARGET_BRAND = "Nike" 
final_compiled_json_report = analyze_and_compile_report_parallel(
    final_annotated_frames=final_annotated_frames,
    target_brand_name=TARGET_BRAND,
    client=gemini_client,
    max_workers=30 
)



--- Starting Parallel Analysis for 307 Frames (Max Workers: 30) ---
✅ Frame 1 processed successfully.
✅ Frame 29 processed successfully.
✅ Frame 12 processed successfully.
✅ Frame 17 processed successfully.
✅ Frame 4 processed successfully.
✅ Frame 21 processed successfully.
✅ Frame 30 processed successfully.
✅ Frame 24 processed successfully.
✅ Frame 14 processed successfully.
✅ Frame 25 processed successfully.
✅ Frame 27 processed successfully.
✅ Frame 6 processed successfully.
✅ Frame 11 processed successfully.
✅ Frame 3 processed successfully.
✅ Frame 28 processed successfully.
✅ Frame 15 processed successfully.
✅ Frame 26 processed successfully.
✅ Frame 9 processed successfully.
✅ Frame 22 processed successfully.
✅ Frame 23 processed successfully.
✅ Frame 5 processed successfully.
✅ Frame 10 processed successfully.
✅ Frame 19 processed successfully.
✅ Frame 13 processed successfully.
✅ Frame 16 processed successfully.
✅ Frame 2 processed successfully.
✅ Frame 7 processed successf

In [4]:
print(json.dumps(final_compiled_json_report, indent=2))

{
  "analysis_summary": {
    "total_frames_analyzed": 307,
    "total_flagged_frames": 194,
    "average_logo_score": 0.43534201954397395,
    "average_quality_score": 1.9022801302931596
  },
  "flagged_frames_details": [
    {
      "frame_index": 28,
      "reason": "Logo is absent, visual quality is very poor, and brand tone does not match.",
      "summary": "This frame is non-conforming as the Nike logo is absent, visual quality is extremely poor, and the overall ambiance and product context do not align with the brand's identity."
    },
    {
      "frame_index": 11,
      "reason": "Logo Score is less than 0.5 and Visual Quality Score is less than 2.",
      "summary": "The frame is flagged due to the complete absence of the Nike logo and extremely poor visual quality of the product, despite relevant product placement and brand tone match."
    },
    {
      "frame_index": 20,
      "reason": "Logo Score is less than 0.5 and Visual Quality Score is less than 2.",
      "summa

In [27]:
import numpy as np
import re 
import json

# ========================================================================
# 1. FONCTION DE CALCUL DU TEMPS (Gardée telle quelle)
# ========================================================================

def calculate_time_from_index(frame_index: int, fps: float = 25.0) -> str:
    """
    Calcule le timing HH:MM:SS à partir de l'index de la frame et du FPS.
    """
    if fps is None or fps <= 0:
        fps = 25.0 

    try:
        frame_index = int(frame_index)
    except (ValueError, TypeError):
        return "00:00:00"

    total_seconds = frame_index / fps
    
    hours = int(total_seconds // 3600)
    minutes = int((total_seconds % 3600) // 60)
    seconds = int(total_seconds % 60) 
    
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

# ========================================================================
# 2. FONCTION DE GÉNÉRATION DU RAPPORT COMPLET (LISTE LONGUE)
#    (Renommée pour la clarté du flux de travail)
# ========================================================================

def aggregate_and_group_issues(compiled_report, fps=25.0):
    """
    Traite le rapport brut pour calculer les scores globaux et génère la liste 
    complète des frames à faible performance (low_performance_frames).
    """
    
    all_data = compiled_report.get("all_frames_data", [])
    
    # Simuler le calcul des scores globaux (comme dans votre fonction initiale)
    # NOTE: Ici, nous utilisons les scores du rapport initial pour la cohérence.
    
    final_score = compiled_report['final_analysis_score']
    
    def get_evaluation(score):
        if score >= 0.85: return "Excellent"
        if score >= 0.7: return "Bon"
        if score >= 0.6: return "Passable (Révision requise)"
        return "Médiocre (Problèmes immédiats)"

    # Création de la liste des frames à faible performance (simulée avec le rapport initial)
    low_performance_frames = []
    # Nous utilisons ici la liste 'low_performance_frames' fournie dans la requête
    # pour obtenir une liste propre et correctement timée.
    for item in compiled_report['low_performance_frames']:
         # S'assurer que le timing est correct (même si on utilise celui du rapport)
         item['timing'] = calculate_time_from_index(item['frame_index'], fps) 
         low_performance_frames.append(item)

    # Retourne les données nécessaires à l'agrégation finale
    return {
        "final_analysis_score": final_score,
        "evaluation": compiled_report['evaluation'],
        "scoring_details": compiled_report['scoring_details'],
        "low_performance_frames": low_performance_frames
    }

# ========================================================================
# 3. FONCTION DE GÉNÉRATION DU JSON COURT ET AGRÉGÉ
# ========================================================================

def generate_short_json_summary(aggregated_report) -> str:
    """
    Regroupe les frames de la liste longue en catégories concises pour un JSON court.
    """
    
    low_performance_frames = aggregated_report['low_performance_frames']
    
    # 1. Préparer la structure des groupes
    groups = {
        "Qualité_Extrême_(Flou_Pixelisation)": {
            "description": "Flou de mouvement sévère, pixelisation, ou manque de netteté sur le produit (Qualité score ≤ 0.40).",
            "timings": []
        },
        "Conflit_Marque_/_Ton_Inadapté": {
            "description": "Présence de logos concurrents, ton abstrait, sombre ou amateur (Score Ton 0.00).",
            "timings": []
        },
        "Visibilité_Logo_Faible_(Absence)": {
            "description": "Logo non visible (Logo score 0.00) dans un contexte où la qualité est acceptable.",
            "timings": []
        }
    }

    # 2. Remplir les groupes avec les timings
    for frame in low_performance_frames:
        reason = frame['reason_and_score']
        timing = frame['timing']
        
        # Extraction robuste des scores
        logo_match = re.search(r"Logo:\s*([\d\.]+)", reason)
        logo_score = float(logo_match.group(1)) if logo_match else 1.0
        
        quality_match = re.search(r"Quality:\s*([\d\.]+)", reason)
        quality_score = float(quality_match.group(1)) if quality_match else 1.0

        # — Regroupement 1 : Qualité Extrême (Problème Principal)
        if quality_score <= 0.40:
            groups["Qualité_Extrême_(Flou_Pixelisation)"]["timings"].append(timing)
        
        # — Regroupement 2 : Conflit de Marque/Ton Spécifique
        elif "Mismatch:" in reason or "different brand" in reason or "QUARTERMILE" in reason:
             groups["Conflit_Marque_/_Ton_Inadapté"]["timings"].append(timing)

        # — Regroupement 3 : Absence de Logo (le reste)
        elif logo_score == 0.00:
            groups["Visibilité_Logo_Faible_(Absence)"]["timings"].append(timing)


    # 3. Créer la liste finale agrégée
    problemes_agreges = []
    for titre, data in groups.items():
        if data["timings"]:
            problemes_agreges.append({
                "type_probleme": titre.replace('_', ' '),
                "description": data["description"],
                "nombre_d_occurrences": len(data["timings"]),
                # Liste des timings sans doublons
                "timings_affectes_uniques": sorted(list(set(data["timings"]))) 
            })


    # 4. Construire le JSON final court
    final_short_json = {
        "analyse_globale_synthetique": {
            "score_final": aggregated_report['final_analysis_score'],
            "evaluation": aggregated_report['evaluation'],
            "qualite_visuelle_score": aggregated_report['scoring_details']['quality_score']['score'],
            "coherence_ton_score": aggregated_report['scoring_details']['tone_consistency_score']['score']
        },
        "problemes_majeurs_agreges": problemes_agreges
    }
    
    return json.dumps(final_short_json, indent=2, ensure_ascii=False)

In [32]:
final_summary_report = generate_final_summary(final_compiled_json_report, fps=25)
print(json.dumps(final_summary_report, indent=2))


aggregated_data = aggregate_and_group_issues(final_summary_report, fps=25.0)
final_short_json = generate_short_json_summary(aggregated_data)

print(final_short_json)

{
  "final_analysis_score": 0.64,
  "evaluation": "Fair (Needs review, moderate issues).",
  "scoring_details": {
    "logo_score": {
      "score": 0.63,
      "evaluation": "Fair (Needs review, moderate issues)."
    },
    "quality_score": {
      "score": 0.55,
      "evaluation": "Poor (Significant issues detected, requires immediate review)."
    },
    "tone_consistency_score": {
      "score": 0.74,
      "evaluation": "Good (Acceptable conformity, minor issues)."
    }
  },
  "low_performance_frames": [
    {
      "frame_index": 28,
      "timing": "00:00:01",
      "metric_failed": "Logo / Quality / Tone",
      "reason_and_score": "Logo: 0.00 (NO). Quality: 0.20 (The image is extremely blurry, pixelated, and lacks any discernible focus or sharpness, making it impossible to identify any product.). Tone: 0.00 (Mismatch: The frame is completely abstract, blurry, and devoid of any clear subject or context, thus it does not convey the active, inspiring, or performance-oriented t