Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions backends/advanced/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies = [
"PyYAML>=6.0.1",
"ruamel-yaml>=0.18.0",
"omegaconf>=2.3.0",
"langfuse>=3.13.0,<4.0",
"langfuse>=4.0.0",
"opentelemetry-api>=1.20",
"opentelemetry-sdk>=1.20",
"openinference-instrumentation-openai>=0.1",
Expand Down Expand Up @@ -63,9 +63,6 @@ where = ["src"]
[tool.isort]
profile = "black"

[tool.black]
line-length = 100

[tool.uv.sources]
mem0ai = { git = "https://github.com/AnkushMalaker/mem0.git", rev = "main" }

Expand Down
34 changes: 23 additions & 11 deletions backends/advanced/src/advanced_omi_backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def get_conversation_stop_settings() -> dict:
Get conversation stop settings using OmegaConf.

Returns:
Dict with transcription_buffer_seconds, speech_inactivity_threshold
Dict with speech_inactivity_threshold
"""
cfg = get_backend_config("conversation_stop")
settings = OmegaConf.to_container(cfg, resolve=True)
Expand Down Expand Up @@ -189,20 +189,27 @@ def get_audio_storage_settings() -> dict:


# ============================================================================
# Transcription Job Timeout (OmegaConf-based)
# Streaming Fallback Timeout (OmegaConf-based)
# ============================================================================


def get_transcription_job_timeout() -> int:
def get_streaming_fallback_timeout() -> int:
"""
Get transcription job timeout in seconds from config.
Get timeout for the streaming fallback check in seconds.

This controls how long the fallback check job waits for batch
transcription to complete before giving up. Not an RQ job timeout.

Returns:
Job timeout in seconds (default 900 = 15 minutes)
Fallback timeout in seconds (default 120 = 2 minutes)
"""
cfg = get_backend_config("transcription")
settings = OmegaConf.to_container(cfg, resolve=True) if cfg else {}
return int(settings.get("job_timeout_seconds", 900))
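# Prefer the new streaming_fallback_timeout_seconds key; fall back to the legacy
# job_timeout_seconds key for backward compatibility (default 120 if neither is set)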
timeout = settings.get("streaming_fallback_timeout_seconds")
if timeout is None:
timeout = settings.get("job_timeout_seconds", 120)
return int(timeout)


# ============================================================================
Expand Down Expand Up @@ -243,8 +250,11 @@ def get_misc_settings() -> dict:
"use_provider_segments", False
),
"per_segment_speaker_id": speaker_settings.get("per_segment_speaker_id", False),
"transcription_job_timeout_seconds": int(
transcription_settings.get("job_timeout_seconds", 900)
"streaming_fallback_timeout_seconds": int(
transcription_settings.get(
"streaming_fallback_timeout_seconds",
transcription_settings.get("job_timeout_seconds", 120),
)
),
"always_batch_retranscribe": transcription_settings.get(
"always_batch_retranscribe", False
Expand Down Expand Up @@ -286,10 +296,12 @@ def save_misc_settings(settings: dict) -> bool:
if not save_config_section("backend.speaker_recognition", speaker_settings):
success = False

# Save transcription job timeout if provided
if "transcription_job_timeout_seconds" in settings:
# Save streaming fallback timeout if provided
if "streaming_fallback_timeout_seconds" in settings:
timeout_settings = {
"job_timeout_seconds": settings["transcription_job_timeout_seconds"]
"streaming_fallback_timeout_seconds": settings[
"streaming_fallback_timeout_seconds"
]
}
if not save_config_section("backend.transcription", timeout_settings):
success = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from fastapi import UploadFile
from fastapi.responses import JSONResponse

from advanced_omi_backend.config import get_transcription_job_timeout
from advanced_omi_backend.controllers.queue_controller import (
JOB_RESULT_TTL,
start_post_conversation_jobs,
Expand Down Expand Up @@ -227,7 +226,7 @@ async def upload_and_process_audio_files(
conversation_id,
version_id,
"batch", # trigger
job_timeout=get_transcription_job_timeout(),
job_timeout=-1,
result_ttl=JOB_RESULT_TTL,
job_id=transcribe_job_id,
description=f"Transcribe uploaded file {conversation_id[:8]}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
client_belongs_to_user,
get_client_manager,
)
from advanced_omi_backend.config import get_transcription_job_timeout
from advanced_omi_backend.config_loader import get_service_config
from advanced_omi_backend.controllers.queue_controller import (
JOB_RESULT_TTL,
Expand Down Expand Up @@ -788,7 +787,7 @@ def _enqueue_transcript_reprocessing(
conversation_id,
version_id,
source,
job_timeout=get_transcription_job_timeout(),
job_timeout=-1,
result_ttl=JOB_RESULT_TTL,
job_id=f"{job_id_prefix}_{conversation_id[:8]}",
description=f"Transcribe audio for {conversation_id[:8]}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,10 @@ async def save_misc_settings_controller(settings: dict):
"per_segment_speaker_id",
"always_batch_retranscribe",
}
integer_keys = {"transcription_job_timeout_seconds"}
integer_keys = {
"streaming_fallback_timeout_seconds",
"max_conversation_duration_seconds",
}
valid_keys = boolean_keys | integer_keys

# Filter to only valid keys
Expand All @@ -474,12 +477,18 @@ async def save_misc_settings_controller(settings: dict):
status_code=400,
detail=f"Invalid value for {key}: must be boolean",
)
elif key == "transcription_job_timeout_seconds":
elif key == "streaming_fallback_timeout_seconds":
if not isinstance(value, int) or value < 60 or value > 7200:
raise HTTPException(
status_code=400,
detail=f"Invalid value for {key}: must be integer between 60 and 7200",
)
elif key == "max_conversation_duration_seconds":
if not isinstance(value, int) or value < 600 or value > 86400:
raise HTTPException(
status_code=400,
detail=f"Invalid value for {key}: must be integer between 600 and 86400",
)

filtered_settings[key] = value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,6 @@ async def _create_batch_conversation_and_enqueue(
Returns:
conversation_id on success, None on failure.
"""
from advanced_omi_backend.config import get_transcription_job_timeout
from advanced_omi_backend.controllers.queue_controller import (
JOB_RESULT_TTL,
transcription_queue,
Expand Down Expand Up @@ -1168,7 +1167,7 @@ async def _create_batch_conversation_and_enqueue(
conversation_id,
version_id,
trigger,
job_timeout=get_transcription_job_timeout(),
job_timeout=-1,
result_ttl=JOB_RESULT_TTL,
job_id=f"{job_id_prefix}_{conversation_id[:12]}",
description=f"Transcribe {title.lower()} {conversation_id[:8]}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def init_otel() -> None:
except ImportError:
logger.warning(
"Langfuse OTEL packages not installed. "
"Ensure langfuse>=3.13.0 is installed."
"Ensure langfuse>=4.0.0 is installed."
)
except Exception as e:
logger.error(f"Failed to add Langfuse span processor: {e}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

logger = logging.getLogger(__name__)

MAX_STREAMING_START_ATTEMPTS = 2


def _normalize_words(words: list) -> None:
"""Normalize provider-specific word field names in-place.
Expand Down Expand Up @@ -219,36 +221,56 @@ async def start_session_stream(self, session_id: str, sample_rate: int = 16000):
session_id: Session ID (client_id from audio stream)
sample_rate: Audio sample rate in Hz
"""
try:
await self.provider.start_stream(
client_id=session_id,
sample_rate=sample_rate,
diarize=self._provider_has_diarization,
)
last_error = None
for attempt in range(MAX_STREAMING_START_ATTEMPTS):
try:
await self.provider.start_stream(
client_id=session_id,
sample_rate=sample_rate,
diarize=self._provider_has_diarization,
)

self.active_sessions[session_id] = {
"last_activity": time.time(),
"sample_rate": sample_rate,
}
self.active_sessions[session_id] = {
"last_activity": time.time(),
"sample_rate": sample_rate,
}

# Only buffer audio for speaker identification when provider lacks diarization
if not self._provider_has_diarization:
self._audio_buffers[session_id] = bytearray()
# Only buffer audio for speaker identification when provider lacks diarization
if not self._provider_has_diarization:
self._audio_buffers[session_id] = bytearray()

logger.info(f"Started streaming transcription for session: {session_id}")
logger.info(
f"Started streaming transcription for session: {session_id}"
)
return

except Exception as e:
logger.error(f"Failed to start stream for {session_id}: {e}", exc_info=True)
except Exception as e:
last_error = e
if attempt < MAX_STREAMING_START_ATTEMPTS - 1:
logger.warning(
f"Failed to start stream for {session_id} "
f"(attempt {attempt + 1}/{MAX_STREAMING_START_ATTEMPTS}): {e}. "
f"Retrying in 5s..."
)
await asyncio.sleep(5)
else:
logger.error(
f"Failed to start stream for {session_id} "
f"(attempt {attempt + 1}/{MAX_STREAMING_START_ATTEMPTS}): {e}",
exc_info=True,
)

# Set error flag in Redis so speech detection can detect failure early
session_key = f"audio:session:{session_id}"
try:
await self.redis_client.hset(session_key, "transcription_error", str(e))
logger.info(f"Set transcription error flag for {session_id}")
except Exception as redis_error:
logger.warning(f"Failed to set error flag in Redis: {redis_error}")
# All start attempts failed — set the error flag in Redis, then raise the last error
session_key = f"audio:session:{session_id}"
try:
await self.redis_client.hset(
session_key, "transcription_error", str(last_error)
)
logger.info(f"Set transcription error flag for {session_id}")
except Exception as redis_error:
logger.warning(f"Failed to set error flag in Redis: {redis_error}")

raise
raise last_error

async def end_session_stream(self, session_id: str):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,9 +746,6 @@ async def convert_audio_to_chunks(
Returns:
Number of chunks created

Raises:
ValueError: If audio duration exceeds 2 hours

Example:
>>> # Convert from memory without disk write
>>> num_chunks = await convert_audio_to_chunks(
Expand All @@ -766,21 +763,16 @@ async def convert_audio_to_chunks(

logger.info(f"📦 Converting audio to MongoDB chunks: {len(audio_data)} bytes PCM")

# Calculate audio duration and validate maximum limit
# Calculate audio duration
bytes_per_second = sample_rate * sample_width * channels
total_duration_seconds = len(audio_data) / bytes_per_second
MAX_DURATION_SECONDS = 7200 # 2 hours (720 chunks @ 10s each)

if total_duration_seconds > MAX_DURATION_SECONDS:
raise ValueError(
f"Audio duration ({total_duration_seconds:.1f}s) exceeds maximum allowed "
f"({MAX_DURATION_SECONDS}s / 2 hours). Please split the file into smaller segments."
)

# Calculate chunk size in bytes
chunk_size_bytes = int(chunk_duration * bytes_per_second)

# Collect all chunks before batch insert
# Insert in batches of 100 chunks (~16 min at 10s/chunk) to avoid
# accumulating all chunks in memory for very long audio files.
BATCH_INSERT_SIZE = 100
chunks_to_insert = []
chunk_index = 0
total_original_size = 0
Expand Down Expand Up @@ -836,7 +828,16 @@ async def convert_audio_to_chunks(
f"{len(chunk_pcm)} → {len(opus_data)} bytes"
)

# Batch insert all chunks to MongoDB (single database operation)
# Flush batch to MongoDB when batch size reached
if len(chunks_to_insert) >= BATCH_INSERT_SIZE:
await AudioChunkDocument.insert_many(chunks_to_insert)
logger.info(
f"✅ Batch inserted {len(chunks_to_insert)} chunks to MongoDB "
f"(chunks {chunk_index - len(chunks_to_insert)}-{chunk_index - 1})"
)
chunks_to_insert = []

# Insert remaining chunks
if chunks_to_insert:
await AudioChunkDocument.insert_many(chunks_to_insert)
logger.info(
Expand Down Expand Up @@ -908,7 +909,6 @@ async def convert_wav_to_chunks(

Raises:
FileNotFoundError: If WAV file doesn't exist
ValueError: If WAV file is invalid or exceeds 2 hours

Example:
>>> # Convert uploaded file to chunks
Expand Down Expand Up @@ -944,21 +944,15 @@ async def convert_wav_to_chunks(
f"{sample_rate}Hz, {channels}ch, {sample_width*8}-bit"
)

# Calculate audio duration and validate maximum limit
# Calculate audio duration
bytes_per_second = sample_rate * sample_width * channels
total_duration_seconds = len(pcm_data) / bytes_per_second
MAX_DURATION_SECONDS = 7200 # 2 hours (720 chunks @ 10s each)

if total_duration_seconds > MAX_DURATION_SECONDS:
raise ValueError(
f"Audio duration ({total_duration_seconds:.1f}s) exceeds maximum allowed "
f"({MAX_DURATION_SECONDS}s / 2 hours). Please split the file into smaller segments."
)

# Calculate chunk size in bytes
chunk_size_bytes = int(chunk_duration * bytes_per_second)

# Collect all chunks before batch insert
# Insert in batches of 100 chunks (~16 min at 10s/chunk) to avoid
# accumulating all chunks in memory for very long audio files.
BATCH_INSERT_SIZE = 100
chunks_to_insert = []
chunk_index = 0
total_original_size = 0
Expand Down Expand Up @@ -1014,7 +1008,16 @@ async def convert_wav_to_chunks(
f"{len(chunk_pcm)} → {len(opus_data)} bytes"
)

# Batch insert all chunks to MongoDB (single database operation)
# Flush batch to MongoDB when batch size reached
if len(chunks_to_insert) >= BATCH_INSERT_SIZE:
await AudioChunkDocument.insert_many(chunks_to_insert)
logger.info(
f"✅ Batch inserted {len(chunks_to_insert)} chunks to MongoDB "
f"(chunks {chunk_index - len(chunks_to_insert)}-{chunk_index - 1})"
)
chunks_to_insert = []

# Insert remaining chunks
if chunks_to_insert:
await AudioChunkDocument.insert_many(chunks_to_insert)
logger.info(
Expand Down
Loading
Loading