In [None]:
# **Installation des packages n√©cessaires**

In [1]:
%%capture
# Installation silencieuse des d√©pendances avec gestion des conflits

# 1. Mise √† jour pip pour √©viter les probl√®mes
!pip install --upgrade pip -q

# 2. Installation FFmpeg (syst√®me)
!apt-get update -qq
!apt-get install -qq ffmpeg

# 3. Installation des packages de transcription
!pip install -q openai-whisper==20250625
!pip install -q faster-whisper==1.2.0

# 4. Packages de d√©bruitage audio
!pip install -q librosa==0.10.1
!pip install -q soundfile==0.12.1
!pip install -q noisereduce==3.0.0
!pip install -q scipy==1.11.4
!pip install -q pydub==0.25.1

# 5. Packages documents
!pip install -q python-docx==1.2.0
!pip install -q python-pptx==1.0.2

# 6. Packages LLM et NLP
!pip install -q openai==1.91.0
!pip install -q assemblyai==0.44.3
!pip install -q tiktoken==0.9.0

# 7. LangChain
!pip install -q langchain==0.3.27 langchain-community==0.3.29 langchain-core -q 2>/dev/null || true

# 8. Packages utilitaires
!pip install -q numpy==1.24.3
!pip install -q pandas matplotlib seaborn

# 9. Installation FAISS pour le RAG
!pip install -q faiss-cpu==1.12.0

print("‚úÖ Installation termin√©e!")

In [2]:
# V√©rification que tout est install√© correctement
import sys
import importlib

packages_to_check = [
    ('whisper', 'openai-whisper'),
    ('faster_whisper', 'faster-whisper'),
    ('librosa', 'librosa'),
    ('soundfile', 'soundfile'),
    ('noisereduce', 'noisereduce'),
    ('scipy', 'scipy'),
    ('pydub', 'pydub'),
    ('docx', 'python-docx'),
    ('pptx', 'python-pptx'),
    ('openai', 'openai'),
    ('langchain', 'langchain'),
    ('langchain_community', 'langchain-community'),
    ('faiss', 'faiss-cpu'),
    ('assemblyai', 'assemblyai'),
    ('tiktoken', 'tiktoken')
]

print("üîç V√©rification des packages install√©s:")
print("-" * 50)

all_ok = True
for import_name, package_name in packages_to_check:
    try:
        module = importlib.import_module(import_name)
        version = getattr(module, '__version__', 'N/A')
        print(f"‚úÖ {package_name:20} : {version}")
    except ImportError:
        print(f"‚ùå {package_name:20} : Non install√©")
        all_ok = False

if all_ok:
    print("\n‚ú® Tous les packages sont install√©s correctement!")
else:
    print("\n‚ö†Ô∏è Certains packages manquent. Relancez la cellule 1.")

üîç V√©rification des packages install√©s:
--------------------------------------------------
‚úÖ openai-whisper       : 20250625
‚úÖ faster-whisper       : 1.2.0
‚úÖ librosa              : 0.10.1
‚úÖ soundfile            : 0.12.1
‚úÖ noisereduce          : N/A
‚úÖ scipy                : 1.11.4
‚úÖ pydub                : N/A
‚úÖ python-docx          : 1.2.0
‚úÖ python-pptx          : 1.0.2
‚úÖ openai               : 1.91.0
‚úÖ langchain            : 0.3.27
‚úÖ langchain-community  : 0.3.29
‚úÖ faiss-cpu            : 1.12.0
‚úÖ assemblyai           : 0.44.3
‚úÖ tiktoken             : 0.9.0

‚ú® Tous les packages sont install√©s correctement!


# **Imports et configuration GPU**

In [3]:
# Imports standards
import os
import sys
import json
import warnings
import re
import subprocess
warnings.filterwarnings('ignore')

from datetime import datetime, timezone
import time
try:
    from zoneinfo import ZoneInfo
except Exception:
    ZoneInfo = None

from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import gc  # Garbage collector

# Imports audio et d√©bruitage
import numpy as np
import librosa
import soundfile as sf
import noisereduce as nr
from scipy.signal import butter, filtfilt, medfilt
from pydub import AudioSegment

# Imports pour la transcription
import whisper
from faster_whisper import WhisperModel

# Imports pour les documents
from docx import Document
from pptx import Presentation

# Imports pour le NLP et LLM
import openai
try:
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_community.vectorstores import FAISS
    from langchain_community.embeddings import OpenAIEmbeddings
    langchain_available = True
except ImportError:
    print("‚ö†Ô∏è LangChain non disponible")
    langchain_available = False

