# Childes Whisper Transcription and Analysis

## Data processing

In [None]:

"""
NOTEBOOK: Data Processing Pipeline
Étape 1: Nettoyer les .cha et matcher avec audio
Étape 2: Segmenter l'audio basé sur les timestamps
"""

from pathlib import Path
from typing import List, Dict, Tuple
import subprocess
from dataclasses import dataclass
from extract_wor import extract_wor_segments, WorSegment



In [None]:
# ============================================================================
# ZONE 1: FILE MATCHING - Matcher les .cha avec les fichiers audio (.wav, .mp3, .m4a, .flac)
# ============================================================================

def find_matching_files(cha_dir: Path, audio_dir: Path, audio_extensions: List[str] = None) -> Dict:
    """
    Pair .cha transcripts with audio recordings that share the same file stem.

    Both directories are searched recursively.

    Args:
        cha_dir: Directory containing the .cha transcripts.
        audio_dir: Directory containing the audio recordings.
        audio_extensions: Suffixes to look for; when None, .wav, .mp3,
            .m4a and .flac are used.

    Returns:
        Dict with:
        - 'matched': list of (cha_file, audio_file) tuples
        - 'cha_missing_audio': transcripts with no audio of the same stem
        - 'audio_orphans': audio files with no transcript of the same stem
        - 'total_cha', 'total_audio', 'matched_count': summary counts
    """
    if audio_extensions is None:
        suffixes = [".wav", ".mp3", ".m4a", ".flac"]
    else:
        suffixes = audio_extensions

    # Index transcripts by stem; if two files share a stem the later one wins.
    transcripts = sorted(cha_dir.glob("**/*.cha"))
    cha_by_stem = {}
    for transcript in transcripts:
        cha_by_stem[transcript.stem] = transcript

    # Gather recordings one suffix at a time, then index them the same way.
    recordings = [
        recording
        for suffix in suffixes
        for recording in audio_dir.glob(f"**/*{suffix}")
    ]
    audio_by_stem = {}
    for recording in recordings:
        audio_by_stem[recording.stem] = recording

    # Split the transcripts into those with and without a recording.
    pairs = []
    unpaired_cha = []
    for stem, transcript in cha_by_stem.items():
        if stem not in audio_by_stem:
            unpaired_cha.append(transcript)
            continue
        pairs.append((transcript, audio_by_stem[stem]))

    # Recordings whose stem matches no transcript.
    unpaired_audio = []
    for stem, recording in audio_by_stem.items():
        if stem not in cha_by_stem:
            unpaired_audio.append(recording)

    return {
        "matched": pairs,
        "cha_missing_audio": unpaired_cha,
        "audio_orphans": unpaired_audio,
        "total_cha": len(transcripts),
        "total_audio": len(recordings),
        "matched_count": len(pairs),
    }


def print_matching_report(match_result: Dict):
    """Print a human-readable summary of a find_matching_files() result.

    Args:
        match_result: Dict holding the 'total_cha', 'total_audio',
            'matched_count', 'cha_missing_audio' and 'audio_orphans' entries;
            the two list entries must contain objects with a ``name`` attribute.
    """
    rule = "=" * 70
    preview_limit = 5

    def show_preview(paths):
        # Name the first few entries, then say how many were left out.
        for path in paths[:preview_limit]:
            print(f"      - {path.name}")
        hidden = len(paths) - preview_limit
        if hidden > 0:
            print(f"      ... et {hidden} autres")

    unpaired_cha = match_result['cha_missing_audio']
    unpaired_audio = match_result['audio_orphans']

    print("\n" + rule)
    print("üìä MATCHING REPORT: .cha ‚Üî Audio")
    print(rule)

    print("\nüìÅ Fichiers trouv√©s:")
    print(f"   Total .cha:     {match_result['total_cha']}")
    print(f"   Total audio:    {match_result['total_audio']}")
    print(f"   ‚úÖ Match√©s:      {match_result['matched_count']}")

    print("\n‚ö†Ô∏è  Manquants:")
    print(f"   .cha sans audio: {len(unpaired_cha)}")
    show_preview(unpaired_cha)

    print(f"\n   Audio orphans:   {len(unpaired_audio)}")
    show_preview(unpaired_audio)

    print("\n" + rule + "\n")


In [None]:
# ============================================================================
# ZONE 2: SEGMENT EXTRACTION - Extraire les segments .cha (déjà fait par extract_wor)
# ============================================================================

