diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index d95da142..fef73cea 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -25,6 +25,7 @@ use arrow::array::{ use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; use fcore::row::InternalRow; use fluss as fcore; +use std::borrow::Cow; pub const DATA_TYPE_BOOLEAN: i32 = 1; pub const DATA_TYPE_TINYINT: i32 = 2; @@ -218,9 +219,8 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> { DATUM_TYPE_INT64 => Datum::Int64(field.i64_val), DATUM_TYPE_FLOAT32 => Datum::Float32(field.f32_val.into()), DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()), - DATUM_TYPE_STRING => Datum::String(field.string_val.as_str()), - // todo: avoid copy bytes for blob - DATUM_TYPE_BYTES => Datum::Blob(field.bytes_val.clone().into()), + DATUM_TYPE_STRING => Datum::String(Cow::Borrowed(field.string_val.as_str())), + DATUM_TYPE_BYTES => Datum::Blob(Cow::Borrowed(field.bytes_val.as_slice())), _ => Datum::Null, }; generic_row.set_field(idx, datum); diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs index 44f10b63..9917c7b7 100644 --- a/crates/fluss/src/row/binary/binary_writer.rs +++ b/crates/fluss/src/row/binary/binary_writer.rs @@ -170,17 +170,12 @@ impl InnerValueWriter { writer.write_boolean(*v); } (InnerValueWriter::Binary, Datum::Blob(v)) => { - writer.write_binary(v.as_ref(), v.len()); - } - (InnerValueWriter::Binary, Datum::BorrowedBlob(v)) => { - writer.write_binary(v.as_ref(), v.len()); + let b = v.as_ref(); + writer.write_binary(b, b.len()); } (InnerValueWriter::Bytes, Datum::Blob(v)) => { writer.write_bytes(v.as_ref()); } - (InnerValueWriter::Bytes, Datum::BorrowedBlob(v)) => { - writer.write_bytes(v.as_ref()); - } (InnerValueWriter::TinyInt, Datum::Int8(v)) => { writer.write_byte(*v as u8); } diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs index 19afe887..00d94ad6 100644 --- a/crates/fluss/src/row/compacted/compacted_row_reader.rs +++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs @@ -16,6 +16,7 @@ // under the License. use bytes::Bytes; +use std::borrow::Cow; use crate::{ metadata::DataType, @@ -52,10 +53,12 @@ impl CompactedRowDeserializer { DataType::Float(_) => Datum::Float32(reader.read_float().into()), DataType::Double(_) => Datum::Float64(reader.read_double().into()), // TODO: use read_char(length) in the future, but need to keep compatibility - DataType::Char(_) | DataType::String(_) => Datum::OwnedString(reader.read_string()), + DataType::Char(_) | DataType::String(_) => { + Datum::String(Cow::Owned(reader.read_string())) + } // TODO: use read_binary(length) in the future, but need to keep compatibility DataType::Bytes(_) | DataType::Binary(_) => { - Datum::Blob(reader.read_bytes().into_vec().into()) + Datum::Blob(Cow::Owned(reader.read_bytes().into_vec())) } _ => panic!("unsupported DataType in CompactedRowDeserializer"), }; diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index 78872a9d..607866ed 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -24,11 +24,9 @@ use arrow::array::{ use jiff::ToSpan; use ordered_float::OrderedFloat; use parse_display::Display; -use ref_cast::RefCast; use rust_decimal::Decimal; -use serde::{Deserialize, Serialize}; -use std::fmt; -use std::ops::Deref; +use serde::Serialize; +use std::borrow::Cow; #[allow(dead_code)] const THIRTY_YEARS_MICROSECONDS: i64 = 946_684_800_000_000; @@ -52,14 +50,9 @@ pub enum Datum<'a> { #[display("{0}")] Float64(F64), #[display("'{0}'")] - String(&'a str), - /// Owned string - #[display("'{0}'")] - OwnedString(String), - #[display("{0}")] - Blob(Blob), + String(Str<'a>), #[display("{:?}")] - BorrowedBlob(&'a [u8]), + Blob(Blob<'a>), #[display("{0}")] Decimal(Decimal), #[display("{0}")] @@ -78,7 +71,6 @@ impl Datum<'_> { pub fn as_str(&self) -> &str { match self { Self::String(s) => s, - Self::OwnedString(s) => s.as_str(), _ => panic!("not a string: {self:?}"), } } @@ -86,7 +78,6 @@ impl Datum<'_> { pub fn as_blob(&self) -> &[u8] { match self { Self::Blob(blob) => blob.as_ref(), - Self::BorrowedBlob(blob) => blob, _ => panic!("not a blob: {self:?}"), } } @@ -121,10 +112,19 @@ impl<'a> From for Datum<'a> { } } +pub type Str<'a> = Cow<'a, str>; + +impl<'a> From for Datum<'a> { + #[inline] + fn from(s: String) -> Self { + Datum::String(Cow::Owned(s)) + } +} + impl<'a> From<&'a str> for Datum<'a> { #[inline] fn from(s: &'a str) -> Datum<'a> { - Datum::String(s) + Datum::String(Cow::Borrowed(s)) } } @@ -226,8 +226,7 @@ impl<'b, 'a: 'b> TryFrom<&'b Datum<'a>> for &'b str { #[inline] fn try_from(from: &'b Datum<'a>) -> std::result::Result { match from { - Datum::String(i) => Ok(*i), - Datum::OwnedString(s) => Ok(s.as_str()), + Datum::String(s) => Ok(s.as_ref()), _ => Err(()), } } @@ -295,10 +294,8 @@ impl Datum<'_> { Datum::Int64(v) => append_value_to_arrow!(Int64Builder, *v), Datum::Float32(v) => append_value_to_arrow!(Float32Builder, v.into_inner()), Datum::Float64(v) => append_value_to_arrow!(Float64Builder, v.into_inner()), - Datum::String(v) => append_value_to_arrow!(StringBuilder, *v), - Datum::OwnedString(v) => append_value_to_arrow!(StringBuilder, v.as_str()), + Datum::String(v) => append_value_to_arrow!(StringBuilder, v.as_ref()), Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, v.as_ref()), - Datum::BorrowedBlob(v) => append_value_to_arrow!(BinaryBuilder, *v), Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | Datum::TimestampTz(_) => { return Err(RowConvertError { message: format!( @@ -349,58 +346,6 @@ impl_to_arrow!(&str, StringBuilder); pub type F32 = OrderedFloat; pub type F64 = OrderedFloat; -#[allow(dead_code)] -pub type Str = Box; - -#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize, Default)] -pub struct Blob(Box<[u8]>); - -impl Deref for Blob { - type Target = BlobRef; - - fn deref(&self) -> &Self::Target { - BlobRef::new(&self.0) - } -} - -impl BlobRef { - pub fn new(bytes: &[u8]) -> &Self { - // SAFETY: `&BlobRef` and `&[u8]` have the same layout. - BlobRef::ref_cast(bytes) - } -} - -/// A slice of a blob. -#[repr(transparent)] -#[derive(PartialEq, Eq, PartialOrd, Ord, RefCast, Hash)] -pub struct BlobRef([u8]); - -impl fmt::Debug for Blob { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.as_ref()) - } -} - -impl fmt::Display for Blob { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.as_ref()) - } -} - -impl AsRef<[u8]> for BlobRef { - fn as_ref(&self) -> &[u8] { - &self.0 - } -} - -impl Deref for BlobRef { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - #[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)] pub struct Date(i32); @@ -410,15 +355,17 @@ pub struct Timestamp(i64); #[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)] pub struct TimestampLtz(i64); -impl From> for Blob { +pub type Blob<'a> = Cow<'a, [u8]>; + +impl<'a> From> for Datum<'a> { fn from(vec: Vec) -> Self { - Blob(vec.into()) + Datum::Blob(Blob::from(vec)) } } impl<'a> From<&'a [u8]> for Datum<'a> { fn from(bytes: &'a [u8]) -> Datum<'a> { - Datum::BorrowedBlob(bytes) + Datum::Blob(Blob::from(bytes)) } } diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs index 3a9cf0fa..8e529e54 100644 --- a/crates/fluss/src/row/field_getter.rs +++ b/crates/fluss/src/row/field_getter.rs @@ -83,7 +83,7 @@ pub enum InnerFieldGetter { impl InnerFieldGetter { pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> { match self { - InnerFieldGetter::Char { pos, len } => Datum::String(row.get_char(*pos, *len)), + InnerFieldGetter::Char { pos, len } => Datum::from(row.get_char(*pos, *len)), InnerFieldGetter::String { pos } => Datum::from(row.get_string(*pos)), InnerFieldGetter::Bool { pos } => Datum::from(row.get_boolean(*pos)), InnerFieldGetter::Binary { pos, len } => Datum::from(row.get_binary(*pos, *len)),