# Core Data Structures

> DTOs for media analysis and processing with FileBackedDTO support for zero-copy transfer

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import json
import tempfile
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional

from cjm_plugin_system.core.interface import FileBackedDTO

## TimeRange

Represents a specific temporal segment within a media file. Used by analysis plugins to mark regions of interest (VAD segments, scene boundaries, etc.).

In [None]:
#| export
@dataclass
class TimeRange:
    """A labelled span of time (in seconds) inside a media file."""
    start: float                                    # Where the span begins, in seconds
    end: float                                      # Where the span finishes, in seconds
    label: str = "segment"                          # Kind of span (e.g., 'speech', 'silence', 'scene')
    confidence: Optional[float] = None              # Detector confidence (0.0 to 1.0), if available
    payload: Dict[str, Any] = field(default_factory=dict)  # Free-form extras (e.g., speaker embedding)

    def to_dict(self) -> Dict[str, Any]:  # Plain-dict copy of every field
        """Return all fields as a dictionary built with `dataclasses.asdict`.

        The result is a deep copy: mutating the returned `payload` does not
        affect this instance.
        """
        serialized: Dict[str, Any] = asdict(self)
        return serialized

In [None]:
# Build a fully populated TimeRange and display each of its fields
segment = TimeRange(
    1.5,
    3.2,
    label="speech",
    confidence=0.95,
    payload={"speaker_id": "speaker_01"},
)

# One print call; the joined lines reproduce the original line-by-line output
print("\n".join([
    f"TimeRange: {segment.start}s - {segment.end}s",
    f"Label: {segment.label}",
    f"Confidence: {segment.confidence}",
    f"Payload: {segment.payload}",
    f"\nAs dict: {segment.to_dict()}",
]))

TimeRange: 1.5s - 3.2s
Label: speech
Confidence: 0.95
Payload: {'speaker_id': 'speaker_01'}

As dict: {'start': 1.5, 'end': 3.2, 'label': 'speech', 'confidence': 0.95, 'payload': {'speaker_id': 'speaker_01'}}


## MediaMetadata

Standard container for basic media file information (duration, codec, streams, etc.).

In [None]:
#| export
@dataclass
class MediaMetadata:
    """Basic descriptive information about a single media file."""
    path: str                                                   # Location of the file
    duration: float                                             # Total length in seconds
    format: str                                                 # Container type (e.g., 'mp4', 'mkv')
    size_bytes: int                                             # On-disk size in bytes
    video_streams: List[Dict[str, Any]] = field(default_factory=list)  # One dict per video stream
    audio_streams: List[Dict[str, Any]] = field(default_factory=list)  # One dict per audio stream

    def to_dict(self) -> Dict[str, Any]:  # Plain-dict copy of every field
        """Return all fields as a dictionary built with `dataclasses.asdict`.

        The stream lists are deep-copied, so the result can be modified
        without touching this instance.
        """
        serialized: Dict[str, Any] = asdict(self)
        return serialized

In [None]:
# Build a MediaMetadata record with one video and one audio stream, then display it
metadata = MediaMetadata(
    "/path/to/video.mp4",
    120.5,
    "mp4",
    15_000_000,
    video_streams=[{"codec": "h264", "width": 1920, "height": 1080, "fps": 30}],
    audio_streams=[{"codec": "aac", "sample_rate": 48000, "channels": 2}],
)

# One print call; the joined lines reproduce the original line-by-line output
print("\n".join([
    f"File: {metadata.path}",
    f"Duration: {metadata.duration}s",
    f"Format: {metadata.format}",
    f"Size: {metadata.size_bytes / 1_000_000:.2f} MB",
    f"Video streams: {metadata.video_streams}",
    f"Audio streams: {metadata.audio_streams}",
]))

File: /path/to/video.mp4
Duration: 120.5s
Format: mp4
Size: 15.00 MB
Video streams: [{'codec': 'h264', 'width': 1920, 'height': 1080, 'fps': 30}]
Audio streams: [{'codec': 'aac', 'sample_rate': 48000, 'channels': 2}]


## MediaAnalysisResult

Standard output for media analysis plugins. Implements `FileBackedDTO` for zero-copy transfer between Host and Worker processes.

