Skip to content

Comments

long audio fix#95

Merged
AnkushMalaker merged 3 commits intomainfrom
long-audio-fix
Sep 17, 2025
Merged

long audio fix#95
AnkushMalaker merged 3 commits intomainfrom
long-audio-fix

Conversation

@AnkushMalaker
Copy link
Collaborator

@AnkushMalaker AnkushMalaker commented Sep 14, 2025

Summary by CodeRabbit

  • New Features

    • Enhanced long‑audio transcription with timestamp‑preserving chunking, configurable via environment variables.
    • Upload tool now supports CLI arguments for multiple files and custom backend URL.
  • Bug Fixes

    • More robust error handling and clearer status propagation during transcription.
    • Removed previous duration cap, enabling processing of longer audio files.
  • Documentation

    • Added development-only reset endpoints and quick reset guidance.
  • Chores

    • Updated dependencies and Docker/env configurations to support enhanced chunking.
    • Added a maintenance script to bulk-delete conversations via admin API.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 14, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Documentation adds development reset endpoints. New script deletes all conversations via admin API. Backend adds conversation deletion and revises system control flow for transcription/memory handling with explicit failure paths. Transcription pipeline now propagates failures via a coordinator. ASR service introduces enhanced timestamp-preserving chunking, config/env wiring, and batch path updates. Upload tool gains CLI.

Changes

Cohort / File(s) Summary of Changes
Docs — Dev reset endpoints
CLAUDE.md
Documents development-only cleanup endpoints, optional query params, and quick reset commands. No code changes.
Admin utility script
backends/advanced/scripts/delete_all_conversations_api.py
New async CLI script: logs in for admin JWT, lists conversations, prompts (or --yes), deletes /api/conversations/{audio_uuid}, reports successes/failures.
Controllers — conversation & system
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py, .../controllers/system_controller.py
Adds async delete_conversation with ownership checks and file cleanup. Refactors transcription/memory monitoring: removes 20‑min guard, explicit FAILED handling, guarded memory triggering, updated logging/control flow.
Transcription core — failure signaling
backends/advanced/src/advanced_omi_backend/transcript_coordinator.py, .../transcription.py, .../transcription_providers.py
Coordinator tracks failures, exposes signal_transcript_failed, and raises on failure after event. Transcription now try/except updates DB to FAILED and signals coordinator; provider re-raises errors instead of swallowing.
Uploader CLI
backends/advanced/upload_files.py
Adds argparse-based CLI (parse_args), supports multiple files and --base-url, default test WAV fallback, removes duration cap, consistent exit codes, resource cleanup.
ASR service — enhanced chunking feature
extras/asr-services/enhanced_chunking.py, extras/asr-services/parakeet-offline.py
Adds timestamp-preserving chunked RNNT processing and utilities; wires configurable long-audio chunking into single/batch paths with temp WAV handling, streaming, resampling, and detailed timings. Errors are re-raised for HTTP propagation.
ASR service — config & packaging
extras/asr-services/.dockerignore, extras/asr-services/.env.template, extras/asr-services/docker-compose.yml, extras/asr-services/pyproject.toml
Ensures enhanced_chunking.py included in build. Adds chunking-related env vars and defaults. Bumps easy-audio-interfaces to >=0.7.1 and adds httpx>=0.28.1.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant SystemController
  participant Transcription
  participant Provider as TranscriptionProvider
  participant Coordinator as TranscriptCoordinator
  participant DB

  Client->>SystemController: Upload audio / start processing
  SystemController->>Transcription: process_collected_audio()
  Transcription->>Provider: transcribe()
  alt Provider error
    Provider-->>Transcription: throws Exception
    Transcription->>DB: mark transcription FAILED (error)
    Transcription->>Coordinator: signal_transcript_failed(audio_uuid, error)
    Transcription-->>SystemController: exception propagated
    SystemController->>SystemController: handle FAILED (log/mark)
  else Success
    Provider-->>Transcription: transcript result
    Transcription-->>SystemController: transcript ready
    SystemController->>SystemController: if memory PENDING and not triggered, close conversation
    SystemController->>DB: update statuses as memory completes/fails/skips
  end
Loading
sequenceDiagram
  autonumber
  participant Client as ASR Client
  participant Parakeet as parakeet-offline
  participant Chunker as EnhancedChunking
  participant Streamer as LocalFileStreamer
  participant Resampler as ResamplingBlock

  Client->>Parakeet: POST /transcribe (file)
  Parakeet->>Streamer: open + iterate frames
  Parakeet->>Parakeet: detect duration >= MIN_AUDIO_FOR_CHUNKING and CHUNKING_ENABLED
  alt Long audio (chunked path)
    Parakeet->>Resampler: resample chunks (if needed)
    Parakeet->>Chunker: transcribe_with_enhanced_chunking(chunks)
    Chunker->>Chunker: chunked RNNT decode + merge timestamps
    Chunker-->>Parakeet: {text, words, segments}
  else Short audio
    Parakeet->>Parakeet: single-pass transcribe
  end
  Parakeet-->>Client: JSON transcription (errors re-raised on failure)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

I thump my paws on timestamps’ trail,
Chunk by chunk, I will not fail.
If voices falter, signals fly—
We catch the fall and mark the why.
Delete, reset, and hop anew,
Fresh carrots, clean logs, clearer view.
Hippity—ship it! 🥕✨

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "long audio fix" succinctly and accurately reflects the primary intent of the changeset, which focuses on enhanced long-audio chunking, removal of duration caps, and related transcription/ASR improvements across the extras/asr-services and backend paths; it is concise and directly related to the main changes. A teammate scanning history will understand the PR targets long-audio handling.
Docstring Coverage ✅ Passed Docstring coverage is 90.63% which is sufficient. The required threshold is 80.00%.

Tip

👮 Agentic pre-merge checks are now available in preview!

Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

  • Built-in checks – Quickly apply ready-made checks to enforce title conventions, require pull request descriptions that follow templates, validate linked issues for compliance, and more.
  • Custom agentic checks – Define your own rules using CodeRabbit’s advanced agentic capabilities to enforce organization-specific policies and workflows. For example, you can instruct CodeRabbit’s agent to verify that API documentation is updated whenever API schema files are modified in a PR. Note: Upto 5 custom checks are currently allowed during the preview period. Pricing for this feature will be announced in a few weeks.

Please see the documentation for more information.

Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).

Please share your feedback with us on this Discord post.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@AnkushMalaker
Copy link
Collaborator Author

@coderabbitai review

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 14, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 19

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
extras/asr-services/parakeet-offline.py (1)

273-281: Avoid logging user transcript content (PII leakage risk)

Dumping first 100 chars of recognized text to INFO can leak sensitive data. Remove or gate behind DEBUG + explicit opt‑in.

If you need this for debugging temporarily, guard with if logger.isEnabledFor(logging.DEBUG): ... and redact/limit further in production.

backends/advanced/upload_files.py (3)

216-247: 60s upload timeout will fail for long audio; make it configurable (default higher).
Long files over slow links can’t finish the initial multipart POST in 60s.

Apply:

-def upload_files_async(files: list[str], token: str, base_url: str = "http://localhost:8000") -> bool:
+def upload_files_async(files: list[str], token: str, base_url: str = "http://localhost:8000", submit_timeout: int = 600) -> bool:
@@
-        response = requests.post(
+        response = requests.post(
@@
-            timeout=60  # Short timeout for job submission
+            timeout=submit_timeout  # Allow large uploads
         )

Then expose the knob in the CLI (see parse_args and main comments below).


249-252: File handles are closed twice; keep one definitive close path.
Redundant closes are harmless but noisy; rely on finally only.

Apply:

-        # Close all file handles
-        for _, file_tuple in files_data:
-            file_tuple[1].close()
@@
     finally:
         # Ensure all file handles are closed
         for _, file_tuple in files_data:
             try:
                 file_tuple[1].close()
             except Exception as close_error:
                 logger.warning(f"Failed to close file handle: {close_error}")

Also applies to: 281-287


165-214: Remove or wire unused collect_wav_files

collect_wav_files is defined in backends/advanced/upload_files.py (lines 165–214) but a repo-wide search shows no calls; either delete the function or call it (e.g. assign wav_files = collect_wav_files(...) before the upload_files_async call at ~line 443).

🧹 Nitpick comments (36)
backends/advanced/src/advanced_omi_backend/transcription_providers.py (3)

555-557: Use bare raise and log with traceback.

raise e discards the original traceback and fails Ruff TRY201. Switch to logger.exception(...) + bare raise.

Apply:

-        except Exception as e:
-            logger.error(f"Error calling Parakeet service: {e}")
-            raise e
+        except Exception:
+            logger.exception("Error calling Parakeet service")
+            raise

511-514: Remove dead normalization work.

audio_array is computed then unused; wasting CPU for large files.

Apply:

-            audio_array = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32)
-            audio_array = audio_array / np.iinfo(np.int16).max  # Normalize to [-1, 1]
+            # Upload original PCM bytes; no pre-normalization needed here.

281-298: Error handling policy: confirm provider consistency.

Parakeet now raises on failures, but Deepgram still swallows errors and returns empty results. If the orchestration expects exceptions to propagate, consider aligning Deepgram to also raise (at least on 5xx/timeouts).

CLAUDE.md (1)

677-698: Call out dev-only and auth explicitly; add a warning banner.

These destructive endpoints should be clearly marked “dev-only” and require an Admin JWT in every call.

Apply:

 ### Development Reset Endpoints
-Useful endpoints for resetting state during development:
+Useful endpoints for resetting state during development.
+WARNING: These endpoints are intended for local development only. Protect them behind auth and never expose in production. Admin JWT required.
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (3)

