In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [8]:
# Cellule 1: Installation des dépendances
!pip install ffmpeg yt_dlp huggingface_hub
!pip install git+https://github.com/openai/whisper.git
!pip install torch torchaudio torchvision  # S'assurer que PyTorch est installé pour Whisper et GPU

Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-8v0xxmfq
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-8v0xxmfq
  Resolved https://github.com/openai/whisper.git to commit dd985ac4b90cafeef8712f2998d62c59c3e62d22
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone


# Traduire une vidéo YT en Français
# Ajout de la Transcription + Traduction + SRT dans la foulée 

In [9]:
# Cellule 2: Définition des fonctions utilitaires
import os
import subprocess
import argparse # Importer argparse
from yt_dlp import YoutubeDL
import whisper
import zipfile # Importer zipfile
import glob # Importer glob pour la recherche de fichiers
from huggingface_hub import HfApi, login # Importer pour Hugging Face
import time # Pour horodatage de l'archive de sortie
import shutil # Pour le nettoyage
import torch # Pour la gestion GPU

# --- Définition des fonctions ---

def format_timestamp(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = int(seconds % 60)
    milliseconds = int((seconds % 1) * 1000)
    return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"

def download_video(url, temp_dir):
    """Télécharge une vidéo depuis une URL."""
    print(f"Téléchargement de la vidéo depuis {url}...")
    ydl_opts = {
        'format': 'bestvideo+bestaudio/best',
        'outtmpl': os.path.join(temp_dir, '%(title)s.%(ext)s'),
        'cleanup': True,
        'noplaylist': True, # S'assurer de ne télécharger qu'une seule vidéo
        'quiet': True, # Supprimer la sortie verbeuse de yt_dlp
        'no_progress': True, # Supprimer les barres de progression
    }
    video_filepath = None
    try:
        with YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=True)
            # yt_dlp retourne parfois des noms de fichiers avec des extensions non finales,
            # ou dans des sous-dossiers si 'outtmpl' contient des répertoires.
            # Une approche plus robuste est de chercher le fichier téléchargé.
            # Le champ 'requested_downloads' ou 'filename' dans info_dict peut aussi aider.
            # Pour simplifier ici, on va se baser sur le titre et chercher.
            potential_filename = info_dict.get('filename')
            if potential_filename and os.path.exists(potential_filename):
                 video_filepath = potential_filename
            else:
                # Fallback: chercher dans le temp_dir
                downloaded_files = glob.glob(os.path.join(temp_dir, info_dict.get('title', '*') + '.*'))
                video_extensions = ('.mkv', '.mp4', '.webm', '.avi', '.mov', '.flv') # Ajouter d'autres si besoin
                # Trouver le fichier qui n'est PAS un fichier temporaire (.ytdl, .part, etc.) et qui est un format vidéo
                video_filepath = next((f for f in downloaded_files if os.path.isfile(f) and not f.endswith(('.ytdl', '.part')) and f.lower().endswith(video_extensions)), None)

        if video_filepath is None or not os.path.exists(video_filepath):
             raise FileNotFoundError("Impossible de trouver le fichier vidéo téléchargé.")

        print(f"Téléchargement terminé : {video_filepath}")
        return video_filepath

    except Exception as e:
        print(f"Erreur lors du téléchargement de la vidéo {url}: {e}")
        return None


def convert_video_to_audio(video_path, audio_path):
    """Convertit une vidéo en fichier audio en utilisant ffmpeg."""
    print(f"Conversion de {os.path.basename(video_path)} en audio ({audio_path})...")
    if not os.path.exists(video_path):
         print(f"Erreur: Fichier vidéo source introuvable pour conversion audio : {video_path}")
         return False

    # Ajouter un check si ffmpeg est installé
    try:
        subprocess.run(["ffmpeg", "-version"], check=True, capture_output=True, text=True)
    except FileNotFoundError:
        print("Erreur : ffmpeg n'est pas installé ou n'est pas dans le PATH.")
        print("Veuillez installer ffmpeg pour continuer (ex: !apt-get update && apt-get install ffmpeg).")
        return False # Indiquer l'échec

    try:
        command = [
            "ffmpeg",
            "-i", video_path,
            "-vn", # Supprimer la vidéo
            "-acodec", "libmp3lame", # Encoder en MP3
            "-q:a", "0", # Qualité audio maximale
            "-map", "a", # Sélectionner le flux audio
            "-y", # Écraser le fichier de sortie s'il existe déjà
            audio_path
        ]

        subprocess.run(command, check=True, capture_output=True, text=True)
        print("Conversion audio terminée.")
        return True # Indiquer le succès
    except subprocess.CalledProcessError as e:
        print(f"Erreur lors de la conversion vidéo en audio : {e.stderr}")
        return False # Indiquer l'échec
    except Exception as e:
        print(f"Une erreur inattendue est survenue lors de la conversion audio : {e}")
        return False # Indiquer l'échec


