Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow::array::{
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use fcore::row::InternalRow;
use fluss as fcore;
use std::borrow::Cow;

pub const DATA_TYPE_BOOLEAN: i32 = 1;
pub const DATA_TYPE_TINYINT: i32 = 2;
Expand Down Expand Up @@ -218,9 +219,8 @@ pub fn ffi_row_to_core(row: &ffi::FfiGenericRow) -> fcore::row::GenericRow<'_> {
DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
DATUM_TYPE_FLOAT32 => Datum::Float32(field.f32_val.into()),
DATUM_TYPE_FLOAT64 => Datum::Float64(field.f64_val.into()),
DATUM_TYPE_STRING => Datum::String(field.string_val.as_str()),
// todo: avoid copy bytes for blob
DATUM_TYPE_BYTES => Datum::Blob(field.bytes_val.clone().into()),
DATUM_TYPE_STRING => Datum::String(Cow::Borrowed(field.string_val.as_str())),
DATUM_TYPE_BYTES => Datum::Blob(Cow::Borrowed(field.bytes_val.as_slice())),
_ => Datum::Null,
};
generic_row.set_field(idx, datum);
Expand Down
9 changes: 2 additions & 7 deletions crates/fluss/src/row/binary/binary_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,12 @@ impl InnerValueWriter {
writer.write_boolean(*v);
}
(InnerValueWriter::Binary, Datum::Blob(v)) => {
writer.write_binary(v.as_ref(), v.len());
}
(InnerValueWriter::Binary, Datum::BorrowedBlob(v)) => {
writer.write_binary(v.as_ref(), v.len());
let b = v.as_ref();
writer.write_binary(b, b.len());
}
(InnerValueWriter::Bytes, Datum::Blob(v)) => {
writer.write_bytes(v.as_ref());
}
(InnerValueWriter::Bytes, Datum::BorrowedBlob(v)) => {
writer.write_bytes(v.as_ref());
}
(InnerValueWriter::TinyInt, Datum::Int8(v)) => {
writer.write_byte(*v as u8);
}
Expand Down
7 changes: 5 additions & 2 deletions crates/fluss/src/row/compacted/compacted_row_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use bytes::Bytes;
use std::borrow::Cow;

