From 79ad8eb418dedbf510847fbba9ec9a6c4f518e4c Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Thu, 20 Nov 2025 13:21:54 +0100 Subject: [PATCH 1/6] Add _pos --- iceberg_rust_ffi/Cargo.lock | 131 ++++++-------------- iceberg_rust_ffi/Cargo.toml | 6 +- iceberg_rust_ffi/src/full.rs | 2 + iceberg_rust_ffi/src/incremental.rs | 6 + src/RustyIceberg.jl | 4 +- src/full.jl | 26 ++++ src/incremental.jl | 26 ++++ src/scan_common.jl | 19 +++ test/runtests.jl | 186 ++++++++++++++++++++++++++++ 9 files changed, 309 insertions(+), 97 deletions(-) diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index c4e42de..b521b4a 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -123,23 +123,21 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow-arith" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad08897b81588f60ba983e3ca39bda2b179bdd84dced378e7df81a5313802ef8" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "chrono", - "num", + "num-traits", ] [[package]] name = "arrow-array" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8548ca7c070d8db9ce7aa43f37393e4bfcf3f2d3681df278490772fd1673d08d" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "ahash 0.8.12", "arrow-buffer", @@ -148,29 +146,31 @@ dependencies = [ "chrono", "half", "hashbrown 0.16.0", - "num", + "num-complex", + "num-integer", + "num-traits", ] [[package]] name = "arrow-buffer" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e003216336f70446457e280807a73899dd822feaf02087d31febca1363e2fccc" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "bytes", "half", - "num", + "num-bigint", + "num-traits", ] [[package]] name = "arrow-cast" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "919418a0681298d3a77d1a315f625916cb5678ad0d74b9c60108eb15fd083023" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", + "arrow-ord", "arrow-schema", "arrow-select", "atoi", @@ -178,27 +178,26 @@ dependencies = [ "chrono", "half", "lexical-core", - "num", + "num-traits", "ryu", ] [[package]] name = "arrow-data" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5c64fff1d142f833d78897a772f2e5b55b36cb3e6320376f0961ab0db7bd6d0" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-buffer", "arrow-schema", "half", - "num", + "num-integer", + "num-traits", ] [[package]] name = "arrow-ipc" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d3594dcddccc7f20fd069bc8e9828ce37220372680ff638c5e00dea427d88f5" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-array", "arrow-buffer", @@ -210,9 +209,8 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c8f82583eb4f8d84d4ee55fd1cb306720cddead7596edce95b50ee418edf66f" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-array", "arrow-buffer", @@ -223,29 +221,26 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" [[package]] name = "arrow-select" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c41dbbd1e97bfcaee4fcb30e29105fb2c75e4d82ae4de70b792a5d3f66b2e7a" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "ahash 0.8.12", "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", - "num", + "num-traits", ] [[package]] name = "arrow-string" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53f5183c150fbc619eede22b861ea7c0eebed8eaac0333eaa7f6da5205fd504d" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "arrow-array", "arrow-buffer", @@ -253,7 +248,7 @@ dependencies = [ "arrow-schema", "arrow-select", "memchr", - "num", + "num-traits", "regex", "regex-syntax", ] @@ -1518,7 +1513,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=ae83309fd198ec30f052a7e9f983711c5f581aea#ae83309fd198ec30f052a7e9f983711c5f581aea" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=5003e5d01cc081fbaeaedb3e4b9321c7727c5ae3#5003e5d01cc081fbaeaedb3e4b9321c7727c5ae3" dependencies = [ "anyhow", "apache-avro", @@ -1563,7 +1558,6 @@ dependencies = [ "serde_repr", "serde_with", "strum", - "thrift", "tokio", "typed-builder", "url", @@ -1943,9 +1937,9 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" [[package]] name = "lz4_flex" -version = "0.11.5" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" +checksum = "ab6473172471198271ff72e9379150e9dfd70d8e533e0752a27e515b48dd375e" dependencies = [ "twox-hash", ] @@ -2079,20 +2073,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "num" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" -dependencies = [ - "num-bigint", - "num-complex", - "num-integer", - "num-iter", - "num-rational", - "num-traits", -] - [[package]] name = "num-bigint" version = "0.4.6" @@ -2128,28 +2108,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-iter" -version = "0.1.45" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - -[[package]] -name = "num-rational" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" -dependencies = [ - "num-bigint", - "num-integer", - "num-traits", -] - [[package]] name = "num-traits" version = "0.2.19" @@ -2404,9 +2362,8 @@ dependencies = [ [[package]] name = "parquet" -version = "56.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0dbd48ad52d7dccf8ea1b90a3ddbfaea4f69878dd7683e51c507d4bc52b5b27" +version = "57.0.0" +source = "git+https://github.com/apache/arrow-rs?rev=fea605cb16f7524cb69a197bfa581a1d4f5fe5d0#fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -2425,8 +2382,9 @@ dependencies = [ "half", "hashbrown 0.16.0", "lz4_flex", - "num", "num-bigint", + "num-integer", + "num-traits", "paste", "seq-macro", "simdutf8", @@ -3579,15 +3537,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - [[package]] name = "thrift" version = "0.17.0" @@ -3596,9 +3545,7 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", "integer-encoding", - "log", "ordered-float 2.10.1", - "threadpool", ] [[package]] diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index 29c9ec3..49737a0 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -12,14 +12,14 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "ae83309fd198ec30f052a7e9f983711c5f581aea" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "5003e5d01cc081fbaeaedb3e4b9321c7727c5ae3" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" libc = "0.2" anyhow = "1.0" -arrow-array = "56.2.0" -arrow-ipc = "56.2.0" +arrow-array = { git = "https://github.com/apache/arrow-rs", rev = "fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" } +arrow-ipc = { git = "https://github.com/apache/arrow-rs", rev = "fea605cb16f7524cb69a197bfa581a1d4f5fe5d0" } tracing-subscriber = "0.3" tracing = "0.1" once_cell = "1.19" diff --git a/iceberg_rust_ffi/src/full.rs b/iceberg_rust_ffi/src/full.rs index 7ede34a..fae6c86 100644 --- a/iceberg_rust_ffi/src/full.rs +++ b/iceberg_rust_ffi/src/full.rs @@ -54,6 +54,8 @@ impl_with_batch_size!(iceberg_scan_with_batch_size, IcebergScan); impl_scan_builder_method!(iceberg_scan_with_file_column, IcebergScan, with_file_column); +impl_scan_builder_method!(iceberg_scan_with_pos_column, IcebergScan, with_pos_column); + impl_scan_build!(iceberg_scan_build, IcebergScan); // Async function to initialize stream from a table scan diff --git a/iceberg_rust_ffi/src/incremental.rs b/iceberg_rust_ffi/src/incremental.rs index 3d85368..4f8ee1e 100644 --- a/iceberg_rust_ffi/src/incremental.rs +++ b/iceberg_rust_ffi/src/incremental.rs @@ -130,6 +130,12 @@ impl_scan_builder_method!( with_file_column ); +impl_scan_builder_method!( + iceberg_incremental_scan_with_pos_column, + IcebergIncrementalScan, + with_pos_column +); + impl_scan_build!(iceberg_incremental_scan_build, IcebergIncrementalScan); // Get unzipped Arrow streams from incremental scan (async) diff --git a/src/RustyIceberg.jl b/src/RustyIceberg.jl index c372924..b4d8276 100644 --- a/src/RustyIceberg.jl +++ b/src/RustyIceberg.jl @@ -11,9 +11,9 @@ export IcebergException export new_incremental_scan, free_incremental_scan! export table_open, free_table, new_scan, free_scan! export select_columns!, with_batch_size!, with_data_file_concurrency_limit!, with_manifest_entry_concurrency_limit! -export with_file_column! +export with_file_column!, with_pos_column! export scan!, next_batch, free_batch, free_stream -export FILE_COLUMN +export FILE_COLUMN, POS_COLUMN # Always use the JLL library - override via Preferences if needed for local development # To use a local build, set the preference: diff --git a/src/full.jl b/src/full.jl index b64a26e..68cb3b8 100644 --- a/src/full.jl +++ b/src/full.jl @@ -133,6 +133,32 @@ function with_file_column!(scan::Scan) return nothing end +""" + with_pos_column!(scan::Scan) + +Add the _pos metadata column to the scan. + +The _pos column contains the position of each row within its data file, which can +be useful for tracking row locations and debugging. + +# Example +```julia +scan = new_scan(table) +with_pos_column!(scan) +stream = scan!(scan) +``` +""" +function with_pos_column!(scan::Scan) + result = @ccall rust_lib.iceberg_scan_with_pos_column( + convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}} + )::Cint + + if result != 0 + error("Failed to add pos column to scan") + end + return nothing +end + """ build!(scan::Scan) diff --git a/src/incremental.jl b/src/incremental.jl index a18254c..5bfae67 100644 --- a/src/incremental.jl +++ b/src/incremental.jl @@ -186,6 +186,32 @@ function with_file_column!(scan::IncrementalScan) return nothing end +""" + with_pos_column!(scan::IncrementalScan) + +Add the _pos metadata column to the incremental scan. + +The _pos column contains the position of each row within its data file, which can +be useful for tracking row locations during incremental scans. + +# Example +```julia +scan = new_incremental_scan(table, from_snapshot_id, to_snapshot_id) +with_pos_column!(scan) +inserts_stream, deletes_stream = scan!(scan) +``` +""" +function with_pos_column!(scan::IncrementalScan) + result = @ccall rust_lib.iceberg_incremental_scan_with_pos_column( + convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}} + )::Cint + + if result != 0 + error("Failed to add pos column to incremental scan") + end + return nothing +end + """ build!(scan::IncrementalScan) diff --git a/src/scan_common.jl b/src/scan_common.jl index c002e36..cf94dbd 100644 --- a/src/scan_common.jl +++ b/src/scan_common.jl @@ -27,6 +27,25 @@ stream = scan!(scan) """ const FILE_COLUMN = "_file" +""" + POS_COLUMN + +The name of the metadata column containing row positions within files (_pos). + +This constant can be used with the `select_columns!` function to include +position information in query results. It corresponds to the _pos metadata +column in Iceberg tables, which represents the row's position within its data file. + +# Example +```julia +# Select specific columns including the position +scan = new_scan(table) +select_columns!(scan, ["id", "name", POS_COLUMN]) +stream = scan!(scan) +``` +""" +const POS_COLUMN = "_pos" + """ BatchResponse diff --git a/test/runtests.jl b/test/runtests.jl index a077b90..a61ba77 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -688,6 +688,11 @@ end # Verify file column contains file paths (strings ending in .parquet) file_paths = df._file @test all(endswith.(file_paths, ".parquet")) + # Verify file paths are non-empty strings with proper structure + @test all(length.(file_paths) .> 0) + # Should be full S3 paths like "s3://warehouse/tpch.sf01/customer/data/data_customer-00000.parquet" + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test eltype(file_paths) <: AbstractString RustyIceberg.free_batch(batch_ptr) println("✅ select_columns! with with_file_column! test passed for full scan") @@ -722,6 +727,9 @@ end # Verify file column contains file paths file_paths = df._file @test all(endswith.(file_paths, ".parquet")) + @test all(length.(file_paths) .> 0) + @test all(startswith.(file_paths, "s3://warehouse/incremental/")) + @test eltype(file_paths) <: AbstractString RustyIceberg.free_batch(batch_ptr) println("✅ select_columns! with with_file_column! test passed for incremental scan") @@ -760,6 +768,9 @@ end # Verify file column contains file paths file_paths = df._file @test all(endswith.(file_paths, ".parquet")) + @test all(length.(file_paths) .> 0) + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test eltype(file_paths) <: AbstractString RustyIceberg.free_batch(batch_ptr) println("✅ select_columns! with FILE_COLUMN constant test passed") @@ -769,6 +780,181 @@ end RustyIceberg.free_table(table) end end + + @testset "select_columns! with with_pos_column! - Full Scan" begin + table = RustyIceberg.table_open(customer_path) + scan = RustyIceberg.new_scan(table) + + # Select specific columns AND include pos metadata + RustyIceberg.select_columns!(scan, ["c_custkey", "c_name"]) + RustyIceberg.with_pos_column!(scan) + stream = RustyIceberg.scan!(scan) + + try + batch_ptr = RustyIceberg.next_batch(stream) + @test batch_ptr != C_NULL + + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) + + # Should have selected columns plus pos column + @test "c_custkey" in names(df) + @test "c_name" in names(df) + @test "_pos" in names(df) + @test !isempty(df) + + # Verify pos column contains non-negative integers + positions = df._pos + @test all(positions .>= 0) + @test eltype(positions) <: Integer + + # Verify positions are sequential within the batch (starting from 0) + # Positions should start from 0 for the first row in each file + @test minimum(positions) == 0 + # Check that positions are unique and sequential (no gaps or duplicates in same file) + @test length(unique(positions)) == length(positions) + @test maximum(positions) == length(positions) - 1 + + RustyIceberg.free_batch(batch_ptr) + println("✅ select_columns! with with_pos_column! test passed for full scan") + finally + RustyIceberg.free_stream(stream) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + end + + @testset "select_columns! with with_pos_column! - Incremental Scan" begin + table = RustyIceberg.table_open(incremental_path) + scan = new_incremental_scan(table, from_snapshot_id, to_snapshot_id) + + # Select specific column AND include pos metadata for incremental scan + RustyIceberg.select_columns!(scan, ["n"]) + RustyIceberg.with_pos_column!(scan) + inserts_stream, deletes_stream = RustyIceberg.scan!(scan) + + try + batch_ptr = RustyIceberg.next_batch(inserts_stream) + if batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) + + # Should have the selected column "n" plus pos column + @test "n" in names(df) + @test "_pos" in names(df) + @test !isempty(df) + + # Verify pos column contains non-negative integers + positions = df._pos + @test all(positions .>= 0) + @test eltype(positions) <: Integer + + # Verify positions are sequential within the batch + @test minimum(positions) == 0 + @test length(unique(positions)) == length(positions) + @test maximum(positions) == length(positions) - 1 + + RustyIceberg.free_batch(batch_ptr) + println("✅ select_columns! with with_pos_column! test passed for incremental scan") + end + finally + RustyIceberg.free_stream(inserts_stream) + RustyIceberg.free_stream(deletes_stream) + RustyIceberg.free_incremental_scan!(scan) + RustyIceberg.free_table(table) + end + end + + @testset "select_columns! with POS_COLUMN constant" begin + table = RustyIceberg.table_open(customer_path) + scan = RustyIceberg.new_scan(table) + + # Select columns including POS_COLUMN constant + RustyIceberg.select_columns!(scan, ["c_custkey", "c_name", RustyIceberg.POS_COLUMN]) + stream = RustyIceberg.scan!(scan) + + try + batch_ptr = RustyIceberg.next_batch(stream) + @test batch_ptr != C_NULL + + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) + + # Should have selected columns + @test "c_custkey" in names(df) + @test "c_name" in names(df) + # POS_COLUMN should be "_pos" + @test "_pos" in names(df) + @test !isempty(df) + + # Verify pos column contains non-negative integers + positions = df._pos + @test all(positions .>= 0) + @test eltype(positions) <: Integer + + # Verify positions are sequential within the batch + @test minimum(positions) == 0 + @test length(unique(positions)) == length(positions) + @test maximum(positions) == length(positions) - 1 + + RustyIceberg.free_batch(batch_ptr) + println("✅ select_columns! with POS_COLUMN constant test passed") + finally + RustyIceberg.free_stream(stream) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + end + + @testset "with_file_column! and with_pos_column! combined" begin + table = RustyIceberg.table_open(customer_path) + scan = RustyIceberg.new_scan(table) + + # Select columns and include both file and pos metadata + RustyIceberg.select_columns!(scan, ["c_custkey", "c_name"]) + RustyIceberg.with_file_column!(scan) + RustyIceberg.with_pos_column!(scan) + stream = RustyIceberg.scan!(scan) + + try + batch_ptr = RustyIceberg.next_batch(stream) + @test batch_ptr != C_NULL + + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) + + # Should have selected columns plus both metadata columns + @test "c_custkey" in names(df) + @test "c_name" in names(df) + @test "_file" in names(df) + @test "_pos" in names(df) + @test !isempty(df) + + # Verify both metadata columns + file_paths = df._file + @test all(endswith.(file_paths, ".parquet")) + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test all(df._pos .>= 0) + @test eltype(df._pos) <: Integer + + # Verify positions are sequential within the batch + positions = df._pos + @test minimum(positions) == 0 + @test length(unique(positions)) == length(positions) + @test maximum(positions) == length(positions) - 1 + + RustyIceberg.free_batch(batch_ptr) + println("✅ with_file_column! and with_pos_column! combined test passed") + finally + RustyIceberg.free_stream(stream) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + end end end # End of testset From 26e14fb75a930173ad6b74f7769a107df4f22e89 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Thu, 20 Nov 2025 13:49:20 +0100 Subject: [PATCH 2/6] update commit to sha from main --- iceberg_rust_ffi/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index 49737a0..e0bd02f 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -12,7 +12,7 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "5003e5d01cc081fbaeaedb3e4b9321c7727c5ae3" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "cd1daca8d45eb2a78bc90f1eec18435502c6bc04" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" From 0d3313efa46cf552370477f0b3e9f1d5a1c7b322 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Thu, 20 Nov 2025 14:12:07 +0100 Subject: [PATCH 3/6] Forgot Cargo.lock --- iceberg_rust_ffi/Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index b521b4a..2f94f32 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1513,7 +1513,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=5003e5d01cc081fbaeaedb3e4b9321c7727c5ae3#5003e5d01cc081fbaeaedb3e4b9321c7727c5ae3" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=cd1daca8d45eb2a78bc90f1eec18435502c6bc04#cd1daca8d45eb2a78bc90f1eec18435502c6bc04" dependencies = [ "anyhow", "apache-avro", From 81987db3e08fb0222f35a4bc7fb90c3f427eee46 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Thu, 20 Nov 2025 15:36:55 +0100 Subject: [PATCH 4/6] Make tests deterministic --- iceberg_rust_ffi/Cargo.lock | 2 +- iceberg_rust_ffi/Cargo.toml | 2 +- test/runtests.jl | 310 +++++++++++++++++++----------------- 3 files changed, 166 insertions(+), 148 deletions(-) diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index 2f94f32..0d35ed5 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1567,7 +1567,7 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.3.0" +version = "0.4.0" dependencies = [ "anyhow", "arrow-array", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index e0bd02f..9b5bba2 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.3.0" +version = "0.4.0" edition = "2021" [lib] diff --git a/test/runtests.jl b/test/runtests.jl index a61ba77..f8e2877 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -109,13 +109,14 @@ end selected_batch_count = 0 batch_ptr = RustyIceberg.next_batch(stream2) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) @test arrow_table isa Arrow.Table @test length(arrow_table) <= 8 push!(selected_arrow_tables, arrow_table) RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream2) end @test !isempty(selected_arrow_tables) @@ -367,9 +368,9 @@ end inserts_stream2, deletes_stream2 = RustyIceberg.scan!(scan2) try - # Read one batch from inserts to verify column selection + # Read all batches from inserts to verify column selection batch_ptr = RustyIceberg.next_batch(inserts_stream2) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) df = DataFrame(arrow_table) @@ -379,8 +380,9 @@ end @test !isempty(df) RustyIceberg.free_batch(batch_ptr) - println("✅ Incremental scan column selection test successful") + batch_ptr = RustyIceberg.next_batch(inserts_stream2) end + println("✅ Incremental scan column selection test successful") finally RustyIceberg.free_stream(inserts_stream2) RustyIceberg.free_stream(deletes_stream2) @@ -411,16 +413,17 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - @test names(df) == ["c_custkey", "c_name"] - @test !isempty(df) + @test names(df) == ["c_custkey", "c_name"] + @test !isempty(df) - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ select_columns! test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -438,7 +441,7 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) df = DataFrame(arrow_table) @@ -447,6 +450,7 @@ end @test !isempty(df) RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inserts_stream) end println("✅ select_columns! test passed for incremental scan") finally @@ -467,15 +471,16 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - # Batch size should be respected (at most 10 rows) - @test length(arrow_table) <= 10 + # Batch size should be respected (at most 10 rows) + @test length(arrow_table) <= 10 - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ with_batch_size! test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -493,13 +498,14 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) @test length(arrow_table) <= 10 RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inserts_stream) end println("✅ with_batch_size! test passed for incremental scan") finally @@ -520,9 +526,10 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - RustyIceberg.free_batch(batch_ptr) + while batch_ptr != C_NULL + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ with_data_file_concurrency_limit! test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -540,8 +547,9 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inserts_stream) end println("✅ with_data_file_concurrency_limit! test passed for incremental scan") finally @@ -562,9 +570,10 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - RustyIceberg.free_batch(batch_ptr) + while batch_ptr != C_NULL + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ with_manifest_entry_concurrency_limit! test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -582,8 +591,9 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inserts_stream) end println("✅ with_manifest_entry_concurrency_limit! test passed for incremental scan") finally @@ -608,18 +618,19 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Verify all configurations - @test names(df) == ["c_custkey", "c_name", "c_address"] - @test length(arrow_table) <= 5 - @test !isempty(df) + # Verify all configurations + @test names(df) == ["c_custkey", "c_name", "c_address"] + @test length(arrow_table) <= 5 + @test !isempty(df) - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ Combined builder methods test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -642,7 +653,7 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) df = DataFrame(arrow_table) @@ -652,6 +663,7 @@ end @test !isempty(df) RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inserts_stream) end println("✅ Combined builder methods test passed for incremental scan") finally @@ -673,28 +685,29 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Should have selected columns plus file column - @test "c_custkey" in names(df) - @test "c_name" in names(df) - @test "_file" in names(df) - @test !isempty(df) + # Should have selected columns plus file column + @test "c_custkey" in names(df) + @test "c_name" in names(df) + @test "_file" in names(df) + @test !isempty(df) - # Verify file column contains file paths (strings ending in .parquet) - file_paths = df._file - @test all(endswith.(file_paths, ".parquet")) - # Verify file paths are non-empty strings with proper structure - @test all(length.(file_paths) .> 0) - # Should be full S3 paths like "s3://warehouse/tpch.sf01/customer/data/data_customer-00000.parquet" - @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) - @test eltype(file_paths) <: AbstractString + # Verify file column contains file paths (strings ending in .parquet) + file_paths = df._file + @test all(endswith.(file_paths, ".parquet")) + # Verify file paths are non-empty strings with proper structure + @test all(length.(file_paths) .> 0) + # Should be full S3 paths like "s3://warehouse/tpch.sf01/customer/data/data_customer-00000.parquet" + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test eltype(file_paths) <: AbstractString - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ select_columns! with with_file_column! test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -714,7 +727,7 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) df = DataFrame(arrow_table) @@ -732,8 +745,9 @@ end @test eltype(file_paths) <: AbstractString RustyIceberg.free_batch(batch_ptr) - println("✅ select_columns! with with_file_column! test passed for incremental scan") + batch_ptr = RustyIceberg.next_batch(inserts_stream) end + println("✅ select_columns! with with_file_column! test passed for incremental scan") finally RustyIceberg.free_stream(inserts_stream) RustyIceberg.free_stream(deletes_stream) @@ -752,27 +766,28 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Should have selected columns - @test "c_custkey" in names(df) - @test "c_name" in names(df) - # FILE_COLUMN should be "_file" - @test "_file" in names(df) - @test !isempty(df) + # Should have selected columns + @test "c_custkey" in names(df) + @test "c_name" in names(df) + # FILE_COLUMN should be "_file" + @test "_file" in names(df) + @test !isempty(df) - # Verify file column contains file paths - file_paths = df._file - @test all(endswith.(file_paths, ".parquet")) - @test all(length.(file_paths) .> 0) - @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) - @test eltype(file_paths) <: AbstractString + # Verify file column contains file paths + file_paths = df._file + @test all(endswith.(file_paths, ".parquet")) + @test all(length.(file_paths) .> 0) + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test eltype(file_paths) <: AbstractString - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ select_columns! with FILE_COLUMN constant test passed") finally RustyIceberg.free_stream(stream) @@ -792,31 +807,32 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Should have selected columns plus pos column - @test "c_custkey" in names(df) - @test "c_name" in names(df) - @test "_pos" in names(df) - @test !isempty(df) + # Should have selected columns plus pos column + @test "c_custkey" in names(df) + @test "c_name" in names(df) + @test "_pos" in names(df) + @test !isempty(df) - # Verify pos column contains non-negative integers - positions = df._pos - @test all(positions .>= 0) - @test eltype(positions) <: Integer + # Verify pos column contains non-negative integers + positions = df._pos + @test all(positions .>= 0) + @test eltype(positions) <: Integer - # Verify positions are sequential within the batch (starting from 0) - # Positions should start from 0 for the first row in each file - @test minimum(positions) == 0 - # Check that positions are unique and sequential (no gaps or duplicates in same file) - @test length(unique(positions)) == length(positions) - @test maximum(positions) == length(positions) - 1 + # Verify positions - represent row position within the source file + # Positions should be unique within the batch + @test length(unique(positions)) == length(positions) + # Positions should be non-negative and sequential (form a contiguous range) + sorted_pos = sort(positions) + @test all(diff(sorted_pos) .== 1) # Sequential with no gaps - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ select_columns! with with_pos_column! test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -836,7 +852,7 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) df = DataFrame(arrow_table) @@ -854,11 +870,11 @@ end # Verify positions are sequential within the batch @test minimum(positions) == 0 @test length(unique(positions)) == length(positions) - @test maximum(positions) == length(positions) - 1 RustyIceberg.free_batch(batch_ptr) - println("✅ select_columns! with with_pos_column! test passed for incremental scan") + batch_ptr = RustyIceberg.next_batch(inserts_stream) end + println("✅ select_columns! with with_pos_column! test passed for incremental scan") finally RustyIceberg.free_stream(inserts_stream) RustyIceberg.free_stream(deletes_stream) @@ -877,30 +893,31 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Should have selected columns - @test "c_custkey" in names(df) - @test "c_name" in names(df) - # POS_COLUMN should be "_pos" - @test "_pos" in names(df) - @test !isempty(df) + # Should have selected columns + @test "c_custkey" in names(df) + @test "c_name" in names(df) + # POS_COLUMN should be "_pos" + @test "_pos" in names(df) + @test !isempty(df) - # Verify pos column contains non-negative integers - positions = df._pos - @test all(positions .>= 0) - @test eltype(positions) <: Integer + # Verify pos column contains non-negative integers + positions = df._pos + @test all(positions .>= 0) + @test eltype(positions) <: Integer - # Verify positions are sequential within the batch - @test minimum(positions) == 0 - @test length(unique(positions)) == length(positions) - @test maximum(positions) == length(positions) - 1 + # Verify positions - represent row position within the source file + @test length(unique(positions)) == length(positions) + sorted_pos = sort(positions) + @test all(diff(sorted_pos) .== 1) # Sequential with no gaps - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ select_columns! with POS_COLUMN constant test passed") finally RustyIceberg.free_stream(stream) @@ -921,33 +938,34 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Should have selected columns plus both metadata columns - @test "c_custkey" in names(df) - @test "c_name" in names(df) - @test "_file" in names(df) - @test "_pos" in names(df) - @test !isempty(df) + # Should have selected columns plus both metadata columns + @test "c_custkey" in names(df) + @test "c_name" in names(df) + @test "_file" in names(df) + @test "_pos" in names(df) + @test !isempty(df) - # Verify both metadata columns - file_paths = df._file - @test all(endswith.(file_paths, ".parquet")) - @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) - @test all(df._pos .>= 0) - @test eltype(df._pos) <: Integer + # Verify both metadata columns + file_paths = df._file + @test all(endswith.(file_paths, ".parquet")) + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test all(df._pos .>= 0) + @test eltype(df._pos) <: Integer - # Verify positions are sequential within the batch - positions = df._pos - @test minimum(positions) == 0 - @test length(unique(positions)) == length(positions) - @test maximum(positions) == length(positions) - 1 + # Verify positions - represent row position within the source file + positions = df._pos + @test length(unique(positions)) == length(positions) + sorted_pos = sort(positions) + @test all(diff(sorted_pos) .== 1) # Sequential with no gaps - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ with_file_column! and with_pos_column! combined test passed") finally RustyIceberg.free_stream(stream) From 4f7c1f3de04bcae12bbf6b50e82526f611af4a12 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Thu, 20 Nov 2025 15:51:45 +0100 Subject: [PATCH 5/6] one more test condition --- test/runtests.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/runtests.jl b/test/runtests.jl index f8e2877..c05bdf6 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -870,6 +870,7 @@ end # Verify positions are sequential within the batch @test minimum(positions) == 0 @test length(unique(positions)) == length(positions) + @test maximum(positions) <= length(df.n) RustyIceberg.free_batch(batch_ptr) batch_ptr = RustyIceberg.next_batch(inserts_stream) From d90a58f97ade7c9555bff31cb02b9662dcc53267 Mon Sep 17 00:00:00 2001 From: Vukasin Stefanovic Date: Thu, 20 Nov 2025 16:00:09 +0100 Subject: [PATCH 6/6] verify positions in incremental test --- test/runtests.jl | 56 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 5 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index c05bdf6..2f3e8da 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -851,6 +851,9 @@ end inserts_stream, deletes_stream = RustyIceberg.scan!(scan) try + all_positions = Int64[] + all_n_values = Int64[] + batch_ptr = RustyIceberg.next_batch(inserts_stream) while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) @@ -862,19 +865,62 @@ end @test "_pos" in names(df) @test !isempty(df) - # Verify pos column contains non-negative integers + # Gather positions and values positions = df._pos @test all(positions .>= 0) @test eltype(positions) <: Integer + @test length(unique(positions)) == length(positions) # No duplicates within batch - # Verify positions are sequential within the batch - @test minimum(positions) == 0 - @test length(unique(positions)) == length(positions) - @test maximum(positions) <= length(df.n) + append!(all_positions, positions) + append!(all_n_values, df.n) RustyIceberg.free_batch(batch_ptr) batch_ptr = RustyIceberg.next_batch(inserts_stream) end + + # Verify positions across all batches + # Positions represent row numbers within individual Parquet files (0-indexed) + # The incremental scan reads from multiple Parquet files, each with ~20 rows + + @test length(all_positions) == 98 # 98 total records (n=201-299 excluding deleted n=250) + @test all(all_positions .>= 0) + @test eltype(all_positions) <: Integer + + # Count occurrences of each position across all files + position_counts = Dict{Int64, Int}() + for pos in all_positions + position_counts[pos] = get(position_counts, pos, 0) + 1 + end + + # Baseline expectations: + # - Positions 0-19 represent rows within each Parquet file + # - Most positions appear multiple times (once per file) + # - The deleted record (n=250) creates one missing occurrence + + # Verify all positions are in expected range (0-19 per file) + @test minimum(all_positions) == 0 + @test maximum(all_positions) == 19 + @test length(unique(all_positions)) == 20 # All positions 0-19 are present + + # Expected baseline: positions appear with these frequencies + # (derived from actual data structure where n=250 is deleted) + expected_counts = Dict{Int64, Int}() + for pos in 0:19 + if pos == 10 + expected_counts[pos] = 4 # Missing one occurrence (deleted record n=250 at position 10) + elseif pos == 19 + expected_counts[pos] = 4 # Some files have only 19 rows + else + expected_counts[pos] = 5 # Most positions appear 5 times (once per file) + end + end + + @test position_counts == expected_counts + + # Verify no gaps in the position range + full_range = 0:19 + @test Set(keys(position_counts)) == Set(full_range) + println("✅ select_columns! with with_pos_column! test passed for incremental scan") finally RustyIceberg.free_stream(inserts_stream)