def transcribe_audio(model, audio_path, device="auto"):
    """Transcrire un fichier audio en utilisant le modèle Whisper."""
    print(f"Transcription de {os.path.basename(audio_path)} avec Whisper (device: {device})...")
    if not os.path.exists(audio_path):
         print(f"Erreur: Fichier audio source introuvable pour transcription : {audio_path}")
         return None

    try:
        # Charger le modèle sur le bon device.
        # Si device="auto", Whisper choisira la meilleure option (CPU ou GPU 0)
        # Pour spécifier un GPU, utilise "cuda:0", "cuda:1", etc.
        # Pour forcer le CPU, utilise "cpu".
        # model.to(device) # Le modèle est déjà chargé, il faut l'envoyer sur le device
        # Note: Whisper ne gère pas nativement le multi-GPU sur une seule tâche de transcription.
        # Pour utiliser plusieurs GPUs, il faudrait paralléliser le traitement de plusieurs fichiers
        # ou utiliser une implémentation différente comme faster-whisper.
        # Ici, on s'assure juste qu'il utilise *un* GPU si possible et permet de choisir lequel.
        # La fonction transcribe() de Whisper peut prendre un argument device.

        result = model.transcribe(audio_path, language="fr", verbose=False)
        print("Transcription terminée.")
        return result
    except Exception as e:
        print(f"Erreur lors de la transcription audio : {e}")
        return None


def save_transcription(result, base_path):
    """Sauvegarde la transcription en format .txt et .srt."""
    if result is None:
        print(f"Aucun résultat de transcription à sauvegarder pour {base_path}.")
        return None, None # Retourner None pour les chemins de fichiers

    text_file = base_path + ".txt"
    srt_file = base_path + ".srt"

    print(f"Sauvegarde de la transcription dans {text_file} et {srt_file}...")

    try:
        with open(text_file, "w", encoding="utf-8") as f:
            f.write(result["text"])

        with open(srt_file, "w", encoding="utf-8") as f:
            for i, segment in enumerate(result["segments"]):
                start_time = format_timestamp(segment["start"])
                end_time = format_timestamp(segment["end"])
                text = segment["text"].strip()
                f.write(f"{i+1}\\n{start_time} --> {end_time}\\n{text}\\n\\n")
        print("Sauvegarde terminée.")
        return text_file, srt_file # Retourner les chemins des fichiers sauvegardés
    except Exception as e:
        print(f"Erreur lors de la sauvegarde des fichiers de transcription pour {base_path}: {e}")
        return None, None # Retourner None en cas d'erreur

def extract_archive(archive_path, extract_dir):
    """Extrait une archive (zip) dans un répertoire spécifié."""
    print(f"Extraction de l'archive {archive_path} dans {extract_dir}...")
    if not os.path.exists(archive_path):
        print(f"Erreur: Fichier archive introuvable : {archive_path}")
        return False
    try:
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        print("Extraction terminée.")
        return True
    except zipfile.BadZipFile:
        print(f"Erreur : {archive_path} n'est pas un fichier Zip valide.")
        return False
    except Exception as e:
        print(f"Erreur lors de l'extraction de l'archive {archive_path}: {e}")
        return False

def find_video_files(directory):
    """Trouve tous les fichiers vidéo dans un répertoire donné (récursif)."""
    video_extensions = ['*.mkv', '*.mp4', '*.webm', '*.avi', '*.mov', '*.flv']
    video_list = []
    print(f"Recherche de fichiers vidéo dans le répertoire {directory}...")
    if not os.path.isdir(directory):
        print(f"Erreur: Le chemin fourni n'est pas un répertoire : {directory}")
        return []
    for ext in video_extensions:
        video_list.extend(glob.glob(os.path.join(directory, '**', ext), recursive=True))
    print(f"Trouvé {len(video_list)} fichiers vidéo.")
    return video_list