import torch
print(f"üîß PyTorch: {torch.__version__}")
print(f"üéÆ CUDA disponible: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"   GPU: {torch.cuda.get_device_name(0)}")
    print(f"   M√©moire: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

üîß PyTorch: 2.6.0+cu124
üéÆ CUDA disponible: True
   GPU: Tesla T4
   M√©moire: 15.83 GB


In [4]:
# Configuration des cl√©s API
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
OPENAI_API_KEY = user_secrets.get_secret("OPENAI_API_KEY")
ASSEMBLYAI_API_KEY = user_secrets.get_secret("ASSEMBLYAI_API_KEY")

In [5]:
# Configuration des chemins 
UPLOAD_PATH = "/kaggle/input/meeting-audio/" # Chemin des fichiers upload√©s 
OUTPUT_PATH = "/kaggle/working" # Chemin de sortie

In [6]:
# Configuration du pipeline 
@dataclass 
class Config: 
    """Configuration centralis√©e pour Kaggle""" 

    # Chemins
    input_dir: str = UPLOAD_PATH
    output_dir: str = OUTPUT_PATH
    
    timezone: str = "Indian/Antananarivo"
    
    # Mod√®le Whisper 
    whisper_model: str = "large-v3" # 'tiny', 'base', 'small', 'medium', 'large'
    whisper_device: str = "cuda" if torch.cuda.is_available() else "cpu"
    whisper_compute_type: str = "float16" if torch.cuda.is_available() else "int8"
    #device: str = "cuda" if torch.cuda.is_available() else "cpu"
    compute_type: str = "float16" if torch.cuda.is_available() else "int8"
    
    openai_model: str = "gpt-3.5-turbo" # Plus √©conomique que GPT-4 
    
    # Cl√©s API 
    openai_key: str = OPENAI_API_KEY 
    assemblyai_key: str = ASSEMBLYAI_API_KEY

    # Param√®tres audio
    denoise_method: str = "hybrid"  # ffmpeg, noisereduce, hybrid
    denoise_aggressive: bool = True
    sample_rate: int = 16000

    # Param√®tres de traitement 
    
    ## Longueur maximale d‚Äôun ‚Äúmorceau de texte‚Äù (chunk) qu‚Äôon d√©coupe avant d‚Äôenvoyer au LLM.
    ## R√®gle : chunk_size ‚âà 20-30% de la capacit√© max du mod√®le.
    chunk_size: int = 1000 # nombre de caract√®re ‚âà 200‚Äì250 tokens (selon la langue et la densit√©) √† modifier selon la limitation du mod√®le choisie (ex. GPT-3.5 ‚âà 4k tokens, GPT-4 ‚âà 8k ou 32k).
    
    ## Nombre de caract√®res r√©p√©t√©s entre deux chunks.
    ## R√®gle : overlap = 15-25% du chunk_size.
    chunk_overlap: int = 200 # nombre de caract√®re ‚âà 40 tokens. Suffisant pour garder la continuit√© (phrases coup√©es, dialogues, etc.).
    
    ## Proportion maximale de mots que le LLM a le droit de modifier dans une transcription brute.
    ## R√®gle : plus l‚Äôaudio est bruit√©, plus tu tol√®res une correction √©lev√©e. [propre (dictaphone, micro-cravate) ‚Üí mettre bas (0.10 √† 0.15). / bruyant (claquements de porte, plusieurs intervenants) ‚Üí monter √† 0.20 voire 0.25]
    max_correction_rate: float = 0.15 # Max 15% du texte peut √™tre modifi√© (Pas de r√©√©criture compl√®te ‚Üí garde la fid√©lit√© au discours original.) Evite les hallucinations
    
    ## Score minimal de confiance (0‚Äì1) pour garder une phrase transcrite par Whisper/AssemblyAI.
    confidence_threshold: float = 0.85 #Segments dont la transcription est jug√©e correcte √† au moins 85%.

    # Optimisation m√©moire pour Kaggle 
    num_workers: int = 2  # Ajust√© pour T4
    batch_size: int = 4 # Pour le traitement par lots [Si CPU seulement ‚Üí descendre (1‚Äì2).]
    use_gpu: bool = torch.cuda.is_available()

    # NOUVEAUX PARAM√àTRES ANTI-HALLUCINATIONS
    beam_size: int = 3  # Plus de beam = plus de pr√©cision
    best_of: int = 2    # Prendre le meilleur de 3 tentatives
    patience: float = 1.0
    temperature: float = 0.0  # Pas de randomness
    
    # Seuils de confiance stricts
    no_speech_threshold: float = 0.8 # Plus strict
    logprob_threshold: float = -0.5  # Plus strict
    compression_ratio_threshold: float = 2.8  # √âvite les r√©p√©titions

    # NOUVEAU: Param√®tres anti-r√©p√©tition
    max_initial_timestamp: float = 1.0
    suppress_blank: bool = True
    suppress_tokens: str = "-1"  # Supprime les tokens probl√©matiques
    
    # VAD (Voice Activity Detection) optimis√©
    use_vad: bool = True
    vad_threshold: float = 0.45
    vad_min_speech_duration_ms: int = 500  # Minimum 250ms de parole
    vad_max_speech_duration_s: float = 60  # Max 30s par segment
    vad_min_silence_duration_ms: int = 1000  # 2s de silence minimum
    vad_speech_pad_ms: int = 400

    # NOUVEAU: Chunking intelligent
    chunk_length_s: int = 300  # Chunks de 5 minutes max
    chunk_overlap_s: int = 30   # Overlap de 30 secondes
    
    # Audio processing
    sample_rate: int = 16000
    use_denoise: bool = "auto"  # auto, True, False
    denoise_stationary: float = 0.97
    denoise_prop_decrease: float = 1.0
    
    # D√©tection r√©p√©titions
    repetition_penalty: float = 1.2  # NOUVEAU
    max_repetitions: int = 3  # NOUVEAU: max r√©p√©titions tol√©r√©es
    
    # Prompt sp√©cialis√© CA - AM√âLIOR√â
    # PROMPT AM√âLIOR√â avec contexte financier malgache
    initial_prompt: str = (
        "Conseil d'administration Madagascar. Vocabulaire financier: Ariary, millions, "
        "budget, rapport financier, r√©solution, d√©lib√©ration. "
        "Termes sp√©cifiques: Fihariana, SON'INVEST, UNIMA, AQUALMA. "
        "Intervenants: Pr√©sident, Directeur G√©n√©ral, Commissaire aux Comptes. "
        "Format: discours naturel sans r√©p√©titions."
    )

config = Config() 
print(f"‚úÖ Configuration charg√©e - Mod√®le Whisper: {config.whisper_model}")

‚úÖ Configuration charg√©e - Mod√®le Whisper: large-v3


In [7]:
def prepare_audio_file(audio_path: str) -> Dict:
    """Pr√©pare et valide le fichier audio pour la transcription"""
    import wave
    import contextlib
    
    file_info = {
        "path": audio_path,
        "exists": os.path.exists(audio_path),
        "size_mb": 0,
        "duration_seconds": 0,
        "format": audio_path.split('.')[-1],
        "sample_rate": 0,
        "channels": 0
    }
    
    if file_info["exists"]:
        file_info["size_mb"] = os.path.getsize(audio_path) / (1024 * 1024)
        
        try:
            # Charger avec librosa pour info
            y, sr = librosa.load(audio_path, sr=None, duration=10)
            file_info["sample_rate"] = sr
            
            # Dur√©e totale
            duration = librosa.get_duration(path=audio_path)
            file_info["duration_seconds"] = duration
            
        except Exception as e:
            print(f"‚ö†Ô∏è Erreur lecture audio: {e}")
    
    return file_info

In [8]:
def format_timestamp(seconds: float) -> str:
    """Convertit des secondes en format HH:MM:SS"""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"

# **Pr√©processing et D√©bruitage Audio**
**Classe de d√©bruitage audio avanc√©**


In [9]:
class AudioPreprocessor:
    """Service de pr√©traitement audio avec d√©tection intelligente du bruit"""
    
    def __init__(self, config: Config):
        self.config = config
    
    def analyze_noise_profile(self, audio_path: str, duration: int = 30) -> Dict:
        """Analyse le profil de bruit de l'audio"""
        print("üîç Analyse du niveau de bruit...")
        
        y, sr = librosa.load(audio_path, sr=self.config.sample_rate, duration=duration)
        
        # Calculer le SNR
        signal_power = np.mean(y ** 2)
        noise_floor = np.percentile(np.abs(y), 5) ** 2
        snr = 10 * np.log10(signal_power / (noise_floor + 1e-10))
        
        # D√©tecter les silences
        silence_threshold = np.percentile(np.abs(y), 20)
        silence_ratio = np.sum(np.abs(y) < silence_threshold) / len(y)
        
        # D√©tecter les impulsions (clics, pops)
        impulses = np.sum(np.abs(np.diff(y)) > 0.5) / len(y)
        
        return {
            "snr": float(snr),
            "silence_ratio": float(silence_ratio),
            "impulse_ratio": float(impulses),
            "needs_denoising": bool(snr < 20 or impulses > 0.001)
        }
    
    def apply_denoising(self, audio_path: str, output_path: str = None) -> str:
        """Applique un d√©bruitage intelligent"""
        print("üîß Application du d√©bruitage adaptatif...")
        
        if output_path is None:
            output_path = audio_path.replace(".mp3", "_denoised.wav")
        
        # Charger l'audio complet
        y, sr = librosa.load(audio_path, sr=self.config.sample_rate)
        
        # D√©bruitage stationnaire
        y_denoised = nr.reduce_noise(
            y=y,
            sr=sr,
            stationary=True,
            prop_decrease=self.config.denoise_prop_decrease
        )
        
        # Normalisation douce
        max_val = np.max(np.abs(y_denoised))
        if max_val > 0:
            y_denoised = y_denoised * (0.95 / max_val)
        
        # Sauvegarder
        sf.write(output_path, y_denoised, sr)
        print(f"‚úÖ Audio d√©bruit√© sauvegard√©: {output_path}")
        
        return output_path

# **Transcription Audio**
**Service de transcription avec audio nettoy√©**

In [10]:
class TranscriptionService:
    """Service de transcription Whisper avec d√©tection et correction des r√©p√©titions"""
    
    def __init__(self, config: Config):
        self.config = config
        self.model = None
        self.preprocessor = AudioPreprocessor(config)
        self.repetition_buffer = []  # Buffer pour d√©tecter les r√©p√©titions
        
    def load_whisper_model(self):
        """Charge le mod√®le Whisper avec gestion m√©moire optimis√©e"""
        if self.model is None:
            print(f"‚è≥ Chargement Whisper {self.config.whisper_model}...")
            
            # Lib√©rer la m√©moire GPU
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                gc.collect()
            
            # Charger avec faster-whisper (plus efficace)
            self.model = WhisperModel(
                self.config.whisper_model,
                device=self.config.whisper_device,
                compute_type=self.config.whisper_compute_type,
                cpu_threads=8 if self.config.whisper_device == "cpu" else 0,
                num_workers=2  # R√©duit pour √©viter les probl√®mes
            )
            
            print("‚úÖ Mod√®le charg√© sur", self.config.whisper_device.upper())
        
        return self.model
    
    def detect_repetitions(self, text: str, window_size: int = 100) -> bool:
        """D√©tecte les r√©p√©titions dans le texte"""
        words = text.lower().split()
        
        if len(words) < window_size:
            return False
        
        # V√©rifier les r√©p√©titions dans une fen√™tre glissante
        for i in range(len(words) - window_size):
            window = words[i:i + window_size]
            unique_ratio = len(set(window)) / len(window)
            
            # Si moins de 30% de mots uniques, c'est une r√©p√©tition
            if unique_ratio < 0.3:
                return True
        
        # V√©rifier les r√©p√©titions exactes de phrases
        sentences = text.split('.')
        if len(sentences) > 3:
            last_3 = sentences[-3:]
            if len(set(last_3)) == 1 and len(last_3[0]) > 10:
                return True
        
        return False
    
    def clean_repetitions(self, segments: List[Dict]) -> List[Dict]:
        """Nettoie les segments r√©p√©titifs"""
        cleaned = []
        repetition_count = {}
        
        for segment in segments:
            text = segment['text'].strip()
            
            # Compter les occurrences
            if text in repetition_count:
                repetition_count[text] += 1
                
                # Si trop de r√©p√©titions, ignorer
                if repetition_count[text] > self.config.max_repetitions:
                    continue
            else:
                repetition_count[text] = 1
            
            # D√©tecter les r√©p√©titions partielles
            if len(cleaned) > 0:
                last_text = cleaned[-1]['text']
                
                # Si le texte est identique ou tr√®s similaire au pr√©c√©dent
                if text == last_text or (len(text) > 20 and text in last_text):
                    continue
            
            cleaned.append(segment)
        
        return cleaned
    
    def transcribe_chunk(self, audio_chunk: np.ndarray, sr: int, offset: float = 0) -> List[Dict]:
        """Transcrit un chunk audio avec gestion des r√©p√©titions"""
        
        # Sauvegarder temporairement le chunk
        temp_file = f"/tmp/chunk_{offset}.wav"
        sf.write(temp_file, audio_chunk, sr)
        
        try:
            # Transcrire avec param√®tres optimis√©s
            segments, info = self.model.transcribe(
                temp_file,
                language="fr",
                beam_size=self.config.beam_size,
                best_of=self.config.best_of,
                patience=self.config.patience,
                temperature=self.config.temperature,
                compression_ratio_threshold=self.config.compression_ratio_threshold,
                log_prob_threshold=self.config.logprob_threshold,
                no_speech_threshold=self.config.no_speech_threshold,
                condition_on_previous_text=False,  # IMPORTANT: D√©sactiv√© pour √©viter propagation
                initial_prompt=self.config.initial_prompt,
                vad_filter=self.config.use_vad,
                vad_parameters={
                    "threshold": self.config.vad_threshold,
                    "min_speech_duration_ms": self.config.vad_min_speech_duration_ms,
                    "min_silence_duration_ms": self.config.vad_min_silence_duration_ms,
                    "speech_pad_ms": self.config.vad_speech_pad_ms,
                    "max_speech_duration_s": self.config.vad_max_speech_duration_s
                },
                word_timestamps=True,
                suppress_blank=self.config.suppress_blank,
                max_initial_timestamp=self.config.max_initial_timestamp
            )
            
            # Convertir en liste et ajuster les timestamps
            segment_list = []
            for seg in segments:
                segment_dict = {
                    'id': len(segment_list),
                    'start': float(seg.start + offset),
                    'end': float(seg.end + offset),
                    'text': seg.text,
                    'confidence': float(getattr(seg, 'confidence', 0)),
                    'no_speech_prob': float(seg.no_speech_prob) if hasattr(seg, 'no_speech_prob') else 0.0
                }
                
                # Filtrer les segments de faible qualit√©
                if segment_dict['confidence'] < -1.0 or segment_dict['no_speech_prob'] > 0.9:
                    continue
                
                segment_list.append(segment_dict)
            
            return segment_list
            
        finally:
            # Nettoyer le fichier temporaire
            if os.path.exists(temp_file):
                os.remove(temp_file)
    
    def transcribe_with_chunking(self, audio_path: str) -> Dict:
        """Transcription avec chunking intelligent pour √©viter les d√©rives"""
        print("üéØ Transcription avec chunking intelligent...")
        
        # Charger l'audio
        y, sr = librosa.load(audio_path, sr=self.config.sample_rate)
        duration = len(y) / sr
        
        # Calculer les chunks
        chunk_samples = int(self.config.chunk_length_s * sr)
        overlap_samples = int(self.config.chunk_overlap_s * sr)
        
        all_segments = []
        
        # Traiter par chunks
        num_chunks = max(1, int(np.ceil((len(y) - overlap_samples) / (chunk_samples - overlap_samples))))
        
        for i in range(num_chunks):
            start_sample = i * (chunk_samples - overlap_samples)
            end_sample = min(start_sample + chunk_samples, len(y))
            
            chunk = y[start_sample:end_sample]
            offset = start_sample / sr
            
            print(f"  Chunk {i+1}/{num_chunks}: {format_timestamp(offset)} - {format_timestamp(end_sample/sr)}")
            
            # Transcrire le chunk
            chunk_segments = self.transcribe_chunk(chunk, sr, offset)
            
            # D√©tecter et nettoyer les r√©p√©titions
            chunk_segments = self.clean_repetitions(chunk_segments)
            
            # Fusionner avec les segments pr√©c√©dents
            if i > 0 and len(all_segments) > 0:
                # G√©rer l'overlap - garder seulement les nouveaux segments apr√®s l'overlap
                overlap_time = offset + self.config.chunk_overlap_s / 2
                chunk_segments = [s for s in chunk_segments if s['start'] > overlap_time]
            
            all_segments.extend(chunk_segments)
            
            # V√©rification anti-d√©rive
            if len(all_segments) > 10:
                recent_texts = [s['text'] for s in all_segments[-10:]]
                if len(set(recent_texts)) == 1:
                    print("‚ö†Ô∏è R√©p√©tition d√©tect√©e - r√©initialisation du contexte")
                    # R√©initialiser pour le prochain chunk
                    self.config.initial_prompt = "Transcription suite. Nouveau contexte."
        
        # Nettoyer une derni√®re fois l'ensemble
        all_segments = self.clean_repetitions(all_segments)
        
        # Construire la transcription finale
        transcription = " ".join([s['text'] for s in all_segments])
        
        return {
            "transcription": transcription,
            "segments": all_segments,
            "duration": float(duration),
            "language": "fr"
        }
    
    def transcribe_with_preprocessing(self, audio_path: str, preprocess: bool = None, language: str = "fr") -> Dict:
        """Pipeline complet avec pr√©traitement optionnel"""
        print("=" * 60)
        print("üéØ TRANSCRIPTION AVEC PR√âPROCESSING INTELLIGENT")
        print("=" * 60)
        
        result = {
            "status": "processing",
            "original_audio": audio_path,
            "preprocessing_applied": preprocess
        }
        
        # √âtape 1: Analyse du bruit si auto
        if preprocess == "auto" or preprocess is None:
            noise_profile = self.preprocessor.analyze_noise_profile(audio_path)
            preprocess = noise_profile["needs_denoising"]
            result["noise_profile"] = noise_profile
            
            print(f"  SNR: {noise_profile['snr']:.1f} dB")
            print(f"  Silence: {noise_profile['silence_ratio']*100:.1f}%")
            print(f"  Impulsions: {noise_profile['impulse_ratio']*1000:.2f}/1000 samples")
            
            if preprocess:
                print("  ‚Üí D√©bruitage recommand√©")
            else:
                print("  ‚Üí Audio propre, pas de d√©bruitage n√©cessaire")
        
        # √âtape 2: Pr√©processing si n√©cessaire
        if preprocess:
            print("\nüîß Application du d√©bruitage...")
            audio_to_transcribe = self.preprocessor.apply_denoising(audio_path)
        else:
            audio_to_transcribe = audio_path
        
        # √âtape 3: Transcription avec chunking
        print(f"\nüìù Transcription de l'audio {'nettoy√©' if preprocess else 'original'}...")
        
        try:
            # Charger le mod√®le
            self.load_whisper_model()
            
            # Transcrire avec chunking intelligent
            transcription_result = self.transcribe_with_chunking(audio_to_transcribe)
            
            # Calculer la confiance moyenne
            if transcription_result["segments"]:
                avg_confidence = np.mean([s.get('confidence', 0) for s in transcription_result["segments"]])
            else:
                avg_confidence = 0
            
            result.update({
                "status": "success",
                "transcription": transcription_result["transcription"],
                "segments": transcription_result["segments"],
                "duration": transcription_result["duration"],
                "language": transcription_result["language"],
                "confidence": avg_confidence
            })
            
            print(f"\n‚úÖ Transcription r√©ussie!")
            print(f"  üìä Confiance moyenne: {avg_confidence:.2%}")
            print(f"  üìù Longueur: {len(result['transcription'])} caract√®res")
            print(f"  ‚è±Ô∏è Dur√©e audio: {result['duration']:.1f}s")
            print(f"  üìë Segments: {len(result['segments'])}")
            
        except Exception as e:
            print(f"‚ùå Erreur transcription: {e}")
            result["status"] = "error"
            result["error"] = str(e)
        
        finally:
            # Nettoyer les fichiers temporaires
            if preprocess and audio_to_transcribe != audio_path:
                if os.path.exists(audio_to_transcribe):
                    os.remove(audio_to_transcribe)
        
        return result

***Comment r√©gler les param√®tres selon les cas***

Cas A ‚Äî Audio propre (dictaphones, salle calme)
*  beam_size=3, best_of=1‚Äì2 (plus rapide)
* no_speech_threshold=0.6 (ok)
* temperature=0.0
* VAD : min_silence_duration_ms=1500

Cas B ‚Äî Audio bruit√© (portes, brouhaha)
* beam_size=5, best_of=5 (qualit√©)
* baisser no_speech_threshold √† 0.5 si coupures
* VAD : threshold=0.4‚Äì0.5, min_speech_duration_ms=200, min_silence_duration_ms=1800‚Äì2200
* Garde-fous : garder compression_ratio_threshold=2.4

Cas C ‚Äî CPU-only (pas de GPU Kaggle)
* compute_type="int8", mod√®le tiny ou base
* beam_size=3, best_of=1
* Threads : cpu_threads=2, num_workers=1
* Attends un RTF ‚âà 2‚Äì5 (selon longueur)

In [11]:
# Exemple d'utilisation
#result = transcription_service.transcribe_audio(audio_file)
#print(f"Transcription: {result['transcription'][:500]}...")

# **Service d'analyse de qualit√©**

In [12]:
class QualityAnalyzer:
    """Analyse la qualit√© de la transcription et d√©tecte les probl√®mes"""
    
    def __init__(self, config: Config):
        self.config = config
    
    def analyze_transcription(self, result: Dict) -> Dict:
        """Analyse compl√®te de la qualit√©"""
        
        if result["status"] != "success":
            return {"status": "error", "message": "Transcription √©chou√©e"}
        
        text = result["transcription"]
        segments = result["segments"]
        
        analysis = {
            "total_length": len(text),
            "total_segments": len(segments),
            "repetitions": {},
            "quality_issues": [],
            "statistics": {}
        }
        
        # D√©tecter les r√©p√©titions
        words = text.lower().split()
        word_freq = {}
        for word in words:
            if len(word) > 3:  # Ignorer les mots courts
                word_freq[word] = word_freq.get(word, 0) + 1
        
        # Identifier les mots trop fr√©quents
        total_words = len(words)
        for word, count in word_freq.items():
            ratio = count / total_words
            if ratio > 0.05:  # Plus de 5% du texte
                analysis["repetitions"][word] = {
                    "count": count,
                    "ratio": ratio
                }
                if ratio > 0.1:
                    analysis["quality_issues"].append(f"Mot '{word}' r√©p√©t√© {count} fois ({ratio:.1%})")
        
        # Analyser les segments
        low_confidence = sum(1 for s in segments if s.get('confidence', 0) < -0.5)
        high_no_speech = sum(1 for s in segments if s.get('no_speech_prob', 0) > 0.6)
        
        avg_conf = float(np.mean([s.get('confidence', 0) for s in segments])) if segments else 0.0
        
        analysis["statistics"] = {
            "avg_confidence": avg_conf,
            "low_confidence_segments": low_confidence,
            "high_no_speech_segments": high_no_speech,
            "words_per_segment": float(total_words / len(segments)) if segments else 0.0
        }
        
        # Identifier les probl√®mes
        if low_confidence > len(segments) * 0.3:
            analysis["quality_issues"].append(f"{low_confidence} segments avec faible confiance")
        
        if high_no_speech > len(segments) * 0.2:
            analysis["quality_issues"].append(f"{high_no_speech} segments d√©tect√©s comme silence")
        
        # Score de qualit√© global
        quality_score = 100
        quality_score -= len(analysis["repetitions"]) * 5
        quality_score -= len(analysis["quality_issues"]) * 10
        quality_score = max(0, quality_score)
        
        analysis["quality_score"] = quality_score
        
        return analysis

# **Fallback AssemblyAI (si √©chec Whisper)**

In [13]:
class AssemblyAIFallback:
    """Service de fallback avec AssemblyAI"""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        
    def transcribe_with_assemblyai(self, audio_path: str) -> Dict:
        """
        Transcription de secours via AssemblyAI
        
        Args:
            audio_path: Chemin du fichier audio
            
        Returns:
            Dict avec la transcription
        """
        if not self.api_key:
            return {
                "status": "error",
                "error": "Cl√© API AssemblyAI non configur√©e"
            }
        
        try:
            import assemblyai as aai
            
            print("üîÑ Utilisation du fallback AssemblyAI...")
            
            aai.settings.api_key = self.api_key
            transcriber = aai.Transcriber()
            
            # Upload et transcription
            config_lang = aai.TranscriptionConfig(
                language_code="fr",
                punctuate=True,
                format_text=True,
                disfluencies=True,
                speaker_labels=True
            )
            transcript = transcriber.transcribe(audio_path, config=config_lang)
            
            if transcript.status == aai.TranscriptStatus.error:
                raise Exception(f"Erreur AssemblyAI: {transcript.error}")
            
            # Attente de la transcription
            while transcript.status not in [aai.TranscriptStatus.completed, aai.TranscriptStatus.error]:
                time.sleep(5)
                transcript = transcriber.get_transcript(transcript.id)
            
            return {
                "status": "success",
                "method": "assemblyai",
                "transcription": transcript.text,
                "confidence": transcript.confidence if hasattr(transcript, 'confidence') else 0.85,
                "words": transcript.words if hasattr(transcript, 'words') else []
            }
            
        except Exception as e:
            print(f"‚ùå Erreur AssemblyAI: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
                "method": "assemblyai"
            }

# Service de fallback
fallback_service = AssemblyAIFallback(config.assemblyai_key)

1. Par d√©faut, la langue est auto. Pour ton cas, force fran√ßais :
        config = aai.TranscriptionConfig(language_code="fr")
2. Diarisation (orateurs)
        config = aai.TranscriptionConfig(speaker_labels=True)

Exemple :
    config = aai.TranscriptionConfig(language_code="fr", speaker_labels=True)
    transcript = transcriber.transcribe(audio_path, config=config)

Appel :
    Si TranscriptionService.transcribe_audio renvoie status="error" ou un real_time_factor >> 5 (trop lent) ou trop de segments sous ton confidence_threshold, alors :
        > result = fallback_service.transcribe_with_assemblyai(audio_path)

**Pipeline de transcription avec gestion automatique du fallback**

In [14]:
def transcribe_audio_pipeline(
    audio_path: str, 
    config: Config,
    force_denoise: Optional[bool] = None,
    analyze_quality: bool = True
) -> Dict:
    """
    Pipeline complet de transcription avec analyse de qualit√©
    
    Args:
        audio_path: Chemin du fichier audio
        config: Configuration
        force_denoise: Forcer le d√©bruitage (None=auto)
        analyze_quality: Analyser la qualit√© apr√®s transcription
    """
    
    print("=" * 70)
    print("üéØ PIPELINE DE TRANSCRIPTION INTELLIGENT V2")
    print("=" * 70)
    
    # Pr√©parer le fichier
    file_info = prepare_audio_file(audio_path)
    print(f"üìÅ Fichier: {os.path.basename(audio_path)}")
    print(f"   Format: {file_info['format']}")
    print(f"   Dur√©e: {format_timestamp(file_info['duration_seconds'])}")
    print(f"   Taille: {file_info['size_mb']:.1f} MB")
    
    # Service de transcription
    transcription_service = TranscriptionService(config)
    
    # D√©terminer si d√©bruitage n√©cessaire
    if force_denoise is None:
        force_denoise = "auto"
    
    # Transcription
    result = transcription_service.transcribe_with_preprocessing(
        audio_path,
        preprocess=force_denoise,
        language="fr"
    )
    
    # Analyse de qualit√©
    if analyze_quality and result["status"] == "success":
        print("\nüìä Analyse de la qualit√©...")
        analyzer = QualityAnalyzer(config)
        quality = analyzer.analyze_transcription(result)
        result["quality_analysis"] = quality
        
        print(f"   Score de qualit√©: {quality['quality_score']}/100")
        
        if quality["quality_issues"]:
            print("   ‚ö†Ô∏è Probl√®mes d√©tect√©s:")
            for issue in quality["quality_issues"]:
                print(f"      - {issue}")
        
        if quality["repetitions"]:
            print("   üîÑ R√©p√©titions excessives:")
            for word, data in list(quality["repetitions"].items())[:3]:
                print(f"      - '{word}': {data['count']} fois ({data['ratio']:.1%})")
    
    # Sauvegarder le r√©sultat
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = f"{config.output_dir}/transcription_{timestamp}.json"
    
    # Convertir les types NumPy en types Python natifs pour JSON
    def convert_numpy_types(obj):
        """Convertit r√©cursivement les types NumPy en types Python natifs"""
        import numpy as np
        
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, (np.bool_, bool)):
            return bool(obj)
        elif isinstance(obj, dict):
            return {key: convert_numpy_types(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [convert_numpy_types(item) for item in obj]
        else:
            return obj
    
    # Nettoyer le r√©sultat avant sauvegarde
    result_clean = convert_numpy_types(result)
    
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(result_clean, f, ensure_ascii=False, indent=2)
    
    print(f"\nüíæ R√©sultat sauvegard√©: {output_file}")
    
    # R√©sum√© final
    if result["status"] == "success":
        print("\n" + "=" * 70)
        print("‚úÖ TRANSCRIPTION R√âUSSIE")
        print("=" * 70)
        print(f"üìù M√©thode: Whisper {config.whisper_model}")
        print(f"üìä Confiance: {result.get('confidence', 0):.2%}")
        print(f"üìë Segments: {len(result.get('segments', []))}")
        print(f"üìÑ Longueur: {len(result.get('transcription', ''))} caract√®res")
        
        if analyze_quality:
            print(f"‚≠ê Qualit√©: {result['quality_analysis']['quality_score']}/100")
        
        # Aper√ßu
        text = result.get('transcription', '')
        if text:
            print(f"\nüìñ Aper√ßu (300 premiers caract√®res):")
            print(f"   {text[:300]}...")
    else:
        print(f"\n‚ùå √âchec transcription: {result.get('error')}")
    
    return result

In [15]:
# Test avec votre fichier audio
#audio_file = f"{UPLOAD_PATH}atelier.mp3"
#audio_file = f"{UPLOAD_PATH}test_1h.wav"
audio_file = f"{UPLOAD_PATH}test_30mn.mp3"
#audio_info = prepare_audio_file(audio_file)

In [16]:
# transcription_result = transcribe_audio_pipeline(
#             audio_file, 
#             config,
#             force_denoise=None  # Auto-d√©tection
#         )

In [17]:
# V√©rifier l'existence du fichier
if os.path.exists(audio_file):
    print(f"‚úÖ Fichier trouv√©: {audio_file}")
    
    # Lancer la transcription avec le pipeline am√©lior√©
    transcription_result = transcribe_audio_pipeline(
        audio_file, 
        config,
        force_denoise=None,  # Auto-d√©tection
        analyze_quality=True  # Analyse de qualit√© activ√©e
    )
    
    # Afficher les statistiques finales
    if transcription_result["status"] == "success":
        print("\nüìà STATISTIQUES FINALES:")
        print("-" * 40)
        
        if "quality_analysis" in transcription_result:
            qa = transcription_result["quality_analysis"]
            print(f"Score qualit√©: {qa['quality_score']}/100")
            print(f"R√©p√©titions d√©tect√©es: {len(qa['repetitions'])}")
            print(f"Probl√®mes identifi√©s: {len(qa['quality_issues'])}")
            
            if qa['statistics']:
                print(f"Confiance moyenne: {qa['statistics']['avg_confidence']:.3f}")
                print(f"Mots/segment: {qa['statistics']['words_per_segment']:.1f}")
else:
    print(f"‚ùå Fichier non trouv√©: {audio_file}")
    print("Veuillez ajuster le chemin du fichier audio dans la cellule ci-dessus.")

‚úÖ Fichier trouv√©: /kaggle/input/meeting-audio/test_30mn.mp3
üéØ PIPELINE DE TRANSCRIPTION INTELLIGENT V2
üìÅ Fichier: test_30mn.mp3
   Format: mp3
   Dur√©e: 00:32:08
   Taille: 73.5 MB
üéØ TRANSCRIPTION AVEC PR√âPROCESSING INTELLIGENT
üîç Analyse du niveau de bruit...
  SNR: 41.1 dB
  Silence: 20.0%
  Impulsions: 0.00/1000 samples
  ‚Üí Audio propre, pas de d√©bruitage n√©cessaire

üìù Transcription de l'audio original...
‚è≥ Chargement Whisper large-v3...


config.json: 0.00B [00:00, ?B/s]

preprocessor_config.json:   0%|          | 0.00/340 [00:00<?, ?B/s]

vocabulary.json: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

model.bin:   0%|          | 0.00/3.09G [00:00<?, ?B/s]

‚úÖ Mod√®le charg√© sur CUDA
üéØ Transcription avec chunking intelligent...
  Chunk 1/8: 00:00:00 - 00:05:00
  Chunk 2/8: 00:04:30 - 00:09:30
  Chunk 3/8: 00:09:00 - 00:14:00
  Chunk 4/8: 00:13:30 - 00:18:30
  Chunk 5/8: 00:18:00 - 00:23:00
  Chunk 6/8: 00:22:30 - 00:27:30
  Chunk 7/8: 00:27:00 - 00:32:00
  Chunk 8/8: 00:31:30 - 00:32:08

‚úÖ Transcription r√©ussie!
  üìä Confiance moyenne: 0.00%
  üìù Longueur: 15870 caract√®res
  ‚è±Ô∏è Dur√©e audio: 1928.0s
  üìë Segments: 232

üìä Analyse de la qualit√©...
   Score de qualit√©: 90/100
   ‚ö†Ô∏è Probl√®mes d√©tect√©s:
      - 71 segments d√©tect√©s comme silence

üíæ R√©sultat sauvegard√©: /kaggle/working/transcription_20250924_053550.json

‚úÖ TRANSCRIPTION R√âUSSIE
üìù M√©thode: Whisper large-v3
üìä Confiance: 0.00%
üìë Segments: 232
üìÑ Longueur: 15870 caract√®res
‚≠ê Qualit√©: 90/100

üìñ Aper√ßu (300 premiers caract√®res):
    R√©alisation des documents n√©cessaires √† ce Conseil  ...  Merci.  Et la hausse du taux de