In [None]:
# Prints a fixed greeting to stdout.
print("hello")

In [5]:
#!/usr/bin/env python3
"""
step1_2_collect_transcripts.py

Step 1 & 2:
- Read the FT CSV index (FT Data - data.csv)
- For each recording:
    - find or download the transcription JSON and metadata JSON
    - parse transcription segments (start, end, text)
    - save raw files locally and create:
        - data/processed/transcripts_index.json  (mapping recording_id -> info + segments)
        - data/processed/train_manifest.jsonl    (one line per segment, ready for later steps)
- Helpful logging and safe failure handling.

Usage:
    python step1_2_collect_transcripts.py \
        --ft_csv "FT Data - data.csv" \
        --out_dir "./data" \
        --local_search_dirs "/mnt/data,./local_uploads" \
        --download_timeout 30

Notes:
- Requires: pandas, requests, tqdm
- Later steps (disfluency detection and audio clipping) will use the generated manifest files.
"""

import os
import sys
import json
import shutil
import argparse
from pathlib import Path
from typing import Optional
import pandas as pd
import requests
from tqdm import tqdm

# -----------------------------
# Helper utilities
# -----------------------------
def is_http_url(s: str) -> bool:
    """Return True only when *s* is a string beginning with http:// or https://."""
    return isinstance(s, str) and s.startswith(("http://", "https://"))

def safe_mkdir(path: Path):
    """Create *path* as a directory, including missing parents; a no-op if it already exists."""
    mkdir_options = {"parents": True, "exist_ok": True}
    path.mkdir(**mkdir_options)