def process_video_file(video_or_audio_path, temp_dir, whisper_model, device="auto"):
    """
    Traite un seul fichier vidéo ou audio :
    convertit en audio si nécessaire, transcrit et sauvegarde.
    """
    print(f"\\n--- Traitement de {os.path.basename(video_or_audio_path)} ---")

    base_name = os.path.splitext(os.path.basename(video_or_audio_path))[0]
    audio_file_path = os.path.join(temp_dir, base_name + ".mp3")
    base_path_output = os.path.join(temp_dir, base_name)

    # Déterminer si l'entrée est déjà un fichier audio
    is_audio_input = video_or_audio_path.lower().endswith(('.mp3', '.wav', '.aac', '.flac', '.ogg')) # Ajoute d'autres extensions audio si besoin

    final_audio_source = None

    if is_audio_input:
        # Si l'entrée est déjà un audio, utiliser ce fichier directement pour la transcription
        print(f"L'entrée est un fichier audio, utilisation directe : {os.path.basename(video_or_audio_path)}")
        final_audio_source = video_or_audio_path
        # On peut optionnellement copier le fichier audio vers temp_dir si on veut le nettoyer plus tard
        # shutil.copy2(video_or_audio_path, audio_file_path)
        # final_audio_source = audio_file_path # Utiliser la copie
    else:
        # Si l'entrée est une vidéo, convertir en audio
        if not os.path.exists(audio_file_path):
            if convert_video_to_audio(video_or_audio_path, audio_file_path):
                final_audio_source = audio_file_path
            else:
                print(f"Skipping transcription for {os.path.basename(video_or_audio_path)} due to audio conversion error.")
                return None, None # Retourner None si la conversion échoue
        else:
            print(f"Audio file already exists: {audio_file_path}")
            final_audio_source = audio_file_path # Utiliser le fichier audio existant

    if final_audio_source:
        # Transcrire l'audio (qu'il vienne d'une conversion ou soit l'entrée directe)
        transcription_result = transcribe_audio(whisper_model, final_audio_source)

        # Sauvegarder la transcription
        text_path, srt_path = save_transcription(transcription_result, base_path_output)

        print(f"--- Fin du traitement de {os.path.basename(video_or_audio_path)} ---")

        # Retourner les chemins des fichiers générés (.txt et .srt)
        return text_path, srt_path
    else:
        # Aucun source audio finale disponible
        return None, None


def create_output_archive(files_to_archive, output_archive_path):
    """Crée une archive zip contenant les fichiers spécifiés."""
    if not files_to_archive:
        print("Aucun fichier à archiver.")
        return None

    print(f"Création de l'archive de sortie {output_archive_path}...")
    try:
        with zipfile.ZipFile(output_archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in files_to_archive:
                if os.path.exists(file_path):
                    # Ajouter le fichier à l'archive en conservant uniquement le nom du fichier
                    zipf.write(file_path, os.path.basename(file_path))
                else:
                    print(f"Avertissement: Le fichier {file_path} n'existe pas et ne sera pas inclus dans l'archive.")
        print("Archive de sortie créée avec succès.")
        return output_archive_path
    except Exception as e:
        print(f"Erreur lors de la création de l'archive de sortie {output_archive_path}: {e}")
        return None

def upload_to_huggingface(file_path, repo_id, repo_type="dataset", path_in_repo=None, commit_message=None):
    """Charge un fichier vers un dépôt Hugging Face."""
    print(f"Tentative de chargement de {file_path} vers Hugging Face repo '{repo_id}'...")

    if not os.path.exists(file_path):
         print(f"Erreur : Le fichier à charger {file_path} n'existe pas.")
         return False

    # Vérifier si l'utilisateur est authentifié ou si le token est présent
    # L'authentification interactive (login()) n'est pas idéale dans un script/notebook non interactif.
    # Il est préférable de s'assurer que HUGGING_FACE_HUB_TOKEN est défini comme variable d'environnement.
    token = os.environ.get("HUGGING_FACE_HUB_TOKEN")
    if not token:
         print("Erreur : Variable d'environnement HUGGING_FACE_HUB_TOKEN non définie.")
         print("Veuillez vous authentifier en utilisant `huggingface-cli login` dans votre terminal ou en définissant la variable d'environnement dans Kaggle Secrets.")
         return False # Indiquer l'échec

    try:
        api = HfApi()

        if path_in_repo is None:
             path_in_repo = os.path.basename(file_path) # Charger à la racine du dépôt avec le nom du fichier

        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type=repo_type, # peut être "model", "dataset", "space"
            token=token, # Utiliser le token
            commit_message=commit_message if commit_message else f"Upload {os.path.basename(file_path)}"
        )
        print(f"Fichier {file_path} chargé avec succès vers {repo_id}/{path_in_repo}")
        return True
    except Exception as e:
        print(f"Erreur lors du chargement du fichier vers Hugging Face : {e}")
        # Afficher plus de détails si possible
        if hasattr(e, 'response') and e.response is not None:
            print(f"Réponse de l'API HF : {e.response.text}")
        return False

