
[![LinkedIn](https://img.shields.io/badge/My-LinkedIn-red?&style=flat)](https://www.linkedin.com/in/brandon-laroche-3b73691b7/) [![GitHub](https://img.shields.io/badge/GitHub-red?logo=github)](https://github.com/brandon57l/) [![huggingface](https://img.shields.io/badge/HuggingFace-red)
](https://huggingface.co/brandon57)
<!-- [![GitHub](https://img.shields.io/badge/huggingface-red?logo=huggingface&logoColor=white&style=flat)
](https://www.linkedin.com/in/brandon-laroche-3b73691b7/) -->


In [1]:
# @title # 1️⃣ **Installation**
from IPython.display import clear_output, HTML

clear_output()
display(HTML('<h2 style="color: blue;">Installation des dépendances... 🚀</h2>'))


!wget -P ~ https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64.deb
!dpkg -i ~/cloudflared-linux-amd64.deb
clear_output()
display(HTML('<h3 style="color: black;">Cloudflared - OK</h3>'))

!pip install flask flask-cors
clear_output()
display(HTML('<h3 style="color: black;">Flask/Cors - OK</h3>'))

!pip uninstall torch torchvision torchaudio -y
!pip install --no-cache-dir torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
clear_output()
display(HTML('<h3 style="color: black;">Torch/Cuda - OK</h3>'))

!pip uninstall faster-whisper ctranslate2 -y
!pip install --no-cache-dir ctranslate2 faster-whisper
clear_output()
display(HTML('<h3 style="color: black;">Whisper - OK</h3>'))


# !pip install -q openai-whisper
# clear_output()
# display(HTML('<h3 style="color: black;">Whisper - OK</h3>'))

!pip install yt-dlp pydub==0.25.1
clear_output()
display(HTML('<h3 style="color: black;">Youtube dep - OK</h3>'))

clear_output()
display(HTML('<h1 style="color: green;">✅ Done !</h1>'))


import torch
print(f"PyTorch Version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA version PyTorch built with: {torch.version.cuda}")
    print(f"cuDNN version PyTorch built with: {torch.backends.cudnn.version()}")
    print(f"Device Name: {torch.cuda.get_device_name(0)}")

In [1]:
# @title # **Transcription & Traduction (Upload Incrémental)**

# --- Initial Cleanup ---
print("--- Initial Cleanup ---")
!rm -f /content/audio_output/*
!rm -f /content/audio_output_optimized/*
!rm -f /content/audio_output_optimized_v2/*
!rm -rf /content/audio_chunks/*
print("Cleanup done.")

import json
import time
import subprocess
import os
import re
import traceback
import math
import shutil
import requests
import sys
import random
import concurrent.futures # <-- Ajout pour l'exécution concurrente

# --- Library Imports & Checks ---
print("\n--- Library Imports & Checks ---")
try:
    import torch
    print("✅ PyTorch installé.")
except ImportError:
    print("❌ PyTorch non installé. Veuillez l'installer.")
    exit()
try:
    from faster_whisper import WhisperModel
    print("✅ faster-whisper installé.")
except ImportError:
    print("❌ faster-whisper non installé. Veuillez l'installer.")
    exit()
try:
    from pydub import AudioSegment
    print("✅ pydub installé.")
except ImportError:
    print("❌ pydub non installé. Veuillez l'installer.")
    exit()

# --- GPU Check ---
print("\n--- GPU Check ---")
IS_GPU_AVAILABLE = torch.cuda.is_available()
print(f"✅ GPU détecté: {IS_GPU_AVAILABLE}")
if IS_GPU_AVAILABLE:
    try:
        print(f"   GPU Name: {torch.cuda.get_device_name(0)}")
        major, minor = torch.cuda.get_device_capability(0)
        print(f"   Compute Capability: {major}.{minor}")
    except Exception as e:
        print(f"   ⚠️ Impossible de récupérer les détails du GPU: {e}")


# --- Directories ---
print("\n--- Directories ---")
CHUNK_DIR = "/content/audio_chunks"
OUTPUT_DIR_V2 = "/content/audio_output_optimized_v2"
os.makedirs(CHUNK_DIR, exist_ok=True)
print(f"✅ Dossier chunks prêt: {CHUNK_DIR}")
os.makedirs(OUTPUT_DIR_V2, exist_ok=True)
print(f"✅ Dossier sortie prêt: {OUTPUT_DIR_V2}")

# ==============================================================
# --- FONCTIONS TRANSCRIPTION (INCHANGÉES) ---
# ==============================================================
print("\n--- Définition Fonctions Transcription ---")
# --- download_youtube_audio_improved (INCHANGÉE) ---
def download_youtube_audio_improved(youtube_url, output_path):
    """Downloads audio from a YouTube URL using yt-dlp and retrieves metadata."""
    # ... (code inchangé) ...
    print(f"\n--- Téléchargement Audio depuis YouTube ---")
    print(f"URL: {youtube_url}")
    print(f"Destination: {output_path}")

    video_info = None
    audio_file_path = None
    output_dir = os.path.dirname(output_path)

    if not os.path.exists(output_dir):
        try:
            os.makedirs(output_dir)
            print(f"📁 Création répertoire: {output_dir}")
        except OSError as e:
            print(f"❌ Erreur création répertoire {output_dir}: {e}")
            return None, None

    print("ℹ️ Récupération métadonnées...")
    try:
        cmd = ["yt-dlp", "--dump-json", "--encoding", "utf-8", "--socket-timeout", "30", youtube_url]
        metadata_result = subprocess.run(cmd, check=True, capture_output=True, text=True, encoding='utf-8', timeout=60)
        m = json.loads(metadata_result.stdout)
        # Extract video ID robustly
        video_id_match = re.search(r"v=([a-zA-Z0-9_-]+)", youtube_url)
        v_id = m.get("id") or (video_id_match.group(1) if video_id_match else None) or "UNKNOWN_ID"

        video_info = {
            "video_id": v_id,
            "channel_name": m.get("uploader", "N/A"),
            "channel_url": m.get("uploader_url", "N/A"),
            "title": m.get("title", "N/A"),
            "description": m.get("description", "N/A"),
            "original_url": youtube_url,
            "duration": m.get("duration"), # Keep duration if available
            "upload_date": m.get("upload_date"), # Keep upload date if available
        }
        print("✅ Métadonnées récupérées.")
    except subprocess.TimeoutExpired:
         print(f"❌ Timeout récupération métadonnées.")
         return None, None
    except subprocess.CalledProcessError as e:
         print(f"❌ Erreur yt-dlp (metadata): {e}\n{e.stderr}")
         return None, None
    except json.JSONDecodeError as e:
        print(f"❌ Erreur décodage JSON métadonnées: {e}")
        return None, {"original_url": youtube_url, "video_id": "UNKNOWN_ERROR_JSON"}
    except Exception as e:
        print(f"❌ Erreur inattendue (métadonnées): {e}")
        return None, None

    print(f"🔄 Vérif/Téléchargement audio -> {os.path.basename(output_path)}...")
    if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
        print(f"✅ Fichier audio existant trouvé et non vide. Utilisation de '{os.path.basename(output_path)}'.")
        audio_file_path = output_path
    else:
        if os.path.exists(output_path):
            print(f"ℹ️ Fichier existant '{os.path.basename(output_path)}' est vide. Re-téléchargement...")
        else:
            print(f"ℹ️ Fichier '{os.path.basename(output_path)}' absent. Téléchargement...")
        try:
            cmd = [
                "yt-dlp", "-x",
                "--audio-format", "mp3",
                "--audio-quality", "0", # 0 is best quality
                "--force-overwrites",
                "-o", output_path,
                "--encoding", "utf-8",
                "--socket-timeout", "30",
                 youtube_url
            ]
            dl_res = subprocess.run(cmd, check=True, capture_output=True, text=True, encoding='utf-8', timeout=1800) # 30 min timeout
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                print(f"✅ Audio téléchargé avec succès.")
                audio_file_path = output_path
            else:
                print(f"⚠️ yt-dlp a terminé sans erreur, mais le fichier est absent ou vide.")
                print(f"   Sortie yt-dlp:\n{dl_res.stdout}\n{dl_res.stderr}")
                return None, video_info
        except subprocess.TimeoutExpired:
            print(f"❌ Timeout durant le téléchargement audio.")
            return None, video_info
        except subprocess.CalledProcessError as e:
            print(f"❌ Erreur yt-dlp (téléchargement): {e}\n{e.stderr}")
            return None, video_info
        except Exception as e:
            print(f"❌ Erreur inattendue (téléchargement): {e}")
            return None, video_info

    if audio_file_path and (not os.path.exists(audio_file_path) or os.path.getsize(audio_file_path) == 0):
        print(f"❌ ERREUR FINALE: Le chemin du fichier audio est défini mais le fichier est absent ou vide.")
        return None, video_info

    return audio_file_path, video_info

# --- split_audio_by_fixed_duration (INCHANGÉE) ---
def split_audio_by_fixed_duration(input_audio_path, output_chunk_dir, chunk_duration_s):
    """Splits an audio file into fixed duration chunks using pydub."""
    # ... (code inchangé) ...
    print(f"\n🔊 Découpage audio '{os.path.basename(input_audio_path)}' en chunks de {chunk_duration_s}s...")
    if not os.path.exists(input_audio_path) or os.path.getsize(input_audio_path) == 0:
        print(f"❌ Fichier audio d'entrée absent ou vide: {input_audio_path}")
        return [], []
    if chunk_duration_s <= 0:
        print(f"❌ Durée de chunk invalide ({chunk_duration_s}s). Doit être positive.")
        return [], []

    paths, offsets = [], []
    chunk_dur_ms = int(chunk_duration_s * 1000)

    try:
        print(f"   Chargement de l'audio avec pydub...")
        audio = AudioSegment.from_file(input_audio_path)
        total_dur_ms = len(audio)
        total_dur_s = total_dur_ms / 1000.0
        print(f"   Audio chargé ({total_dur_s:.2f}s). Découpage en cours...")

        if total_dur_ms <= 0:
            print(f"❌ La durée de l'audio est nulle ou négative.")
            return [], []

        start_ms = 0
        idx = 0
        while start_ms < total_dur_ms:
            end_ms = start_ms + chunk_dur_ms
            chunk = audio[start_ms:end_ms]
            cur_dur_ms = len(chunk)
            cur_dur_s = cur_dur_ms / 1000.0
            cur_start_s = start_ms / 1000.0

            if cur_dur_s <= 0.1:
                start_ms = end_ms
                continue

            start_fmt = f"{cur_start_s:.3f}".replace('.', '_')
            dur_fmt = f"{cur_dur_s:.3f}".replace('.', '_')
            fname = f"chunk_{idx:04d}_start{start_fmt}s_dur{dur_fmt}s.mp3"
            fpath = os.path.join(output_chunk_dir, fname)

            try:
                chunk.export(fpath, format="mp3")
                if os.path.exists(fpath) and os.path.getsize(fpath) > 0:
                    paths.append(fpath)
                    offsets.append(cur_start_s)
                else:
                    print(f"     ⚠️ Échec export ou fichier vide créé: {fpath}")
            except Exception as e:
                print(f"❌ Erreur lors de l'export du chunk {fpath}: {e}")

            start_ms = end_ms
            idx += 1

        if not paths:
            print("❌ Aucun chunk n'a été créé ou sauvegardé.")
        else:
            print(f"✅ Découpage terminé: {len(paths)} chunks créés dans {output_chunk_dir}")

        return paths, offsets

    except FileNotFoundError:
        print(f"❌ Erreur pydub: Fichier d'entrée non trouvé: {input_audio_path}")
        return [], []
    except Exception as e:
        print(f"❌ Erreur inattendue durant le découpage: {e}")
        traceback.print_exc()
        return [], []

# --- transcribe_audio_faster (INCHANGÉE) ---
def transcribe_audio_faster(file_path, model, chunk_offset_s, beam_size, vad_filter, vad_min_silence_ms):
    """Transcribes a single audio file chunk using the preloaded faster-whisper model."""
    # ... (code inchangé) ...
    start_time_transcribe = time.time()
    print(f"\n🎙️ Transcription: {os.path.basename(file_path)} (Offset Global: {chunk_offset_s:.3f}s)")

    segments_data = []
    total_duration = 0
    last_prog = -1

    if model is None:
        print("❌ Modèle Whisper non chargé! Impossible de transcrire.")
        return {"segments": []}

    try:
        vad_params = {"min_silence_duration_ms": vad_min_silence_ms} if vad_filter else None

        segments_generator, info = model.transcribe(
            file_path,
            beam_size=beam_size,
            vad_filter=vad_filter,
            vad_parameters=vad_params
        )

        lang, prob, total_duration = info.language, info.language_probability, info.duration
        print(f"   Infos chunk détectées: Lang='{lang}' (Conf: {prob:.2f}), Durée: {total_duration:.2f}s")

        if total_duration <= 0:
            print("   ⚠️ Durée du chunk audio nulle ou négative selon Whisper.")
            return {"segments": []}

        print("   Traitement des segments...")
        seg_count = 0
        for segment in segments_generator:
            seg_count += 1
            start_local, end_local = segment.start, segment.end
            duration_local = max(0, end_local - start_local)
            text = segment.text.strip() if segment.text else ""
            start_global = round(start_local + chunk_offset_s, 3)
            duration_rounded = round(duration_local, 3)

            prog = min(100.0, (end_local / total_duration) * 100) if total_duration > 0 else 0
            rounded_prog = math.floor(prog)
            if rounded_prog > last_prog and (rounded_prog % 10 == 0 or rounded_prog >= 99):
                bar_len = 20
                filled_len = int(bar_len * prog / 100)
                bar = '█' * filled_len + '-' * (bar_len - filled_len)
                sys.stdout.write(f"\r   Progression: [{bar}] {prog:.0f}% ")
                sys.stdout.flush()
                last_prog = rounded_prog

            segments_data.append({
                "text": text,
                "start": start_global,
                "duration": duration_rounded
            })
        sys.stdout.write("\n")
        sys.stdout.flush()

        transcription_time = time.time() - start_time_transcribe
        print(f"   🕒 Transcription du chunk terminée en {transcription_time:.2f}s. {seg_count} segments trouvés.")
        if seg_count == 0:
            print("   ⚠️ Aucun segment de parole trouvé dans ce chunk.")

    except Exception as e:
        print(f"\n❌ Erreur durant la transcription du chunk: {e}")
        traceback.print_exc()
        sys.stdout.write("\n")
        sys.stdout.flush()
        return {"segments": []}

    return {"segments": segments_data}


# ==============================================================
# --- FONCTIONS TRADUCTION (INCHANGÉES EN ELLES-MÊMES) ---
# ==============================================================
print("\n--- Définition Fonctions Traduction ---")

# --- chunk_list (INCHANGÉE) ---
def chunk_list(lst, n):
    """Yield successive n-sized chunks from lst."""
    if not isinstance(lst, list):
        raise TypeError("Input must be a list.")
    if n <= 0:
        raise ValueError("Chunk size must be positive.")
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

# --- extract_json_from_response (INCHANGÉE) ---
def extract_json_from_response(text_response):
    # ... (code inchangé) ...
    match_fence = re.search(r'```json\s*([\s\S]*?)\s*```', text_response, re.DOTALL)
    if match_fence:
        return match_fence.group(1).strip()
    match_list = re.search(r'(\[[^\]]*\])', text_response, re.DOTALL)
    match_obj = re.search(r'(\{[\s\S]*\})', text_response, re.DOTALL)
    json_string = None
    first_match_pos = float('inf')
    if match_list and match_list.start() < first_match_pos:
        json_string = match_list.group(1)
        first_match_pos = match_list.start()
    if match_obj and match_obj.start() < first_match_pos:
        json_string = match_obj.group(1)
    if json_string:
        return json_string.strip()
    return text_response.strip()


# --- translate_audio_chunk_segments (INCHANGÉE EN ELLE-MÊME) ---
# Cette fonction est appelée par chaque thread, elle n'a pas besoin de connaître la concurrence.
def translate_audio_chunk_segments(transcript_segments, api_key, audio_chunk_index, total_audio_chunks, segment_chunk_index=None, total_segment_chunks=None):
    """
    Translates a list of transcript segments using Google Gemini API.
    Logs added to indicate segment chunk index if provided.
    """
    # ... (code inchangé) ...
    if not transcript_segments:
        log_prefix = f"   >> [Audio Chunk {audio_chunk_index + 1}/{total_audio_chunks}]"
        print(f"{log_prefix} Aucun segment à traduire (peut-être un sous-chunk vide?).", flush=True)
        return []

    start_time = time.time()
    num_segments = len(transcript_segments)

    log_prefix = f"   >> [Audio Chunk {audio_chunk_index + 1}/{total_audio_chunks}"
    if segment_chunk_index is not None and total_segment_chunks is not None:
        log_prefix += f" | Segment Chunk {segment_chunk_index + 1}/{total_segment_chunks}"
    log_prefix += "]"

    # Rendre le log initial plus discret car il y en aura potentiellement beaucoup en parallèle
    # print(f"{log_prefix} 🔄 Traduction de {num_segments} segments...", flush=True)

    if not api_key or not api_key.startswith("AIzaSy"):
         print(f"{log_prefix} ❌ Clé API Gemini invalide ou manquante.", flush=True)
         return None # Retourne None explicitement en cas d'erreur de clé

    url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent?key={api_key}"

    try:
        transcript_str = json.dumps(transcript_segments, ensure_ascii=False, separators=(',', ':'))
    except TypeError as e:
        print(f"{log_prefix} ❌ Erreur préparation JSON pour API: {e}", flush=True)
        return None # Retourne None en cas d'erreur

    prompt = (
        "You are an expert multilingual transcriber and translator.\n"
        "INPUT: A JSON array. Each object in the array represents a transcript segment and has keys 'text', 'start', and 'duration'.\n"
        "TASK: Process EACH segment object in the input JSON array:\n"
        "1. Identify the original language of the 'text' field.\n"
        "2. Add a new key 'text_english' containing the English translation of the original 'text'.\n"
        "3. Add a new key 'text_french' containing the French translation of the original 'text'.\n"
        "4. IMPORTANT: Preserve ALL original keys ('text', 'start', 'duration') and their values.\n"
        "OUTPUT: Return ONLY the modified JSON array containing all processed segments. Ensure the output is a single, valid JSON array. Do NOT include any extra text, explanations, or markdown formatting (like ```json ... ```) outside the JSON array itself.\n\n"
        "INPUT JSON:\n"
        f"{transcript_str}"
    )

    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {
            "temperature": 0.2,
            "maxOutputTokens": 8192,
            "response_mime_type": "application/json"
        },
        "safetySettings": [
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
        ]
    }

    headers = {"Content-Type": "application/json"}
    max_retries = 3
    base_delay = 3
    raw_text_response = ""

    for attempt in range(max_retries):
        if attempt > 0:
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            # print(f"{log_prefix} ⏳ Tentative Traduction {attempt + 1}/{max_retries} après {delay:.1f}s d'attente...", flush=True)
            time.sleep(delay)

        try:
            response = requests.post(url, headers=headers, json=payload, timeout=240)

            if response.status_code == 429:
                print(f"{log_prefix} ⚠️ Erreur 429 (Rate Limit API). Tentative {attempt + 1}/{max_retries}. Re-essai...", flush=True)
                time.sleep(20 + random.uniform(0, 5)) # Délai plus long pour rate limit
                if attempt == max_retries - 1:
                     print(f"{log_prefix} ❌ Rate Limit persiste après {max_retries} tentatives.", flush=True)
                     return None # Échec final après retries pour rate limit
                continue # Passe à la tentative suivante

            response.raise_for_status() # Lève une exception pour les autres erreurs HTTP (4xx, 5xx)
            response_data = response.json()

            # --- Validation Réponse Gemini (inchangée) ---
            if not response_data.get("candidates"):
                prompt_feedback = response_data.get("promptFeedback", {})
                block_reason = prompt_feedback.get("blockReason")
                safety_ratings = prompt_feedback.get("safetyRatings", [])
                error_message = f"Aucun 'candidates' dans la réponse API."
                if block_reason: error_message += f" Raison blocage: {block_reason}."
                if safety_ratings: error_message += f" Safety Ratings: {safety_ratings}"
                print(f"{log_prefix} ❌ Erreur API: {error_message}", flush=True)
                if block_reason == "SAFETY": return None # Ne pas retenter pour SAFETY block
                if attempt == max_retries - 1: return None # Échec final après retries
                continue # Retenter pour autres erreurs

            candidate = response_data["candidates"][0]
            finish_reason = candidate.get("finishReason")

            if finish_reason not in ["STOP", "MAX_TOKENS"]:
                safety_ratings = candidate.get("safetyRatings", [])
                print(f"{log_prefix} ❌ Fin anormale API: {finish_reason}.", flush=True)
                if safety_ratings: print(f"      -> Safety Ratings: {safety_ratings}", flush=True)
                if finish_reason == "SAFETY": return None # Ne pas retenter pour SAFETY finish reason
                if attempt == max_retries - 1: return None # Échec final
                continue # Retenter pour autres fins anormales

            if not ("content" in candidate and "parts" in candidate["content"] and
                    candidate["content"]["parts"] and "text" in candidate["content"]["parts"][0]):
                print(f"{log_prefix} ❌ Structure de contenu de réponse API invalide.", flush=True)
                if attempt == max_retries - 1: return None # Échec final
                continue # Retenter

            # --- Extraction et Parsing JSON (inchangé) ---
            raw_text_response = candidate["content"]["parts"][0]["text"]
            json_string = raw_text_response
            try:
                result_json = json.loads(json_string)
            except json.JSONDecodeError as e:
                 print(f"{log_prefix} ❌ Erreur décodage JSON de la réponse API: {e}", flush=True)
                 print(f"      Réponse brute reçue:\n{raw_text_response[:500]}...")
                 if attempt == max_retries - 1: return None # Échec final
                 continue # Retenter

            # --- Validation Résultat Parsé (inchangée) ---
            if not isinstance(result_json, list):
                print(f"{log_prefix} ❌ Le JSON décodé n'est pas une liste.", flush=True)
                if attempt == max_retries - 1: return None # Échec final
                continue # Retenter

            if len(result_json) != num_segments:
                print(f"{log_prefix} ⚠️ Nombre de segments retournés ({len(result_json)}) != entrée ({num_segments}). Possiblement tronqué?", flush=True)

            if result_json and isinstance(result_json[0], dict):
                 if 'text_english' not in result_json[0] or 'text_french' not in result_json[0]:
                     print(f"{log_prefix} ⚠️ Clés traduites manquantes dans le premier segment retourné.", flush=True)

            # --- Succès ---
            elapsed_time = time.time() - start_time
            # Log de succès plus discret
            # print(f"{log_prefix} ✅ Traduction réussie en {elapsed_time:.2f}s.", flush=True)
            if finish_reason == "MAX_TOKENS":
                 print(f"{log_prefix} ⚠️ Attention: MAX_TOKENS atteint. Traduction pourrait être incomplète.", flush=True)
            return result_json # Retourne la liste des segments traduits

        except requests.exceptions.Timeout:
            print(f"{log_prefix} ⚠️ Timeout API (Tentative {attempt + 1}/{max_retries}).", flush=True)
            if attempt == max_retries - 1: return None # Échec final après retries

        except requests.exceptions.RequestException as e:
            # Gérer les erreurs HTTP qui ne sont pas des rate limits ici
            print(f"{log_prefix} ❌ Erreur Réseau/HTTP API (Tentative {attempt + 1}/{max_retries}): {e}", flush=True)
            if attempt == max_retries - 1: return None # Échec final après retries
            # Attente avant re-essai pour erreurs génériques
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))

        except Exception as e:
            print(f"{log_prefix} ❌ Erreur inattendue durant traduction (Tentative {attempt + 1}/{max_retries}): {e}", flush=True)
            traceback.print_exc() # Imprime la trace pour le débogage
            if attempt == max_retries - 1: return None # Échec final après retries
            # Attente avant re-essai pour erreurs inattendues
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))


    # Si la boucle se termine sans succès après toutes les tentatives
    print(f"{log_prefix} ❌ Traduction ÉCHOUÉE après {max_retries} tentatives.", flush=True)
    return None # Retourne None pour indiquer l'échec final


# =======================================================================
# --- Configuration et Exécution Principale (AVEC TRADUCTION CONCURRENTE) ---
# =======================================================================
print("\n\n" + "="*40 + "\n--- Configuration Principale ---\n" + "="*40)

# --- Paramètres de Général ---
youtube_url = "https://www.youtube.com/watch?v=EDf6I36S-aE&pp=0gcJCYQJAYcqIYzv" #@param {"type":"string"}
model_size = "large-v3" #@param ["tiny", "base", "small", "medium", "large-v3"]
gemini_api_key = "AIzaSyAI2CLDtikFeKi5P6UxgXi9D9bMwYA6l8w" #@param {"type":"string"}

#@markdown _____

#@markdown *Les paramètres suivent sont configurés pour un fonctionnement optimal. Si vous ne savez pas ce qu'ils font, il est généralement judicieux de ne pas les modifier.*

# --- Paramètres de Transcription ---
output_filename_base = "youtube_audio.mp3"
output_path_original = os.path.join(OUTPUT_DIR_V2, output_filename_base)
output_chunk_dir = CHUNK_DIR

enable_pre_splitting = True #@param {type:"boolean"}
split_fixed_duration_minutes = 10 #@param {type:"slider", min:1, max:30, step:1}

use_vad_during_transcription = False #@param {type:"boolean"}
vad_silence_duration_ms = 500 #@param {type:"slider", min:100, max:2000, step:50}
beam_search_size = 5

# --- Paramètres de Traduction ---
enable_segment_chunking = True #@param {type:"boolean"}
max_segments_per_translation_chunk = 30 #@param {type:"integer"}
# NOUVEAU PARAMÈTRE pour la concurrence
max_concurrent_translation_tasks = 14 #@param {type:"integer"} # Nombre max d'appels API Gemini simultanés

# --- Paramètres d'Upload ---
upload_chunk_url = "default"  #@param {"type":"string"}
if upload_chunk_url == "default":
    upload_chunk_url = "https://qingplay.pythonanywhere.com/update_transcript_chunk"

enable_incremental_upload = True #@param {type:"boolean"}

# --- Variables Globales ---
original_audio_file_path = None
video_info = None
final_output_data_local = None
json_output_filename_final = None
loaded_whisper_model = None

# --- Validation Configuration ---
print("--- Validation Configuration ---")
valid_config = True
if not youtube_url or not youtube_url.startswith("http"):
    print("❌ URL YouTube invalide.")
    valid_config = False
if not gemini_api_key or gemini_api_key == "YOUR_GEMINI_API_KEY":
     if enable_incremental_upload or enable_segment_chunking:
         print("❌ Clé API Gemini manquante ou non remplacée. La traduction et/ou l'upload échoueront.")
         valid_config = False
     else:
         print("⚠️ Clé API Gemini manquante ou non remplacée. Traduction et upload seront ignorés.")
     gemini_api_key = None
if enable_incremental_upload and (not upload_chunk_url or not upload_chunk_url.startswith("http")):
    print("❌ URL d'upload invalide.")
    valid_config = False
if enable_segment_chunking and max_segments_per_translation_chunk <= 0:
    print("❌ max_segments_per_translation_chunk doit être positif.")
    valid_config = False
if enable_segment_chunking and max_concurrent_translation_tasks <= 0:
    print("❌ max_concurrent_translation_tasks doit être positif.")
    valid_config = False

if not valid_config:
    print("\n🚫 Erreurs de configuration détectées. Arrêt du script.")
    exit()
else:
    print("✅ Configuration validée.")
    print(f"   Chunking Audio: {'Activé' if enable_pre_splitting else 'Désactivé'} (Durée: {split_fixed_duration_minutes} min)")
    print(f"   Modèle Whisper: {model_size}, VAD: {'Activé' if use_vad_during_transcription else 'Désactivé'}")
    print(f"   Traduction : {'Activée' if gemini_api_key else 'Désactivée'}")
    if gemini_api_key:
        print(f"     Chunking Segments: {'Activé' if enable_segment_chunking else 'Désactivé'} (Max Segments/Chunk: {max_segments_per_translation_chunk if enable_segment_chunking else 'N/A'})")
        if enable_segment_chunking:
             print(f"     Tâches traduction concurrentes max: {max_concurrent_translation_tasks}")
    print(f"   Upload Incrémental: {'Activé' if enable_incremental_upload else 'Désactivé'}")


# --- Exécution ---
try:
    # ========================================
    # ÉTAPE 1: Préparation & Chargement Modèle
    # ========================================
    print("\n\n" + "="*40 + "\nÉTAPE 1: PRÉPARATION & CHARGEMENT MODÈLE\n" + "="*40)
    start_step1 = time.time()

    original_audio_file_path, video_info = download_youtube_audio_improved(youtube_url, output_path_original)

    if not original_audio_file_path or not video_info:
        raise ValueError("Échec du téléchargement audio ou de la récupération des métadonnées.")

    safe_video_id = video_info.get("video_id", "UNKNOWN_ID").replace("-", "_")

    print("\n--- Infos Vidéo Récupérées ---")
    print(f"   ID: {video_info.get('video_id', 'N/A')}")
    print(f"   Titre: {video_info.get('title', 'N/A')}")
    print(f"   Chaîne: {video_info.get('channel_name', 'N/A')}")
    print(f"   Durée: {video_info.get('duration', 'N/A')}s")
    print("----------------------------\n")

    audio_files_to_process = []
    chunk_offsets = []

    if enable_pre_splitting:
        split_secs = split_fixed_duration_minutes * 60
        paths, offs = split_audio_by_fixed_duration(original_audio_file_path, output_chunk_dir, split_secs)
        if paths:
            audio_files_to_process = paths
            chunk_offsets = offs
            print(f"✅ Pré-découpage activé. {len(paths)} chunks audio à traiter.")
        else:
            print("⚠️ Le découpage a échoué. Traitement du fichier audio entier.")
            audio_files_to_process = [original_audio_file_path]
            chunk_offsets = [0.0]
    else:
        print("ℹ️ Pré-découpage désactivé. Traitement du fichier audio entier.")
        audio_files_to_process = [original_audio_file_path]
        chunk_offsets = [0.0]

    total_audio_chunks = len(audio_files_to_process)
    print(f"Nombre total de fichiers audio à traiter: {total_audio_chunks}")

    # --- Chargement Modèle Whisper ---
    print("\n--- Chargement Modèle Whisper ---")
    print(f"Modèle demandé: {model_size}")
    start_load = time.time()
    device = "cuda" if IS_GPU_AVAILABLE else "cpu"
    compute_type = "default"

    if IS_GPU_AVAILABLE:
        major, _ = torch.cuda.get_device_capability(0)
        if major >= 8: compute_type = "bfloat16"; print("   Utilisation compute_type: bfloat16")
        else: compute_type = "float16"; print("   Utilisation compute_type: float16")
    else: compute_type = "int8"; print("   Utilisation compute_type: int8 (CPU)")

    try:
        loaded_whisper_model = WhisperModel(model_size, device=device, compute_type=compute_type)
        load_time = time.time() - start_load
        print(f"✅ Modèle '{model_size}' chargé sur {device} ({compute_type}) en {load_time:.2f}s.")
    except Exception as e:
        raise ValueError(f"Échec chargement modèle Whisper: {e}")

    step1_time = time.time() - start_step1
    print(f"⏱️ Temps Étape 1 (Préparation): {step1_time:.2f}s")

    # ================================================
    # ÉTAPE 2: Boucle de Traitement & Upload Intercalé (MODIFIÉE POUR CONCURRENCE)
    # ================================================
    print("\n\n" + "="*40 + "\nÉTAPE 2: TRAITEMENT & UPLOAD INTERCALÉS\n" + "="*40)
    all_final_segments_local = []
    total_transcribed_segments = 0
    total_translated_segments_ok = 0
    successful_uploads = 0
    failed_uploads = 0
    failed_translation_audio_chunks = []
    global_start_proc = time.time()

    if not audio_files_to_process:
        print("🚫 Aucun fichier audio à traiter.")
    else:
        for i, file_path in enumerate(audio_files_to_process):
            chunk_start_time = time.time()
            current_offset = chunk_offsets[i]
            print(f"\n--- Traitement Chunk Audio {i + 1}/{total_audio_chunks}: {os.path.basename(file_path)} ---")

            # --- 2.1 Transcription (Inchangé) ---
            transcript_result = transcribe_audio_faster(
                file_path, loaded_whisper_model, current_offset,
                beam_search_size, use_vad_during_transcription, vad_silence_duration_ms
            )
            current_chunk_segments_transcribed = transcript_result.get("segments", [])

            if not current_chunk_segments_transcribed:
                print(f"   -> Aucun segment transcrit pour ce chunk audio.")
                chunk_end_time = time.time()
                print(f"   ⏱️ Temps traitement Chunk Audio {i+1}: {chunk_end_time - chunk_start_time:.2f}s")
                continue

            num_transcribed = len(current_chunk_segments_transcribed)
            total_transcribed_segments += num_transcribed
            print(f"   -> Transcrit {num_transcribed} segments.")

            # --- 2.2 Traduction (avec gestion concurrence si activée) ---
            aggregated_translated_segments_for_audio_chunk = []
            translation_failed_for_this_audio_chunk = False
            translation_api_calls = 0
            translation_start_time = time.time()


            if not gemini_api_key:
                print("   >> ⚠️ Clé API Gemini non fournie. Traduction ignorée.")
                all_final_segments_local.extend(current_chunk_segments_transcribed)
            else:
                # --- 2.2.1 Segment Chunking & Concurrent Execution ---
                if enable_segment_chunking:
                    segment_sub_chunks = list(chunk_list(current_chunk_segments_transcribed, max_segments_per_translation_chunk))
                    total_segment_sub_chunks = len(segment_sub_chunks)
                    print(f"   >> Préparation traduction pour {num_transcribed} segments en {total_segment_sub_chunks} sous-chunk(s).")
                    print(f"      (Utilisation de max {max_concurrent_translation_tasks} workers concurrents)")

                    # Dictionnaire pour stocker les résultats dans l'ordre
                    # Clé: index du sous-chunk (j), Valeur: résultat de la traduction (liste ou None)
                    futures_results = {}
                    futures_map = {} # Pour mapper Future -> index j

                    # Utilisation de ThreadPoolExecutor pour les appels API concurrents
                    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_translation_tasks) as executor:
                        # Soumettre toutes les tâches de traduction
                        for j, segment_sub_chunk in enumerate(segment_sub_chunks):
                             # Vérifier si le sous-chunk n'est pas vide avant de soumettre
                             if segment_sub_chunk:
                                 future = executor.submit(
                                     translate_audio_chunk_segments,
                                     segment_sub_chunk,
                                     gemini_api_key,
                                     i, # Audio chunk index (0-based)
                                     total_audio_chunks,
                                     j, # Segment sub-chunk index (0-based)
                                     total_segment_sub_chunks
                                 )
                                 futures_map[future] = j # Stocker l'index associé au future
                             else:
                                 # Gérer le cas d'un sous-chunk vide (ne devrait pas arriver avec chunk_list sauf si liste initiale vide)
                                 futures_results[j] = [] # Un sous-chunk vide donne un résultat vide

                        translation_api_calls = len(futures_map) # Nombre d'appels API réels
                        print(f"      >> {translation_api_calls} tâches de traduction soumises...")

                        # Récupérer les résultats au fur et à mesure qu'ils arrivent (ou en ordre si on itère sur futures_map.keys())
                        # Utiliser as_completed peut donner une impression de réactivité mais mélange l'ordre des logs
                        # Ici, on attend la fin de toutes et on récupère dans l'ordre pour la cohérence
                        processed_count = 0
                        for future in concurrent.futures.as_completed(futures_map):
                             j_index = futures_map[future] # Récupérer l'index original
                             try:
                                 result = future.result() # Récupère le résultat (list ou None) ou lève une exception
                                 futures_results[j_index] = result
                                 if result is None:
                                      translation_failed_for_this_audio_chunk = True
                                      # Le log d'erreur est déjà dans translate_audio_chunk_segments
                                 processed_count += 1
                                 # Afficher la progression de la traduction concurrente
                                 sys.stdout.write(f"\r      >> Progression traduction (tâches terminées): {processed_count}/{translation_api_calls} ")
                                 sys.stdout.flush()

                             except Exception as exc:
                                 # Gérer les exceptions non interceptées dans translate_audio_chunk_segments
                                 print(f'\n      >> ❌ Erreur lors de la traduction du sous-chunk {j_index + 1}: {exc}')
                                 traceback.print_exc() # Pour plus de détails
                                 futures_results[j_index] = None # Marquer comme échec
                                 translation_failed_for_this_audio_chunk = True
                                 processed_count += 1
                                 sys.stdout.write(f"\r      >> Progression traduction (tâches terminées): {processed_count}/{translation_api_calls} ")
                                 sys.stdout.flush()

                        sys.stdout.write("\n") # Nouvelle ligne après la progression
                        sys.stdout.flush()


                    # Agréger les résultats dans l'ordre original des sous-chunks
                    for j in range(total_segment_sub_chunks):
                         result_sub_chunk = futures_results.get(j)
                         if result_sub_chunk is not None:
                              aggregated_translated_segments_for_audio_chunk.extend(result_sub_chunk)
                         # Si result_sub_chunk est None, l'échec a déjà été marqué

                # --- 2.2.2 No Segment Chunking (Séquentiel) ---
                else:
                     print(f"   >> Traduction séquentielle de {num_transcribed} segments (Chunking Segment désactivé).")
                     translation_api_calls = 1
                     translated_segments = translate_audio_chunk_segments(
                         current_chunk_segments_transcribed,
                         gemini_api_key,
                         i, total_audio_chunks
                     )
                     if translated_segments is not None:
                         aggregated_translated_segments_for_audio_chunk = translated_segments
                     else:
                         translation_failed_for_this_audio_chunk = True
                         # Le log d'erreur est déjà dans translate_audio_chunk_segments


                translation_end_time = time.time()
                translation_duration = translation_end_time - translation_start_time
                print(f"   >> Traduction terminée pour Audio Chunk {i+1} en {translation_duration:.2f}s ({translation_api_calls} appels API).")


                # --- Post-Translation Handling (Inchangé) ---
                if translation_failed_for_this_audio_chunk:
                    failed_translation_audio_chunks.append(i + 1)
                    print(f"   >> ℹ️ Sauvegarde locale utilisera les segments transcrits originaux pour Audio Chunk {i+1} dû à un échec.")
                    all_final_segments_local.extend(current_chunk_segments_transcribed)
                else:
                     num_translated = len(aggregated_translated_segments_for_audio_chunk)
                     total_translated_segments_ok += num_translated
                     all_final_segments_local.extend(aggregated_translated_segments_for_audio_chunk)
                     # print(f"   >> {num_translated} segments traduits agrégés pour Audio Chunk {i+1}.") # Log un peu redondant


            # --- 2.3 Upload Incrémental (Inchangé logiquement, opère sur le résultat agrégé) ---
            if enable_incremental_upload and aggregated_translated_segments_for_audio_chunk and not translation_failed_for_this_audio_chunk and gemini_api_key:
                print(f"      >> ⬆️ Tentative d'upload pour Chunk Audio {i + 1} ({len(aggregated_translated_segments_for_audio_chunk)} segments traduits)...", flush=True)
                upload_payload = {
                    "video_id": video_info["video_id"], "description": video_info["description"],
                    "channel_name": video_info["channel_name"], "channel_url": video_info["channel_url"],
                    "segments": aggregated_translated_segments_for_audio_chunk,
                    "chunk_index": i, "total_chunks": total_audio_chunks,
                    "title": video_info["title"] if i == 0 else None,
                }
                upload_success = False
                try:
                    response = requests.post(upload_chunk_url, json=upload_payload, timeout=60)
                    response.raise_for_status()
                    try: server_message = response.json().get("message", response.text[:100])
                    except json.JSONDecodeError: server_message = response.text[:100]
                    print(f"      >> ✅ Upload Chunk Audio {i + 1} réussi (Status: {response.status_code}). Serveur: {server_message}...", flush=True)
                    successful_uploads += 1
                    upload_success = True
                except requests.exceptions.Timeout: print(f"      >> ❌ Upload Chunk Audio {i + 1} échoué (Timeout).", flush=True)
                except requests.exceptions.RequestException as e:
                    print(f"      >> ❌ Upload Chunk Audio {i + 1} échoué (Erreur HTTP/Réseau): {e}", flush=True)
                    if e.response is not None: print(f"         Réponse serveur ({e.response.status_code}): {e.response.text[:200]}...")
                except Exception as e: print(f"      >> ❌ Erreur inattendue pendant upload Chunk Audio {i + 1}: {e}", flush=True); traceback.print_exc()
                if not upload_success: failed_uploads += 1
            elif not enable_incremental_upload: print(f"      >> ℹ️ Upload incrémental désactivé.")
            elif not gemini_api_key: print(f"      >> ℹ️ Upload ignoré (traduction désactivée).")
            elif translation_failed_for_this_audio_chunk: print(f"      >> ℹ️ Upload ignoré (échec traduction).")
            elif not aggregated_translated_segments_for_audio_chunk: print(f"      >> ℹ️ Aucun segment traduit à uploader.")

            chunk_end_time = time.time()
            print(f"   ⏱️ Temps total traitement Chunk Audio {i+1}: {chunk_end_time - chunk_start_time:.2f}s")


    global_end_proc = time.time()

    # --- Résumé Traitement ---
    print("\n\n" + "="*40 + "\nÉTAPE 2: RÉSUMÉ DU TRAITEMENT\n" + "="*40)
    total_proc_time = global_end_proc - global_start_proc
    print(f"⏱️ Temps total Traitement & Upload (Étape 2): {total_proc_time:.2f}s.")
    print(f"📊 Total Chunks Audio Traités: {total_audio_chunks}")
    print(f"📊 Total Segments Transcrits: {total_transcribed_segments}")
    if gemini_api_key:
        print(f"📊 Total Segments agrégés après traduction réussie: {total_translated_segments_ok}")
        failed_chunks_list = sorted(list(set(failed_translation_audio_chunks)))
        if failed_chunks_list:
             print(f"❌ Traduction échouée (partiellement ou totalement) pour {len(failed_chunks_list)} chunk(s) audio: {', '.join(map(str, failed_chunks_list))}")
             print(f"   (Sauvegarde locale utilise fallback transcrit pour eux)")
        elif total_transcribed_segments > 0 : print(f"✅ Traduction réussie pour tous les chunks audio avec segments.")
        else: print(f"ℹ️ Aucune traduction (pas de segments transcrits).")
    else: print("ℹ️ Traduction ignorée (pas de clé API).")
    print(f"💾 Total Segments agrégés pour sauvegarde locale: {len(all_final_segments_local)}")
    if enable_incremental_upload:
        print(f"☁️ Uploads Incrémentaux Réussis (par chunk audio): {successful_uploads}")
        print(f"☁️ Uploads Incrémentaux Échoués (par chunk audio): {failed_uploads}")
    else: print("☁️ Upload Incrémental désactivé.")

    # ========================================
    # ÉTAPE 3: SAUVEGARDE LOCALE FINALE (Inchangée)
    # ========================================
    print("\n\n" + "="*40 + "\nÉTAPE 3: SAUVEGARDE LOCALE FINALE\n" + "="*40)
    start_step3 = time.time()
    if all_final_segments_local:
        final_output_data_local = {"segments": all_final_segments_local}
        if video_info:
             metadata_to_include = ["video_id", "title", "channel_name", "channel_url", "duration", "original_url", "upload_date"]
             final_output_data_local.update({k: v for k, v in video_info.items() if k in metadata_to_include and v is not None})

        split_suffix = f"_split{split_fixed_duration_minutes}min" if enable_pre_splitting else "_noSplit"
        vad_suffix = "_VAD" if use_vad_during_transcription else "_noVAD"
        # Suffixe pour chunking de segments et concurrence (si activé)
        segchunk_suffix = ""
        if enable_segment_chunking and gemini_api_key:
            segchunk_suffix = f"_segChunk{max_segments_per_translation_chunk}_conc{max_concurrent_translation_tasks}"
        elif gemini_api_key: # Traduction active mais pas de chunking de segments
            segchunk_suffix = "_segFull"

        failed_chunks_list = sorted(list(set(failed_translation_audio_chunks)))
        if not gemini_api_key: status_suffix = "_transcribed_only"
        elif not failed_chunks_list and total_transcribed_segments > 0: status_suffix = "_fully_translated"
        elif not failed_chunks_list and total_transcribed_segments == 0: status_suffix = "_no_segments_found" # Cas où il n'y avait rien à transcrire/traduire
        elif failed_chunks_list and total_translated_segments_ok > 0: status_suffix = "_partially_translated"
        elif failed_chunks_list: status_suffix = "_translation_failed" # Aucun segment traduit avec succès, ou tous les chunks avec segments ont échoué
        else: status_suffix = "_unknown_state"

        json_output_filename_final = os.path.join(
            OUTPUT_DIR_V2,
            f"{safe_video_id}__{model_size}{split_suffix}{vad_suffix}{segchunk_suffix}{status_suffix}.json"
        )

        print(f"Tentative de sauvegarde du résultat local agrégé vers:")
        print(f"   {json_output_filename_final}")
        try:
            with open(json_output_filename_final, "w", encoding="utf-8") as f:
                json.dump(final_output_data_local, f, indent=2, ensure_ascii=False)
            print(f"✅ Résultat local complet sauvegardé avec succès.")
            if failed_uploads > 0: print(f"   -> ⚠️ Attention: {failed_uploads} upload(s) incrémentaux ont échoué.")
            if failed_chunks_list: print(f"   -> ℹ️ Ce fichier contient les segments transcrits originaux pour les chunks audio dont la traduction a échoué.")
        except IOError as e: print(f"❌ Erreur d'écriture lors de la sauvegarde JSON locale: {e}")
        except Exception as e: print(f"❌ Erreur inattendue lors de la sauvegarde JSON locale: {e}"); traceback.print_exc()
    else: print("❌ Aucune donnée de segment n'a été collectée. Pas de fichier JSON local sauvegardé.")
    step3_time = time.time() - start_step3
    print(f"⏱️ Temps Étape 3 (Sauvegarde): {step3_time:.2f}s")

except ValueError as ve: print(f"\n❌ ERREUR DE CONFIGURATION OU DE PROCESSUS: {ve}"); traceback.print_exc()
except Exception as e: print(f"\n❌ ERREUR GLOBALE INATTENDUE: {e}"); traceback.print_exc()
finally:
    # ===========================
    # ÉTAPE 4: NETTOYAGE (Optionnel - Inchangé)
    # ===========================
    print("\n\n" + "="*40 + "\nÉTAPE 4: NETTOYAGE (OPTIONNEL)\n" + "="*40)
    cleanup = False # Mettre à True pour supprimer les fichiers intermédiaires
    if cleanup:
        print("--- Nettoyage des fichiers temporaires ---")
        if original_audio_file_path and os.path.exists(original_audio_file_path):
            try: os.remove(original_audio_file_path); print(f"🗑️ Fichier audio original supprimé.")
            except Exception as e: print(f"❌ Erreur suppression fichier audio original: {e}")
        if os.path.exists(output_chunk_dir):
             try: shutil.rmtree(output_chunk_dir); print(f"🗑️ Dossier des chunks audio supprimé.")
             except Exception as e: print(f"❌ Erreur suppression dossier chunks: {e}")
        print("--- Nettoyage terminé ---")
    else:
        print("--- Nettoyage désactivé ---")
        if original_audio_file_path and os.path.exists(original_audio_file_path): print(f"ℹ️ Fichier audio original conservé: {original_audio_file_path}")
        if os.path.exists(output_chunk_dir) and os.listdir(output_chunk_dir): print(f"ℹ️ Chunks audio conservés dans: {output_chunk_dir}")
        if json_output_filename_final and os.path.exists(json_output_filename_final): print(f"ℹ️ Fichier JSON final conservé: {json_output_filename_final}")

    print("\n🏁 Script complet terminé.")
    total_runtime = time.time() - global_start_proc if 'global_start_proc' in locals() else 0
    if total_runtime > 0:
        if 'start_step1' in locals():
           full_runtime = time.time() - start_step1
           print(f"⏱️ Durée totale d'exécution du script (toutes étapes): {full_runtime:.2f}s")
        else: print(f"⏱️ Durée d'exécution (Étape 2 et suivantes): {total_runtime:.2f}s")

    if enable_incremental_upload and video_info and video_info.get("video_id") != "UNKNOWN_ID" and successful_uploads > 0:
        print(f"\n🔗 Lien potentiel pour consulter le résultat sur le serveur:")
        print(f"   https://qingplay.pythonanywhere.com/vid/{video_info['video_id']}")
    elif enable_incremental_upload and failed_uploads > 0: print(f"\n⚠️ Certains uploads ont échoué, le résultat sur le serveur peut être incomplet.")
    elif enable_incremental_upload and successful_uploads == 0 and total_transcribed_segments > 0 and gemini_api_key: print(f"\nℹ️ Aucun upload réussi (vérifier échecs traduction/upload).")

--- Initial Cleanup ---
Cleanup done.

--- Library Imports & Checks ---
✅ PyTorch installé.
✅ faster-whisper installé.
✅ pydub installé.

--- GPU Check ---
✅ GPU détecté: True
   GPU Name: Tesla T4
   Compute Capability: 7.5

--- Directories ---
✅ Dossier chunks prêt: /content/audio_chunks
✅ Dossier sortie prêt: /content/audio_output_optimized_v2

--- Définition Fonctions Transcription ---

--- Définition Fonctions Traduction ---


--- Configuration Principale ---
--- Validation Configuration ---
✅ Configuration validée.
   Chunking Audio: Activé (Durée: 10 min)
   Modèle Whisper: large-v3, VAD: Désactivé
   Traduction : Activée
     Chunking Segments: Activé (Max Segments/Chunk: 30)
     Tâches traduction concurrentes max: 14
   Upload Incrémental: Activé


ÉTAPE 1: PRÉPARATION & CHARGEMENT MODÈLE

--- Téléchargement Audio depuis YouTube ---
URL: https://www.youtube.com/watch?v=EDf6I36S-aE&pp=0gcJCYQJAYcqIYzv
Destination: /content/audio_output_optimized_v2/youtube_audio.mp3
ℹ️ Récupér

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


config.json:   0%|          | 0.00/2.39k [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/340 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.48M [00:00<?, ?B/s]

model.bin:   0%|          | 0.00/3.09G [00:00<?, ?B/s]

vocabulary.json:   0%|          | 0.00/1.07M [00:00<?, ?B/s]

✅ Modèle 'large-v3' chargé sur cuda (float16) en 20.13s.
⏱️ Temps Étape 1 (Préparation): 200.53s


ÉTAPE 2: TRAITEMENT & UPLOAD INTERCALÉS

--- Traitement Chunk Audio 1/6: chunk_0000_start0_000s_dur600_000s.mp3 ---

🎙️ Transcription: chunk_0000_start0_000s_dur600_000s.mp3 (Offset Global: 0.000s)
   Infos chunk détectées: Lang='zh' (Conf: 0.92), Durée: 600.00s
   Traitement des segments...
   Progression: [████████████████████] 100% 
   🕒 Transcription du chunk terminée en 99.53s. 301 segments trouvés.
   -> Transcrit 301 segments.
   >> Préparation traduction pour 301 segments en 11 sous-chunk(s).
      (Utilisation de max 14 workers concurrents)
      >> 11 tâches de traduction soumises...
      >> Progression traduction (tâches terminées): 11/11 
   >> Traduction terminée pour Audio Chunk 1 en 11.46s (11 appels API).
      >> ⬆️ Tentative d'upload pour Chunk Audio 1 (301 segments traduits)...
      >> ✅ Upload Chunk Audio 1 réussi (Status: 201). Serveur: New transcript created succes

**-----------------------------------------------------------------------------**
# **Developper testing cells**

In [None]:
# @title # **Transcription (Optimized with faster-whisper & GPU support)**

!rm -r /content/audio_output/*
!rm -r /content/audio_output_optimized/*
!rm -r /content/audio_output_optimized_v2/*

import json
import time
import subprocess
import os
import re
import traceback # For detailed error printing

# --- Try importing necessary libraries and provide guidance if missing ---
try:
    import torch
except ImportError:
    print("❌ PyTorch n'est pas installé. Veuillez l'installer.")
    print("   - CPU: pip install torch torchvision torchaudio")
    print("   - GPU: Voir https://pytorch.org/ pour la commande CUDA appropriée.")
    exit()

try:
    from faster_whisper import WhisperModel
except ImportError:
    print("❌ faster-whisper n'est pas installé. Veuillez exécuter : pip install faster-whisper")
    exit()

# --- Check for GPU availability ---
IS_GPU_AVAILABLE = torch.cuda.is_available()
if IS_GPU_AVAILABLE:
    print("✅ GPU détecté. PyTorch utilisera CUDA.")
else:
    print("⚠️ Aucun GPU compatible CUDA détecté. PyTorch utilisera le CPU.")
    print("   La transcription sera plus lente. faster-whisper avec 'int8' peut aider.")


# --- Fonction download_youtube_audio_improved (PAS DE CHANGEMENT ICI) ---
# ... (votre fonction download_youtube_audio_improved reste identique) ...
def download_youtube_audio_improved(youtube_url, output_path):
    """
    Downloads YouTube audio and reliably extracts metadata using separate yt-dlp calls.
    Checks if the audio file already exists before attempting download.

    Args:
        youtube_url (str): The URL of the YouTube video.
        output_path (str): The desired path to save the MP3 audio file.

    Returns:
        tuple: (str, dict) containing the audio file path and video info dictionary,
               or (None, None) if a critical error occurs (metadata failure).
               If audio download fails but metadata is retrieved, returns (None, video_info).
    """
    video_info = None
    audio_file_path = None
    output_dir = os.path.dirname(output_path)
    if not os.path.exists(output_dir):
        try:
            os.makedirs(output_dir)
            print(f"📁 Création du répertoire de sortie : {output_dir}")
        except OSError as e:
            print(f"❌ Erreur lors de la création du répertoire {output_dir}: {e}")
            return None, None # Cannot proceed without output directory

    # --- Étape 1: Récupérer les métadonnées avec --dump-json ---
    print("ℹ️  Récupération des métadonnées de la vidéo...")
    metadata_result = None # Initialize
    try:
        metadata_command = [
            "yt-dlp",
            "--dump-json", # Sortir les métadonnées en JSON sur stdout
            "--encoding", "utf-8", # Assurer l'encodage correct
            youtube_url,
        ]
        metadata_result = subprocess.run(metadata_command, check=True, capture_output=True, text=True, encoding='utf-8')
        metadata = json.loads(metadata_result.stdout)

        # Extraire les informations nécessaires
        channel_name = metadata.get("uploader", "N/A")
        channel_url = metadata.get("uploader_url", "N/A")
        title = metadata.get("title", "N/A")
        description = metadata.get("description", "N/A")
        video_id = metadata.get("id", None) # ID YouTube réel

        if not video_id:
             print("⚠️ Impossible d'extraire l'ID de la vidéo via yt-dlp.")
             # Essayer avec regex en secours
             video_id_match = re.search(r"v=([a-zA-Z0-9_-]+)", youtube_url)
             video_id = video_id_match.group(1) if video_id_match else "UNKNOWN"

        video_info = {
            "video_id": video_id, # ID YouTube réel
            "channel_name": channel_name,
            "channel_url": channel_url,
            "title": title,
            "description": description,
        }
        print("✅ Métadonnées récupérées.")

    except subprocess.CalledProcessError as e:
        print(f"❌ Erreur critique lors de la récupération des métadonnées avec yt-dlp (Code: {e.returncode}).")
        print(f"   Commande: {' '.join(e.cmd)}")
        stderr_output = e.stderr.strip() if e.stderr else "N/A"
        print(f"   Erreur: {stderr_output}")
        return None, None # Erreur critique, impossible de continuer sans métadonnées
    except json.JSONDecodeError as e:
        print(f"❌ Erreur critique lors de l'analyse des métadonnées JSON : {e}")
        if metadata_result and metadata_result.stdout:
             print(f"--- Sortie brute de yt-dlp --- \n{metadata_result.stdout[:500]}...\n--------------------------")
        return None, None # Erreur critique
    except FileNotFoundError:
        print("❌ yt-dlp n'est pas installé ou introuvable dans le PATH.")
        print("   Veuillez l'installer (par exemple avec : pip install yt-dlp)")
        return None, None
    except Exception as e:
        print(f"❌ Erreur inattendue critique lors de la récupération des métadonnées : {e}")
        traceback.print_exc()
        return None, None # Erreur critique

    # --- Étape 2: Télécharger (ou vérifier) l'audio ---
    print(f"🔄 Vérification/Téléchargement de l'audio vers {output_path}...")

    # Vérification préalable de l'existence du fichier
    if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
        print(f"✅ Fichier audio '{output_path}' existe déjà et n'est pas vide. Utilisation du fichier existant.")
        audio_file_path = output_path
    else:
        if os.path.exists(output_path):
             print(f"ℹ️ Fichier audio '{output_path}' existe mais est vide. Tentative de re-téléchargement...")
        else:
             print(f"ℹ️ Fichier audio non trouvé. Tentative de téléchargement...")

        download_result = None # Initialize
        try:
            download_command = [
                "yt-dlp",
                "-x", # Extraire l'audio
                "--audio-format", "mp3",
                # '--audio-quality', '0', # Optionnel: Meilleure qualité audio (peut être plus lent)
                # '--no-overwrites', # On gère la vérification avant, mais laisser en sécurité
                "--force-overwrites", # Forcer l'écrasement si le fichier existant était vide/corrompu
                "-o", output_path, # Spécifier le chemin de sortie complet
                "--encoding", "utf-8",
                # "-v", # Décommenter pour une sortie très détaillée (debug)
                youtube_url,
            ]
            download_result = subprocess.run(download_command, check=True, capture_output=True, text=True, encoding='utf-8')

            # Vérifier si le fichier existe et n'est pas vide APRES l'exécution
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                print(f"✅ Fichier audio téléchargé avec succès : {output_path}")
                audio_file_path = output_path
            else:
                print(f"⚠️ yt-dlp a terminé sans erreur mais le fichier audio '{output_path}' est introuvable ou vide après la tentative.")
                if download_result:
                    print(f"--- Sortie yt-dlp (stdout) ---\n{download_result.stdout.strip()}")
                    print(f"--- Sortie yt-dlp (stderr) ---\n{download_result.stderr.strip()}")
                # Retourner les métadonnées même si le téléchargement échoue ici
                return None, video_info

        except subprocess.CalledProcessError as e:
            print(f"❌ Erreur lors du téléchargement de l'audio avec yt-dlp (Code: {e.returncode}).")
            print(f"   Commande: {' '.join(e.cmd)}")
            stderr_output = e.stderr.strip() if e.stderr else "N/A"
            print(f"   Erreur: {stderr_output}")
            # Vérifier si le fichier existe quand même (parfois yt-dlp échoue mais laisse un fichier partiel)
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                 print(f"ℹ️ Un fichier audio existe à '{output_path}' malgré l'erreur. Il est peut-être incomplet.")
                 audio_file_path = output_path # On le retourne quand même, l'utilisateur verra l'erreur
            else:
                 # Retourner les métadonnées, mais pas de chemin audio
                 return None, video_info
        except FileNotFoundError:
            # Devrait avoir été capturé à l'étape 1, mais redondance
            print("❌ yt-dlp n'est pas installé ou introuvable dans le PATH.")
            return None, video_info # On a peut-être les métadonnées
        except Exception as e:
            print(f"❌ Erreur inattendue lors du téléchargement de l'audio : {e}")
            traceback.print_exc()
            # Vérifier si le fichier existe malgré l'erreur
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                print(f"ℹ️ Utilisation du fichier audio existant '{output_path}' malgré l'erreur inattendue.")
                audio_file_path = output_path
            else:
                return None, video_info # Retourner métadonnées, mais pas de chemin audio

    # Dernière vérification de l'existence et de la taille du fichier avant de retourner
    if audio_file_path and (not os.path.exists(audio_file_path) or os.path.getsize(audio_file_path) == 0):
        print(f"❌ ERREUR FINALE: Le chemin audio '{audio_file_path}' indiqué n'existe pas ou est vide.")
        return None, video_info

    return audio_file_path, video_info


# --- Fonction transcribe_audio (MISE À JOUR avec progression et conseils VAD) ---
def transcribe_audio_faster(file_path, model_size="base", video_info=None, beam_size=5, vad_filter=True, vad_min_silence_ms=700):
    """
    Transcrit un fichier audio en utilisant faster-whisper optimisé, affiche la progression
    et retourne une liste de segments. Utilise le GPU si disponible.

    Args:
      file_path (str): Chemin vers le fichier audio (ex : "audio.mp3").
      model_size (str): Taille du modèle ("tiny", "base", "small", "medium", "large-v3").
      video_info (dict, optional): Informations sur la vidéo. Defaults to None.
      beam_size (int): Taille du faisceau pour le décodage.
      vad_filter (bool): Activer le filtre VAD (Voice Activity Detection).
                         Si True, utilise vad_min_silence_ms.
                         Si False, désactive VAD (peut être plus lent mais utile pour déboguer les temps).
      vad_min_silence_ms (int): Durée minimale de silence en ms pour couper un segment avec VAD.
                                Uniquement utilisé si vad_filter=True.
                                Augmenter (ex: 1000) peut créer des segments plus longs.
                                Diminuer (ex: 500) peut créer des segments plus courts.

    Returns:
      dict: Un dictionnaire contenant 'segments' (liste de segments)
            et potentiellement des informations sur la vidéo si fournies.
            Retourne {'segments': []} en cas d'erreur majeure.
    """
    start_time_load = time.time()
    print(f"\n🔄 Chargement du modèle faster-whisper '{model_size}'...")

    device = "cuda" if IS_GPU_AVAILABLE else "cpu"
    # Utiliser float16 pour GPU récent, bfloat16 pour Ampere+, float32/int8 pour CPU
    if IS_GPU_AVAILABLE:
        # Vérifier la capacité de calcul pour bfloat16 (Ampere et plus récent)
        if torch.cuda.get_device_capability(0)[0] >= 8:
            compute_type = "bfloat16"
            print("ℹ️ GPU compatible Ampere+ détecté, utilisation de bfloat16.")
        else:
            compute_type = "float16"
            print("ℹ️ GPU détecté, utilisation de float16.")
    else:
        compute_type = "int8" # int8 est généralement un bon compromis pour CPU

    model = None
    try:
        model = WhisperModel(model_size, device=device, compute_type=compute_type)
        load_time = time.time() - start_time_load
        print(f"✅ Modèle chargé sur '{device}' avec compute_type='{compute_type}' en {load_time:.2f}s.")

    except Exception as e:
        print(f"❌ Erreur lors du chargement du modèle faster-whisper ({model_size}, {device}, {compute_type}): {e}")
        # Essayer un fallback plus sûr si le premier choix a échoué
        fallback_compute_type = None
        if IS_GPU_AVAILABLE and compute_type != "float32":
             fallback_compute_type = "float32" # Moins performant mais plus compatible sur GPU
        elif not IS_GPU_AVAILABLE and compute_type != "int8": # S'il y avait une autre option CPU testée
             fallback_compute_type = "int8"

        if fallback_compute_type:
            print(f"ℹ️ Tentative de fallback avec compute_type='{fallback_compute_type}'...")
            try:
                model = WhisperModel(model_size, device=device, compute_type=fallback_compute_type)
                load_time = time.time() - start_time_load
                print(f"✅ Modèle chargé sur '{device}' avec compute_type='{fallback_compute_type}' en {load_time:.2f}s.")
            except Exception as e2:
                print(f"❌ Échec du chargement même avec fallback '{fallback_compute_type}': {e2}")
                traceback.print_exc()
                return {"segments": []}
        else:
            traceback.print_exc()
            return {"segments": []} # Échec initial sans option de fallback évidente

    print(f"\n🎙️ Début de la transcription (faster-whisper) pour {os.path.basename(file_path)}...")
    print(f"   Options: beam_size={beam_size}, vad_filter={vad_filter}" + (f", vad_min_silence_ms={vad_min_silence_ms}" if vad_filter else ""))
    start_time_transcribe = time.time()
    transcript_segments_data = []
    total_audio_duration = 0
    last_printed_progress = -1 # Pour éviter d'imprimer 0% plusieurs fois

    try:
        # Configurer les paramètres VAD si activé
        vad_parameters = None
        if vad_filter:
            vad_parameters = dict(min_silence_duration_ms=vad_min_silence_ms)
            # Vous pouvez ajouter d'autres paramètres VAD ici si nécessaire, ex:
            # vad_parameters["threshold"] = 0.5 # Seuil de détection vocale (0 à 1)

        # Lancer la transcription - segments_generator est un itérateur !
        segments_generator, info = model.transcribe(
            file_path,
            beam_size=beam_size,
            vad_filter=vad_filter,
            vad_parameters=vad_parameters
            # word_timestamps=False # Mettre à True pour des temps au niveau du mot (change la structure)
        )

        detected_lang = info.language
        lang_prob = info.language_probability
        total_audio_duration = info.duration # Durée totale détectée par faster-whisper

        print(f"✅ Infos détectées: Langue='{detected_lang}' (Prob: {lang_prob:.2f}), Durée Totale: {total_audio_duration:.2f}s")
        if total_audio_duration <= 0:
             print("⚠️ Durée audio détectée nulle ou négative. Le calcul de progression sera imprécis.")

        print("--> Début du traitement des segments...")

        # --- Boucle principale avec progression ---
        segment_count = 0
        for segment in segments_generator:
            segment_count += 1
            seg_start = segment.start
            seg_end = segment.end
            duration = max(0, seg_end - seg_start)
            text = segment.text.strip() if segment.text else ""

            # Calculer le pourcentage de progression basé sur la FIN du segment actuel
            current_progress = 0
            if total_audio_duration > 0:
                current_progress = min(100.0, (seg_end / total_audio_duration) * 100)

            # Afficher la progression (avec une barre simple et moins fréquente)
            # Imprime tous les 5% ou pour le dernier segment
            rounded_progress = int(current_progress)
            if rounded_progress > last_printed_progress and (rounded_progress % 5 == 0 or rounded_progress >= 99):
                progress_bar_length = 20
                filled_length = int(progress_bar_length * current_progress / 100)
                bar = '█' * filled_length + '-' * (progress_bar_length - filled_length)
                # Utiliser \r pour revenir au début de la ligne et écraser le précédent message
                print(f"\r   Progression: [{bar}] {current_progress:.1f}% (Segment ~{segment_count}, Temps: {seg_end:.2f}s / {total_audio_duration:.2f}s)", end="")
                last_printed_progress = rounded_progress

            transcript_segments_data.append({
                "text": text,
                "start": round(seg_start, 3),
                "duration": round(duration, 3),
                # On peut garder le % par segment si utile, sinon on l'a déjà affiché
                "progress_percentage_at_segment_end": round(current_progress, 2)
            })

        # Assurer un saut de ligne après la barre de progression finale
        print() # New line after the loop finishes

        transcription_time = time.time() - start_time_transcribe
        print(f"🕒 Transcription (faster-whisper) terminée en {transcription_time:.2f} secondes.")
        print(f"   Nombre total de segments générés : {segment_count}")

        if segment_count == 0:
            print("⚠️ Aucun segment transcrit trouvé.")
            # Si aucun segment, vérifier si VAD était actif.
            if vad_filter:
                print("   -> Conseil: Essayez avec 'vad_filter=False' ou ajustez 'vad_min_silence_ms'.")

    except FileNotFoundError:
        print(f"❌ Erreur: Fichier audio introuvable à '{file_path}'")
        return {"segments": []}
    except Exception as e:
        print(f"\n❌ Erreur majeure lors de la transcription avec faster-whisper : {e}")
        print("--- Traceback de l'erreur ---")
        traceback.print_exc()
        print("---------------------------")
        return {"segments": []}

    # --- Conseils si les temps semblent incorrects ---
    print("\n--- Vérification des Timestamps ---")
    if vad_filter:
        print("ℹ️ VAD était activé. Si les temps des segments semblent incorrects (trop longs, décalés) :")
        print(f"   - Essayez d'ajuster 'vad_min_silence_ms' (actuellement: {vad_min_silence_ms}). Augmenter (ex: 1000, 1500) regroupe plus, diminuer (ex: 500, 300) segmente plus.")
        print(f"   - Essayez de relancer avec 'vad_filter=False' pour voir les segments bruts du modèle (peut être plus lent).")
    else:
         print("ℹ️ VAD était désactivé. Les temps proviennent directement du modèle. Si incorrects:")
         print(f"   - Envisagez un modèle plus grand ('{model_size}' -> 'large-v3'?) pour une meilleure précision potentielle.")
         print(f"   - Vérifiez la qualité de l'audio source.")
    print("------------------------------------\n")


    # Construire la sortie finale
    output = {"segments": transcript_segments_data}
    if video_info:
        output_video_info = {k: v for k, v in video_info.items() if v is not None}
        output.update(output_video_info)

    return output

# --- Exemple d'utilisation (Adapté) ---
youtube_url = "https://www.youtube.com/watch?v=GRLdsdBDjE4&pp=ygUObWF5aG8gc2hlbnpoZW4%3D" #@param {"type":"string"}
output_dir = "/content/audio_output_optimized_v2" # Nouveau dossier pour éviter conflits
output_filename = "youtube_audio_opt.mp3"
output_path = os.path.join(output_dir, output_filename)

# --- Paramètres de Transcription ---
model_size = "large-v3" #@param ["tiny","base","small","medium","large-v3"]

# --- NOUVEAUX paramètres pour le contrôle VAD et timing ---
use_vad = False # @param {type:"boolean"}
# Cette valeur est IGNORÉE si use_vad est False
vad_silence_duration_ms = 200 # @param {type:"slider", min:100, max:2000, step:50}

beam_search_size = 5 # Taille standard pour un bon équilibre vitesse/qualité

# --- Exécution Principale ---
file_path = None
video_info = None
json_output_filename = None # Initialiser pour le bloc finally

try:
    # 1. Télécharger l'audio et récupérer les infos vidéo
    file_path, video_info = download_youtube_audio_improved(youtube_url, output_path)

    # 2. Vérifier si le téléchargement et les métadonnées ont réussi
    if file_path and video_info:
        print("\n--- Informations Vidéo Récupérées ---")
        print(json.dumps(video_info, indent=2, ensure_ascii=False))
        print("-----------------------------------\n")

        print(f"🚀 Lancement de la transcription optimisée pour : {file_path}")
        # 3. Lancer la transcription avec faster-whisper ET les nouveaux paramètres
        transcript = transcribe_audio_faster(
            file_path,
            model_size=model_size,
            video_info=video_info,
            beam_size=beam_search_size,
            vad_filter=use_vad, # Utiliser le paramètre défini ci-dessus
            vad_min_silence_ms=vad_silence_duration_ms # Utiliser le paramètre défini ci-dessus
        )

        # (Le reste du code pour sauvegarder le JSON est inchangé)
        print("\n------------------------------------")

        # 4. Sauvegarder la transcription si elle n'est pas vide
        if transcript and transcript.get("segments"): # Vérifier que des segments existent
            # Ajouter les paramètres utilisés au nom de fichier pour référence
            vad_suffix = f"vad{vad_silence_duration_ms}" if use_vad else "noVAD"
            json_output_filename = os.path.splitext(output_path)[0] + f"_transcript_{model_size}_{vad_suffix}.json"
            try:
                with open(json_output_filename, "w", encoding="utf-8") as f:
                    json.dump(transcript, f, indent=2, ensure_ascii=False)
                print(f"✅ Transcription sauvegardée dans : {json_output_filename}")
            except IOError as e:
                print(f"❌ Erreur lors de la sauvegarde du fichier JSON : {e}")
            except Exception as e:
                 print(f"❌ Erreur inattendue lors de la sauvegarde JSON : {e}")
                 traceback.print_exc() # Voir l'erreur
        elif transcript: # Si transcript existe mais pas de segments
             print("ℹ️ La transcription a été exécutée mais n'a produit aucun segment. Aucun fichier JSON sauvegardé.")
        else: # Si transcript est None ou vide
             print("❌ La transcription a échoué ou a retourné un résultat vide. Aucun fichier JSON sauvegardé.")


    elif video_info:
        print("\n⚠️ Le téléchargement/accès au fichier audio a échoué, mais les métadonnées ont été récupérées.")
        print("--- Informations Vidéo ---")
        print(json.dumps(video_info, indent=2, ensure_ascii=False))
        print("------------------------")
        print("🚫 Transcription annulée car le fichier audio est manquant ou inaccessible.")
    else:
        print("\n❌ Impossible de récupérer les métadonnées et/ou de télécharger/trouver le fichier audio.")
        print("   Vérifiez l'URL YouTube, votre connexion internet et l'installation de yt-dlp.")
        print("🚫 Transcription annulée.")

except Exception as e:
    print(f"❌ Une erreur globale inattendue s'est produite : {e}")
    print("--- Traceback de l'erreur ---")
    traceback.print_exc()
    print("---------------------------")

finally:
    # --- Nettoyage (Optionnel - Inchangé) ---
    # Décommentez pour supprimer l'audio après traitement
    # print("\n--- Nettoyage ---")
    # if file_path and os.path.exists(file_path):
    #     try:
    #         # Utilisez 'rm -f' pour forcer la suppression sous Linux/Colab si nécessaire
    #         # subprocess.run(['rm', '-f', file_path], check=True)
    #         os.remove(file_path) # Essayez d'abord os.remove
    #         print(f"🗑️ Fichier audio supprimé : {file_path}")
    #     except Exception as e:
    #         print(f"❌ Erreur lors de la suppression du fichier audio {file_path}: {e}")
    # elif file_path:
    #     print(f"ℹ️ Fichier audio {file_path} non trouvé pour suppression.")
    # else:
    #      print(f"ℹ️ Aucun fichier audio à supprimer.")
    pass # Ne rien faire par défaut

print("\n🏁 Script terminé.")

rm: cannot remove '/content/audio_output/*': No such file or directory
rm: cannot remove '/content/audio_output_optimized/*': No such file or directory
✅ GPU détecté. PyTorch utilisera CUDA.
ℹ️  Récupération des métadonnées de la vidéo...
✅ Métadonnées récupérées.
🔄 Vérification/Téléchargement de l'audio vers /content/audio_output_optimized_v2/youtube_audio_opt.mp3...
ℹ️ Fichier audio non trouvé. Tentative de téléchargement...
✅ Fichier audio téléchargé avec succès : /content/audio_output_optimized_v2/youtube_audio_opt.mp3

--- Informations Vidéo Récupérées ---
{
  "video_id": "GRLdsdBDjE4",
  "channel_name": "May Ho",
  "channel_url": "https://www.youtube.com/@MayHo",
  "title": "【 深圳 VLOG 】 中國免簽立馬飛去深圳吃喝玩樂 😝 帶著你們的疑問找 Ulike 問清楚 😤｜MAYHO",
  "description": "每一樣食物都好好吃啊啊啊 🥺\n▸  訂閱我吧 😉  http://bit.ly/37IiWLu\n▸ 上集影片：https://youtu.be/XQbXKy2jViQ?si=hUFWW0tuBf87PdCw\n\n- - -\n\n剛宣佈中國免簽，立馬買機票飛去深圳玩嘍！🤣\n非常謝謝 Ulike 的邀請，參觀他們的店面及公司\n我還幫你們談了超棒的優惠，大家千萬別錯過喔！😎\n\n也謝謝 Kris 和 大衛帶我們吃了很多道地美食\n這個旅程很照顧我們，愛你們

In [None]:
# @title # **Traduction Parallélisée**

import requests
import json
import re
import time
import math # Pour calculer le nombre total de chunks
import concurrent.futures # Pour la parallélisation
import sys # Pour sys.stdout.flush() si besoin dans certains environnements

# --- Fonction Utilitaires ---
def chunk_list(lst, n):
    """Divise une liste en sous-listes de taille n."""
    if not isinstance(lst, list):
        raise TypeError("L'entrée doit être une liste.")
    if n <= 0:
        raise ValueError("La taille du chunk doit être positive.")
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

def extract_json_from_response(text_response):
    """
    Tente d'extraire une chaîne JSON valide à partir d'une réponse textuelle,
    en gérant les blocs de code markdown potentiels (```json ... ```).
    """
    # 1. Essayer de trouver un bloc de code JSON démarqué
    match = re.search(r'```json\s*([\s\S]*?)\s*```', text_response, re.DOTALL)
    if match:
        # print("ℹ️ Bloc JSON détecté via ```json ... ```.") # Moins de logs en parallèle
        return match.group(1).strip()

    # 2. Si pas de bloc, chercher un JSON qui commence par '[' et finit par ']' (liste)
    #    Ou commence par '{' et finit par '}' (objet - moins probable ici mais par sécurité)
    match_list = re.search(r'(\[[\s\S]*\])', text_response, re.DOTALL)
    match_obj = re.search(r'(\{[\s\S]*\})', text_response, re.DOTALL)

    if match_list and match_obj:
        json_string = match_list.group(1) if match_list.start() < match_obj.start() else match_obj.group(1)
        # print("ℹ️ JSON détecté via délimiteurs [...] ou {...}.")
        return json_string.strip()
    elif match_list:
        # print("ℹ️ JSON détecté via délimiteurs [...].")
        return match_list.group(1).strip()
    elif match_obj:
         # print("ℹ️ JSON détecté via délimiteurs {...}.")
         return match_obj.group(1).strip()

    # 3. Si rien ne fonctionne, retourner le texte original
    # print("⚠️ Impossible d'isoler un bloc JSON spécifique. Tentative d'analyse du texte brut.")
    return text_response.strip()


# --- Fonction de Traduction (pour un seul chunk - reste inchangée) ---
def translate_chunk_with_gemini(transcript_chunk, api_key, chunk_index, total_chunks):
    """
    Envoie un chunk de transcription à l'API Gemini pour traduction.
    (Cette fonction est appelée par les threads)

    Args:
        transcript_chunk (list): Un morceau (chunk) de la transcription (liste de segments).
        api_key (str): Clé API Gemini.
        chunk_index (int): Index global du chunk (pour l'affichage).
        total_chunks (int): Nombre total de chunks (pour l'affichage).

    Returns:
        tuple: (chunk_index, list | None) - Retourne l'index du chunk et le résultat (liste traduite ou None)
               pour pouvoir réassembler dans le bon ordre.
    """
    if not transcript_chunk:
        print(f"⚠️ [Chunk {chunk_index+1}/{total_chunks}] est vide, ignoré.")
        # Retourne l'index et une liste vide pour indiquer qu'il a été traité (mais était vide)
        return chunk_index, []

    start_time = time.time()
    # Utiliser sys.stdout.flush() peut aider si les logs n'apparaissent pas immédiatement en environnement multi-threadé
    print(f"🔄 [Chunk {chunk_index+1}/{total_chunks}] Début traduction ({len(transcript_chunk)} segments)...", flush=True)

    # Utiliser un modèle récent et approprié
    url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent?key={api_key}"

    try:
        transcript_str = json.dumps(transcript_chunk, ensure_ascii=False, indent=2)
    except TypeError as e:
         print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Erreur de sérialisation JSON du chunk : {e}", flush=True)
         return chunk_index, None

    prompt = (
        "You are an expert multilingual translator. Below is a JSON array representing segments of an audio transcript.\n"
        "For EACH segment object in the array, please perform the following:\n"
        "1. Identify the original language of the 'text' field.\n"
        "2. Translate the content of the 'text' field into English and add it as a new key-value pair: 'text_english': \"<english_translation>\".\n"
        "3. Translate the content of the 'text' field into French and add it as a new key-value pair: 'text_french': \"<french_translation>\".\n"
        "4. IMPORTANT: Preserve ALL other existing keys and their values ('start', 'duration', 'progress_percentage', etc.) exactly as they are.\n"
        "5. Return ONLY the complete, modified JSON array. Do not include any explanatory text before or after the JSON array itself. Respond only with the JSON.\n\n"
        "Here is the JSON array:\n"
        f"```json\n{transcript_str}\n```"
    )

    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {
            "temperature": 0.3,
            "maxOutputTokens": 8192,
             # Spécifier explicitement le format de sortie JSON (si le modèle le supporte bien)
             # NOTE : Pas tous les modèles ou versions supportent response_mime_type de manière fiable.
             # Si cela cause des erreurs, commentez la ligne suivante.
             "response_mime_type": "application/json",
        },
         "safetySettings": [
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
        ]
    }
    headers = {"Content-Type": "application/json"}

    # --- Boucle de tentatives avec backoff exponentiel léger ---
    max_retries = 3
    base_delay = 2 # secondes
    for attempt in range(max_retries):
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=180) # Timeout long
            response.raise_for_status() # Lève une exception pour 4xx/5xx

            response_data = response.json()

            # Vérification des erreurs Gemini
            if not response_data.get("candidates"):
                prompt_feedback = response_data.get("promptFeedback", {})
                block_reason = prompt_feedback.get("blockReason")
                safety_ratings = prompt_feedback.get("safetyRatings")
                error_message = f"Aucun candidat retourné."
                if block_reason: error_message += f" Raison blocage: {block_reason}."
                if safety_ratings: error_message += f" Ratings: {safety_ratings}"
                # Si bloqué pour sécurité, ne pas retenter
                if block_reason == 'SAFETY':
                     print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Blocage API Gemini (Sécurité). {error_message}", flush=True)
                     return chunk_index, None
                # Pour d'autres erreurs sans candidat, on peut retenter
                if attempt < max_retries - 1:
                    delay = base_delay * (2 ** attempt)
                    print(f"⚠️ [Chunk {chunk_index+1}/{total_chunks}] Erreur API Gemini: {error_message}. Tentative {attempt+2}/{max_retries} après {delay}s...", flush=True)
                    time.sleep(delay)
                    continue
                else:
                    print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Erreur API Gemini: {error_message}. Echec après {max_retries} tentatives.", flush=True)
                    return chunk_index, None

            candidate = response_data["candidates"][0]
            finish_reason = candidate.get("finishReason")

            if finish_reason not in ["STOP", "MAX_TOKENS"]:
                print(f"⚠️ [Chunk {chunk_index+1}/{total_chunks}] Fin de génération anormale: {finish_reason}.", flush=True)
                # Retenter si ce n'est pas une erreur fatale
                if finish_reason == 'SAFETY': # Ne pas retenter si bloqué pour sécurité
                     print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Blocage API Gemini (Sécurité - finish_reason).", flush=True)
                     return chunk_index, None
                if attempt < max_retries - 1:
                    delay = base_delay * (2 ** attempt)
                    print(f"⚠️ [Chunk {chunk_index+1}/{total_chunks}] Tentative {attempt+2}/{max_retries} après {delay}s...", flush=True)
                    time.sleep(delay)
                    continue
                else:
                     print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Echec après {max_retries} tentatives (finish_reason={finish_reason}).", flush=True)
                     return chunk_index, None # Echec final si raison anormale


            if "content" not in candidate or "parts" not in candidate["content"] or not candidate["content"]["parts"]:
                 print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Structure de réponse inattendue (manque content/parts).", flush=True)
                 # Retenter pourrait aider si c'est une erreur transitoire
                 if attempt < max_retries - 1:
                    delay = base_delay * (2 ** attempt)
                    print(f"⚠️ [Chunk {chunk_index+1}/{total_chunks}] Tentative {attempt+2}/{max_retries} après {delay}s...", flush=True)
                    time.sleep(delay)
                    continue
                 else:
                     print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Echec après {max_retries} tentatives (structure réponse).", flush=True)
                     return chunk_index, None

            raw_text_response = candidate["content"]["parts"][0]["text"]
            json_string = extract_json_from_response(raw_text_response)
            result_json = json.loads(json_string) # Peut lever JSONDecodeError

            if not isinstance(result_json, list):
                 print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Le résultat décodé n'est pas une liste.", flush=True)
                 # Probablement une erreur du LLM, retenter ne sert à rien ici
                 return chunk_index, None
            if len(result_json) != len(transcript_chunk):
                 print(f"⚠️ [Chunk {chunk_index+1}/{total_chunks}] Le nombre de segments retournés ({len(result_json)}) != entrée ({len(transcript_chunk)}). Probablement tronqué (MAX_TOKENS?).", flush=True)
                 # On accepte le résultat partiel mais on logue l'avertissement
                 # return chunk_index, None # Ou considérer comme un échec si on veut être strict

            elapsed_time = time.time() - start_time
            print(f"✅ [Chunk {chunk_index+1}/{total_chunks}] Traduction réussie en {elapsed_time:.2f} secondes (Tentative {attempt+1}).", flush=True)
            return chunk_index, result_json # Succès

        except requests.exceptions.Timeout:
            print(f"⚠️ [Chunk {chunk_index+1}/{total_chunks}] Timeout lors de la requête (Tentative {attempt+1}).", flush=True)
            if attempt == max_retries - 1:
                print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Echec final après timeout.", flush=True)
                return chunk_index, None
            delay = base_delay * (2 ** attempt)
            print(f"   Retentative dans {delay}s...", flush=True)
            time.sleep(delay)

        except requests.exceptions.RequestException as e:
            # Gérer les erreurs 429 (Too Many Requests) spécifiquement si possible
            status_code = e.response.status_code if e.response is not None else None
            if status_code == 429:
                print(f"⚠️ [Chunk {chunk_index+1}/{total_chunks}] Erreur 429 (Too Many Requests) (Tentative {attempt+1}).", flush=True)
                # Attendre plus longtemps et retenter
                delay = 10 * (attempt + 1) # Attente plus longue pour 429
                print(f"   Attente de {delay}s avant la prochaine tentative...", flush=True)
                time.sleep(delay)
                if attempt == max_retries - 1:
                     print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Echec final après erreur 429.", flush=True)
                     return chunk_index, None
            else:
                print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Erreur réseau/HTTP (Tentative {attempt+1}): {e}", flush=True)
                # Pour les erreurs non-429, un backoff standard suffit
                if attempt == max_retries - 1:
                    print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Echec final après erreur réseau/HTTP.", flush=True)
                    return chunk_index, None
                delay = base_delay * (2 ** attempt)
                print(f"   Retentative dans {delay}s...", flush=True)
                time.sleep(delay)


        except json.JSONDecodeError as e:
            print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Erreur analyse JSON réponse (Tentative {attempt+1}): {e}", flush=True)
            print(f"--- Réponse textuelle brute reçue (Chunk {chunk_index+1}) ---")
            print(raw_text_response[:500] + "..." if len(raw_text_response) > 500 else raw_text_response)
            print("--- Fin Réponse textuelle brute ---", flush=True)
            # Retenter peut aider si la réponse était corrompue transitoirement
            if attempt == max_retries - 1:
                 print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Echec final après erreur JSON.", flush=True)
                 return chunk_index, None
            delay = base_delay * (2 ** attempt)
            print(f"   Retentative dans {delay}s...", flush=True)
            time.sleep(delay)

        except (KeyError, IndexError) as e:
            print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Erreur accès clés réponse API (Tentative {attempt+1}): {e}", flush=True)
             # Retenter peut aider si la réponse était malformée transitoirement
            if attempt == max_retries - 1:
                 print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Echec final après erreur structure réponse.", flush=True)
                 return chunk_index, None
            delay = base_delay * (2 ** attempt)
            print(f"   Retentative dans {delay}s...", flush=True)
            time.sleep(delay)

        except Exception as e:
            print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Erreur inattendue (Tentative {attempt+1}): {e.__class__.__name__}: {e}", flush=True)
            # Retenter pour erreurs génériques
            if attempt == max_retries - 1:
                print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Echec final après erreur inattendue.", flush=True)
                return chunk_index, None
            delay = base_delay * (2 ** attempt)
            print(f"   Retentative dans {delay}s...", flush=True)
            time.sleep(delay)

    # Si on sort de la boucle sans succès
    print(f"❌ [Chunk {chunk_index+1}/{total_chunks}] Echec final après {max_retries} tentatives.", flush=True)
    return chunk_index, None


# --- Fonction pour traiter un lot (batch) de chunks en parallèle ---
def translate_batch(batch_chunks_with_indices, api_key, total_chunks, max_workers=14):
    """
    Traite un lot de chunks en parallèle en utilisant ThreadPoolExecutor.

    Args:
        batch_chunks_with_indices (list): Liste de tuples (index_global, chunk_data).
        api_key (str): Clé API Gemini.
        total_chunks (int): Nombre total de chunks dans la transcription complète.
        max_workers (int): Nombre maximum de threads à utiliser.

    Returns:
        dict: Dictionnaire {index_global: resultat_traduction} pour ce lot.
              resultat_traduction est soit la liste des segments traduits, soit None en cas d'échec.
    """
    batch_results = {}
    # Utilise un ThreadPoolExecutor pour gérer les threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Soumet chaque tâche de traduction de chunk au pool de threads
        # future_to_index mappe l'objet Future (représentant l'exécution) à l'index global du chunk
        future_to_index = {
            executor.submit(translate_chunk_with_gemini, chunk_data, api_key, index, total_chunks): index
            for index, chunk_data in batch_chunks_with_indices
        }

        # Récupère les résultats au fur et à mesure que les tâches se terminent
        for future in concurrent.futures.as_completed(future_to_index):
            original_index = future_to_index[future]
            try:
                # Obtient le résultat de la fonction (qui est un tuple: index, data)
                index_result, translated_data = future.result()
                # Stocke le résultat (translated_data) en utilisant l'index original comme clé
                batch_results[original_index] = translated_data
            except Exception as exc:
                # Si l'exécution de la tâche elle-même a levé une exception imprévue
                print(f"‼️ [Chunk {original_index+1}/{total_chunks}] a généré une exception dans le thread: {exc}", flush=True)
                batch_results[original_index] = None # Marque comme échoué

    # Retourne le dictionnaire des résultats pour ce lot, indexé par l'index global du chunk
    return batch_results


# --- Script Principal d'Exécution ---

# !!! IMPORTANT: Utiliser Colab Secrets ou une méthode sécurisée pour la clé API. !!!
# from google.colab import userdata
# api_key = userdata.get('GEMINI_API_KEY')
api_key = "AIzaSyBiZONd6VA8y9zAd8vueZRo_IrPnn7iHlw" #@param {type:"string"}

# Nombre de chunks à envoyer en parallèle (limité par l'API et vos ressources)
# Gemini free tier = 15 RPM (Requests Per Minute)
# On prend 14 pour laisser une petite marge.
PARALLEL_CHUNKS = 14 #@param {type:"integer"}

# Délai d'attente entre les lots (en secondes) pour respecter la limite de 15 RPM
# Si on envoie 14 requêtes, on attend 60s pour pouvoir en envoyer 14 autres la minute suivante.
# Mettre un peu plus pour être sûr (ex: 61 ou 62)
RATE_LIMIT_DELAY = 61 #@param {type:"integer"}


if api_key == "VOTRE_CLE_API_GEMINI_ICI" or not api_key:
     print("🛑 Veuillez fournir votre clé API Gemini dans la variable 'api_key'.")
     # raise ValueError("Clé API Gemini manquante.")
else:
    # Assurer l'existence et le format de 'transcript'
    if 'transcript' not in locals() or not isinstance(transcript, dict) or 'segments' not in transcript or not isinstance(transcript['segments'], list):
        print("❌ La variable 'transcript' n'est pas définie ou n'a pas le format attendu.")
        print("   Assurez-vous que la cellule de transcription a été exécutée avec succès auparavant.")
        print("   Format attendu: {'segments': [{'text': ..., 'start': ..., ...}, ...], ...}")
        transcript = {"segments": []} # Créer un transcript vide pour éviter les erreurs

    source_segments = transcript.get("segments", [])
    if not source_segments:
        print("ℹ️ La transcription source ne contient aucun segment. Aucune traduction à effectuer.")
        translated_transcript_data = transcript.copy()
        translated_transcript_data['segments'] = []
    else:
        print(f"\nPréparation de la traduction pour {len(source_segments)} segments...")

        # Taille du chunk (nombre de segments par requête API) - Ajustable
        chunk_size = 50 # @param {type:"integer"}
        if chunk_size <= 0:
            print("⚠️ Taille de chunk invalide, utilisation de la valeur par défaut 50.")
            chunk_size = 50

        # Créer une liste de tous les chunks avec leur index global
        all_chunks_with_indices = list(enumerate(chunk_list(source_segments, chunk_size)))
        total_chunks = len(all_chunks_with_indices)

        print(f"Découpage en {total_chunks} chunks de {chunk_size} segments maximum.")
        print(f"Traitement par lots de {PARALLEL_CHUNKS} chunks en parallèle.")

        # Dictionnaire pour stocker tous les résultats, indexés par l'index du chunk
        all_results_dict = {}
        failed_chunk_indices = []
        successful_chunk_count = 0

        start_total_time = time.time()
        batch_num = 0
        total_batches = math.ceil(total_chunks / PARALLEL_CHUNKS)

        # Boucler sur les lots (batches) de chunks
        for i in range(0, total_chunks, PARALLEL_CHUNKS):
            batch_num += 1
            # Sélectionner le lot actuel de chunks (jusqu'à PARALLEL_CHUNKS)
            current_batch_with_indices = all_chunks_with_indices[i : i + PARALLEL_CHUNKS]
            batch_start_index = current_batch_with_indices[0][0] + 1 # Index du 1er chunk du lot
            batch_end_index = current_batch_with_indices[-1][0] + 1 # Index du dernier chunk du lot

            print(f"\n--- Traitement du Lot {batch_num}/{total_batches} (Chunks {batch_start_index} à {batch_end_index}) ---", flush=True)

            # Traduire le lot en parallèle
            batch_start_time = time.time()
            batch_results = translate_batch(current_batch_with_indices, api_key, total_chunks, PARALLEL_CHUNKS)
            batch_end_time = time.time()
            print(f"--- Lot {batch_num}/{total_batches} terminé en {batch_end_time - batch_start_time:.2f} secondes ---", flush=True)


            # Mettre à jour les résultats globaux et compter les succès/échecs pour ce lot
            for index, result_data in batch_results.items():
                all_results_dict[index] = result_data # Stocker le résultat (liste ou None)
                if result_data is not None:
                    # Vérification supplémentaire optionnelle des clés (peut ralentir un peu)
                    if result_data and isinstance(result_data, list) and result_data[0]:
                         if 'text_english' not in result_data[0] or 'text_french' not in result_data[0]:
                               print(f"⚠️ [Chunk {index+1}/{total_chunks}] Clés 'text_english'/'text_french' manquantes dans le résultat retourné. Format peut-être incorrect.", flush=True)
                               # On le compte quand même comme 'réussi' car on a reçu une liste, mais avec un avertissement
                               successful_chunk_count += 1
                         else:
                               successful_chunk_count += 1
                    elif not result_data: # Cas du chunk source vide traité correctement
                         successful_chunk_count += 1
                    # else: # Cas où result_data est None (déjà traité ci-dessous)
                    #     pass
                else:
                    failed_chunk_indices.append(index + 1) # Ajouter l'index (base 1) du chunk échoué

            # --- Pause pour la limite de taux ---
            # Si ce n'est pas le dernier lot, attendre avant de lancer le suivant
            if i + PARALLEL_CHUNKS < total_chunks:
                print(f"\n⏸️ Respect de la limite de taux : Attente de {RATE_LIMIT_DELAY} secondes avant le prochain lot...", flush=True)
                time.sleep(RATE_LIMIT_DELAY)

        end_total_time = time.time()

        # --- Assemblage final des résultats dans le bon ordre ---
        all_translated_segments = []
        print("\nAssemblage des résultats...", flush=True)
        for index in range(total_chunks):
            if index in all_results_dict and all_results_dict[index] is not None:
                all_translated_segments.extend(all_results_dict[index])
            # else: # Si le chunk a échoué (index pas dans dict ou valeur None), on ne l'ajoute pas
                # print(f"ℹ️ Chunk {index+1} manquant (échec ou vide).") # Optionnel: loguer les chunks manquants
        print("Assemblage terminé.", flush=True)


        # --- Résumé Final ---
        print("\n--- Résumé de la Traduction ---")
        print(f"Temps total de traduction : {end_total_time - start_total_time:.2f} secondes.")
        print(f"Chunks traités avec succès : {successful_chunk_count}/{total_chunks}")
        if failed_chunk_indices:
            print(f"Chunks ayant échoué ({len(failed_chunk_indices)}): {', '.join(map(str, sorted(failed_chunk_indices)))}")
        # Comparer le nombre de segments attendus et obtenus
        final_segment_count = len(all_translated_segments)
        print(f"Nombre total de segments dans la sortie finale : {final_segment_count}")
        if final_segment_count < len(source_segments):
             print(f"⚠️ {len(source_segments) - final_segment_count} segments sont manquants par rapport à l'original (dus aux chunks échoués ou potentiellement tronqués).")
        elif final_segment_count > len(source_segments):
             print(f"⚠️ Le nombre de segments finaux ({final_segment_count}) est supérieur à l'original ({len(source_segments)}). Vérifiez les doublons potentiels ou erreurs d'assemblage.")
        print("------------------------------\n")


        # Créer le dictionnaire final
        translated_transcript_data = transcript.copy()
        translated_transcript_data["segments"] = all_translated_segments

        # Affichage JSON final (optionnel, peut être très long)
        # try:
        #    print("--- Début de la Transcription JSON Traduite (Aperçu) ---")
        #    # Afficher seulement les premières N segments pour éviter un output trop massif
        #    preview_segments = translated_transcript_data["segments"][:5] # Afficher les 5 premiers
        #    temp_preview = translated_transcript_data.copy()
        #    temp_preview["segments"] = preview_segments
        #    print(json.dumps(temp_preview, indent=2, ensure_ascii=False))
        #    if len(translated_transcript_data["segments"]) > 5:
        #        print(f"\n   ... et {len(translated_transcript_data['segments']) - 5} autres segments.")
        #    print("--- Fin de la Transcription JSON Traduite (Aperçu) ---")
        # except Exception as e:
        #      print(f"❌ Erreur lors de la génération de l'aperçu JSON : {e}")

        # Sauvegarde JSON (si nécessaire)
        # json_output_filename_translated = "transcript_translated_parallel.json"
        # try:
        #     with open(json_output_filename_translated, "w", encoding="utf-8") as f:
        #         json.dump(translated_transcript_data, f, indent=2, ensure_ascii=False)
        #     print(f"✅ Transcription traduite sauvegardée dans : {json_output_filename_translated}")
        # except IOError as e:
        #     print(f"❌ Erreur lors de la sauvegarde du fichier JSON traduit : {e}")


Préparation de la traduction pour 1933 segments...
Découpage en 39 chunks de 50 segments maximum.
Traitement par lots de 14 chunks en parallèle.

--- Traitement du Lot 1/3 (Chunks 1 à 14) ---
🔄 [Chunk 1/39] Début traduction (50 segments)...
🔄 [Chunk 2/39] Début traduction (50 segments)...
🔄 [Chunk 3/39] Début traduction (50 segments)...
🔄 [Chunk 4/39] Début traduction (50 segments)...
🔄 [Chunk 5/39] Début traduction (50 segments)...
🔄 [Chunk 6/39] Début traduction (50 segments)...
🔄 [Chunk 7/39] Début traduction (50 segments)...
🔄 [Chunk 8/39] Début traduction (50 segments)...
🔄 [Chunk 9/39] Début traduction (50 segments)...
🔄 [Chunk 10/39] Début traduction (50 segments)...
🔄 [Chunk 11/39] Début traduction (50 segments)...
🔄 [Chunk 12/39] Début traduction (50 segments)...
🔄 [Chunk 13/39] Début traduction (50 segments)...
🔄 [Chunk 14/39] Début traduction (50 segments)...
✅ [Chunk 14/39] Traduction réussie en 21.54 secondes (Tentative 1).
✅ [Chunk 8/39] Traduction réussie en 21.57 secon

In [None]:
# @title # **Transcription & Translation (Optimized)**

import whisper
import json
import time
import subprocess
import os
import re
import concurrent.futures
import math
import requests
import torch # For GPU check
from typing import List, Dict, Tuple, Optional, Any

# --- Configuration ---
WHISPER_MODEL_SIZE = "base" # @param ["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"]
# Nombre de segments à envoyer à Gemini en une seule requête API
TRANSLATION_CHUNK_SIZE = 50 # @param {type:"integer"}
# Nombre de requêtes de traduction simultanées vers l'API Gemini
MAX_TRANSLATION_WORKERS = 10 # @param {type:"integer"}
# Clé API Gemini - !! NE PAS METTRE EN DUR DANS LE CODE PARTAGÉ / PRODUCTION !!
# Utiliser Colab Secrets: from google.colab import userdata; api_key = userdata.get('GEMINI_API_KEY')
GEMINI_API_KEY = "VOTRE_CLE_API_GEMINI_ICI" #@param {type:"string"} # REMPLACEZ PAR VOTRE VRAIE CLE

# --- Fonction download_youtube_audio_improved (Mostly Unchanged, added typing) ---
def download_youtube_audio_improved(youtube_url: str, output_path: str) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """
    Downloads YouTube audio and reliably extracts metadata using separate yt-dlp calls.

    Args:
        youtube_url: The URL of the YouTube video.
        output_path: The desired path to save the MP3 audio file.

    Returns:
        A tuple (audio_file_path, video_info_dict).
        Returns (None, video_info) if audio download fails but metadata is retrieved.
        Returns (None, None) if metadata retrieval fails.
    """
    video_info = None
    audio_file_path = None
    output_dir = os.path.dirname(output_path)
    if not os.path.exists(output_dir):
        try:
            os.makedirs(output_dir)
            print(f"📁 Création du répertoire de sortie : {output_dir}")
        except OSError as e:
            print(f"❌ Erreur lors de la création du répertoire {output_dir}: {e}")
            return None, None # Cannot proceed without output directory

    # --- Step 1: Get Metadata ---
    print("ℹ️  Récupération des métadonnées de la vidéo...")
    try:
        metadata_command = [
            "yt-dlp", "--dump-json", "--encoding", "utf-8", youtube_url,
        ]
        metadata_result = subprocess.run(metadata_command, check=True, capture_output=True, text=True, encoding='utf-8', errors='replace')
        metadata = json.loads(metadata_result.stdout)

        video_info = {
            "video_id": metadata.get("id", "N/A"),
            "channel_name": metadata.get("uploader", "N/A"),
            "channel_url": metadata.get("uploader_url", "N/A"),
            "title": metadata.get("title", "N/A"),
            "description": metadata.get("description", "N/A"),
        }
        # Fallback for video_id if yt-dlp fails to get it
        if video_info["video_id"] == "N/A":
             video_id_match = re.search(r"v=([a-zA-Z0-9_-]+)", youtube_url)
             video_info["video_id"] = video_id_match.group(1) if video_id_match else "UNKNOWN"
             print(f"⚠️ ID vidéo non trouvé via yt-dlp, fallback regex: {video_info['video_id']}")

        print("✅ Métadonnées récupérées.")

    except subprocess.CalledProcessError as e:
        print(f"❌ Erreur yt-dlp (métadonnées) (Code: {e.returncode}).")
        print(f"   Commande: {' '.join(e.cmd)}")
        # Try to decode stderr even if it has errors
        error_output = e.stderr.strip() if e.stderr else "(no stderr)"
        print(f"   Erreur: {error_output}")
        return None, None
    except json.JSONDecodeError as e:
        print(f"❌ Erreur analyse JSON métadonnées yt-dlp : {e}")
        print(f"--- Sortie brute yt-dlp ---\n{metadata_result.stdout[:500]}...\n---")
        return None, None
    except FileNotFoundError:
        print("❌ yt-dlp non trouvé. Installez avec: pip install yt-dlp")
        return None, None
    except Exception as e:
        print(f"❌ Erreur inattendue (métadonnées) : {e}")
        return None, None

    # --- Step 2: Download Audio ---
    print(f"🔄 Téléchargement/Vérification audio vers {output_path}...")
    try:
        # Check if file exists AND is non-empty before trying download
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
             print(f"ℹ️ Fichier audio '{output_path}' existe déjà et n'est pas vide. Utilisation du fichier existant.")
             audio_file_path = output_path
        else:
            if os.path.exists(output_path):
                 print(f"ℹ️ Fichier audio '{output_path}' existe mais est vide. Tentative de téléchargement.")

            download_command = [
                "yt-dlp", "-x", "--audio-format", "mp3",
                # '--audio-quality', '0', # Optional: Best audio quality
                "-o", output_path,
                "--encoding", "utf-8",
                # "-v", # Uncomment for verbose debug output
                youtube_url,
            ]
            # Capture both stdout and stderr
            download_result = subprocess.run(download_command, check=True, capture_output=True, text=True, encoding='utf-8', errors='replace')

            # Double-check file existence and size after download attempt
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                print(f"✅ Fichier audio téléchargé/vérifié : {output_path}")
                audio_file_path = output_path
            else:
                print(f"❌ ERREUR: yt-dlp terminé sans erreur mais fichier '{output_path}' introuvable ou vide après tentative.")
                print(f"--- Sortie yt-dlp (stdout) ---\n{download_result.stdout.strip()}")
                print(f"--- Sortie yt-dlp (stderr) ---\n{download_result.stderr.strip()}")
                return None, video_info # Return metadata even if download failed

    except subprocess.CalledProcessError as e:
        # Log error even if the file exists (e.g., if download failed mid-way but left a file)
        print(f"❌ Erreur yt-dlp (téléchargement) (Code: {e.returncode}).")
        print(f"   Commande: {' '.join(e.cmd)}")
        error_output = e.stderr.strip() if e.stderr else "(no stderr)"
        print(f"   Erreur: {error_output}")
        # Check if file exists despite error (maybe it completed partially?)
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
             print(f"⚠️ Fichier '{output_path}' existe malgré l'erreur yt-dlp. Utilisation prudente.")
             audio_file_path = output_path # Use existing file cautiously
        else:
             return None, video_info # Failed, return metadata only
    except FileNotFoundError:
        print("❌ yt-dlp non trouvé. Installez avec: pip install yt-dlp")
        return None, video_info
    except Exception as e:
        print(f"❌ Erreur inattendue (téléchargement) : {e}")
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            print(f"⚠️ Utilisation du fichier audio existant '{output_path}' malgré l'erreur inattendue.")
            audio_file_path = output_path
        else:
            return None, video_info

    # Final check
    if audio_file_path and (not os.path.exists(audio_file_path) or os.path.getsize(audio_file_path) == 0):
        print(f"❌ ERREUR FINALE: Chemin audio '{audio_file_path}' retourné mais fichier introuvable ou vide.")
        return None, video_info

    return audio_file_path, video_info


# --- Fonction transcribe_audio (Optimized for GPU, clearer logging) ---
def transcribe_audio(file_path: str, model_size: str = "base", video_info: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Transcribes an audio file using Whisper, prioritizing GPU.

    Args:
      file_path: Path to the audio file.
      model_size: Whisper model size.
      video_info: Optional dictionary with video metadata.

    Returns:
      Dictionary containing 'segments' list and potentially video info.
      Returns {'segments': []} on critical error.
    """
    start_load_time = time.time()
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"🔄 Chargement du modèle Whisper '{model_size}' sur '{device}'...")
    try:
        model = whisper.load_model(model_size, device=device)
        load_time = time.time() - start_load_time
        print(f"✅ Modèle chargé en {load_time:.2f} secondes.")
    except Exception as e:
        print(f"❌ Erreur lors du chargement du modèle : {e}")
        return {"segments": []}

    print(f"🎙️ Début de la transcription pour {os.path.basename(file_path)}...")
    start_transcribe_time = time.time()
    try:
        # Use fp16=False if on CPU or if encountering precision issues on some GPUs
        use_fp16 = True if device == "cuda" else False
        result = model.transcribe(file_path, verbose=False, fp16=use_fp16) # verbose=True for debug timestamps
        transcription_time = time.time() - start_transcribe_time
        print(f"🕒 Transcription terminée en {transcription_time:.2f} secondes.")
    except FileNotFoundError:
        print(f"❌ Erreur: Fichier audio introuvable à '{file_path}'")
        return {"segments": []}
    except Exception as e:
        print(f"❌ Erreur lors de la transcription : {e}")
        # Consider specific errors, e.g., out-of-memory on GPU
        if "CUDA out of memory" in str(e):
            print("   Suggestion: Essayez un modèle plus petit ou réduisez la charge sur le GPU.")
        return {"segments": []}

    source_segments = result.get("segments", [])
    if not source_segments:
        print("⚠️ Aucun segment trouvé dans la transcription.")
        output = {"segments": []}
        if video_info:
            output.update({k: v for k, v in video_info.items() if v is not None})
        return output

    # Determine total duration more robustly
    total_audio_duration = source_segments[-1].get("end") if source_segments else 0
    if total_audio_duration <= 0:
         print("⚠️ Impossible de déterminer la durée totale à partir des segments.")
         # Could try 'result.get("duration")' if available, but segment end is often more reliable

    transcript_segments = []
    print(f"Traitement et formatage de {len(source_segments)} segments...")
    for i, segment in enumerate(source_segments):
        seg_start = segment.get("start", 0.0)
        seg_end = segment.get("end", 0.0)
        duration = max(0, seg_end - seg_start)
        text = segment.get("text", "").strip()

        progress_percentage = (seg_end / total_audio_duration) * 100 if total_audio_duration > 0 else 0

        transcript_segments.append({
            "id": i, # Add a simple ID for potential reference
            "text": text,
            "start": round(seg_start, 3),
            "end": round(seg_end, 3), # Adding end time can be useful
            "duration": round(duration, 3),
            "progress_percentage": round(progress_percentage, 2)
        })
        # Log progress less frequently
        # if (i + 1) % 50 == 0 or (i + 1) == len(source_segments):
        #      print(f"  Segment {i+1}/{len(source_segments)} formaté ({progress_percentage:.1f}%).")

    output = {"segments": transcript_segments}
    if video_info:
        # Filter out None values from video_info before updating
        output_video_info = {k: v for k, v in video_info.items() if v is not None}
        output.update(output_video_info)

    print(f"✅ Formatage des segments terminé.")
    return output


# --- Fonction Utilitaires Traduction (Unchanged) ---
def chunk_list(lst: list, n: int) -> list:
    """Divise une liste en sous-listes de taille n."""
    if not isinstance(lst, list):
        raise TypeError("L'entrée doit être une liste.")
    if n <= 0:
        raise ValueError("La taille du chunk doit être positive.")
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

def extract_json_from_response(text_response: str) -> str:
    """
    Tente d'extraire une chaîne JSON valide à partir d'une réponse textuelle,
    en gérant les blocs de code markdown potentiels (```json ... ```).
    """
    match = re.search(r'```json\s*([\s\S]*?)\s*```', text_response, re.DOTALL)
    if match:
        # print("ℹ️ Bloc JSON détecté via ```json ... ```.")
        return match.group(1).strip()

    # Tentative plus simple : chercher le premier '[' et le dernier ']'
    start = text_response.find('[')
    end = text_response.rfind(']')
    if start != -1 and end != -1 and end > start:
         # print("ℹ️ JSON détecté via délimiteurs [...] simples.")
         return text_response[start:end+1].strip()

    # Fallback : essayer de trouver un objet JSON
    start = text_response.find('{')
    end = text_response.rfind('}')
    if start != -1 and end != -1 and end > start:
        # print("ℹ️ JSON détecté via délimiteurs {...} simples.")
         return text_response[start:end+1].strip()

    # print("⚠️ Impossible d'isoler un bloc JSON spécifique. Tentative d'analyse du texte brut.")
    return text_response.strip()


# --- Fonction de Traduction de Chunk (pour exécution concurrente) ---
def translate_chunk_with_gemini(
    transcript_chunk: List[Dict[str, Any]],
    api_key: str,
    chunk_index: int,
    total_chunks: int
) -> Optional[List[Dict[str, Any]]]:
    """
    Sends a transcription chunk to the Gemini API for translation. Designed for concurrency.

    Args:
        transcript_chunk: A piece (chunk) of the transcription (list of segments).
        api_key: Gemini API Key.
        chunk_index: Index of the current chunk (for logging).
        total_chunks: Total number of chunks (for logging).

    Returns:
        The chunk modified with translations, or None on failure.
    """
    if not transcript_chunk:
        print(f"💨 [Chunk {chunk_index+1}/{total_chunks}] Vide, ignoré.")
        return [] # Return empty list for consistency

    start_time = time.time()
    log_prefix = f"[Chunk {chunk_index+1:03d}/{total_chunks:03d}]" # Padded index
    print(f"➡️ {log_prefix} Envoi vers Gemini API ({len(transcript_chunk)} segments)...")

    url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent?key={api_key}"

    try:
        transcript_str = json.dumps(transcript_chunk, ensure_ascii=False, indent=None) # No indent for smaller payload
    except TypeError as e:
         print(f"❌ {log_prefix} Erreur sérialisation JSON du chunk : {e}")
         return None

    # Precise prompt
    prompt = (
        "You are an expert multilingual translator specializing in transcription segments.\n"
        "Input: A JSON array of transcript segments, each with 'text', 'start', 'end', 'duration', 'progress_percentage'.\n"
        "Task: For EACH segment in the input array:\n"
        "1. Identify the original language of the 'text'.\n"
        "2. Translate 'text' into English. Add the result as 'text_english': \"<english_translation>\".\n"
        "3. Translate 'text' into French. Add the result as 'text_french': \"<french_translation>\".\n"
        "4. IMPORTANT: Preserve ALL original keys ('id', 'text', 'start', 'end', 'duration', 'progress_percentage') and their exact values.\n"
        "Output Format: Return ONLY the modified JSON array (a valid JSON list of objects). Do NOT include any introductory text, explanations, or markdown formatting (like ```json).\n\n"
        "Input JSON array:\n"
        f"{transcript_str}" # Directly embed the JSON string
    )

    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {
            "temperature": 0.2, # Lower temp for more deterministic translation
            "maxOutputTokens": 8192, # Generous limit
            "responseMimeType": "application/json", # Request JSON directly if model supports it
        },
         "safetySettings": [ # Standard safety settings
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
        ]
    }
    headers = {"Content-Type": "application/json"}
    raw_text_response = "" # Initialize in case of early error

    try:
        # Increased timeout for potentially longer API calls
        response = requests.post(url, headers=headers, json=payload, timeout=240)

        # Check for HTTP errors first
        response.raise_for_status()

        response_data = response.json()

        # --- Gemini API Response Validation ---
        if not response_data.get("candidates"):
            prompt_feedback = response_data.get("promptFeedback", {})
            block_reason = prompt_feedback.get("blockReason")
            safety_ratings = prompt_feedback.get("safetyRatings")
            error_message = "Aucun candidat retourné par l'API."
            if block_reason: error_message += f" Raison blocage: {block_reason}."
            if safety_ratings: error_message += f" Safety Ratings: {safety_ratings}"
            print(f"❌ {log_prefix} Erreur API Gemini: {error_message}")
            # print(f"DEBUG: Réponse complète reçue: {json.dumps(response_data, indent=2)}") # Debug
            return None

        candidate = response_data["candidates"][0]
        finish_reason = candidate.get("finishReason")
        if finish_reason not in ["STOP", "MAX_TOKENS"]: # MAX_TOKENS might be ok if JSON is parseable
             print(f"⚠️ {log_prefix} Fin de génération anormale: {finish_reason}.")
             # If blocked by safety, log it
             if finish_reason == "SAFETY":
                 safety_ratings = candidate.get("safetyRatings")
                 print(f"   Safety Ratings: {safety_ratings}")


        if "content" not in candidate or "parts" not in candidate["content"]:
             print(f"❌ {log_prefix} Structure réponse inattendue (manque content/parts).")
             # print(f"DEBUG: Candidat reçu: {json.dumps(candidate, indent=2)}") # Debug
             return None

        # Since we requested application/json, the content should ideally be parsed JSON already
        # However, Gemini might still wrap it or return text if it fails.
        raw_text_response = candidate["content"]["parts"][0].get("text", "")
        if not raw_text_response:
             print(f"❌ {log_prefix} Réponse de l'API vide.")
             return None

        # --- JSON Parsing ---
        # Attempt to parse the raw text response directly first
        result_json = None
        try:
            result_json = json.loads(raw_text_response)
            # print(f"ℹ️ {log_prefix} Réponse API directement parsée comme JSON.")
        except json.JSONDecodeError:
            # If direct parsing fails, try extracting from potential markdown/text
            # print(f"ℹ️ {log_prefix} Réponse non-JSON direct, tentative d'extraction...")
            json_string = extract_json_from_response(raw_text_response)
            try:
                result_json = json.loads(json_string)
            except json.JSONDecodeError as e_inner:
                print(f"❌ {log_prefix} Échec final de l'analyse JSON : {e_inner}")
                print(f"--- Réponse texte brute reçue (max 500 chars) ---")
                print(raw_text_response[:500] + ("..." if len(raw_text_response) > 500 else ""))
                print("--- Fin Réponse texte brute ---")
                return None

        # --- Validation of Parsed JSON ---
        if not isinstance(result_json, list):
             print(f"❌ {log_prefix} Résultat décodé n'est pas une liste JSON.")
             # print(f"DEBUG: JSON Parsé: {result_json}") # Debug
             return None

        if len(result_json) != len(transcript_chunk):
             print(f"⚠️ {log_prefix} Taille de retour ({len(result_json)}) != taille d'entrée ({len(transcript_chunk)}). Possible troncature ou erreur IA.")
             # Decide how to handle: return None (safer) or return partial result?
             # For now, treat as failure to ensure data integrity.
             return None

        # Optional: Deeper validation (check for expected keys in first element)
        if result_json and isinstance(result_json[0], dict):
            if 'text_english' not in result_json[0] or 'text_french' not in result_json[0]:
                 print(f"⚠️ {log_prefix} Clés 'text_english'/'text_french' manquantes dans le premier segment traduit.")
                 # Treat as failure? Or proceed with missing data? Let's treat as failure.
                 return None
        elif not result_json: # Empty list returned for empty input
             pass # This is fine if the input chunk was empty

        elapsed_time = time.time() - start_time
        print(f"✅ {log_prefix} Traduction réussie ({elapsed_time:.2f}s).")
        return result_json

    except requests.exceptions.Timeout:
        print(f"❌ {log_prefix} Timeout de la requête API ({response.request.url}).")
        return None
    except requests.exceptions.RequestException as e:
        print(f"❌ {log_prefix} Erreur Réseau/HTTP : {e}")
        return None
    except json.JSONDecodeError as e: # Error parsing the initial response from requests
        print(f"❌ {log_prefix} Erreur analyse JSON de la réponse HTTP initiale : {e}")
        try:
            print(f"--- Réponse HTTP brute (max 500 chars) --- \n{response.text[:500]}...\n---")
        except NameError:
             print(" (Impossible d'afficher la réponse HTTP)") # response might not be defined
        return None
    except (KeyError, IndexError) as e:
        print(f"❌ {log_prefix} Erreur accès aux clés de la réponse API : {e}")
        try:
            print(f"--- Réponse JSON brute reçue ---")
            print(json.dumps(response_data, indent=2, ensure_ascii=False))
            print("--- Fin Réponse JSON brute ---")
        except NameError:
             print("(Impossible d'afficher les données JSON de la réponse)")
        return None
    except Exception as e:
        print(f"❌ {log_prefix} Erreur inattendue : {e.__class__.__name__}: {e}")
        return None

# --- Fonction d'Orchestration de la Traduction Concurrent ---
def translate_transcript_concurrently(
    source_segments: List[Dict[str, Any]],
    api_key: str,
    chunk_size: int,
    max_workers: int
) -> Tuple[List[Dict[str, Any]], int, int]:
    """
    Translates transcription segments concurrently using ThreadPoolExecutor.

    Args:
        source_segments: List of segments from Whisper.
        api_key: Gemini API Key.
        chunk_size: Number of segments per API call.
        max_workers: Maximum number of concurrent API calls.

    Returns:
        Tuple: (list_of_translated_segments, successful_chunk_count, failed_chunk_count)
    """
    if not source_segments:
        return [], 0, 0

    print(f"\n🚀 Lancement de la traduction concurrente ({max_workers} workers max)...")
    all_translated_segments = []
    failed_chunks_count = 0
    successful_chunks_count = 0

    # Create chunks with indices for proper ordering later
    chunks = list(chunk_list(source_segments, chunk_size))
    total_chunks = len(chunks)
    # Store future results mapped by original chunk index
    future_to_chunk_index = {}
    results = [None] * total_chunks # Pre-allocate results list

    start_total_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all jobs
        for i, chunk in enumerate(chunks):
            future = executor.submit(
                translate_chunk_with_gemini,
                chunk,
                api_key,
                i, # Pass chunk index
                total_chunks
            )
            future_to_chunk_index[future] = i

        # Process completed jobs
        for future in concurrent.futures.as_completed(future_to_chunk_index):
            chunk_index = future_to_chunk_index[future]
            try:
                translated_chunk = future.result()
                if translated_chunk is not None:
                    results[chunk_index] = translated_chunk # Store result in correct position
                    successful_chunks_count += 1
                else:
                    # Failure already logged in translate_chunk_with_gemini
                    failed_chunks_count += 1
                    # results[chunk_index] remains None
            except Exception as exc:
                # Catch exceptions raised *during* future.result() if not caught inside the worker
                print(f"❌ [Chunk {chunk_index+1:03d}/{total_chunks:03d}] Erreur lors de la récupération du résultat: {exc}")
                failed_chunks_count += 1
                # results[chunk_index] remains None

    end_total_time = time.time()

    # Combine results in the correct order, skipping None entries (failed chunks)
    for chunk_result in results:
        if chunk_result is not None:
            all_translated_segments.extend(chunk_result)

    print("\n--- Résumé Traduction Concurrente ---")
    print(f"Temps total de traduction : {end_total_time - start_total_time:.2f} secondes.")
    print(f"Chunks traités : {successful_chunks_count}/{total_chunks} succès, {failed_chunks_count}/{total_chunks} échecs.")
    print(f"Segments sources : {len(source_segments)}")
    print(f"Segments traduits (récupérés) : {len(all_translated_segments)}")
    print("-------------------------------------\n")

    return all_translated_segments, successful_chunks_count, failed_chunks_count

# --- Script Principal d'Exécution ---
def main():
    start_pipeline_time = time.time()

    # --- Configuration & Validation ---
    youtube_url = "https://www.youtube.com/watch?v=7q88I_hs3Uw"#@param {"type":"string"}
    output_dir = "/content/audio_output"
    output_filename = "youtube_audio.mp3"
    output_path = os.path.join(output_dir, output_filename)
    json_transcript_filename = os.path.splitext(output_path)[0] + "_transcript.json"
    json_translated_filename = os.path.splitext(output_path)[0] + "_transcript_translated.json"

    if not GEMINI_API_KEY or GEMINI_API_KEY == "VOTRE_CLE_API_GEMINI_ICI":
         print("🛑 ERREUR: Clé API Gemini manquante ou invalide.")
         print("   Veuillez la définir dans la variable GEMINI_API_KEY en haut du script ou via Colab Secrets.")
         return # Stop execution

    if TRANSLATION_CHUNK_SIZE <= 0:
        print(f"⚠️ Taille de chunk de traduction invalide ({TRANSLATION_CHUNK_SIZE}), utilisation de 50.")
        chunk_size = 50
    else:
        chunk_size = TRANSLATION_CHUNK_SIZE

    if MAX_TRANSLATION_WORKERS <= 0:
        print(f"⚠️ Nombre de workers de traduction invalide ({MAX_TRANSLATION_WORKERS}), utilisation de 5.")
        max_workers = 5
    else:
        max_workers = MAX_TRANSLATION_WORKERS

    # --- Étape 1: Téléchargement ---
    print("-" * 30)
    print("ÉTAPE 1: TÉLÉCHARGEMENT AUDIO & METADONNÉES")
    print("-" * 30)
    audio_file_path, video_info = download_youtube_audio_improved(youtube_url, output_path)

    if not audio_file_path:
        print("\n❌ Échec critique: Impossible de télécharger ou trouver le fichier audio.")
        if video_info:
            print("   Métadonnées récupérées:", json.dumps(video_info, indent=2, ensure_ascii=False))
        else:
            print("   Échec également de la récupération des métadonnées.")
        print("Pipeline arrêté.")
        return

    print("\n--- Informations Vidéo Récupérées ---")
    print(json.dumps(video_info, indent=2, ensure_ascii=False))
    print("-----------------------------------\n")

    # --- Étape 2: Transcription ---
    print("-" * 30)
    print(f"ÉTAPE 2: TRANSCRIPTION AUDIO (Modèle: {WHISPER_MODEL_SIZE})")
    print("-" * 30)
    transcript_data = transcribe_audio(audio_file_path, model_size=WHISPER_MODEL_SIZE, video_info=video_info)

    if not transcript_data or not transcript_data.get("segments"):
        print("\n❌ Échec critique: La transcription n'a retourné aucun segment.")
        print("Pipeline arrêté.")
        # Optional: Save metadata even if transcription failed?
        # final_data = {"segments": []}
        # if video_info: final_data.update(video_info)
        # ... save final_data ...
        return

    # Sauvegarder la transcription originale (non traduite)
    try:
        with open(json_transcript_filename, "w", encoding="utf-8") as f:
            json.dump(transcript_data, f, indent=2, ensure_ascii=False)
        print(f"💾 Transcription originale sauvegardée dans : {json_transcript_filename}")
    except IOError as e:
        print(f"⚠️ Erreur lors de la sauvegarde de la transcription originale : {e}")
    except Exception as e:
         print(f"⚠️ Erreur inattendue lors de la sauvegarde JSON (original) : {e}")


    source_segments = transcript_data.get("segments", []) # Should exist based on check above

    # --- Étape 3: Traduction Concurrente ---
    print("-" * 30)
    print("ÉTAPE 3: TRADUCTION DES SEGMENTS")
    print("-" * 30)
    translated_segments, successful_chunks, failed_chunks = translate_transcript_concurrently(
        source_segments,
        GEMINI_API_KEY,
        chunk_size=chunk_size,
        max_workers=max_workers
    )

    # --- Assemblage Final et Sauvegarde ---
    final_translated_data = transcript_data.copy() # Conserve les métadonnées
    final_translated_data["segments"] = translated_segments # Remplace par les segments traduits

    if failed_chunks > 0:
        print(f"⚠️ {failed_chunks} chunks de traduction ont échoué. Le résultat final peut être incomplet.")
    elif not translated_segments and source_segments:
         print(f"⚠️ La traduction n'a retourné aucun segment alors que la source en contenait. Vérifiez les logs API.")
    else:
        print("✅ Traduction terminée.")


    # Sauvegarder le résultat final traduit
    try:
        with open(json_translated_filename, "w", encoding="utf-8") as f:
            json.dump(final_translated_data, f, indent=2, ensure_ascii=False)
        print(f"💾 Transcription traduite sauvegardée dans : {json_translated_filename}")
    except IOError as e:
        print(f"❌ Erreur lors de la sauvegarde du fichier JSON traduit : {e}")
    except TypeError as e:
         print(f"❌ Erreur de Type lors de la sérialisation finale en JSON : {e}")
         print("   Cela peut arriver si des données non sérialisables sont dans les résultats.")
    except Exception as e:
         print(f"❌ Erreur inattendue lors de la sauvegarde JSON (traduit) : {e}")


    # --- Nettoyage Optionnel ---
    # Décommentez pour supprimer le fichier audio après traitement
    # print("\n--- Nettoyage ---")
    # if os.path.exists(audio_file_path):
    #     try:
    #         os.remove(audio_file_path)
    #         print(f"🗑️ Fichier audio supprimé : {audio_file_path}")
    #     except OSError as e:
    #         print(f"❌ Erreur suppression fichier audio {audio_file_path}: {e}")
    # else:
    #     print(f"ℹ️ Fichier audio {audio_file_path} non trouvé pour suppression.")

    end_pipeline_time = time.time()
    print("\n" + "="*40)
    print(f"🏁 Pipeline Terminé en {end_pipeline_time - start_pipeline_time:.2f} secondes.")
    print("="*40)


if __name__ == "__main__":
    # Ensure prerequisite libraries are installed (useful in environments like Colab)
    try:
        import yt_dlp
    except ImportError:
        print("Tentative d'installation de yt-dlp...")
        subprocess.run(['pip', 'install', '-q', 'yt-dlp'])
        print("yt-dlp installé.")

    try:
        import whisper
    except ImportError:
        print("Tentative d'installation de openai-whisper...")
        # Note: Whisper installation can be heavy. Consider git+https for latest version if needed.
        subprocess.run(['pip', 'install', '-q', 'openai-whisper'])
        print("openai-whisper installé.")
        # May need ffmpeg too
        try:
             subprocess.run(['ffmpeg', '-version'], check=True, capture_output=True)
        except (FileNotFoundError, subprocess.CalledProcessError):
             print("ffmpeg non trouvé ou ne fonctionne pas. Installation nécessaire (apt-get install ffmpeg sous Debian/Ubuntu).")


    try:
        import requests
    except ImportError:
         print("Tentative d'installation de requests...")
         subprocess.run(['pip', 'install', '-q', 'requests'])
         print("requests installé.")

    try:
        import torch
    except ImportError:
         print("Tentative d'installation de torch...")
         # Installation can vary depending on CUDA version. Provide a common one.
         subprocess.run(['pip', 'install', '-q', 'torch'])
         print("torch installé.")


    # Run the main processing pipeline
    main()


Préparation de la traduction pour 701 segments...
Découpage en 15 chunks de 50 segments maximum.
🔄 [Chunk 1/15] Envoi à l'API Gemini (50 segments)...
ℹ️ Bloc JSON détecté via ```json ... ```.
✅ [Chunk 1/15] Traduction réussie en 27.64 secondes.
--------------------
🔄 [Chunk 2/15] Envoi à l'API Gemini (50 segments)...
ℹ️ Bloc JSON détecté via ```json ... ```.
✅ [Chunk 2/15] Traduction réussie en 28.44 secondes.
--------------------
🔄 [Chunk 3/15] Envoi à l'API Gemini (50 segments)...
ℹ️ Bloc JSON détecté via ```json ... ```.
✅ [Chunk 3/15] Traduction réussie en 28.99 secondes.
--------------------
🔄 [Chunk 4/15] Envoi à l'API Gemini (50 segments)...
ℹ️ Bloc JSON détecté via ```json ... ```.
✅ [Chunk 4/15] Traduction réussie en 28.63 secondes.
--------------------
🔄 [Chunk 5/15] Envoi à l'API Gemini (50 segments)...
ℹ️ Bloc JSON détecté via ```json ... ```.
✅ [Chunk 5/15] Traduction réussie en 28.15 secondes.
--------------------
🔄 [Chunk 6/15] Envoi à l'API Gemini (50 segments)...
ℹ️ B

In [None]:
# @title # **Upload**

url = "https://qingplay.pythonanywhere.com/upload_transcripts"  #@param {"type":"string"}

# Prepare the data to be sent as JSON
payload = {
    "video_id": translated_transcript_data["video_id"],
    "description": translated_transcript_data["description"],
    "channel_name": translated_transcript_data["channel_name"],
    "channel_url": translated_transcript_data["channel_url"],
    "title": translated_transcript_data["title"],
    "segments": translated_transcript_data["segments"]
}

# Send the POST request
try:
    response = requests.post(url, json=payload)  # Use json= to send JSON data
    response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)

    print("Request successful!")
    print("Status code:", response.status_code)
    print("Response body:", response.json())
    print("https://qingplay.pythonanywhere.com/vid/"+translated_transcript_data["video_id"])

except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
    if response is not None:  # Print the error returned by the server.
        print(f"Response text: {response.text}")
except json.JSONDecodeError as e:
    print(f"Failed to decode JSON: {e}")

Request successful!
Status code: 201
Response body: {'id': 15, 'message': 'Transcript created successfully'}
https://qingplay.pythonanywhere.com/vid/GRLdsdBDjE4


In [None]:
from flask import Flask
from flask_cors import CORS

app = Flask(__name__)
CORS(app)


@app.route("/")
def hello_world():
  return "<p>Hello, World!</p>"



In [None]:
!rm -r audio_output

In [None]:
# @title # **Transcription & Translation (Optimized with Streaming & Rate Limiting) - HARDCODED API KEY VERSION**

# --- Core Python Libraries ---
import json
import time
import subprocess
import os
import re
import math
import asyncio # For async operations
import concurrent.futures # Keep for potential future use, but not core here

# --- Third-party Libraries ---
import requests # For synchronous checks if needed later
import torch # For GPU check
import aiohttp # For async HTTP requests to Gemini
from aiolimiter import AsyncLimiter # For precise rate limiting
import nest_asyncio # For Colab/Jupyter

# --- Typing ---
from typing import List, Dict, Tuple, Optional, Any, AsyncGenerator, Set

# --- Apply nest_asyncio ---
nest_asyncio.apply()

# --- Whisper Implementation ---
# Separate the check/install logic from the actual import used later

try:
    # Just check if it *can* be imported for the initial message
    import faster_whisper
    print("✅ faster-whisper est déjà installé.")
    # We will import specific names later, outside the try/except
except ImportError:
    print("⚠️ Importation de faster-whisper échouée. Tentative d'installation...")
    print("‼️ IMPORTANT: Si cela échoue, exécutez cette commande dans une cellule Colab séparée AVANT ce script:")
    print("   !pip install -q faster-whisper ctranslate2>=3.10.0,<4.0.0")
    try:
        # Attempt installation
        subprocess.run(['pip', 'install', '-q', 'faster-whisper', 'ctranslate2>=3.10.0,<4.0.0'], check=True)
        import faster_whisper # Verify again after install
        print("✅ faster-whisper installé avec succès après tentative.")
    except (ImportError, subprocess.CalledProcessError) as install_err:
        print(f"❌ Échec critique de l'installation/importation de faster-whisper : {install_err}")
        print("   Assurez-vous d'avoir exécuté la commande pip manuellement dans une autre cellule.")
        print("   Vérifiez la compatibilité CTranslate2 (CPU/GPU, CUDA).")
        exit(1) # Cannot proceed

# --- NOW, import the necessary components ---
# This assumes the above block succeeded or exited.
try:
    from faster_whisper import WhisperModel, AutoModel
    print("✅ Noms requis (WhisperModel, AutoModel) importés depuis faster-whisper.")
except ImportError as e:
     print(f"❌ Erreur finale lors de l'importation de WhisperModel/AutoModel: {e}")
     print("   Même si l'installation a semblé réussir, l'importation spécifique a échoué.")
     print("   Essayez de redémarrer l'environnement d'exécution et réinstallez.")
     exit(1)


# --- Configuration ---
WHISPER_MODEL_SIZE = "medium" # @param ["tiny", "base", "small", "medium", "large", "large-v2", "large-v3"]
WHISPER_COMPUTE_TYPE = "float16" # @param ["default", "float16", "int8", "int8_float16"]

TRANSLATION_CHUNK_SIZE = 30 # @param {type:"integer"} # Segments per API call
MAX_CONCURRENT_TRANSLATIONS = 5 # @param {type:"integer"} # Max simultaneous API calls
GEMINI_RATE_LIMIT_PER_MINUTE = 15 # @param {type:"integer"} # Based on Gemini Free Tier (adjust if needed)

# --- Gemini API Key (HARDCODED) ---
GEMINI_API_KEY = "AIzaSyBiZONd6VA8y9zAd8vueZRo_IrPnn7iHlw" # <--- REMPLACEZ CECI

if GEMINI_API_KEY == "AIzaSyBiZONd6VA8y9zAd8vueZRo_IrPnn7iHlw":
    print("🛑 ALERTE: Vous n'avez pas remplacé 'VOTRE_CLE_API_GEMINI_ICI' par votre clé API réelle.")
    # exit(1) # Optional: Stop execution


# --- Global Variables / State ---
gemini_rate_limiter = AsyncLimiter(GEMINI_RATE_LIMIT_PER_MINUTE, 60)
translation_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TRANSLATIONS)


# --- Fonction download_youtube_audio_improved ---
# ... (keep function as is) ...
def download_youtube_audio_improved(youtube_url: str, output_path: str) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """
    Downloads YouTube audio and reliably extracts metadata using separate yt-dlp calls.

    Args:
        youtube_url: The URL of the YouTube video.
        output_path: The desired path to save the MP3 audio file.

    Returns:
        A tuple (audio_file_path, video_info_dict).
        Returns (None, video_info) if audio download fails but metadata is retrieved.
        Returns (None, None) if metadata retrieval fails.
    """
    video_info = None
    audio_file_path = None
    output_dir = os.path.dirname(output_path)
    if not os.path.exists(output_dir):
        try:
            os.makedirs(output_dir)
            print(f"📁 Création du répertoire de sortie : {output_dir}")
        except OSError as e:
            print(f"❌ Erreur lors de la création du répertoire {output_dir}: {e}")
            return None, None # Cannot proceed without output directory

    # --- Step 1: Get Metadata ---
    print("ℹ️  Récupération des métadonnées de la vidéo...")
    try:
        metadata_command = [
            "yt-dlp", "--dump-json", "--encoding", "utf-8", youtube_url,
        ]
        metadata_result = subprocess.run(metadata_command, check=True, capture_output=True, text=True, encoding='utf-8', errors='replace', timeout=120)
        metadata = json.loads(metadata_result.stdout)

        video_info = {
            "video_id": metadata.get("id", "N/A"),
            "channel_name": metadata.get("uploader", "N/A"),
            "channel_url": metadata.get("uploader_url", "N/A"),
            "title": metadata.get("title", "N/A"),
            "description": metadata.get("description", "N/A"),
            "duration": metadata.get("duration"), # Keep duration if available
            "upload_date": metadata.get("upload_date"), # Keep upload date
            "thumbnail": metadata.get("thumbnail"), # Keep thumbnail URL
        }
        if video_info["video_id"] == "N/A":
             video_id_match = re.search(r"v=([a-zA-Z0-9_-]+)", youtube_url)
             video_info["video_id"] = video_id_match.group(1) if video_id_match else "UNKNOWN"
             print(f"⚠️ ID vidéo non trouvé via yt-dlp, fallback regex: {video_info['video_id']}")

        print("✅ Métadonnées récupérées.")

    except subprocess.TimeoutExpired:
        print(f"❌ Timeout lors de la récupération des métadonnées yt-dlp.")
        return None, None
    except subprocess.CalledProcessError as e:
        print(f"❌ Erreur yt-dlp (métadonnées) (Code: {e.returncode}).")
        print(f"   Commande: {' '.join(e.cmd)}")
        error_output = e.stderr.strip() if e.stderr else "(no stderr)"
        print(f"   Erreur: {error_output}")
        return None, None
    except json.JSONDecodeError as e:
        print(f"❌ Erreur analyse JSON métadonnées yt-dlp : {e}")
        print(f"--- Sortie brute yt-dlp ---\n{metadata_result.stdout[:500]}...\n---")
        return None, None
    except FileNotFoundError:
        print("❌ yt-dlp non trouvé. Installez avec: pip install yt-dlp")
        return None, None
    except Exception as e:
        print(f"❌ Erreur inattendue (métadonnées) : {e}")
        return None, None

    # --- Step 2: Download Audio ---
    print(f"🔄 Téléchargement/Vérification audio vers {output_path}...")
    try:
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
             print(f"ℹ️ Fichier audio '{output_path}' existe déjà et n'est pas vide. Utilisation du fichier existant.")
             audio_file_path = output_path
        else:
            if os.path.exists(output_path):
                 print(f"ℹ️ Fichier audio '{output_path}' existe mais est vide. Tentative de téléchargement.")

            download_command = [
                "yt-dlp", "-x", "--audio-format", "mp3",
                "-o", output_path,
                "--encoding", "utf-8",
                youtube_url,
            ]
            download_result = subprocess.run(download_command, check=True, capture_output=True, text=True, encoding='utf-8', errors='replace', timeout=600) # 10 min timeout

            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                print(f"✅ Fichier audio téléchargé/vérifié : {output_path}")
                audio_file_path = output_path
            else:
                print(f"❌ ERREUR: yt-dlp terminé sans erreur mais fichier '{output_path}' introuvable ou vide après tentative.")
                print(f"--- Sortie yt-dlp (stdout) ---\n{download_result.stdout.strip()}")
                print(f"--- Sortie yt-dlp (stderr) ---\n{download_result.stderr.strip()}")
                return None, video_info

    except subprocess.TimeoutExpired:
         print(f"❌ Timeout lors du téléchargement audio yt-dlp.")
         if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
             print(f"⚠️ Fichier partiel '{output_path}' existe malgré le timeout. Suppression...")
             try: os.remove(output_path)
             except OSError: pass
         return None, video_info
    except subprocess.CalledProcessError as e:
        print(f"❌ Erreur yt-dlp (téléchargement) (Code: {e.returncode}).")
        print(f"   Commande: {' '.join(e.cmd)}")
        error_output = e.stderr.strip() if e.stderr else "(no stderr)"
        print(f"   Erreur: {error_output}")
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
             print(f"⚠️ Fichier '{output_path}' existe malgré l'erreur yt-dlp. Utilisation prudente.")
             audio_file_path = output_path
        else:
             return None, video_info
    except FileNotFoundError:
        print("❌ yt-dlp non trouvé. Installez avec: pip install yt-dlp")
        return None, video_info
    except Exception as e:
        print(f"❌ Erreur inattendue (téléchargement) : {e}")
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            print(f"⚠️ Utilisation du fichier audio existant '{output_path}' malgré l'erreur inattendue.")
            audio_file_path = output_path
        else:
            return None, video_info

    if audio_file_path and (not os.path.exists(audio_file_path) or os.path.getsize(audio_file_path) == 0):
        print(f"❌ ERREUR FINALE: Chemin audio '{audio_file_path}' retourné mais fichier introuvable ou vide.")
        return None, video_info

    return audio_file_path, video_info

# --- Fonction Utilitaires ---
# ... (keep function as is) ...
def extract_json_from_response(text_response: str) -> str:
    """
    Tente d'extraire une chaîne JSON valide à partir d'une réponse textuelle,
    en gérant les blocs de code markdown potentiels (```json ... ```).
    """
    match = re.search(r'```json\s*([\s\S]*?)\s*```', text_response, re.DOTALL)
    if match:
        return match.group(1).strip()
    start = text_response.find('[')
    end = text_response.rfind(']')
    if start != -1 and end != -1 and end > start:
         return text_response[start:end+1].strip()
    start = text_response.find('{')
    end = text_response.rfind('}')
    if start != -1 and end != -1 and end > start:
         return text_response[start:end+1].strip()
    return text_response.strip()


# --- Fonction de Traduction Async ---
# ... (keep function as is) ...
async def translate_chunk_gemini_async(
    session: aiohttp.ClientSession,
    transcript_chunk: List[Dict[str, Any]],
    api_key: str, # La clé est passée ici
    chunk_index: int, # For logging
    total_segments_in_chunk: int # For logging
) -> Optional[List[Dict[str, Any]]]:
    """
    Sends a transcription chunk to the Gemini API for translation asynchronously.
    Designed to be called by concurrent workers. Returns translated chunk or None on failure.
    """
    if not transcript_chunk:
        # print(f"💨 [Chunk {chunk_index+1:03d}] Vide, ignoré.") # Too verbose
        return []

    start_time = time.time()
    log_prefix = f"[Chunk {chunk_index+1:03d}]" # Padded index

    # Vérification de la clé API avant de faire l'appel
    if not api_key or api_key == "VOTRE_CLE_API_GEMINI_ICI":
        print(f"❌ {log_prefix} Clé API Gemini manquante ou invalide. Impossible de traduire.")
        return None # Échec du chunk

    url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash-latest:generateContent?key={api_key}"

    try:
        transcript_str = json.dumps(transcript_chunk, ensure_ascii=False, indent=None)
    except TypeError as e:
         print(f"❌ {log_prefix} Erreur sérialisation JSON du chunk : {e}")
         return None

    prompt = (
        "You are an expert multilingual translator specializing in transcription segments.\n"
        "Input: A JSON array of transcript segments, each with 'id', 'text', 'start', 'end', 'duration', 'progress_percentage'.\n"
        "Task: For EACH segment in the input array:\n"
        "1. Identify the original language of the 'text'.\n"
        "2. Translate 'text' into English. Add the result as 'text_english': \"<english_translation>\".\n"
        "3. Translate 'text' into French. Add the result as 'text_french': \"<french_translation>\".\n"
        "4. IMPORTANT: Preserve ALL original keys ('id', 'text', 'start', 'end', 'duration', 'progress_percentage') and their exact values.\n"
        "Output Format: Return ONLY the modified JSON array (a valid JSON list of objects). Do NOT include any introductory text, explanations, or markdown formatting (like ```json).\n\n"
        "Input JSON array:\n"
        f"{transcript_str}"
    )

    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {
            "temperature": 0.2,
            "maxOutputTokens": 8192,
            "responseMimeType": "application/json", # Explicitly request JSON
        },
         "safetySettings": [ # Standard safety settings
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
        ]
    }
    headers = {"Content-Type": "application/json"}
    response_text = ""
    response_json = None

    try:
        async with gemini_rate_limiter: # Wait if rate limit exceeded
             async with translation_semaphore: # Wait if max concurrent requests reached
                # print(f"🔒 {log_prefix} Rate limit/semaphore acquis. Envoi...") # Verbose
                req_start_time = time.time()
                async with session.post(url, headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=240)) as response:
                    req_duration = time.time() - req_start_time
                    # print(f"☁️ {log_prefix} Réponse reçue ({response.status}) en {req_duration:.2f}s.") # Verbose

                    response_text = await response.text()
                    response.raise_for_status() # Raise HTTP errors (4xx, 5xx)
                    response_json = await response.json(content_type=None) # Allow flexible content type

        # --- Process Response ---
        if not response_json:
             print(f"❌ {log_prefix} Réponse JSON vide ou invalide reçue.")
             print(f"   Réponse texte brute: {response_text[:200]}...")
             return None

        if not response_json.get("candidates"):
            prompt_feedback = response_json.get("promptFeedback", {})
            block_reason = prompt_feedback.get("blockReason")
            safety_ratings = prompt_feedback.get("safetyRatings")
            error_message = "Aucun candidat retourné par l'API."
            if block_reason: error_message += f" Raison blocage: {block_reason}."
            if safety_ratings: error_message += f" Safety Ratings: {safety_ratings}"
            print(f"❌ {log_prefix} Erreur API Gemini: {error_message}")
            # print(f"DEBUG: Réponse complète: {json.dumps(response_json, indent=2)}") # Debug only
            return None

        candidate = response_json["candidates"][0]
        finish_reason = candidate.get("finishReason")
        if finish_reason not in ["STOP", "MAX_TOKENS"]:
             print(f"⚠️ {log_prefix} Fin de génération anormale: {finish_reason}.")
             if finish_reason == "SAFETY":
                 safety_ratings = candidate.get("safetyRatings")
                 print(f"   Safety Ratings: {safety_ratings}")

        if "content" not in candidate or "parts" not in candidate["content"]:
             print(f"❌ {log_prefix} Structure réponse inattendue (manque content/parts).")
             # print(f"DEBUG: Candidat reçu: {json.dumps(candidate, indent=2)}") # Debug only
             return None

        raw_llm_output_text = candidate["content"]["parts"][0].get("text", "")
        if not raw_llm_output_text:
             print(f"❌ {log_prefix} Contenu de la réponse API vide.")
             return None

        # --- JSON Parsing from LLM output ---
        parsed_result_json = None
        try:
            # Try direct parsing first (due to responseMimeType)
            parsed_result_json = json.loads(raw_llm_output_text)
        except json.JSONDecodeError:
            # print(f"ℹ️ {log_prefix} Réponse non-JSON direct, tentative d'extraction...") # Verbose
            json_string = extract_json_from_response(raw_llm_output_text)
            try:
                parsed_result_json = json.loads(json_string)
            except json.JSONDecodeError as e_inner:
                print(f"❌ {log_prefix} Échec final de l'analyse JSON du contenu LLM : {e_inner}")
                print(f"--- Contenu texte brut LLM reçu (max 500 chars) ---\n{raw_llm_output_text[:500]}...\n---")
                return None

        # --- Validation of Parsed JSON ---
        if not isinstance(parsed_result_json, list):
             print(f"❌ {log_prefix} Résultat décodé du LLM n'est pas une liste JSON.")
             # print(f"DEBUG: JSON Parsé: {parsed_result_json}") # Debug only
             return None

        if len(parsed_result_json) != total_segments_in_chunk:
             print(f"⚠️ {log_prefix} Taille de retour LLM ({len(parsed_result_json)}) != taille d'entrée ({total_segments_in_chunk}). Possible troncature ou erreur IA. Chunk échoué.")
             return None # Treat as failure

        # Deeper validation (optional but good)
        if parsed_result_json: # If list is not empty
            first_segment = parsed_result_json[0]
            if not isinstance(first_segment, dict) or \
               'text_english' not in first_segment or \
               'text_french' not in first_segment or \
               'id' not in first_segment: # Check essential keys
                 print(f"⚠️ {log_prefix} Clés essentielles manquantes ('id', 'text_english', 'text_french') dans le premier segment traduit. Chunk échoué.")
                 return None

        elapsed_time = time.time() - start_time
        print(f"✅ {log_prefix} Traduction réussie ({total_segments_in_chunk} segments, {elapsed_time:.2f}s total).")
        return parsed_result_json

    except aiohttp.ClientResponseError as e:
        print(f"❌ {log_prefix} Erreur HTTP {e.status} de l'API Gemini : {e.message}")
        print(f"   Headers: {e.headers}")
        print(f"   Réponse Texte (max 500): {response_text[:500]}...")
        # Si l'erreur est 400 Bad Request, et contient "API key not valid"
        if e.status == 400 and "api key not valid" in response_text.lower():
             print(f"   ‼️ ERREUR SPÉCIFIQUE: La clé API '{api_key[:4]}...{api_key[-4:]}' semble invalide. Vérifiez la clé dans le code.")
        elif e.status == 429:
             print(f"   ‼️ ERREUR SPÉCIFIQUE: Rate limit (Quota) dépassé. Réduisez GEMINI_RATE_LIMIT_PER_MINUTE ou MAX_CONCURRENT_TRANSLATIONS.")

        return None
    except asyncio.TimeoutError:
        print(f"❌ {log_prefix} Timeout de la requête API Gemini ({url}).")
        return None
    except aiohttp.ClientError as e:
        print(f"❌ {log_prefix} Erreur Client aiohttp : {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"❌ {log_prefix} Erreur analyse JSON de la réponse principale: {e}")
        if response_text: print(f"--- Réponse texte brute (max 500 chars) ---\n{response_text[:500]}...\n---")
        return None
    except (KeyError, IndexError) as e:
        print(f"❌ {log_prefix} Erreur accès aux clés/index de la réponse API : {e}")
        if response_json: print(f"--- Réponse JSON brute reçue ---\n{json.dumps(response_json, indent=2, ensure_ascii=False)}\n---")
        return None
    except Exception as e:
        print(f"❌ {log_prefix} Erreur inattendue dans translate_chunk : {e.__class__.__name__}: {e}")
        return None


# --- Fonction d'Orchestration Async ---
# ... (keep function as is) ...
async def process_audio_pipeline_async(
    audio_file_path: str,
    whisper_model_size: str,
    whisper_compute_type: str,
    api_key: str, # La clé est passée ici
    chunk_size: int,
    num_workers_to_start: int, # Explicitly pass worker count
    video_info: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Asynchronous pipeline to transcribe and translate audio.
    """
    start_pipeline_time = time.time()
    print("\n🚀 Démarrage du pipeline asynchrone...")

    # --- Vérification clé API au début du pipeline ---
    if not api_key or api_key == "VOTRE_CLE_API_GEMINI_ICI":
        print("❌ ERREUR PIPELINE: Clé API Gemini non valide ou non définie dans le code.")
        return {"segments": [], **(video_info or {})} # Return empty data

    # --- Load Whisper Model ---
    start_load_time = time.time()
    device = "cuda" if torch.cuda.is_available() else "cpu"
    if whisper_compute_type == "default":
        compute_type = "float16" if device == "cuda" else "int8"
    else:
        compute_type = whisper_compute_type

    print(f"🔄 Chargement du modèle faster-whisper '{whisper_model_size}' (compute_type={compute_type}) sur '{device}'...")
    try:
        # >>> AutoModel SHOULD BE DEFINED HERE <<<
        # Check if it exists just before using it for debugging
        # if 'AutoModel' not in globals() and 'AutoModel' not in locals():
        #      print("DEBUG: AutoModel is STILL not defined right before usage!")
        #      # You might want to raise an error here or handle it
        # else:
        #      print("DEBUG: AutoModel appears to be defined now.")

        # This is the line that caused the original error
        model = AutoModel.from_pretrained(whisper_model_size, device=device, compute_type=compute_type)
        load_time = time.time() - start_load_time
        print(f"✅ Modèle chargé en {load_time:.2f} secondes.")
    except NameError as ne:
         # Catch the specific error if it still happens
         print(f"❌ ERREUR CRITIQUE (NameError): {ne}. 'AutoModel' n'est toujours pas défini.")
         print("   Cela indique un problème persistant avec l'importation de faster_whisper.")
         print("   Vérifiez l'installation manuelle et l'environnement.")
         return {"segments": [], **(video_info or {})}
    except Exception as e:
        print(f"❌ Erreur critique lors du chargement du modèle faster-whisper: {e}")
        if "ctranslate2" in str(e).lower():
            print("   Suggestion: Assurez-vous que CTranslate2 est installé et compatible.")
        # Print more details for other errors
        import traceback
        traceback.print_exc()
        return {"segments": [], **(video_info or {})} # Return empty data

    # --- Setup Async Components ---
    translation_tasks: Set[asyncio.Task] = set()
    all_translated_segments_list: List[Dict[str, Any]] = []
    chunk_queue = asyncio.Queue(maxsize=num_workers_to_start * 2) # Bounded queue
    processed_chunk_count = 0
    failed_chunk_count = 0
    total_segments_yielded = 0
    current_chunk_index = 0

    # --- Create aiohttp Session ---
    async with aiohttp.ClientSession() as session:

        # --- Define the Translation Worker ---
        async def translation_worker(worker_id: int):
            nonlocal processed_chunk_count, failed_chunk_count
            print(f"👷 [Worker-{worker_id}] Démarré.")
            while True:
                try:
                    chunk_data = await chunk_queue.get()
                    if chunk_data is None: # Sentinel
                        chunk_queue.task_done()
                        break

                    chunk_idx, segment_chunk = chunk_data
                    log_prefix_worker = f"[Worker-{worker_id} | Chunk {chunk_idx+1:03d}]"

                    translated_chunk = await translate_chunk_gemini_async(
                        session, segment_chunk, api_key, chunk_idx, len(segment_chunk)
                    )

                    if translated_chunk is not None:
                        all_translated_segments_list.extend(translated_chunk)
                        processed_chunk_count += 1
                    else:
                        print(f"👎 {log_prefix_worker} Échec de la traduction.")
                        failed_chunk_count += 1

                    chunk_queue.task_done()

                except asyncio.CancelledError:
                    print(f"🚫 [Worker-{worker_id}] Tâche annulée.")
                    break
                except Exception as e:
                    print(f"❌ [Worker-{worker_id}] Erreur inattendue : {e}")
                    try: chunk_queue.task_done()
                    except ValueError: pass
                    await asyncio.sleep(1)

            print(f"🏁 [Worker-{worker_id}] Terminé.")

        # --- Start Translation Workers ---
        for i in range(num_workers_to_start):
            task = asyncio.create_task(translation_worker(i + 1))
            translation_tasks.add(task)

        # --- Transcribe and Produce Chunks ---
        print(f"\n🎙️ Début de la transcription (yield) et mise en file d'attente des chunks...")
        start_transcribe_time = time.time()
        current_chunk: List[Dict[str, Any]] = []
        total_audio_duration_est = video_info.get("duration") if video_info else None

        try:
            # Use the loaded 'model' variable here
            segments_generator, info = model.transcribe(audio_file_path, beam_size=5, word_timestamps=False)

            detected_language = info.language
            lang_prob = info.language_probability
            duration_detected = info.duration
            print(f"   Langue détectée: {detected_language} (Probabilité: {lang_prob:.2f})")
            print(f"   Durée audio détectée par Whisper: {duration_detected:.2f}s")
            if total_audio_duration_est is None:
                 total_audio_duration_est = duration_detected
                 print(f"   Utilisation de la durée détectée ({total_audio_duration_est:.1f}s) pour le %.")

            if not total_audio_duration_est or total_audio_duration_est <= 0:
                print("⚠️ Durée audio nulle ou invalide, calcul du pourcentage désactivé.")
                total_audio_duration_est = None

            segment_id_counter = 0
            last_log_time = time.time()

            async for segment in segments_generator:
                seg_start = segment.start
                seg_end = segment.end
                text = segment.text.strip()
                if not text: continue

                duration = max(0, seg_end - seg_start)
                progress_percentage = 0.0
                if total_audio_duration_est and total_audio_duration_est > 0:
                    progress_percentage = min(100.0, max(0.0, (seg_end / total_audio_duration_est) * 100))

                total_segments_yielded += 1

                formatted_segment = {
                    "id": segment_id_counter, "text": text,
                    "start": round(seg_start, 3), "end": round(seg_end, 3),
                    "duration": round(duration, 3),
                    "progress_percentage": round(progress_percentage, 2)
                }
                segment_id_counter += 1
                current_chunk.append(formatted_segment)

                current_time = time.time()
                if current_time - last_log_time > 10.0:
                     q_size = chunk_queue.qsize()
                     active_translators = num_workers_to_start - translation_semaphore._value if hasattr(translation_semaphore, '_value') else 'N/A'
                     progress_str = f"{progress_percentage:.1f}%" if total_audio_duration_est else f"{seg_end:.1f}s"
                     print(f"   -> Transcription: {progress_str} | Segments: {total_segments_yielded} | Queue: {q_size} | Translators Active ~{active_translators}")
                     last_log_time = current_time

                if len(current_chunk) >= chunk_size:
                    await chunk_queue.put((current_chunk_index, current_chunk))
                    current_chunk_index += 1
                    current_chunk = []

            if current_chunk:
                 await chunk_queue.put((current_chunk_index, current_chunk))
                 current_chunk_index += 1

            transcription_time = time.time() - start_transcribe_time
            print(f"\n✅ Transcription terminée en {transcription_time:.2f}s. {total_segments_yielded} segments générés.")
            print(f"   Total chunks envoyés à la traduction : {current_chunk_index}")

        except Exception as e:
            print(f"❌ Erreur critique pendant la transcription/production de chunks: {e}")
            import traceback
            traceback.print_exc()

        finally:
            print("\n🚦 Envoi des signaux d'arrêt aux workers...")
            for _ in range(num_workers_to_start):
                try: await chunk_queue.put(None)
                except Exception as qe: print(f"Erreur envoi sentinel: {qe}")

            print("⏳ Attente de la fin du traitement des chunks...")
            await chunk_queue.join()
            print("✅ File d'attente vide.")

            print("⏳ Attente de la terminaison des workers...")
            done, pending = await asyncio.wait(translation_tasks, timeout=60)

            if pending:
                print(f"⚠️ {len(pending)} workers n'ont pas terminé, annulation...")
                for task in pending: task.cancel()
                await asyncio.gather(*pending, return_exceptions=True)

            print("✅ Tous les workers de traduction ont terminé.")

    # --- Assemble Final Result ---
    pipeline_duration = time.time() - start_pipeline_time
    print("\n--- Résumé du Pipeline Async ---")
    print(f"Temps total d'exécution : {pipeline_duration:.2f} secondes.")
    # ... (rest of summary prints) ...
    print(f"Segments transcrits : {total_segments_yielded}")
    print(f"Chunks visés pour traduction : {current_chunk_index}") # Chunks created
    print(f"Chunks traités avec succès : {processed_chunk_count}")
    print(f"Chunks échoués : {failed_chunk_count}")
    print(f"Segments traduits récupérés : {len(all_translated_segments_list)}")


    final_data = video_info.copy() if video_info else {}
    all_translated_segments_list.sort(key=lambda x: x.get('id', float('inf')))
    final_data["segments"] = all_translated_segments_list

    if failed_chunk_count > 0:
         missed_segments_approx = failed_chunk_count * chunk_size
         print(f"⚠️ ATTENTION: {failed_chunk_count} chunks ({missed_segments_approx} segments approx.) ont échoué.")
         print(f"   Segments sources: {total_segments_yielded}, Segments finaux: {len(all_translated_segments_list)}")

    return final_data


# --- Script Principal d'Exécution ---
# ... (keep main function as is, it calls the async pipeline) ...
def main():
    start_overall_time = time.time()

    # --- Configuration & Validation ---
    youtube_url = "https://www.youtube.com/watch?v=7q88I_hs3Uw"#@param {"type":"string"}
    output_dir = "/content/audio_output"
    output_filename = "youtube_audio.mp3"
    output_path = os.path.join(output_dir, output_filename)
    json_translated_filename = os.path.splitext(output_path)[0] + "_transcript_translated.json"

    # --- API Key Check (Simplified for hardcoded key) ---
    if not GEMINI_API_KEY or GEMINI_API_KEY == "VOTRE_CLE_API_GEMINI_ICI":
         print("🛑 ERREUR: La clé API Gemini est manquante ou est toujours le placeholder.")
         print("   Veuillez éditer la variable GEMINI_API_KEY en haut du script avec votre clé réelle.")
         return # Stop execution
    else:
         print("🔑 Clé API Gemini chargée depuis le code source (hardcodée).")


    # --- Parameter Validation & Global Updates ---
    global TRANSLATION_CHUNK_SIZE, MAX_CONCURRENT_TRANSLATIONS, GEMINI_RATE_LIMIT_PER_MINUTE
    global translation_semaphore, gemini_rate_limiter # Allow modification

    chunk_size = TRANSLATION_CHUNK_SIZE
    if not isinstance(chunk_size, int) or chunk_size <= 0:
        print(f"⚠️ Taille de chunk invalide ({chunk_size}), utilisation de 30.")
        chunk_size = 30

    max_workers_local = MAX_CONCURRENT_TRANSLATIONS
    if not isinstance(max_workers_local, int) or max_workers_local <= 0:
        print(f"⚠️ Nombre de workers invalide ({max_workers_local}), utilisation de 5.")
        max_workers_local = 5

    rate_limit_local = GEMINI_RATE_LIMIT_PER_MINUTE
    if not isinstance(rate_limit_local, int) or rate_limit_local <= 0:
        print(f"⚠️ Rate limit invalide ({rate_limit_local}), utilisation de 15.")
        rate_limit_local = 15

    # Re-initialize globals based on potentially validated values
    print(f"🔧 Configuration appliquée : Chunk={chunk_size}, Workers={max_workers_local}, RateLimit={rate_limit_local}/min")
    translation_semaphore = asyncio.Semaphore(max_workers_local)
    gemini_rate_limiter = AsyncLimiter(rate_limit_local, 60)


    # --- Étape 1: Téléchargement (Synchrone) ---
    print("\n" + "-" * 30)
    print("ÉTAPE 1: TÉLÉCHARGEMENT AUDIO & METADONNÉES")
    print("-" * 30)
    audio_file_path, video_info = download_youtube_audio_improved(youtube_url, output_path)

    if not audio_file_path:
        print("\n❌ Échec critique: Impossible de télécharger ou trouver le fichier audio.")
        if video_info: print("   Métadonnées partielles:", json.dumps(video_info, indent=2, ensure_ascii=False))
        print("Pipeline arrêté.")
        return

    print("\n--- Informations Vidéo Récupérées ---")
    if video_info: print(json.dumps(video_info, indent=2, ensure_ascii=False))
    else: print("(Aucune métadonnée récupérée)")
    print("-----------------------------------\n")

    # --- Étape 2 & 3: Transcription et Traduction Async ---
    print("-" * 30)
    print(f"ÉTAPE 2 & 3: TRANSCRIPTION & TRADUCTION ASYNCHRONE")
    print(f"  Modèle Whisper: {WHISPER_MODEL_SIZE}, Compute: {WHISPER_COMPUTE_TYPE}, Device: {'cuda' if torch.cuda.is_available() else 'cpu'}")
    print(f"  Chunk Size: {chunk_size}, Max Concurrent API: {max_workers_local}, Rate Limit: {rate_limit_local}/min")
    print("-" * 30)

    # Run the async pipeline, passing the hardcoded API key
    final_translated_data = asyncio.run(process_audio_pipeline_async(
        audio_file_path=audio_file_path,
        whisper_model_size=WHISPER_MODEL_SIZE,
        whisper_compute_type=WHISPER_COMPUTE_TYPE,
        api_key=GEMINI_API_KEY, # Passer la clé API globale ici
        chunk_size=chunk_size,
        num_workers_to_start=max_workers_local, # Pass validated worker count
        video_info=video_info
    ))

    # --- Sauvegarde Finale ---
    print("\n" + "-" * 30)
    print("ÉTAPE 4: SAUVEGARDE DES RÉSULTATS")
    print("-" * 30)

    if final_translated_data and final_translated_data.get("segments"):
        try:
            with open(json_translated_filename, "w", encoding="utf-8") as f:
                json.dump(final_translated_data, f, indent=2, ensure_ascii=False)
            print(f"💾 Transcription traduite sauvegardée dans : {json_translated_filename}")
            print(f"   Nombre total de segments dans le fichier final : {len(final_translated_data['segments'])}")
        except IOError as e:
            print(f"❌ Erreur lors de la sauvegarde du fichier JSON traduit : {e}")
        except TypeError as e:
             print(f"❌ Erreur de Type lors de la sérialisation finale en JSON : {e}")
        except Exception as e:
             print(f"❌ Erreur inattendue lors de la sauvegarde JSON (traduit) : {e}")
    else:
        print("⚠️ Aucune donnée traduite à sauvegarder (pipeline échoué ou aucun segment généré/traduit).")


    # --- Nettoyage Optionnel ---
    # print("\n--- Nettoyage ---")
    # try:
    #     if audio_file_path and os.path.exists(audio_file_path):
    #         os.remove(audio_file_path)
    #         print(f"🗑️ Fichier audio supprimé : {audio_file_path}")
    # except OSError as e:
    #     print(f"❌ Erreur suppression fichier audio {audio_file_path}: {e}")


    end_overall_time = time.time()
    print("\n" + "="*40)
    print(f"🏁 Pipeline Global Terminé en {end_overall_time - start_overall_time:.2f} secondes.")
    print("="*40)


# --- Bloc d'exécution principal ---
if __name__ == "__main__":
    print("--- Vérification Initiale des Dépendances ---")
    # --- Dependency Checks ---
    required_libs = {'yt-dlp', 'requests', 'torch', 'faster_whisper', 'ctranslate2', 'aiohttp', 'aiolimiter', 'nest_asyncio'} # Added nest_asyncio
    installed_libs = set()
    try: import yt_dlp; installed_libs.add('yt-dlp')
    except ImportError: pass
    try: import requests; installed_libs.add('requests')
    except ImportError: pass
    try: import torch; installed_libs.add('torch')
    except ImportError: pass
    try: import faster_whisper; installed_libs.add('faster_whisper')
    except ImportError: pass # Handled above
    try: import ctranslate2; installed_libs.add('ctranslate2')
    except ImportError: pass
    try: import aiohttp; installed_libs.add('aiohttp')
    except ImportError: pass
    try: import aiolimiter; installed_libs.add('aiolimiter')
    except ImportError: pass
    try: import nest_asyncio; installed_libs.add('nest_asyncio')
    except ImportError: pass


    missing_libs = required_libs - installed_libs
    # Don't try to re-install faster_whisper/ctranslate2 if the top-level import failed
    missing_libs -= {'faster_whisper', 'ctranslate2'}


    if missing_libs:
        print(f"Bibliothèques manquantes détectées: {', '.join(missing_libs)}")
        print("Tentative d'installation via pip...")
        install_cmd = ['pip', 'install', '-q'] + list(missing_libs)
        try:
            subprocess.run(install_cmd, check=True)
            print("Installation terminée. Re-vérification...")
            recheck_failed = False
            # Minimal re-check (add more if needed)
            if 'yt-dlp' in missing_libs:
                 try: import yt_dlp
                 except ImportError: recheck_failed = True; print("Echec import yt-dlp post-install")
            if 'aiohttp' in missing_libs:
                 try: import aiohttp
                 except ImportError: recheck_failed = True; print("Echec import aiohttp post-install")
            if 'nest_asyncio' in missing_libs:
                 try: import nest_asyncio
                 except ImportError: recheck_failed = True; print("Echec import nest_asyncio post-install")

            if recheck_failed:
                 print("‼️ Certaines installations semblent avoir échoué. Le script risque de ne pas fonctionner.")
            else:
                 print("✅ Bibliothèques supplémentaires installées.")

        except subprocess.CalledProcessError as install_err:
            print(f"❌ Échec de l'installation pip : {install_err}")
            print("   Veuillez essayer d'installer manuellement les bibliothèques manquantes.")
        except Exception as e:
            print(f"❌ Erreur inattendue lors de l'installation pip : {e}")

    # Check ffmpeg
    try:
         result = subprocess.run(['ffmpeg', '-version'], check=True, capture_output=True, timeout=5, text=True, errors='ignore')
         print(f"✅ ffmpeg trouvé: {result.stdout.splitlines()[0]}")
    except Exception as ffmpeg_err:
         print(f"⚠️ ffmpeg non trouvé ou ne fonctionne pas ({ffmpeg_err}). Requis par faster-whisper.")
         print("   Installation suggérée: sudo apt update && sudo apt install ffmpeg")


    # Check for API Key placeholder *before* running main logic
    if not GEMINI_API_KEY or GEMINI_API_KEY == "VOTRE_CLE_API_GEMINI_ICI":
        print("\n⚠️ ALERTE: Clé API Gemini non configurée ou placeholder non remplacé dans le script.")
        print("   Veuillez éditer le script et remplacer 'VOTRE_CLE_API_GEMINI_ICI'.")
    else:
        print("\n🔑 Clé API Gemini trouvée (hardcodée) dans le script.")
    print("-------------------------------------------")

    # Run the main processing pipeline
    main()

✅ faster-whisper est déjà installé.
❌ Erreur finale lors de l'importation de WhisperModel/AutoModel: cannot import name 'AutoModel' from 'faster_whisper' (/usr/local/lib/python3.11/dist-packages/faster_whisper/__init__.py)
   Même si l'installation a semblé réussir, l'importation spécifique a échoué.
   Essayez de redémarrer l'environnement d'exécution et réinstallez.
🛑 ALERTE: Vous n'avez pas remplacé 'VOTRE_CLE_API_GEMINI_ICI' par votre clé API réelle.
--- Vérification Initiale des Dépendances ---
✅ ffmpeg trouvé: ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers

🔑 Clé API Gemini trouvée (hardcodée) dans le script.
-------------------------------------------
🔑 Clé API Gemini chargée depuis le code source (hardcodée).
🔧 Configuration appliquée : Chunk=50, Workers=5, RateLimit=15/min

------------------------------
ÉTAPE 1: TÉLÉCHARGEMENT AUDIO & METADONNÉES
------------------------------
ℹ️  Récupération des métadonnées de la vidéo...
✅ Métadonnées r

In [None]:
!pip install yt-dlp requests torch faster_whisper ctranslate2 aiohttp aiolimiter

Collecting faster_whisper
  Using cached faster_whisper-1.1.1-py3-none-any.whl.metadata (16 kB)
Collecting ctranslate2
  Using cached ctranslate2-4.6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting aiolimiter
  Using cached aiolimiter-1.2.1-py3-none-any.whl.metadata (4.5 kB)
Collecting onnxruntime<2,>=1.14 (from faster_whisper)
  Using cached onnxruntime-1.21.1-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.5 kB)
Collecting av>=11 (from faster_whisper)
  Using cached av-14.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.7 kB)
Collecting coloredlogs (from onnxruntime<2,>=1.14->faster_whisper)
  Using cached coloredlogs-15.0.1-py2.py3-none-any.whl.metadata (12 kB)
Collecting humanfriendly>=9.1 (from coloredlogs->onnxruntime<2,>=1.14->faster_whisper)
  Downloading humanfriendly-10.0-py2.py3-none-any.whl.metadata (9.2 kB)
Downloading faster_whisper-1.1.1-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━

In [None]:
# @title # **Start App**
# @markdown <-- Start the cell.


import subprocess
import threading
import time
import socket
import urllib.request

def iframe_thread(port):
  while True:
      time.sleep(0.5)
      sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      result = sock.connect_ex(('127.0.0.1', port))
      if result == 0:
        break
      sock.close()
  clear_output()
  display(HTML(f'<strong style="color: blue;">Trying to launch the App on the Web(if it gets stuck here cloudflared is having issues)</strong>'))
  p = subprocess.Popen(["cloudflared", "tunnel", "--url", "http://127.0.0.1:{}".format(port)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  for line in p.stderr:
    l = line.decode()
    if "trycloudflare.com " in l:
      clear_output()
      display(HTML('<h1 style="color: green;">Done! ✅</h1></br>'))
      print("This is the URL to access the App:", l[l.find("http"):], end='')

clear_output()

threading.Thread(target=iframe_thread, daemon=True, args=(5051,)).start()


if __name__ == '__main__':
    app.run(debug=False, port=5051)

This is the URL to access the App: https://journal-congo-currently-pointer.trycloudflare.com                                 |


INFO:werkzeug:127.0.0.1 - - [17/Apr/2025 12:44:09] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [17/Apr/2025 12:44:10] "[33mGET /favicon.ico HTTP/1.1[0m" 404 -
INFO:werkzeug:127.0.0.1 - - [17/Apr/2025 12:44:13] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [17/Apr/2025 12:44:13] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [17/Apr/2025 12:44:13] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [17/Apr/2025 12:44:13] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [17/Apr/2025 12:44:13] "[33mGET /favicon.ico HTTP/1.1[0m" 404 -
INFO:werkzeug:127.0.0.1 - - [17/Apr/2025 12:50:27] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [17/Apr/2025 12:50:28] "[33mGET /favicon.ico HTTP/1.1[0m" 404 -
INFO:werkzeug:127.0.0.1 - - [17/Apr/2025 12:50:57] "GET / HTTP/1.1" 200 -
