Skip to content

Commit

Permalink
Add InList support for timestamp type. (#3449) (#3450)
Browse files Browse the repository at this point in the history
* Add `InList` support for timestamp type. (#3449)

* Update datafusion/physical-expr/src/expressions/in_list.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
Ted-Jiang and alamb committed Sep 12, 2022
1 parent f48a997 commit 17f069d
Showing 1 changed file with 203 additions and 6 deletions.
209 changes: 203 additions & 6 deletions datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use std::sync::Arc;
use arrow::array::GenericStringArray;
use arrow::array::{
ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, OffsetSizeTrait, UInt16Array, UInt32Array, UInt64Array,
UInt8Array,
Int64Array, Int8Array, OffsetSizeTrait, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::{
datatypes::{DataType, Schema},
Expand All @@ -34,10 +35,12 @@ use arrow::{

use crate::PhysicalExpr;
use arrow::array::*;
use arrow::datatypes::TimeUnit;
use datafusion_common::ScalarValue;
use datafusion_common::ScalarValue::{
Binary, Boolean, Date32, Date64, Decimal128, Int16, Int32, Int64, Int8, LargeBinary,
LargeUtf8, UInt16, UInt32, UInt64, UInt8, Utf8,
LargeUtf8, TimestampMicrosecond, TimestampMillisecond, TimestampNanosecond,
TimestampSecond, UInt16, UInt32, UInt64, UInt8, Utf8,
};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
Expand Down Expand Up @@ -111,8 +114,8 @@ macro_rules! make_contains_primitive {
.iter()
.flat_map(|expr| match expr {
ColumnarValue::Scalar(s) => match s {
ScalarValue::$SCALAR_VALUE(Some(v)) => Some(*v),
ScalarValue::$SCALAR_VALUE(None) => None,
ScalarValue::$SCALAR_VALUE(Some(v), ..) => Some(*v),
ScalarValue::$SCALAR_VALUE(None, ..) => None,
datatype => unreachable!("InList can't reach other data type {} for {}.", datatype, s),
},
ColumnarValue::Array(_) => {
Expand Down Expand Up @@ -175,7 +178,7 @@ macro_rules! set_contains_for_primitive {
let native_set = $SET_VALUES
.iter()
.flat_map(|v| match v {
$SCALAR_VALUE(value) => *value,
$SCALAR_VALUE(value, ..) => *value,
datatype => {
unreachable!(
"InList can't reach other data type {} for {}.",
Expand Down Expand Up @@ -691,6 +694,60 @@ impl PhysicalExpr for InListExpr {
let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
Ok(make_set_contains_decimal(array, set, self.negated))
}
DataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => {
let array = array
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap();
Ok(set_contains_for_primitive!(
array,
set,
TimestampSecond,
self.negated,
i64
))
}
TimeUnit::Millisecond => {
let array = array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
Ok(set_contains_for_primitive!(
array,
set,
TimestampMillisecond,
self.negated,
i64
))
}
TimeUnit::Microsecond => {
let array = array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
Ok(set_contains_for_primitive!(
array,
set,
TimestampMicrosecond,
self.negated,
i64
))
}
TimeUnit::Nanosecond => {
let array = array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
Ok(set_contains_for_primitive!(
array,
set,
TimestampNanosecond,
self.negated,
i64
))
}
},
datatype => Result::Err(DataFusionError::NotImplemented(format!(
"InSet does not support datatype {:?}.",
datatype
Expand Down Expand Up @@ -849,6 +906,44 @@ impl PhysicalExpr for InListExpr {
self.negated,
))
}
DataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => {
make_contains_primitive!(
array,
list_values,
self.negated,
TimestampSecond,
TimestampSecondArray
)
}
TimeUnit::Millisecond => {
make_contains_primitive!(
array,
list_values,
self.negated,
TimestampMillisecond,
TimestampMillisecondArray
)
}
TimeUnit::Microsecond => {
make_contains_primitive!(
array,
list_values,
self.negated,
TimestampMicrosecond,
TimestampMicrosecondArray
)
}
TimeUnit::Nanosecond => {
make_contains_primitive!(
array,
list_values,
self.negated,
TimestampNanosecond,
TimestampNanosecondArray
)
}
},
datatype => Result::Err(DataFusionError::NotImplemented(format!(
"InList does not support datatype {:?}.",
datatype
Expand Down Expand Up @@ -1659,4 +1754,106 @@ mod tests {

Ok(())
}

#[test]
fn in_list_set_timestamp() -> Result<()> {
let schema = Schema::new(vec![Field::new(
"a",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
)]);
let a = TimestampMicrosecondArray::from(vec![
Some(1388588401000000000),
Some(1288588501000000000),
None,
]);
let col_a = col("a", &schema)?;
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

let mut list = vec![
lit(ScalarValue::TimestampMicrosecond(
Some(1388588401000000000),
None,
)),
lit(ScalarValue::TimestampMicrosecond(None, None)),
lit(ScalarValue::TimestampMicrosecond(
Some(1388588401000000001),
None,
)),
];
let start_ts = 1388588401000000001;
for v in start_ts..(start_ts + OPTIMIZER_INSET_THRESHOLD + 4) {
list.push(lit(ScalarValue::TimestampMicrosecond(Some(v as i64), None)));
}

in_list!(
batch,
list.clone(),
&false,
vec![Some(true), None, None],
col_a.clone(),
&schema
);

in_list!(
batch,
list.clone(),
&true,
vec![Some(false), None, None],
col_a.clone(),
&schema
);

Ok(())
}

#[test]
fn in_list_timestamp() -> Result<()> {
let schema = Schema::new(vec![Field::new(
"a",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
)]);
let a = TimestampMicrosecondArray::from(vec![
Some(1388588401000000000),
Some(1288588501000000000),
None,
]);
let col_a = col("a", &schema)?;
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

let list = vec![
lit(ScalarValue::TimestampMicrosecond(
Some(1388588401000000000),
None,
)),
lit(ScalarValue::TimestampMicrosecond(
Some(1388588401000000001),
None,
)),
lit(ScalarValue::TimestampMicrosecond(
Some(1388588401000000002),
None,
)),
];

in_list!(
batch,
list.clone(),
&false,
vec![Some(true), Some(false), None],
col_a.clone(),
&schema
);

in_list!(
batch,
list.clone(),
&true,
vec![Some(false), Some(true), None],
col_a.clone(),
&schema
);
Ok(())
}
}

0 comments on commit 17f069d

Please sign in to comment.