In [None]:
# 🎧 AUDIO INGEST + VAD SPLIT (Silero) + TRANSCRIBE + CHUNK + SUMMARIZE

from pathlib import Path
import json, math, subprocess, sys
from typing import List, Dict, Any, Optional, Tuple

# === CONFIG ===
# Folder that is searched recursively for input recordings.
AUDIO_ROOT = Path("/mnt/d/Roshidat_Msc_Project/AI_Project/Company dataset/Mytrainingdataset/AllAudio")

# Folder for temporary slices and JSONL outputs; created as soon as this cell runs.
AUDIO_OUT = Path("./audio_outputs")
AUDIO_OUT.mkdir(parents=True, exist_ok=True)

# Lower-case file suffixes that count as audio.
AUDIO_EXTS = {".wav", ".mp3", ".m4a", ".flac", ".ogg", ".wma", ".aac", ".opus"}

# Character limits used when cutting a transcript into chunks.
CHUNK_SETTINGS = {
    "max_characters": 10_000,
    "new_after_n_chars": 6_000,
    "combine_text_under_n_chars": 2_000,
}

# Model name and compute type handed to faster-whisper's WhisperModel.
FW_MODEL_NAME = "small"
FW_COMPUTE_TYPE = "int8"

# Window length (seconds) for fixed slicing, used when VAD is off or fails.
FIXED_SEGMENT_SECONDS = 600

# Voice-activity-detection settings; all durations are in seconds.
USE_VAD = True
VAD_MIN_SPEECH = 0.6    # shortest speech segment to keep
VAD_MAX_SPEECH = 300.0  # longer segments are split into equal parts
VAD_PAD = 0.15          # padding added on both sides of a detected segment
VAD_MERGE_GAP = 0.35    # segments closer together than this are merged

# Module-level result stores; process_all_audio() clears and refills them.
audio_file_stats: Dict[str, Dict] = {}
all_audio_segments: List[Dict] = []
all_audio_chunks: List[Dict] = []
all_audio_transcripts: List[Dict] = []


def _pip(*args):
    """Echo and then run ``<current python> -m pip <args>``.

    Uses ``sys.executable`` so packages land in the interpreter that is
    running this code. Raises ``subprocess.CalledProcessError`` when pip
    exits with a non-zero status.
    """
    command = [sys.executable, "-m", "pip", *args]
    print(">", *command)
    subprocess.check_call(command)

def _ensure_pkgs():
    """Install the third-party packages this pipeline imports if they are missing.

    Each import below is only an availability probe; on ``ImportError`` the
    package is installed through ``_pip``. faster-whisper (pinned to 1.0.3)
    and pydub are always checked; torch is checked only when ``USE_VAD`` is on.

    NOTE(review): the Silero VAD hub entry may also need ``torchaudio``, which
    is not checked here - confirm against the environment.
    """
    try:
        import faster_whisper  # noqa: F401 - probe only
    except ImportError:
        _pip("install", "faster-whisper==1.0.3")

    try:
        from pydub import AudioSegment  # noqa: F401 - probe only
    except ImportError:
        _pip("install", "pydub")

    if not USE_VAD:
        # torch is needed solely for the VAD path.
        return

    try:
        import torch  # noqa: F401 - probe only
    except ImportError:
        _pip("install", "torch")

# Run the dependency check now so the third-party imports further down can succeed.
_ensure_pkgs()


