diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index bbd0217..648ddcf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -99,6 +99,9 @@ jobs: ${{ runner.os }}- - uses: julia-actions/julia-buildpkg@v1 + - name: Force recompile with custom library + if: inputs.build_rust + run: julia --project=. -e 'Base.compilecache(Base.identify_package("RustyIceberg"))' - uses: julia-actions/julia-runtest@v1 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v3 diff --git a/Makefile b/Makefile index 1ca6832..5c7625d 100644 --- a/Makefile +++ b/Makefile @@ -49,7 +49,7 @@ test: build fi @set -a && . ./.env && set +a && \ export ICEBERG_RUST_LIB=$$(pwd)/$(TARGET_DIR) && \ - $(JULIA_THREADS_ENV) julia --project=. -e 'using Pkg; Pkg.test()' + $(JULIA_THREADS_ENV) julia --project=. -e 'Base.compilecache(Base.identify_package("RustyIceberg")); using Pkg; Pkg.test()' # Start Julia REPL with environment configured (requires .env file) repl: build diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6ebbfb0..1618875 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -73,9 +73,10 @@ services: entrypoint: | /bin/sh -c " until (/usr/bin/mc alias set minio http://minio:9000 root password) do echo '...waiting...' && sleep 1; done; - /usr/bin/mc rm -r --force minio/warehouse + /usr/bin/mc rm -r --force minio/warehouse; + /usr/bin/mc rb --force minio/warehouse; /usr/bin/mc mb minio/warehouse; /usr/bin/mc cp -r /input/tpch.sf01/ minio/warehouse/tpch.sf01/; /usr/bin/mc cp -r /input_incremental/ minio/warehouse/incremental/; - tail -f /dev/null + tail -f /dev/null; " \ No newline at end of file diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index 15d2bad..86f79c4 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1518,7 +1518,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.7.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=ccb1e0c9983a1bdcb5b70fa637759df526a9a75e#ccb1e0c9983a1bdcb5b70fa637759df526a9a75e" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=37e79b805407a0340d08823373cafed9cdac0083#37e79b805407a0340d08823373cafed9cdac0083" dependencies = [ "anyhow", "apache-avro", @@ -1540,6 +1540,7 @@ dependencies = [ "chrono", "derive_builder", "expect-test", + "flate2", "fnv", "futures", "itertools", @@ -1572,7 +1573,7 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.2.0" +version = "0.3.0" dependencies = [ "anyhow", "arrow-array", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index 5ee1ace..857982e 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.2.0" +version = "0.3.0" edition = "2021" [lib] @@ -12,7 +12,7 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "ccb1e0c9983a1bdcb5b70fa637759df526a9a75e" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "37e79b805407a0340d08823373cafed9cdac0083" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" diff --git a/iceberg_rust_ffi/src/lib.rs b/iceberg_rust_ffi/src/lib.rs index 56787ee..80323ec 100644 --- a/iceberg_rust_ffi/src/lib.rs +++ b/iceberg_rust_ffi/src/lib.rs @@ -82,6 +82,15 @@ impl Default for IcebergStaticConfig { } } +// FFI structure for passing key-value properties +#[repr(C)] +pub struct PropertyEntry { + pub key: *const c_char, + pub value: *const c_char, +} + +unsafe impl Send for PropertyEntry {} + // Direct structures - no opaque wrappers #[repr(C)] pub struct IcebergTable { @@ -302,12 +311,42 @@ export_runtime_op!( .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in snapshot path: {}", e))? }; - Ok(snapshot_path_str.to_string()) + let scheme_str = unsafe { + CStr::from_ptr(scheme).to_str() + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in scheme: {}", e))? + }; + + // Convert properties from FFI to Rust Vec + let mut props = Vec::new(); + if !properties.is_null() && properties_len > 0 { + let properties_slice = unsafe { + std::slice::from_raw_parts(properties, properties_len) + }; + + for prop in properties_slice { + let key = unsafe { + CStr::from_ptr(prop.key).to_str() + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in property key: {}", e))? + }; + let value = unsafe { + CStr::from_ptr(prop.value).to_str() + .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in property value: {}", e))? + }; + props.push((key.to_string(), value.to_string())); + } + } + + Ok((snapshot_path_str.to_string(), scheme_str.to_string(), props)) }, - full_metadata_path, + result_tuple, async { - // Create file IO for S3 - let file_io = FileIOBuilder::new("s3").build()?; + let (full_metadata_path, scheme_string, props) = result_tuple; + + // Create file IO with the specified scheme + // Default behavior (when props is empty) uses environment variables for credentials + let file_io = FileIOBuilder::new(&scheme_string) + .with_props(props) + .build()?; // Create table identifier let table_ident = TableIdent::from_strs(["default", "table"])?; @@ -318,7 +357,10 @@ export_runtime_op!( Ok::(IcebergTable { table: static_table.into_table() }) }, - snapshot_path: *const c_char + snapshot_path: *const c_char, + scheme: *const c_char, + properties: *const PropertyEntry, + properties_len: usize ); diff --git a/src/RustyIceberg.jl b/src/RustyIceberg.jl index f34d66c..3bfbad5 100644 --- a/src/RustyIceberg.jl +++ b/src/RustyIceberg.jl @@ -260,20 +260,80 @@ end # High-level functions using the async API pattern from RustyObjectStore.jl """ - table_open(snapshot_path::String)::IcebergTable + PropertyEntry + +FFI structure for passing key-value properties to Rust. + +# Fields +- `key::Ptr{Cchar}`: Pointer to the key string +- `value::Ptr{Cchar}`: Pointer to the value string +""" +struct PropertyEntry + key::Ptr{Cchar} + value::Ptr{Cchar} +end + +""" + table_open(snapshot_path::String; scheme::String="s3", properties::Dict{String,String}=Dict{String,String}())::Table Open an Iceberg table from the given snapshot path. + +# Arguments +- `snapshot_path::String`: Path to the metadata.json file for the table snapshot +- `scheme::String`: Storage scheme (e.g., "s3", "file"). Defaults to "s3" +- `properties::Dict{String,String}`: Optional key-value properties for the FileIO configuration. + By default (empty dict), credentials are read from environment variables (AWS_ACCESS_KEY_ID, + AWS_SECRET_ACCESS_KEY, AWS_REGION, AWS_ENDPOINT_URL, etc.). + + Common S3 properties include: + - "s3.endpoint": Custom S3 endpoint URL + - "s3.access-key-id": AWS access key ID + - "s3.secret-access-key": AWS secret access key + - "s3.session-token": AWS session token + - "s3.region": AWS region + - "s3.allow-anonymous": Set to "true" for anonymous access (no credentials) + +# Example +```julia +# Open with credentials from environment variables (default) +table = table_open("s3://bucket/path/metadata/metadata.json") + +# Open with anonymous S3 access +table = table_open( + "s3://bucket/path/metadata/metadata.json", + properties=Dict("s3.allow-anonymous" => "true") +) + +# Open with custom S3 credentials +table = table_open( + "s3://bucket/path/metadata/metadata.json", + scheme="s3", + properties=Dict( + "s3.endpoint" => "http://localhost:9000", + "s3.access-key-id" => "minioadmin", + "s3.secret-access-key" => "minioadmin", + "s3.region" => "us-east-1" + ) +) +``` """ -function table_open(snapshot_path::String) +function table_open(snapshot_path::String; scheme::String="s3", properties::Dict{String,String}=Dict{String,String}()) response = TableResponse() ct = current_task() event = Base.Event() handle = pointer_from_objref(event) + # Convert properties dict to array of PropertyEntry structs + property_entries = [PropertyEntry(pointer(k), pointer(v)) for (k, v) in properties] + properties_len = length(property_entries) + preserve_task(ct) - result = GC.@preserve response event try + result = GC.@preserve response event property_entries properties try result = @ccall rust_lib.iceberg_table_open( snapshot_path::Cstring, + scheme::Cstring, + (properties_len > 0 ? pointer(property_entries) : C_NULL)::Ptr{PropertyEntry}, + properties_len::Csize_t, response::Ref{TableResponse}, handle::Ptr{Cvoid} )::Cint