In [10]:
# Cellule 3: Configuration et Lancement du Traitement

# --- Configuration des paramètres (simule les arguments de ligne de commande) ---
# Modifie ces variables pour spécifier ton entrée et tes options de sortie

# Choisissez l'une des options suivantes en décommentant et en renseignant la bonne variable:
source_type = "url"        # Options: "url", "archive", "directory"

# --- Paramètres d'entrée ---
# Si source_type est "url":
input_url = 'https://www.youtube.com/watch?v=TLzna9__DnI'
# Si source_type est "archive":
input_archive_path = '/chemin/vers/ton/archive.zip'
# Si source_type est "directory":
input_directory_path = '/chemin/vers/ton/repertoire/de/videos'


# --- Paramètres de sortie et temporaires ---
temp_directory = '/kaggle/working' # Répertoire pour les fichiers intermédiaires (audio, transcriptions temporaires)
output_archive_path = None         # Chemin pour l'archive de sortie des transcriptions (ex: '/kaggle/working/mes_transcriptions.zip' ou '/media/Serva/transcriptions.zip')
                                   # Laisse None si tu ne veux pas créer d'archive de sortie


# --- Paramètres d'upload Hugging Face ---
hf_upload_repo = None      # ID du dépôt Hugging Face (ex: 'ton-nom/ton-depot'). Laisse None pour ne pas uploader.
hf_repo_type = "dataset"   # Type du dépôt Hugging Face ('model', 'dataset', 'space')
# Assure-toi d'avoir configuré ton token Hugging Face (variable d'environnement HUGGING_FACE_HUB_TOKEN dans Kaggle Secrets par exemple)


# --- Paramètres Whisper ---
whisper_model_name = "large-v3"
# Gestion du GPU :
# Whisper utilise PyTorch. PyTorch gère les GPUs via CUDA.
# torch.cuda.is_available() vérifie si un GPU est accessible.
# torch.cuda.device_count() donne le nombre de GPUs.
# device = "cuda" si torch.cuda.is_available() else "cpu" utilise le GPU 0 par défaut si disponible.
# Pour spécifier un GPU (ex: le deuxième GPU), utilise device = "cuda:1".
# Pour *forcer* l'utilisation de deux GPUs en parallèle sur une *seule* transcription,
# c'est complexe et non géré nativement par la fonction transcribe() de Whisper.
# Cela nécessiterait de découper l'audio et de lancer plusieurs processus/threads avec des modèles sur chaque GPU,
# ou d'utiliser faster-whisper ou une autre implémentation.
# Pour l'instant, nous allons charger le modèle sur UN GPU spécifique si possible.
# Si tu as 2 GPUs, tu peux lancer 2 instances de ce script/notebook en parallèle,
# chacune configurée pour utiliser un GPU différent (device="cuda:0" et device="cuda:1").
# Ou tu peux adapter pour traiter les fichiers en batch, en envoyant chaque fichier à un thread/processus
# qui utilise un GPU différent. C'est une modification plus avancée.

# Pour utiliser un GPU si disponible (par défaut, souvent cuda:0)
# transcription_device = "cuda" if torch.cuda.is_available() else "cpu"

