# source_utils

> Source record operations for metadata extraction, grouping, selection, filtering, reordering, tab navigation, and path validation

In [None]:
#| default_exp services.source_utils

In [None]:
#| export
from typing import Any, List, Dict, Optional, Set
from pathlib import Path
import json

## Metadata Extraction

In [None]:
#| export
def extract_batch_id(
    metadata: Any  # Metadata dict or JSON string
) -> str:  # Batch ID or "No Batch ID"
    """Extract batch_id from transcription metadata.

    Accepts an already-parsed mapping or a JSON string. Input that is empty,
    fails to parse, or does not decode to a mapping yields the
    "No Batch ID" placeholder instead of raising.
    """
    if not metadata:
        return "No Batch ID"

    # Parse JSON string if needed
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except (json.JSONDecodeError, TypeError):
            return "No Batch ID"

    # Valid JSON can decode to a list, number, string or null; none of those
    # offers .get(), so treat them the same as missing metadata.
    if not hasattr(metadata, "get"):
        return "No Batch ID"

    batch_id = metadata.get("batch_id", "")
    return batch_id if batch_id else "No Batch ID"

In [None]:
#| export
def extract_model_name(
    metadata: Any  # Metadata dict or JSON string
) -> str:  # Formatted model name for display
    """Extract and format model name from transcription metadata.

    Accepts an already-parsed mapping or a JSON string. Returns "Unknown"
    when the metadata is empty, cannot be parsed, does not decode to a
    mapping, or has no "model" value.
    """
    if not metadata:
        return "Unknown"

    # Parse JSON string if needed
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except (json.JSONDecodeError, TypeError):
            return "Unknown"

    # Valid JSON can decode to a list, number, string or null; none of those
    # offers .get(), so treat them the same as missing metadata.
    if not hasattr(metadata, "get"):
        return "Unknown"

    model = metadata.get("model", "")
    if not model:
        return "Unknown"

    # Extract just the model name (after the slash if present)
    # e.g., "mistralai/Voxtral-Mini-3B-2507" -> "Voxtral-Mini-3B-2507"
    if "/" in model:
        model = model.split("/")[-1]

    return model

## Record Grouping

In [None]:
#| export
def group_transcriptions(
    transcriptions: List[Dict[str, Any]],  # Transcription records to bucket
    group_by: str = "media_path"  # "batch_id" groups by metadata batch; any other value groups by media_path
) -> Dict[str, List[Dict[str, Any]]]:  # Group key -> records, keys in first-seen order
    """Bucket transcription records under a key derived from each record."""
    by_batch = group_by == "batch_id"
    buckets = {}
    for record in transcriptions:
        if by_batch:
            bucket_key = extract_batch_id(record.get("metadata"))
        else:
            # Records without a media_path are collected under "Unknown"
            bucket_key = record.get("media_path", "Unknown")
        buckets.setdefault(bucket_key, []).append(record)
    return buckets

In [None]:
#| export
def group_transcriptions_by_audio(
    transcriptions: List[Dict[str, Any]]  # Transcription records to bucket
) -> Dict[str, List[Dict[str, Any]]]:  # media_path -> records
    """Convenience wrapper that buckets records by their media_path."""
    grouped_by_path = group_transcriptions(transcriptions, "media_path")
    return grouped_by_path

## Selection Checks

In [None]:
#| export
def is_source_selected(
    record_id: str,  # Record ID to look for
    provider_id: str,  # Provider ID to look for
    selected_sources: List[Dict[str, str]]  # Current selections
) -> bool:  # True when a selection matches both IDs
    """Report whether the (record_id, provider_id) pair appears in the selections."""
    for entry in selected_sources:
        # Both halves of the pair must match; a shared record_id alone is not enough
        if entry.get("record_id") == record_id and entry.get("provider_id") == provider_id:
            return True
    return False

In [None]:
#| export
def get_selected_media_paths(
    selected_sources: List[Dict[str, str]],  # Current selections (record_id, provider_id)
    all_transcriptions: List[Dict[str, Any]],  # All available transcription records
) -> Set[str]:  # Media paths of the records that are currently selected
    """Collect the non-empty media_path values of records matching a selection."""
    wanted_pairs = set()
    for source in selected_sources:
        wanted_pairs.add((source.get("record_id"), source.get("provider_id")))

    media_paths = set()
    for record in all_transcriptions:
        if (record.get("record_id"), record.get("provider_id")) not in wanted_pairs:
            continue
        path = record.get("media_path")
        if path:  # records with a missing or empty media_path contribute nothing
            media_paths.add(path)
    return media_paths