def extract_segments_from_matched_files(matched_pairs: List[Tuple[Path, Path]], debug: bool = False) -> List[WorSegment]:
    """
    Collect the WorSegment objects of every matched .cha transcript.

    Args:
        matched_pairs: (cha_file, audio_file) tuples from the matching step;
            only the transcript of each pair is read here.
        debug: When True, print per-file progress and a final total.

    Returns:
        One flat list with the segments of all transcripts, in input order.
        NOTE(review): downstream code reads ``file_name`` on each segment;
        presumably extract_wor_segments sets it — confirm.
    """
    verbose = debug
    collected = []

    if verbose:
        print(f"\nüîÑ Extraction de {len(matched_pairs)} fichiers .cha")

    for transcript, _recording in matched_pairs:
        if verbose:
            # Stay on the same line so the segment count can be appended.
            print(f"  Processing {transcript.name}...", end=" ")

        file_segments = extract_wor_segments(transcript, debug=False)
        collected.extend(file_segments)

        if verbose:
            print(f"‚úì ({len(file_segments)} segments)")

    if verbose:
        print(f"‚úÖ Total: {len(collected)} segments extraits\n")

    return collected


In [None]:
# ============================================================================
# ZONE 3: AUDIO EXTRACTION - Découper les audio basé sur les timestamps
# ============================================================================

class AudioSegmenter:
    """Cut audio recordings into per-segment WAV clips based on timestamps.

    Clips are written to ``output_dir/<speaker>/<segment_id>.wav`` as 16-bit
    PCM by invoking the ``ffmpeg`` executable, which must be available on PATH.
    """

    def __init__(self, output_dir: Path, sample_rate: int = 16000, mono: bool = True):
        """
        Set up the segmenter and create the output directory.

        Args:
            output_dir: Directory where the clips are saved (created if missing).
            sample_rate: Sample rate of the written clips, in Hz.
            mono: Write one channel when True, two channels otherwise.
        """
        self.output_dir = output_dir
        self.sample_rate = sample_rate
        self.mono = mono
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # speaker -> directory cache, filled lazily by _get_speaker_dir
        self.speaker_dirs = {}

    def _get_speaker_dir(self, speaker: str) -> Path:
        """Return the output sub-directory for a speaker, creating it on first use."""
        if speaker not in self.speaker_dirs:
            speaker_dir = self.output_dir / speaker
            # parents=True keeps this working even if output_dir was removed
            # after construction.
            speaker_dir.mkdir(parents=True, exist_ok=True)
            self.speaker_dirs[speaker] = speaker_dir
        return self.speaker_dirs[speaker]

    def extract_segment(self, audio_file: Path, start_ms: int, end_ms: int, output_path: Path) -> bool:
        """
        Extract one clip from an audio file with ffmpeg.

        Args:
            audio_file: Source audio file.
            start_ms: Clip start, in milliseconds.
            end_ms: Clip end, in milliseconds.
            output_path: Where to write the clip (overwritten if present).

        Returns:
            True on success. False when the source file is missing, the time
            range is negative or empty, ffmpeg exits with an error, or ffmpeg
            does not finish within 10 seconds.

        Raises:
            FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
        """
        if not audio_file.exists():
            return False

        duration_ms = end_ms - start_ms
        # An empty or inverted range cannot produce audio; skip the ffmpeg call.
        if start_ms < 0 or duration_ms <= 0:
            return False

        cmd = [
            "ffmpeg",
            "-nostdin",  # never wait for interactive input
            # -ss before -i seeks in the input instead of decoding and
            # discarding everything before the start, which would make late
            # clips of long recordings hit the timeout.
            "-ss", f"{start_ms / 1000.0:.3f}",
            "-i", str(audio_file),
            "-t", f"{duration_ms / 1000.0:.3f}",
            "-acodec", "pcm_s16le",
            "-ar", str(self.sample_rate),
            "-ac", "1" if self.mono else "2",
            "-y",
            str(output_path),
        ]

        try:
            subprocess.run(cmd, check=True, capture_output=True, timeout=10)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            return False
        return True

    def segment_all(self, segments: List["WorSegment"], matched_pairs: Dict[str, Path],
                   batch_size: int = 100) -> Dict:
        """
        Cut a clip for every segment that has a matching audio file.

        Args:
            segments: Segments exposing ``file_name``, ``speaker``, ``text``
                and ``words``; each word is indexable with its start time at
                index 1 and its end time at index 2 (milliseconds).
            matched_pairs: Either a dict {file_name (stem): audio_file_path}
                or an iterable of (cha_file, audio_file) pairs.
            batch_size: Print progress every N segments; 0 or less disables it.

        Returns:
            Dict with 'extracted' (list of per-clip metadata dicts), 'failed'
            (list of segment ids) and 'skipped' (count of segments without
            usable audio or without words).
        """
        # Build the lookup file_name -> audio path. A dict is used as given;
        # unpacking it as pairs would iterate over its string keys and fail.
        if isinstance(matched_pairs, dict):
            audio_lookup = dict(matched_pairs)
        else:
            audio_lookup = {audio.stem: audio for _cha, audio in matched_pairs}

        results = {
            "extracted": [],
            "failed": [],
            "skipped": 0
        }

        for i, seg in enumerate(segments):
            # Skip segments whose audio is unknown or no longer on disk.
            audio_file = audio_lookup.get(seg.file_name)
            if not audio_file or not audio_file.exists():
                results["skipped"] += 1
                continue

            # Without words there are no timestamps to cut on.
            if not seg.words:
                results["skipped"] += 1
                continue

            # The position in `segments` keeps ids unique across files.
            segment_id = f"{seg.file_name}_{seg.speaker}_{i:05d}"
            speaker_dir = self._get_speaker_dir(seg.speaker)
            output_path = speaker_dir / f"{segment_id}.wav"

            # The clip spans from the first word's start to the last word's end.
            start_ms = seg.words[0][1]
            end_ms = seg.words[-1][2]

            success = self.extract_segment(audio_file, start_ms, end_ms, output_path)

            if success:
                results["extracted"].append({
                    "segment_id": segment_id,
                    "speaker": seg.speaker,
                    "file_name": seg.file_name,
                    "audio_path": str(output_path),
                    "duration_ms": end_ms - start_ms,
                    "text": seg.text,
                    "num_words": len(seg.words)
                })
            else:
                results["failed"].append(segment_id)

            # Progress
            if batch_size > 0 and (i + 1) % batch_size == 0:
                print(f"  ‚úì {i + 1}/{len(segments)} segments trait√©s")

        return results