In [None]:
#| export
@dataclass
class MediaAnalysisResult:
    """Standard output for media analysis plugins.

    Holds the detected segments plus free-form global statistics, and can
    round-trip itself through a JSON file via `to_temp_file` / `from_file`.
    """
    ranges: List[TimeRange]                              # Detected temporal segments
    metadata: Dict[str, Any] = field(default_factory=dict)  # Global analysis stats

    def to_temp_file(self) -> str:  # Absolute path to temporary JSON file
        """Save results to a temp JSON file for zero-copy transfer.

        The file is created with `delete=False`, so it outlives this call and
        the caller is responsible for removing it. If serialization or writing
        fails, the partially written file is removed and the original
        exception is re-raised.
        """
        # Build the payload first so a failing `to_dict` never creates a file
        data = {
            "ranges": [r.to_dict() for r in self.ranges],
            "metadata": self.metadata
        }

        tmp = tempfile.NamedTemporaryFile(
            suffix=".json", delete=False, mode='w', encoding='utf-8'
        )
        try:
            with tmp:  # Closes the handle on success and on error
                json.dump(data, tmp)
        except BaseException:
            # Don't leave an empty or truncated file behind for nobody to clean up
            try:
                Path(tmp.name).unlink()
            except OSError:
                pass
            raise
        return str(Path(tmp.name).absolute())

    @classmethod
    def from_file(
        cls,
        filepath: str  # Path to JSON file
    ) -> "MediaAnalysisResult":  # Loaded result instance
        """Load results from a JSON file.

        A missing `ranges` key yields an empty list and a missing `metadata`
        key yields an empty dict. Each entry of `ranges` is passed to
        `TimeRange` as keyword arguments.
        """
        # Explicit encoding keeps reads independent of the platform locale
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)

        ranges = [TimeRange(**r) for r in data.get('ranges', [])]
        return cls(ranges=ranges, metadata=data.get('metadata', {}))

In [None]:
# Test MediaAnalysisResult creation
import os

result = MediaAnalysisResult(
    ranges=[
        TimeRange(start=0.0, end=2.5, label="speech", confidence=0.98),
        TimeRange(start=2.5, end=4.0, label="silence", confidence=0.99),
        TimeRange(start=4.0, end=8.5, label="speech", confidence=0.95),
    ],
    metadata={"total_speech": 7.0, "total_silence": 1.5, "model": "silero-vad"}
)

print(f"Number of segments: {len(result.ranges)}")
for r in result.ranges:
    print(f"  {r.label}: {r.start}s - {r.end}s (conf: {r.confidence})")
print(f"Metadata: {result.metadata}")

# Test FileBackedDTO protocol
print(f"Implements FileBackedDTO: {isinstance(result, FileBackedDTO)}")

# Test to_temp_file (this is what the Proxy calls)
temp_path = result.to_temp_file()
try:
    print(f"Saved to temp file: {temp_path}")

    # Verify the file exists
    print(f"File exists: {os.path.exists(temp_path)}")
    print(f"File size: {os.path.getsize(temp_path)} bytes")

    # Test from_file (round-trip)
    loaded = MediaAnalysisResult.from_file(temp_path)
    print(f"\nLoaded {len(loaded.ranges)} ranges from file")
    print(f"Loaded metadata: {loaded.metadata}")

    # Printing alone proves nothing: the round-trip must reproduce the original
    assert loaded == result
finally:
    # Clean up even when a step above fails, so no temp file is left behind
    os.unlink(temp_path)

Number of segments: 3
  speech: 0.0s - 2.5s (conf: 0.98)
  silence: 2.5s - 4.0s (conf: 0.99)
  speech: 4.0s - 8.5s (conf: 0.95)
Metadata: {'total_speech': 7.0, 'total_silence': 1.5, 'model': 'silero-vad'}
Implements FileBackedDTO: True
Saved to temp file: /tmp/tmpo_x6f468.json
File exists: True
File size: 339 bytes

Loaded 3 ranges from file
Loaded metadata: {'total_speech': 7.0, 'total_silence': 1.5, 'model': 'silero-vad'}


In [None]:
#| hide
# Export the `#| export` cells of this notebook to the library module
import nbdev

nbdev.nbdev_export()