## Filtering

In [None]:
#| export
def filter_transcriptions(
    transcriptions: List[Dict[str, Any]],  # List of transcription records to filter
    search_text: str,  # Search term for case-insensitive substring matching
) -> List[Dict[str, Any]]:  # Filtered transcription records
    """Filter transcriptions by substring match across record_id, media_path, and text fields.

    A blank or whitespace-only search term returns the input list unchanged.
    Fields that are missing or None are treated as empty text, and
    non-string values are compared by their string form, so sparse records
    never raise.
    """
    if not search_text or not search_text.strip():
        return transcriptions

    search_lower = search_text.lower().strip()

    def _searchable(record: Dict[str, Any], field: str) -> str:
        # A key that is present but None would bypass a .get() default, so normalise here.
        value = record.get(field)
        return "" if value is None else str(value).lower()

    return [
        t for t in transcriptions
        if any(search_lower in _searchable(t, field)
               for field in ("record_id", "media_path", "text"))
    ]

## Group Selection

In [None]:
#| export
def select_all_in_group(
    transcriptions: List[Dict[str, Any]],  # All transcription records
    group_key: str,  # Key identifying the group to select
    grouping_mode: str,  # "batch_id" matches on metadata batch; any other value matches on media_path
    selected_sources: List[Dict[str, str]],  # Current selections
    excluded_media_paths: Optional[Set[str]] = None,  # Media paths already represented; None disables path filtering
) -> List[Dict[str, str]]:  # New list: current selections followed by the newly added ones
    """Append every record of one group to the selections, skipping pairs already selected.

    When `excluded_media_paths` is given (even if empty), a record is also
    skipped if its media_path is in that set or was already added during
    this call. The input selection list is not modified.
    """
    if grouping_mode == "batch_id":
        def in_group(record):
            return extract_batch_id(record.get("metadata")) == group_key
    else:
        def in_group(record):
            return record.get("media_path") == group_key

    candidates = list(filter(in_group, transcriptions))

    seen_pairs = set()
    for source in selected_sources:
        seen_pairs.add((source.get("record_id"), source.get("provider_id")))

    filter_by_path = excluded_media_paths is not None
    taken_paths = set(excluded_media_paths or ())
    updated = [*selected_sources]

    for record in candidates:
        rid = record.get("record_id")
        if not rid:
            continue
        pid = record.get("provider_id", "")
        pair = (rid, pid)
        if pair in seen_pairs:
            continue
        path = record.get("media_path")
        # Path filtering only applies when the caller supplied an exclusion set
        if filter_by_path and path and path in taken_paths:
            continue
        updated.append({"record_id": rid, "provider_id": pid})
        seen_pairs.add(pair)
        if path:
            taken_paths.add(path)

    return updated

## Selection Mutations

In [None]:
#| export
def toggle_source_selection(
    record_id: str,  # Record ID of the source to toggle
    provider_id: str,  # Provider ID of the source to toggle
    selected_sources: List[Dict[str, str]],  # Current selections
) -> List[Dict[str, str]]:  # New selection list; the input is left untouched
    """Remove the (record_id, provider_id) pair if selected, otherwise append it."""
    remaining = []
    was_selected = False
    for entry in selected_sources:
        if entry.get("record_id") == record_id and entry.get("provider_id") == provider_id:
            was_selected = True  # drop every entry matching the pair
        else:
            remaining.append(entry)

    if was_selected:
        return remaining
    return selected_sources + [{"record_id": record_id, "provider_id": provider_id}]

In [None]:
#| export
def reorder_item(
    selected_sources: List[Dict[str, str]],  # Current selections
    record_id: str,  # Record ID of the item to move
    provider_id: str,  # Provider ID of the item to move
    direction: str,  # "up" or "down"; any other value leaves the order unchanged
) -> List[Dict[str, str]]:  # New list with the item moved by one position
    """Swap the first matching item with its neighbour in the given direction."""
    items = list(selected_sources)

    position = None
    for idx, entry in enumerate(items):
        if entry.get("record_id") == record_id and entry.get("provider_id") == provider_id:
            position = idx
            break
    if position is None:
        return items

    if direction == "up":
        neighbour = position - 1
    elif direction == "down":
        neighbour = position + 1
    else:
        return items

    # Moving past either end of the list is a no-op
    if 0 <= neighbour < len(items):
        items[position], items[neighbour] = items[neighbour], items[position]
    return items

