Add queue-storage COPY producer#206
Conversation
📝 WalkthroughWalkthroughThis PR introduces a new COPY-based batch ingestion path for queue storage. The public API Changes
Sequence Diagram(s)sequenceDiagram
actor Client
participant QS as QueueStorage
participant TX as Transaction
participant DB as PostgreSQL
participant LN as Lane State
participant UQ as Uniqueness Claims
Client->>QS: enqueue_params_copy(jobs)
QS->>TX: begin transaction
TX->>DB: allocate job IDs (sequence)
DB-->>TX: job_ids, next_seq
TX->>LN: reserve lane seq-ranges per queue
LN-->>TX: lane_seq ranges
TX->>UQ: synchronize uniqueness claims
UQ-->>TX: conflict check result
alt Conflict Detected
TX->>DB: ROLLBACK
DB-->>QS: error (UniqueConflict)
QS-->>Client: Result::Err
else No Conflict
TX->>DB: COPY ready_entries (CSV format)
TX->>DB: COPY deferred_jobs (CSV format)
TX->>LN: update lane counters in batch
TX->>DB: queue logical notifications
TX->>DB: COMMIT
DB-->>QS: rows_inserted
QS-->>Client: Result::Ok(usize)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (4)
docs/test-plan.md (1)
171-171: ⚡ Quick winAdd an explicit run command for the new QSC scenario.
Line 171 adds the matrix row, but the “Running Tests” section does not yet include the corresponding
awa-modelcommand. Adding it keeps the plan directly executable.Proposed doc update
# COPY integration tests DATABASE_URL=postgres://postgres:test@localhost:15432/awa_test cargo test --package awa --test copy_test -- --nocapture +# Queue-storage COPY producer tests +DATABASE_URL=postgres://postgres:test@localhost:15432/awa_test cargo test --package awa-model --test queue_storage_copy_test -- --nocapture + # Python sync tests cd awa-python && DATABASE_URL=postgres://postgres:test@localhost:15432/awa_test .venv/bin/pytest tests/test_sync.py -v🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/test-plan.md` at line 171, The new matrix row QSC1-QSC2 (Queue storage COPY producer: ready/deferred direct COPY, unique-conflict rollback) needs a matching executable entry in the "Running Tests" section: add the corresponding awa-model run command for QSC1-QSC2 (using the same invocation pattern as other rows) so the test plan is directly executable and the scenario can be run without manual mapping.awa-model/tests/queue_storage_copy_test.rs (1)
158-167: ⚡ Quick winStrengthen rollback assertion to include lane-counter side effects.
Line 166 verifies no ready rows persisted; consider also asserting queue lane availability remains zero so transaction rollback guarantees cover counters too.
Suggested assertion addition
assert_eq!(ready_count, 0); + + let lane_available: i64 = sqlx::query_scalar(&format!( + "SELECT COALESCE(sum(available_count), 0)::bigint FROM {}.queue_lanes WHERE queue = $1", + store.schema() + )) + .bind(queue) + .fetch_one(&pool) + .await + .expect("count lane availability"); + assert_eq!(lane_available, 0); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awa-model/tests/queue_storage_copy_test.rs` around lines 158 - 167, The test currently asserts ready_entries count is zero but misses verifying lane-counter side effects; add a second SQL scalar query (similar to the ready_count query) against format!("SELECT count(*)::bigint FROM {}.lane_availability WHERE queue = $1", store.schema()) (or the project's lane-counter table name) binding queue and using pool, .await, .expect("count lane availability rows"), then assert_eq!(lane_count, 0) to ensure transaction rollback also cleared lane counters; place this directly after the ready_count assertion.awa-model/src/queue_storage.rs (2)
3863-3954: 🏗️ Heavy liftFactor the COPY enqueue planning out of the duplicated code.
insert_ready_rows_copy_tx,insert_deferred_rows_copy_tx, andenqueue_params_copynow repeat the batch path's lane allocation, uniqueness sync, state filtering, and notification prep almost verbatim. Because the contract here is “same semantics asenqueue_params_batch”, this copy/paste is likely to drift on the next behavior change. A shared planner plus pluggable sink (INSERTvsCOPY) would keep the paths aligned.Also applies to: 4097-4137, 4574-4677
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awa-model/src/queue_storage.rs` around lines 3863 - 3954, Extract the common enqueue planning logic (lane allocation, uniqueness sync, state filtering, and notification preparation) into a shared planner function (e.g., prepare_enqueue_plan or build_enqueue_plan) that returns a neutral "plan" type (e.g., Vec<RuntimeReadyInsert> or a Plan struct with ready_rows and count_deltas) and accept a pluggable sink strategy (an enum or trait for Sink::Insert vs Sink::Copy) so the caller can apply either execute_ready_copy_tx or the INSERT path; replace the duplicate logic in insert_ready_rows_copy_tx, insert_deferred_rows_copy_tx, and enqueue_params_copy to call the new planner and then perform only the sink-specific actions (calling execute_ready_copy_tx or the INSERT executor) and adjust_lane_counts_batch, ensuring you reuse existing helpers like next_job_ids, ensure_lane, sync_unique_claim, and adjust_lane_counts_batch to keep semantics identical to enqueue_params_batch.
410-539: ⚡ Quick winAdd a regression matrix around the handwritten CSV encoder.
This encoder now owns NULL-sentinel handling, quote/newline escaping, bytea hex, and JSON serialization. A small live-Postgres case matrix for values like
",","\"","\n","\\","", and literal"__AWA_NULL__"would make this much safer to evolve.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@awa-model/src/queue_storage.rs` around lines 410 - 539, The CSV encoder lacks regression tests for NULL-sentinel, quoting/escaping, bytea hex and JSON serialization; add a test matrix that exercises write_copy_field, write_copy_json, write_copy_optional_bytes and the row writers (write_ready_copy_row, write_deferred_copy_row) with cases: comma (","), double-quote ("\""), newline ("\n"), backslash ("\\"), empty string (""), and the literal COPY_NULL_SENTINEL ("__AWA_NULL__"); for each case produce the encoded buffer, then either parse it back with Postgres COPY (preferred) or a robust CSV parser to assert round-trip semantics and correct NULL handling/bytea hex encoding for Optional<Vec<u8>> and correct JSON stringification for serde_json::Value; implement tests named like test_copy_encoder_regression_matrix and test_copy_rows_regression_matrix that build sample RuntimeReadyInsert/DeferredJobRow structs with these values and assert expected decoded columns (including NULL when sentinel is present).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@awa-model/src/queue_storage.rs`:
- Around line 3751-3765: The current COPY implementation builds a full in-memory
CSV (csv_buf) by calling write_ready_copy_row for every row and only then
calling copy_in.send, which retains upstream row vectors and spikes memory;
change the logic to stream rows into copy_in in bounded chunks: allocate a
reusable buffer, iterate rows and append via write_ready_copy_row, and whenever
the buffer exceeds a configured threshold (e.g., ~64KB-1MB) call
copy_in.send(buffer_slice) then clear/truncate the buffer (or replace with a new
Vec) so memory is bounded and upstream rows can be dropped; apply the same
streaming-chunk pattern to the other COPY sites that use
write_ready_copy_row/csv_buf/copy_in (the other copy_in_raw usages) and ensure
tx.as_mut().copy_in_raw(...).await is used before sending chunks and
copy_in.finish() is called after the final chunk.
---
Nitpick comments:
In `@awa-model/src/queue_storage.rs`:
- Around line 3863-3954: Extract the common enqueue planning logic (lane
allocation, uniqueness sync, state filtering, and notification preparation) into
a shared planner function (e.g., prepare_enqueue_plan or build_enqueue_plan)
that returns a neutral "plan" type (e.g., Vec<RuntimeReadyInsert> or a Plan
struct with ready_rows and count_deltas) and accept a pluggable sink strategy
(an enum or trait for Sink::Insert vs Sink::Copy) so the caller can apply either
execute_ready_copy_tx or the INSERT path; replace the duplicate logic in
insert_ready_rows_copy_tx, insert_deferred_rows_copy_tx, and enqueue_params_copy
to call the new planner and then perform only the sink-specific actions (calling
execute_ready_copy_tx or the INSERT executor) and adjust_lane_counts_batch,
ensuring you reuse existing helpers like next_job_ids, ensure_lane,
sync_unique_claim, and adjust_lane_counts_batch to keep semantics identical to
enqueue_params_batch.
- Around line 410-539: The CSV encoder lacks regression tests for NULL-sentinel,
quoting/escaping, bytea hex and JSON serialization; add a test matrix that
exercises write_copy_field, write_copy_json, write_copy_optional_bytes and the
row writers (write_ready_copy_row, write_deferred_copy_row) with cases: comma
(","), double-quote ("\""), newline ("\n"), backslash ("\\"), empty string (""),
and the literal COPY_NULL_SENTINEL ("__AWA_NULL__"); for each case produce the
encoded buffer, then either parse it back with Postgres COPY (preferred) or a
robust CSV parser to assert round-trip semantics and correct NULL handling/bytea
hex encoding for Optional<Vec<u8>> and correct JSON stringification for
serde_json::Value; implement tests named like
test_copy_encoder_regression_matrix and test_copy_rows_regression_matrix that
build sample RuntimeReadyInsert/DeferredJobRow structs with these values and
assert expected decoded columns (including NULL when sentinel is present).
In `@awa-model/tests/queue_storage_copy_test.rs`:
- Around line 158-167: The test currently asserts ready_entries count is zero
but misses verifying lane-counter side effects; add a second SQL scalar query
(similar to the ready_count query) against format!("SELECT count(*)::bigint FROM
{}.lane_availability WHERE queue = $1", store.schema()) (or the project's
lane-counter table name) binding queue and using pool, .await, .expect("count
lane availability rows"), then assert_eq!(lane_count, 0) to ensure transaction
rollback also cleared lane counters; place this directly after the ready_count
assertion.
In `@docs/test-plan.md`:
- Line 171: The new matrix row QSC1-QSC2 (Queue storage COPY producer:
ready/deferred direct COPY, unique-conflict rollback) needs a matching
executable entry in the "Running Tests" section: add the corresponding awa-model
run command for QSC1-QSC2 (using the same invocation pattern as other rows) so
the test plan is directly executable and the scenario can be run without manual
mapping.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ee3b655b-2dda-4fae-81a3-bdd1f060744f
📒 Files selected for processing (7)
awa-model/README.mdawa-model/src/queue_storage.rsawa-model/tests/queue_storage_copy_test.rscorrectness/storage/MAPPING.mddocs/adr/008-copy-batch-ingestion.mddocs/architecture.mddocs/test-plan.md
…pha.1) The COPY-based producer landed in awa 0.6.0-alpha.1 (hardbyte/awa#206 + #208). On the bulk-everywhere bench shape it's ~9% faster than the previous enqueue_params_batch (multi-row INSERT) path at 128 workers. Default this adapter to the COPY path. Keep AWA_QS_PRODUCER_PATH=batch as a diagnostic A/B knob in case anyone wants to compare.
Summary
Local throughput note
Local microbench from the ignored queue-storage COPY benchmark, 8,192 jobs on local Postgres:
These are hardware/local-DB numbers, not a release claim; they show the direct COPY path is not universally faster at smaller batches but is competitive/slightly ahead at 128 in this run.
Validation