use crate::{
metadata::DataType,
Expand Down Expand Up @@ -52,10 +53,12 @@ impl CompactedRowDeserializer {
DataType::Float(_) => Datum::Float32(reader.read_float().into()),
DataType::Double(_) => Datum::Float64(reader.read_double().into()),
// TODO: use read_char(length) in the future, but need to keep compatibility
DataType::Char(_) | DataType::String(_) => Datum::OwnedString(reader.read_string()),
DataType::Char(_) | DataType::String(_) => {
Datum::String(Cow::Owned(reader.read_string()))
}
// TODO: use read_binary(length) in the future, but need to keep compatibility
DataType::Bytes(_) | DataType::Binary(_) => {
Datum::Blob(reader.read_bytes().into_vec().into())
Datum::Blob(Cow::Owned(reader.read_bytes().into_vec()))
}
_ => panic!("unsupported DataType in CompactedRowDeserializer"),
};
Expand Down
95 changes: 21 additions & 74 deletions crates/fluss/src/row/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ use arrow::array::{
use jiff::ToSpan;
use ordered_float::OrderedFloat;
use parse_display::Display;
use ref_cast::RefCast;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::Deref;
use serde::Serialize;
use std::borrow::Cow;

#[allow(dead_code)]
const THIRTY_YEARS_MICROSECONDS: i64 = 946_684_800_000_000;
Expand All @@ -52,14 +50,9 @@ pub enum Datum<'a> {
#[display("{0}")]
Float64(F64),
#[display("'{0}'")]
String(&'a str),
/// Owned string
#[display("'{0}'")]
OwnedString(String),
#[display("{0}")]
Blob(Blob),
String(Str<'a>),
#[display("{:?}")]
BorrowedBlob(&'a [u8]),
Blob(Blob<'a>),
#[display("{0}")]
Decimal(Decimal),
#[display("{0}")]
Expand All @@ -78,15 +71,13 @@ impl Datum<'_> {
pub fn as_str(&self) -> &str {
match self {
Self::String(s) => s,
Self::OwnedString(s) => s.as_str(),
_ => panic!("not a string: {self:?}"),
}
}

pub fn as_blob(&self) -> &[u8] {
match self {
Self::Blob(blob) => blob.as_ref(),
Self::BorrowedBlob(blob) => blob,
_ => panic!("not a blob: {self:?}"),
}
}
Expand Down Expand Up @@ -121,10 +112,19 @@ impl<'a> From<i16> for Datum<'a> {
}
}

pub type Str<'a> = Cow<'a, str>;

impl<'a> From<String> for Datum<'a> {
#[inline]
fn from(s: String) -> Self {
Datum::String(Cow::Owned(s))
}
}

impl<'a> From<&'a str> for Datum<'a> {
#[inline]
fn from(s: &'a str) -> Datum<'a> {
Datum::String(s)
Datum::String(Cow::Borrowed(s))
}
}

Expand Down Expand Up @@ -226,8 +226,7 @@ impl<'b, 'a: 'b> TryFrom<&'b Datum<'a>> for &'b str {
#[inline]
fn try_from(from: &'b Datum<'a>) -> std::result::Result<Self, Self::Error> {
match from {
Datum::String(i) => Ok(*i),
Datum::OwnedString(s) => Ok(s.as_str()),
Datum::String(s) => Ok(s.as_ref()),
_ => Err(()),
}
}
Expand Down Expand Up @@ -295,10 +294,8 @@ impl Datum<'_> {
Datum::Int64(v) => append_value_to_arrow!(Int64Builder, *v),
Datum::Float32(v) => append_value_to_arrow!(Float32Builder, v.into_inner()),
Datum::Float64(v) => append_value_to_arrow!(Float64Builder, v.into_inner()),
Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
Datum::OwnedString(v) => append_value_to_arrow!(StringBuilder, v.as_str()),
Datum::String(v) => append_value_to_arrow!(StringBuilder, v.as_ref()),
Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, v.as_ref()),
Datum::BorrowedBlob(v) => append_value_to_arrow!(BinaryBuilder, *v),
Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | Datum::TimestampTz(_) => {
return Err(RowConvertError {
message: format!(
Expand Down Expand Up @@ -349,58 +346,6 @@ impl_to_arrow!(&str, StringBuilder);

pub type F32 = OrderedFloat<f32>;
pub type F64 = OrderedFloat<f64>;
#[allow(dead_code)]
pub type Str = Box<str>;

#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize, Default)]
pub struct Blob(Box<[u8]>);

impl Deref for Blob {
type Target = BlobRef;

fn deref(&self) -> &Self::Target {
BlobRef::new(&self.0)
}
}

impl BlobRef {
pub fn new(bytes: &[u8]) -> &Self {
// SAFETY: `&BlobRef` and `&[u8]` have the same layout.
BlobRef::ref_cast(bytes)
}
}

/// A slice of a blob.
#[repr(transparent)]
#[derive(PartialEq, Eq, PartialOrd, Ord, RefCast, Hash)]
pub struct BlobRef([u8]);

impl fmt::Debug for Blob {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.as_ref())
}
}

impl fmt::Display for Blob {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.as_ref())
}
}

impl AsRef<[u8]> for BlobRef {
fn as_ref(&self) -> &[u8] {
&self.0
}
}

impl Deref for BlobRef {
type Target = [u8];

fn deref(&self) -> &Self::Target {
&self.0
}
}

#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
pub struct Date(i32);

Expand All @@ -410,15 +355,17 @@ pub struct Timestamp(i64);
#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, Hash, Serialize)]
pub struct TimestampLtz(i64);

impl From<Vec<u8>> for Blob {
pub type Blob<'a> = Cow<'a, [u8]>;

impl<'a> From<Vec<u8>> for Datum<'a> {
fn from(vec: Vec<u8>) -> Self {
Blob(vec.into())
Datum::Blob(Blob::from(vec))
}
}

impl<'a> From<&'a [u8]> for Datum<'a> {
fn from(bytes: &'a [u8]) -> Datum<'a> {
Datum::BorrowedBlob(bytes)
Datum::Blob(Blob::from(bytes))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/row/field_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub enum InnerFieldGetter {
impl InnerFieldGetter {
pub fn get_field<'a>(&self, row: &'a dyn InternalRow) -> Datum<'a> {
match self {
InnerFieldGetter::Char { pos, len } => Datum::String(row.get_char(*pos, *len)),
InnerFieldGetter::Char { pos, len } => Datum::from(row.get_char(*pos, *len)),
InnerFieldGetter::String { pos } => Datum::from(row.get_string(*pos)),
InnerFieldGetter::Bool { pos } => Datum::from(row.get_boolean(*pos)),
InnerFieldGetter::Binary { pos, len } => Datum::from(row.get_binary(*pos, *len)),
Expand Down
Loading