Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement dedup and filter for vectors #245

Merged
merged 21 commits into from Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/common/time/src/lib.rs
Expand Up @@ -6,5 +6,8 @@ pub mod timestamp;
pub mod timestamp_millis;
pub mod util;

pub use date::Date;
pub use datetime::DateTime;
pub use range::RangeMillis;
pub use timestamp::Timestamp;
pub use timestamp_millis::TimestampMillis;
6 changes: 6 additions & 0 deletions src/datatypes/src/error.rs
Expand Up @@ -50,6 +50,12 @@ pub enum Error {

#[snafu(display("{}", msg))]
CastType { msg: String, backtrace: Backtrace },

#[snafu(display("Arrow failed to compute, source: {}", source))]
ArrowCompute {
source: arrow::error::ArrowError,
backtrace: Backtrace,
},
}

impl ErrorExt for Error {
Expand Down
1 change: 0 additions & 1 deletion src/datatypes/src/scalar.rs

This file was deleted.

99 changes: 81 additions & 18 deletions src/datatypes/src/scalars.rs
@@ -1,14 +1,11 @@
use std::any::Any;

use common_time::timestamp::Timestamp;
use common_time::{Date, DateTime, Timestamp};

use crate::prelude::*;
use crate::vectors::date::DateVector;
use crate::vectors::datetime::DateTimeVector;
use crate::value::{ListValue, ListValueRef};
use crate::vectors::*;

pub mod common;

fn get_iter_capacity<T, I: Iterator<Item = T>>(iter: &I) -> usize {
match iter.size_hint() {
(_lower, Some(upper)) => upper,
Expand Down Expand Up @@ -244,9 +241,9 @@ impl<'a> ScalarRef<'a> for &'a [u8] {
}
}

impl Scalar for common_time::date::Date {
impl Scalar for Date {
type VectorType = DateVector;
type RefType<'a> = common_time::date::Date;
type RefType<'a> = Date;

fn as_scalar_ref(&self) -> Self::RefType<'_> {
*self
Expand All @@ -257,18 +254,18 @@ impl Scalar for common_time::date::Date {
}
}

impl<'a> ScalarRef<'a> for common_time::date::Date {
impl<'a> ScalarRef<'a> for Date {
type VectorType = DateVector;
type ScalarType = common_time::date::Date;
type ScalarType = Date;

fn to_owned_scalar(&self) -> Self::ScalarType {
*self
}
}

impl Scalar for common_time::datetime::DateTime {
impl Scalar for DateTime {
type VectorType = DateTimeVector;
type RefType<'a> = common_time::datetime::DateTime;
type RefType<'a> = DateTime;

fn as_scalar_ref(&self) -> Self::RefType<'_> {
*self
Expand All @@ -279,9 +276,9 @@ impl Scalar for common_time::datetime::DateTime {
}
}

impl<'a> ScalarRef<'a> for common_time::datetime::DateTime {
impl<'a> ScalarRef<'a> for DateTime {
type VectorType = DateTimeVector;
type ScalarType = common_time::datetime::DateTime;
type ScalarType = DateTime;

fn to_owned_scalar(&self) -> Self::ScalarType {
*self
Expand Down Expand Up @@ -310,10 +307,41 @@ impl<'a> ScalarRef<'a> for Timestamp {
}
}

impl Scalar for ListValue {
type VectorType = ListVector;
type RefType<'a> = ListValueRef<'a>;

fn as_scalar_ref(&self) -> Self::RefType<'_> {
ListValueRef::Ref { val: self }
}

fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> {
long
}
}

impl<'a> ScalarRef<'a> for ListValueRef<'a> {
type VectorType = ListVector;
type ScalarType = ListValue;

fn to_owned_scalar(&self) -> Self::ScalarType {
match self {
ListValueRef::Indexed { vector, idx } => match vector.get(*idx) {
// Normally should not get `Value::Null` if the `ListValueRef` comes
// from the iterator of the ListVector, but we avoid panic and just
// returns a default list value in such case since `ListValueRef` may
// be constructed manually.
Value::Null => ListValue::default(),
Value::List(v) => v,
_ => unreachable!(),
},
ListValueRef::Ref { val } => (*val).clone(),
}
}
}

#[cfg(test)]
mod tests {
use common_time::date::Date;

use super::*;
use crate::vectors::binary::BinaryVector;
use crate::vectors::primitive::Int32Vector;
Expand Down Expand Up @@ -357,7 +385,7 @@ mod tests {
}

#[test]
pub fn test_build_date_vector() {
fn test_build_date_vector() {
let expect: Vec<Option<Date>> = vec![
Some(Date::new(0)),
Some(Date::new(-1)),
Expand All @@ -369,14 +397,49 @@ mod tests {
}

#[test]
pub fn test_date_scalar() {
fn test_date_scalar() {
let date = Date::new(1);
assert_eq!(date, date.as_scalar_ref());
assert_eq!(date, date.to_owned_scalar());
}

#[test]
pub fn test_build_timestamp_vector() {
fn test_datetime_scalar() {
let dt = DateTime::new(123);
assert_eq!(dt, dt.as_scalar_ref());
assert_eq!(dt, dt.to_owned_scalar());
}

#[test]
fn test_list_value_scalar() {
let list_value = ListValue::new(
Some(Box::new(vec![Value::Int32(123)])),
ConcreteDataType::int32_datatype(),
);
let list_ref = ListValueRef::Ref { val: &list_value };
assert_eq!(list_ref, list_value.as_scalar_ref());
assert_eq!(list_value, list_ref.to_owned_scalar());

let mut builder =
ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 1);
builder.push(None);
builder.push(Some(list_value.as_scalar_ref()));
let vector = builder.finish();

let ref_on_vec = ListValueRef::Indexed {
vector: &vector,
idx: 0,
};
assert_eq!(ListValue::default(), ref_on_vec.to_owned_scalar());
let ref_on_vec = ListValueRef::Indexed {
vector: &vector,
idx: 1,
};
assert_eq!(list_value, ref_on_vec.to_owned_scalar());
}

#[test]
fn test_build_timestamp_vector() {
let expect: Vec<Option<Timestamp>> = vec![Some(10.into()), None, Some(42.into())];
let vector: TimestampVector = build_vector_from_slice(&expect);
assert_vector_eq(&expect, &vector);
Expand Down
23 changes: 0 additions & 23 deletions src/datatypes/src/scalars/common.rs

This file was deleted.

7 changes: 3 additions & 4 deletions src/datatypes/src/type_id.rs
@@ -1,6 +1,3 @@
#[cfg(any(test, feature = "test"))]
use crate::data_type::ConcreteDataType;

/// Unique identifier for logical data type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LogicalTypeId {
Expand Down Expand Up @@ -43,7 +40,9 @@ impl LogicalTypeId {
/// # Panics
/// Panics if data type is not supported.
#[cfg(any(test, feature = "test"))]
pub fn data_type(&self) -> ConcreteDataType {
pub fn data_type(&self) -> crate::data_type::ConcreteDataType {
use crate::data_type::ConcreteDataType;

match self {
LogicalTypeId::Null => ConcreteDataType::null_datatype(),
LogicalTypeId::Boolean => ConcreteDataType::boolean_datatype(),
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/types/list_type.rs
Expand Up @@ -45,7 +45,7 @@ impl DataType for ListType {
}

fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
Box::new(ListVectorBuilder::with_capacity(
Box::new(ListVectorBuilder::with_type_capacity(
*self.inner.clone(),
capacity,
))
Expand Down
9 changes: 8 additions & 1 deletion src/datatypes/src/types/primitive_type.rs
Expand Up @@ -10,6 +10,7 @@ use snafu::OptionExt;
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Result};
use crate::scalars::ScalarVectorBuilder;
use crate::scalars::{Scalar, ScalarRef};
use crate::type_id::LogicalTypeId;
use crate::types::primitive_traits::Primitive;
use crate::value::{Value, ValueRef};
Expand All @@ -30,7 +31,13 @@ impl<T: Primitive, U: Primitive> PartialEq<PrimitiveType<U>> for PrimitiveType<T
impl<T: Primitive> Eq for PrimitiveType<T> {}

/// A trait that provide helper methods for a primitive type to implementing the [PrimitiveVector].
pub trait PrimitiveElement: Primitive {
pub trait PrimitiveElement
where
for<'a> Self: Primitive
+ Scalar<VectorType = PrimitiveVector<Self>>
+ ScalarRef<'a, ScalarType = Self, VectorType = PrimitiveVector<Self>>
+ Scalar<RefType<'a> = Self>,
{
/// Construct the data type struct.
fn build_data_type() -> ConcreteDataType;

Expand Down
31 changes: 25 additions & 6 deletions src/datatypes/src/value.rs
Expand Up @@ -110,7 +110,7 @@ impl Value {
Value::Binary(v) => ValueRef::Binary(v),
Value::Date(v) => ValueRef::Date(*v),
Value::DateTime(v) => ValueRef::DateTime(*v),
Value::List(v) => ValueRef::List(ListValueRef::Ref(v)),
Value::List(v) => ValueRef::List(ListValueRef::Ref { val: v }),
Value::Timestamp(v) => ValueRef::Timestamp(*v),
}
}
Expand Down Expand Up @@ -282,6 +282,12 @@ impl ListValue {
}
}

impl Default for ListValue {
fn default() -> ListValue {
ListValue::new(None, ConcreteDataType::null_datatype())
}
}

impl PartialOrd for ListValue {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
Expand Down Expand Up @@ -464,19 +470,32 @@ impl<'a> From<&'a [u8]> for ValueRef<'a> {
}
}

impl<'a> From<Option<ListValueRef<'a>>> for ValueRef<'a> {
fn from(list: Option<ListValueRef>) -> ValueRef {
match list {
Some(v) => ValueRef::List(v),
None => ValueRef::Null,
}
}
}

/// Reference to a [ListValue].
// Comparison still requires some allocation (call of `to_value()`) and might be avoidable.
///
/// Now comparison still requires some allocation (call of `to_value()`) and
/// might be avoidable by downcasting and comparing the underlying array slice
/// if it becomes bottleneck.
#[derive(Debug, Clone, Copy)]
pub enum ListValueRef<'a> {
Indexed { vector: &'a ListVector, idx: usize },
Ref(&'a ListValue),
Ref { val: &'a ListValue },
MichaelScofield marked this conversation as resolved.
Show resolved Hide resolved
}

impl<'a> ListValueRef<'a> {
/// Convert self to [Value]. This method would clone the underlying data.
fn to_value(self) -> Value {
match self {
ListValueRef::Indexed { vector, idx } => vector.get(idx),
ListValueRef::Ref(v) => Value::List((*v).clone()),
ListValueRef::Ref { val } => Value::List(val.clone()),
}
}
}
Expand Down Expand Up @@ -796,7 +815,7 @@ mod tests {
datatype: ConcreteDataType::int32_datatype(),
};
assert_eq!(
ValueRef::List(ListValueRef::Ref(&list)),
ValueRef::List(ListValueRef::Ref { val: &list }),
Value::List(list.clone()).as_value_ref()
);
}
Expand Down Expand Up @@ -831,7 +850,7 @@ mod tests {
items: None,
datatype: ConcreteDataType::int32_datatype(),
};
check_as_correct!(ListValueRef::Ref(&list), List, as_list);
check_as_correct!(ListValueRef::Ref { val: &list }, List, as_list);

let wrong_value = ValueRef::Int32(12345);
assert!(wrong_value.as_binary().is_err());
Expand Down
18 changes: 13 additions & 5 deletions src/datatypes/src/vectors.rs
Expand Up @@ -9,10 +9,21 @@ mod helper;
mod list;
pub mod mutable;
pub mod null;
mod operations;
pub mod primitive;
mod string;
mod timestamp;

pub mod all {
//! All vector types.
pub use crate::vectors::{
BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, Float32Vector,
Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, ListVector, NullVector,
PrimitiveVector, StringVector, TimestampVector, UInt16Vector, UInt32Vector, UInt64Vector,
UInt8Vector,
};
}

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
Expand All @@ -29,6 +40,7 @@ pub use helper::Helper;
pub use list::*;
pub use mutable::MutableVector;
pub use null::*;
pub use operations::VectorOp;
pub use primitive::*;
use snafu::ensure;
pub use string::*;
Expand Down Expand Up @@ -59,7 +71,7 @@ impl<'a> Validity<'a> {
}

/// Vector of data values.
pub trait Vector: Send + Sync + Serializable + Debug {
pub trait Vector: Send + Sync + Serializable + Debug + VectorOp {
/// Returns the data type of the vector.
///
/// This may require heap allocation.
Expand Down Expand Up @@ -140,10 +152,6 @@ pub trait Vector: Send + Sync + Serializable + Debug {
Ok(self.get(index))
}

// Copies each element according offsets parameter.
// (i-th element should be copied offsets[i] - offsets[i - 1] times.)
fn replicate(&self, offsets: &[usize]) -> VectorRef;

/// Returns the reference of value at `index`.
///
/// # Panics
Expand Down