352-362: Drop exploratory regex lookups.

Regex/substring probes leak existence of other records in logs and add latency. If exact match fails, return 404.

Apply:

-        else:
-            # DEBUG: Try alternative queries to see what might work
-            logger.info("DELETE DEBUG: Trying alternative queries...")
-            
-            # Try with regex (case insensitive)
-            regex_result = await chunks_col.find_one({"audio_uuid": {"$regex": f"^{audio_uuid}$", "$options": "i"}})
-            logger.info(f"DELETE DEBUG: Case-insensitive regex query result: {regex_result is not None}")
-            
-            # Try to find any conversation containing this uuid as substring
-            contains_result = await chunks_col.find_one({"audio_uuid": {"$regex": audio_uuid}})
-            logger.info(f"DELETE DEBUG: Contains substring query result: {contains_result is not None}")
+        # No alternative lookups; enforce exact match semantics.

388-395: Deletion order trade-off.

You delete DB first, then files. If file deletion fails, you lose traceability. Consider deleting files first (best‑effort), then DB, or add a soft “deleted=true” flag before physical cleanup.


431-436: Log with traceback on failure.

Use logger.exception and structured args; also satisfies Ruff TRY400/RUF010 hints.

Apply:

-    except Exception as e:
-        logger.error(f"Error deleting conversation {audio_uuid}: {e}")
+    except Exception:
+        logger.exception("Error deleting conversation %s", audio_uuid)
backends/advanced/scripts/delete_all_conversations_api.py (2)

82-84: Silence unused loop var.

Rename client_id to _client_id to satisfy Ruff B007.

Apply:

-        for client_id, client_conversations in conversations_dict.items():
+        for _client_id, client_conversations in conversations_dict.items():

121-123: Remove pointless f-string.

No placeholders present.

Apply:

-        print(f"\nDeletion complete:")
+        print("\nDeletion complete:")
backends/advanced/src/advanced_omi_backend/transcription.py (4)

147-155: Preserve traceback; avoid redundant cleanup.

Use logging.exception and bare raise; rely on finally for cleanup to avoid double-clearing.

-        except Exception as e:
-            logger.error(f"Error getting transcript from {self.provider.name}: {e}")
-            # Clean up buffer before re-raising
-            self._audio_buffer.clear()
-            self._audio_start_time = None
-            self._collecting = False
-            raise e
+        except Exception:
+            logger.exception(f"Error getting transcript from {self.provider.name}")
+            raise
         finally:
-            # Clear the buffer for all provider types (in case of success)
+            # Clear the buffer for all provider types
             self._audio_buffer.clear()
             self._audio_start_time = None
             self._collecting = False

171-181: No‑data path doesn’t update status.

When transcript_result is None/empty, DB status stays stale. Mark as EMPTY (or SKIPPED) before returning.

         if not transcript_result or not self._current_audio_uuid:
             logger.info(f"⚠️ No transcript result to process for {self._current_audio_uuid}")
             # Even with no transcript, signal completion to unblock memory processing
             if self._current_audio_uuid:
+                if self.chunk_repo:
+                    try:
+                        provider = self.provider.name if self.provider else None
+                        await self.chunk_repo.update_transcription_status(
+                            self._current_audio_uuid, "EMPTY", provider=provider
+                        )
+                    except Exception:
+                        logger.warning("Failed to mark transcription as EMPTY", exc_info=True)
                 coordinator = get_transcript_coordinator()
                 coordinator.signal_transcript_ready(self._current_audio_uuid)
                 logger.info(
                     f"⚠️ Signaled transcript completion (no data) for {self._current_audio_uuid}"
                 )
             return

59-62: Comment contradicts code behavior.

The “REMOVED: Memory processing …” comment is stale; _queue_memory_processing still exists and is invoked. Update comment or remove the method/call.

Also applies to: 301-303, 363-414


135-145: Avoid reloading config per request in hot path.

load_diarization_settings_from_file() is called on every batch process. Cache per instance (e.g., in init) unless config hot‑reload is required.

backends/advanced/src/advanced_omi_backend/transcript_coordinator.py (2)

34-39: Docstring return contract is inconsistent.

Returns should be “True on success, False on timeout”; failures now raise.

-        Returns:
-            True if transcript was completed successfully, False if timeout or failed
-        
-        Raises:
-            Exception: If transcription failed with an error
+        Returns:
+            True if transcript completed successfully, False on timeout
+
+        Raises:
+            TranscriptionFailedError: If transcription failed with an error

27-69: Optional: guard shared maps with the lock for consistency.

Consider using self._lock when adding/removing entries in transcript_events/failures (not just on creation/cleanup) for clearer invariants. Low risk in single-threaded asyncio, but safer.

extras/asr-services/pyproject.toml (2)

11-11: Align NumPy constraint with parakeet group to prevent resolver flip-flops.

Base deps allow NumPy 2.x while parakeet group pins <2.0. Align to a single constraint.

Apply:

-  "numpy>=1.26",
+  "numpy>=1.26,<2.0",

41-44: Pin dev requests to match runtime floor or newer.

Dev uses requests>=2.31.0; runtime doesn’t need it. If not used in dev tooling, drop it; otherwise consider aligning to >=2.32.3.

-    "requests>=2.31.0",
+    "requests>=2.32.3",
extras/asr-services/.env.template (3)

15-24: Defaults contradict comments and docker-compose; align for sane OOTB behavior.

Template sets 120s chunks/thresholds while comment suggests 20–40s and compose defaults to 30/5/60. Align to compose defaults.

-CHUNK_DURATION_SECONDS=120.0
-OVERLAP_DURATION_SECONDS=10.0
-MIN_AUDIO_FOR_CHUNKING=120.0
+CHUNK_DURATION_SECONDS=30.0
+OVERLAP_DURATION_SECONDS=5.0
+MIN_AUDIO_FOR_CHUNKING=60.0

9-10: Satisfy dotenv-linter ordering (cosmetic).

Place container port before host port to quash the UnorderedKey warning.

-PARAKEET_HOST_PORT=8767
-PARAKEET_CONTAINER_PORT=8765
+PARAKEET_CONTAINER_PORT=8765
+PARAKEET_HOST_PORT=8767

27-27: Add trailing newline.

Fixes EndingBlankLine warning.

-CONFIDENCE_THRESHOLD=0.8
+CONFIDENCE_THRESHOLD=0.8
+
extras/asr-services/docker-compose.yml (2)

24-29: Keep env template and compose defaults in sync.

Compose uses 30/5/60/0.8; ensure .env.template matches to avoid surprising behavior when .env is copied.


23-23: Provide a safe default for PARAKEET_MODEL.

Avoid empty value if not set.

-      - PARAKEET_MODEL=$PARAKEET_MODEL
+      - PARAKEET_MODEL=${PARAKEET_MODEL:-nvidia/parakeet-tdt-0.6b-v3}
extras/asr-services/enhanced_chunking.py (6)

36-44: Don’t permanently mutate model decoding flags; restore after use.

You force compute_timestamps=True but never restore the prior value. Cache and restore in reset() (or context-manage).

         if hasattr(self.asr_model, 'decoding') and hasattr(self.asr_model.decoding, 'compute_timestamps'):
-            original_value = self.asr_model.decoding.compute_timestamps
+            self._original_compute_timestamps = self.asr_model.decoding.compute_timestamps
             self.asr_model.decoding.compute_timestamps = True
-            logger.info(f"🔧 TIMESTAMP CONFIG: Set compute_timestamps=True (was: {original_value})")
+            logger.info(f"🔧 TIMESTAMP CONFIG: Set compute_timestamps=True")
         else:
             logger.warning("🚨 TIMESTAMP CONFIG: Model does not have compute_timestamps attribute!")
@@
     def reset(self):
         """Reset the chunked inference state and clear accumulated hypotheses."""
         super().reset()
         self.all_hypotheses = []
         self.chunk_offsets = [0]
         self.merged_hypothesis = None
+        # Restore timestamp flag if it existed
+        if hasattr(self.asr_model, 'decoding') and hasattr(self.asr_model.decoding, 'compute_timestamps'):
+            if hasattr(self, "_original_compute_timestamps"):
+                self.asr_model.decoding.compute_timestamps = self._original_compute_timestamps

53-53: Remove unused param to match parent signature.

keep_logits is unused and diverges from the base signature.

-    def _get_batch_preds(self, keep_logits=False):
+    def _get_batch_preds(self):

107-113: Avoid blind catch; log stack and return via else.

Follow ruff TRY300/TRY400.

-        try:
-            self.merged_hypothesis = self._join_hypotheses(self.all_hypotheses)
-            return [self.merged_hypothesis] if self.merged_hypothesis else self.all_hypotheses
-        except Exception as e:
-            logger.warning(f"Hypothesis joining failed: {e}, returning raw hypotheses")
-            return self.all_hypotheses
+        try:
+            self.merged_hypothesis = self._join_hypotheses(self.all_hypotheses)
+        except Exception:
+            logger.exception("Hypothesis joining failed; returning raw hypotheses")
+            return self.all_hypotheses
+        else:
+            return [self.merged_hypothesis] if self.merged_hypothesis else self.all_hypotheses

357-363: Import os for env-driven reconciliation.

Needed for the overlap/confidence env defaults used below.

-import logging
+import logging
+import os

473-477: Derive final text from reconciled words to avoid duplicates.

Prefer building text from words, fallback to result_text.