In [None]:
# ============================================================================
# ZONE 4: PIPELINE ORCHESTRATION - Utiliser les zones précédentes
# ============================================================================

class DataProcessingPipeline:
    """Run the whole pipeline: matching, then segment extraction, then audio cutting."""

    def __init__(self, cha_dir: Path, audio_dir: Path, output_dir: Path):
        """
        Store the working directories and create the output directory.

        Args:
            cha_dir: Directory containing the .cha transcripts.
            audio_dir: Directory containing the audio recordings.
            output_dir: Directory receiving the results (created if missing).
        """
        self.cha_dir = cha_dir
        self.audio_dir = audio_dir
        self.output_dir = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def run(self):
        """
        Execute the three pipeline steps and print a summary.

        Returns:
            The result dict of AudioSegmenter.segment_all, or None when no
            .cha/audio pair was found.
        """

        print("\n" + "="*70)
        print("üöÄ DATA PROCESSING PIPELINE")
        print("="*70)

        # STEP 1: Matching
        print("\n1Ô∏è‚É£  STEP 1: Matching .cha ‚Üî Audio")
        print("-"*70)

        match_result = find_matching_files(self.cha_dir, self.audio_dir)
        print_matching_report(match_result)

        matched_pairs = match_result["matched"]

        if not matched_pairs:
            print("‚ùå Aucune paire trouv√©e!")
            return None

        # STEP 2: Extract the .cha segments
        print("2Ô∏è‚É£  STEP 2: Extract .cha segments (word-level)")
        print("-"*70)

        segments = extract_segments_from_matched_files(matched_pairs, debug=True)

        print(f"‚úÖ {len(segments)} segments extraits avec timestamps")

        # STEP 3: Cut the audio
        print("\n3Ô∏è‚É£  STEP 3: Segment audio files")
        print("-"*70)

        audio_out_dir = self.output_dir / "audio_segments"
        segmenter = AudioSegmenter(audio_out_dir)

        print(f"Segmenting {len(segments)} segments from {len(matched_pairs)} audio files...\n")

        # segment_all builds its lookup by unpacking (cha, audio) pairs, so it
        # gets the pair list itself. A {stem: audio} dict would be iterated by
        # key and fail to unpack.
        results = segmenter.segment_all(segments, matched_pairs, batch_size=200)

        # Summary
        extracted_count = len(results['extracted'])
        failed_count = len(results['failed'])
        skipped_count = results['skipped']

        print("\n" + "="*70)
        print("‚úÖ PIPELINE COMPLETED")
        print("="*70)
        print(f"\nüìä R√©sultats:")
        print(f"   Segments extraits: {extracted_count}")
        print(f"   Segments √©chou√©s:  {failed_count}")
        print(f"   Segments ignor√©s:  {skipped_count}")
        print(f"   Total: {extracted_count + failed_count + skipped_count}")

        print(f"\nüìÅ Fichiers audio: {audio_out_dir}")

        return results