feat(sqlite-provider): implement streaming full-table scan for adaptive filtering#4

Merged
anoop-narang merged 7 commits into main from feat/sqlite-streaming-scan
Mar 17, 2026

@anoop-narang
Collaborator

Summary

  • SqliteLookupProvider.scan() previously returned NotImplemented, causing vector search queries with a WHERE clause to fail with "SqliteLookupProvider does not support full table scans"
  • Adds SqliteFullScanExec: a leaf ExecutionPlan that streams all rows from the SQLite table in 1024-row batches via a bounded tokio::sync::mpsc channel
  • The blocking SQLite cursor runs in spawn_blocking; each batch is evaluated through evaluate_filters() and dropped immediately, keeping peak memory at O(batch_size) rather than O(total_rows)
  • The existing semaphore and connection pool are shared with fetch_by_keys, so concurrent scans and key lookups stay within the configured pool size
  • No changes to planner.rs or any other file — the existing adaptive_filtered_execute logic works correctly once scan() returns a valid streaming plan
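
The producer/consumer shape described in the summary can be sketched with the standard library alone. This is not the PR's code: it assumes `std::sync::mpsc::sync_channel` and `std::thread::spawn` as stand-ins for the bounded tokio mpsc channel and `spawn_blocking`, and `BATCH_SIZE`, `stream_batches`, and `count_matching` are hypothetical names. Rows are plain integers rather than Arrow record batches.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical stand-in for the 1024-row batch size mentioned in the PR.
const BATCH_SIZE: usize = 1024;

/// Streams `total_rows` synthetic row ids in batches over a bounded channel.
/// The producer thread plays the role of the blocking SQLite cursor.
fn stream_batches(
    total_rows: usize,
    capacity: usize,
) -> Receiver<Result<Vec<usize>, String>> {
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        let mut buf = Vec::with_capacity(BATCH_SIZE);
        for row in 0..total_rows {
            buf.push(row);
            if buf.len() == BATCH_SIZE {
                // `send` blocks while the channel is full; that back-pressure
                // is what bounds memory. Stop if the consumer hung up.
                let full = std::mem::replace(&mut buf, Vec::with_capacity(BATCH_SIZE));
                if tx.send(Ok(full)).is_err() {
                    return;
                }
            }
        }
        // Flush the last partial batch.
        if !buf.is_empty() {
            let _ = tx.send(Ok(buf));
        }
    });
    rx
}

/// Counts rows that pass `keep`, dropping each batch after it is evaluated.
fn count_matching(
    rx: Receiver<Result<Vec<usize>, String>>,
    keep: impl Fn(usize) -> bool,
) -> Result<usize, String> {
    let mut matched = 0;
    for batch in rx {
        let batch = batch?;
        matched += batch.iter().filter(|&&r| keep(r)).count();
        // `batch` goes out of scope here, so only one batch is held at a time.
    }
    Ok(matched)
}
```

Calling `count_matching(stream_batches(2500, 2), |r| r % 2 == 0)` keeps at most two finished batches in the channel plus the one being filled and the one being filtered, regardless of the table size.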

Test plan

  • Vector search without WHERE clause still works (unfiltered path unchanged)
  • Vector search with WHERE clause no longer returns NotImplemented error
  • Large tables: memory stays bounded (scan batches are dropped after filter evaluation)
  • Concurrent queries: semaphore prevents exceeding pool size

…ve filtering

SqliteLookupProvider.scan() previously returned NotImplemented, causing
the adaptive filtered path in USearchExec to fail when a WHERE clause
was combined with vector search.

Add SqliteFullScanExec: a leaf ExecutionPlan that streams all rows from
the SQLite table in 1024-row batches via a bounded tokio mpsc channel.
The blocking SQLite cursor runs in spawn_blocking; the async consumer
processes each batch through evaluate_filters() and drops it immediately,
keeping peak memory at O(batch_size) rather than O(total_rows).

The semaphore and connection pool are shared with fetch_by_keys so
concurrent scans and key lookups stay within the configured pool size.
Comment thread src/sqlite_provider.rs
Comment on lines +540 to +544
if rows_in_batch > 0 {
if let Ok(batch) = build_scan_batch(&schema_task, col_bufs) {
let _ = tx_c.blocking_send(Ok(batch));
}
}

P1 — final-batch error is silently swallowed

If build_scan_batch returns an Err here, it's dropped and the stream terminates cleanly — the consumer receives a successful (but truncated) result with no indication that the last partial batch failed.

Suggested change
- if rows_in_batch > 0 {
-     if let Ok(batch) = build_scan_batch(&schema_task, col_bufs) {
-         let _ = tx_c.blocking_send(Ok(batch));
-     }
- }
+ if rows_in_batch > 0 {
+     match build_scan_batch(&schema_task, col_bufs) {
+         Ok(batch) => { let _ = tx_c.blocking_send(Ok(batch)); }
+         Err(e) => { let _ = tx_c.blocking_send(Err(e)); }
+     }
+ }
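
To illustrate what the consumer observes after this change, here is a small std-only sketch. It assumes a hypothetical `build_batch` in place of `build_scan_batch` (it fails when column buffers have unequal lengths) and a `std::sync::mpsc` channel in place of the tokio one; `flush_final` and `final_message` are likewise illustrative names.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};

type Batch = Vec<Vec<i64>>;

/// Hypothetical stand-in for build_scan_batch: rejects ragged column buffers.
fn build_batch(cols: Batch) -> Result<Batch, String> {
    let len = cols.first().map_or(0, |c| c.len());
    if cols.iter().any(|c| c.len() != len) {
        return Err("column buffers have unequal lengths".to_string());
    }
    Ok(cols)
}

/// Flushes the last partial batch, forwarding the Result as-is so that a
/// build failure reaches the consumer instead of being dropped.
fn flush_final(tx: &SyncSender<Result<Batch, String>>, rows_in_batch: usize, cols: Batch) {
    if rows_in_batch > 0 {
        let _ = tx.send(build_batch(cols));
    }
}

/// Returns the single message (if any) the consumer sees for the final flush.
fn final_message(rows_in_batch: usize, cols: Batch) -> Option<Result<Batch, String>> {
    let (tx, rx) = sync_channel(1);
    flush_final(&tx, rows_in_batch, cols);
    drop(tx); // close the channel so `recv` cannot block forever
    rx.recv().ok()
}
```

With the `if let Ok(..)` form, the ragged case would produce no message at all, which the consumer cannot tell apart from a clean end-of-stream.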

Comment thread src/sqlite_provider.rs Outdated

let pool_c = pool.clone();
let tx_c = tx.clone();
let _ = tokio::task::spawn_blocking(move || {

P1 — spawn_blocking panic is silently discarded

let _ = spawn_blocking(...).await drops the Result<T, JoinError>. If the blocking task panics, the JoinError is thrown away, tx_c is dropped (the sender closes), and the consumer sees a clean end-of-stream with no error. A partial scan looks like a successful complete scan.

Propagate the join error:

Suggested change
- let _ = tokio::task::spawn_blocking(move || {
+ if let Err(e) = tokio::task::spawn_blocking(move || {

and at the closing .await:

            }).await {
                let _ = tx.send(Err(DataFusionError::Execution(format!("scan task panicked: {e}")))).await;
            }
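
The same idea can be shown without tokio. The sketch below assumes `std::thread::spawn(...).join()` as an analogue of awaiting a `spawn_blocking` handle: both return an `Err` when the task panics. `run_scan_task` is a hypothetical helper, and the error is a plain `String` rather than a `DataFusionError`.

```rust
use std::thread;

/// Runs `work` on a separate thread and converts a panic into an error value
/// instead of discarding the join result.
fn run_scan_task<F>(work: F) -> Result<usize, String>
where
    F: FnOnce() -> usize + Send + 'static,
{
    match thread::spawn(work).join() {
        Ok(rows) => Ok(rows),
        Err(payload) => {
            // Panic payloads are usually `&str` or `String`; fall back otherwise.
            let msg = payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            Err(format!("scan task panicked: {msg}"))
        }
    }
}
```

A caller that writes `let _ = thread::spawn(work).join();` loses exactly this `Err` branch, which is the failure mode the comment describes.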

@claude claude Bot left a comment

Address PR review comments:
- Send build_scan_batch errors for the last partial batch instead of
  silently dropping them (truncated scan looked like success)
- Propagate spawn_blocking JoinError so panics surface as stream errors
  instead of a clean end-of-stream

@claude claude Bot left a comment

Review

Issues

P1 — Existing test broken (tests/sqlite_provider_test.rs:147–160)

test_scan_returns_not_implemented asserts that scan() returns an Err containing "not support full table scans". After this PR, scan() returns Ok(...), so the test will fail. The test must be replaced with one that exercises the new streaming behavior (e.g., assert all rows are returned, assert batching works).

P1 — Connection pool error silently swallowed (src/sqlite_provider.rs:431–436)

When pool.lock() returns a poisoned mutex, the code converts the error to None and then reports "connection pool empty" instead of the real cause. fetch_by_keys propagates the poison error; this path should do the same.

Action Required

  1. Remove/replace test_scan_returns_not_implemented — it asserts behavior that no longer exists and will fail CI.
  2. Propagate the mutex-poison error in execute() rather than silently converting it to None.

Comment thread src/sqlite_provider.rs
Ok(mut g) => g.pop(),
Err(_) => None,
}
};

Swallowing the poison error and returning None means a poisoned mutex is reported as "connection pool empty", hiding the real cause. fetch_by_keys handles this correctly — propagate the error here too:

Suggested change
};
let conn = match pool.lock() {
Ok(mut g) => g.pop().ok_or_else(|| {
DataFusionError::Execution("SqliteFullScanExec: connection pool empty".into())
}),
Err(e) => Err(DataFusionError::Execution(format!(
"connection pool mutex poisoned: {e}"
))),
};
let conn = match conn {
Ok(c) => c,
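
A self-contained sketch of the distinction being asked for, assuming a `Mutex<Vec<u32>>` as a placeholder pool (the `u32` stands in for a connection) and `String` errors in place of `DataFusionError`. `take_conn` and `poisoned_pool` are hypothetical names.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Takes one connection from the pool, reporting a poisoned mutex and an
/// empty pool as two different errors.
fn take_conn(pool: &Mutex<Vec<u32>>) -> Result<u32, String> {
    match pool.lock() {
        Ok(mut guard) => guard
            .pop()
            .ok_or_else(|| "connection pool empty".to_string()),
        Err(e) => Err(format!("connection pool mutex poisoned: {e}")),
    }
}

/// Builds a pool whose mutex is poisoned by a thread that panicked while
/// holding the lock. Used only to demonstrate the error path.
fn poisoned_pool() -> Arc<Mutex<Vec<u32>>> {
    let pool = Arc::new(Mutex::new(vec![1]));
    let p = Arc::clone(&pool);
    let _ = thread::spawn(move || {
        let _guard = p.lock().unwrap();
        panic!("simulated failure while holding the pool lock");
    })
    .join();
    pool
}
```

Note that the poisoned pool still contains a connection, so reporting it as "empty" would be actively misleading, not just imprecise.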

Comment thread src/sqlite_provider.rs
Comment on lines +486 to +502
Ok(Some(row)) => {
let mut row_ok = true;
for (ci, buf) in col_bufs.iter_mut().enumerate() {
match row.get::<_, SqlValue>(ci) {
Ok(v) => buf.push(v),
Err(e) => {
let _ = tx_c.blocking_send(Err(
DataFusionError::Execution(e.to_string()),
));
row_ok = false;
break;
}
}
}
if !row_ok {
break;
}

When a column read fails mid-row (say on column ci), columns 0..ci in col_bufs have one extra value that columns ci..n don't. The code correctly sends the error and breaks, but rows_in_batch still reflects the count of fully completed rows. If rows_in_batch > 0, the final flush at line 535 will call build_scan_batch with buffers of unequal length, causing RecordBatch::try_new to fail and a second error to be sent on the channel — masking the original one.

Fix: clear col_bufs (or just skip the flush) on the error path before breaking:

if !row_ok {
    // Discard partial row data so the final flush doesn't see
    // mismatched column buffer lengths.
    for buf in col_bufs.iter_mut() {
        buf.truncate(rows_in_batch);
    }
    break;
}
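
The effect of the truncation can be checked in isolation. The sketch below is an assumption-laden model, not the PR's loop: rows are `Vec<Option<i64>>`, a `None` cell stands in for a failed `row.get`, and `buffer_rows` is a hypothetical function that returns the column buffers, the count of complete rows, and whether an error occurred.

```rust
/// Appends rows column by column. On a failed cell, the partial row is
/// discarded by truncating every buffer back to `rows_in_batch`.
fn buffer_rows(rows: &[Vec<Option<i64>>], n_cols: usize) -> (Vec<Vec<i64>>, usize, bool) {
    let mut col_bufs: Vec<Vec<i64>> = vec![Vec::new(); n_cols];
    let mut rows_in_batch = 0;
    for row in rows {
        let mut row_ok = true;
        for (ci, buf) in col_bufs.iter_mut().enumerate() {
            match row[ci] {
                Some(v) => buf.push(v),
                None => {
                    // Columns 0..ci already hold one extra value at this point.
                    row_ok = false;
                    break;
                }
            }
        }
        if !row_ok {
            for buf in col_bufs.iter_mut() {
                buf.truncate(rows_in_batch);
            }
            return (col_bufs, rows_in_batch, true);
        }
        rows_in_batch += 1;
    }
    (col_bufs, rows_in_batch, false)
}
```

For the input `[[1, 2], [3, <error>]]`, the buffers are `[[1, 3], [2]]` at the moment of failure and `[[1], [2]]` after truncation, so every column again has exactly `rows_in_batch` entries.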

Comment thread src/sqlite_provider.rs
- Err(DataFusionError::NotImplemented(
-     "SqliteLookupProvider does not support full table scans; use fetch_by_keys".into(),
- ))
+ Ok(Arc::new(SqliteFullScanExec::new(

The _projection parameter is ignored: the scan always fetches every column regardless of what DataFusion requests. For tables with many wide columns this wastes I/O and memory.

Consider building a projected schema here (same pattern used in fetch_by_keys) and passing it to SqliteFullScanExec:

let schema = match projection {
    None => self.schema.clone(),
    Some(idxs) => Arc::new(arrow_schema::Schema::new(
        idxs.iter().map(|&i| self.schema.field(i).clone()).collect::<Vec<_>>(),
    )),
};
Ok(Arc::new(SqliteFullScanExec::new(
    self.pool.clone(),
    self.sem.clone(),
    self.table_name.clone(),
    schema,
)))

Collaborator Author

Intentionally left unprojected. The only caller (plan_extension in planner.rs) always passes projection: None because the adaptive filter path needs all columns available to evaluate arbitrary WHERE predicates — projecting here risks dropping columns the filter references. Adding projection support would require the caller to compute the union of filter-referenced columns and key column, which adds complexity for no current benefit.

@claude claude Bot left a comment

Review

Issues

P1 — test_scan_returns_not_implemented will fail (tests/sqlite_provider_test.rs:147–160)

The existing regression test asserts that scan() returns a NotImplemented error containing "not support full table scans". This PR removes that error, so the test will fail. The test needs to be replaced with one that exercises the new happy path (full scan returns all rows). The PR test plan lists this scenario but no test was added.

P2 — Projection parameter ignored (inline comment on src/sqlite_provider.rs:320)

P2 — Partial column buffer corruption on column-read error (inline comment on src/sqlite_provider.rs:486–502)

Action Required

  1. Replace or update test_scan_returns_not_implemented with a test that registers the provider in a SessionContext and verifies a full-table SQL query returns the correct rows.
  2. Address the partial-buffer truncation on the error path (see inline comment).

Prevents mismatched buffer lengths from causing a spurious second error
during the final-batch flush, which would mask the original column read
failure.
Comment thread src/sqlite_provider.rs
Comment on lines +320 to +325
Ok(Arc::new(SqliteFullScanExec::new(
self.pool.clone(),
self.sem.clone(),
self.table_name.clone(),
self.schema.clone(),
)))

P2: projection hint is ignored — always scans all columns.

SqliteFullScanExec selects every column from SQLite regardless of the _projection argument. For wide tables where a query touches only a few columns, this wastes significant I/O per batch. The existing fetch_by_keys already handles projection correctly via out_schema.

SqliteFullScanExec should accept a projection: Option<Vec<usize>> field. The SQL col_list and the PlanProperties schema should both reflect only the projected columns, matching the pattern in execute_query_sync.
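
For reference, a projected column list could be built roughly as follows. This is only a sketch of the reviewer's proposal, which the PR does not adopt: `quote_ident` and `projected_select` are hypothetical helpers, column names are plain strings rather than Arrow fields, and the function panics on an out-of-range index.

```rust
/// Quotes an SQL identifier with double quotes, doubling any embedded quote.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Builds a SELECT over all columns (`None`) or only the projected indices.
fn projected_select(table: &str, columns: &[&str], projection: Option<&[usize]>) -> String {
    let cols: Vec<String> = match projection {
        None => columns.iter().map(|c| quote_ident(c)).collect(),
        // Indexing panics if a projection index is out of range.
        Some(idxs) => idxs.iter().map(|&i| quote_ident(columns[i])).collect(),
    };
    format!("SELECT {} FROM {}", cols.join(", "), quote_ident(table))
}
```

Any such projection would have to include every column referenced by the WHERE predicates, not only the columns in the output.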

Collaborator Author

Duplicate of the earlier thread — see reply at #discussion_r2945452082. The sole caller (plan_extension) always passes projection: None because the adaptive filter path requires all columns to evaluate arbitrary WHERE predicates. Adding projection support risks correctness (dropping filter-referenced columns) for no current benefit.

@claude claude Bot left a comment

Review: P1 issues found - see inline comments and summary below.

ISSUES:

P1 - test_scan_returns_not_implemented will fail (tests/sqlite_provider_test.rs:147-160)

An existing test explicitly asserts scan() returns an error containing 'not support full table scans'. This PR changes scan() to return Ok(...), making that test fail. The test file was not updated in this PR.

P1 - No test coverage for the new streaming scan path

The PR's own test plan items are all unchecked. SqliteFullScanExec has no tests: batching behavior, the flush boundary at exactly SCAN_BATCH_SIZE rows, concurrency under the semaphore, and error propagation paths in the blocking task are all untested.

ACTION REQUIRED:

  1. Delete or replace test_scan_returns_not_implemented in tests/sqlite_provider_test.rs - it asserts the old NotImplemented behavior and will break CI.
  2. Add at least one integration test that exercises the new scan path end-to-end and verifies row counts and data correctness.

Update test_scan_returns_not_implemented → test_scan_streams_all_rows:
exercises the new SqliteFullScanExec end-to-end, verifying row count
and data correctness across streamed batches.
Comment thread src/sqlite_provider.rs
}

// Flush the last partial batch.
if rows_in_batch > 0 {

After a column-read error the loop breaks with rows_in_batch > 0, so this flush sends an Ok(batch) down the channel after an Err(...) was already sent. DataFusion consumers stop on the first error so this is harmless in practice, but it's confusing. Consider adding a had_error: bool flag and skipping the flush when it's set.
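
A minimal model of the suggested flag, assuming rows arrive as `Result<i64, String>` and the channel is replaced by a `Vec` of the messages that would be sent. `scan_messages` is a hypothetical name.

```rust
/// Returns the messages a producer would send for `rows`, where an `Err`
/// element marks a failed row read.
fn scan_messages(
    rows: Vec<Result<i64, String>>,
    batch_size: usize,
) -> Vec<Result<Vec<i64>, String>> {
    let mut out = Vec::new();
    let mut buf = Vec::new();
    let mut had_error = false;
    for row in rows {
        match row {
            Ok(v) => {
                buf.push(v);
                if buf.len() == batch_size {
                    // Emit a full batch and start a fresh buffer.
                    out.push(Ok(std::mem::take(&mut buf)));
                }
            }
            Err(e) => {
                out.push(Err(e));
                had_error = true;
                break;
            }
        }
    }
    // Flush the last partial batch only when no error was sent.
    if !had_error && !buf.is_empty() {
        out.push(Ok(buf));
    }
    out
}
```

With the flag, an error is always the last message on the channel; without it, a trailing `Ok` batch could follow the `Err`.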

Comment thread src/sqlite_provider.rs
- Err(DataFusionError::NotImplemented(
-     "SqliteLookupProvider does not support full table scans; use fetch_by_keys".into(),
- ))
+ Ok(Arc::new(SqliteFullScanExec::new(

Suggestion: thread _projection through to SqliteFullScanExec and use it to build a narrower SELECT column list. Right now every full scan fetches all columns from SQLite regardless of what the query needs. DataFusion will add a ProjectionExec on top so results are correct, but wide tables pay unnecessary deserialization cost.

claude[bot]
claude Bot previously approved these changes Mar 17, 2026
Avoids sending Ok(batch) on the channel after an Err was already sent
for a mid-row column read failure.
Comment thread src/sqlite_provider.rs

fn execute(
&self,
_partition: usize,

Suggestion: validate partition before spawning. UnknownPartitioning(1) declares a single partition, so any other value is a caller bug — better to surface it than silently spawn a duplicate scan.

Suggested change
- _partition: usize,
+ partition: usize,
        if partition != 0 {
            return Err(DataFusionError::Internal(format!(
                "SqliteFullScanExec only has 1 partition, got partition {partition}"
            )));
        }

Collaborator Author

DataFusion never calls execute() with partition > 0 when UnknownPartitioning(1) is declared — this is enforced by the framework. No other ExecutionPlan in this codebase (including USearchExec) adds this guard. Adding it here would be inconsistent defensive code for a scenario that can't happen.

/// scan() returns a streaming ExecutionPlan that yields all rows in batches.
#[tokio::test]
- async fn test_scan_returns_not_implemented() {
+ async fn test_scan_streams_all_rows() {

The existing 3-row test covers only the final partial-batch flush. The more complex path — where a full 1024-row batch is emitted mid-scan and buffers are drained — is never exercised. Consider adding a second test that inserts SCAN_BATCH_SIZE + 1 rows (or at minimum > SCAN_BATCH_SIZE) and asserts the total row count, to cover the batch-boundary drain logic.
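
The boundary cases worth covering can be enumerated with a small model of the batching counter. This is a sketch only: `SCAN_BATCH_SIZE` is set to 1024 here on the assumption that it matches the batch size stated in the PR summary, and `batch_sizes` is a hypothetical function that mirrors the emit-and-reset logic without touching SQLite.

```rust
/// Assumed to match the provider's constant (1024-row batches per the PR summary).
const SCAN_BATCH_SIZE: usize = 1024;

/// Returns the size of each batch a scan over `total_rows` rows would emit.
fn batch_sizes(total_rows: usize) -> Vec<usize> {
    let mut sizes = Vec::new();
    let mut rows_in_batch = 0;
    for _ in 0..total_rows {
        rows_in_batch += 1;
        if rows_in_batch == SCAN_BATCH_SIZE {
            // Mid-scan emit: the full batch goes out and the counter resets.
            sizes.push(rows_in_batch);
            rows_in_batch = 0;
        }
    }
    // Final flush of the partial batch, if any.
    if rows_in_batch > 0 {
        sizes.push(rows_in_batch);
    }
    sizes
}
```

A 3-row table yields `[3]` and exercises only the final flush; `SCAN_BATCH_SIZE + 1` rows yields `[1024, 1]` and exercises both paths; exactly `SCAN_BATCH_SIZE` rows yields `[1024]` with no trailing flush.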

@anoop-narang anoop-narang merged commit dba4786 into main Mar 17, 2026
5 checks passed
@anoop-narang anoop-narang deleted the feat/sqlite-streaming-scan branch March 17, 2026 10:14