# Pour forcer l'utilisation d'un GPU spécifique (ex: cuda:1) si disponible:
gpu_index_to_use = 0 # Mets 0 ou 1 (ou plus) si tu sais quel GPU tu veux utiliser
if torch.cuda.is_available():
    num_gpus = torch.cuda.device_count()
    print(f"Nombre de GPUs disponibles : {num_gpus}")
    if gpu_index_to_use < num_gpus:
        transcription_device = f"cuda:{gpu_index_to_use}"
        print(f"Utilisation du GPU : {transcription_device}")
    else:
        print(f"GPU index {gpu_index_to_use} non disponible. Utilisation du GPU 0.")
        transcription_device = "cuda:0" # Fallback au GPU 0
else:
    transcription_device = "cpu"
    print("Aucun GPU disponible. Utilisation du CPU.")

# --- Fin de la Configuration ---


# --- Initialisation ---
os.makedirs(temp_directory, exist_ok=True)
video_files_to_process = []
cleanup_temp_dir = False # Indique si le répertoire temporaire doit être nettoyé à la fin


# --- Gérer les différentes sources d'entrée ---
if source_type == "url":
    if not input_url:
        print("Erreur: L'URL n'est pas spécifiée.")
        # exit(1) # N'utilise pas exit() dans un notebook, lève une erreur ou utilise return
        raise ValueError("L'URL n'est pas spécifiée.")

    video_path = download_video(input_url, temp_directory)
    if video_path:
        video_files_to_process.append(video_path)
    # Nettoyer le fichier vidéo téléchargé après traitement
    cleanup_temp_dir = True # On nettoiera le fichier téléchargé

elif source_type == "archive":
    if not input_archive_path or not os.path.exists(input_archive_path):
         print(f"Erreur: Fichier archive introuvable : {input_archive_path}")
         # exit(1)
         raise FileNotFoundError(f"Fichier archive introuvable : {input_archive_path}")

    extract_dir = os.path.join(temp_directory, "extracted_archive")
    os.makedirs(extract_dir, exist_ok=True)
    if extract_archive(input_archive_path, extract_dir):
         video_files_to_process = find_video_files(extract_dir)
         cleanup_temp_dir = True # Nettoyer le répertoire d'extraction après traitement
    else:
         print("Impossible de traiter l'archive.")
         # exit(1)
         raise RuntimeError("Impossible de traiter l'archive.")

elif source_type == "directory":
    if not input_directory_path or not os.path.isdir(input_directory_path):
         print(f"Erreur: Répertoire introuvable : {input_directory_path}")
         # exit(1)
         raise FileNotFoundError(f"Répertoire introuvable : {input_directory_path}")

    video_files_to_process = find_video_files(input_directory_path)
    # Ne pas nettoyer le répertoire d'entrée s'il a été fourni directement
    cleanup_temp_dir = False # Ne pas supprimer les fichiers originaux

else:
    print(f"Erreur: Type de source '{source_type}' non reconnu.")
    # exit(1)
    raise ValueError(f"Type de source '{source_type}' non reconnu.")


if not video_files_to_process:
    print("Aucun fichier vidéo/audio valide trouvé pour le traitement.")
    # exit(0) # N'utilise pas exit(0) qui arrête le kernel, utilise return si dans une fonction ou laisse passer
    # Si tu veux arrêter ici, tu peux décommenter le raise ValueError ci-dessous
    # raise ValueError("Aucun fichier vidéo/audio valide trouvé pour le traitement.")


# --- Charger le modèle Whisper ---\n",
print(f"Chargement du modèle Whisper '{whisper_model_name}' sur {transcription_device}...")
try:
    # Charger le modèle sur le bon device dès le départ
    whisper_model = whisper.load_model(whisper_model_name, device=transcription_device)
    print("Modèle Whisper chargé.")
except Exception as e:
    print(f"Erreur lors du chargement du modèle Whisper : {e}")
    # exit(1)
    raise RuntimeError(f"Erreur lors du chargement du modèle Whisper : {e}")


# --- Traiter chaque fichier vidéo/audio trouvé ---\n",
all_generated_files = []
for video_file_path in video_files_to_process:
     # Passer le device à la fonction de transcription
     text_path, srt_path = process_video_file(video_file_path, temp_directory, whisper_model, device=transcription_device)
     if text_path:
          all_generated_files.append(text_path)
     if srt_path:
          all_generated_files.append(srt_path)


