Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions iceberg_rust_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions iceberg_rust_ffi/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iceberg_rust_ffi"
version = "0.5.0"
version = "0.6.0"
edition = "2021"

[lib]
Expand All @@ -12,7 +12,7 @@ default = ["julia"]
julia = []

[dependencies]
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "f2e1fa29c983244d607d5b61e789e810b291f810" }
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "0589677785239560695742a18b6aa9afc1afa86b" }
object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false }
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
Expand Down
7 changes: 7 additions & 0 deletions iceberg_rust_ffi/src/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ impl_scan_builder_method!(
n: usize
);

// Generates the `iceberg_scan_with_manifest_file_concurrency_limit` FFI setter for
// `IcebergScan`, taking a single `n: usize` argument and naming
// `with_manifest_file_concurrency_limit` as the builder method to apply.
// NOTE(review): the macro body is not visible here — confirm the exact forwarding and
// the status-code convention against the `impl_scan_builder_method!` definition.
impl_scan_builder_method!(
iceberg_scan_with_manifest_file_concurrency_limit,
IcebergScan,
with_manifest_file_concurrency_limit,
n: usize
);

impl_scan_builder_method!(
iceberg_scan_with_manifest_entry_concurrency_limit,
IcebergScan,
Expand Down
11 changes: 9 additions & 2 deletions iceberg_rust_ffi/src/incremental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,24 @@ pub extern "C" fn iceberg_new_incremental_scan(
// Use macros from scan_common for shared functionality.
// This invocation names `iceberg_incremental_select_columns` as the generated function
// and `IcebergIncrementalScan` as the scan type it operates on (macro body not visible
// in this view — presumably the column-selection FFI entry point; confirm).
impl_select_columns!(iceberg_incremental_select_columns, IcebergIncrementalScan);

// Generates the `iceberg_incremental_scan_with_manifest_file_concurrency_limit` FFI
// setter for `IcebergIncrementalScan`, taking a single `n: usize` argument and naming
// `with_manifest_file_concurrency_limit` as the builder method to apply.
// NOTE(review): the macro body is not visible here — confirm the exact forwarding and
// the status-code convention against the `impl_scan_builder_method!` definition.
impl_scan_builder_method!(
iceberg_incremental_scan_with_manifest_file_concurrency_limit,
IcebergIncrementalScan,
with_manifest_file_concurrency_limit,
n: usize
);

// Generates the `iceberg_incremental_scan_with_data_file_concurrency_limit` FFI setter
// for `IcebergIncrementalScan`, taking a single `n: usize` argument.
// NOTE(review): this diff view lists two builder-method names back to back; the first
// (`with_concurrency_limit_data_files`) appears to be the removed line and the second
// (`with_data_file_concurrency_limit`) its replacement — confirm against the merged file.
impl_scan_builder_method!(
iceberg_incremental_scan_with_data_file_concurrency_limit,
IcebergIncrementalScan,
with_concurrency_limit_data_files,
with_data_file_concurrency_limit,
n: usize
);

// Generates the `iceberg_incremental_scan_with_manifest_entry_concurrency_limit` FFI
// setter for `IcebergIncrementalScan`, taking a single `n: usize` argument.
// NOTE(review): this diff view lists two builder-method names back to back; the first
// (`with_concurrency_limit_manifest_entries`) appears to be the removed line and the
// second (`with_manifest_entry_concurrency_limit`) its replacement — confirm against
// the merged file.
impl_scan_builder_method!(
iceberg_incremental_scan_with_manifest_entry_concurrency_limit,
IcebergIncrementalScan,
with_concurrency_limit_manifest_entries,
with_manifest_entry_concurrency_limit,
n: usize
);

Expand Down
5 changes: 4 additions & 1 deletion src/RustyIceberg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ export FILE_COLUMN, POS_COLUMN
const rust_lib = iceberg_rust_ffi_jll.libiceberg_rust_ffi

"""
struct StaticConfig

Runtime configuration for the Iceberg library.
An `n_threads` value of 0 means use all CPU cores, regardless of the number of threads in the Julia runtime.
"""
struct StaticConfig
n_threads::Culonglong
Expand Down Expand Up @@ -89,7 +92,7 @@ This starts a `tokio` runtime for handling Iceberg requests.
It must be called before sending a request.
"""
function init_runtime(
config::StaticConfig=StaticConfig(0);
config::StaticConfig=StaticConfig(Threads.nthreads());
on_rust_panic::Function=default_panic_hook
)
global _PANIC_HOOK
Expand Down
17 changes: 17 additions & 0 deletions src/full.jl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,23 @@ function with_data_file_concurrency_limit!(scan::Scan, n::UInt)
return nothing
end

"""
with_manifest_file_concurrency_limit!(scan::Scan, n::UInt)

Sets the manifest file concurrency level for the full scan.
"""
function with_manifest_file_concurrency_limit!(scan::Scan, n::UInt)
    # Forwards `n` (as `Csize_t`) to the native setter together with a pointer to `scan`.
    # `pointer_from_objref` yields a raw pointer that does not keep `scan` alive, so the
    # object is rooted explicitly for the duration of the foreign call.
    status = GC.@preserve scan begin
        @ccall rust_lib.iceberg_scan_with_manifest_file_concurrency_limit(
            convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}},
            n::Csize_t
        )::Cint
    end

    # Any non-zero status is treated as failure and raised as an `ErrorException`.
    # The message names the setting and scan kind this method actually configures.
    if status != 0
        error("Failed to set manifest file concurrency limit for full scan")
    end
    return nothing
end

"""
with_manifest_entry_concurrency_limit!(scan::Scan, n::UInt)

Expand Down
17 changes: 17 additions & 0 deletions src/incremental.jl
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,23 @@ function select_columns!(scan::IncrementalScan, column_names::Vector{String})
return nothing
end

"""
with_manifest_file_concurrency_limit!(scan::IncrementalScan, n::UInt)

Sets the manifest file concurrency level for the incremental scan.
"""
function with_manifest_file_concurrency_limit!(scan::IncrementalScan, n::UInt)
    # Forwards `n` (as `Csize_t`) to the native setter together with a pointer to `scan`.
    # `pointer_from_objref` yields a raw pointer that does not keep `scan` alive, so the
    # object is rooted explicitly for the duration of the foreign call.
    status = GC.@preserve scan begin
        @ccall rust_lib.iceberg_incremental_scan_with_manifest_file_concurrency_limit(
            convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}},
            n::Csize_t
        )::Cint
    end

    # Any non-zero status is treated as failure and raised as an `ErrorException`.
    # The message names the setting this method actually configures (manifest files).
    if status != 0
        error("Failed to set manifest file concurrency limit for incremental scan")
    end
    return nothing