In [None]:
#| export
def reorder_sources(
    selected_sources: List[Dict[str, str]],  # Current selections
    new_order_ids: List[str],  # Record IDs in desired order
) -> List[Dict[str, str]]:  # Reordered selections (new list)
    """Reorder sources to match the given record ID order.

    Selections are identified by (record_id, provider_id), so several
    selections may share one record_id. All of them are kept, placed together
    at that ID's position in their original relative order. IDs in
    `new_order_ids` that match no selection are ignored, repeated IDs are
    honoured only once, and selections whose ID is not listed are appended at
    the end in their original order.
    """
    if not new_order_ids:
        return list(selected_sources)

    # Group by record_id instead of a one-to-one lookup so that selections
    # sharing a record_id (different providers) are not silently dropped.
    by_record_id: Dict[Any, List[Dict[str, str]]] = {}
    for source in selected_sources:
        by_record_id.setdefault(source.get("record_id"), []).append(source)

    reordered: List[Dict[str, str]] = []
    placed = set()
    for rid in new_order_ids:
        if rid in placed or rid not in by_record_id:
            continue
        reordered.extend(by_record_id[rid])
        placed.add(rid)

    # Append any sources not in the new order (safety fallback)
    reordered.extend(s for s in selected_sources if s.get("record_id") not in placed)

    return reordered

## Tab Navigation

In [None]:
#| export
def calculate_next_tab(
    direction: str,  # Direction: "prev", "next", or a direct tab name
    current_tab: str,  # Currently active tab name
    tabs: List[str],  # Available tab names in order
) -> str:  # New active tab name
    """Calculate the next tab based on direction or direct selection.

    A `direction` that names an existing tab selects it directly. "prev"
    steps backward and any other value steps forward, wrapping around at
    either end. An unknown `current_tab` is treated as the first tab. With no
    tabs available there is nothing to navigate to, so `current_tab` is
    returned unchanged.
    """
    if direction in tabs:
        return direction

    # Guard the modulo below: an empty tab list would divide by zero.
    if not tabs:
        return current_tab

    current_idx = tabs.index(current_tab) if current_tab in tabs else 0
    step = -1 if direction == "prev" else 1
    return tabs[(current_idx + step) % len(tabs)]

## Filesystem Checks

In [None]:
#| export
def check_audio_exists(
    media_path: str  # Path to audio file
) -> bool:  # True if file exists
    """Check if the audio file exists at the given path.

    Empty paths and the "Unknown" placeholder are reported as missing. A path
    the OS cannot even stat (for example an over-long name or an embedded NUL
    byte, which raise on some Python versions) is also reported as missing
    rather than propagating the error.
    """
    if not media_path or media_path == "Unknown":
        return False
    try:
        return Path(media_path).exists()
    except (OSError, ValueError):
        # An unstat-able path cannot be an existing audio file
        return False

In [None]:
#| export
def validate_browse_path(
    path: str  # Path to validate
) -> str:  # Resolved path when it is an existing directory, otherwise the home directory
    """Resolve a browse path, falling back to the home directory on invalid input.

    The path is resolved (symlinks and ".." collapsed) and accepted only when
    it is an existing directory. Anything else, including a non-path value
    such as None or a path that cannot be resolved, yields the user's home
    directory.

    NOTE(review): this only checks that the target is an existing directory;
    it does not confine the result to any allowed root. Callers that need
    containment must enforce it themselves.
    """
    try:
        resolved = Path(path).resolve()
        if resolved.exists() and resolved.is_dir():
            return str(resolved)
    except (TypeError, ValueError, OSError, RuntimeError):
        # TypeError: non-path input such as None.
        # RuntimeError: symlink loops during resolve() on older Python versions.
        pass
    return str(Path.home())

## Tests

In [None]:
# extract_batch_id: missing metadata, dict input, JSON-string input, empty dict
assert extract_batch_id(None) == "No Batch ID"
assert extract_batch_id({"batch_id": "batch_123"}) == "batch_123"
assert extract_batch_id('{"batch_id": "batch_456"}') == "batch_456"
assert extract_batch_id({}) == "No Batch ID"
print("extract_batch_id tests passed")

extract_batch_id tests passed


In [None]:
# extract_model_name: missing metadata, org-prefixed name, bare name, empty dict
assert extract_model_name(None) == "Unknown"
assert extract_model_name({"model": "mistralai/Voxtral-Mini-3B"}) == "Voxtral-Mini-3B"
assert extract_model_name({"model": "whisper-large"}) == "whisper-large"
assert extract_model_name({}) == "Unknown"
print("extract_model_name tests passed")