# --- Créer l'archive de sortie si demandée ---\n",
output_archive_created = None
if output_archive_path:
    # S'assurer que le répertoire de sortie existe
    output_archive_dir = os.path.dirname(output_archive_path)
    if output_archive_dir and not os.path.exists(output_archive_dir): # S'assurer que ce n'est pas juste un nom de fichier
        os.makedirs(output_archive_dir, exist_ok=True)
        print(f"Répertoire de sortie créé : {output_archive_dir}")

    output_archive_created = create_output_archive(all_generated_files, output_archive_path)

# --- Charger vers Hugging Face si demandé ---\n",
if hf_upload_repo:
    files_to_upload = []
    if output_archive_created:
        # Si une archive de sortie a été créée, on charge l'archive
        print("Une archive de sortie a été créée, chargement de l'archive vers Hugging Face.")
        files_to_upload.append(output_archive_created)
    else:
        # Sinon, on charge les fichiers .txt et .srt individuels
        print("Aucune archive de sortie spécifiée ou créée, chargement des fichiers individuels (.txt, .srt) vers Hugging Face.")
        files_to_upload.extend(all_generated_files)

    if not files_to_upload:
        print("Aucun fichier à charger vers Hugging Face.")
    else:
        print("\\n--- Chargement vers Hugging Face ---")
        # Générer un timestamp unique pour les uploads individuels si pas d'archive
        timestamp_folder = time.strftime("%Y%m%d-%H%M%S")
        for file_path_to_upload in files_to_upload:
             # Définir le chemin dans le dépôt HF
             path_in_repo = None
             if not output_archive_created: # C'est un fichier individuel, pas l'archive
                 path_in_repo = f"transcriptions/{timestamp_folder}/{os.path.basename(file_path_to_upload)}"
             else: # C'est l'archive, on la met à la racine (ou spécifie un sous-dossier si voulu)
                  path_in_repo = os.path.basename(file_path_to_upload)


             upload_to_huggingface(
                 file_path_to_upload,
                 repo_id=hf_upload_repo,
                 repo_type=hf_repo_type,
                 path_in_repo=path_in_repo,
                 commit_message=f"Upload transcription files from run {timestamp_folder}"
             )
        print("--- Fin du chargement vers Hugging Face ---")


# --- Nettoyage ---\n",
# Nettoyer uniquement le répertoire temporaire si l'entrée était une URL ou une archive
# Si l'entrée était un répertoire, on ne supprime rien dans ce répertoire
if cleanup_temp_dir and os.path.exists(temp_directory):
     print(f"Nettoyage du répertoire temporaire : {temp_directory}...")
     # Supprimer les fichiers audio intermédiaires et les fichiers vidéo téléchargés/extraits
     # Ne pas supprimer les fichiers .txt ou .srt s'ils ont été créés ici et qu'une archive de sortie n'a pas été spécifiée
     # Sauf si une archive de sortie a été demandée, dans ce cas les temporaires (y compris txt/srt) peuvent être nettoyés si non nécessaires ailleurs
     files_to_keep_if_no_archive_and_no_hf_upload = all_generated_files if not output_archive_created and not hf_upload_repo else []


     for item in os.listdir(temp_directory):
         item_path = os.path.join(temp_directory, item)
         if os.path.isfile(item_path) and item_path not in files_to_keep_if_no_archive_and_no_hf_upload:
             # Nettoyer les fichiers temporaires, sauf si ce sont les outputs finaux (txt/srt) et qu'il n'y a pas d'archive/upload HF
             # et sauf l'archive de sortie elle-même si elle a été créée ici.
             if item_path != output_archive_created:
                 try:
                     os.remove(item_path)
                 except Exception as e:
                     print(f"Avertissement: Impossible de supprimer le fichier temporaire {item_path}: {e}")
             elif item_path == output_archive_created and not hf_upload_repo:
                 # Si l'archive de sortie a été créée mais PAS uploadée sur HF, on la laisse.
                 pass # Ne pas supprimer l'archive si elle est le résultat final et pas uploadée
             elif item_path == output_archive_created and hf_upload_repo:
                  # Si l'archive de sortie a été créée ET uploadée, on peut la supprimer
                 try:
                     os.remove(item_path)
                 except Exception as e:
                     print(f"Avertissement: Impossible de supprimer l'archive de sortie temporaire {item_path} après upload: {e}")

         elif os.path.isdir(item_path) and item == "extracted_archive": # Supprimer le répertoire d'extraction
             try:
                 shutil.rmtree(item_path)
             except Exception as e:
                  print(f"Avertissement: Impossible de supprimer le répertoire temporaire {item_path}: {e}")
     print("Nettoyage terminé.")

