@@ -0,0 +1,9 @@
SpeechmaticsStreamingOrchestrationPipeline:
pipeline_config:
sample_rate: 16000
language: "en"
operating_point: "enhanced"
max_delay: 1
enable_partials: true
enable_diarization: true

9 changes: 9 additions & 0 deletions config/pipeline_configs/SpeechmaticsStreamingPipeline.yaml
@@ -0,0 +1,9 @@
SpeechmaticsStreamingPipeline:
pipeline_config:
sample_rate: 16000
language: "en"
operating_point: "enhanced"
max_delay: 1
enable_partials: true
endpoint_url: "wss://eu2.rt.speechmatics.com/v2"

1 change: 1 addition & 0 deletions pyproject.toml
@@ -38,6 +38,7 @@ dependencies = [
"texterrors==0.5.1",
"nemo-toolkit[asr]>=2.5.0",
"openai>=2.7.1",
"speechmatics-python>=5.0.0",
]

[project.scripts]
6 changes: 2 additions & 4 deletions src/openbench/metric/word_error_metrics/word_error_metrics.py
@@ -296,10 +296,8 @@ def compute_metric(self, detail: Details) -> float:
return (S + D + I) / N if N > 0 else 0.0


@MetricRegistry.register_metric(
(PipelineType.ORCHESTRATION, PipelineType.STREAMING_DIARIZATION),
MetricOptions.CPWER,
)
@MetricRegistry.register_metric(PipelineType.ORCHESTRATION,
MetricOptions.CPWER)
class ConcatenatedMinimumPermutationWER(BaseWordErrorMetric):
"""Concatenated minimum-Permutation Word Error Rate (cpWER) implementation.

21 changes: 18 additions & 3 deletions src/openbench/pipeline/orchestration/__init__.py
@@ -1,13 +1,26 @@
# For licensing see accompanying LICENSE.md file.
# Copyright (C) 2025 Argmax, Inc. All Rights Reserved.

from .orchestration_deepgram import DeepgramOrchestrationPipeline, DeepgramOrchestrationPipelineConfig
from .orchestration_deepgram import (
DeepgramOrchestrationPipeline,
DeepgramOrchestrationPipelineConfig,
)
from .orchestration_deepgram_streaming import (
DeepgramStreamingOrchestrationPipeline,
DeepgramStreamingOrchestrationPipelineConfig,
)
from .orchestration_openai import OpenAIOrchestrationPipeline, OpenAIOrchestrationPipelineConfig
from .orchestration_whisperkitpro import WhisperKitProOrchestrationConfig, WhisperKitProOrchestrationPipeline
from .orchestration_openai import (
OpenAIOrchestrationPipeline,
OpenAIOrchestrationPipelineConfig,
)
from .orchestration_speechmatics_streaming import (
SpeechmaticsStreamingOrchestrationPipeline,
SpeechmaticsStreamingOrchestrationPipelineConfig,
)
from .orchestration_whisperkitpro import (
WhisperKitProOrchestrationConfig,
WhisperKitProOrchestrationPipeline,
)
from .whisperx import WhisperXPipeline, WhisperXPipelineConfig


@@ -16,6 +29,8 @@
"DeepgramOrchestrationPipelineConfig",
"DeepgramStreamingOrchestrationPipeline",
"DeepgramStreamingOrchestrationPipelineConfig",
"SpeechmaticsStreamingOrchestrationPipeline",
"SpeechmaticsStreamingOrchestrationPipelineConfig",
"WhisperXPipeline",
"WhisperXPipelineConfig",
"WhisperKitProOrchestrationPipeline",
@@ -0,0 +1,115 @@
# For licensing see accompanying LICENSE.md file.
# Copyright (C) 2025 Argmax, Inc. All Rights Reserved.

import numpy as np
from pydantic import Field

from ...dataset import OrchestrationSample
from ...pipeline import Pipeline, PipelineConfig, register_pipeline
from ...pipeline_prediction import Transcript, Word
from ...types import PipelineType
from ..streaming_transcription.speechmatics import SpeechmaticsApi
from .common import OrchestrationOutput


class SpeechmaticsStreamingOrchestrationPipelineConfig(PipelineConfig):
sample_rate: int = Field(
default=16000,
description="Sample rate of the audio"
)
language: str = Field(
default="en",
description="Language code for transcription"
)
operating_point: str = Field(
default="enhanced",
description="Operating point (standard or enhanced)"
)
max_delay: int = Field(
default=1,
description="Maximum delay in seconds"
)
enable_partials: bool = Field(
default=True,
description="Enable partial transcripts"
)
enable_diarization: bool = Field(
default=True,
description="Whether to enable speaker diarization"
)


@register_pipeline
class SpeechmaticsStreamingOrchestrationPipeline(Pipeline):
_config_class = SpeechmaticsStreamingOrchestrationPipelineConfig
pipeline_type = PipelineType.ORCHESTRATION

def build_pipeline(self):
"""Build Speechmatics streaming API with diarization."""
# SpeechmaticsApi reads its settings via attribute access, so mirror the
# pydantic config fields onto a plain namespace for the streaming API
from types import SimpleNamespace

api_config = SimpleNamespace(
sample_rate=self.config.sample_rate,
language=self.config.language,
operating_point=self.config.operating_point,
max_delay=self.config.max_delay,
enable_partials=self.config.enable_partials,
enable_diarization=self.config.enable_diarization,
)

pipeline = SpeechmaticsApi(api_config)
return pipeline

def parse_input(self, input_sample: OrchestrationSample):
"""Convert audio waveform to bytes for streaming."""
y = input_sample.waveform
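# Scale the float waveform (assumed to lie in [-1.0, 1.0]) to 16-bit PCM
# samples before packing them into raw bytes for streaming.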
y_int16 = (y * 32767).astype(np.int16)
audio_data_byte = y_int16.tobytes()
return audio_data_byte

def parse_output(self, output) -> OrchestrationOutput:
"""Parse output to extract transcription and diarization."""
# Extract words with speaker info if diarization enabled
words = []

if (
"words_with_speakers" in output and
output["words_with_speakers"]
):
# This comes from diarization-enabled streaming
for word_info in output["words_with_speakers"]:
words.append(Word(
word=word_info.get("word", ""),
start=word_info.get("start"),
end=word_info.get("end"),
speaker=word_info.get("speaker"),
))
elif (
"model_timestamps_confirmed" in output and
output["model_timestamps_confirmed"]
):
# Fallback to regular transcription without speaker
transcript_words = output.get("transcript", "").split()
timestamp_idx = 0
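# Pair transcript words with confirmed timestamps by position; timestamps
# beyond the number of transcript words are ignored.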

for timestamp_group in output["model_timestamps_confirmed"]:
for word_info in timestamp_group:
if timestamp_idx < len(transcript_words):
words.append(Word(
word=transcript_words[timestamp_idx],
start=word_info.get("start"),
end=word_info.get("end"),
speaker=None,
))
timestamp_idx += 1

# Create final transcript with speaker-attributed words
transcript = Transcript(words=words)

return OrchestrationOutput(
prediction=transcript,
transcription_output=None,
diarization_output=None,
)
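
A minimal usage sketch of the new pipeline, assuming the base Pipeline class accepts a config instance and that pipeline instances are callable on an OrchestrationSample; both are assumptions about the openbench base class and are not confirmed by this diff. The config fields mirror the YAML defaults above.

config = SpeechmaticsStreamingOrchestrationPipelineConfig(
    sample_rate=16000,
    language="en",
    operating_point="enhanced",
    max_delay=1,
    enable_partials=True,
    enable_diarization=True,
)
# Hypothetical: constructor and call signatures are assumed, not shown in this diff.
pipeline = SpeechmaticsStreamingOrchestrationPipeline(config)
output = pipeline(sample)  # sample: an OrchestrationSample with a float waveform
print(output.prediction)   # Transcript whose Word entries carry speaker labels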

39 changes: 39 additions & 0 deletions src/openbench/pipeline/pipeline_aliases.py
@@ -18,6 +18,7 @@
DeepgramOrchestrationPipeline,
DeepgramStreamingOrchestrationPipeline,
OpenAIOrchestrationPipeline,
SpeechmaticsStreamingOrchestrationPipeline,
WhisperKitProOrchestrationPipeline,
WhisperXPipeline,
)
@@ -28,6 +29,7 @@
FireworksStreamingPipeline,
GladiaStreamingPipeline,
OpenAIStreamingPipeline,
SpeechmaticsStreamingPipeline,
)
from .transcription import (
AssemblyAITranscriptionPipeline,
@@ -186,6 +188,25 @@ def register_pipeline_aliases() -> None:
"Deepgram streaming orchestration pipeline with diarization enabled."
),
)

PipelineRegistry.register_alias(
"speechmatics-streaming-orchestration",
SpeechmaticsStreamingOrchestrationPipeline,
default_config={
"sample_rate": 16000,
"language": "en",
"operating_point": "enhanced",
"max_delay": 1,
"enable_partials": True,
"enable_diarization": True,
},
description=(
"Speechmatics streaming orchestration pipeline with "
"diarization. Requires API key from "
"https://www.speechmatics.com/. Set "
"`SPEECHMATICS_API_KEY` env var."
),
)

PipelineRegistry.register_alias(
"whisperkitpro-orchestration-tiny",
@@ -684,5 +705,23 @@ def register_pipeline_aliases() -> None:
description="AssemblyAI streaming transcription pipeline. Requires API key from https://www.assemblyai.com/. Set `ASSEMBLYAI_API_KEY` env var.",
)

PipelineRegistry.register_alias(
"speechmatics-streaming",
SpeechmaticsStreamingPipeline,
default_config={
"sample_rate": 16000,
"language": "en",
"operating_point": "enhanced",
"max_delay": 1,
"enable_partials": True,
"endpoint_url": "wss://eu2.rt.speechmatics.com/v2",
},
description=(
"Speechmatics streaming transcription pipeline. "
"Requires API key from https://www.speechmatics.com/. "
"Set `SPEECHMATICS_API_KEY` env var."
),
)


register_pipeline_aliases()
29 changes: 24 additions & 5 deletions src/openbench/pipeline/streaming_transcription/__init__.py
@@ -1,8 +1,27 @@
# For licensing see accompanying LICENSE.md file.
# Copyright (C) 2025 Argmax, Inc. All Rights Reserved.

from .assemblyai import AssemblyAIStreamingPipeline, AssemblyAIStreamingPipelineConfig
from .deepgram import DeepgramStreamingPipeline, DeepgramStreamingPipelineConfig
from .fireworks import FireworksStreamingPipeline, FireworksStreamingPipelineConfig
from .gladia import GladiaStreamingPipeline, GladiaStreamingPipelineConfig
from .openai import OpenAIStreamingPipeline, OpenAIStreamingPipelineConfig
from .assemblyai import (
AssemblyAIStreamingPipeline,
AssemblyAIStreamingPipelineConfig,
)
from .deepgram import (
DeepgramStreamingPipeline,
DeepgramStreamingPipelineConfig,
)
from .fireworks import (
FireworksStreamingPipeline,
FireworksStreamingPipelineConfig,
)
from .gladia import (
GladiaStreamingPipeline,
GladiaStreamingPipelineConfig,
)
from .openai import (
OpenAIStreamingPipeline,
OpenAIStreamingPipelineConfig,
)
from .speechmatics import (
SpeechmaticsStreamingPipeline,
SpeechmaticsStreamingPipelineConfig,
)