Skip to content

Commit

Permalink
fix issue with now() returning same value across statements (#3210)
Browse files Browse the repository at this point in the history
  • Loading branch information
kmitchener committed Aug 23, 2022
1 parent c72f547 commit eedc787
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 51 deletions.
21 changes: 19 additions & 2 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,25 @@ impl DataFrame {

/// Create a physical plan
pub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
let state = self.session_state.read().clone();
state.create_physical_plan(&self.plan).await
// this function is copied from SessionContext function of the
// same name
let state_cloned = {
let mut state = self.session_state.write();
state.execution_props.start_execution();

// We need to clone `state` to release the lock that is not `Send`. We could
// make the lock `Send` by using `tokio::sync::Mutex`, but that would require
// propagating async even to the `LogicalPlan` building methods.
// Cloning `state` here is fine as we then pass it as immutable `&state`, which
// means that we avoid write consistency issues as the cloned version will not
// be written to. As for eventual modifications that would be applied to the
// original state after it has been cloned, they will not be picked up by the
// clone but that is okay, as it is equivalent to postponing the state update
// by keeping the lock until the end of the function scope.
state.clone()
};

state_cloned.create_physical_plan(&self.plan).await
}

/// Filter the DataFrame by column. Returns a new DataFrame only containing the
Expand Down
7 changes: 5 additions & 2 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,15 @@ impl TableProvider for ViewTable {

async fn scan(
&self,
ctx: &SessionState,
state: &SessionState,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
ctx.create_physical_plan(&self.logical_plan).await
// clone state and start_execution so that now() works in views
let mut state_cloned = state.clone();
state_cloned.execution_props.start_execution();
state_cloned.create_physical_plan(&self.logical_plan).await
}
}

Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,20 +365,18 @@ impl SessionContext {
match (or_replace, view) {
(true, Ok(_)) => {
self.deregister_table(name.as_str())?;
let plan = self.optimize(&input)?;
let table =
Arc::new(ViewTable::try_new(plan.clone(), definition)?);
Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(name.as_str(), table)?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
Ok(Arc::new(DataFrame::new(self.state.clone(), &input)))
}
(_, Err(_)) => {
let plan = self.optimize(&input)?;
let table =
Arc::new(ViewTable::try_new(plan.clone(), definition)?);
Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(name.as_str(), table)?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
Ok(Arc::new(DataFrame::new(self.state.clone(), &input)))
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ pub use datafusion_expr::{
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
},
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, now_expr,
nullif, octet_length, or, power, random, regexp_match, regexp_replace, repeat,
replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384,
sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, nullif,
octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace,
reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384, sha512,
signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
};
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ pub struct ColumnStatistics {

/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
///
/// Each `ExecutionPlan` is Partition-aware and is responsible for
/// Each `ExecutionPlan` is partition-aware and is responsible for
/// creating the actual `async` [`SendableRecordBatchStream`]s
/// of [`RecordBatch`] that incrementally compute the operator's
/// output from its input partition.
///
/// [`ExecutionPlan`] can be displayed in an simplified form using the
/// [`ExecutionPlan`] can be displayed in a simplified form using the
/// return value from [`displayable`] in addition to the (normally
/// quite verbose) `Debug` output.
pub trait ExecutionPlan: Debug + Send + Sync {
Expand Down Expand Up @@ -168,7 +168,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// The default implementation returns `true`
///
/// WARNING: if you override this default and return `false`, your
/// operator can not rely on datafusion preserving the input order
/// operator can not rely on DataFusion preserving the input order
/// as it will likely not.
fn relies_on_input_order(&self) -> bool {
true
Expand Down Expand Up @@ -200,7 +200,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// parallelism may outweigh any benefits
///
/// The default implementation returns `true` unless this operator
/// has signalled it requiers a single child input partition.
/// has signalled it requires a single child input partition.
fn benefits_from_input_partitioning(&self) -> bool {
// By default try to maximize parallelism with more CPUs if
// possible
Expand Down
100 changes: 81 additions & 19 deletions datafusion/core/tests/sql/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,31 +436,93 @@ async fn test_current_timestamp_expressions() -> Result<()> {
}

#[tokio::test]
async fn test_current_timestamp_expressions_non_optimized() -> Result<()> {
let t1 = chrono::Utc::now().timestamp();
async fn test_now_in_same_stmt_using_sql_function() -> Result<()> {
let ctx = SessionContext::new();
let sql = "SELECT NOW(), NOW() as t2";

let msg = format!("Creating logical plan for '{}'", sql);
let plan = ctx.create_logical_plan(sql).expect(&msg);
let df1 = ctx.sql("select now(), now() as now2").await?;
let result = result_vec(&df1.collect().await?);
assert_eq!(result[0][0], result[0][1]);

let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
Ok(())
}

let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
let task_ctx = ctx.task_ctx();
let res = collect(plan, task_ctx).await.expect(&msg);
let actual = result_vec(&res);
#[tokio::test]
async fn test_now_across_statements() -> Result<()> {
let ctx = SessionContext::new();

let res1 = actual[0][0].as_str();
let res2 = actual[0][1].as_str();
let t3 = chrono::Utc::now().timestamp();
let t2_naive =
chrono::NaiveDateTime::parse_from_str(res1, "%Y-%m-%d %H:%M:%S%.6f").unwrap();
let actual1 = execute(&ctx, "SELECT NOW()").await;
let res1 = actual1[0][0].as_str();

let t2 = t2_naive.timestamp();
assert!(t1 <= t2 && t2 <= t3);
assert_eq!(res2, res1);
let actual2 = execute(&ctx, "SELECT NOW()").await;
let res2 = actual2[0][0].as_str();

assert!(res1 < res2);

Ok(())
}

/// Runs `select now()` twice through `SessionContext::sql` on the same context
/// and checks that the second statement reports a strictly later value.
#[tokio::test]
async fn test_now_across_statements_using_sql_function() -> Result<()> {
    let session = SessionContext::new();

    // First statement: plan, execute and stringify in one go.
    let first_batches = session.sql("select now()").await?.collect().await?;
    let first_rows = result_vec(&first_batches);
    let res1 = first_rows[0][0].as_str();

    // Second statement on the very same context.
    let second_batches = session.sql("select now()").await?.collect().await?;
    let second_rows = result_vec(&second_batches);
    let res2 = second_rows[0][0].as_str();

    // The rendered timestamps are compared as strings.
    assert!(res1 < res2);

    Ok(())
}

/// Projects `now()` twice (once aliased as `now2`) via the DataFrame API and
/// checks that both columns of the single result row hold the same value.
#[tokio::test]
async fn test_now_dataframe_api() -> Result<()> {
    let session = SessionContext::new();

    // `select 1` is only used as a convenient way to obtain a DataFrame.
    let base = session.sql("select 1").await?;
    let projection = vec![now(), now().alias("now2")];
    let batches = base.select(projection)?.collect().await?;

    let rows = result_vec(&batches);
    let first_row = &rows[0];
    assert_eq!(first_row[0], first_row[1]);

    Ok(())
}

/// Builds and collects two independent `now()` DataFrames on one context and
/// checks that the two collected values differ.
#[tokio::test]
async fn test_now_dataframe_api_across_statements() -> Result<()> {
    let session = SessionContext::new();

    // `select 1` is only used as a convenient way to obtain a DataFrame.
    let first_df = session.sql("select 1").await?.select(vec![now()])?;
    let first_batches = first_df.collect().await?;
    let first_rows = result_vec(&first_batches);

    let second_df = session.sql("select 1").await?.select(vec![now()])?;
    let second_batches = second_df.collect().await?;
    let second_rows = result_vec(&second_batches);

    assert_ne!(first_rows[0][0], second_rows[0][0]);

    Ok(())
}

/// Creates a view defined as `select now()`, queries it twice, and checks that
/// the two queries return different values.
#[tokio::test]
async fn test_now_in_view() -> Result<()> {
    let session = SessionContext::new();

    // Register the view; the batches produced by the DDL statement are not needed.
    let create_view = session
        .sql("create or replace view test_now as select now()")
        .await?;
    let _df = create_view.collect().await?;

    let first_batches = session.sql("select * from test_now").await?.collect().await?;
    let first_rows = result_vec(&first_batches);

    let second_batches = session.sql("select * from test_now").await?.collect().await?;
    let second_rows = result_vec(&second_batches);

    assert_ne!(first_rows[0][0], second_rows[0][0]);

    Ok(())
}
Expand Down
12 changes: 9 additions & 3 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,8 @@ nary_scalar_expr!(Btrim, btrim);
// a func named concat_ws already exists, so use concat_ws_expr as the name.
nary_scalar_expr!(ConcatWithSeparator, concat_ws_expr);
nary_scalar_expr!(Concat, concat_expr);
nary_scalar_expr!(Now, now_expr);

// date functions
unary_scalar_expr!(Now, now, "current time"); //TODO this is not a unary expression https://github.com/apache/arrow-datafusion/issues/3069
scalar_expr!(DatePart, date_part, part, date);
scalar_expr!(DateTrunc, date_trunc, part, date);
scalar_expr!(DateBin, date_bin, stride, source, origin);
Expand Down Expand Up @@ -398,6 +396,15 @@ pub fn coalesce(args: Vec<Expr>) -> Expr {
}
}

/// Creates a call to the built-in `now` scalar function with an empty argument list.
///
/// The resulting expression yields the current timestamp in nanoseconds; every
/// instance of `now()` in the same statement uses the same value.
pub fn now() -> Expr {
    let fun = BuiltinScalarFunction::Now;
    let args = Vec::new();
    Expr::ScalarFunction { fun, args }
}

/// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression.
pub fn case(expr: Expr) -> CaseBuilder {
CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None)
Expand Down Expand Up @@ -564,7 +571,6 @@ mod test {
test_unary_scalar_expr!(Atan, atan);
test_unary_scalar_expr!(Floor, floor);
test_unary_scalar_expr!(Ceil, ceil);
test_unary_scalar_expr!(Now, now);
test_unary_scalar_expr!(Round, round);
test_unary_scalar_expr!(Trunc, trunc);
test_unary_scalar_expr!(Abs, abs);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result<ColumnarValue> {
/// specified timestamp.
///
/// The semantics of `now()` require it to return the same value
/// whenever it is called in a query. This this value is chosen during
/// planning time and bound into a closure that
/// wherever it appears within a single statement. This value is
/// chosen during planning time.
pub fn make_now(
now_ts: DateTime<Utc>,
) -> impl Fn(&[ColumnarValue]) -> Result<ColumnarValue> {
Expand Down
9 changes: 2 additions & 7 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion_expr::{
character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, date_bin,
date_part, date_trunc, digest, exp, floor, from_unixtime, left, ln, log10, log2,
logical_plan::{PlanType, StringifiedPlan},
lower, lpad, ltrim, md5, now_expr, nullif, octet_length, power, random, regexp_match,
lower, lpad, ltrim, md5, now, nullif, octet_length, power, random, regexp_match,
regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256,
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, tan,
to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
Expand Down Expand Up @@ -1117,12 +1117,7 @@ pub fn parse_expr(
ScalarFunction::ToTimestampSeconds => {
Ok(to_timestamp_seconds(parse_expr(&args[0], registry)?))
}
ScalarFunction::Now => Ok(now_expr(
args.to_owned()
.iter()
.map(|expr| parse_expr(expr, registry))
.collect::<Result<Vec<_>, _>>()?,
)),
ScalarFunction::Now => Ok(now()),
ScalarFunction::Translate => Ok(translate(
parse_expr(&args[0], registry)?,
parse_expr(&args[1], registry)?,
Expand Down
5 changes: 3 additions & 2 deletions docs/source/user-guide/sql/scalar_functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolut
`extract(field FROM source)`

- The `extract` function retrieves subfields such as year or hour from date/time values.
`source` must be a value expression of type timestamp, Data32, or Data64. `field` is an identifier that selects what field to extract from the source value.
`source` must be a value expression of type timestamp, Date32, or Date64. `field` is an identifier that selects what field to extract from the source value.
The `extract` function returns values of type u32.
- `year` :`extract(year FROM to_timestamp('2020-09-08T12:00:00+00:00')) -> 2020`
- `month`:`extract(month FROM to_timestamp('2020-09-08T12:00:00+00:00')) -> 9`
Expand All @@ -273,7 +273,8 @@ Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolut

### `now`

current time
Returns the current time as `Timestamp(Nanoseconds, UTC)`. Returns the same value wherever the
function appears in a statement, using a value chosen at planning time.

## Other Functions

Expand Down

0 comments on commit eedc787

Please sign in to comment.