def _ensure_ffmpeg():
    """Verify that the ``ffmpeg`` executable can be run.

    Runs ``ffmpeg -version`` and discards its output.

    Raises:
        RuntimeError: if ffmpeg cannot be started or exits with a non-zero
            status. The message contains install hints and the underlying
            error is attached as ``__cause__``.
    """
    try:
        subprocess.run(
            ["ffmpeg", "-version"],
            check=True,
            # The version banner is irrelevant; only the exit status matters.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except (OSError, subprocess.SubprocessError) as err:
        # OSError: executable missing / not runnable.
        # SubprocessError: started but reported failure (check=True).
        raise RuntimeError(
            "ffmpeg is required.\n"
            "Conda: conda install -c conda-forge ffmpeg\n"
            "Ubuntu: sudo apt-get install -y ffmpeg"
        ) from err


def _list_audio_files(root: Path, exts: Optional[Any] = None) -> List[Path]:
    """Return every audio file under ``root`` (searched recursively), sorted.

    Args:
        root: Directory to search.
        exts: Optional iterable of file suffixes (with leading dot) to accept.
            Defaults to the module-level ``AUDIO_EXTS``. Matching is
            case-insensitive.

    Returns:
        Sorted list of matching paths. Sorting makes the processing order -
        and therefore downstream chunk indices - reproducible, because the
        order produced by ``rglob`` depends on the filesystem.
    """
    wanted = frozenset(ext.lower() for ext in (AUDIO_EXTS if exts is None else exts))
    return sorted(
        path
        for path in root.rglob("*")
        if path.suffix.lower() in wanted and path.is_file()
    )

from pydub import AudioSegment

def _export_slice(file_path: Path, start_s: float, end_s: float) -> Path:
    """Write the part of ``file_path`` between ``start_s`` and ``end_s`` to a WAV file.

    The bounds are given in seconds and truncated to whole milliseconds for
    slicing. The whole input file is decoded on every call. The output goes
    to ``AUDIO_OUT`` under a name built from the input stem and the bounds
    truncated to whole seconds.

    Returns:
        Path of the exported WAV file.
    """
    start_ms = int(start_s * 1000)
    end_ms = int(end_s * 1000)
    clip = AudioSegment.from_file(file_path)[start_ms:end_ms]

    out_name = f"tmp_{file_path.stem}_{int(start_s)}_{int(end_s)}.wav"
    destination = AUDIO_OUT / out_name
    clip.export(destination, format="wav")
    return destination

def _duration_seconds(file_path: Path) -> float:
    """Return the length of the audio file in seconds.

    The file is fully decoded with pydub; the segment length (taken here as
    milliseconds) is divided by 1000.
    """
    decoded = AudioSegment.from_file(file_path)
    length_ms = len(decoded)
    return length_ms / 1000.0


def _slice_fixed(file_path: Path, segment_seconds: int = FIXED_SEGMENT_SECONDS) -> List[Tuple[float, float]]:
    """Cut the file into consecutive windows of ``segment_seconds``.

    Returns:
        A list of ``(start, end)`` pairs in seconds. A file no longer than one
        window yields the single pair ``(0.0, duration)``; otherwise the last
        window is shortened to end at the file duration.
    """
    total = _duration_seconds(file_path)
    if total <= segment_seconds:
        return [(0.0, total)]

    windows = []
    for index in range(math.ceil(total / segment_seconds)):
        begin = index * segment_seconds
        finish = min((index + 1) * segment_seconds, total)
        windows.append((begin, finish))
    return windows

def _slice_vad(file_path: Path) -> List[Tuple[float, float]]:
    """
    Detect speech with Silero VAD (via torch.hub) and return (start, end) slices in seconds.

    Steps:
      1. Load the Silero model and helpers once and cache them in module globals.
      2. Run VAD on the file read at 16 kHz.
      3. Pad each detected segment by VAD_PAD on both sides.
      4. Merge segments whose gap is at most VAD_MERGE_GAP.
      5. Split segments longer than VAD_MAX_SPEECH into equal parts and widen
         segments shorter than VAD_MIN_SPEECH.
      6. Clamp to the file duration and drop slices of 0.15 s or less.

    This is best-effort: if no speech is found, nothing survives filtering, or
    anything raises, the result of ``_slice_fixed(file_path)`` is returned.
    """
    try:
        import torch

        cache_keys = ("_silero_model", "_get_speech_timestamps", "_read_audio")
        if not all(key in globals() for key in cache_keys):
            print("üß† Loading Silero VAD...")
            loaded_model, utils = torch.hub.load(
                repo_or_dir="snakers4/silero-vad", model="silero_vad", force_reload=False, onnx=False
            )
            # The hub entry returns (get_speech_timestamps, save_audio,
            # read_audio, ...further helpers). Index the ones needed instead
            # of unpacking a fixed number of names: a length mismatch raises
            # ValueError, which would silently disable VAD through the
            # fallback handler below.
            globals()["_get_speech_timestamps"] = utils[0]
            globals()["_save_audio"] = utils[1]
            globals()["_read_audio"] = utils[2]
            # Stored last so a failed load never leaves a half-filled cache.
            globals()["_silero_model"] = loaded_model

        model = globals()["_silero_model"]
        get_speech_timestamps = globals()["_get_speech_timestamps"]
        read_audio = globals()["_read_audio"]

        sr = 16000
        wav = read_audio(str(file_path), sampling_rate=sr)

        print("üó£Ô∏è Running VAD‚Ä¶")
        speech_ts = get_speech_timestamps(
            wav, model, sampling_rate=sr,
            threshold=0.5,
            min_speech_duration_ms=int(VAD_MIN_SPEECH*1000),
            min_silence_duration_ms=200,
            window_size_samples=1536
        )

        if not speech_ts:
            print("‚ÑπÔ∏è No speech detected; using fixed slicing fallback.")
            return _slice_fixed(file_path)

        # Timestamps are sample offsets; convert to seconds and pad both sides.
        segs = [
            (max(0.0, ts["start"] / sr - VAD_PAD), ts["end"] / sr + VAD_PAD)
            for ts in speech_ts
        ]

        # Merge neighbours that overlap or are separated by a short gap.
        merged = []
        for s, e in sorted(segs):
            if merged and s - merged[-1][1] <= VAD_MERGE_GAP:
                merged[-1][1] = max(merged[-1][1], e)
            else:
                merged.append([s, e])

        # Decoding the file to measure it is expensive, so do it once here
        # rather than once per short segment plus once more afterwards.
        file_dur = _duration_seconds(file_path)

        normalized = []
        for s, e in merged:
            dur = e - s
            if dur > VAD_MAX_SPEECH:
                # Split into k equal parts, each no longer than VAD_MAX_SPEECH.
                k = math.ceil(dur / VAD_MAX_SPEECH)
                step = dur / k
                for i in range(k):
                    normalized.append((s + i*step, min(s + (i+1)*step, e)))
            elif dur < VAD_MIN_SPEECH:
                # Widen symmetrically towards the minimum length.
                extra = (VAD_MIN_SPEECH - dur) / 2.0
                normalized.append((max(0.0, s - extra), min(file_dur, e + extra)))
            else:
                normalized.append((s, e))

        # Clamp to the file bounds and drop slices that are too short to use.
        final = []
        for s, e in normalized:
            s = max(0.0, min(s, file_dur))
            e = max(0.0, min(e, file_dur))
            if e - s > 0.15:
                final.append((s, e))

        print(f"‚úÖ VAD produced {len(final)} segment(s).")
        return final or _slice_fixed(file_path)

    except Exception as exc:
        # Deliberate best-effort: any VAD problem degrades to fixed slicing.
        print(f"‚ö†Ô∏è VAD unavailable or failed ({exc}); using fixed slicing.")
        return _slice_fixed(file_path)


def transcribe_audio_file(file_path: Path) -> Dict[str, Any]:
    """Transcribe one audio file slice by slice with faster-whisper.

    The file is cut with ``_slice_vad`` (when ``USE_VAD`` is set) or
    ``_slice_fixed``; each slice is exported to a temporary WAV, transcribed,
    and the temporary file is removed again. Segment times are shifted by the
    slice start so they refer to the original file. Every segment record is
    also appended to the module-level ``all_audio_segments``.

    A failing slice is reported and skipped; it does not abort the file.

    Returns:
        ``{"file": <path string>, "segments": [segment records], "text": <joined text>}``.

    Raises:
        RuntimeError: from ``_ensure_ffmpeg`` when ffmpeg is unavailable.
    """
    from faster_whisper import WhisperModel
    _ensure_ffmpeg()

    print(f"üéß Transcribing: {file_path.name}")

    # Constructing a WhisperModel loads the weights; keep one instance per
    # (model name, compute type) so it is not rebuilt for every file.
    model_cache = globals().setdefault("_whisper_model_cache", {})
    model_key = (FW_MODEL_NAME, FW_COMPUTE_TYPE)
    if model_key not in model_cache:
        model_cache[model_key] = WhisperModel(FW_MODEL_NAME, compute_type=FW_COMPUTE_TYPE)
    model = model_cache[model_key]

    slices = _slice_vad(file_path) if USE_VAD else _slice_fixed(file_path)

    segments_out, full_text = [], []
    for (s, e) in slices:
        tmpwav = None
        try:
            tmpwav = _export_slice(file_path, s, e)
            segs, _info = model.transcribe(str(tmpwav), language=None, vad_filter=False, beam_size=1)
            text_buf = []
            for seg in segs:
                seg_text = seg.text.strip()
                rec = {"file": str(file_path), "start": float(s + seg.start), "end": float(s + seg.end), "text": seg_text}
                segments_out.append(rec)
                all_audio_segments.append(rec)
                if seg_text:
                    text_buf.append(seg_text)
            full_text.append(" ".join(text_buf))
        except Exception as ex:
            print(f"   ‚ùå slice {s:.0f}-{e:.0f}s failed: {ex}")
        finally:
            # The slice WAV is only an intermediate; remove it so temporary
            # files do not pile up in AUDIO_OUT. Cleanup is best-effort.
            if tmpwav is not None:
                try:
                    tmpwav.unlink()
                except OSError:
                    pass

    combined_text = " ".join([t for t in full_text if t]).strip()
    return {"file": str(file_path), "segments": segments_out, "text": combined_text}


def _chunk_transcript(text: str, meta_file: str, settings: Optional[Dict[str, int]] = None) -> List[Dict]:
    """Split a transcript into chunk records.

    Args:
        text: Full transcript text.
        meta_file: Source file path recorded in each chunk's metadata.
        settings: Optional mapping with the keys ``max_characters``,
            ``new_after_n_chars`` and ``combine_text_under_n_chars``.
            Defaults to the module-level ``CHUNK_SETTINGS``.

    Returns:
        A list of ``{"type": "Chunk", "text": ..., "metadata": {...}}`` dicts.

    Behaviour:
        * Empty text gives an empty list.
        * Text shorter than ``combine_text_under_n_chars`` becomes one chunk.
        * Otherwise the text is split into paragraphs on newlines, and
          paragraphs are accumulated (joined by a blank line) until the next
          one would push the buffer past ``new_after_n_chars``.
        * A paragraph that does not fit starts the next buffer rather than
          being emitted alone, so short paragraphs keep being combined.
        * A paragraph too long for the buffer is cut into pieces of at most
          ``max_characters``, at a space where one exists so words stay
          intact; without a space it is cut at the limit.
    """
    if not text:
        return []

    cfg = CHUNK_SETTINGS if settings is None else settings
    soft_limit = cfg["new_after_n_chars"]
    # Guard against a non-positive limit, which could never make progress.
    hard_limit = max(1, cfg["max_characters"])

    def _make_chunk(body: str) -> Dict:
        # Build a fresh record (and fresh nested containers) for each chunk.
        return {"type": "Chunk", "text": body,
                "metadata": {"source_file": meta_file,
                             "orig_elements": [{"file": meta_file, "type": "AudioTranscript"}]}}

    def _split_long(paragraph: str) -> List[str]:
        # Cut an oversized paragraph into pieces no longer than hard_limit.
        pieces, start, total = [], 0, len(paragraph)
        while total - start > hard_limit:
            cut = paragraph.rfind(" ", start + 1, start + hard_limit + 1)
            if cut == -1:
                cut = start + hard_limit  # no space in range: hard cut
            piece = paragraph[start:cut].strip()
            if piece:
                pieces.append(piece)
            start = cut
            while start < total and paragraph[start].isspace():
                start += 1
        tail = paragraph[start:].strip()
        if tail:
            pieces.append(tail)
        return pieces

    if len(text) < cfg["combine_text_under_n_chars"]:
        body = text.strip()
        return [_make_chunk(body)] if body else []

    chunks: List[Dict] = []
    pending = ""

    def _flush():
        nonlocal pending
        if pending.strip():
            chunks.append(_make_chunk(pending.strip()))
        pending = ""

    parts = [p.strip() for p in text.split("\n") if p.strip()] or [text]
    for p in parts:
        # +2 accounts for the blank-line separator between paragraphs.
        if len(pending) + len(p) + 2 <= soft_limit:
            pending = f"{pending}\n\n{p}" if pending else p
            continue
        _flush()
        if len(p) <= min(soft_limit, hard_limit):
            pending = p
        else:
            chunks.extend(_make_chunk(piece) for piece in _split_long(p))
    _flush()
    return chunks


def process_all_audio() -> Dict[str, Any]:
    """Transcribe and chunk every audio file under ``AUDIO_ROOT``.

    Clears and refills the module-level stores (``audio_file_stats``,
    ``all_audio_segments``, ``all_audio_chunks``, ``all_audio_transcripts``)
    and writes one JSON line per transcript to
    ``AUDIO_OUT / "transcripts.jsonl"``.

    A file whose transcription raises is reported, counted and skipped so
    one bad recording does not abort the whole batch. A missing ffmpeg is
    still fatal and is detected before any file is processed.

    Returns:
        ``{"files": <files found>, "chunks": <chunks built>, "failed": <files skipped>}``.

    Raises:
        RuntimeError: from ``_ensure_ffmpeg`` when ffmpeg is unavailable.
    """
    audio_file_stats.clear()
    all_audio_segments.clear()
    all_audio_chunks.clear()
    all_audio_transcripts.clear()

    files = _list_audio_files(AUDIO_ROOT)
    print(f"üìÇ Found {len(files)} audio file(s) under {AUDIO_ROOT}")
    if not files:
        # Same keys as the normal return so callers can index uniformly.
        return {"files": 0, "chunks": 0, "failed": 0}

    # Environment problems should stop the run immediately instead of being
    # reported once per file by the per-file handler below.
    _ensure_ffmpeg()

    def _stats_key(path: Path) -> str:
        # Stats are keyed by stem; fall back to the full path when two files
        # share a stem so the later one does not overwrite the earlier one.
        return path.stem if path.stem not in audio_file_stats else str(path)

    failed = 0
    transcripts_path = AUDIO_OUT / "transcripts.jsonl"
    with open(transcripts_path, "w", encoding="utf-8") as jf:
        for f in files:
            try:
                res = transcribe_audio_file(f)
            except Exception as ex:
                failed += 1
                audio_file_stats[_stats_key(f)] = {"segments": 0, "has_text": False, "chunks": 0, "error": str(ex)}
                print(f"   failed {f.name}: {ex}")
                continue
            all_audio_transcripts.append(res)
            jf.write(json.dumps({"type":"transcript", **res}, ensure_ascii=False) + "\n")
            file_chunks = _chunk_transcript(res["text"], res["file"])
            all_audio_chunks.extend(file_chunks)
            audio_file_stats[_stats_key(f)] = {"segments": len(res["segments"]), "has_text": bool(res["text"]), "chunks": len(file_chunks)}
            print(f"   ‚úÖ {f.name}: segments={len(res['segments'])}, chunks={len(file_chunks)}")

    print(f"üíæ Transcripts saved: {transcripts_path}")
    print(f"üìä Total chunks built: {len(all_audio_chunks)}")
    return {"files": len(files), "chunks": len(all_audio_chunks), "failed": failed}


def summarize_audio_chunks(max_chunks: Optional[int] = None, max_chars_per_chunk: int = 4000) -> List[Dict[str, Any]]:
    """Summarize the chunks in ``all_audio_chunks`` with the global ``summarize_chain``.

    Args:
        max_chunks: Upper bound on how many chunks (from the start of the
            list) are considered; ``None`` means all of them.
        max_chars_per_chunk: Longer chunk texts are cut to this many
            characters and marked as truncated before being summarized.

    Returns:
        One record per summarized chunk with the keys ``chunk_index``,
        ``source_file``, ``summary`` and ``excerpt`` (first 300 characters of
        the submitted text). Blank chunks are skipped; a chunk whose
        summarization raises is reported and skipped.

    Raises:
        RuntimeError: if no global named ``summarize_chain`` exists.
    """
    if 'summarize_chain' not in globals():
        raise RuntimeError("summarize_chain not found. Run your Gemma setup.")

    available = len(all_audio_chunks)
    n = available if max_chunks is None else min(max_chunks, available)
    print(f"üìù Summarizing {n} audio chunk(s)‚Ä¶")

    summaries: List[Dict[str, Any]] = []
    for idx in range(n):
        chunk = all_audio_chunks[idx]
        body = (chunk.get("text") or "").strip()
        if not body:
            continue

        if len(body) > max_chars_per_chunk:
            excerpt = body[:max_chars_per_chunk] + "... [truncated]"
        else:
            excerpt = body

        try:
            summary = summarize_chain.invoke(excerpt)
            summaries.append({
                "chunk_index": idx,
                "source_file": chunk.get("metadata", {}).get("source_file"),
                "summary": summary,
                "excerpt": excerpt[:300],
            })
            # Progress line every 25 chunks and on the final index.
            if (idx + 1) % 25 == 0 or idx == n - 1:
                print(f"   ‚úÖ {idx+1}/{n}")
        except Exception as err:
            print(f"   ‚ùå chunk {idx+1}/{n} failed: {err}")

    print(f"‚úÖ Audio summarization done. {len(summaries)} summaries.")
    return summaries

def audio_summarization_pipeline(save_jsonl: bool = True, out_path: Optional[Path] = None) -> Dict[str, Any]:
    """Run the whole audio flow: transcribe and chunk, then summarize.

    Args:
        save_jsonl: When true, write one JSON line per summary to disk.
        out_path: Destination for the summaries; when not given,
            ``AUDIO_OUT / "audio_summaries.jsonl"`` is used.

    Returns:
        ``{"stats": <result of process_all_audio>, "text_summaries": <list of summaries>}``.
    """
    stats = process_all_audio()
    text_sums = summarize_audio_chunks()

    if save_jsonl:
        target = out_path if out_path else AUDIO_OUT / "audio_summaries.jsonl"
        with open(target, "w", encoding="utf-8") as handle:
            handle.writelines(
                json.dumps({"type": "audio_text_summary", **row}, ensure_ascii=False) + "\n"
                for row in text_sums
            )
        print(f"üíæ Saved audio summaries to: {target}")

    print("üéâ Audio pipeline finished.")
    return {"stats": stats, "text_summaries": text_sums}


def show_audio_summary_samples(text_summaries: List[Dict[str, Any]], n: int = 5):
    """Print the first ``n`` summary records in a readable form.

    For each record the source file, chunk index, the first 160 characters of
    the excerpt and the summary are printed. Missing keys print as ``None``
    (or an empty excerpt).
    """
    print("\nüîç SAMPLE AUDIO SUMMARIES")
    print("=" * 28)
    for rank, entry in enumerate(text_summaries[:n], start=1):
        source = entry.get('source_file')
        chunk_no = entry.get('chunk_index')
        preview = entry.get('excerpt', '')[:160]
        print(f"\n#{rank} file: {source}, chunk: {chunk_no}")
        print(f"Excerpt: {preview}...")
        print(f"Summary: {entry.get('summary')}")