end

"""
with_data_file_concurrency_limit!(scan::IncrementalScan, n::UInt)

Expand Down
33 changes: 31 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ using Arrow
# Test runtime initialization - this should work
@test_nowarn init_runtime()

# Test that we can initialize multiple times safely
@test_nowarn init_runtime()
# Test that we can initialize multiple times safely. Note that a new static config
# passed on re-initialization does not take effect; it is silently ignored.
@test_nowarn init_runtime(StaticConfig(1))

println("✅ Runtime initialization successful")
end
Expand Down Expand Up @@ -397,6 +398,11 @@ end
@test scan3.ptr != C_NULL
println("✅ Incremental scan created with nothing for both snapshot IDs")

RustyIceberg.with_manifest_file_concurrency_limit!(scan3, UInt(2))
RustyIceberg.with_manifest_entry_concurrency_limit!(scan3, UInt(256))
RustyIceberg.with_data_file_concurrency_limit!(scan3, UInt(1024))
RustyIceberg.with_batch_size!(scan3, UInt(50))

inserts_stream3, deletes_stream3 = RustyIceberg.scan!(scan3)
@test inserts_stream3 != C_NULL
@test deletes_stream3 != C_NULL
Expand Down Expand Up @@ -671,6 +677,28 @@ end
end
end

@testset "with_manifest_file_concurrency_limit! - Full Scan" begin
    # Open the table and build a full scan over it.
    table = RustyIceberg.table_open(customer_path)
    scan = RustyIceberg.new_scan(table)

    # Applying the manifest file concurrency limit must neither throw nor warn.
    @test_nowarn RustyIceberg.with_manifest_file_concurrency_limit!(scan, UInt(4))
    stream = RustyIceberg.scan!(scan)

    try
        # Drain the stream, releasing every batch, until a null batch pointer is returned.
        while true
            batch_ptr = RustyIceberg.next_batch(stream)
            if batch_ptr == C_NULL
                break
            end
            RustyIceberg.free_batch(batch_ptr)
        end
        println("✅ with_manifest_file_concurrency_limit! test passed for full scan")
    finally
        # Release native resources in the same order as before: stream, scan, table.
        RustyIceberg.free_stream(stream)
        RustyIceberg.free_scan!(scan)
        RustyIceberg.free_table(table)
    end
end

@testset "with_manifest_entry_concurrency_limit! - Incremental Scan" begin
table = RustyIceberg.table_open(incremental_path)
scan = new_incremental_scan(table, from_snapshot_id, to_snapshot_id)
Expand Down Expand Up @@ -736,6 +764,7 @@ end
RustyIceberg.select_columns!(scan, ["n"])
RustyIceberg.with_batch_size!(scan, UInt(5))
RustyIceberg.with_data_file_concurrency_limit!(scan, UInt(2))
RustyIceberg.with_manifest_file_concurrency_limit!(scan, UInt(2))
RustyIceberg.with_manifest_entry_concurrency_limit!(scan, UInt(2))

inserts_stream, deletes_stream = RustyIceberg.scan!(scan)
Expand Down