Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion duckdbservice/arrow_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,15 @@ func RowsToRecord(alloc memory.Allocator, rows *sql.Rows, schema *arrow.Schema,

numFields := schema.NumFields()
count := 0
for rows.Next() && count < batchSize {
// Order matters: check `count < batchSize` first, then call rows.Next().
// The reverse (rows.Next() && count < batchSize) advances the cursor once
// more after the final scan and that row is silently dropped — the next
// call to RowsToRecord starts from the row *after* the one we skipped.
// Production reads were losing one row at every batch boundary
// (batchSize=1024) for unbounded SELECTs; COUNT(*) still returned the
// parquet-metadata row count, so the discrepancy was invisible to
// aggregation queries. See TestRowsToRecordNoRowsLostAtBatchBoundary.
for count < batchSize && rows.Next() {
values := make([]interface{}, numFields)
valuePtrs := make([]interface{}, numFields)
for i := range values {
Expand Down
100 changes: 100 additions & 0 deletions duckdbservice/arrow_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,3 +1059,103 @@ func TestGetQuerySchemaTrailingSemicolon(t *testing.T) {
}
}

// TestRowsToRecordNoRowsLostAtBatchBoundary reproduces the production bug where
// RowsToRecord silently dropped one row at every batch transition for unbounded
// SELECTs. The driver-level cursor was being advanced by the loop condition
// `rows.Next() && count < batchSize` even when the batch was already full, so
// the first row following each full batch (index `batchSize`, then
// `2*batchSize+1`, `3*batchSize+2`, ... because every later batch starts one
// row late) never reached a Scan call and was lost when the caller asked for
// the next batch.
//
// Reported by Marce Coll on dbt_marce.credit_purchase_events
// (12617 rows reported by COUNT(*), 12605 rows delivered by SELECT) and
// dbt.usage_allocation (2,689,942 vs 2,687,758). Symptoms:
//   - SELECT col FROM big_table delivers fewer rows than COUNT(*).
//   - WHERE col = X still returns the missing row.
//   - Number of lost rows scales linearly with table size.
//
// The test drains each query through repeated RowsToRecord calls and asserts
// that every id in [0, rows) is delivered exactly once.
func TestRowsToRecordNoRowsLostAtBatchBoundary(t *testing.T) {
	alloc := memory.NewGoAllocator()
	db, err := sql.Open("duckdb", "")
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	defer func() { _ = db.Close() }()

	// Pick row counts that exercise multiple batch transitions and a partial
	// final batch. batchSize = 1024 matches the value used by DoGetStatement.
	const batchSize = 1024
	cases := []struct {
		name string
		rows int
	}{
		{"single batch exact", 1024},
		{"one row over boundary", 1025},
		{"two batches exact", 2048},
		{"three batches + partial", 3500},
		{"production case credit_purchase_events", 12617},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Table names are derived from the row count, so every case gets
			// its own table on the shared connection pool.
			tbl := fmt.Sprintf("t_%d", tc.rows)
			if _, err := db.Exec(fmt.Sprintf(
				"CREATE TABLE %s AS SELECT i AS id FROM range(0, %d) t(i)", tbl, tc.rows,
			)); err != nil {
				t.Fatalf("create table: %v", err)
			}

			ctx := context.Background()
			rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT id FROM %s ORDER BY id", tbl))
			if err != nil {
				t.Fatalf("query: %v", err)
			}
			defer func() { _ = rows.Close() }()

			schema, err := GetQuerySchema(ctx, db, fmt.Sprintf("SELECT id FROM %s", tbl), nil)
			if err != nil {
				t.Fatalf("schema: %v", err)
			}

			// seen[id] flips to true the first time id is delivered; it lets
			// the test detect both duplicates and the exact missing ids.
			seen := make([]bool, tc.rows)
			delivered := 0
			for {
				rec, err := RowsToRecord(alloc, rows, schema, batchSize)
				if err != nil {
					t.Fatalf("RowsToRecord: %v", err)
				}
				// A nil record is treated as the end-of-results signal.
				if rec == nil {
					break
				}

				// Process each record in its own closure so the deferred
				// Release runs per batch, including when t.Fatalf aborts.
				func() {
					defer rec.Release()

					col, ok := rec.Column(0).(*array.Int64)
					if !ok {
						t.Fatalf("column 0 has type %T, expected *array.Int64", rec.Column(0))
					}
					for i := 0; i < col.Len(); i++ {
						id := col.Value(i)
						if id < 0 || id >= int64(tc.rows) {
							t.Fatalf("delivered out-of-range id %d (expected 0..%d)", id, tc.rows-1)
						}
						if seen[id] {
							t.Fatalf("id %d delivered twice", id)
						}
						seen[id] = true
						delivered++
					}
				}()
			}

			// Distinguish "iteration stopped because of a driver error" from
			// "rows were dropped"; without this an iteration failure would be
			// misreported as missing rows below.
			if err := rows.Err(); err != nil {
				t.Fatalf("rows iteration: %v", err)
			}

			if delivered != tc.rows {
				// Walk the seen set to point at the first dropped id; this
				// matches the production symptom (deterministic missing rows).
				firstDropped := -1
				for i, ok := range seen {
					if !ok {
						firstDropped = i
						break
					}
				}
				t.Fatalf("delivered %d rows, expected %d (first dropped id = %d)",
					delivered, tc.rows, firstDropped)
			}
		})
	}
}

Loading