Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iceberg_rust_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions iceberg_rust_ffi/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iceberg_rust_ffi"
version = "0.4.0"
version = "0.4.1"
edition = "2021"

[lib]
Expand All @@ -12,7 +12,7 @@ default = ["julia"]
julia = []

[dependencies]
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "cd1daca8d45eb2a78bc90f1eec18435502c6bc04" }
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "feeb34438f3803395c24b9ffeab95402fd9b2ddd" }
object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false }
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
Expand Down
89 changes: 89 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,95 @@ end
end
end

@testset "Incremental Scan with nothing for both snapshot IDs" begin
# Test scanning from root (nothing) to current (nothing) - full history
scan3 = new_incremental_scan(table, nothing, nothing)
@test scan3 isa RustyIceberg.IncrementalScan
@test scan3.ptr != C_NULL
println("✅ Incremental scan created with nothing for both snapshot IDs")

inserts_stream3, deletes_stream3 = RustyIceberg.scan!(scan3)
@test inserts_stream3 != C_NULL
@test deletes_stream3 != C_NULL
println("✅ Streams obtained successfully for full history scan")

try
# Read and validate from DELETES stream FIRST (to test the deadlock fix)
# This is the scenario that previously caused indefinite blocking
deletes_values = Tuple{String, Int64}[]
deletes_batches = 0

while true
batch_ptr = RustyIceberg.next_batch(deletes_stream3)
if batch_ptr == C_NULL
break
end
deletes_batches += 1
batch = unsafe_load(batch_ptr)
arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length))
@test arrow_table isa Arrow.Table

# Convert to DataFrame and extract position delete metadata
df = DataFrame(arrow_table)
for row in eachrow(df)
push!(deletes_values, (row.file_path, row.pos))
end

RustyIceberg.free_batch(batch_ptr)
end

# Verify we have no delete records (since this is a full history scan)
@test isempty(deletes_values)

println("✅ Full history deletes stream validated successfully")
println(" - Total deletes batches: $deletes_batches")
println(" - Total delete records: $(length(deletes_values))")

# Now read and validate from inserts stream AFTER deletes
inserts_values = Int64[]
inserts_batches = 0

while true
batch_ptr = RustyIceberg.next_batch(inserts_stream3)
if batch_ptr == C_NULL
break
end
inserts_batches += 1
batch = unsafe_load(batch_ptr)
arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length))

# Convert to DataFrame and collect values from column "n"
df = DataFrame(arrow_table)
@test "n" in names(df)
append!(inserts_values, df.n)

RustyIceberg.free_batch(batch_ptr)
end

# When scanning full history (nothing to nothing), we should get rows from all transactions
# Verify we have some rows
@test length(inserts_values) > 0
sort!(inserts_values)

# Verify we have expected values from the test table
# The test table was created with: range(1, 11), range(101, 200), range(201, 300)
# And row 150 and 250 were deleted
@test 1 in inserts_values # From first insert
@test 101 in inserts_values # From second insert
@test 201 in inserts_values # From third insert
@test 150 ∉ inserts_values # Was deleted
@test 250 ∉ inserts_values # Was deleted

println("✅ Full history inserts stream validated successfully")
println(" - Total inserts batches: $inserts_batches")
println(" - Total inserts rows: $(length(inserts_values))")
finally
RustyIceberg.free_stream(inserts_stream3)
RustyIceberg.free_stream(deletes_stream3)
RustyIceberg.free_incremental_scan!(scan3)
end
end

# Clean up table
RustyIceberg.free_table(table)
println("✅ Incremental scan test completed successfully!")
Expand Down