2 changes: 1 addition & 1 deletion Cargo.toml
@@ -36,7 +36,7 @@ version = "0.7.0"
 license = "Apache-2.0"
 repository = "https://github.com/apache/iceberg-rust"
 # Check the MSRV policy in README.md before changing this
-rust-version = "1.87"
+rust-version = "1.88"

 [workspace.dependencies]
 anyhow = "1.0.72"
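Note: the MSRV bump above is what permits the refactors in the rest of this PR — `if let` chains, stabilized in Rust 1.88 for the 2024 edition. A minimal, self-contained sketch of the syntax (the function and values are illustrative, not taken from the codebase):

fn describe(len_limit: usize, name: Option<&str>) -> &'static str {
    // A single `if` can now chain boolean conditions and `let` bindings with `&&`;
    // before 1.88 this required nesting an `if let` inside an `if`.
    if let Some(name) = name
        && !name.is_empty()
        && name.len() <= len_limit
    {
        "short non-empty name"
    } else {
        "missing, empty, or too long"
    }
}

fn main() {
    assert_eq!(describe(8, Some("iceberg")), "short non-empty name");
    assert_eq!(describe(8, None), "missing, empty, or too long");
}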
46 changes: 23 additions & 23 deletions crates/catalog/glue/src/catalog.rs
@@ -151,33 +151,33 @@ impl GlueCatalog {
     async fn new(config: GlueCatalogConfig) -> Result<Self> {
         let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
         let mut file_io_props = config.props.clone();
-        if !file_io_props.contains_key(S3_ACCESS_KEY_ID) {
-            if let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID) {
-                file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
-            }
+        if !file_io_props.contains_key(S3_ACCESS_KEY_ID)
+            && let Some(access_key_id) = file_io_props.get(AWS_ACCESS_KEY_ID)
+        {
+            file_io_props.insert(S3_ACCESS_KEY_ID.to_string(), access_key_id.to_string());
         }
-        if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY) {
-            if let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY) {
-                file_io_props.insert(
-                    S3_SECRET_ACCESS_KEY.to_string(),
-                    secret_access_key.to_string(),
-                );
-            }
+        if !file_io_props.contains_key(S3_SECRET_ACCESS_KEY)
+            && let Some(secret_access_key) = file_io_props.get(AWS_SECRET_ACCESS_KEY)
+        {
+            file_io_props.insert(
+                S3_SECRET_ACCESS_KEY.to_string(),
+                secret_access_key.to_string(),
+            );
         }
-        if !file_io_props.contains_key(S3_REGION) {
-            if let Some(region) = file_io_props.get(AWS_REGION_NAME) {
-                file_io_props.insert(S3_REGION.to_string(), region.to_string());
-            }
+        if !file_io_props.contains_key(S3_REGION)
+            && let Some(region) = file_io_props.get(AWS_REGION_NAME)
+        {
+            file_io_props.insert(S3_REGION.to_string(), region.to_string());
         }
-        if !file_io_props.contains_key(S3_SESSION_TOKEN) {
-            if let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN) {
-                file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
-            }
+        if !file_io_props.contains_key(S3_SESSION_TOKEN)
+            && let Some(session_token) = file_io_props.get(AWS_SESSION_TOKEN)
+        {
+            file_io_props.insert(S3_SESSION_TOKEN.to_string(), session_token.to_string());
         }
-        if !file_io_props.contains_key(S3_ENDPOINT) {
-            if let Some(aws_endpoint) = config.uri.as_ref() {
-                file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
-            }
+        if !file_io_props.contains_key(S3_ENDPOINT)
+            && let Some(aws_endpoint) = config.uri.as_ref()
+        {
+            file_io_props.insert(S3_ENDPOINT.to_string(), aws_endpoint.to_string());
         }

         let client = aws_sdk_glue::Client::new(&sdk_config);
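Note: the five blocks above all apply the same fallback — copy a client/AWS property to its S3 FileIO equivalent unless the caller already set the S3 key. A standalone sketch of that behaviour (plain string keys stand in for the real property constants; `copy_if_absent` is a hypothetical helper, not part of the PR):

use std::collections::HashMap;

fn copy_if_absent(props: &mut HashMap<String, String>, dst: &str, src: &str) {
    // Same shape as the catalog code: skip if the destination is already set,
    // otherwise copy the source value when it exists.
    if !props.contains_key(dst)
        && let Some(value) = props.get(src)
    {
        props.insert(dst.to_string(), value.to_string());
    }
}

fn main() {
    let mut props = HashMap::from([
        ("client.access-key-id".to_string(), "AKIA-example".to_string()),
        ("s3.region".to_string(), "eu-west-1".to_string()),
    ]);
    copy_if_absent(&mut props, "s3.access-key-id", "client.access-key-id");
    copy_if_absent(&mut props, "s3.region", "client.region"); // destination already set: no-op
    assert_eq!(props["s3.access-key-id"], "AKIA-example");
    assert_eq!(props["s3.region"], "eu-west-1");
}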
68 changes: 33 additions & 35 deletions crates/iceberg/src/arrow/reader.rs
@@ -504,10 +504,10 @@ impl ArrowReader {
             // we need to call next() to update the cache with the newly positioned value.
             delete_vector_iter.advance_to(next_row_group_base_idx);
             // Only update the cache if the cached value is stale (in the skipped range)
-            if let Some(cached_idx) = next_deleted_row_idx_opt {
-                if cached_idx < next_row_group_base_idx {
-                    next_deleted_row_idx_opt = delete_vector_iter.next();
-                }
+            if let Some(cached_idx) = next_deleted_row_idx_opt
+                && cached_idx < next_row_group_base_idx
+            {
+                next_deleted_row_idx_opt = delete_vector_iter.next();
             }

             // still increment the current page base index but then skip to the next row group
@@ -861,10 +861,10 @@ impl ArrowReader {
         };

         // If all row groups were filtered out, return an empty RowSelection (select no rows)
-        if let Some(selected_row_groups) = selected_row_groups {
-            if selected_row_groups.is_empty() {
-                return Ok(RowSelection::from(Vec::new()));
-            }
+        if let Some(selected_row_groups) = selected_row_groups
+            && selected_row_groups.is_empty()
+        {
+            return Ok(RowSelection::from(Vec::new()));
         }

         let mut selected_row_groups_idx = 0;
@@ -897,10 +897,10 @@ impl ArrowReader {

             results.push(selections_for_page);

-            if let Some(selected_row_groups) = selected_row_groups {
-                if selected_row_groups_idx == selected_row_groups.len() {
-                    break;
-                }
+            if let Some(selected_row_groups) = selected_row_groups
+                && selected_row_groups_idx == selected_row_groups.len()
+            {
+                break;
             }
         }

@@ -1031,13 +1031,13 @@ fn apply_name_mapping_to_arrow_schema(

             let mut metadata = field.metadata().clone();

-            if let Some(mapped_field) = mapped_field_opt {
-                if let Some(field_id) = mapped_field.field_id() {
-                    // Field found in mapping with a field_id → assign it
-                    metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
-                }
-                // If field_id is None, leave the field without an ID (will be filtered by projection)
+            if let Some(mapped_field) = mapped_field_opt
+                && let Some(field_id) = mapped_field.field_id()
+            {
+                // Field found in mapping with a field_id → assign it
+                metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
             }
+            // If field_id is None, leave the field without an ID (will be filtered by projection)
             // If field not found in mapping, leave it without an ID (will be filtered by projection)

             Field::new(field.name(), field.data_type().clone(), field.is_nullable())
@@ -2731,15 +2731,14 @@ message schema {
         // Step 4: Verify we got 199 rows (not 200)
         let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();

-        println!("Total rows read: {}", total_rows);
+        println!("Total rows read: {total_rows}");
         println!("Expected: 199 rows (deleted row 199 which had id=200)");

         // This assertion will FAIL before the fix and PASS after the fix
         assert_eq!(
             total_rows, 199,
-            "Expected 199 rows after deleting row 199, but got {} rows. \
-            The bug causes position deletes in later row groups to be ignored.",
-            total_rows
+            "Expected 199 rows after deleting row 199, but got {total_rows} rows. \
+            The bug causes position deletes in later row groups to be ignored."
         );

         // Verify the deleted row (id=200) is not present
@@ -2950,16 +2949,15 @@ message schema {
         // Row group 1 has 100 rows (ids 101-200), minus 1 delete (id=200) = 99 rows
         let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();

-        println!("Total rows read from row group 1: {}", total_rows);
+        println!("Total rows read from row group 1: {total_rows}");
         println!("Expected: 99 rows (row group 1 has 100 rows, 1 delete at position 199)");

         // This assertion will FAIL before the fix and PASS after the fix
         assert_eq!(
             total_rows, 99,
-            "Expected 99 rows from row group 1 after deleting position 199, but got {} rows. \
+            "Expected 99 rows from row group 1 after deleting position 199, but got {total_rows} rows. \
             The bug causes position deletes to be lost when advance_to() is followed by next() \
-            when skipping unselected row groups.",
-            total_rows
+            when skipping unselected row groups."
         );

         // Verify the deleted row (id=200) is not present
@@ -3241,7 +3239,7 @@ message schema {
             start: 0,
             length: 0,
             record_count: None,
-            data_file_path: format!("{}/1.parquet", table_location),
+            data_file_path: format!("{table_location}/1.parquet"),
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1, 2],
@@ -3338,7 +3336,7 @@ message schema {
             start: 0,
             length: 0,
             record_count: None,
-            data_file_path: format!("{}/1.parquet", table_location),
+            data_file_path: format!("{table_location}/1.parquet"),
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1, 3],
@@ -3424,7 +3422,7 @@ message schema {
             start: 0,
             length: 0,
             record_count: None,
-            data_file_path: format!("{}/1.parquet", table_location),
+            data_file_path: format!("{table_location}/1.parquet"),
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1, 2, 3],
@@ -3524,7 +3522,7 @@ message schema {
             start: 0,
             length: 0,
             record_count: None,
-            data_file_path: format!("{}/1.parquet", table_location),
+            data_file_path: format!("{table_location}/1.parquet"),
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1, 2],
@@ -3565,7 +3563,7 @@ message schema {
         assert_eq!(all_values.len(), 6);

         for i in 0..6 {
-            assert_eq!(all_names[i], format!("name_{}", i));
+            assert_eq!(all_names[i], format!("name_{i}"));
             assert_eq!(all_values[i], i as i32);
         }
     }
@@ -3653,7 +3651,7 @@ message schema {
             start: 0,
             length: 0,
             record_count: None,
-            data_file_path: format!("{}/1.parquet", table_location),
+            data_file_path: format!("{table_location}/1.parquet"),
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1, 2],
@@ -3749,7 +3747,7 @@ message schema {
             start: 0,
             length: 0,
             record_count: None,
-            data_file_path: format!("{}/1.parquet", table_location),
+            data_file_path: format!("{table_location}/1.parquet"),
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1, 5, 2],
@@ -3858,7 +3856,7 @@ message schema {
             start: 0,
             length: 0,
             record_count: None,
-            data_file_path: format!("{}/1.parquet", table_location),
+            data_file_path: format!("{table_location}/1.parquet"),
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1, 2, 3],
@@ -3997,7 +3995,7 @@ message schema {
             start: 0,
             length: 0,
             record_count: None,
-            data_file_path: format!("{}/data.parquet", table_location),
+            data_file_path: format!("{table_location}/data.parquet"),
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1, 2],
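Note: the first reader.rs hunk is the behavioural piece the two new tests above exercise — after advance_to() jumps the delete-vector iterator to the next selected row group, the cached "next deleted row index" must be refreshed only when it points into the skipped range. A simplified sketch of that rule, with a plain iterator standing in for the delete-vector iterator (the names and the advance_to stand-in are illustrative, not the reader's actual types):

// `cached` mirrors the reader's lookahead; `deletes` yields the remaining delete positions in order.
fn refresh_stale_cache(
    cached: &mut Option<u64>,
    deletes: &mut impl Iterator<Item = u64>,
    next_row_group_base_idx: u64,
) {
    // Replace the cached value only when it is stale, i.e. it falls before the
    // first row of the row group we skipped to; otherwise keep it.
    if let Some(cached_idx) = *cached
        && cached_idx < next_row_group_base_idx
    {
        *cached = deletes.next();
    }
}

fn main() {
    // Deletes at rows 5 and 120; the reader skips ahead to the row group starting at row 100.
    let mut deletes = [5u64, 120].into_iter().filter(|&d| d >= 100); // stands in for advance_to(100)
    let mut cached = Some(5); // lookahead cached before the skip, now stale
    refresh_stale_cache(&mut cached, &mut deletes, 100);
    assert_eq!(cached, Some(120)); // stale value replaced, so the delete at row 120 is not lost

    // A cached value already at or past the new base row is kept untouched.
    let mut deletes = [150u64, 180].into_iter();
    let mut cached = Some(150);
    refresh_stale_cache(&mut cached, &mut deletes, 100);
    assert_eq!(cached, Some(150));
}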
35 changes: 17 additions & 18 deletions crates/iceberg/src/arrow/record_batch_projector.rs
@@ -133,25 +133,24 @@ impl RecordBatchProjector {
     {
         for (pos, field) in fields.iter().enumerate() {
             let id = field_id_fetch_func(field)?;
-            if let Some(id) = id {
-                if target_field_id == id {
-                    index_vec.push(pos);
-                    return Ok(Some(field.clone()));
-                }
+            if let Some(id) = id
+                && target_field_id == id
+            {
+                index_vec.push(pos);
+                return Ok(Some(field.clone()));
             }
-            if let DataType::Struct(inner) = field.data_type() {
-                if searchable_field_func(field) {
-                    if let Some(res) = Self::fetch_field_index(
-                        inner,
-                        index_vec,
-                        target_field_id,
-                        field_id_fetch_func,
-                        searchable_field_func,
-                    )? {
-                        index_vec.push(pos);
-                        return Ok(Some(res));
-                    }
-                }
+            if let DataType::Struct(inner) = field.data_type()
+                && searchable_field_func(field)
+                && let Some(res) = Self::fetch_field_index(
+                    inner,
+                    index_vec,
+                    target_field_id,
+                    field_id_fetch_func,
+                    searchable_field_func,
+                )?
+            {
+                index_vec.push(pos);
+                return Ok(Some(res));
             }
         }
         Ok(None)
2 changes: 1 addition & 1 deletion crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -582,7 +582,7 @@ impl RecordBatchTransformer {
                 let this_field_id = field_id_str.parse().map_err(|e| {
                     Error::new(
                         ErrorKind::DataInvalid,
-                        format!("field id not parseable as an i32: {}", e),
+                        format!("field id not parseable as an i32: {e}"),
                     )
                 })?;

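Note: this change, like the similar one-line edits in the test code above, replaces a positional `{}` placeholder with an inline format argument (an identifier captured directly in the format string), available since Rust 1.58. A small sketch of the equivalence (the values are made up):

fn main() {
    let table_location = "/tmp/warehouse/t1";
    let total_rows = 199;

    // Positional argument and captured identifier produce the same string.
    assert_eq!(
        format!("{}/1.parquet", table_location),
        format!("{table_location}/1.parquet")
    );

    // Capturing also works with formatting specifiers such as `{:?}`.
    println!("Total rows read: {total_rows}");
    println!("debug form: {total_rows:?}");
}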
33 changes: 15 additions & 18 deletions crates/iceberg/src/arrow/value.rs
@@ -261,15 +261,15 @@ impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
                         "The partner is not a decimal128 array",
                     )
                 })?;
-                if let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type() {
-                    if *arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale {
-                        return Err(Error::new(
-                            ErrorKind::DataInvalid,
-                            format!(
-                                "The precision or scale ({arrow_precision},{arrow_scale}) of arrow decimal128 array is not compatible with iceberg decimal type ({precision},{scale})"
-                            ),
-                        ));
-                    }
+                if let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type()
+                    && (*arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale)
+                {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "The precision or scale ({arrow_precision},{arrow_scale}) of arrow decimal128 array is not compatible with iceberg decimal type ({precision},{scale})"
+                        ),
+                    ));
                 }
                 Ok(array.iter().map(|v| v.map(Literal::decimal)).collect())
             }
@@ -351,10 +351,10 @@ impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
                 } else if let Some(array) = partner.as_any().downcast_ref::<StringArray>() {
                     Ok(array.iter().map(|v| v.map(Literal::string)).collect())
                 } else {
-                    return Err(Error::new(
+                    Err(Error::new(
                         ErrorKind::DataInvalid,
                         "The partner is not a string array",
-                    ));
+                    ))
                 }
             }
             PrimitiveType::Uuid => {
@@ -418,10 +418,10 @@ impl SchemaWithPartnerVisitor<ArrayRef> for ArrowArrayToIcebergStructConverter {
                         .map(|v| v.map(|v| Literal::binary(v.to_vec())))
                         .collect())
                 } else {
-                    return Err(Error::new(
+                    Err(Error::new(
                         ErrorKind::DataInvalid,
                         "The partner is not a binary array",
-                    ));
+                    ))
                 }
             }
         }
@@ -724,10 +724,7 @@ pub(crate) fn create_primitive_array_single_element(
         }
         _ => Err(Error::new(
             ErrorKind::Unexpected,
-            format!(
-                "Unsupported constant type combination: {:?} with {:?}",
-                data_type, prim_lit
-            ),
+            format!("Unsupported constant type combination: {data_type:?} with {prim_lit:?}"),
         )),
     }
 }
@@ -825,7 +822,7 @@ pub(crate) fn create_primitive_array_repeated(
         (dt, _) => {
             return Err(Error::new(
                 ErrorKind::Unexpected,
-                format!("unexpected target column type {}", dt),
+                format!("unexpected target column type {dt}"),
             ));
         }
     })
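Note: the two `return Err(...)` → `Err(...)` edits above drop a `return` that is redundant because the `Err` value is already the final expression of its `else` block. Both spellings behave identically, as in this hypothetical function:

fn parse_count_explicit(input: &str) -> Result<u32, String> {
    if let Ok(n) = input.parse::<u32>() {
        Ok(n)
    } else {
        // Early-return style: works, but the `return` is unnecessary here.
        return Err(format!("not a count: {input}"));
    }
}

fn parse_count_tail(input: &str) -> Result<u32, String> {
    if let Ok(n) = input.parse::<u32>() {
        Ok(n)
    } else {
        // Tail-expression style preferred by idiomatic Rust and clippy.
        Err(format!("not a count: {input}"))
    }
}

fn main() {
    assert_eq!(parse_count_explicit("7"), parse_count_tail("7"));
    assert!(parse_count_tail("x").is_err());
}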
14 changes: 7 additions & 7 deletions crates/iceberg/src/catalog/mod.rs
@@ -1000,13 +1000,13 @@ mod _serde_set_statistics {
             snapshot_id,
             statistics,
         } = SetStatistics::deserialize(deserializer)?;
-        if let Some(snapshot_id) = snapshot_id {
-            if snapshot_id != statistics.snapshot_id {
-                return Err(serde::de::Error::custom(format!(
-                    "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
-                    statistics.snapshot_id
-                )));
-            }
+        if let Some(snapshot_id) = snapshot_id
+            && snapshot_id != statistics.snapshot_id
+        {
+            return Err(serde::de::Error::custom(format!(
+                "Snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
+                statistics.snapshot_id
+            )));
         }

         Ok(statistics)
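Note: the last hunk keeps the same deserialize-time consistency check, now written as one `let` chain. A reduced sketch of validating one field against another during deserialization with `serde::de::Error::custom` (struct and field names are invented; assumes the serde `derive` feature and serde_json):

use serde::{Deserialize, Deserializer};

#[derive(Debug, Deserialize)]
struct RawStatistics {
    snapshot_id: Option<i64>,
    file_snapshot_id: i64,
}

// Hypothetical helper: reject the document when both ids are present but disagree.
fn deserialize_statistics<'de, D>(deserializer: D) -> Result<RawStatistics, D::Error>
where
    D: Deserializer<'de>,
{
    let raw = RawStatistics::deserialize(deserializer)?;
    if let Some(snapshot_id) = raw.snapshot_id
        && snapshot_id != raw.file_snapshot_id
    {
        return Err(serde::de::Error::custom(format!(
            "snapshot id to set {snapshot_id} does not match the statistics file snapshot id {}",
            raw.file_snapshot_id
        )));
    }
    Ok(raw)
}

fn main() {
    let ok = deserialize_statistics(&mut serde_json::Deserializer::from_str(
        r#"{"snapshot_id": 7, "file_snapshot_id": 7}"#,
    ));
    assert!(ok.is_ok());

    let mismatch = deserialize_statistics(&mut serde_json::Deserializer::from_str(
        r#"{"snapshot_id": 7, "file_snapshot_id": 8}"#,
    ));
    assert!(mismatch.is_err());
}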