interleave metric and nonblock IO#76
Conversation
| *[_run_and_pipeline(output_id_to_record[oid], oid) for oid in pending_output_ids], | ||
| return_exceptions=True, |
There was a problem hiding this comment.
Since we except Exception inside _run_and_pipeline(), we don't expect _run_and_pipeline() to raise anything. Can you make that intentional by removing return_exceptions=True?
Also, it's a tiny detail, but since we're here, we don't need to instantiate a list; we can simply use a generator (in parentheses).
| *[_run_and_pipeline(output_id_to_record[oid], oid) for oid in pending_output_ids], | |
| return_exceptions=True, | |
| *(_run_and_pipeline(output_id_to_record[oid], oid) for oid in pending_output_ids) |
| if isinstance(item, Exception): | ||
| logger.error(f"Unexpected pipeline coroutine exception: {item}") | ||
| continue |
There was a problem hiding this comment.
Again, that shouldn't happen, so I think we can clean that up. Also, if that even happened, this wouldn't add the sample to either finished_ids or not_finished_ids, and it would disappear, which seems a bit broken. Does that make sense?
| if not self._audio_buffer and self.user_audio_buffer and self.assistant_audio_buffer: | ||
| diff_bytes = abs(len(self.user_audio_buffer) - len(self.assistant_audio_buffer)) | ||
| diff_ms = diff_bytes / (2 * self._audio_sample_rate) * 1000 | ||
| if diff_ms > 500: | ||
| logger.warning( | ||
| f"Audio buffer length mismatch: user={len(self.user_audio_buffer)} " | ||
| f"assistant={len(self.assistant_audio_buffer)} " | ||
| f"diff={diff_ms:.0f}ms — mixed recording may be temporally skewed" | ||
| ) | ||
| from eva.assistant.audio_bridge import pcm16_mix # lazy: avoids circular import at module load | ||
|
|
||
| self._audio_buffer = bytearray(pcm16_mix(bytes(self.user_audio_buffer), bytes(self.assistant_audio_buffer))) | ||
| elif not self._audio_buffer and self.user_audio_buffer: | ||
| self._audio_buffer = bytearray(self.user_audio_buffer) | ||
| elif not self._audio_buffer and self.assistant_audio_buffer: | ||
| self._audio_buffer = bytearray(self.assistant_audio_buffer) |
There was a problem hiding this comment.
Detail: I think a single if not self._audio_buffer would be clearer than repeating 3 times.
| if not self._audio_buffer and self.user_audio_buffer and self.assistant_audio_buffer: | |
| diff_bytes = abs(len(self.user_audio_buffer) - len(self.assistant_audio_buffer)) | |
| diff_ms = diff_bytes / (2 * self._audio_sample_rate) * 1000 | |
| if diff_ms > 500: | |
| logger.warning( | |
| f"Audio buffer length mismatch: user={len(self.user_audio_buffer)} " | |
| f"assistant={len(self.assistant_audio_buffer)} " | |
| f"diff={diff_ms:.0f}ms — mixed recording may be temporally skewed" | |
| ) | |
| from eva.assistant.audio_bridge import pcm16_mix # lazy: avoids circular import at module load | |
| self._audio_buffer = bytearray(pcm16_mix(bytes(self.user_audio_buffer), bytes(self.assistant_audio_buffer))) | |
| elif not self._audio_buffer and self.user_audio_buffer: | |
| self._audio_buffer = bytearray(self.user_audio_buffer) | |
| elif not self._audio_buffer and self.assistant_audio_buffer: | |
| self._audio_buffer = bytearray(self.assistant_audio_buffer) | |
| if not self._audio_buffer: | |
| if self.user_audio_buffer and self.assistant_audio_buffer: | |
| diff_bytes = abs(len(self.user_audio_buffer) - len(self.assistant_audio_buffer)) | |
| diff_ms = diff_bytes / (2 * self._audio_sample_rate) * 1000 | |
| if diff_ms > 500: | |
| logger.warning( | |
| f"Audio buffer length mismatch: user={len(self.user_audio_buffer)} " | |
| f"assistant={len(self.assistant_audio_buffer)} " | |
| f"diff={diff_ms:.0f}ms — mixed recording may be temporally skewed" | |
| ) | |
| from eva.assistant.audio_bridge import pcm16_mix # lazy: avoids circular import at module load | |
| self._audio_buffer = bytearray( | |
| pcm16_mix(bytes(self.user_audio_buffer), bytes(self.assistant_audio_buffer)) | |
| ) | |
| elif self.user_audio_buffer: | |
| self._audio_buffer = bytearray(self.user_audio_buffer) | |
| elif self.assistant_audio_buffer: | |
| self._audio_buffer = bytearray(self.assistant_audio_buffer) |
| if isinstance(self.pipeline_config, SpeechToSpeechConfig): | ||
| self.audit_log.save_transcript_jsonl(transcript_path) | ||
| elif not transcript_path.exists(): |
There was a problem hiding this comment.
| if isinstance(self.pipeline_config, SpeechToSpeechConfig): | |
| self.audit_log.save_transcript_jsonl(transcript_path) | |
| elif not transcript_path.exists(): | |
| if isinstance(self.pipeline_config, SpeechToSpeechConfig) or not transcript_path.exists(): |
| logger.error(f"Error saving agent perf stats: {e}", exc_info=True) | ||
|
|
||
| # Call base class to save audit_log, audio, scenario DBs, latencies | ||
| await super().save_outputs() |
There was a problem hiding this comment.
Duplicating most of super().save_outputs(), instead of calling it, worries me about them going out of sync. Have you considered refactoring so that you can keep calling super().save_outputs()?
| # Phase 6: Fire metrics immediately if passed | ||
| if vr.passed and metrics_runner is not None: | ||
| rdir = self.output_dir / "records" / output_id | ||
| task = asyncio.create_task(metrics_runner._run_and_save_record(output_id, rdir)) |
There was a problem hiding this comment.
Now that we call _run_and_save_record() from two places outside of running.py, should we remove the "private" underscore prefix?
| # STEP 7: Run full metrics on successful records | ||
| # STEP 7: Await background metrics, then run final aggregation pass. | ||
| # Background tasks already wrote metrics.json for records validated during the loop. | ||
| # The final run() skips already-computed records and only does summary aggregation. |
There was a problem hiding this comment.
Since we're now mutating metrics_runner.record_ids, I think we need to be extra careful with the ordering of the steps. Would adding a comment like that make sense?
| # The final run() skips already-computed records and only does summary aggregation. | |
| # The final run() skips already-computed records and only does summary aggregation. | |
| # IMPORTANT: Must await all background tasks BEFORE mutating metrics_runner.record_ids | |
| # below — running tasks read record_ids to filter which records to process. |
2399c98 to
0a593bb
Compare
Changes:
allow validation and metrics to run on successful conversations even if other conversations need rerun. These will be saved and used at the final pass once all retries are done or success is for all records
Much smaller changes:
Release semaphore before doing expensive writes for audio
Don't block main thread with IO. Files reads are large enough where this will help somewhat, Probably not a lot though.