extract_model_name tests passed


In [None]:
# group_transcriptions: default grouping is by media_path
records = [
    {"record_id": "1", "media_path": "a.wav"},
    {"record_id": "2", "media_path": "a.wav"},
    {"record_id": "3", "media_path": "b.wav"},
]
groups = group_transcriptions(records)
# Two distinct audio files -> two groups
assert len(groups) == 2
assert len(groups["a.wav"]) == 2
assert len(groups["b.wav"]) == 1
print("group_transcriptions tests passed")

group_transcriptions tests passed


In [None]:
# is_source_selected: a match requires both record_id and provider_id
sources = [{"record_id": "a", "provider_id": "p1"}, {"record_id": "b", "provider_id": "p2"}]
assert is_source_selected("a", "p1", sources) == True
assert is_source_selected("a", "p2", sources) == False  # Same record_id, different provider
assert is_source_selected("c", "p1", sources) == False  # record_id not selected at all
print("is_source_selected tests passed")

In [None]:
# get_selected_media_paths: map selected (record_id, provider_id) pairs to their media paths
all_t = [
    {"record_id": "j1", "provider_id": "p1", "media_path": "a.wav"},
    {"record_id": "j2", "provider_id": "p1", "media_path": "b.wav"},
    {"record_id": "j3", "provider_id": "p2", "media_path": "c.wav"},
]
selected = [{"record_id": "j1", "provider_id": "p1"}, {"record_id": "j3", "provider_id": "p2"}]
paths = get_selected_media_paths(selected, all_t)
# j2 is not selected, so b.wav is absent
assert paths == {"a.wav", "c.wav"}

# Empty selections
assert get_selected_media_paths([], all_t) == set()

# Selection not in transcriptions (stale reference)
assert get_selected_media_paths([{"record_id": "jX", "provider_id": "pX"}], all_t) == set()

print("get_selected_media_paths tests passed")

In [None]:
# filter_transcriptions: case-insensitive substring search over record_id, media_path and text
records = [
    {"record_id": "job_001", "media_path": "/data/podcast.wav", "text": "Hello world"},
    {"record_id": "job_002", "media_path": "/data/lecture.wav", "text": "Machine learning intro"},
    {"record_id": "job_003", "media_path": "/data/podcast.wav", "text": "Goodbye world"},
]
# Blank or whitespace-only search returns everything
assert len(filter_transcriptions(records, "")) == 3
assert len(filter_transcriptions(records, "  ")) == 3
# Match on media_path, regardless of case
assert len(filter_transcriptions(records, "podcast")) == 2
assert len(filter_transcriptions(records, "PODCAST")) == 2
# Match on text
assert len(filter_transcriptions(records, "machine")) == 1
# Match on record_id
assert len(filter_transcriptions(records, "job_001")) == 1
# No match in any field
assert len(filter_transcriptions(records, "nonexistent")) == 0
print("filter_transcriptions tests passed")

filter_transcriptions tests passed


In [None]:
# select_all_in_group: fixture with two records for a.wav (batch b1) and one for b.wav (batch b2)
transcriptions = [
    {"record_id": "j1", "provider_id": "p1", "media_path": "a.wav", "metadata": '{"batch_id": "b1"}'},
    {"record_id": "j2", "provider_id": "p1", "media_path": "a.wav", "metadata": '{"batch_id": "b1"}'},
    {"record_id": "j3", "provider_id": "p2", "media_path": "b.wav", "metadata": '{"batch_id": "b2"}'},
]

# Select all by media_path (no exclusion)
result = select_all_in_group(transcriptions, "a.wav", "media_path", [])
assert len(result) == 2
assert result[0]["record_id"] == "j1"
assert result[1]["record_id"] == "j2"

# Select all by batch_id
result = select_all_in_group(transcriptions, "b1", "batch_id", [])
assert len(result) == 2

# Deduplication: j1/p1 already selected
result = select_all_in_group(transcriptions, "a.wav", "media_path", [{"record_id": "j1", "provider_id": "p1"}])
assert len(result) == 2
assert result[0]["record_id"] == "j1"
assert result[1]["record_id"] == "j2"

# Same record_id from different provider is NOT a duplicate
result = select_all_in_group(transcriptions, "a.wav", "media_path", [{"record_id": "j1", "provider_id": "p_other"}])
assert len(result) == 3  # existing + j1/p1 + j2/p1

# No matches
result = select_all_in_group(transcriptions, "nonexistent.wav", "media_path", [])
assert len(result) == 0

