fix(wren-ai-service): make DDLChunker synchronous to fix asyncio Task passed to embedding node#2194
fix(wren-ai-service): make DDLChunker synchronous to fix asyncio Task passed to embedding node#2194octo-patch wants to merge 2 commits intoCanner:mainfrom
Conversation
…to embedding node When Hamilton's AsyncDriver executes the indexing DAG, it wraps async nodes in asyncio Tasks. Under complex MDL schemas with many relationships, the async chunk node's Task was being passed unawaited to the downstream embedding node instead of the actual dict result, causing the embedder to receive an asyncio Task repr string rather than the document chunks. This makes DDLChunker.run() and its helpers synchronous, matching the pattern used by all other indexing pipelines (historical_question, table_description, project_meta). The async machinery in _model_preprocessor was unnecessary since MODEL_PREPROCESSORS is empty by default and all helper operations are CPU-bound string manipulations. Update tests to call chunker.run() synchronously accordingly. Fixes Canner#2138
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
WalkthroughConvert the DB schema DDL chunking pipeline from asynchronous to synchronous execution by removing Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py (1)
11-547: Optional: dropasync/@pytest.mark.asynciofrom chunker-only tests.After this change, every
test_*fromtest_empty_mdlthroughtest_metriccontains noawaitand only exercises the now-synchronouschunker.run(...). Theasync defsignature and@pytest.mark.asynciomarker are redundant for these and can be converted to plaindefto make the sync contract ofDDLChunker.runexplicit in the tests.test_pipeline_run(Line 550) must stay async since it awaitspipe.run(...)at Line 619.♻️ Example diff for one test (apply the same pattern to the other chunker-only tests)
-@pytest.mark.asyncio -async def test_empty_mdl(): +def test_empty_mdl(): chunker = DDLChunker() mdl = {"models": [], "views": [], "relationships": [], "metrics": []} document = chunker.run(mdl, column_batch_size=1) assert document == {"documents": []}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py` around lines 11 - 547, The tests that only call the synchronous DDLChunker.run (test_empty_mdl, test_single_model, test_multiple_models, test_column_is_primary_key, test_column_with_properties, test_column_with_nested_columns, test_column_with_calculated_property, test_column_with_relationship, test_column_batch_size, test_view, test_view_with_properties, test_metric) should be converted from async def + `@pytest.mark.asyncio` to plain def without the marker; remove the `@pytest.mark.asyncio` decorator and change each async def test_* to def so they clearly reflect that DDLChunker.run is synchronous, but leave test_pipeline_run (which awaits pipe.run(...)) as async with its marker intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@wren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py`:
- Around line 11-547: The tests that only call the synchronous DDLChunker.run
(test_empty_mdl, test_single_model, test_multiple_models,
test_column_is_primary_key, test_column_with_properties,
test_column_with_nested_columns, test_column_with_calculated_property,
test_column_with_relationship, test_column_batch_size, test_view,
test_view_with_properties, test_metric) should be converted from async def +
`@pytest.mark.asyncio` to plain def without the marker; remove the
`@pytest.mark.asyncio` decorator and change each async def test_* to def so they
clearly reflect that DDLChunker.run is synchronous, but leave test_pipeline_run
(which awaits pipe.run(...)) as async with its marker intact.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 926a368c-bf58-4f8e-850d-9e90e31c8e5a
📒 Files selected for processing (2)
wren-ai-service/src/pipelines/indexing/db_schema.pywren-ai-service/tests/pytest/pipelines/indexing/test_db_schema.py
DDLChunker.run() is now synchronous, so the chunker test cases no longer need pytest.mark.asyncio or async def. Only test_pipeline_run keeps async because it still awaits DBSchema.run.
|
Thanks for the careful review @coderabbitai! Pushed 33999d1a which drops Please re-review when you have a chance. |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chainLet me pull up the current state of the files to verify everything looks good. 🏁 Script executed: Length of output: 18048 🏁 Script executed: Length of output: 12861
Test file (
Source file (
The sync/async boundary is clean and the fix directly addresses the root cause: Hamilton's |
Fixes #2138
Problem
When Hamilton's
AsyncDriverexecutes the DB schema indexing DAG, it wraps async nodes in asyncio Tasks. Under complex MDL schemas (many tables with many relationships), the asyncchunknode's Task was sometimes passed unawaited to the downstreamembeddingnode.This caused
embeddingto receive an asyncio Task object (e.g."<Task finished name='Task-1479' coro=<AsyncGraphAd...") instead of the actual{"documents": [...]}dict, which then causedembedder.run()to send invalid content to the embedding endpoint, returning a400 Bad Requestfrom Ollama.The bug was consistently reproducible with complex MDL schemas (20+ tables, 30+ relationships) but did not appear with simpler schemas, matching the issue description.
Solution
Make
DDLChunker.run()and its internal helpers synchronous, eliminating the async Task from the Hamilton DAG. This matches the pattern used by all other indexing pipelines (HistoricalQuestion,TableDescription,ProjectMeta,SqlPairs) where thechunknode is already synchronous.The async machinery in
_model_preprocessor(usingasyncio.gather) was unnecessary because:MODEL_PREPROCESSORSis empty by default — theawaitin the dict comprehension never executesChanges:
DDLChunker.run():async def→def, removesawaitDDLChunker._get_ddl_commands():async def→def, removesawaitDDLChunker._model_preprocessor():async def→def, removesasyncio.gatherDDLChunker._preprocessor()(inner):async def→def, removesawaitin dict comprehensionchunk()Hamilton node:async def→def, removesawaitimport asynciochunker.run()synchronouslyTesting
test_db_schema.pyupdated and pass with the sync interfacetest_pipeline_runintegration test (which uses the fullDBSchemaHamilton pipeline) continues to work since the downstreamembedding,clean, andwritenodes remain asyncSummary by CodeRabbit