-            response = {
-                'text': result_text,
+            response = {
+                'text': result_text or " ".join(w.get('word','') for w in words if w.get('word')),
                 'words': words,
-                'segments': [{'start': 0.0, 'end': end_time, 'text': result_text}]
+                'segments': [{'start': 0.0, 'end': end_time, 'text': result_text or ""}]
             }

513-514: Use logger.exception for tracebacks.

Keeps stack info.

-        logger.error(f"Enhanced chunking failed: {e}")
+        logger.exception("Enhanced chunking failed")
extras/asr-services/parakeet-offline.py (5)

42-47: Remove unused imports from enhanced_chunking

Only transcribe_with_enhanced_chunking is referenced; the rest add lint noise.

-from enhanced_chunking import (
-    TimestampedFrameBatchChunkedRNNT,
-    extract_timestamps_from_hypotheses,
-    transcribe_with_enhanced_chunking,
-)
+from enhanced_chunking import transcribe_with_enhanced_chunking

58-64: Wire env knobs into the chunking call (and drop unused if not needed)

CHUNK_DURATION_SECONDS, OVERLAP_DURATION_SECONDS, and CONFIDENCE_THRESHOLD are defined but unused. Either pass them to the chunking path or remove to avoid configuration drift.

Would you like me to thread CHUNK_DURATION_SECONDS/OVERLAP_DURATION_SECONDS into the transcribe_with_enhanced_chunking call (see suggestion below in the chunked method)?


111-113: Remove dead field: chunked_processor

Never read; keep the state minimal.

-        # Chunking components are now handled by enhanced_chunking.transcribe_with_enhanced_chunking
-        self.chunked_processor = None  # Will be initialized when needed
+        # Enhanced chunking is invoked on demand; no per-instance processor state needed.

203-215: Adaptive routing based on total duration looks good

Good thresholding and early return to the chunked path.

Consider extracting the total duration math into a small helper for reuse in batch/streaming paths.


288-289: Preserve traceback when re-raising

Use bare raise to keep the original stack.

-            raise e
+            raise
backends/advanced/upload_files.py (3)

216-216: Type hint uses PEP 585 generics (requires Python 3.9+).
If runtime can be <3.9, switch to typing.List for compatibility.

Apply this minimal change if needed:

-def upload_files_async(files: list[str], token: str, base_url: str = "http://localhost:8000") -> bool:
+from typing import List
+def upload_files_async(files: List[str], token: str, base_url: str = "http://localhost:8000") -> bool:

262-273: Guard against missing job_id in response.
Avoid polling with a None job_id.

Apply:

         job_data = response.json()
         job_id = job_data.get("job_id")
         total_files = job_data.get("total_files", 0)
+        if not job_id:
+            logger.error("Missing 'job_id' in async start response.")
+            logger.error(f"Response payload: {job_data}")
+            return False

410-421: Validate inputs: accumulate errors, enforce .wav, optionally check format.
Current flow exits on first missing path and allows non-wav files.

Apply:

     if args.files:
-        # Use provided files
-        wav_files = []
-        for file_path in args.files:
-            file_path = Path(file_path).expanduser().resolve()
-            if file_path.exists():
-                wav_files.append(str(file_path))
-                logger.info(f"Added file: {file_path}")
-            else:
-                logger.error(f"File not found: {file_path}")
-                sys.exit(1)
+        # Use provided files
+        wav_files = []
+        had_errors = False
+        for file_path in args.files:
+            p = Path(file_path).expanduser().resolve()
+            if not p.exists():
+                logger.error(f"File not found: {p}")
+                had_errors = True
+                continue
+            if p.suffix.lower() != ".wav":
+                logger.error(f"Unsupported file type (only .wav): {p.name}")
+                had_errors = True
+                continue
+            is_valid, fmt_err = validate_audio_format(str(p))
+            if not is_valid:
+                logger.error(f"Invalid WAV format: {fmt_err} ({p.name})")
+                had_errors = True
+                continue
+            wav_files.append(str(p))
+            logger.info(f"Added file: {p}")
+        if had_errors and not wav_files:
+            sys.exit(1)
@@
     if not wav_files:
         logger.error("No files to upload")
         sys.exit(1)

Also applies to: 435-437

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 37d2e03 and 6b7525f.

⛔ Files ignored due to path filters (1)
  • extras/asr-services/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • CLAUDE.md (1 hunks)
  • backends/advanced/scripts/delete_all_conversations_api.py (1 hunks)
  • backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/transcript_coordinator.py (5 hunks)
  • backends/advanced/src/advanced_omi_backend/transcription.py (2 hunks)
  • backends/advanced/src/advanced_omi_backend/transcription_providers.py (1 hunks)
  • backends/advanced/upload_files.py (3 hunks)
  • extras/asr-services/.dockerignore (1 hunks)
  • extras/asr-services/.env.template (1 hunks)
  • extras/asr-services/docker-compose.yml (1 hunks)
  • extras/asr-services/enhanced_chunking.py (1 hunks)
  • extras/asr-services/parakeet-offline.py (7 hunks)
  • extras/asr-services/pyproject.toml (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-09T02:03:55.976Z
Learnt from: CR
PR: AnkushMalaker/friend-lite#0
File: CLAUDE.md:0-0
Timestamp: 2025-09-09T02:03:55.976Z
Learning: Applies to **/.dockerignore : Docker projects use .dockerignore with an exclude pattern; explicitly include required files so they are sent to the Docker build context

Applied to files:

  • extras/asr-services/.dockerignore
🪛 Ruff (0.12.2)
backends/advanced/src/advanced_omi_backend/transcription_providers.py

557-557: Use raise without specifying exception name

Remove exception name

(TRY201)

backends/advanced/src/advanced_omi_backend/transcription.py

106-106: Do not catch blind exception: Exception

(BLE001)


108-108: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


153-153: Use raise without specifying exception name

Remove exception name

(TRY201)

backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py

322-322: Use explicit conversion flag

Replace with conversion flag

(RUF010)


406-406: Do not catch blind exception: Exception

(BLE001)


417-417: Do not catch blind exception: Exception

(BLE001)


431-431: Do not catch blind exception: Exception

(BLE001)


432-432: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


435-435: Use explicit conversion flag

Replace with conversion flag

(RUF010)

backends/advanced/scripts/delete_all_conversations_api.py

82-82: Loop control variable client_id not used within loop body

Rename unused client_id to _client_id

(B007)


121-121: f-string without any placeholders

Remove extraneous f prefix

(F541)

extras/asr-services/enhanced_chunking.py

53-53: Unused method argument: keep_logits

(ARG002)


109-109: Consider moving this statement to an else block

(TRY300)


110-110: Do not catch blind exception: Exception

(BLE001)


196-196: Unused function argument: model

(ARG001)


216-216: f-string without any placeholders

Remove extraneous f prefix

(F541)


276-276: f-string without any placeholders

Remove extraneous f prefix

(F541)


282-282: f-string without any placeholders

Remove extraneous f prefix

(F541)


307-307: f-string without any placeholders

Remove extraneous f prefix

(F541)


345-345: f-string without any placeholders

Remove extraneous f prefix

(F541)


348-348: f-string without any placeholders

Remove extraneous f prefix

(F541)


349-349: Consider moving this statement to an else block

(TRY300)


384-384: f-string without any placeholders

Remove extraneous f prefix

(F541)


402-402: f-string without any placeholders

Remove extraneous f prefix

(F541)


419-419: f-string without any placeholders

Remove extraneous f prefix

(F541)


432-432: f-string without any placeholders

Remove extraneous f prefix

(F541)


444-444: f-string without any placeholders

Remove extraneous f prefix

(F541)


456-456: f-string without any placeholders

Remove extraneous f prefix

(F541)


488-488: f-string without any placeholders

Remove extraneous f prefix

(F541)


497-497: f-string without any placeholders

Remove extraneous f prefix

(F541)


499-499: f-string without any placeholders

Remove extraneous f prefix

(F541)


508-508: f-string without any placeholders

Remove extraneous f prefix

(F541)


510-510: Consider moving this statement to an else block

(TRY300)


513-513: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

backends/advanced/src/advanced_omi_backend/transcript_coordinator.py

58-58: Create your own exception

(TRY002)


58-58: Avoid specifying long messages outside the exception class

(TRY003)

extras/asr-services/parakeet-offline.py

148-148: Avoid specifying long messages outside the exception class

(TRY003)


196-196: Do not catch blind exception: Exception

(BLE001)


289-289: Use raise without specifying exception name

Remove exception name

(TRY201)


557-557: Do not catch blind exception: Exception

(BLE001)


559-559: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🪛 dotenv-linter (3.3.0)
extras/asr-services/.env.template

[warning] 9-9: [UnorderedKey] The PARAKEET_CONTAINER_PORT key should go before the PARAKEET_HOST_PORT key

(UnorderedKey)


[warning] 27-27: [EndingBlankLine] No blank line at the end of the file

(EndingBlankLine)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: integration-tests
🔇 Additional comments (10)
backends/advanced/src/advanced_omi_backend/controllers/system_controller.py (1)

570-572: LGTM: duration guard removal.

Proceeding with long files is fine; you already scale timeouts below.

Ensure infra can handle very large uploads (memory/CPU) and adjust chunking if needed.

backends/advanced/src/advanced_omi_backend/transcription.py (1)

102-117: Unified failure pathway is the right direction.

Centrally surfacing provider errors to DB + coordinator looks good and will simplify upstream handling.

backends/advanced/src/advanced_omi_backend/transcript_coordinator.py (1)

27-39: Verify callers handle TranscriptionFailedError from wait_for_transcript_completion

rg found only the function definition (backends/advanced/src/advanced_omi_backend/transcript_coordinator.py) and a docs reference; no call sites detected — ensure all callers catch TranscriptionFailedError (or Exception) and correctly handle timeouts/failures.

extras/asr-services/.dockerignore (1)

4-4: Correct include for new module.

Adding !enhanced_chunking.py after * ensures the chunker is in the build context. Looks good.

extras/asr-services/pyproject.toml (2)

23-29: Confirmed: Dockerfile_Parakeet uses --group parakeet.
Both extras/asr-services/Dockerfile_Parakeet and extras/asr-services/Dockerfile_Parakeet.blackwell run uv sync --no-install-project --group parakeet in the builder stage.


18-19: Confirm dependency intent and lock — confirmed.

Both packages are actively used and pinned in the lockfile: easy-audio-interfaces==0.7.1 (imports in extras/asr-services/client.py, extras/asr-services/parakeet-offline.py, extras/speaker-recognition/laptop_client.py) and httpx==0.28.1 (imports in backends/advanced/src/advanced_omi_backend/transcription_providers.py, backends/advanced/src/advanced_omi_backend/memory/providers/mcp_client.py, extras/openmemory-mcp/test_standalone.py, extras/havpe-relay/main.py).

extras/asr-services/parakeet-offline.py (1)

27-27: Import looks good

Needed for WAV serialization below.

backends/advanced/upload_files.py (3)

6-6: Argparse import for CLI — LGTM.


423-433: Default test file fallback — LGTM.
Path resolution via project_root is clear; good guidance on missing asset.


439-442: Upload summary logging — LGTM.
Concise, user-friendly list before the POST.

Comment on lines 320 to 343
logger.info(f"DELETE DEBUG: Starting delete for audio_uuid='{audio_uuid}'")
logger.info(f"DELETE DEBUG: audio_uuid type={type(audio_uuid)}, length={len(audio_uuid)}")
logger.info(f"DELETE DEBUG: audio_uuid repr={repr(audio_uuid)}")
logger.info(f"DELETE DEBUG: audio_uuid bytes={audio_uuid.encode('utf-8').hex()}")
logger.info(f"DELETE DEBUG: Database collection={chunks_col.name}, database={chunks_col.database.name}")

# DEBUG: Test if ANY conversations exist in the collection
total_count = await chunks_col.count_documents({})
logger.info(f"DELETE DEBUG: Total conversations in collection: {total_count}")

# DEBUG: Try to find a few conversations to compare
sample_conversations = []
async for doc in chunks_col.find({}).limit(3):
sample_conversations.append({
"audio_uuid": doc.get("audio_uuid"),
"audio_uuid_type": type(doc.get("audio_uuid")),
"audio_uuid_repr": repr(doc.get("audio_uuid"))
})
logger.info(f"DELETE DEBUG: Sample conversations: {sample_conversations}")

# DEBUG: Execute the exact query we're about to use
query = {"audio_uuid": audio_uuid}
logger.info(f"DELETE DEBUG: Query being executed: {query}")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Remove noisy/sensitive “DELETE DEBUG” logs (PII-ish, high volume).

Logs include raw UUID bytes/repr and collection details at INFO. This is risky and noisy in prod.

Apply:

-        # DEBUG: Log comprehensive details about the search
-        logger.info(f"DELETE DEBUG: Starting delete for audio_uuid='{audio_uuid}'")
-        logger.info(f"DELETE DEBUG: audio_uuid type={type(audio_uuid)}, length={len(audio_uuid)}")
-        logger.info(f"DELETE DEBUG: audio_uuid repr={repr(audio_uuid)}")
-        logger.info(f"DELETE DEBUG: audio_uuid bytes={audio_uuid.encode('utf-8').hex()}")
-        logger.info(f"DELETE DEBUG: Database collection={chunks_col.name}, database={chunks_col.database.name}")
-        
-        # DEBUG: Test if ANY conversations exist in the collection
-        total_count = await chunks_col.count_documents({})
-        logger.info(f"DELETE DEBUG: Total conversations in collection: {total_count}")
-        
-        # DEBUG: Try to find a few conversations to compare
-        sample_conversations = []
-        async for doc in chunks_col.find({}).limit(3):
-            sample_conversations.append({
-                "audio_uuid": doc.get("audio_uuid"),
-                "audio_uuid_type": type(doc.get("audio_uuid")),
-                "audio_uuid_repr": repr(doc.get("audio_uuid"))
-            })
-        logger.info(f"DELETE DEBUG: Sample conversations: {sample_conversations}")
+        logger.debug("Deleting conversation audio_uuid=%s in collection=%s", audio_uuid, chunks_col.name)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.info(f"DELETE DEBUG: Starting delete for audio_uuid='{audio_uuid}'")
logger.info(f"DELETE DEBUG: audio_uuid type={type(audio_uuid)}, length={len(audio_uuid)}")
logger.info(f"DELETE DEBUG: audio_uuid repr={repr(audio_uuid)}")
logger.info(f"DELETE DEBUG: audio_uuid bytes={audio_uuid.encode('utf-8').hex()}")
logger.info(f"DELETE DEBUG: Database collection={chunks_col.name}, database={chunks_col.database.name}")
# DEBUG: Test if ANY conversations exist in the collection
total_count = await chunks_col.count_documents({})
logger.info(f"DELETE DEBUG: Total conversations in collection: {total_count}")
# DEBUG: Try to find a few conversations to compare
sample_conversations = []
async for doc in chunks_col.find({}).limit(3):
sample_conversations.append({
"audio_uuid": doc.get("audio_uuid"),
"audio_uuid_type": type(doc.get("audio_uuid")),
"audio_uuid_repr": repr(doc.get("audio_uuid"))
})
logger.info(f"DELETE DEBUG: Sample conversations: {sample_conversations}")
# DEBUG: Execute the exact query we're about to use
query = {"audio_uuid": audio_uuid}
logger.info(f"DELETE DEBUG: Query being executed: {query}")
logger.debug("Deleting conversation audio_uuid=%s in collection=%s", audio_uuid, chunks_col.name)
# DEBUG: Execute the exact query we're about to use
query = {"audio_uuid": audio_uuid}
logger.info(f"DELETE DEBUG: Query being executed: {query}")
🧰 Tools
🪛 Ruff (0.12.2)

322-322: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 320 to 343, remove or tone down the verbose DELETE DEBUG logging
that emits raw/encoded audio_uuid, repr bytes and collection/database names;
replace with a single non-sensitive info-level log indicating a delete operation
for an audio identifier (use masked or hashed id if needed) and move detailed
debug lines to a debug-level block behind a feature flag or
logger.isEnabledFor(DEBUG) check so sensitive data is not logged at INFO and
high-volume logs are avoided.

Comment on lines +401 to +405
full_audio_path = Path("/app/audio_chunks") / audio_path
if full_audio_path.exists():
full_audio_path.unlink()
deleted_files.append(str(full_audio_path))
logger.info(f"Deleted audio file: {full_audio_path}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid hard-coded base path for audio files.

Join intelligently: respect absolute paths; otherwise prepend base dir. Consider moving base dir to config.

Apply:

-                full_audio_path = Path("/app/audio_chunks") / audio_path
+                full_audio_path = Path(audio_path)
+                if not full_audio_path.is_absolute():
+                    full_audio_path = Path("/app/audio_chunks") / audio_path
@@
-                full_cropped_path = Path("/app/audio_chunks") / cropped_audio_path
+                full_cropped_path = Path(cropped_audio_path)
+                if not full_cropped_path.is_absolute():
+                    full_cropped_path = Path("/app/audio_chunks") / cropped_audio_path

Also applies to: 412-417

🤖 Prompt for AI Agents
In
backends/advanced/src/advanced_omi_backend/controllers/conversation_controller.py
around lines 401-405 and 412-417, do not hard-code "/app/audio_chunks"; instead
read a base audio directory from configuration (or environment) and join paths
safely: if the provided audio_path is absolute, use it as-is (after
resolving/normalizing); otherwise prepend the configured base dir and resolve;
then check existence and delete. Also ensure you canonicalize/resolve paths to
avoid path traversal and log the resolved path; apply this same logic to both
code blocks mentioned.

Comment on lines +681 to +691
# Check if transcription failed - immediately fail the job
if transcription_status == "FAILED":
audio_logger.error(
f"❌ [Job {job_id}] Transcription failed, marking file as failed: {filename}"
)
await job_tracker.update_file_status(
job_id, filename, FileStatus.FAILED,
error_message="Transcription failed"
)
break # Exit monitoring loop for this file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid double memory-trigger and don’t trigger after transcription failure.

Currently you always call close_current_conversation() after the loop even if transcription failed or was already triggered, causing duplicate/undesired memory runs.

Apply:

@@
-                # Track whether memory processing has been triggered to avoid duplicate calls
-                memory_triggered = False
+                # Track whether memory processing has been triggered to avoid duplicate calls
+                memory_triggered = False
+                transcription_failed = False
@@
-                            if transcription_status == "FAILED":
+                            if transcription_status == "FAILED":
                                 audio_logger.error(
                                     f"❌ [Job {job_id}] Transcription failed, marking file as failed: {filename}"
                                 )
                                 await job_tracker.update_file_status(
                                     job_id, filename, FileStatus.FAILED, 
                                     error_message="Transcription failed"
                                 )
-                                break  # Exit monitoring loop for this file
+                                transcription_failed = True
+                                break  # Exit monitoring loop for this file
@@
-                # Signal end of conversation - trigger memory processing
-                await client_state.close_current_conversation()
-                await asyncio.sleep(0.5)
+                # Signal end of conversation to trigger memory only if appropriate
+                if not transcription_failed and not memory_triggered:
+                    await client_state.close_current_conversation()
+                    await asyncio.sleep(0.5)

Also applies to: 727-729

Comment on lines 48 to 60
try:
# Wait for the transcript to be ready
await asyncio.wait_for(event.wait(), timeout=timeout)

# Check if this was a failure
if audio_uuid in self.transcript_failures:
error_msg = self.transcript_failures[audio_uuid]
logger.error(f"Transcript failed for {audio_uuid}: {error_msg}")
# Clean up failure tracking
self.transcript_failures.pop(audio_uuid, None)
raise Exception(f"Transcription failed: {error_msg}")

logger.info(f"Transcript ready event received for {audio_uuid}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Race: failure signaled before waiter leads to timeout instead of raising.

If signal_transcript_failed fires before any waiter creates the event, waiters will block until timeout. Fast‑fail on pre‑recorded failures and raise a typed exception.

 class TranscriptCoordinator:
@@
-        try:
+        try:
             # Wait for the transcript to be ready
-            await asyncio.wait_for(event.wait(), timeout=timeout)
+            # Fast-fail if failure already recorded
+            if audio_uuid in self.transcript_failures:
+                error_msg = self.transcript_failures.pop(audio_uuid, None) or "unknown error"
+                logger.error(f"Transcript failed for {audio_uuid}: {error_msg}")
+                raise TranscriptionFailedError(error_msg)
+            await asyncio.wait_for(event.wait(), timeout=timeout)
             
             # Check if this was a failure
-            if audio_uuid in self.transcript_failures:
-                error_msg = self.transcript_failures[audio_uuid]
-                logger.error(f"Transcript failed for {audio_uuid}: {error_msg}")
-                # Clean up failure tracking
-                self.transcript_failures.pop(audio_uuid, None)
-                raise Exception(f"Transcription failed: {error_msg}")
+            if audio_uuid in self.transcript_failures:
+                error_msg = self.transcript_failures.pop(audio_uuid, None) or "unknown error"
+                logger.error(f"Transcript failed for {audio_uuid}: {error_msg}")
+                raise TranscriptionFailedError(error_msg)

Add a typed exception near the top:

 logger = logging.getLogger(__name__)
 
+class TranscriptionFailedError(Exception):
+    """Raised when transcription fails for an audio UUID."""
+
🧰 Tools
🪛 Ruff (0.12.2)

58-58: Create your own exception

(TRY002)


58-58: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/transcript_coordinator.py around
lines 48 to 60, there is a race where signal_transcript_failed may record a
failure before a waiter creates the event causing waiters to timeout instead of
failing fast; define a typed exception (e.g., TranscriptionFailed(Exception))
near the top of the module, then before awaiting event.wait() check if
audio_uuid is in self.transcript_failures and if so capture and pop the error
message and immediately raise TranscriptionFailed with that message (also keep
the existing post-wait failure check to cover failures signaled during the
wait).

Comment on lines 87 to 106
def signal_transcript_failed(self, audio_uuid: str, error_message: str):
"""Signal that transcript processing failed for the given audio_uuid.

This should be called by TranscriptionManager when transcription fails.
Waiting processes will be unblocked and can check for failure status.

Args:
audio_uuid: The audio UUID that failed transcription
error_message: Description of the failure
"""
# Store the failure message
self.transcript_failures[audio_uuid] = error_message

if audio_uuid in self.transcript_events:
# Set the event to unblock waiting processes
self.transcript_events[audio_uuid].set()
logger.error(f"Signaled transcript failed for {audio_uuid}: {error_message}")
else:
logger.debug(f"No waiting processors for failed transcript {audio_uuid}")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Always create+set the event on failure to unblock future waiters.

Ensure a waiter starting after failure is immediately released.

     def signal_transcript_failed(self, audio_uuid: str, error_message: str):
@@
-        # Store the failure message
-        self.transcript_failures[audio_uuid] = error_message
-        
-        if audio_uuid in self.transcript_events:
-            # Set the event to unblock waiting processes
-            self.transcript_events[audio_uuid].set()
-            logger.error(f"Signaled transcript failed for {audio_uuid}: {error_message}")
-        else:
-            logger.debug(f"No waiting processors for failed transcript {audio_uuid}")
+        # Store the failure message
+        self.transcript_failures[audio_uuid] = error_message
+        # Ensure an event exists, then set it
+        event = self.transcript_events.get(audio_uuid)
+        if event is None:
+            event = asyncio.Event()
+            self.transcript_events[audio_uuid] = event
+        event.set()
+        logger.error(f"Signaled transcript failed for {audio_uuid}: {error_message}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def signal_transcript_failed(self, audio_uuid: str, error_message: str):
"""Signal that transcript processing failed for the given audio_uuid.
This should be called by TranscriptionManager when transcription fails.
Waiting processes will be unblocked and can check for failure status.
Args:
audio_uuid: The audio UUID that failed transcription
error_message: Description of the failure
"""
# Store the failure message
self.transcript_failures[audio_uuid] = error_message
if audio_uuid in self.transcript_events:
# Set the event to unblock waiting processes
self.transcript_events[audio_uuid].set()
logger.error(f"Signaled transcript failed for {audio_uuid}: {error_message}")
else:
logger.debug(f"No waiting processors for failed transcript {audio_uuid}")
def signal_transcript_failed(self, audio_uuid: str, error_message: str):
"""Signal that transcript processing failed for the given audio_uuid.
This should be called by TranscriptionManager when transcription fails.
Waiting processes will be unblocked and can check for failure status.
Args:
audio_uuid: The audio UUID that failed transcription
error_message: Description of the failure
"""
# Store the failure message
self.transcript_failures[audio_uuid] = error_message
# Ensure an event exists, then set it
event = self.transcript_events.get(audio_uuid)
if event is None:
event = asyncio.Event()
self.transcript_events[audio_uuid] = event
event.set()
logger.error(f"Signaled transcript failed for {audio_uuid}: {error_message}")
🤖 Prompt for AI Agents
In backends/advanced/src/advanced_omi_backend/transcript_coordinator.py around
lines 87 to 106, the current signal_transcript_failed only sets an event if one
already exists, which means waiters that start after the failure won't be
unblocked; always create an Event for the audio_uuid if missing, set it, store
the error message in transcript_failures, and log appropriately so any future
waiter sees the failure immediately.

Comment on lines +447 to +452
words = extract_timestamps_from_hypotheses(hypotheses, chunk_start_time=0.0, model=model)

timestamp_end = time.time()
timestamp_duration = timestamp_end - timestamp_start
logger.info(f"📊 TIMING: ✅ Timestamp extraction completed in {timestamp_duration:.3f}s")
print(f"📊 TIMING: ✅ PHASE END: Timestamp Extraction completed in {timestamp_duration:.3f}s")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Apply overlap reconciliation and confidence filtering.

Use env-configured overlap/confidence to clean the word stream.

-            words = extract_timestamps_from_hypotheses(hypotheses, chunk_start_time=0.0, model=model)
+            words = extract_timestamps_from_hypotheses(hypotheses, chunk_start_time=0.0, model=model)
+            overlap_seconds = float(os.getenv("OVERLAP_DURATION_SECONDS", "5.0"))
+            confidence_threshold = float(os.getenv("CONFIDENCE_THRESHOLD", "0.0"))
+            words = _reconcile_overlaps(words, overlap_seconds=overlap_seconds, confidence_threshold=confidence_threshold)

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +488 to +508
logger.info(f"📊 TIMING: =================== COMPLETE TIMING SUMMARY ===================")
logger.info(f"📊 TIMING: 🔧 Initialization: {init_duration:.3f}s")
logger.info(f"📊 TIMING: 📁 Audio Loading: {loading_duration:.3f}s")
logger.info(f"📊 TIMING: 🚀 Processing: {processing_duration:.3f}s")
logger.info(f"📊 TIMING: 🔀 Reconcile: {reconcile_duration:.3f}s")
logger.info(f"📊 TIMING: 📝 Timestamp Extraction: {timestamp_duration:.3f}s")
logger.info(f"📊 TIMING: 📄 Final Formatting: {format_duration:.3f}s")
logger.info(f"📊 TIMING: 🎯 TOTAL END-TO-END: {overall_duration:.3f}s")
logger.info(f"📊 TIMING: Enhanced chunking completed. Transcribed {words_count} words")
logger.info(f"📊 TIMING: ================================================================")

print(f"📊 TIMING: =================== COMPLETE TIMING SUMMARY ===================")
print(f"📊 TIMING: 🔧 Initialization: {init_duration:.3f}s")
print(f"📊 TIMING: 📁 Audio Loading: {loading_duration:.3f}s")
print(f"📊 TIMING: 🚀 Processing: {processing_duration:.3f}s")
print(f"📊 TIMING: 🔀 Reconcile: {reconcile_duration:.3f}s")
print(f"📊 TIMING: 📝 Timestamp Extraction: {timestamp_duration:.3f}s")
print(f"📊 TIMING: 📄 Final Formatting: {format_duration:.3f}s")
print(f"📊 TIMING: 🎯 TOTAL END-TO-END: {overall_duration:.3f}s")
print(f"📊 TIMING: Enhanced chunking completed. Transcribed {words_count} words")
print(f"📊 TIMING: ================================================================")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Reduce console noise; prefer logger.debug and drop f-strings without placeholders.

Dozens of print(f"...") lines trip Ruff F541 and flood stdout. Switch to logger.debug("...") or gate prints behind a debug flag.

Run Ruff locally to see remaining hits:


🏁 Script executed:

#!/bin/bash
ruff check extras/asr-services/enhanced_chunking.py

Length of output: 16749


Reduce console noise; fix ruff-reported issues in extras/asr-services/enhanced_chunking.py

  • ruff: 23 issues found (16 auto-fixable). Run ruff --fix, then re-run to confirm.
  • Replace print(...) calls with logger.debug(...) (or gate prints behind a debug flag); prefer logger lazy formatting (logger.info("... %s", val)) over f-strings for runtime values.
  • Remove extraneous f-prefix on literal strings (F541) — many banner/debug prints are f-strings without placeholders (e.g., lines ~216, 276, 282, 307, 345, 348, 384, 402, 419, 432, 444, 456, 488–508).
  • Fix unused arguments: keep_logits (line 53) and model (line 196) — remove or rename to _keep_logits/_model.
  • Avoid blind except Exception (line ~110); catch specific exceptions and use logger.exception(...) in except blocks (see logger.error at line ~513 -> replace with logger.exception).
  • Move return statements out of try blocks or use try/except/else (TRY300 hits around lines ~109, ~349, ~510).
🧰 Tools
🪛 Ruff (0.12.2)

488-488: f-string without any placeholders

Remove extraneous f prefix

(F541)


497-497: f-string without any placeholders

Remove extraneous f prefix

(F541)


499-499: f-string without any placeholders

Remove extraneous f prefix

(F541)


508-508: f-string without any placeholders

Remove extraneous f prefix

(F541)

🤖 Prompt for AI Agents
In extras/asr-services/enhanced_chunking.py around lines 488 to 508 and
elsewhere as noted, replace the print(...) banner and timing prints with
logger.debug(...) (or guard them behind a debug flag), remove the unnecessary
f-strings on literal-only banners (delete the f prefix), and switch logger calls
to lazy formatting (e.g., logger.info("Processing: %.3fs", processing_duration))
for runtime values; run ruff --fix and re-run ruff to auto-fix the 16
auto-fixable issues, rename unused arguments keep_logits and model to
_keep_logits and _model (or remove them) at their declaration sites, replace
bare except Exception blocks with specific exception types and use
logger.exception(...) in except handlers, and move return statements out of try
blocks or use try/except/else patterns where TRY300 is flagged (around lines
~109, ~349, ~510) to satisfy the remaining lints.

Comment on lines +132 to +198
async def _transcribe_chunked(self, speech: Sequence[AudioChunk]) -> dict:
"""Chunked transcription method for long audio."""
logger.info("Using chunked transcription for long audio")

# Save audio to temporary file for NeMo processing
with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmpfile:
tmpfile_name = tmpfile.name

# Convert audio chunks to numpy array and save as WAV
audio_arrays = []
for chunk in speech:
if chunk.width == 2:
audio_array = np.frombuffer(chunk.audio, dtype=np.int16).astype(np.float32) / np.iinfo(np.int16).max
elif chunk.width == 4:
audio_array = np.frombuffer(chunk.audio, dtype=np.int32).astype(np.float32) / np.iinfo(np.int32).max
else:
raise ValueError(f"Unsupported width: {chunk.width}")
audio_arrays.append(audio_array)

# Concatenate and save as WAV file
full_audio = np.concatenate(audio_arrays)

# Convert to int16 for WAV format
audio_int16 = (full_audio * np.iinfo(np.int16).max).astype(np.int16)

with wave.open(tmpfile_name, 'wb') as wav_file:
wav_file.setnchannels(1) # Mono
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(self._rate)
wav_file.writeframes(audio_int16.tobytes())

try:
# Use the enhanced chunking function for proper NeMo integration
logger.info("🔍 ISOLATE: About to call transcribe_with_enhanced_chunking")
result = await transcribe_with_enhanced_chunking(
model=self.model,
audio_file_path=tmpfile_name,
frame_len=4.0, # 4-second frames
total_buffer=8.0 # 8-second total buffer
)

logger.info(f"🔍 ISOLATE: transcribe_with_enhanced_chunking returned: {type(result)}")

if result is None:
logger.error("🔍 ISOLATE: result is None!")
return {"text": "", "words": [], "segments": []}

logger.info(f"🔍 ISOLATE: result.get('words') type: {type(result.get('words'))}")
words = result.get('words', [])
logger.info(f"🔍 ISOLATE: words variable type: {type(words)}")

if words is None:
logger.warning("🔍 ISOLATE: words is None, using empty list")
words = []

logger.info(f"🔍 ISOLATE: About to call len() on words: {type(words)}")
word_count = len(words) if words else 0
logger.info(f"Enhanced chunking completed: {word_count} words")
return result

finally:
# Clean up temporary file
try:
os.unlink(tmpfile_name)
except Exception as e:
logger.warning(f"Failed to delete temporary file {tmpfile_name}: {e}")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Stream-write WAV to avoid OOM and fix Windows file-handle contention; use env-based chunk params; quiet debug logs

Current code concatenates all audio into RAM and opens the temp file twice while it’s still open, which is fragile on Windows and memory-heavy for long audio. Stream frames directly to a temp WAV, and drive frame/buffer from env. Also demote noisy “ISOLATE” logs to debug.

-        # Save audio to temporary file for NeMo processing
-        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmpfile:
-            tmpfile_name = tmpfile.name
-
-            # Convert audio chunks to numpy array and save as WAV
-            audio_arrays = []
-            for chunk in speech:
-                if chunk.width == 2:
-                    audio_array = np.frombuffer(chunk.audio, dtype=np.int16).astype(np.float32) / np.iinfo(np.int16).max
-                elif chunk.width == 4:
-                    audio_array = np.frombuffer(chunk.audio, dtype=np.int32).astype(np.float32) / np.iinfo(np.int32).max
-                else:
-                    raise ValueError(f"Unsupported width: {chunk.width}")
-                audio_arrays.append(audio_array)
-
-            # Concatenate and save as WAV file
-            full_audio = np.concatenate(audio_arrays)
-
-            # Convert to int16 for WAV format
-            audio_int16 = (full_audio * np.iinfo(np.int16).max).astype(np.int16)
-
-            with wave.open(tmpfile_name, 'wb') as wav_file:
-                wav_file.setnchannels(1)  # Mono
-                wav_file.setsampwidth(2)  # 16-bit
-                wav_file.setframerate(self._rate)
-                wav_file.writeframes(audio_int16.tobytes())
+        # Save audio to temporary file for NeMo processing (streaming write; cross‑platform safe)
+        fd, tmpfile_name = tempfile.mkstemp(suffix=".wav")
+        os.close(fd)
+        with wave.open(tmpfile_name, "wb") as wav_file:
+            wav_file.setnchannels(1)   # mono
+            wav_file.setsampwidth(2)   # 16‑bit
+            wav_file.setframerate(self._rate)
+            for chunk in speech:
+                if chunk.width == 2:
+                    # Already 16‑bit PCM mono @ 16k
+                    wav_file.writeframes(chunk.audio)
+                elif chunk.width == 4:
+                    # Convert int32 -> int16 with clipping
+                    i32 = np.frombuffer(chunk.audio, dtype=np.int32)
+                    f32 = i32.astype(np.float32) / np.iinfo(np.int32).max
+                    i16 = np.clip(f32, -1.0, 1.0)
+                    wav_file.writeframes((i16 * np.iinfo(np.int16).max).astype(np.int16).tobytes())
+                else:
+                    raise ValueError(f"Unsupported width: {chunk.width}")
@@
-            logger.info("🔍 ISOLATE: About to call transcribe_with_enhanced_chunking")
+            logger.debug("About to call transcribe_with_enhanced_chunking")
             result = await transcribe_with_enhanced_chunking(
                 model=self.model,
                 audio_file_path=tmpfile_name,
-                frame_len=4.0,      # 4-second frames
-                total_buffer=8.0    # 8-second total buffer
+                frame_len=CHUNK_DURATION_SECONDS,
+                # keep ~2x overlap by default; aligns with prior 4s/8s behavior
+                total_buffer=max(OVERLAP_DURATION_SECONDS * 2.0, CHUNK_DURATION_SECONDS)
             )
@@
-            logger.info(f"🔍 ISOLATE: transcribe_with_enhanced_chunking returned: {type(result)}")
+            logger.debug("transcribe_with_enhanced_chunking returned: %s", type(result))
@@
-                logger.error("🔍 ISOLATE: result is None!")
+                logger.error("Chunked transcription returned None")
                 return {"text": "", "words": [], "segments": []}
@@
-            logger.info(f"🔍 ISOLATE: result.get('words') type: {type(result.get('words'))}")
+            logger.debug("words type: %s", type(result.get("words")))
             words = result.get('words', [])
-            logger.info(f"🔍 ISOLATE: words variable type: {type(words)}")
+            logger.debug("words variable type: %s", type(words))
@@
-            logger.info(f"🔍 ISOLATE: About to call len() on words: {type(words)}")
             word_count = len(words) if words else 0
-            logger.info(f"Enhanced chunking completed: {word_count} words")
+            logger.info("Enhanced chunking completed: %d words", word_count)

Note: If transcribe_with_enhanced_chunking supports CONFIDENCE_THRESHOLD, consider threading it too.

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.12.2)

148-148: Avoid specifying long messages outside the exception class

(TRY003)


196-196: Do not catch blind exception: Exception

(BLE001)

Comment on lines +449 to 553
request_start = time.time()
logger.info(f"🕐 TIMING: Transcription request started at {time.strftime('%H:%M:%S', time.localtime(request_start))}")

try:
# Read uploaded file
file_read_start = time.time()
audio_content = await file.read()
file_read_time = time.time() - file_read_start
logger.info(f"🕐 TIMING: File read completed in {file_read_time:.3f}s (size: {len(audio_content)} bytes)")

# Save to temporary file for processing
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
tmp_file.write(audio_content)
tmp_filename = tmp_file.name

try:
# TIMING: LocalFileStreamer creation
streamer_create_start = time.time()
streamer = LocalFileStreamer(tmp_filename,
chunk_size_samples=CHUNK_SAMPLES)
streamer_create_time = time.time() - streamer_create_start
logger.info(f"🕐 TIMING: LocalFileStreamer creation: {streamer_create_time:.3f}s")

# TIMING: streamer.open()
streamer_open_start = time.time()
await streamer.open()

# Get audio properties from first chunk to determine if resampling needed
streamer_open_time = time.time() - streamer_open_start
logger.info(f"🕐 TIMING: LocalFileStreamer open: {streamer_open_time:.3f}s")

# TIMING: First chunk read
first_chunk_start = time.time()
first_chunk = await streamer.read()
first_chunk_time = time.time() - first_chunk_start
logger.info(f"🕐 TIMING: First chunk read: {first_chunk_time:.3f}s")

original_rate = first_chunk.rate
original_channels = first_chunk.channels
original_width = first_chunk.width

logger.info(f"Input audio: {original_rate}Hz, {original_channels}ch, {original_width*8}bit")

# Collect ALL audio chunks using iter_frames()

# TIMING: Collect all chunks using iter_frames()
iter_frames_start = time.time()
all_chunks = [first_chunk] # Include the first chunk we already read
chunk_count = 1
async for chunk in streamer.iter_frames():
all_chunks.append(chunk)

chunk_count += 1
# Progress every 10,000 chunks
if chunk_count % 10000 == 0:
elapsed = time.time() - iter_frames_start
logger.info(f"🕐 TIMING: Loaded {chunk_count} chunks in {elapsed:.3f}s so far...")

iter_frames_time = time.time() - iter_frames_start
logger.info(f"🕐 TIMING: iter_frames() completed: {iter_frames_time:.3f}s for {len(all_chunks)} chunks")
logger.info(f"Loaded {len(all_chunks)} audio chunks, total duration: {sum(len(c.audio)/(c.rate*c.width*c.channels) for c in all_chunks):.2f}s")

# Setup resampling if needed

# TIMING: Setup resampling
resampling_setup_start = time.time()
needs_resampling = (original_rate != PARAKEET_SAMPLING_RATE or original_channels != 1)

resampling_setup_time = time.time() - resampling_setup_start
logger.info(f"🕐 TIMING: Resampling setup check: {resampling_setup_time:.3f}s")

if needs_resampling:
logger.info(f"Resampling from {original_rate}Hz/{original_channels}ch to {PARAKEET_SAMPLING_RATE}Hz/1ch")

# TIMING: ResamplingBlock creation
resampler_create_start = time.time()
resampler = ResamplingBlock(
resample_rate=PARAKEET_SAMPLING_RATE,
resample_channels=1
)

# Resample all chunks
resampler_create_time = time.time() - resampler_create_start
logger.info(f"🕐 TIMING: ResamplingBlock creation: {resampler_create_time:.3f}s")

# TIMING: Resample all chunks
resampling_start = time.time()
resampled_chunks = []
for chunk in all_chunks:
for i, chunk in enumerate(all_chunks):
if i % 10000 == 0 and i > 0:
elapsed = time.time() - resampling_start
logger.info(f"🕐 TIMING: Resampled {i}/{len(all_chunks)} chunks in {elapsed:.3f}s so far...")

async for resampled_chunk in resampler.process_chunk(chunk):
resampled_chunks.append(resampled_chunk)


resampling_time = time.time() - resampling_start
logger.info(f"🕐 TIMING: Resampling completed: {resampling_time:.3f}s")
final_chunks = resampled_chunks
logger.info(f"Resampled to {len(final_chunks)} chunks at {PARAKEET_SAMPLING_RATE}Hz")
else:
final_chunks = all_chunks
logger.info("No resampling needed - audio already at 16kHz mono")

# Transcribe with all chunks (assertion will verify they're all 16kHz)
transcribe_start = time.time()
logger.info(f"🕐 TIMING: Starting NeMo transcription with {len(final_chunks)} chunks")
result = await transcriber.transcribe_async(final_chunks)
transcribe_time = time.time() - transcribe_start
logger.info(f"🕐 TIMING: Transcription completed in {transcribe_time:.3f}s")

total_time = time.time() - request_start
logger.info(f"🕐 TIMING: Total request time: {total_time:.3f}s")
return JSONResponse(content=result)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Batch path is memory-heavy and leaks streamer; stream upload to disk, guard empty input, and close streamer in finally

  • Reading the entire upload into memory and materializing all_chunks scales poorly for long audio.
  • streamer is never closed; unlink may race with an open handle on some OSes.
  • first_chunk may be None; guard it.
-        # Read uploaded file
-        file_read_start = time.time()
-        audio_content = await file.read()
-        file_read_time = time.time() - file_read_start
-        logger.info(f"🕐 TIMING: File read completed in {file_read_time:.3f}s (size: {len(audio_content)} bytes)")
-
-        # Save to temporary file for processing
-        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
-            tmp_file.write(audio_content)
-            tmp_filename = tmp_file.name
+        # Stream upload to a temp file to avoid holding the whole body in RAM
+        file_read_start = time.time()
+        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
+            tmp_filename = tmp_file.name
+            await file.seek(0)
+            while True:
+                chunk = await file.read(1 << 20)  # 1 MiB
+                if not chunk:
+                    break
+                tmp_file.write(chunk)
+        file_size = os.path.getsize(tmp_filename)
+        file_read_time = time.time() - file_read_start
+        logger.info(f"🕐 TIMING: File persisted in {file_read_time:.3f}s (size: {file_size} bytes)")
@@
-            streamer = LocalFileStreamer(tmp_filename,
+            streamer: Optional[LocalFileStreamer] = None
+            streamer = LocalFileStreamer(tmp_filename,
                                          chunk_size_samples=CHUNK_SAMPLES)
@@
-            first_chunk = await streamer.read()
+            first_chunk = await streamer.read()
+            if first_chunk is None:
+                raise HTTPException(status_code=400, detail="Empty audio file")
@@
-            return JSONResponse(content=result)
+            return JSONResponse(content=result)
         finally:
-            os.unlink(tmp_filename)
+            # Ensure streamer is closed before unlink
+            try:
+                if streamer is not None and hasattr(streamer, "close"):
+                    await streamer.close()
+            except Exception:
+                logger.debug("Streamer close failed; continuing with cleanup")
+            os.unlink(tmp_filename)

Optional follow-up: If long-audio is common, short-circuit this path by calling the chunked file-based transcribe directly instead of materializing all frames.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
request_start = time.time()
logger.info(f"🕐 TIMING: Transcription request started at {time.strftime('%H:%M:%S', time.localtime(request_start))}")
try:
# Read uploaded file
file_read_start = time.time()
audio_content = await file.read()
file_read_time = time.time() - file_read_start
logger.info(f"🕐 TIMING: File read completed in {file_read_time:.3f}s (size: {len(audio_content)} bytes)")
# Save to temporary file for processing
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
tmp_file.write(audio_content)
tmp_filename = tmp_file.name
try:
# TIMING: LocalFileStreamer creation
streamer_create_start = time.time()
streamer = LocalFileStreamer(tmp_filename,
chunk_size_samples=CHUNK_SAMPLES)
streamer_create_time = time.time() - streamer_create_start
logger.info(f"🕐 TIMING: LocalFileStreamer creation: {streamer_create_time:.3f}s")
# TIMING: streamer.open()
streamer_open_start = time.time()
await streamer.open()
# Get audio properties from first chunk to determine if resampling needed
streamer_open_time = time.time() - streamer_open_start
logger.info(f"🕐 TIMING: LocalFileStreamer open: {streamer_open_time:.3f}s")
# TIMING: First chunk read
first_chunk_start = time.time()
first_chunk = await streamer.read()
first_chunk_time = time.time() - first_chunk_start
logger.info(f"🕐 TIMING: First chunk read: {first_chunk_time:.3f}s")
original_rate = first_chunk.rate
original_channels = first_chunk.channels
original_width = first_chunk.width
logger.info(f"Input audio: {original_rate}Hz, {original_channels}ch, {original_width*8}bit")
# Collect ALL audio chunks using iter_frames()
# TIMING: Collect all chunks using iter_frames()
iter_frames_start = time.time()
all_chunks = [first_chunk] # Include the first chunk we already read
chunk_count = 1
async for chunk in streamer.iter_frames():
all_chunks.append(chunk)
chunk_count += 1
# Progress every 10,000 chunks
if chunk_count % 10000 == 0:
elapsed = time.time() - iter_frames_start
logger.info(f"🕐 TIMING: Loaded {chunk_count} chunks in {elapsed:.3f}s so far...")
iter_frames_time = time.time() - iter_frames_start
logger.info(f"🕐 TIMING: iter_frames() completed: {iter_frames_time:.3f}s for {len(all_chunks)} chunks")
logger.info(f"Loaded {len(all_chunks)} audio chunks, total duration: {sum(len(c.audio)/(c.rate*c.width*c.channels) for c in all_chunks):.2f}s")
# Setup resampling if needed
# TIMING: Setup resampling
resampling_setup_start = time.time()
needs_resampling = (original_rate != PARAKEET_SAMPLING_RATE or original_channels != 1)
resampling_setup_time = time.time() - resampling_setup_start
logger.info(f"🕐 TIMING: Resampling setup check: {resampling_setup_time:.3f}s")
if needs_resampling:
logger.info(f"Resampling from {original_rate}Hz/{original_channels}ch to {PARAKEET_SAMPLING_RATE}Hz/1ch")
# TIMING: ResamplingBlock creation
resampler_create_start = time.time()
resampler = ResamplingBlock(
resample_rate=PARAKEET_SAMPLING_RATE,
resample_channels=1
)
# Resample all chunks
resampler_create_time = time.time() - resampler_create_start
logger.info(f"🕐 TIMING: ResamplingBlock creation: {resampler_create_time:.3f}s")
# TIMING: Resample all chunks
resampling_start = time.time()
resampled_chunks = []
for chunk in all_chunks:
for i, chunk in enumerate(all_chunks):
if i % 10000 == 0 and i > 0:
elapsed = time.time() - resampling_start
logger.info(f"🕐 TIMING: Resampled {i}/{len(all_chunks)} chunks in {elapsed:.3f}s so far...")
async for resampled_chunk in resampler.process_chunk(chunk):
resampled_chunks.append(resampled_chunk)
resampling_time = time.time() - resampling_start
logger.info(f"🕐 TIMING: Resampling completed: {resampling_time:.3f}s")
final_chunks = resampled_chunks
logger.info(f"Resampled to {len(final_chunks)} chunks at {PARAKEET_SAMPLING_RATE}Hz")
else:
final_chunks = all_chunks
logger.info("No resampling needed - audio already at 16kHz mono")
# Transcribe with all chunks (assertion will verify they're all 16kHz)
transcribe_start = time.time()
logger.info(f"🕐 TIMING: Starting NeMo transcription with {len(final_chunks)} chunks")
result = await transcriber.transcribe_async(final_chunks)
transcribe_time = time.time() - transcribe_start
logger.info(f"🕐 TIMING: Transcription completed in {transcribe_time:.3f}s")
total_time = time.time() - request_start
logger.info(f"🕐 TIMING: Total request time: {total_time:.3f}s")
return JSONResponse(content=result)
request_start = time.time()
logger.info(f"🕐 TIMING: Transcription request started at {time.strftime('%H:%M:%S', time.localtime(request_start))}")
try:
# Stream upload to a temp file to avoid holding the whole body in RAM
file_read_start = time.time()
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
tmp_filename = tmp_file.name
await file.seek(0)
while True:
chunk = await file.read(1 << 20) # 1 MiB
if not chunk:
break
tmp_file.write(chunk)
file_size = os.path.getsize(tmp_filename)
file_read_time = time.time() - file_read_start
logger.info(f"🕐 TIMING: File persisted in {file_read_time:.3f}s (size: {file_size} bytes)")
try:
# TIMING: LocalFileStreamer creation
streamer_create_start = time.time()
streamer: Optional[LocalFileStreamer] = None
streamer = LocalFileStreamer(tmp_filename,
chunk_size_samples=CHUNK_SAMPLES)
streamer_create_time = time.time() - streamer_create_start
logger.info(f"🕐 TIMING: LocalFileStreamer creation: {streamer_create_time:.3f}s")
# TIMING: streamer.open()
streamer_open_start = time.time()
await streamer.open()
streamer_open_time = time.time() - streamer_open_start
logger.info(f"🕐 TIMING: LocalFileStreamer open: {streamer_open_time:.3f}s")
# TIMING: First chunk read
first_chunk_start = time.time()
first_chunk = await streamer.read()
first_chunk_time = time.time() - first_chunk_start
logger.info(f"🕐 TIMING: First chunk read: {first_chunk_time:.3f}s")
if first_chunk is None:
raise HTTPException(status_code=400, detail="Empty audio file")
original_rate = first_chunk.rate
original_channels = first_chunk.channels
original_width = first_chunk.width
logger.info(f"Input audio: {original_rate}Hz, {original_channels}ch, {original_width*8}bit")
# TIMING: Collect all chunks using iter_frames()
iter_frames_start = time.time()
all_chunks = [first_chunk] # Include the first chunk we already read
chunk_count = 1
async for chunk in streamer.iter_frames():
all_chunks.append(chunk)
chunk_count += 1
# Progress every 10,000 chunks
if chunk_count % 10000 == 0:
elapsed = time.time() - iter_frames_start
logger.info(f"🕐 TIMING: Loaded {chunk_count} chunks in {elapsed:.3f}s so far...")
iter_frames_time = time.time() - iter_frames_start
logger.info(f"🕐 TIMING: iter_frames() completed: {iter_frames_time:.3f}s for {len(all_chunks)} chunks")
logger.info(f"Loaded {len(all_chunks)} audio chunks, total duration: {sum(len(c.audio)/(c.rate*c.width*c.channels) for c in all_chunks):.2f}s")
# TIMING: Setup resampling
resampling_setup_start = time.time()
needs_resampling = (original_rate != PARAKEET_SAMPLING_RATE or original_channels != 1)
resampling_setup_time = time.time() - resampling_setup_start
logger.info(f"🕐 TIMING: Resampling setup check: {resampling_setup_time:.3f}s")
if needs_resampling:
logger.info(f"Resampling from {original_rate}Hz/{original_channels}ch to {PARAKEET_SAMPLING_RATE}Hz/1ch")
# TIMING: ResamplingBlock creation
resampler_create_start = time.time()
resampler = ResamplingBlock(
resample_rate=PARAKEET_SAMPLING_RATE,
resample_channels=1
)
resampler_create_time = time.time() - resampler_create_start
logger.info(f"🕐 TIMING: ResamplingBlock creation: {resampler_create_time:.3f}s")
# TIMING: Resample all chunks
resampling_start = time.time()
resampled_chunks = []
for i, chunk in enumerate(all_chunks):
if i % 10000 == 0 and i > 0:
elapsed = time.time() - resampling_start
logger.info(f"🕐 TIMING: Resampled {i}/{len(all_chunks)} chunks in {elapsed:.3f}s so far...")
async for resampled_chunk in resampler.process_chunk(chunk):
resampled_chunks.append(resampled_chunk)
resampling_time = time.time() - resampling_start
logger.info(f"🕐 TIMING: Resampling completed: {resampling_time:.3f}s")
final_chunks = resampled_chunks
logger.info(f"Resampled to {len(final_chunks)} chunks at {PARAKEET_SAMPLING_RATE}Hz")
else:
final_chunks = all_chunks
logger.info("No resampling needed - audio already at 16kHz mono")
# Transcribe with all chunks (assertion will verify they're all 16kHz)
transcribe_start = time.time()
logger.info(f"🕐 TIMING: Starting NeMo transcription with {len(final_chunks)} chunks")
result = await transcriber.transcribe_async(final_chunks)
transcribe_time = time.time() - transcribe_start
logger.info(f"🕐 TIMING: Transcription completed in {transcribe_time:.3f}s")
total_time = time.time() - request_start
logger.info(f"🕐 TIMING: Total request time: {total_time:.3f}s")
return JSONResponse(content=result)
finally:
# Ensure streamer is closed before unlink
try:
if streamer is not None and hasattr(streamer, "close"):
await streamer.close()
except Exception:
logger.debug("Streamer close failed; continuing with cleanup")
os.unlink(tmp_filename)
🤖 Prompt for AI Agents
extras/asr-services/parakeet-offline.py lines 449-553: the current flow reads
the entire uploaded file into memory, materializes all_chunks, never guarantees
streamer.close(), and assumes first_chunk is non-null; fix by streaming the
upload directly to a temporary file (avoid await file.read()), check and return
a 400 if the file is empty, open the LocalFileStreamer and ensure it is closed
in a finally block (and unlink the temp file after closing to avoid races),
avoid building an in-memory all_chunks list for long audio—either process chunks
on-disk (pass the streamer/iterator directly to a chunked transcribe API) or
write chunks to a temp file and stream them into the resampler/transcriber, and
handle the case where the initial read yields None by validating before using
original_rate/channels/width.

Comment on lines 558 to 560
error_time = time.time() - request_start
logger.error(f"🕐 TIMING: Error occurred after {error_time:.3f}s - {e}")
raise HTTPException(status_code=500, detail=str(e))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Don’t swallow HTTPException; log stack with exception()

Preserve HTTP exceptions and avoid exposing internal errors to clients; use logger.exception for full traceback.

-    except Exception as e:
-        error_time = time.time() - request_start
-        logger.error(f"🕐 TIMING: Error occurred after {error_time:.3f}s - {e}")
-        raise HTTPException(status_code=500, detail=str(e))
+    except HTTPException:
+        # Pass through FastAPI exceptions (e.g., 4xx/5xx already classified)
+        raise
+    except Exception as e:
+        error_time = time.time() - request_start
+        logger.exception("🕐 TIMING: Error occurred after %.3fs", error_time)
+        # Avoid leaking internal error details to clients
+        raise HTTPException(status_code=500, detail="Transcription failed") from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
error_time = time.time() - request_start
logger.error(f"🕐 TIMING: Error occurred after {error_time:.3f}s - {e}")
raise HTTPException(status_code=500, detail=str(e))
except HTTPException:
# Pass through FastAPI exceptions (e.g., 4xx/5xx already classified)
raise
except Exception as e:
error_time = time.time() - request_start
logger.exception("🕐 TIMING: Error occurred after %.3fs", error_time)
# Avoid leaking internal error details to clients
raise HTTPException(status_code=500, detail="Transcription failed") from e
🧰 Tools
🪛 Ruff (0.12.2)

559-559: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


560-560: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In extras/asr-services/parakeet-offline.py around lines 558-560, the current
code logs errors with logger.error and always raises HTTPException exposing
internal error text; change it to preserve existing HTTPException by re-raising
if isinstance(e, HTTPException), otherwise call logger.exception to log the full
traceback (including error_time), and then raise a new generic
HTTPException(status_code=500, detail="Internal server error") so internal
details are not leaked to clients. Ensure exception logging includes the
computed error_time in the message and do not swallow or transform existing
HTTPException instances.

@AnkushMalaker AnkushMalaker merged commit 55bce5c into main Sep 17, 2025
2 checks passed
@AnkushMalaker AnkushMalaker deleted the long-audio-fix branch September 17, 2025 23:42
@coderabbitai coderabbitai bot mentioned this pull request Sep 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant