Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iceberg_rust_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions iceberg_rust_ffi/src/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,21 @@ impl_select_columns!(iceberg_select_columns, IcebergScan);
impl_scan_builder_method!(
iceberg_scan_with_data_file_concurrency_limit,
IcebergScan,
-    with_data_file_concurrency_limit
+    with_data_file_concurrency_limit,
+    n: usize
);

impl_scan_builder_method!(
iceberg_scan_with_manifest_entry_concurrency_limit,
IcebergScan,
-    with_manifest_entry_concurrency_limit
+    with_manifest_entry_concurrency_limit,
+    n: usize
);

impl_with_batch_size!(iceberg_scan_with_batch_size, IcebergScan);

impl_scan_builder_method!(iceberg_scan_with_file_column, IcebergScan, with_file_column);

impl_scan_build!(iceberg_scan_build, IcebergScan);

// Async function to initialize stream from a table scan
Expand Down
12 changes: 10 additions & 2 deletions iceberg_rust_ffi/src/incremental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,28 @@ impl_select_columns!(iceberg_incremental_select_columns, IcebergIncrementalScan)
impl_scan_builder_method!(
iceberg_incremental_scan_with_data_file_concurrency_limit,
IcebergIncrementalScan,
-    with_concurrency_limit_data_files
+    with_concurrency_limit_data_files,
+    n: usize
);

impl_scan_builder_method!(
iceberg_incremental_scan_with_manifest_entry_concurrency_limit,
IcebergIncrementalScan,
-    with_concurrency_limit_manifest_entries
+    with_concurrency_limit_manifest_entries,
+    n: usize
);

impl_with_batch_size!(
iceberg_incremental_scan_with_batch_size,
IcebergIncrementalScan
);

impl_scan_builder_method!(
iceberg_incremental_scan_with_file_column,
IcebergIncrementalScan,
with_file_column
);

impl_scan_build!(iceberg_incremental_scan_build, IcebergIncrementalScan);

// Get unzipped Arrow streams from incremental scan (async)
Expand Down
29 changes: 25 additions & 4 deletions iceberg_rust_ffi/src/scan_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,32 @@ macro_rules! impl_select_columns {
};
}

