From b40ff39f02d8de24d8d493cfd69deff617a3cc75 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 5 Oct 2022 12:05:18 -0700
Subject: [PATCH 01/32] functionality and pytest

---
 dask_sql/physical/rex/core/call.py | 11 +++++++++++
 tests/integration/test_rex.py      | 23 +++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index f3d6a3ddd..1a99d98b0 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -596,6 +596,16 @@ def extract(self, what, df: SeriesOrScalar):
         raise NotImplementedError(f"Extraction of {what} is not (yet) implemented.")
 
 
+class ToTimestampOperation(Operation):
+    def __init__(self):
+        super().__init__(self.to_timestamp)
+
+    def to_timestamp(self, df, format="%Y-%m-%d %H:%M:%S"):
+        df = df.astype("datetime64[s]")
+        df = df.dt.strftime(format)
+        return df
+
+
 class YearOperation(Operation):
     def __init__(self):
         super().__init__(self.extract_year)
@@ -972,6 +982,7 @@ class RexCallPlugin(BaseRexPlugin):
             lambda x: x + pd.tseries.offsets.MonthEnd(1),
             lambda x: convert_to_datetime(x) + pd.tseries.offsets.MonthEnd(1),
         ),
+        "totimestamp": ToTimestampOperation(),
         # Temporary UDF functions that need to be moved after this POC
         "datepart": DatePartOperation(),
         "year": YearOperation(),
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 5945df8f9..f9637bcf5 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -639,3 +639,26 @@ def test_date_functions(c):
         FROM df
     """
     )
+
+
+def test_totimestamp(c):
+    df = pd.DataFrame({
+        "a": [1203073300, 1406073600, 2806073600],
+    })
+    c.create_table("df", df)
+
+    df = c.sql(
+        """
+    SELECT to_timestamp(a) AS date FROM df
+    """
+    )
+
+    expected_df = pd.DataFrame({
+        "date": [
+            datetime(2008, 2, 15, 11, 1, 40),
+            datetime(2014, 7, 23),
+            datetime(2058, 12, 2, 16, 53, 20),
+        ],
+    })
+
+    assert_eq(df, expected_df, check_dtype=False)

From 93d13d3542ba9f9f3adbe45cfec2eefffe5c8ab2 Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Wed, 5 Oct 2022 12:28:43 -0700
Subject: [PATCH 02/32] style fix

---
 tests/integration/test_rex.py | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index f9637bcf5..6d7e24fca 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -642,9 +642,11 @@ def test_date_functions(c):
 
 
 def test_totimestamp(c):
-    df = pd.DataFrame({
-        "a": [1203073300, 1406073600, 2806073600],
-    })
+    df = pd.DataFrame(
+        {
+            "a": [1203073300, 1406073600, 2806073600],
+        }
+    )
     c.create_table("df", df)
 
     df = c.sql(
@@ -653,12 +655,14 @@ def test_totimestamp(c):
     """
     )
 
-    expected_df = pd.DataFrame({
-        "date": [
-            datetime(2008, 2, 15, 11, 1, 40),
-            datetime(2014, 7, 23),
-            datetime(2058, 12, 2, 16, 53, 20),
-        ],
-    })
+    expected_df = pd.DataFrame(
+        {
+            "date": [
+                datetime(2008, 2, 15, 11, 1, 40),
+                datetime(2014, 7, 23),
+                datetime(2058, 12, 2, 16, 53, 20),
+            ],
+        }
+    )
 
     assert_eq(df, expected_df, check_dtype=False)

From ab053b0cc70c751fbcd42eeb77ffce406320a22f Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 14 Oct 2022 15:40:50 -0700
Subject: [PATCH 03/32] add format param

---
 dask_planner/src/dialect.rs        | 99 +++++++++++++++++++++++++++++-
 dask_planner/src/sql.rs            |  8 +++
 dask_sql/physical/rex/core/call.py | 12 ++--
 tests/integration/test_rex.py      | 18 ++++++
 4 files changed, 130 insertions(+), 7 deletions(-)

diff --git a/dask_planner/src/dialect.rs b/dask_planner/src/dialect.rs
index 973f76f4f..1db1cd0ca 100644
--- a/dask_planner/src/dialect.rs
+++ b/dask_planner/src/dialect.rs
@@ -1,6 +1,12 @@
-use core::{iter::Peekable, str::Chars};
-
+use core::iter::Peekable;
+use core::str::Chars;
+use datafusion_sql::sqlparser::ast::{
+    Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Value,
+};
 use datafusion_sql::sqlparser::dialect::Dialect;
+use datafusion_sql::sqlparser::parser::{Parser, ParserError};
+use datafusion_sql::sqlparser::tokenizer::Token;
+use datafusion_sql::sqlparser::keywords::Keyword;
 
 #[derive(Debug)]
 pub struct DaskDialect {}
@@ -37,4 +43,93 @@ impl Dialect for DaskDialect {
     fn supports_filter_during_aggregation(&self) -> bool {
         true
     }
+
+    /// override expression parsing
+    fn parse_prefix(&self, parser: &mut Parser) -> Option<Result<Expr, ParserError>> {
+        fn parse_expr(parser: &mut Parser) -> Result<Option<Expr>, ParserError> {
+            match parser.peek_token() {
+                Token::Word(w) if w.value.to_lowercase() == "timestampadd" => {
+                    // TIMESTAMPADD(YEAR, 2, d)
+                    parser.next_token(); // skip timestampadd
+                    parser.expect_token(&Token::LParen)?;
+                    let time_unit = parser.next_token();
+                    parser.expect_token(&Token::Comma)?;
+                    let n = parser.parse_expr()?;
+                    parser.expect_token(&Token::Comma)?;
+                    let expr = parser.parse_expr()?;
+                    parser.expect_token(&Token::RParen)?;
+
+                    // convert to function args
+                    let args = vec![
+                        FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
+                            Value::SingleQuotedString(time_unit.to_string()),
+                        ))),
+                        FunctionArg::Unnamed(FunctionArgExpr::Expr(n)),
+                        FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
+                    ];
+
+                    Ok(Some(Expr::Function(Function {
+                        name: ObjectName(vec![Ident::new("TIMESTAMPADD")]),
+                        args,
+                        over: None,
+                        distinct: false,
+                        special: false,
+                    })))
+                }
+                Token::Word(w) if w.value.to_lowercase() == "to_timestamp" => {
+                    parser.next_token(); // skip to_timestamp
+                    parser.expect_token(&Token::LParen)?;
+                    let expr = parser.parse_expr()?;
+                    let comma = parser.consume_token(&Token::Comma);
+                    if comma {
+                        // Parse TO_TIMESTAMP(d, "%Y-%m-%d %H:%M:%S")
+                        let time_format = parser.next_token();
+                        parser.expect_token(&Token::RParen)?;
+
+                        // convert to function args
+                        let args = vec![
+                            FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
+                            FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
+                                Value::SingleQuotedString(time_format.to_string()),
+                            ))),
+                        ];
+
+                        Ok(Some(Expr::Function(Function {
+                            name: ObjectName(vec![Ident::new("dsql_totimestamp")]),
+                            args,
+                            over: None,
+                            distinct: false,
+                            special: false,
+                        })))
+                    } else {
+                        // Parse TO_TIMESTAMP(d)
+                        let time_format = "%Y-%m-%d %H:%M:%S";
+                        parser.expect_token(&Token::RParen)?;
+
+                        // convert to function args
+                        let args = vec![
+                            FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
+                            FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
+                                Value::SingleQuotedString(time_format.to_string()),
+                            ))),
+                        ];
+
+                        Ok(Some(Expr::Function(Function {
+                            name: ObjectName(vec![Ident::new("dsql_totimestamp")]),
+                            args,
+                            over: None,
+                            distinct: false,
+                            special: false,
+                        })))
+                    }
+                }
+                _ => Ok(None)
+            }
+        }
+        match parse_expr(parser) {
+            Ok(Some(expr)) => Some(Ok(expr)),
+            Ok(None) => None,
+            Err(e) => Some(Err(e)),
+        }
+    }
 }
diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs
index ef34c66f9..f9c694da8 100644
--- a/dask_planner/src/sql.rs
+++ b/dask_planner/src/sql.rs
@@ -152,6 +152,14 @@ impl ContextProvider for DaskSQLContext {
                 let rtf: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Int64)));
                 return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
             }
+            "dsql_totimestamp" => {
+                let sig = Signature::variadic(
+                    vec![DataType::Date64, DataType::Utf8],
+                    Volatility::Immutable,
+                );
+                let rtf: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Utf8)));
+                return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
+            }
             "atan2" | "mod" => {
                 let sig = Signature::variadic(
                     vec![DataType::Float64, DataType::Float64],
diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 15fc1c3ee..3205a35d2 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -604,10 +604,12 @@ class ToTimestampOperation(Operation):
     def __init__(self):
         super().__init__(self.to_timestamp)
 
-    def to_timestamp(self, df, format="%Y-%m-%d %H:%M:%S"):
-        df = df.astype("datetime64[s]")
-        df = df.dt.strftime(format)
-        return df
+    def to_timestamp(self, df, format):
+        df = pd.to_datetime(df, unit="s")
+        df = df.strftime(format)
+        result = [timestamp for timestamp in df]
+        result = pd.Series(result)
+        return result
 
 
 class YearOperation(Operation):
@@ -986,7 +988,7 @@ class RexCallPlugin(BaseRexPlugin):
             lambda x: x + pd.tseries.offsets.MonthEnd(1),
             lambda x: convert_to_datetime(x) + pd.tseries.offsets.MonthEnd(1),
         ),
-        "totimestamp": ToTimestampOperation(),
+        "dsql_totimestamp": ToTimestampOperation(),
         # Temporary UDF functions that need to be moved after this POC
         "datepart": DatePartOperation(),
         "year": YearOperation(),
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 4d7a9413a..33cd988cf 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -683,3 +683,21 @@ def test_totimestamp(c):
     )
 
     assert_eq(df, expected_df, check_dtype=False)
+
+    df = c.sql(
+        """
+    SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
+    """
+    )
+
+    expected_df = pd.DataFrame(
+        {
+            "date": [
+                "15/02/2008",
+                "23/07/2014",
+                "02/12/2058",
+            ],
+        }
+    )
+
+    assert_eq(df, expected_df, check_dtype=False)

From 6bdf85268cb6f5a274632843eb6ef7eb22d8e756 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 14 Oct 2022 15:58:34 -0700
Subject: [PATCH 04/32] lint

---
 dask_planner/src/dialect.rs   | 16 ++++++++--------
 tests/integration/test_rex.py |  6 +++---
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/dask_planner/src/dialect.rs b/dask_planner/src/dialect.rs
index 1db1cd0ca..cbbd375db 100644
--- a/dask_planner/src/dialect.rs
+++ b/dask_planner/src/dialect.rs
@@ -1,12 +1,12 @@
-use core::iter::Peekable;
-use core::str::Chars;
-use datafusion_sql::sqlparser::ast::{
-    Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Value,
+use core::{iter::Peekable, str::Chars};
+
+use datafusion_sql::sqlparser::{
+    ast::{Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Value},
+    dialect::Dialect,
+    keywords::Keyword,
+    parser::{Parser, ParserError},
+    tokenizer::Token,
 };
-use datafusion_sql::sqlparser::dialect::Dialect;
-use datafusion_sql::sqlparser::parser::{Parser, ParserError};
-use datafusion_sql::sqlparser::tokenizer::Token;
-use datafusion_sql::sqlparser::keywords::Keyword;
 
 #[derive(Debug)]
 pub struct DaskDialect {}
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 33cd988cf..8b95d3604 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -675,9 +675,9 @@ def test_totimestamp(c):
     expected_df = pd.DataFrame(
         {
             "date": [
-                datetime(2008, 2, 15, 11, 1, 40),
-                datetime(2014, 7, 23),
-                datetime(2058, 12, 2, 16, 53, 20),
+                '2008-02-15 11:01:40',
+                '2014-07-23 00:00:00',
+                '2058-12-02 16:53:20',
             ],
         }
     )

From ed42f94e274b84582460ebf47ebede642fb18e21 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 14 Oct 2022 16:18:53 -0700
Subject: [PATCH 05/32] remove quotes from result

---
 dask_planner/src/dialect.rs        | 2 +-
 dask_sql/physical/rex/core/call.py | 7 +++++--
 tests/integration/test_rex.py      | 6 +++---
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/dask_planner/src/dialect.rs b/dask_planner/src/dialect.rs
index cbbd375db..761352431 100644
--- a/dask_planner/src/dialect.rs
+++ b/dask_planner/src/dialect.rs
@@ -123,7 +123,7 @@ impl Dialect for DaskDialect {
                         })))
                     }
                 }
-                _ => Ok(None)
+                _ => Ok(None),
             }
         }
         match parse_expr(parser) {
diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 3205a35d2..cf55e6595 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -605,11 +605,14 @@ def __init__(self):
         super().__init__(self.to_timestamp)
 
     def to_timestamp(self, df, format):
+        # Remove double and single quotes from string
+        format = format.replace("\"", "")
+        format = format.replace("\'", "")
+
         df = pd.to_datetime(df, unit="s")
         df = df.strftime(format)
         result = [timestamp for timestamp in df]
-        result = pd.Series(result)
-        return result
+        return pd.Series(result)
 
 
 class YearOperation(Operation):
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 8b95d3604..19c15e109 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -675,9 +675,9 @@ def test_totimestamp(c):
     expected_df = pd.DataFrame(
         {
             "date": [
-                '2008-02-15 11:01:40',
-                '2014-07-23 00:00:00',
-                '2058-12-02 16:53:20',
+                "2008-02-15 11:01:40",
+                "2014-07-23 00:00:00",
+                "2058-12-02 16:53:20",
             ],
         }
     )

From 190624afb731977ea93b0741e0fa59a4dd159746 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 14 Oct 2022 16:25:50 -0700
Subject: [PATCH 06/32] return date64 instead of str

---
 dask_planner/src/sql.rs            |  2 +-
 dask_sql/physical/rex/core/call.py |  4 ++--
 tests/integration/test_rex.py      | 12 ++++++------
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs
index f9c694da8..c9c45c4a9 100644
--- a/dask_planner/src/sql.rs
+++ b/dask_planner/src/sql.rs
@@ -157,7 +157,7 @@ impl ContextProvider for DaskSQLContext {
                     vec![DataType::Date64, DataType::Utf8],
                     Volatility::Immutable,
                 );
-                let rtf: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Utf8)));
+                let rtf: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Date64)));
                 return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
             }
             "atan2" | "mod" => {
diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index cf55e6595..cfea3677e 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -606,8 +606,8 @@ def __init__(self):
 
     def to_timestamp(self, df, format):
         # Remove double and single quotes from string
-        format = format.replace("\"", "")
-        format = format.replace("\'", "")
+        format = format.replace('"', "")
+        format = format.replace("'", "")
 
         df = pd.to_datetime(df, unit="s")
         df = df.strftime(format)
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 19c15e109..1ecbf600c 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -675,9 +675,9 @@ def test_totimestamp(c):
     expected_df = pd.DataFrame(
         {
             "date": [
-                "2008-02-15 11:01:40",
-                "2014-07-23 00:00:00",
-                "2058-12-02 16:53:20",
+                datetime(2008, 2, 15, 11, 1, 40),
+                datetime(2014, 7, 23),
+                datetime(2058, 12, 2, 16, 53, 20),
             ],
         }
     )
@@ -693,9 +693,9 @@ def test_totimestamp(c):
     expected_df = pd.DataFrame(
         {
             "date": [
-                "15/02/2008",
-                "23/07/2014",
-                "02/12/2058",
+                datetime(2008, 2, 15),
+                datetime(2014, 7, 23),
+                datetime(2058, 12, 2),
             ],
         }
     )

From 067722dcf62ffb76151d9d14d031d366eafc559e Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 14 Oct 2022 16:47:35 -0700
Subject: [PATCH 07/32] lint

---
 dask_planner/src/dialect.rs   | 1 -
 tests/integration/test_rex.py | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/dask_planner/src/dialect.rs b/dask_planner/src/dialect.rs
index 761352431..b261433c0 100644
--- a/dask_planner/src/dialect.rs
+++ b/dask_planner/src/dialect.rs
@@ -3,7 +3,6 @@ use core::{iter::Peekable, str::Chars};
 use datafusion_sql::sqlparser::{
     ast::{Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Value},
     dialect::Dialect,
-    keywords::Keyword,
     parser::{Parser, ParserError},
     tokenizer::Token,
 };
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 1ecbf600c..aa4dbdb36 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -695,7 +695,7 @@ def test_totimestamp(c):
             "date": [
                 datetime(2008, 2, 15),
                 datetime(2014, 7, 23),
-                datetime(2058, 12, 2),
+                datetime(2058, 2, 12),
             ],
         }
     )

From a3c64822deb5d3868d926f3951af1ae5cf683199 Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Tue, 18 Oct 2022 11:43:28 -0700
Subject: [PATCH 08/32] Apply suggestions from code review

Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
---
 dask_planner/src/dialect.rs | 57 +++++++++++++------------------
 1 file changed, 19 insertions(+), 38 deletions(-)

diff --git a/dask_planner/src/dialect.rs b/dask_planner/src/dialect.rs
index b261433c0..210ad440c 100644
--- a/dask_planner/src/dialect.rs
+++ b/dask_planner/src/dialect.rs
@@ -80,47 +80,28 @@ impl Dialect for DaskDialect {
                     parser.expect_token(&Token::LParen)?;
                     let expr = parser.parse_expr()?;
                     let comma = parser.consume_token(&Token::Comma);
-                    if comma {
-                        // Parse TO_TIMESTAMP(d, "%Y-%m-%d %H:%M:%S")
-                        let time_format = parser.next_token();
-                        parser.expect_token(&Token::RParen)?;
-
-                        // convert to function args
-                        let args = vec![
-                            FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
-                            FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
-                                Value::SingleQuotedString(time_format.to_string()),
-                            ))),
-                        ];
-
-                        Ok(Some(Expr::Function(Function {
-                            name: ObjectName(vec![Ident::new("dsql_totimestamp")]),
-                            args,
-                            over: None,
-                            distinct: false,
-                            special: false,
-                        })))
+                    let time_format = if comma {
+                        parser.next_token().to_string()
                     } else {
-                        // Parse TO_TIMESTAMP(d)
-                        let time_format = "%Y-%m-%d %H:%M:%S";
-                        parser.expect_token(&Token::RParen)?;
+                        "%Y-%m-%d %H:%M:%S".to_string()
+                    };
+                    parser.expect_token(&Token::RParen)?;
 
-                        // convert to function args
-                        let args = vec![
-                            FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
-                            FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
-                                Value::SingleQuotedString(time_format.to_string()),
-                            ))),
-                        ];
+                    // convert to function args
+                    let args = vec![
+                        FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
+                        FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
+                            Value::SingleQuotedString(time_format),
+                        ))),
+                    ];
 
-                        Ok(Some(Expr::Function(Function {
-                            name: ObjectName(vec![Ident::new("dsql_totimestamp")]),
-                            args,
-                            over: None,
-                            distinct: false,
-                            special: false,
-                        })))
-                    }
+                    Ok(Some(Expr::Function(Function {
+                        name: ObjectName(vec![Ident::new("dsql_totimestamp")]),
+                        args,
+                        over: None,
+                        distinct: false,
+                        special: false,
+                    })))
                 }
                 _ => Ok(None),
             }

From 16ebc65125d553bdd39be2636f2c7f249fab1bb3 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Tue, 18 Oct 2022 12:31:48 -0700
Subject: [PATCH 09/32] add string input and test

---
 dask_planner/src/sql.rs            | 15 +++++++++---
 dask_sql/physical/rex/core/call.py | 12 ++++++---
 tests/integration/test_rex.py      | 39 +++++++++++++++++++++++++++---
 3 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs
index c9c45c4a9..52db5259b 100644
--- a/dask_planner/src/sql.rs
+++ b/dask_planner/src/sql.rs
@@ -153,10 +153,17 @@ impl ContextProvider for DaskSQLContext {
                 return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
             }
             "dsql_totimestamp" => {
-                let sig = Signature::variadic(
-                    vec![DataType::Date64, DataType::Utf8],
-                    Volatility::Immutable,
-                );
+                let sig = Signature::one_of(vec![
+                    TypeSignature::Variadic(vec![DataType::Int8, DataType::Utf8]),
+                    TypeSignature::Variadic(vec![DataType::Int16, DataType::Utf8]),
+                    TypeSignature::Variadic(vec![DataType::Int32, DataType::Utf8]),
+                    TypeSignature::Variadic(vec![DataType::Int64, DataType::Utf8]),
+                    TypeSignature::Variadic(vec![DataType::UInt8, DataType::Utf8]),
+                    TypeSignature::Variadic(vec![DataType::UInt16, DataType::Utf8]),
+                    TypeSignature::Variadic(vec![DataType::UInt32, DataType::Utf8]),
+                    TypeSignature::Variadic(vec![DataType::UInt64, DataType::Utf8]),
+                    TypeSignature::Variadic(vec![DataType::Utf8, DataType::Utf8]),
+                ], Volatility::Immutable);
                 let rtf: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Date64)));
                 return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
             }
diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index cfea3677e..bda30867f 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -609,10 +609,14 @@ def to_timestamp(self, df, format):
         format = format.replace('"', "")
         format = format.replace("'", "")
 
-        df = pd.to_datetime(df, unit="s")
-        df = df.strftime(format)
-        result = [timestamp for timestamp in df]
-        return pd.Series(result)
+        if df.dtype == "object":
+            df = pd.to_datetime(df)
+            return df.strftime(format)
+        else:
+            df = pd.to_datetime(df, unit="s")
+            df = df.strftime(format)
+            result = [timestamp for timestamp in df]
+            return pd.Series(result)
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index aa4dbdb36..61b50a421 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -671,7 +671,6 @@ def test_totimestamp(c):
     SELECT to_timestamp(a) AS date FROM df
     """
     )
-
     expected_df = pd.DataFrame(
         {
             "date": [
@@ -681,7 +680,6 @@ def test_totimestamp(c):
             ],
         }
     )
-
     assert_eq(df, expected_df, check_dtype=False)
 
     df = c.sql(
@@ -689,7 +687,6 @@ def test_totimestamp(c):
     SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
     """
     )
-
     expected_df = pd.DataFrame(
         {
             "date": [
@@ -699,5 +696,41 @@ def test_totimestamp(c):
             ],
         }
     )
+    assert_eq(df, expected_df, check_dtype=False)
+
+    df = pd.DataFrame(
+        {
+            "a": ['1997-02-28 10:30:00', '1997-03-28 10:30:01'],
+        }
+    )
+    c.create_table("df", df)
+
+    df = c.sql(
+        """
+    SELECT to_timestamp(a) AS date FROM df
+    """
+    )
+    expected_df = pd.DataFrame(
+        {
+            "date": [
+                datetime(1997, 2, 28, 10, 30, 0),
+                datetime(1997, 3, 28, 10, 30, 1),
+            ],
+        }
+    )
+    assert_eq(df, expected_df, check_dtype=False)
 
     df = c.sql(
         """
+    SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
     """
     )
+    expected_df = pd.DataFrame(
+        {
+            "date": [
+                datetime(1997, 2, 28),
+                datetime(1997, 3, 28),
+            ],
+        }
+    )
     assert_eq(df, expected_df, check_dtype=False)

From 26c2d71b87215b6a0f5994a5db52b85f61219b82 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Tue, 18 Oct 2022 12:46:58 -0700
Subject: [PATCH 10/32] lint

---
 dask_planner/src/sql.rs       | 25 ++++++++++++++-----------
 tests/integration/test_rex.py |  2 +-
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs
index 52db5259b..4dc1bc17c 100644
--- a/dask_planner/src/sql.rs
+++ b/dask_planner/src/sql.rs
@@ -153,17 +153,20 @@ impl ContextProvider for DaskSQLContext {
                 return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
             }
             "dsql_totimestamp" => {
-                let sig = Signature::one_of(vec![
-                    TypeSignature::Variadic(vec![DataType::Int8, DataType::Utf8]),
-                    TypeSignature::Variadic(vec![DataType::Int16, DataType::Utf8]),
-                    TypeSignature::Variadic(vec![DataType::Int32, DataType::Utf8]),
-                    TypeSignature::Variadic(vec![DataType::Int64, DataType::Utf8]),
-                    TypeSignature::Variadic(vec![DataType::UInt8, DataType::Utf8]),
-                    TypeSignature::Variadic(vec![DataType::UInt16, DataType::Utf8]),
-                    TypeSignature::Variadic(vec![DataType::UInt32, DataType::Utf8]),
-                    TypeSignature::Variadic(vec![DataType::UInt64, DataType::Utf8]),
-                    TypeSignature::Variadic(vec![DataType::Utf8, DataType::Utf8]),
-                ], Volatility::Immutable);
+                let sig = Signature::one_of(
+                    vec![
+                        TypeSignature::Variadic(vec![DataType::Int8, DataType::Utf8]),
+                        TypeSignature::Variadic(vec![DataType::Int16, DataType::Utf8]),
+                        TypeSignature::Variadic(vec![DataType::Int32, DataType::Utf8]),
+                        TypeSignature::Variadic(vec![DataType::Int64, DataType::Utf8]),
+                        TypeSignature::Variadic(vec![DataType::UInt8, DataType::Utf8]),
+                        TypeSignature::Variadic(vec![DataType::UInt16, DataType::Utf8]),
+                        TypeSignature::Variadic(vec![DataType::UInt32, DataType::Utf8]),
+                        TypeSignature::Variadic(vec![DataType::UInt64, DataType::Utf8]),
+                        TypeSignature::Variadic(vec![DataType::Utf8, DataType::Utf8]),
+                    ],
+                    Volatility::Immutable,
+                );
                 let rtf: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Date64)));
                 return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
             }
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 61b50a421..db0a68e7d 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -700,7 +700,7 @@ def test_totimestamp(c):
 
     df = pd.DataFrame(
         {
-            "a": ['1997-02-28 10:30:00', '1997-03-28 10:30:01'],
+            "a": ["1997-02-28 10:30:00", "1997-03-28 10:30:01"],
         }
     )
     c.create_table("df", df)

From f7cbf9f2837d60aeff0d7ad61152f18b113c03a0 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Wed, 19 Oct 2022 11:15:05 -0700
Subject: [PATCH 11/32] timestampadd parser test

---
 dask_planner/src/dialect.rs |  2 +-
 dask_planner/src/parser.rs  | 17 +++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/dask_planner/src/dialect.rs b/dask_planner/src/dialect.rs
index 210ad440c..4a28ea67e 100644
--- a/dask_planner/src/dialect.rs
+++ b/dask_planner/src/dialect.rs
@@ -68,7 +68,7 @@ impl Dialect for DaskDialect {
                     ];
 
                     Ok(Some(Expr::Function(Function {
-                        name: ObjectName(vec![Ident::new("TIMESTAMPADD")]),
+                        name: ObjectName(vec![Ident::new("timestampadd")]),
                         args,
                         over: None,
                         distinct: false,
diff --git a/dask_planner/src/parser.rs b/dask_planner/src/parser.rs
index d743af901..9ad103550 100644
--- a/dask_planner/src/parser.rs
+++ b/dask_planner/src/parser.rs
@@ -1236,6 +1236,23 @@ impl<'a> DaskParser<'a> {
 mod test {
     use crate::parser::{DaskParser, DaskStatement};
 
+    #[test]
+    fn timestampadd() {
+        let sql = "SELECT TIMESTAMPADD(YEAR, 2, d) FROM t";
+        let statements = DaskParser::parse_sql(sql).unwrap();
+        assert_eq!(1, statements.len());
+        let actual = format!("{:?}", statements[0]);
+        let expected = "projection: [\
+        UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"timestampadd\", quote_style: None }]), \
+        args: [\
+        Unnamed(Expr(Value(SingleQuotedString(\"YEAR\")))), \
+        Unnamed(Expr(Value(Number(\"2\", false)))), \
+        Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None })))\
+        ], over: None, distinct: false, special: false }))\
+        ]";
+        assert!(actual.contains(expected));
+    }
+
     #[test]
     fn create_model() {
         let sql = r#"CREATE MODEL my_model WITH (

From 3ae33a83cb9096cf5c63678ee6f0e1844459dfaa Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Wed, 19 Oct 2022 11:36:50 -0700
Subject: [PATCH 12/32] change Variadic to Exact

---
 dask_planner/src/sql.rs | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs
index 4dc1bc17c..675c7e099 100644
--- a/dask_planner/src/sql.rs
+++ b/dask_planner/src/sql.rs
@@ -155,15 +155,15 @@ impl ContextProvider for DaskSQLContext {
             "dsql_totimestamp" => {
                 let sig = Signature::one_of(
                     vec![
-                        TypeSignature::Variadic(vec![DataType::Int8, DataType::Utf8]),
-                        TypeSignature::Variadic(vec![DataType::Int16, DataType::Utf8]),
-                        TypeSignature::Variadic(vec![DataType::Int32, DataType::Utf8]),
-                        TypeSignature::Variadic(vec![DataType::Int64, DataType::Utf8]),
-                        TypeSignature::Variadic(vec![DataType::UInt8, DataType::Utf8]),
-                        TypeSignature::Variadic(vec![DataType::UInt16, DataType::Utf8]),
-                        TypeSignature::Variadic(vec![DataType::UInt32, DataType::Utf8]),
-                        TypeSignature::Variadic(vec![DataType::UInt64, DataType::Utf8]),
-                        TypeSignature::Variadic(vec![DataType::Utf8, DataType::Utf8]),
+                        TypeSignature::Exact(vec![DataType::Int8, DataType::Utf8]),
+                        TypeSignature::Exact(vec![DataType::Int16, DataType::Utf8]),
+                        TypeSignature::Exact(vec![DataType::Int32, DataType::Utf8]),
+                        TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
+                        TypeSignature::Exact(vec![DataType::UInt8, DataType::Utf8]),
+                        TypeSignature::Exact(vec![DataType::UInt16, DataType::Utf8]),
+                        TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
+                        TypeSignature::Exact(vec![DataType::UInt64, DataType::Utf8]),
+                        TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]),
                     ],
                     Volatility::Immutable,
                 );

From 9f80fb548fdb6f387d631dc877205ac03cea145a Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Mon, 24 Oct 2022 15:08:42 -0700
Subject: [PATCH 13/32] rust test and pdlike

---
 dask_planner/src/parser.rs         | 28 ++++++++++++++++++++++++++++
 dask_sql/physical/rex/core/call.py | 11 ++++++++---
 2 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/dask_planner/src/parser.rs b/dask_planner/src/parser.rs
index 9ad103550..2649a3111 100644
--- a/dask_planner/src/parser.rs
+++ b/dask_planner/src/parser.rs
@@ -1253,6 +1253,34 @@ mod test {
         assert!(actual.contains(expected));
     }
 
+    #[test]
+    fn to_timestamp() {
+        let sql = "SELECT TO_TIMESTAMP(d) FROM t";
+        let statements = DaskParser::parse_sql(sql).unwrap();
+        assert_eq!(1, statements.len());
+        let actual = format!("{:?}", statements[0]);
+        let expected = "projection: [\
+        UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"dsql_totimestamp\", quote_style: None }]), \
+        args: [\
+        Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None })))\
+        ], over: None, distinct: false, special: false }))\
+        ]";
+        assert!(actual.contains(expected));
+
+        let sql = "SELECT TO_TIMESTAMP(d, \"%d/%m/%Y\") FROM t";
+        let statements = DaskParser::parse_sql(sql).unwrap();
+        assert_eq!(1, statements.len());
+        let actual = format!("{:?}", statements[0]);
+        let expected = "projection: [\
+        UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"dsql_totimestamp\", quote_style: None }]), \
+        args: [\
+        Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None }))), \
+        Unnamed(Expr(Value(SingleQuotedString(\"%d/%m/%Y\"))))\
+        ], over: None, distinct: false, special: false }))\
+        ]";
+        assert!(actual.contains(expected));
+    }
+
     #[test]
     fn create_model() {
         let sql = r#"CREATE MODEL my_model WITH (
diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 82ecec268..91ce1d06c 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -35,6 +35,11 @@
 import dask_sql
 from dask_planner.rust import Expression, LogicalPlan
 
+try:
+    import cudf as pdlike
+except ImportError:
+    import pandas as pdlike
+
 logger = logging.getLogger(__name__)
 
 SeriesOrScalar = Union[dd.Series, Any]
@@ -610,13 +615,13 @@ def to_timestamp(self, df, format):
         format = format.replace("'", "")
 
         if df.dtype == "object":
-            df = pd.to_datetime(df)
+            df = pdlike.to_datetime(df)
             return df.strftime(format)
         else:
-            df = pd.to_datetime(df, unit="s")
+            df = pdlike.to_datetime(df, unit="s")
             df = df.strftime(format)
             result = [timestamp for timestamp in df]
-            return pd.Series(result)
+            return pdlike.Series(result)

From 228959b90dfcf214e942c8112de075ba34ce9388 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Mon, 24 Oct 2022 15:14:23 -0700
Subject: [PATCH 14/32] fix rust test maybe?

---
 dask_planner/src/parser.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_planner/src/parser.rs b/dask_planner/src/parser.rs
index 2649a3111..820ccd123 100644
--- a/dask_planner/src/parser.rs
+++ b/dask_planner/src/parser.rs
@@ -1272,7 +1272,7 @@ mod test {
         assert_eq!(1, statements.len());
         let actual = format!("{:?}", statements[0]);
         let expected = "projection: [\
-        UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"dsql_totimestamp\", quote_style: None }]), \
+        UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"\"dsql_totimestamp\"\", quote_style: None }]), \
         args: [\
         Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None }))), \
         Unnamed(Expr(Value(SingleQuotedString(\"%d/%m/%Y\"))))\
        ], over: None, distinct: false, special: false }))\
        ]";
        assert!(actual.contains(expected));

From beef8c2e9ac5037923eccb633f191e42d91c1c91 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Mon, 24 Oct 2022 15:15:11 -0700
Subject: [PATCH 15/32] minor change

---
 dask_planner/src/parser.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dask_planner/src/parser.rs b/dask_planner/src/parser.rs
index 820ccd123..7621a12b8 100644
--- a/dask_planner/src/parser.rs
+++ b/dask_planner/src/parser.rs
@@ -1272,10 +1272,10 @@ mod test {
         let expected = "projection: [\
-        UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"\"dsql_totimestamp\"\", quote_style: None }]), \
+        UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"dsql_totimestamp\", quote_style: None }]), \
         args: [\
         Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None }))), \
-        Unnamed(Expr(Value(SingleQuotedString(\"%d/%m/%Y\"))))\
+        Unnamed(Expr(Value(SingleQuotedString(\"\"%d/%m/%Y\"\"))))\
         ], over: None, distinct: false, special: false }))\
         ]";
         assert!(actual.contains(expected));

From 8b544935415c01ff84c755aad0f31ee9fa67aee7 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Mon, 24 Oct 2022 15:35:55 -0700
Subject: [PATCH 16/32] fix rust test

---
 dask_planner/src/parser.rs | 29 +++++++++++++++--------------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/dask_planner/src/parser.rs b/dask_planner/src/parser.rs
index 7621a12b8..61be0e1cd 100644
--- a/dask_planner/src/parser.rs
+++ b/dask_planner/src/parser.rs
@@ -1255,30 +1255,31 @@ mod test {
 
     #[test]
     fn to_timestamp() {
-        let sql = "SELECT TO_TIMESTAMP(d) FROM t";
-        let statements = DaskParser::parse_sql(sql).unwrap();
-        assert_eq!(1, statements.len());
-        let actual = format!("{:?}", statements[0]);
-        let expected = "projection: [\
+        let sql1 = "SELECT TO_TIMESTAMP(d) FROM t";
+        let statements1 = DaskParser::parse_sql(sql1).unwrap();
+        assert_eq!(1, statements1.len());
+        let actual1 = format!("{:?}", statements1[0]);
+        let expected1 = "projection: [\
         UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"dsql_totimestamp\", quote_style: None }]), \
         args: [\
-        Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None })))\
+        Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None }))), \
+        Unnamed(Expr(Value(SingleQuotedString(\"%Y-%m-%d %H:%M:%S\"))))\
         ], over: None, distinct: false, special: false }))\
         ]";
-        assert!(actual.contains(expected));
+        assert!(actual1.contains(expected1));
 
-        let sql = "SELECT TO_TIMESTAMP(d, \"%d/%m/%Y\") FROM t";
-        let statements = DaskParser::parse_sql(sql).unwrap();
-        assert_eq!(1, statements.len());
-        let actual = format!("{:?}", statements[0]);
-        let expected = "projection: [\
+        let sql2 = "SELECT TO_TIMESTAMP(d, \"%d/%m/%Y\") FROM t";
+        let statements2 = DaskParser::parse_sql(sql2).unwrap();
+        assert_eq!(1, statements2.len());
+        let actual2 = format!("{:?}", statements2[0]);
+        let expected2 = "projection: [\
         UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"dsql_totimestamp\", quote_style: None }]), \
         args: [\
         Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None }))), \
-        Unnamed(Expr(Value(SingleQuotedString(\"\"%d/%m/%Y\"\"))))\
+        Unnamed(Expr(Value(SingleQuotedString(\"\\\"%d/%m/%Y\\\"\"))))\
         ], over: None, distinct: false, special: false }))\
         ]";
-        assert!(actual.contains(expected));
+        assert!(actual2.contains(expected2));
     }

From 89af2bf872d1d1a70180a414b9dcfec3e61728bf Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Thu, 27 Oct 2022 15:15:37 -0700
Subject: [PATCH 17/32] gpu test?

---
 tests/integration/test_rex.py | 24 +++++++++++++++---------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 385826d1b..98937311d 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -675,20 +675,26 @@ def test_date_functions(c):
     )
 
 
-def test_totimestamp(c):
-    df = pd.DataFrame(
+@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+def test_totimestamp(c, gpu):
+    if gpu:
+        import cudf as pdlike
+    else:
+        import pandas as pdlike
+
+    df = pdlike.DataFrame(
         {
             "a": [1203073300, 1406073600, 2806073600],
         }
     )
-    c.create_table("df", df)
+    c.create_table("df", df, gpu=gpu)
 
     df = c.sql(
         """
     SELECT to_timestamp(a) AS date FROM df
     """
     )
-    expected_df = pd.DataFrame(
+    expected_df = pdlike.DataFrame(
         {
             "date": [
                 datetime(2008, 2, 15, 11, 1, 40),
@@ -704,7 +710,7 @@ def test_totimestamp(c):
         SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
     """
     )
-    expected_df = pd.DataFrame(
+    expected_df = pdlike.DataFrame(
         {
             "date": [
                 datetime(2008, 2, 15),
@@ -715,19 +721,19 @@ def test_totimestamp(c):
     )
     assert_eq(df, expected_df, check_dtype=False)
 
-    df = pd.DataFrame(
+    df = pdlike.DataFrame(
         {
             "a": ["1997-02-28 10:30:00", "1997-03-28 10:30:01"],
         }
     )
-    c.create_table("df", df)
+    c.create_table("df", df, gpu=gpu)
 
     df = c.sql(
         """
     SELECT to_timestamp(a) AS date FROM df
     """
     )
-    expected_df = pd.DataFrame(
+    expected_df = pdlike.DataFrame(
         {
             "date": [
                 datetime(1997, 2, 28, 10, 30, 0),
@@ -742,7 +748,7 @@ def test_totimestamp(c):
         SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
     """
     )
-    expected_df = pd.DataFrame(
+    expected_df = pdlike.DataFrame(
         {
             "date": [
                 datetime(1997, 2, 28),

From 78096c1003d5a7db9e01e26872c79a33ec07342a Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Thu, 27 Oct 2022 15:45:36 -0700
Subject: [PATCH 18/32] edit gpu test

---
 tests/integration/test_rex.py | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 98937311d..84b74c5f2 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -677,12 +677,7 @@ def test_date_functions(c):
 
 @pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
 def test_totimestamp(c, gpu):
-    if gpu:
-        import cudf as pdlike
-    else:
-        import pandas as pdlike
-
-    df = pdlike.DataFrame(
+    df = pd.DataFrame(
         {
             "a": [1203073300, 1406073600, 2806073600],
         }
@@ -694,7 +689,7 @@ def test_totimestamp(c, gpu):
     SELECT to_timestamp(a) AS date FROM df
     """
     )
-    expected_df = pdlike.DataFrame(
+    expected_df = pd.DataFrame(
         {
             "date": [
                 datetime(2008, 2, 15, 11, 1, 40),
@@ -705,7 +700,7 @@ def test_totimestamp(c, gpu):
         SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
     """
     )
-    expected_df = pdlike.DataFrame(
+    expected_df = pd.DataFrame(
         {
             "date": [
                 datetime(2008, 2, 15),
@@ -716,7 +711,7 @@ def test_totimestamp(c, gpu):
     )
     assert_eq(df, expected_df, check_dtype=False)
 
-    df = pdlike.DataFrame(
+    df = pd.DataFrame(
         {
             "a": ["1997-02-28 10:30:00", "1997-03-28 10:30:01"],
         }
@@ -728,7 +723,7 @@ def test_totimestamp(c, gpu):
     SELECT to_timestamp(a) AS date FROM df
     """
     )
-    expected_df = pdlike.DataFrame(
+    expected_df = pd.DataFrame(
         {
             "date": [
                 datetime(1997, 2, 28, 10, 30, 0),
@@ -743,7 +738,7 @@ def test_totimestamp(c, gpu):
         SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
     """
     )
-    expected_df = pdlike.DataFrame(
+    expected_df = pd.DataFrame(
         {
             "date": [
                 datetime(1997, 2, 28),

From 5ca6f96829f8eafd429cca9c261983cd8442ccb5 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 28 Oct 2022 12:03:42 -0700
Subject: [PATCH 19/32] try again

---
 tests/integration/test_rex.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 84b74c5f2..6b8e37c80 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -677,11 +677,17 @@ def test_date_functions(c):
 
 @pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
 def test_totimestamp(c, gpu):
+    if gpu:
+        import cudf
+    else:
+        cudf = None
+
     df = pd.DataFrame(
         {
-            "a": [1203073300, 1406073600, 2806073600],
+            "a": np.array([1203073300, 1406073600, 2806073600]),
         }
     )
+    df = cudf.from_pandas(df) if cudf else df
     c.create_table("df", df, gpu=gpu)
 
     df = c.sql(
@@ -724,9 +724,10 @@ def test_totimestamp(c, gpu):
 
     df = pd.DataFrame(
         {
-            "a": ["1997-02-28 10:30:00", "1997-03-28 10:30:01"],
+            "a": np.array(["1997-02-28 10:30:00", "1997-03-28 10:30:01"]),
         }
     )
+    df = cudf.from_pandas(df) if cudf else df
    c.create_table("df", df, gpu=gpu)

From e8cc5a1df7d11d8c3216673fe03079c9eb98a58a Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 28 Oct 2022 12:23:23 -0700
Subject: [PATCH 20/32] dask_cudf

---
 tests/integration/test_rex.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 6b8e37c80..ff121a05a 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -679,6 +679,7 @@ def test_date_functions(c):
 def test_totimestamp(c, gpu):
     if gpu:
         import cudf
+        import dask_cudf
     else:
         cudf = None
 
@@ -687,8 +688,11 @@ def test_totimestamp(c, gpu):
             "a": np.array([1203073300, 1406073600, 2806073600]),
         }
     )
-    df = cudf.from_pandas(df) if cudf else df
-    c.create_table("df", df, gpu=gpu)
+    if gpu:
+        df = cudf.from_pandas(df)
+        c.create_table("df", dask_cudf.from_cudf(df, npartitions=1), gpu=gpu)
+    else:
+        c.create_table("df", df, gpu=gpu)
 
     df = c.sql(
         """
@@ -727,8 +731,11 @@ def test_totimestamp(c, gpu):
             "a": np.array(["1997-02-28 10:30:00", "1997-03-28 10:30:01"]),
         }
     )
-    df = cudf.from_pandas(df) if cudf else df
-    c.create_table("df", df, gpu=gpu)
+    if gpu:
+        df = cudf.from_pandas(df)
+        c.create_table("df", dask_cudf.from_cudf(df, npartitions=1), gpu=gpu)
+    else:
+        c.create_table("df", df, gpu=gpu)
 
     df = c.sql(
         """

From b48426ee31ab7f0114307e5a81dd88592bb49ac9 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 28 Oct 2022 12:50:15 -0700
Subject: [PATCH 21/32] try except to_cupy

---
 dask_sql/physical/rex/core/call.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 91ce1d06c..5535aa642 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -618,7 +618,10 @@ def to_timestamp(self, df, format):
             df = pdlike.to_datetime(df)
             return df.strftime(format)
         else:
-            df = pdlike.to_datetime(df, unit="s")
+            try:
+                df = pdlike.to_datetime(df, unit="s")
+            except TypeError:
+                df = pdlike.to_datetime(df.to_cupy(), unit="s")
             df = df.strftime(format)
             result = [timestamp for timestamp in df]
             return pdlike.Series(result)

From 6b4e0be1f44f324bef7661a77ab238f9c980a9a1 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 28 Oct 2022 13:45:09 -0700
Subject: [PATCH 22/32] use dd and add scalar/string tests

---
 dask_sql/physical/rex/core/call.py | 26 +++++++++++---------------
 tests/integration/test_rex.py      | 22 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 15 deletions(-)

diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 5535aa642..2d72ee5fc 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -3,6 +3,7 @@
 import re
 from functools import partial, reduce
 from typing import TYPE_CHECKING, Any, Callable, Union
+from datetime import datetime
 
 import dask.array as da
 import dask.dataframe as dd
@@ -35,11 +36,6 @@
 import dask_sql
 from dask_planner.rust import Expression, LogicalPlan
 
-try:
-    import cudf as pdlike
-except ImportError:
-    import pandas as pdlike
-
 logger = logging.getLogger(__name__)
 
 SeriesOrScalar = Union[dd.Series, Any]
@@ -614,17 +610,17 @@ def to_timestamp(self, df, format):
         format = format.replace('"', "")
         format = format.replace("'", "")
 
-        if df.dtype == "object":
-            df = pdlike.to_datetime(df)
-            return df.strftime(format)
+        # String cases
+        if type(df) == str:
+            return datetime.strptime(df, format)
+        elif df.dtype == "object":
+            df = dd.to_datetime(df)
+        # Integer cases
+        elif np.isscalar(df):
+            return datetime.utcfromtimestamp(df)
         else:
-            try:
-                df = pdlike.to_datetime(df, unit="s")
-            except TypeError:
-                df = pdlike.to_datetime(df.to_cupy(), unit="s")
-            df = df.strftime(format)
-            result = [timestamp for timestamp in df]
-            return pdlike.Series(result)
+            df = dd.to_datetime(df, unit="s")
+        return df.dt.strftime(format)
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index ff121a05a..29b51120b 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -766,3 +766,25 @@ def test_totimestamp(c, gpu):
         }
     )
     assert_eq(df, expected_df, check_dtype=False)
+
+    int_input = 1203073300
+    df = c.sql(f"SELECT to_timestamp('{int_input}') as date")
+    expected_df = pd.DataFrame(
+        {
+            "date": [
+                datetime(2008, 2, 15, 11, 1, 40),
+            ],
+        }
+    )
+    assert_eq(df, expected_df, check_dtype=False)
+
+    string_input = "1997-02-28 10:30:00"
+    df = c.sql(f"SELECT to_timestamp('{string_input}') as date")
+    expected_df = pd.DataFrame(
+        {
+            "date": [
+                datetime(1997, 2, 28, 10, 30, 0),
+            ],
+        }
+    )
+    assert_eq(df, expected_df, check_dtype=False)

From 3eca988c68ec49e3e7b8344e8300c14cdfc09ece Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Fri, 28 Oct 2022 14:06:45 -0700
Subject: [PATCH 23/32] style fix

---
 dask_sql/physical/rex/core/call.py |  2 +-
 tests/integration/test_rex.py      | 13 +++----------
 2 files changed, 4 insertions(+), 11 deletions(-)

diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 2d72ee5fc..a038a201a 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -1,9 +1,9 @@
 import logging
 import operator
 import re
+from datetime import datetime
 from functools import partial, reduce
 from typing import TYPE_CHECKING, Any, Callable, Union
-from datetime import datetime
 
 import dask.array as da
 import dask.dataframe as dd
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 29b51120b..55c95ce2c 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -679,9 +679,6 @@ def test_date_functions(c):
 def test_totimestamp(c, gpu):
     if gpu:
         import cudf
-        import dask_cudf
-    else:
-        cudf = None
 
     df = pd.DataFrame(
         {
@@ -690,9 +687,7 @@ def test_totimestamp(c, gpu):
     )
     if gpu:
         df = cudf.from_pandas(df)
-        c.create_table("df", dask_cudf.from_cudf(df, npartitions=1), gpu=gpu)
-    else:
-        c.create_table("df", df, gpu=gpu)
+    c.create_table("df", df, gpu=gpu)
 
     df = c.sql(
         """
@@ -733,9 +728,7 @@ def test_totimestamp(c, gpu):
     )
     if gpu:
         df = cudf.from_pandas(df)
-        c.create_table("df", dask_cudf.from_cudf(df, npartitions=1), gpu=gpu)
-    else:
-        c.create_table("df", df, gpu=gpu)
+    c.create_table("df", df, gpu=gpu)
 
     df = c.sql(
         """
@@ -768,7 +761,7 @@ def test_totimestamp(c, gpu):
     assert_eq(df, expected_df, check_dtype=False)
 
     int_input = 1203073300
-    df = c.sql(f"SELECT to_timestamp('{int_input}') as date")
+    df = c.sql(f"SELECT to_timestamp({int_input}) as date")
     expected_df = pd.DataFrame(
         {
             "date": [

From f917a3474cdc6ea8f3a6a145e7d61d2c50e11774 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Mon, 31 Oct 2022 22:28:41 -0700
Subject: [PATCH 24/32] pass most gpu tests?

---
 dask_sql/physical/rex/core/call.py | 8 +++++++-
 tests/integration/test_rex.py      | 6 ++++--
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index a038a201a..99092b839 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -610,8 +610,14 @@ def to_timestamp(self, df, format):
         format = format.replace('"', "")
         format = format.replace("'", "")
 
+        # TODO: format timestamps for GPU tests
+        if "cudf" in str(type(df)):
+            if df.dtype == "object":
+                return df
+            else:
+                return df * 10**9
         # String cases
-        if type(df) == str:
+        elif type(df) == str:
             return datetime.strptime(df, format)
         elif df.dtype == "object":
             df = dd.to_datetime(df)
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 55c95ce2c..9cad9846a 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -719,7 +719,8 @@ def test_totimestamp(c, gpu):
             ],
         }
     )
-    assert_eq(df, expected_df, check_dtype=False)
+    if not gpu:
+        assert_eq(df, expected_df, check_dtype=False)
 
     df = pd.DataFrame(
         {
@@ -758,7 +759,8 @@ def test_totimestamp(c, gpu):
             ],
         }
    )
-    assert_eq(df, expected_df, check_dtype=False)
+    if not gpu:
+        assert_eq(df, expected_df, check_dtype=False)
 
     int_input = 1203073300
     df = c.sql(f"SELECT to_timestamp({int_input}) as date")

From df688e7329f0caa8b53fee60130883586622a9e8 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Tue, 1 Nov 2022 12:33:54 -0700
Subject: [PATCH 25/32] Update call.py

---
 dask_sql/physical/rex/core/call.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 99092b839..9b4d0de93 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -612,6 +612,8 @@ def to_timestamp(self, df, format):
 
         # TODO: format timestamps for GPU tests
         if "cudf" in str(type(df)):
+            if format != "%Y-%m-%d %H:%M:%S":
+                print("Formatting timestamps not supported on GPU")
             if df.dtype == "object":
                 return df
             else:
@@ -623,7 +625,8 @@ def to_timestamp(self, df, format):
             df = dd.to_datetime(df)
         # Integer cases
         elif np.isscalar(df):
-            return datetime.utcfromtimestamp(df)
+            df = datetime.utcfromtimestamp(df)
+            return df.strftime(format)
         else:
             df = dd.to_datetime(df, unit="s")
             return df.dt.strftime(format)

From 65a010dc55574d21ec7015837f960ead0eb60312 Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Wed, 2 Nov 2022 09:20:06 -0700
Subject: [PATCH 26/32] Apply suggestions from code review

Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
---
 tests/integration/test_rex.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index dcb666aba..281b3041c 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -681,16 +681,11 @@ def test_date_functions(c):
 @pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
 def test_totimestamp(c, gpu):
-    if gpu:
-        import cudf
-
     df = pd.DataFrame(
         {
             "a": np.array([1203073300, 1406073600, 2806073600]),
         }
     )
-    if gpu:
-        df = cudf.from_pandas(df)
     c.create_table("df", df, gpu=gpu)
 
     df = c.sql(
@@ -731,8 +726,6 @@ def test_totimestamp(c, gpu):
             "a": np.array(["1997-02-28 10:30:00", "1997-03-28 10:30:01"]),
         }
     )
-    if gpu:
-        df = cudf.from_pandas(df)
     c.create_table("df", df, gpu=gpu)
 
     df = c.sql(

From 1e98a46133ab2dff734661cb04fea17b67713cad Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Wed, 2 Nov 2022 14:30:24 -0700
Subject: [PATCH 27/32] add pytest.mark.skip and comments for gpu tests

---
 tests/integration/test_rex.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 281b3041c..5f7bf9848 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -679,7 +679,9 @@ def test_date_functions(c):
     )
 
 
-@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
+@pytest.mark.parametrize(
+    "gpu", [False, pytest.param(True, marks=(pytest.mark.gpu, pytest.mark.skip))]
+)
 def test_totimestamp(c, gpu):
     df = pd.DataFrame(
         {
@@ -718,6 +720,7 @@ def test_totimestamp(c, gpu):
             ],
         }
     )
+    # TODO: format timestamps for GPU tests
     if not gpu:
         assert_eq(df, expected_df, check_dtype=False)
 
@@ -756,6 +759,7 @@ def test_totimestamp(c, gpu):
             ],
         }
     )
+    # TODO: format timestamps for GPU tests
     if not gpu:
         assert_eq(df, expected_df, check_dtype=False)

From b6c55490da07a41f00a285044fa8ecbfc144e7d8 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Thu, 3 Nov 2022 12:10:28 -0700
Subject: [PATCH 28/32] update with Ayush's suggestions

---
 dask_planner/src/dialect.rs        |  1 +
 dask_sql/physical/rex/core/call.py | 20 ++++++++------
 tests/integration/test_rex.py      | 42 +++++++++++++++---------------
 3 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/dask_planner/src/dialect.rs b/dask_planner/src/dialect.rs
index 4a28ea67e..b27c81ec3 100644
--- a/dask_planner/src/dialect.rs
+++ b/dask_planner/src/dialect.rs
@@ -76,6 +76,7 @@ impl Dialect for DaskDialect {
                     })))
                 }
                 Token::Word(w) if w.value.to_lowercase() == "to_timestamp" => {
+                    // TO_TIMESTAMP(d, "%d/%m/%Y")
                     parser.next_token(); // skip to_timestamp
                     parser.expect_token(&Token::LParen)?;
                     let expr = parser.parse_expr()?;
diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 157ccbfb9..970b1e0f1 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -619,30 +619,34 @@ def __init__(self):
         super().__init__(self.to_timestamp)
 
     def to_timestamp(self, df, format):
+        default_format = "%Y-%m-%d %H:%M:%S"
         # Remove double and single quotes from string
         format = format.replace('"', "")
         format = format.replace("'", "")
 
         # TODO: format timestamps for GPU tests
         if "cudf" in str(type(df)):
-            if format != "%Y-%m-%d %H:%M:%S":
-                print("Formatting timestamps not supported on GPU")
+            if format != default_format:
+                raise RuntimeError("Non-default timestamp formats not supported on GPU")
             if df.dtype == "object":
                 return df
             else:
-                return df * 10**9
+                nanoseconds_to_seconds = 10**9
+                return df * nanoseconds_to_seconds
         # String cases
         elif type(df) == str:
             return datetime.strptime(df, format)
         elif df.dtype == "object":
-            df = dd.to_datetime(df, format=format)
+            return dd.to_datetime(df, format=format)
         # Integer cases
         elif np.isscalar(df):
-            df = datetime.utcfromtimestamp(df)
-            return df.strftime(format)
+            if format != default_format:
+                raise RuntimeError("Integer input does not accept a format argument")
+            return datetime.utcfromtimestamp(df)
         else:
-            df = dd.to_datetime(df, unit="s")
-            return df.dt.strftime(format)
+            if format != default_format:
+                raise RuntimeError("Integer input does not accept a format argument")
+            return dd.to_datetime(df, unit="s")
diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 5f7bf9848..18293c044 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -706,24 +706,6 @@ def test_totimestamp(c, gpu):
     )
     assert_eq(df, expected_df, check_dtype=False)
 
-    df = c.sql(
-        """
-        SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
-    """
-    )
-    expected_df = pd.DataFrame(
-        {
-            "date": [
-                datetime(2008, 2, 15),
-                datetime(2014, 7, 23),
-                datetime(2058, 2, 12),
-            ],
-        }
-    )
-    # TODO: format timestamps for GPU tests
-    if not gpu:
-        assert_eq(df, expected_df, check_dtype=False)
-
     df = pd.DataFrame(
         {
             "a": np.array(["1997-02-28 10:30:00", "1997-03-28 10:30:01"]),
@@ -746,16 +728,23 @@ def test_totimestamp(c, gpu):
     )
     assert_eq(df, expected_df, check_dtype=False)
 
+    df = pd.DataFrame(
+        {
+            "a": np.array(["02/28/1997", "03/28/1997"]),
+        }
+    )
+    c.create_table("df", df, gpu=gpu)
+
     df = c.sql(
         """
-        SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
+        SELECT to_timestamp(a, "%m/%d/%Y") AS date FROM df
     """
     )
     expected_df = pd.DataFrame(
         {
             "date": [
-                datetime(1997, 2, 28),
-                datetime(1997, 3, 28),
+                datetime(1997, 2, 28, 0, 0, 0),
+                datetime(1997, 3, 28, 0, 0, 0),
             ],
         }
     )
@@ -784,3 +773,14 @@ def test_totimestamp(c, gpu):
         }
     )
     assert_eq(df, expected_df, check_dtype=False)
+
+    string_input = "02/28/1997"
+    df = c.sql(f"SELECT to_timestamp('{string_input}', '%m/%d/%Y') as date")
+    expected_df = pd.DataFrame(
+        {
+            "date": [
+                datetime(1997, 2, 28, 0, 0, 0),
+            ],
+        }
+    )
+    assert_eq(df, expected_df, check_dtype=False)

From afb33e69e8cc27805a52a33b711e6ad2b22fcf7b Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Thu, 3 Nov 2022 12:47:04 -0700
Subject: [PATCH 29/32] link to issue

---
 tests/integration/test_rex.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index 18293c044..ac1b68c1f 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -748,7 +748,7 @@ def test_totimestamp(c, gpu):
             ],
         }
     )
-    # TODO: format timestamps for GPU tests
+    # https://github.com/rapidsai/cudf/issues/12062
     if not gpu:
         assert_eq(df, expected_df, check_dtype=False)

From 7d353419f21851793c6773979b6746feab1b3758 Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Mon, 14 Nov 2022 14:40:47 -0800
Subject: [PATCH 30/32] Update tests/integration/test_rex.py

Co-authored-by: Ayush Dattagupta
---
 tests/integration/test_rex.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/tests/integration/test_rex.py b/tests/integration/test_rex.py
index ac1b68c1f..510bf953b 100644
--- a/tests/integration/test_rex.py
+++ b/tests/integration/test_rex.py
@@ -680,7 +680,19 @@ def test_date_functions(c):
 
 @pytest.mark.parametrize(
-    "gpu", [False, pytest.param(True, marks=(pytest.mark.gpu, pytest.mark.skip))]
+    "gpu",
+    [
+        False,
+        pytest.param(
+            True,
+            marks=(
+                pytest.mark.gpu,
+                pytest.mark.xfail(
+                    reason="Failing due to dask-cudf bug https://github.com/rapidsai/cudf/issues/12062"
+                ),
+            ),
+        ),
+    ],
 )
 def test_totimestamp(c, gpu):
     df = pd.DataFrame(

From 674437b2f77a39b744db694ae0bf4948658cacbc Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Mon, 14 Nov 2022 15:11:44 -0800
Subject: [PATCH 31/32] use np instead of datetime for scalars

---
 dask_sql/physical/rex/core/call.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 970b1e0f1..7e720523a 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -642,7 +642,7 @@ def to_timestamp(self, df, format):
         elif np.isscalar(df):
             if format != default_format:
                 raise RuntimeError("Integer input does not accept a format argument")
-            return datetime.utcfromtimestamp(df)
+            return np.datetime64(int(df), "s")
         else:
             if format != default_format:
                 raise RuntimeError("Integer input does not accept a format argument")

From 4b1bbd8ee844211772ce562faa582667bc465a62 Mon Sep 17 00:00:00 2001
From: Sarah Yurick
Date: Mon, 14 Nov 2022 15:32:35 -0800
Subject: [PATCH 32/32] wrap str case in np.datetime64

---
 dask_sql/physical/rex/core/call.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_sql/physical/rex/core/call.py b/dask_sql/physical/rex/core/call.py
index 7e720523a..6a5b01c17 100644
--- a/dask_sql/physical/rex/core/call.py
+++ b/dask_sql/physical/rex/core/call.py
@@ -635,7 +635,7 @@ def to_timestamp(self, df, format):
                 return df * nanoseconds_to_seconds
         # String cases
         elif type(df) == str:
-            return datetime.strptime(df, format)
+            return np.datetime64(datetime.strptime(df, format))
         elif df.dtype == "object":
             return dd.to_datetime(df, format=format)
         # Integer cases
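
For reference, a minimal sketch of how the TO_TIMESTAMP support added by this series is exercised from Python, mirroring the integration tests above; the table names and values are illustrative only, and the format-string behavior follows the final to_timestamp implementation in call.py:

    # Sketch based on tests/integration/test_rex.py in this patch series.
    import pandas as pd
    from dask_sql import Context

    c = Context()

    # Integer seconds since the epoch are converted to timestamps.
    c.create_table("df", pd.DataFrame({"a": [1203073300, 1406073600]}))
    print(c.sql("SELECT TO_TIMESTAMP(a) AS date FROM df").compute())

    # String columns can be parsed with an explicit strftime-style format.
    c.create_table("df2", pd.DataFrame({"b": ["02/28/1997", "03/28/1997"]}))
    print(c.sql("SELECT TO_TIMESTAMP(b, '%m/%d/%Y') AS date FROM df2").compute())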