# With excluded_media_paths: skip sources whose audio is already represented
result = select_all_in_group(transcriptions, "a.wav", "media_path", [], excluded_media_paths={"a.wav"})
assert len(result) == 0  # All matching records share excluded media_path

# excluded_media_paths with batch_id grouping across different audio files
mixed = [
    {"record_id": "j1", "provider_id": "p1", "media_path": "a.wav", "metadata": '{"batch_id": "b1"}'},
    {"record_id": "j2", "provider_id": "p1", "media_path": "b.wav", "metadata": '{"batch_id": "b1"}'},
]
result = select_all_in_group(mixed, "b1", "batch_id", [], excluded_media_paths={"a.wav"})
assert len(result) == 1
assert result[0]["record_id"] == "j2"  # Only b.wav source added

print("select_all_in_group tests passed")

In [None]:
# toggle_source_selection: add when absent, remove when present, never mutate the input
# Toggle on: add new source
sources = [{"record_id": "a", "provider_id": "p1"}]
result = toggle_source_selection("b", "p2", sources)
assert len(result) == 2
assert result[1]["record_id"] == "b"  # New source is appended at the end

# Toggle off: remove existing source
result = toggle_source_selection("a", "p1", sources)
assert len(result) == 0

# Same record_id but different provider: adds (not toggle off)
result = toggle_source_selection("a", "p2", sources)
assert len(result) == 2

# Original list is not mutated
assert len(sources) == 1

print("toggle_source_selection tests passed")

In [None]:
# reorder_item: single-step moves within a three-item selection
sources = [
    {"record_id": "a", "provider_id": "p1"}, 
    {"record_id": "b", "provider_id": "p1"}, 
    {"record_id": "c", "provider_id": "p1"}
]

# Move middle item up
result = reorder_item(sources, "b", "p1", "up")
assert [s["record_id"] for s in result] == ["b", "a", "c"]

# Move middle item down
result = reorder_item(sources, "b", "p1", "down")
assert [s["record_id"] for s in result] == ["a", "c", "b"]

# Move first item up (no-op)
result = reorder_item(sources, "a", "p1", "up")
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Move last item down (no-op)
result = reorder_item(sources, "c", "p1", "down")
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Item not found (no-op)
result = reorder_item(sources, "x", "p1", "up")
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Original list is not mutated
assert [s["record_id"] for s in sources] == ["a", "b", "c"]

print("reorder_item tests passed")

reorder_item tests passed


In [None]:
# reorder_sources: reorder a selection by an explicit list of record IDs
sources = [{"record_id": "a"}, {"record_id": "b"}, {"record_id": "c"}]

# Normal reorder
result = reorder_sources(sources, ["c", "a", "b"])
assert [s["record_id"] for s in result] == ["c", "a", "b"]

# Empty new_order_ids returns copy
result = reorder_sources(sources, [])
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Unknown IDs in new_order are skipped
result = reorder_sources(sources, ["b", "x", "a"])
assert [s["record_id"] for s in result] == ["b", "a", "c"]

# Missing IDs from new_order are appended
result = reorder_sources(sources, ["c"])
assert [s["record_id"] for s in result] == ["c", "a", "b"]

print("reorder_sources tests passed")

reorder_sources tests passed


In [None]:
# calculate_next_tab: direct selection and wrap-around cycling over two tabs
tabs = ["db", "files"]

# Direct tab selection
assert calculate_next_tab("db", "files", tabs) == "db"
assert calculate_next_tab("files", "db", tabs) == "files"

# Cycling forward
assert calculate_next_tab("next", "db", tabs) == "files"
assert calculate_next_tab("next", "files", tabs) == "db"

# Cycling backward
assert calculate_next_tab("prev", "db", tabs) == "files"
assert calculate_next_tab("prev", "files", tabs) == "db"

# Unknown current_tab defaults to index 0
assert calculate_next_tab("next", "unknown", tabs) == "files"

print("calculate_next_tab tests passed")

calculate_next_tab tests passed


In [None]:
# validate_browse_path: an existing directory is returned; a missing one falls back to home
# NOTE(review): the first assertion assumes the home directory path contains no symlinks,
# since the function returns the resolved path - confirm on the target environment.
import os
assert validate_browse_path(os.path.expanduser("~")) == os.path.expanduser("~")
assert validate_browse_path("/nonexistent/path/xyz") == os.path.expanduser("~")
print("validate_browse_path tests passed")

validate_browse_path tests passed


In [None]:
#| hide
# Run nbdev's export step for this notebook's `#| export` cells
import nbdev; nbdev.nbdev_export()