print("\\nProcessus terminé.")
print(f"Fichiers de transcription générés : {all_generated_files}")
if output_archive_created:
     print(f"Archive de sortie créée : {output_archive_created}")


Nombre de GPUs disponibles : 1
Utilisation du GPU : cuda:0


PermissionError: [Errno 13] Permission denied: '/kaggle'

** Seconde Video à traiter **

# Transcription d'audio MP3 #

https://youtu.be/afUrxn0NT2s

In [None]:
import os
import subprocess
import whisper

def format_timestamp(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = int(seconds % 60)
    milliseconds = int((seconds % 1) * 1000)
    return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"

def convert_video_to_audio(video_path, audio_path, start_time="00:00:00", stop_time="01:00:00"):
    try:
        subprocess.run([
            "ffmpeg",
            "-ss", start_time,  # Ajoute le point de départ
            "-to", stop_time,   # Ajoute le point de arrivé
            "-i", video_path,
            "-q:a", "0",
            "-map", "a",
            "-y",
            audio_path
        ], check=True)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Error converting video to audio: {e}")

def transcribe_audio(model, audio_path):
    try:
        result = model.transcribe(audio_path, language="fr")
        return result
    except Exception as e:
        raise RuntimeError(f"Error transcribing audio: {e}")

def save_transcription(result, base_path):
    text_file = base_path + ".txt"
    srt_file = base_path + ".srt"

    with open(text_file, "w") as f:
        f.write(result["text"])

    with open(srt_file, "w") as f:
        for i, segment in enumerate(result["segments"]):
            start_time = format_timestamp(segment["start"])
            end_time = format_timestamp(segment["end"])
            text = segment["text"].strip()
            f.write(f"{i+1}\n{start_time} --> {end_time}\n{text}\n\n")

# Parameters
url = input("Enter the audio URL: ")
temp_dir = '/kaggle/working'
!cd /kaggle/working;wget $url

# Download the video
#try:
#    download_video(url, temp_dir)
#except Exception as e:
#    print(f"Error downloading video: {e}")
#    exit(1)

# Get the downloaded file's name
try:
    audio_file = [f for f in os.listdir(temp_dir) if f.endswith(('.mp3', '.mkv', '.mp4', '.webm'))][0]
except IndexError:
    print("Error: No audio/video file found in the specified directory.")
    exit(1)

audio_path = os.path.join(temp_dir, audio_file)
#audio_file = audio_path.rsplit('.', 1)[0] + ".mp3"

# Convert video to audio
if not os.path.exists(audio_file):
    try:
        convert_video_to_audio(video_path, audio_file)
    except RuntimeError as e:
        print(e)
        exit(1)
else:
    print(f"Audio file already exists: {audio_file}")

# Load Whisper model and transcribe
try:
    model = whisper.load_model("large-v3")
    transcription_result = transcribe_audio(model, audio_file)
except RuntimeError as e:
    print(e)
    exit(1)

# Save the transcription to text and SRT files
base_path = audio_file.rsplit('.', 1)[0]
save_transcription(transcription_result, base_path)

print(f"Processing complete. Files saved in {temp_dir}")


In [None]:
#!ffmpeg -i "/kaggle/working/chatGPT： Récupérer le texte FR d'une capsule Youtube US.mkv" -af "volume=3" output.mkv
#!rm -rf *.{webm,mp4,mp3,mkv,srt,txt}

In [None]:
#rm /kaggle/working/'"Overclocking and Fan Control for Nvidia GPUs in Linux - Comprehensive Guide".mp4'
#!ffmpeg -i "/kaggle/working/Overclocking and Fan Control for Nvidia GPUs in Linux - Comprehensive Guide.mp4"   "/kaggle/working/Overclocking and Fan Control for Nvidia GPUs in Linux - Comprehensive Guide.mp4" -vn -acodec mp3  "Linux GPU Overclocking Guide – GreenWithEnvy & CoreCtrl [iUhht9sfG_4].mp3"
#!ls -lad /kaggle/working/* #; rm -rf *.{webm,mp4,mp3,srt,txt}