/// Macro to generate scan builder methods that take a usize parameter
/// Macro to generate scan builder methods with zero or more parameters
///
/// # Examples
///
/// With single parameter:
/// ```ignore
/// impl_scan_builder_method!(
/// iceberg_scan_with_data_file_concurrency_limit,
/// IcebergScan,
/// with_data_file_concurrency_limit,
/// n: usize
/// );
/// ```
///
/// Without parameters:
/// ```ignore
/// impl_scan_builder_method!(
/// iceberg_scan_with_file_column,
/// IcebergScan,
/// with_file_column
/// );
/// ```
macro_rules! impl_scan_builder_method {
-    ($fn_name:ident, $scan_type:ident, $builder_method:ident) => {
+    ($fn_name:ident, $scan_type:ident, $builder_method:ident $(, $param:ident: $param_type:ty)*) => {
         #[no_mangle]
-        pub extern "C" fn $fn_name(scan: &mut *mut $scan_type, n: usize) -> CResult {
+        pub extern "C" fn $fn_name(scan: &mut *mut $scan_type $(, $param: $param_type)*) -> CResult {
if scan.is_null() || (*scan).is_null() {
return CResult::Error;
}
Expand All @@ -59,7 +80,7 @@ macro_rules! impl_scan_builder_method {
}

*scan = Box::into_raw(Box::new($scan_type {
-                builder: scan_ref.builder.map(|b| b.$builder_method(n)),
+                builder: scan_ref.builder.map(|b| b.$builder_method($($param),*)),
scan: scan_ref.scan,
}));

Expand Down
4 changes: 2 additions & 2 deletions src/RustyIceberg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ export IcebergException
export new_incremental_scan, free_incremental_scan!
export table_open, free_table, new_scan, free_scan!
export select_columns!, with_batch_size!, with_data_file_concurrency_limit!, with_manifest_entry_concurrency_limit!
export with_file_column!
export scan!, next_batch, free_batch, free_stream

const Option{T} = Union{T, Nothing}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this one here in this PR, as it is not used.

export FILE_COLUMN

# Always use the JLL library - override via Preferences if needed for local development
# To use a local build, set the preference:
Expand Down
26 changes: 26 additions & 0 deletions src/full.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,32 @@ function with_batch_size!(scan::Scan, n::UInt)
return nothing
end

"""
    with_file_column!(scan::Scan)

Add the `_file` metadata column to the scan.

The `_file` column carries the originating data-file path for every row,
which is useful for tracing rows back to the data files that contain them.

# Example
```julia
scan = new_scan(table)
with_file_column!(scan)
stream = scan!(scan)
```
"""
function with_file_column!(scan::Scan)
    # Hand the Scan handle to the Rust FFI layer as a mutable pointer-to-pointer;
    # the Rust side may swap the underlying scan object in place.
    scan_handle = convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))
    status = @ccall rust_lib.iceberg_scan_with_file_column(
        scan_handle::Ptr{Ptr{Cvoid}}
    )::Cint

    # A nonzero status signals failure on the Rust side.
    status == 0 || error("Failed to add file column to scan")
    return nothing
end

"""
build!(scan::Scan)

Expand Down
26 changes: 26 additions & 0 deletions src/incremental.jl
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,32 @@ function with_batch_size!(scan::IncrementalScan, n::UInt)
return nothing
end

"""
    with_file_column!(scan::IncrementalScan)

Add the `_file` metadata column to the incremental scan.

The `_file` column carries the originating data-file path for every row,
which is useful for tracing rows back to their data files during
incremental scans.

# Example
```julia
scan = new_incremental_scan(table, from_snapshot_id, to_snapshot_id)
with_file_column!(scan)
inserts_stream, deletes_stream = scan!(scan)
```
"""
function with_file_column!(scan::IncrementalScan)
    # Hand the IncrementalScan handle to the Rust FFI layer as a mutable
    # pointer-to-pointer; the Rust side may swap the scan object in place.
    scan_handle = convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))
    status = @ccall rust_lib.iceberg_incremental_scan_with_file_column(
        scan_handle::Ptr{Ptr{Cvoid}}
    )::Cint

    # A nonzero status signals failure on the Rust side.
    status == 0 || error("Failed to add file column to incremental scan")
    return nothing
end

"""
build!(scan::IncrementalScan)

Expand Down
19 changes: 19 additions & 0 deletions src/scan_common.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@ This stream can be used to fetch batches of Arrow data asynchronously.
"""
const ArrowStream = Ptr{Cvoid}

"""
    FILE_COLUMN

The name of the reserved metadata column containing file paths (`_file`).

This constant can be used with the `select_columns!` function to include
file path information in query results. It corresponds to the `_file`
metadata column in Iceberg tables — the same column that
`with_file_column!` adds to a scan.

# Example
```julia
# Select specific columns including the file path
scan = new_scan(table)
select_columns!(scan, ["id", "name", FILE_COLUMN])
stream = scan!(scan)
```
"""
const FILE_COLUMN = "_file"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought you said you wanted to make this a function and fetch from rust_lib that way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I mentioned in the meeting: that does not work cleanly


"""
BatchResponse

Expand Down
108 changes: 108 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,114 @@ end
RustyIceberg.free_table(table)
end
end

@testset "select_columns! with with_file_column! - Full Scan" begin
    table = RustyIceberg.table_open(customer_path)
    scan = RustyIceberg.new_scan(table)

    # Project two data columns and additionally request the _file metadata column.
    RustyIceberg.select_columns!(scan, ["c_custkey", "c_name"])
    RustyIceberg.with_file_column!(scan)
    stream = RustyIceberg.scan!(scan)

    try
        bptr = RustyIceberg.next_batch(stream)
        @test bptr != C_NULL

        loaded = unsafe_load(bptr)
        frame = DataFrame(Arrow.Table(unsafe_wrap(Array, loaded.data, loaded.length)))

        # The batch should expose the projected columns plus _file, and be non-empty.
        @test "c_custkey" in names(frame)
        @test "c_name" in names(frame)
        @test "_file" in names(frame)
        @test !isempty(frame)

        # Every _file entry should be a path ending in .parquet.
        @test all(endswith.(frame._file, ".parquet"))

        RustyIceberg.free_batch(bptr)
        println("✅ select_columns! with with_file_column! test passed for full scan")
    finally
        # Always release native resources, even if an assertion fails.
        RustyIceberg.free_stream(stream)
        RustyIceberg.free_scan!(scan)
        RustyIceberg.free_table(table)
    end
end

@testset "select_columns! with with_file_column! - Incremental Scan" begin
    table = RustyIceberg.table_open(incremental_path)
    scan = new_incremental_scan(table, from_snapshot_id, to_snapshot_id)

    # Project a single data column and additionally request the _file metadata column.
    RustyIceberg.select_columns!(scan, ["n"])
    RustyIceberg.with_file_column!(scan)
    inserts_stream, deletes_stream = RustyIceberg.scan!(scan)

    try
        bptr = RustyIceberg.next_batch(inserts_stream)
        # The inserts stream may yield no batch; only inspect one if it arrived.
        if bptr != C_NULL
            loaded = unsafe_load(bptr)
            frame = DataFrame(Arrow.Table(unsafe_wrap(Array, loaded.data, loaded.length)))

            # The batch should expose the projected column "n" plus _file, non-empty.
            @test "n" in names(frame)
            @test "_file" in names(frame)
            @test !isempty(frame)

            # Every _file entry should be a path ending in .parquet.
            @test all(endswith.(frame._file, ".parquet"))

            RustyIceberg.free_batch(bptr)
            println("✅ select_columns! with with_file_column! test passed for incremental scan")
        end
    finally
        # Always release native resources, even if an assertion fails.
        RustyIceberg.free_stream(inserts_stream)
        RustyIceberg.free_stream(deletes_stream)
        RustyIceberg.free_incremental_scan!(scan)
        RustyIceberg.free_table(table)
    end
end

@testset "select_columns! with FILE_COLUMN constant" begin
    table = RustyIceberg.table_open(customer_path)
    scan = RustyIceberg.new_scan(table)

    # Use the exported FILE_COLUMN constant directly in the projection list.
    RustyIceberg.select_columns!(scan, ["c_custkey", "c_name", RustyIceberg.FILE_COLUMN])
    stream = RustyIceberg.scan!(scan)

    try
        bptr = RustyIceberg.next_batch(stream)
        @test bptr != C_NULL

        loaded = unsafe_load(bptr)
        frame = DataFrame(Arrow.Table(unsafe_wrap(Array, loaded.data, loaded.length)))

        # The batch should expose the projected data columns.
        @test "c_custkey" in names(frame)
        @test "c_name" in names(frame)
        # FILE_COLUMN resolves to the "_file" metadata column.
        @test "_file" in names(frame)
        @test !isempty(frame)

        # Every _file entry should be a path ending in .parquet.
        @test all(endswith.(frame._file, ".parquet"))

        RustyIceberg.free_batch(bptr)
        println("✅ select_columns! with FILE_COLUMN constant test passed")
    finally
        # Always release native resources, even if an assertion fails.
        RustyIceberg.free_stream(stream)
        RustyIceberg.free_scan!(scan)
        RustyIceberg.free_table(table)
    end
end
end

end # End of testset
Expand Down