diff --git a/README.md b/README.md index 2b2532d..fd39a87 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,7 @@ CI runs with the custom iceberg_rust_ffi, built from the source. Releases run wi ### JLL Release To make a JLL release, create a new PR in JuliaPackaging/Yggdrasil repo, e.g. like [this one](https://github.com/JuliaPackaging/Yggdrasil/pull/12532/files). +It's not required, but it's good practice to also bump the version in iceberg_rust_ffi's Cargo.toml, so that each JLL version can be correlated with the corresponding iceberg_rust_ffi crate version. ### RustyIceberg release To create a new RustyIceberg release, simply bump the version in Project.toml, merge that in `main`, and then open that commit and comment like [here](https://github.com/RelationalAI/RustyIceberg.jl/commit/cbebb0e9611f70867e6ad2fbca0060a44345ae31#commitcomment-170551595). This will trigger an update in JuliaRegistries (should take ~20m), which will then invoke a TagBot in this repository, which will also run CI tests with the official JLL. 
diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index 2f94f32..0d35ed5 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1567,7 +1567,7 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.3.0" +version = "0.4.0" dependencies = [ "anyhow", "arrow-array", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index e0bd02f..9b5bba2 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.3.0" +version = "0.4.0" edition = "2021" [lib] diff --git a/test/runtests.jl b/test/runtests.jl index a61ba77..2f3e8da 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -109,13 +109,14 @@ end selected_batch_count = 0 batch_ptr = RustyIceberg.next_batch(stream2) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) @test arrow_table isa Arrow.Table @test length(arrow_table) <= 8 push!(selected_arrow_tables, arrow_table) RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream2) end @test !isempty(selected_arrow_tables) @@ -367,9 +368,9 @@ end inserts_stream2, deletes_stream2 = RustyIceberg.scan!(scan2) try - # Read one batch from inserts to verify column selection + # Read all batches from inserts to verify column selection batch_ptr = RustyIceberg.next_batch(inserts_stream2) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) df = DataFrame(arrow_table) @@ -379,8 +380,9 @@ end @test !isempty(df) RustyIceberg.free_batch(batch_ptr) - println("✅ Incremental scan column selection test successful") + batch_ptr = RustyIceberg.next_batch(inserts_stream2) end + println("✅ Incremental scan column selection test successful") finally RustyIceberg.free_stream(inserts_stream2) 
RustyIceberg.free_stream(deletes_stream2) @@ -411,16 +413,17 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - @test names(df) == ["c_custkey", "c_name"] - @test !isempty(df) + @test names(df) == ["c_custkey", "c_name"] + @test !isempty(df) - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ select_columns! test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -438,7 +441,7 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) df = DataFrame(arrow_table) @@ -447,6 +450,7 @@ end @test !isempty(df) RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inserts_stream) end println("✅ select_columns! test passed for incremental scan") finally @@ -467,15 +471,16 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - # Batch size should be respected (at most 10 rows) - @test length(arrow_table) <= 10 + # Batch size should be respected (at most 10 rows) + @test length(arrow_table) <= 10 - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ with_batch_size! 
test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -493,13 +498,14 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) @test length(arrow_table) <= 10 RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inserts_stream) end println("✅ with_batch_size! test passed for incremental scan") finally @@ -520,9 +526,10 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - RustyIceberg.free_batch(batch_ptr) + while batch_ptr != C_NULL + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ with_data_file_concurrency_limit! test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -540,8 +547,9 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inserts_stream) end println("✅ with_data_file_concurrency_limit! test passed for incremental scan") finally @@ -562,9 +570,10 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - RustyIceberg.free_batch(batch_ptr) + while batch_ptr != C_NULL + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ with_manifest_entry_concurrency_limit! test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -582,8 +591,9 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inserts_stream) end println("✅ with_manifest_entry_concurrency_limit! 
test passed for incremental scan") finally @@ -608,18 +618,19 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Verify all configurations - @test names(df) == ["c_custkey", "c_name", "c_address"] - @test length(arrow_table) <= 5 - @test !isempty(df) + # Verify all configurations + @test names(df) == ["c_custkey", "c_name", "c_address"] + @test length(arrow_table) <= 5 + @test !isempty(df) - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ Combined builder methods test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -642,7 +653,7 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) df = DataFrame(arrow_table) @@ -652,6 +663,7 @@ end @test !isempty(df) RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(inserts_stream) end println("✅ Combined builder methods test passed for incremental scan") finally @@ -673,28 +685,29 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Should have selected columns plus file column - @test "c_custkey" in names(df) - @test "c_name" in names(df) - @test "_file" in names(df) - @test 
!isempty(df) + # Should have selected columns plus file column + @test "c_custkey" in names(df) + @test "c_name" in names(df) + @test "_file" in names(df) + @test !isempty(df) - # Verify file column contains file paths (strings ending in .parquet) - file_paths = df._file - @test all(endswith.(file_paths, ".parquet")) - # Verify file paths are non-empty strings with proper structure - @test all(length.(file_paths) .> 0) - # Should be full S3 paths like "s3://warehouse/tpch.sf01/customer/data/data_customer-00000.parquet" - @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) - @test eltype(file_paths) <: AbstractString + # Verify file column contains file paths (strings ending in .parquet) + file_paths = df._file + @test all(endswith.(file_paths, ".parquet")) + # Verify file paths are non-empty strings with proper structure + @test all(length.(file_paths) .> 0) + # Should be full S3 paths like "s3://warehouse/tpch.sf01/customer/data/data_customer-00000.parquet" + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test eltype(file_paths) <: AbstractString - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ select_columns! with with_file_column! test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -714,7 +727,7 @@ end try batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) df = DataFrame(arrow_table) @@ -732,8 +745,9 @@ end @test eltype(file_paths) <: AbstractString RustyIceberg.free_batch(batch_ptr) - println("✅ select_columns! with with_file_column! test passed for incremental scan") + batch_ptr = RustyIceberg.next_batch(inserts_stream) end + println("✅ select_columns! with with_file_column! 
test passed for incremental scan") finally RustyIceberg.free_stream(inserts_stream) RustyIceberg.free_stream(deletes_stream) @@ -752,27 +766,28 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Should have selected columns - @test "c_custkey" in names(df) - @test "c_name" in names(df) - # FILE_COLUMN should be "_file" - @test "_file" in names(df) - @test !isempty(df) + # Should have selected columns + @test "c_custkey" in names(df) + @test "c_name" in names(df) + # FILE_COLUMN should be "_file" + @test "_file" in names(df) + @test !isempty(df) - # Verify file column contains file paths - file_paths = df._file - @test all(endswith.(file_paths, ".parquet")) - @test all(length.(file_paths) .> 0) - @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) - @test eltype(file_paths) <: AbstractString + # Verify file column contains file paths + file_paths = df._file + @test all(endswith.(file_paths, ".parquet")) + @test all(length.(file_paths) .> 0) + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test eltype(file_paths) <: AbstractString - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ select_columns! 
with FILE_COLUMN constant test passed") finally RustyIceberg.free_stream(stream) @@ -792,31 +807,32 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Should have selected columns plus pos column - @test "c_custkey" in names(df) - @test "c_name" in names(df) - @test "_pos" in names(df) - @test !isempty(df) + # Should have selected columns plus pos column + @test "c_custkey" in names(df) + @test "c_name" in names(df) + @test "_pos" in names(df) + @test !isempty(df) - # Verify pos column contains non-negative integers - positions = df._pos - @test all(positions .>= 0) - @test eltype(positions) <: Integer + # Verify pos column contains non-negative integers + positions = df._pos + @test all(positions .>= 0) + @test eltype(positions) <: Integer - # Verify positions are sequential within the batch (starting from 0) - # Positions should start from 0 for the first row in each file - @test minimum(positions) == 0 - # Check that positions are unique and sequential (no gaps or duplicates in same file) - @test length(unique(positions)) == length(positions) - @test maximum(positions) == length(positions) - 1 + # Verify positions - represent row position within the source file + # Positions should be unique within the batch + @test length(unique(positions)) == length(positions) + # Positions should be non-negative and sequential (form a contiguous range) + sorted_pos = sort(positions) + @test all(diff(sorted_pos) .== 1) # Sequential with no gaps - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ select_columns! with with_pos_column! 
test passed for full scan") finally RustyIceberg.free_stream(stream) @@ -835,8 +851,11 @@ end inserts_stream, deletes_stream = RustyIceberg.scan!(scan) try + all_positions = Int64[] + all_n_values = Int64[] + batch_ptr = RustyIceberg.next_batch(inserts_stream) - if batch_ptr != C_NULL + while batch_ptr != C_NULL batch = unsafe_load(batch_ptr) arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) df = DataFrame(arrow_table) @@ -846,19 +865,63 @@ end @test "_pos" in names(df) @test !isempty(df) - # Verify pos column contains non-negative integers + # Gather positions and values positions = df._pos @test all(positions .>= 0) @test eltype(positions) <: Integer + @test length(unique(positions)) == length(positions) # No duplicates within batch - # Verify positions are sequential within the batch - @test minimum(positions) == 0 - @test length(unique(positions)) == length(positions) - @test maximum(positions) == length(positions) - 1 + append!(all_positions, positions) + append!(all_n_values, df.n) RustyIceberg.free_batch(batch_ptr) - println("✅ select_columns! with with_pos_column! 
test passed for incremental scan") + batch_ptr = RustyIceberg.next_batch(inserts_stream) end + + # Verify positions across all batches + # Positions represent row numbers within individual Parquet files (0-indexed) + # The incremental scan reads from multiple Parquet files, each with ~20 rows + + @test length(all_positions) == 98 # 98 total records (n=201-299 excluding deleted n=250) + @test all(all_positions .>= 0) + @test eltype(all_positions) <: Integer + + # Count occurrences of each position across all files + position_counts = Dict{Int64, Int}() + for pos in all_positions + position_counts[pos] = get(position_counts, pos, 0) + 1 + end + + # Baseline expectations: + # - Positions 0-19 represent rows within each Parquet file + # - Most positions appear multiple times (once per file) + # - The deleted record (n=250) creates one missing occurrence + + # Verify all positions are in expected range (0-19 per file) + @test minimum(all_positions) == 0 + @test maximum(all_positions) == 19 + @test length(unique(all_positions)) == 20 # All positions 0-19 are present + + # Expected baseline: positions appear with these frequencies + # (derived from actual data structure where n=250 is deleted) + expected_counts = Dict{Int64, Int}() + for pos in 0:19 + if pos == 10 + expected_counts[pos] = 4 # Missing one occurrence (deleted record n=250 at position 10) + elseif pos == 19 + expected_counts[pos] = 4 # Some files have only 19 rows + else + expected_counts[pos] = 5 # Most positions appear 5 times (once per file) + end + end + + @test position_counts == expected_counts + + # Verify no gaps in the position range + full_range = 0:19 + @test Set(keys(position_counts)) == Set(full_range) + + println("✅ select_columns! with with_pos_column! 
test passed for incremental scan") finally RustyIceberg.free_stream(inserts_stream) RustyIceberg.free_stream(deletes_stream) @@ -877,30 +940,31 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Should have selected columns - @test "c_custkey" in names(df) - @test "c_name" in names(df) - # POS_COLUMN should be "_pos" - @test "_pos" in names(df) - @test !isempty(df) + # Should have selected columns + @test "c_custkey" in names(df) + @test "c_name" in names(df) + # POS_COLUMN should be "_pos" + @test "_pos" in names(df) + @test !isempty(df) - # Verify pos column contains non-negative integers - positions = df._pos - @test all(positions .>= 0) - @test eltype(positions) <: Integer + # Verify pos column contains non-negative integers + positions = df._pos + @test all(positions .>= 0) + @test eltype(positions) <: Integer - # Verify positions are sequential within the batch - @test minimum(positions) == 0 - @test length(unique(positions)) == length(positions) - @test maximum(positions) == length(positions) - 1 + # Verify positions - represent row position within the source file + @test length(unique(positions)) == length(positions) + sorted_pos = sort(positions) + @test all(diff(sorted_pos) .== 1) # Sequential with no gaps - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ select_columns! 
with POS_COLUMN constant test passed") finally RustyIceberg.free_stream(stream) @@ -921,33 +985,34 @@ end try batch_ptr = RustyIceberg.next_batch(stream) - @test batch_ptr != C_NULL - - batch = unsafe_load(batch_ptr) - arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) - df = DataFrame(arrow_table) + while batch_ptr != C_NULL + batch = unsafe_load(batch_ptr) + arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length)) + df = DataFrame(arrow_table) - # Should have selected columns plus both metadata columns - @test "c_custkey" in names(df) - @test "c_name" in names(df) - @test "_file" in names(df) - @test "_pos" in names(df) - @test !isempty(df) + # Should have selected columns plus both metadata columns + @test "c_custkey" in names(df) + @test "c_name" in names(df) + @test "_file" in names(df) + @test "_pos" in names(df) + @test !isempty(df) - # Verify both metadata columns - file_paths = df._file - @test all(endswith.(file_paths, ".parquet")) - @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) - @test all(df._pos .>= 0) - @test eltype(df._pos) <: Integer + # Verify both metadata columns + file_paths = df._file + @test all(endswith.(file_paths, ".parquet")) + @test all(startswith.(file_paths, "s3://warehouse/tpch.sf01/customer/data/data_customer-")) + @test all(df._pos .>= 0) + @test eltype(df._pos) <: Integer - # Verify positions are sequential within the batch - positions = df._pos - @test minimum(positions) == 0 - @test length(unique(positions)) == length(positions) - @test maximum(positions) == length(positions) - 1 + # Verify positions - represent row position within the source file + positions = df._pos + @test length(unique(positions)) == length(positions) + sorted_pos = sort(positions) + @test all(diff(sorted_pos) .== 1) # Sequential with no gaps - RustyIceberg.free_batch(batch_ptr) + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end println("✅ 
with_file_column! and with_pos_column! combined test passed") finally RustyIceberg.free_stream(stream)