diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index def6e2f..0e7e8fb 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1523,7 +1523,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=f2e1fa29c983244d607d5b61e789e810b291f810#f2e1fa29c983244d607d5b61e789e810b291f810" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=0589677785239560695742a18b6aa9afc1afa86b#0589677785239560695742a18b6aa9afc1afa86b" dependencies = [ "anyhow", "apache-avro", @@ -1577,7 +1577,7 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.5.0" +version = "0.6.0" dependencies = [ "anyhow", "arrow-array", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index ec72758..a8e8e06 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.5.0" +version = "0.6.0" edition = "2021" [lib] @@ -12,7 +12,7 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "f2e1fa29c983244d607d5b61e789e810b291f810" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "0589677785239560695742a18b6aa9afc1afa86b" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" diff --git a/iceberg_rust_ffi/src/full.rs b/iceberg_rust_ffi/src/full.rs index fae6c86..747e8b8 100644 --- a/iceberg_rust_ffi/src/full.rs +++ b/iceberg_rust_ffi/src/full.rs @@ -43,6 +43,13 @@ impl_scan_builder_method!( n: usize ); +impl_scan_builder_method!( + iceberg_scan_with_manifest_file_concurrency_limit, + IcebergScan, + with_manifest_file_concurrency_limit, + n: usize +); + impl_scan_builder_method!( iceberg_scan_with_manifest_entry_concurrency_limit, IcebergScan, diff --git a/iceberg_rust_ffi/src/incremental.rs b/iceberg_rust_ffi/src/incremental.rs index 4f8ee1e..77966c5 100644 --- a/iceberg_rust_ffi/src/incremental.rs +++ b/iceberg_rust_ffi/src/incremental.rs @@ -105,17 +105,24 @@ pub extern "C" fn iceberg_new_incremental_scan( // Use macros from scan_common for shared functionality impl_select_columns!(iceberg_incremental_select_columns, IcebergIncrementalScan); +impl_scan_builder_method!( + iceberg_incremental_scan_with_manifest_file_concurrency_limit, + IcebergIncrementalScan, + with_manifest_file_concurrency_limit, + n: usize +); + impl_scan_builder_method!( iceberg_incremental_scan_with_data_file_concurrency_limit, IcebergIncrementalScan, - with_concurrency_limit_data_files, + with_data_file_concurrency_limit, n: usize ); impl_scan_builder_method!( iceberg_incremental_scan_with_manifest_entry_concurrency_limit, IcebergIncrementalScan, - with_concurrency_limit_manifest_entries, + with_manifest_entry_concurrency_limit, n: usize ); diff --git a/src/RustyIceberg.jl b/src/RustyIceberg.jl index b4d8276..cb45dfa 100644 --- a/src/RustyIceberg.jl +++ b/src/RustyIceberg.jl @@ -21,7 +21,10 @@ export FILE_COLUMN, POS_COLUMN const rust_lib = iceberg_rust_ffi_jll.libiceberg_rust_ffi """ + struct StaticConfig + Runtime configuration for the Iceberg library. +Value of 0 means use all CPU cores, regardless of the number of threads in the Julia runtime. """ struct StaticConfig n_threads::Culonglong @@ -89,7 +92,7 @@ This starts a `tokio` runtime for handling Iceberg requests. It must be called before sending a request. """ function init_runtime( - config::StaticConfig=StaticConfig(0); + config::StaticConfig=StaticConfig(Threads.nthreads()); on_rust_panic::Function=default_panic_hook ) global _PANIC_HOOK diff --git a/src/full.jl b/src/full.jl index 68cb3b8..567f628 100644 --- a/src/full.jl +++ b/src/full.jl @@ -73,6 +73,23 @@ function with_data_file_concurrency_limit!(scan::Scan, n::UInt) return nothing end +""" + with_manifest_file_concurrency_limit!(scan::Scan, n::UInt) + +Sets the manifest file concurrency level for the full scan. +""" +function with_manifest_file_concurrency_limit!(scan::Scan, n::UInt) + result = @ccall rust_lib.iceberg_scan_with_manifest_file_concurrency_limit( + convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}}, + n::Csize_t + )::Cint + + if result != 0 + error("Failed to set data file concurrency limit for incremental scan") + end + return nothing +end + """ with_manifest_entry_concurrency_limit!(scan::Scan, n::UInt) diff --git a/src/incremental.jl b/src/incremental.jl index 5bfae67..45ea602 100644 --- a/src/incremental.jl +++ b/src/incremental.jl @@ -109,6 +109,23 @@ function select_columns!(scan::IncrementalScan, column_names::Vector{String}) return nothing end +""" + with_manifest_file_concurrency_limit!(scan::IncrementalScan, n::UInt) + +Sets the manifest file concurrency level for the incremental scan. +""" +function with_manifest_file_concurrency_limit!(scan::IncrementalScan, n::UInt) + result = @ccall rust_lib.iceberg_incremental_scan_with_manifest_file_concurrency_limit( + convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}}, + n::Csize_t + )::Cint + + if result != 0 + error("Failed to set data file concurrency limit for incremental scan") + end + return nothing +end + """ with_data_file_concurrency_limit!(scan::IncrementalScan, n::UInt) diff --git a/test/runtests.jl b/test/runtests.jl index f7e6199..df0c885 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -8,8 +8,9 @@ using Arrow # Test runtime initialization - this should work @test_nowarn init_runtime() - # Test that we can initialize multiple times safely - @test_nowarn init_runtime() + # Test that we can initialize multiple times safely. But a new static config + # wouldn't take effect, would silently ignore the config. + @test_nowarn init_runtime(StaticConfig(1)) println("✅ Runtime initialization successful") end @@ -397,6 +398,11 @@ end @test scan3.ptr != C_NULL println("✅ Incremental scan created with nothing for both snapshot IDs") + RustyIceberg.with_manifest_file_concurrency_limit!(scan3, UInt(2)) + RustyIceberg.with_manifest_entry_concurrency_limit!(scan3, UInt(256)) + RustyIceberg.with_data_file_concurrency_limit!(scan3, UInt(1024)) + RustyIceberg.with_batch_size!(scan3, UInt(50)) + inserts_stream3, deletes_stream3 = RustyIceberg.scan!(scan3) @test inserts_stream3 != C_NULL @test deletes_stream3 != C_NULL @@ -671,6 +677,28 @@ end end end + @testset "with_manifest_file_concurrency_limit! - Full Scan" begin + table = RustyIceberg.table_open(customer_path) + scan = RustyIceberg.new_scan(table) + + # Set concurrency limit (should not error) + @test_nowarn RustyIceberg.with_manifest_file_concurrency_limit!(scan, UInt(4)) + stream = RustyIceberg.scan!(scan) + + try + batch_ptr = RustyIceberg.next_batch(stream) + while batch_ptr != C_NULL + RustyIceberg.free_batch(batch_ptr) + batch_ptr = RustyIceberg.next_batch(stream) + end + println("✅ with_manifest_file_concurrency_limit! test passed for full scan") + finally + RustyIceberg.free_stream(stream) + RustyIceberg.free_scan!(scan) + RustyIceberg.free_table(table) + end + end + @testset "with_manifest_entry_concurrency_limit! - Incremental Scan" begin table = RustyIceberg.table_open(incremental_path) scan = new_incremental_scan(table, from_snapshot_id, to_snapshot_id) @@ -736,6 +764,7 @@ end RustyIceberg.select_columns!(scan, ["n"]) RustyIceberg.with_batch_size!(scan, UInt(5)) RustyIceberg.with_data_file_concurrency_limit!(scan, UInt(2)) + RustyIceberg.with_manifest_file_concurrency_limit!(scan, UInt(2)) RustyIceberg.with_manifest_entry_concurrency_limit!(scan, UInt(2)) inserts_stream, deletes_stream = RustyIceberg.scan!(scan)