def download_http(url: str, dest: Path, timeout: int = 30) -> bool:
    """Download *url* over HTTP(S) and save it to *dest*.

    The body is streamed into a sibling ``<dest name>.part`` file and only
    moved onto *dest* once the whole response has been written. A failed or
    interrupted download therefore never leaves a truncated file at *dest*
    (callers check ``dest.exists()`` and then parse it), and a good copy of
    *dest* from an earlier run is left untouched on failure.

    Args:
        url: HTTP or HTTPS URL to fetch.
        dest: Final location of the downloaded file; parent directories are
            created as needed.
        timeout: Timeout in seconds handed to ``requests.get``.

    Returns:
        True if the file was downloaded and moved into place, False otherwise.
        Failures are reported with a ``[WARN]`` message and never raised.
    """
    dest = Path(dest)
    partial = dest.with_name(dest.name + ".part")
    try:
        safe_mkdir(dest.parent)
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(partial, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        # Promote the finished download in a single step.
        os.replace(partial, dest)
        return True
    except Exception as e:
        # Best-effort helper: report and carry on with the next recording.
        print(f"[WARN] Failed to download {url} -> {dest}: {e}")
        try:
            partial.unlink()
        except OSError:
            # Nothing was written (or it is already gone); nothing to clean up.
            pass
        return False

def copy_local(src: Path, dest: Path) -> bool:
    """Copy *src* to *dest* (creating dest's parent directories first).

    Returns True when the copy succeeded. Any error is printed as a warning
    and reported as False instead of being raised.
    """
    try:
        safe_mkdir(dest.parent)
        shutil.copy2(src, dest)
    except Exception as err:
        # A failed copy is non-fatal for the overall run; just report it.
        print(f"[WARN] Failed to copy {src} -> {dest}: {err}")
        return False
    else:
        return True

def find_local_candidate(recording_id: str, local_dirs) -> Optional[Path]:
    """Look for a local transcription file for *recording_id*.

    Directories are searched in the order given. Within each directory the
    lookup order is:

    1. Exact names: ``{id}_transcription.json``, ``{id}.transcription.json``,
       ``{id}.json``, ``{id}_transcript.json``, ``{id}_transcription``.
    2. Files whose name contains the id followed somewhere by ``trans`` and
       ends in ``json`` (e.g. ``{id}_transcription_v2.json``).
    3. Files named ``{id}<anything>.json``.

    For steps 2 and 3 the id must appear as a whole token (not directly
    surrounded by other letters or digits), so recording ``123`` does not
    pick up files belonging to recording ``1234``. Files that look like
    metadata (``meta`` in the name outside the id) are skipped so they are
    not mistaken for transcriptions. Fuzzy matches are taken in sorted order
    to keep the result deterministic.

    Args:
        recording_id: Identifier of the recording.
        local_dirs: Iterable of directory paths to search; missing
            directories are skipped.

    Returns:
        Path of the first matching file, or None when nothing matches.
    """
    import re  # local import: `re` is not part of the file-level imports

    rec = str(recording_id)
    if not rec:
        return None

    # The id counts only when it is not glued to neighbouring alphanumerics.
    id_token = re.compile(rf"(?<![A-Za-z0-9]){re.escape(rec)}(?![A-Za-z0-9])")
    exact_names = (
        f"{rec}_transcription.json",
        f"{rec}.transcription.json",
        f"{rec}.json",
        f"{rec}_transcript.json",
        f"{rec}_transcription",
    )

    for d in local_dirs:
        folder = Path(d)
        if not folder.is_dir():
            continue

        # exact filenames
        for name in exact_names:
            candidate = folder / name
            if candidate.is_file():
                return candidate

        # fuzzy search, transcript-looking names ranked first
        transcript_like = []
        id_prefixed = []
        for entry in sorted(folder.iterdir()):
            name = entry.name
            match = id_token.search(name)
            if match is None or not name.endswith("json") or not entry.is_file():
                continue
            if "meta" in name.lower().replace(rec.lower(), ""):
                continue
            if "trans" in name[match.end():]:
                transcript_like.append(entry)
            elif match.start() == 0 and name.endswith(".json"):
                id_prefixed.append(entry)

        fuzzy = transcript_like + id_prefixed
        if fuzzy:
            return fuzzy[0]
    return None

def load_json_file(path: Path):
    """Read *path* as UTF-8 JSON and return the decoded object.

    Any failure (missing file, invalid JSON, ...) is printed as a warning and
    reported by returning None.
    """
    try:
        with open(path, encoding="utf-8") as handle:
            parsed = json.load(handle)
    except Exception as err:
        print(f"[WARN] Failed to parse JSON {path}: {err}")
        return None
    return parsed

# -----------------------------
# Main processing logic
# -----------------------------
def _none_if_nan(value):
    """Return None for pandas-missing values (NaN/None/NaT), else the value unchanged."""
    return None if pd.isna(value) else value


def _coerce_float(value):
    """Return ``float(value)``, or None when value is None or cannot be converted."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _find_local_metadata(rec_id, local_dirs):
    """Return the first ``<rec_id>_metadata.json`` found in local_dirs, else None."""
    for d in local_dirs:
        candidate = Path(d) / f"{rec_id}_metadata.json"
        if candidate.is_file():
            return candidate
    return None


def _extract_segments(data, rec_id):
    """Pull the list of segment records out of a parsed transcription JSON.

    Recognised shapes, tried in this order:
      - top-level list of segments
      - dict with a list under 'segments', then under 'results'
      - dict that is itself one segment (has 'start', 'end' and 'text')
      - dict with any list value whose first item has 'start', 'end', 'text'
    Anything else yields an empty list after printing a warning.
    """
    if isinstance(data, list):
        return data
    if not isinstance(data, dict):
        print(f"[WARN] unrecognized transcription json type for {rec_id}: {type(data)}")
        return []

    for key in ("segments", "results"):
        if isinstance(data.get(key), list):
            return data[key]

    segment_keys = {"start", "end", "text"}
    if segment_keys.issubset(data.keys()):
        # the file itself is a single segment
        return [data]

    for value in data.values():
        if (
            isinstance(value, list)
            and value
            and isinstance(value[0], dict)
            and segment_keys.issubset(value[0].keys())
        ):
            return value

    print(f"[WARN] transcription JSON has unexpected structure for {rec_id}. We'll store raw but cannot extract segments.")
    return []


def _normalize_segments(segments, rec_id):
    """Turn raw segment records into uniform dicts, dropping segments without text.

    Start/end fall back to 'begin'/'finish', and text falls back to
    'transcript' then 'content'. Times that cannot be converted to float
    become None. ``segment_id`` uses the position in the raw list, so ids of
    kept segments are stable even when other segments are skipped.
    """
    normalized = []
    for i, seg in enumerate(segments):
        if not isinstance(seg, dict):
            continue

        text = seg.get("text")
        if text is None:
            text = seg.get("transcript") or seg.get("content")
        if text is None:
            continue
        text_str = str(text).strip()
        if not text_str:
            continue

        start = seg.get("start") if seg.get("start") is not None else seg.get("begin")
        end = seg.get("end") if seg.get("end") is not None else seg.get("finish")
        start_f = _coerce_float(start)
        end_f = _coerce_float(end)

        normalized.append({
            "segment_id": f"{rec_id}_seg_{i:04d}",
            "start": start_f,
            "end": end_f,
            "duration": (end_f - start_f) if (start_f is not None and end_f is not None) else None,
            "text": text_str
        })
    return normalized


def process_ft_csv(ft_csv_path: Path, out_dir: Path, local_search_dirs, download_timeout=30, dry_run=False):
    """Collect transcripts listed in the FT CSV and write segment-level manifests.

    For every CSV row the transcription JSON and the metadata JSON are taken
    from a local search directory when available, otherwise downloaded from
    the row's http(s) URL. Raw files are stored under
    ``<out_dir>/raw_transcripts`` and ``<out_dir>/raw_metadata``. Each
    transcription found is parsed into segments, and two files are written
    under ``<out_dir>/processed``:

      - ``transcripts_index.json``: recording_id -> recording info + segments
      - ``train_manifest.jsonl``: one JSON object per segment

    Args:
        ft_csv_path: Path of the FT CSV index. It must contain the columns
            user_id, recording_id, language, duration, rec_url_gcp,
            transcription_url_gcp and metadata_url_gcp.
        out_dir: Root output directory (created if missing).
        local_search_dirs: Iterable of directories searched for local files.
        download_timeout: Timeout in seconds for each HTTP download.
        dry_run: When True, downloads are only announced, not performed.
            Local copies and the output files are still written.

    Returns:
        Dict with keys ``transcripts_index_path``, ``train_manifest_path``,
        ``num_recordings``, ``num_segments`` and ``failures`` (a list of
        ``(recording_id, reason, detail)`` tuples).

    Raises:
        SystemExit: with code 1 when the CSV lacks a required column.
    """
    out_dir = Path(out_dir)
    raw_transcripts_dir = out_dir / "raw_transcripts"
    raw_metadata_dir = out_dir / "raw_metadata"
    processed_dir = out_dir / "processed"
    for folder in (raw_transcripts_dir, raw_metadata_dir, processed_dir):
        safe_mkdir(folder)

    # read CSV
    print(f"Reading FT CSV from: {ft_csv_path}")
    df = pd.read_csv(ft_csv_path)
    required_cols = {"user_id", "recording_id", "language", "duration", "rec_url_gcp", "transcription_url_gcp", "metadata_url_gcp"}
    if not required_cols.issubset(set(df.columns)):
        print(f"[ERROR] FT CSV missing required columns. Found: {list(df.columns)} Expected at least: {sorted(list(required_cols))}")
        sys.exit(1)

    transcripts_index = {}   # recording_id -> dict with info + segments
    manifest_lines = []      # one dict per segment
    failures = []            # (recording_id, reason, detail)

    print("Processing rows...")
    for _, row in tqdm(df.iterrows(), total=len(df)):
        rec_id = str(row["recording_id"])
        user_id = str(row["user_id"])
        # Missing cells become None so the JSON outputs never contain NaN.
        lang = _none_if_nan(row["language"])
        duration = float(row["duration"]) if not pd.isna(row["duration"]) else None
        rec_url = _none_if_nan(row["rec_url_gcp"])
        tx_url = _none_if_nan(row["transcription_url_gcp"])
        md_url = _none_if_nan(row["metadata_url_gcp"])

        # where the raw JSONs are stored for this recording
        local_tx_path = raw_transcripts_dir / f"{rec_id}_transcription.json"
        local_md_path = raw_metadata_dir / f"{rec_id}_metadata.json"

        # One local lookup per row. A hit that looks like a metadata file must
        # not be used as the transcription; it is used for the metadata instead.
        local_hit = find_local_candidate(rec_id, local_search_dirs)
        hit_is_metadata = local_hit is not None and "meta" in local_hit.name.lower()
        tx_local_candidate = None if hit_is_metadata else local_hit

        # 1) transcription: local copy first, then http(s) download
        if tx_local_candidate is not None:
            # Skip the copy when the hit already is the destination file.
            if tx_local_candidate.resolve() != local_tx_path.resolve():
                ok = copy_local(tx_local_candidate, local_tx_path)
                if not ok:
                    print(f"[WARN] couldn't copy local transcription {tx_local_candidate} for rec {rec_id}")
                    failures.append((rec_id, "transcription_copy_failed", str(tx_local_candidate)))
        elif isinstance(tx_url, str) and is_http_url(tx_url):
            if dry_run:
                print(f"[DRY RUN] Would download transcription: {tx_url} -> {local_tx_path}")
            elif not download_http(tx_url, local_tx_path, timeout=download_timeout):
                failures.append((rec_id, "transcription", tx_url))
        else:
            # not an http URL -> skip or recommend gsutil
            print(f"[INFO] transcription URL for {rec_id} is not http(s) or missing. Value: {tx_url}")
            failures.append((rec_id, "transcription_url_invalid_or_missing", tx_url))

        # 2) metadata: same order as the transcription (local, then download)
        if hit_is_metadata:
            md_local_candidate = local_hit
        else:
            md_local_candidate = _find_local_metadata(rec_id, local_search_dirs)

        if md_local_candidate is not None:
            if md_local_candidate.resolve() != local_md_path.resolve():
                copy_local(md_local_candidate, local_md_path)
        elif isinstance(md_url, str) and is_http_url(md_url):
            if dry_run:
                print(f"[DRY RUN] Would download metadata: {md_url} -> {local_md_path}")
            elif not download_http(md_url, local_md_path, timeout=download_timeout):
                failures.append((rec_id, "metadata", md_url))
        else:
            print(f"[INFO] no metadata found for {rec_id}. Expected URL: {md_url}")

        # 3) parse the transcription file if it exists
        if not local_tx_path.exists():
            failures.append((rec_id, "transcription_file_missing", str(local_tx_path)))
            continue

        data = load_json_file(local_tx_path)
        if data is None:
            failures.append((rec_id, "parse_transcription_failed", str(local_tx_path)))
            continue

        normalized_segments = _normalize_segments(_extract_segments(data, rec_id), rec_id)

        # write to index
        transcripts_index[rec_id] = {
            "recording_id": rec_id,
            "user_id": user_id,
            "language": lang,
            "duration": duration,
            "audio_url": rec_url,
            "transcription_local_path": str(local_tx_path.resolve()),
            "metadata_local_path": str(local_md_path.resolve()) if local_md_path.exists() else None,
            "num_segments": len(normalized_segments),
            "segments": normalized_segments
        }

        # create manifest lines
        for seg in normalized_segments:
            manifest_lines.append({
                "audio_filepath": rec_url,      # full recording (clipped to segment times later)
                "recording_id": rec_id,
                "user_id": user_id,
                "language": lang,
                "segment_id": seg["segment_id"],
                "start_time": seg["start"],
                "end_time": seg["end"],
                "duration": seg["duration"],
                "text": seg["text"]
            })

    # 4) Save outputs
    transcripts_index_path = processed_dir / "transcripts_index.json"
    manifest_path = processed_dir / "train_manifest.jsonl"
    print(f"Saving transcripts_index to {transcripts_index_path}")
    with open(transcripts_index_path, "w", encoding="utf-8") as f:
        json.dump(transcripts_index, f, ensure_ascii=False, indent=2)

    print(f"Saving train_manifest to {manifest_path} ({len(manifest_lines)} segments)")
    with open(manifest_path, "w", encoding="utf-8") as f:
        for item in manifest_lines:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

    # summary
    print("=== Summary ===")
    print(f"Total recordings processed (with transcription parsed): {len(transcripts_index)}")
    print(f"Total segments collected: {len(manifest_lines)}")
    if failures:
        print(f"Failures or warnings: {len(failures)} (see below sample)")
        for fail in failures[:10]:
            print(f"  - {fail}")
    else:
        print("No failures detected.")

    return {
        "transcripts_index_path": str(transcripts_index_path.resolve()),
        "train_manifest_path": str(manifest_path.resolve()),
        "num_recordings": len(transcripts_index),
        "num_segments": len(manifest_lines),
        "failures": failures
    }

# -----------------------------
# Command line interface
# -----------------------------
def parse_args(argv=None):
    """Parse the command-line options of this script.

    Args:
        argv: Optional list of argument strings. When None, argparse reads
            ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with ``ft_csv``, ``out_dir``, ``local_search_dirs``,
        ``download_timeout`` and ``dry_run``.

    Notes:
        - Prefix abbreviations are disabled. Otherwise an option such as
          ``--f=<file>`` injected by a launcher would be expanded to
          ``--ft_csv`` and silently replace the CSV path.
        - Unrecognized arguments are reported and ignored instead of aborting,
          so the script can also run where the host process adds its own
          arguments (e.g. a notebook kernel).
        - ``--download-timeout`` is accepted as an alias of
          ``--download_timeout``.
    """
    parser = argparse.ArgumentParser(
        description="Collect transcripts from FT CSV, download/copy raw JSONs, and create segment-level manifest.",
        allow_abbrev=False,
    )
    parser.add_argument("--ft_csv", type=str, default="FT Data - data.csv", help="Path to the FT CSV index file")
    parser.add_argument("--out_dir", type=str, default="./data", help="Output directory to store raw and processed files")
    parser.add_argument("--local_search_dirs", type=str, default="/mnt/data,./local_uploads", help="Comma-separated list of local directories to search for uploaded files")
    parser.add_argument("--download_timeout", "--download-timeout", dest="download_timeout", type=int, default=30, help="Timeout in seconds for HTTP downloads")
    parser.add_argument("--dry_run", action="store_true", help="Dry run: do not perform network downloads, just show actions")

    args, unknown = parser.parse_known_args(argv)
    if unknown:
        print(f"[WARN] Ignoring unrecognized arguments: {unknown}")
    return args

# -----------------------------
# Entry point
# -----------------------------

def main():
    """Script entry point: parse options, locate the FT CSV and run the collection.

    The CSV path comes from ``--ft_csv`` (default ``FT Data - data.csv``).
    ``/mnt/data`` is always included in the local search directories.
    Exits with status 1 when the CSV file cannot be found.
    """
    default_ft_csv = "FT Data - data.csv"

    # Parse first so that --help works even when the CSV is missing.
    args = parse_args()

    ft_csv_path = Path(args.ft_csv)
    if ft_csv_path.suffix.lower() == ".json":
        # NOTE(review): presumably this happens when a notebook launcher passes
        # --f=<connection file>.json and argparse expands it to --ft_csv by
        # prefix matching. A .json file is not a CSV index, so use the default.
        print(f"[WARN] --ft_csv points at a JSON file ({ft_csv_path}); using default: {default_ft_csv}")
        ft_csv_path = Path(default_ft_csv)

    if not ft_csv_path.is_file():
        print(f"[ERROR] FT CSV not found: {ft_csv_path}. Please check the path.")
        sys.exit(1)

    local_dirs = [d.strip() for d in args.local_search_dirs.split(",") if d.strip()]
    if "/mnt/data" not in local_dirs:
        local_dirs = ["/mnt/data"] + local_dirs

    print("Local search dirs:", local_dirs)

    result = process_ft_csv(ft_csv_path, Path(args.out_dir), local_dirs, download_timeout=args.download_timeout, dry_run=args.dry_run)
    print("Done. Outputs:", result)


if __name__ == "__main__":
    main()


Local search dirs: ['/mnt/data', './local_uploads']
Reading FT CSV from: FT Data - data.csv
Processing rows...


100%|██████████| 104/104 [00:08<00:00, 12.50it/s]

Saving transcripts_index to data/processed/transcripts_index.json
Saving train_manifest to data/processed/train_manifest.jsonl (5941 segments)
=== Summary ===
Total recordings processed (with transcription parsed): 104
Total segments collected: 5941
No failures detected.
Done. Outputs: {'transcripts_index_path': '/Users/vs/Develop/josh/data/processed/transcripts_index.json', 'train_manifest_path': '/Users/vs/Develop/josh/data/processed/train_manifest.jsonl', 'num_recordings': 104, 'num_segments': 5941, 'failures': []}



