Skip to content

Commit

Permalink
ARROW-8117 [Datafusion] [Rust] allow cast SQLTimestamp to Timestamp
Browse files Browse the repository at this point in the history
Because current sqlparser version cannot parse timestamps in queries, allowing casting number to timestamp enables selection on timestamp type columns.

Closes #6618 from mcassels/cast_number_to_timestamp_nanoseconds and squashes the following commits:

02fbf83 <Morgan Cassels> run cargo fmt
73fed60 <Morgan Cassels> support casting to timestamp

Authored-by: Morgan Cassels <morgan@urbanlogiq.com>
Signed-off-by: Paddy Horan <paddyhoran@hotmail.com>
  • Loading branch information
Morgan Cassels authored and paddyhoran committed Mar 14, 2020
1 parent 6d3c085 commit bc06c0d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 4 deletions.
38 changes: 34 additions & 4 deletions rust/datafusion/src/execution/physical_plan/expressions.rs
Expand Up @@ -27,8 +27,8 @@ use crate::execution::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::logicalplan::{Operator, ScalarValue};
use arrow::array::{
ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, StringArray, UInt16Array, UInt32Array, UInt64Array,
UInt8Array,
Int64Array, Int8Array, StringArray, TimestampNanosecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
use arrow::array::{
Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
Expand All @@ -43,7 +43,7 @@ use arrow::compute::kernels::comparison::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow::compute::kernels::comparison::{
eq_utf8, gt_eq_utf8, gt_utf8, like_utf8, lt_eq_utf8, lt_utf8, neq_utf8, nlike_utf8,
};
use arrow::datatypes::{DataType, Schema};
use arrow::datatypes::{DataType, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;

/// Represents the column at a given index in a RecordBatch
Expand Down Expand Up @@ -936,6 +936,9 @@ macro_rules! binary_array_op {
DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array),
DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array),
DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray),
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
}
other => Err(ExecutionError::General(format!(
"Unsupported data type {:?}",
other
Expand Down Expand Up @@ -1122,6 +1125,10 @@ impl CastExpr {
Ok(Self { expr, cast_type })
} else if expr_type == DataType::Binary && cast_type == DataType::Utf8 {
Ok(Self { expr, cast_type })
} else if is_numeric(&expr_type)
&& cast_type == DataType::Timestamp(TimeUnit::Nanosecond, None)
{
Ok(Self { expr, cast_type })
} else {
Err(ExecutionError::General(format!(
"Invalid CAST from {:?} to {:?}",
Expand Down Expand Up @@ -1230,7 +1237,7 @@ mod tests {
use super::*;
use crate::error::Result;
use crate::execution::physical_plan::common::get_scalar_value;
use arrow::array::{PrimitiveArray, StringArray};
use arrow::array::{PrimitiveArray, StringArray, Time64NanosecondArray};
use arrow::datatypes::*;

#[test]
Expand Down Expand Up @@ -1357,6 +1364,29 @@ mod tests {
Ok(())
}

#[test]
fn cast_i64_to_timestamp_nanoseconds() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
let a = Int64Array::from(vec![1, 2, 3, 4, 5]);
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

let cast = CastExpr::try_new(
col(0),
&schema,
DataType::Timestamp(TimeUnit::Nanosecond, None),
)?;
let result = cast.evaluate(&batch)?;
assert_eq!(result.len(), 5);
let expected_result = Time64NanosecondArray::from(vec![1, 2, 3, 4]);
let result = result
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.expect("failed to downcast to TimestampNanosecondArray");
assert_eq!(result.value(0), expected_result.value(0));

Ok(())
}

#[test]
fn invalid_cast() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
Expand Down
17 changes: 17 additions & 0 deletions rust/datafusion/src/sql/planner.rs
Expand Up @@ -470,6 +470,7 @@ pub fn convert_data_type(sql: &SQLType) -> Result<DataType> {
SQLType::Float(_) | SQLType::Real => Ok(DataType::Float64),
SQLType::Double => Ok(DataType::Float64),
SQLType::Char(_) | SQLType::Varchar(_) => Ok(DataType::Utf8),
SQLType::Timestamp => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
other => Err(ExecutionError::NotImplemented(format!(
"Unsupported SQL type {:?}",
other
Expand Down Expand Up @@ -532,6 +533,17 @@ mod tests {
quick_test(sql, expected);
}

#[test]
fn test_timestamp_selection() {
let sql = "SELECT state FROM person WHERE birth_date < CAST (158412331400600000 as timestamp)";

let expected = "Projection: #4\
\n Selection: #6 Lt CAST(Int64(158412331400600000) AS Timestamp(Nanosecond, None))\
\n TableScan: person projection=None";

quick_test(sql, expected);
}

#[test]
fn select_all_boolean_operators() {
let sql = "SELECT age, first_name, last_name \
Expand Down Expand Up @@ -658,6 +670,11 @@ mod tests {
Field::new("age", DataType::Int32, false),
Field::new("state", DataType::Utf8, false),
Field::new("salary", DataType::Float64, false),
Field::new(
"birth_date",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
]))),
_ => None,
}
Expand Down

0 comments on commit bc06c0d

Please sign in to comment.