Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/fluss/src/row/binary_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ impl FlussMapWriter {
}
(DataType::Array(_), Datum::Array(v)) => writer.write_array(pos, v),
(DataType::Map(_), Datum::Map(v)) => writer.write_map(pos, v),
(DataType::Row(_), Datum::Row(v)) => writer.write_row(pos, v.as_ref())?,
_ => {
return Err(IllegalArgument {
message: format!("Type mismatch: expected {:?}, got {:?}", dt, datum),
Expand Down
45 changes: 11 additions & 34 deletions crates/fluss/src/row/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,25 +661,19 @@ impl InternalRow for ColumnarRow {
};

let column = self.column(pos)?;
let element_field = match column.data_type() {
ArrowDataType::List(field) => field,
match column.data_type() {
ArrowDataType::List(_) => {}
other => {
return Err(IllegalArgument {
message: format!("expected List array at position {pos}, got {other:?}"),
});
}
};

let actual_element_type = from_arrow_field(element_field)?;
if actual_element_type != *element_fluss_type {
return Err(IllegalArgument {
message: format!(
"Arrow list element type {:?} does not match expected Fluss type {:?}",
actual_element_type, element_fluss_type
),
});
}

// `to_arrow_type` is lossy (e.g. TIMESTAMP_LTZ → plain Arrow Timestamp);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so to confirm since not mentioned in PR description: i guess this will be a behaviour, correct? that users won't know schema mismatch when calling get_array on position

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional. The old strict check tripped on lossy Arrow round-trips (e.g. TIMESTAMP_LTZ -> plain Arrow Timestamp) and rejected valid schemas.

Real shape mismatches still error from the per-element conversion below

// trust the Fluss schema and let the per-element conversion below catch
// real shape mismatches.

let list_arr = column
.as_any()
.downcast_ref::<ListArray>()
Expand Down Expand Up @@ -778,25 +772,9 @@ fn arrow_map_entry_to_fluss_map(
});
}

let actual_key_type = from_arrow_field(&fields[0])?;
if actual_key_type != *key_type {
return Err(IllegalArgument {
message: format!(
"Arrow map key type {:?} does not match expected Fluss type {:?}",
actual_key_type, key_type
),
});
}

let actual_value_type = from_arrow_field(&fields[1])?;
if actual_value_type != *value_type {
return Err(IllegalArgument {
message: format!(
"Arrow map value type {:?} does not match expected Fluss type {:?}",
actual_value_type, value_type
),
});
}
// `to_arrow_type` is lossy (e.g. TIMESTAMP_LTZ → plain Arrow Timestamp);
// trust the Fluss schema and let the per-element conversion below catch
// real shape mismatches.

let keys_arrow = struct_arr.column(0);
let values_arrow = struct_arr.column(1);
Expand Down Expand Up @@ -1443,8 +1421,7 @@ mod tests {

let err = row.get_array(0).unwrap_err();
assert!(
err.to_string()
.contains("Cannot convert Arrow type to Fluss type"),
err.to_string().contains("expected Int32Type"),
"unexpected error: {err}"
);
}
Expand Down Expand Up @@ -1735,7 +1712,7 @@ mod tests {
let err = row.get_map(0).expect_err("type mismatch must error");
let msg = err.to_string();
assert!(
msg.contains("does not match expected Fluss type"),
msg.contains("expected StringArray"),
"unexpected error: {msg}"
);
}
Expand Down
58 changes: 54 additions & 4 deletions crates/fluss/src/row/column_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
use crate::error::Error::RowConvertError;
use crate::error::{Error, Result};
use crate::metadata::{DataType, RowType};
use crate::row::FlussMap;
use crate::row::InternalRow;
use crate::row::datum::{
MICROS_PER_MILLI, MILLIS_PER_SECOND, NANOS_PER_MILLI, append_decimal_to_builder,
millis_nanos_to_micros, millis_nanos_to_nanos,
};
use crate::row::{FlussArray, FlussMap, InternalRow};
use arrow::array::{
ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder,
Expand Down Expand Up @@ -928,8 +927,8 @@ fn write_map_into(
let key_array = map.key_array();
let value_array = map.value_array();
for i in 0..map.size() {
key_writer.write_field_at(key_array, i)?;
value_writer.write_field_at(value_array, i)?;
write_array_element_into_column(key_writer, key_array, i)?;
write_array_element_into_column(value_writer, value_array, i)?;
}
let last = *offsets.last().unwrap();
offsets.push(
Expand All @@ -940,6 +939,57 @@ fn write_map_into(
Ok(())
}

// FlussArray carries no schema; nested row/map elements need the typed
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be fixed with refactoring, now I need this for test to pass

// inherent accessors (get_row/get_map with explicit types).
fn write_array_element_into_column(
writer: &mut ColumnWriter,
array: &FlussArray,
index: usize,
) -> Result<()> {
match &mut writer.inner {
TypedWriter::Struct {
field_writers,
validity,
row_type,
..
} => {
if array.is_null_at(index) {
for child in field_writers.iter_mut() {
child.append_null();
}
validity.push(false);
} else {
let nested = array.get_row(index, row_type)?;
for (j, child) in field_writers.iter_mut().enumerate() {
child.write_field_at(&nested, j)?;
}
validity.push(true);
}
Ok(())
}
TypedWriter::Map {
key_writer,
value_writer,
key_type,
value_type,
offsets,
validity,
} => {
if array.is_null_at(index) {
validity.push(false);
let last = *offsets.last().unwrap();
offsets.push(last);
} else {
let nested = array.get_map(index, key_type, value_type)?;
write_map_into(nested, key_writer, value_writer, offsets)?;
validity.push(true);
}
Ok(())
}
_ => writer.write_field_at(array, index),
}
}

fn finish_struct_array(
fields: arrow_schema::Fields,
child_arrays: Vec<ArrayRef>,
Expand Down
Loading
Loading