Skip to content

Commit

Permalink
feat: parse string into timestamp with timezone
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Jan 17, 2024
1 parent ecab6f6 commit 4358217
Show file tree
Hide file tree
Showing 16 changed files with 350 additions and 63 deletions.
3 changes: 2 additions & 1 deletion src/common/function/src/scalars/timestamp/to_unixtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ pub struct ToUnixtimeFunction;
const NAME: &str = "to_unixtime";

fn convert_to_seconds(arg: &str) -> Option<i64> {
// FIXME(dennis): use timezone in function context
if let Ok(dt) = DateTime::from_str(arg) {
return Some(dt.val() / 1000);
}

if let Ok(ts) = Timestamp::from_str(arg) {
if let Ok(ts) = Timestamp::from_str_utc(arg) {
return Some(ts.split().0);
}

Expand Down
59 changes: 36 additions & 23 deletions src/common/time/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ use core::default::Default;
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::time::Duration;

use arrow::datatypes::TimeUnit as ArrowTimeUnit;
use chrono::{
DateTime, Days, Months, NaiveDate, NaiveDateTime, NaiveTime, TimeZone as ChronoTimeZone, Utc,
DateTime, Days, LocalResult, Months, NaiveDate, NaiveDateTime, NaiveTime,
TimeZone as ChronoTimeZone, Utc,
};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use crate::error::{ArithmeticOverflowSnafu, Error, ParseTimestampSnafu, TimestampOverflowSnafu};
use crate::error::{ArithmeticOverflowSnafu, ParseTimestampSnafu, Result, TimestampOverflowSnafu};
use crate::timezone::{get_timezone, Timezone};
use crate::util::div_ceil;
use crate::util::{datetime_to_utc, div_ceil};
use crate::{error, Interval};

/// Timestamp represents the value of units(seconds/milliseconds/microseconds/nanoseconds) elapsed
Expand Down Expand Up @@ -352,24 +352,26 @@ impl Timestamp {
/// Builds a timestamp from a calendar date, using `NaiveTime::default()` as the time of day.
///
/// The result is whatever [`Timestamp::from_chrono_datetime`] yields for that date-time,
/// including `None` when that conversion fails.
pub fn from_chrono_date(date: NaiveDate) -> Option<Self> {
    let start_of_day = NaiveTime::default();
    let datetime = date.and_time(start_of_day);
    Timestamp::from_chrono_datetime(datetime)
}
}

impl FromStr for Timestamp {
type Err = Error;
/// Accepts a string in RFC3339 / ISO8601 standard format and some variants and converts it to a nanosecond precision timestamp.
/// It no timezone specified in string, it cast to nanosecond epoch timestamp in UTC.
pub fn from_str_utc(s: &str) -> Result<Self> {
Self::from_str(s, None)
}

/// Accepts a string in RFC3339 / ISO8601 standard format and some variants and converts it to a nanosecond precision timestamp.
/// This code is copied from [arrow-datafusion](https://github.com/apache/arrow-datafusion/blob/arrow2/datafusion-physical-expr/src/arrow_temporal_util.rs#L71)
/// with some bugfixes.
/// Supported format:
/// - `2022-09-20T14:16:43.012345Z` (Zulu timezone)
/// - `2022-09-20T14:16:43.012345+08:00` (Explicit offset)
/// - `2022-09-20T14:16:43.012345` (Zulu timezone, with T)
/// - `2022-09-20T14:16:43.012345` (The given timezone, with T)
/// - `2022-09-20T14:16:43` (Zulu timezone, no fractional seconds, with T)
/// - `2022-09-20 14:16:43.012345Z` (Zulu timezone, without T)
/// - `2022-09-20 14:16:43` (Zulu timezone, without T)
/// - `2022-09-20 14:16:43.012345` (Zulu timezone, without T)
/// - `2022-09-20 14:16:43` (The given timezone, without T)
/// - `2022-09-20 14:16:43.012345` (The given timezone, without T)
#[allow(deprecated)]
fn from_str(s: &str) -> Result<Self, Self::Err> {
pub fn from_str(s: &str, timezone: Option<Timezone>) -> Result<Self> {
// RFC3339 timestamp (with a T)
let s = s.trim();
if let Ok(ts) = DateTime::parse_from_rfc3339(s) {
Expand All @@ -386,19 +388,19 @@ impl FromStr for Timestamp {
}

if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
return naive_datetime_to_timestamp(s, ts);
return naive_datetime_to_timestamp(s, ts, timezone);
}

if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
return naive_datetime_to_timestamp(s, ts);
return naive_datetime_to_timestamp(s, ts, timezone);
}

if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
return naive_datetime_to_timestamp(s, ts);
return naive_datetime_to_timestamp(s, ts, timezone);
}

if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
return naive_datetime_to_timestamp(s, ts);
return naive_datetime_to_timestamp(s, ts, timezone);
}

ParseTimestampSnafu { raw: s }.fail()
Expand All @@ -410,9 +412,20 @@ impl FromStr for Timestamp {
fn naive_datetime_to_timestamp(
s: &str,
datetime: NaiveDateTime,
timezone: Option<Timezone>,
) -> crate::error::Result<Timestamp> {
Timestamp::from_chrono_datetime(Utc.from_utc_datetime(&datetime).naive_utc())
.context(ParseTimestampSnafu { raw: s })
if timezone.is_none() {
return Timestamp::from_chrono_datetime(Utc.from_utc_datetime(&datetime).naive_utc())
.context(ParseTimestampSnafu { raw: s });
}

// Safety: timezone was already checked to be `Some` above
match datetime_to_utc(&datetime, timezone.unwrap()) {
LocalResult::None => ParseTimestampSnafu { raw: s }.fail(),
LocalResult::Single(utc) | LocalResult::Ambiguous(utc, _) => {
Timestamp::from_chrono_datetime(utc).context(ParseTimestampSnafu { raw: s })
}
}
}

impl From<i64> for Timestamp {
Expand Down Expand Up @@ -766,7 +779,7 @@ mod tests {
// Input timestamp string is regarded as local timezone if no timezone is specified,
// but expected timestamp is in UTC timezone
fn check_from_str(s: &str, expect: &str) {
let ts = Timestamp::from_str(s).unwrap();
let ts = Timestamp::from_str_utc(s).unwrap();
let time = ts.to_chrono_datetime().unwrap();
assert_eq!(expect, time.to_string());
}
Expand All @@ -792,7 +805,7 @@ mod tests {
fn test_to_iso8601_string() {
set_default_timezone(Some("Asia/Shanghai")).unwrap();
let datetime_str = "2020-09-08 13:42:29.042+0000";
let ts = Timestamp::from_str(datetime_str).unwrap();
let ts = Timestamp::from_str_utc(datetime_str).unwrap();
assert_eq!("2020-09-08 21:42:29.042+0800", ts.to_iso8601_string());

let ts_millis = 1668070237000;
Expand Down Expand Up @@ -1059,17 +1072,17 @@ mod tests {
std::env::set_var("TZ", "Asia/Shanghai");
assert_eq!(
Timestamp::new(28800, TimeUnit::Second),
Timestamp::from_str("1970-01-01 08:00:00.000").unwrap()
Timestamp::from_str_utc("1970-01-01 08:00:00.000").unwrap()
);

assert_eq!(
Timestamp::new(28800, TimeUnit::Second),
Timestamp::from_str("1970-01-01 08:00:00").unwrap()
Timestamp::from_str_utc("1970-01-01 08:00:00").unwrap()
);

assert_eq!(
Timestamp::new(28800, TimeUnit::Second),
Timestamp::from_str(" 1970-01-01 08:00:00 ").unwrap()
Timestamp::from_str_utc(" 1970-01-01 08:00:00 ").unwrap()
);
}

Expand Down Expand Up @@ -1236,7 +1249,7 @@ mod tests {
];

for s in valid_strings {
Timestamp::from_str(s).unwrap();
Timestamp::from_str_utc(s).unwrap();
}
}
}
13 changes: 10 additions & 3 deletions src/common/time/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use chrono::{LocalResult, NaiveDateTime, TimeZone};
use chrono_tz::Tz;

use crate::timezone::get_timezone;
use crate::Timezone;

pub fn format_utc_datetime(utc: &NaiveDateTime, pattern: &str) -> String {
match get_timezone(None) {
Expand All @@ -29,9 +30,15 @@ pub fn format_utc_datetime(utc: &NaiveDateTime, pattern: &str) -> String {
}

pub fn local_datetime_to_utc(local: &NaiveDateTime) -> LocalResult<NaiveDateTime> {
match get_timezone(None) {
crate::Timezone::Offset(offset) => offset.from_local_datetime(local).map(|x| x.naive_utc()),
crate::Timezone::Named(tz) => tz.from_local_datetime(local).map(|x| x.naive_utc()),
datetime_to_utc(local, get_timezone(None))
}

pub fn datetime_to_utc(datetime: &NaiveDateTime, timezone: Timezone) -> LocalResult<NaiveDateTime> {
match timezone {
crate::Timezone::Offset(offset) => {
offset.from_local_datetime(datetime).map(|x| x.naive_utc())
}
crate::Timezone::Named(tz) => tz.from_local_datetime(datetime).map(|x| x.naive_utc()),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/types/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ mod tests {

// timestamp -> other types
test_can_cast!(
Value::Timestamp(Timestamp::from_str("2021-01-01 00:00:00").unwrap()),
Value::Timestamp(Timestamp::from_str_utc("2021-01-01 00:00:00").unwrap()),
null_datatype,
int64_datatype,
date_datatype,
Expand Down
6 changes: 3 additions & 3 deletions src/datatypes/src/types/date_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,17 @@ mod tests {
fn test_date_cast() {
set_default_timezone(Some("Asia/Shanghai")).unwrap();
// timestamp -> date
let ts = Value::Timestamp(Timestamp::from_str("2000-01-01 08:00:01").unwrap());
let ts = Value::Timestamp(Timestamp::from_str_utc("2000-01-01 08:00:01").unwrap());
let date = ConcreteDataType::date_datatype().try_cast(ts).unwrap();
assert_eq!(date, Value::Date(Date::from_str("2000-01-01").unwrap()));

// this case bind with Zulu timezone.
let ts = Value::Timestamp(Timestamp::from_str("2000-01-02 07:59:59").unwrap());
let ts = Value::Timestamp(Timestamp::from_str_utc("2000-01-02 07:59:59").unwrap());
let date = ConcreteDataType::date_datatype().try_cast(ts).unwrap();
assert_eq!(date, Value::Date(Date::from_str("2000-01-02").unwrap()));

// while this case carries an explicit +08:00 offset (Asia/Shanghai).
let ts = Value::Timestamp(Timestamp::from_str("2000-01-02 07:59:59+08:00").unwrap());
let ts = Value::Timestamp(Timestamp::from_str_utc("2000-01-02 07:59:59+08:00").unwrap());
let date = ConcreteDataType::date_datatype().try_cast(ts).unwrap();
assert_eq!(date, Value::Date(Date::from_str("2000-01-01").unwrap()));

Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/types/datetime_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ mod tests {
);

// cast from Timestamp
let val = Value::Timestamp(Timestamp::from_str("2020-09-08 21:42:29+0800").unwrap());
let val = Value::Timestamp(Timestamp::from_str_utc("2020-09-08 21:42:29+0800").unwrap());
let dt = ConcreteDataType::datetime_datatype().try_cast(val).unwrap();
assert_eq!(
dt,
Expand Down
6 changes: 3 additions & 3 deletions src/datatypes/src/types/timestamp_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;

use arrow::datatypes::{
DataType as ArrowDataType, TimeUnit as ArrowTimeUnit,
TimestampMicrosecondType as ArrowTimestampMicrosecondType,
Expand Down Expand Up @@ -132,7 +130,7 @@ macro_rules! impl_data_type_for_timestamp {
fn try_cast(&self, from: Value)-> Option<Value>{
match from {
Value::Timestamp(v) => v.convert_to(TimeUnit::$unit).map(Value::Timestamp),
Value::String(v) => Timestamp::from_str(v.as_utf8()).map(Value::Timestamp).ok(),
Value::String(v) => Timestamp::from_str_utc(v.as_utf8()).map(Value::Timestamp).ok(),
Value::Int64(v) => Some(Value::Timestamp(Timestamp::new(v, TimeUnit::$unit))),
Value::DateTime(v) => Timestamp::new_second(v.val()).convert_to(TimeUnit::$unit).map(Value::Timestamp),
Value::Date(v) => Timestamp::new_second(v.to_secs()).convert_to(TimeUnit::$unit).map(Value::Timestamp),
Expand Down Expand Up @@ -203,6 +201,8 @@ impl_data_type_for_timestamp!(Microsecond);

#[cfg(test)]
mod tests {
use std::str::FromStr;

use common_time::timezone::set_default_timezone;
use common_time::{Date, DateTime};

Expand Down
3 changes: 1 addition & 2 deletions src/datatypes/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;

use arrow::datatypes::{DataType as ArrowDataType, Field};
Expand Down Expand Up @@ -424,7 +423,7 @@ pub fn duration_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValue
/// Return `None` if given scalar value cannot be converted to a valid timestamp.
pub fn scalar_value_to_timestamp(scalar: &ScalarValue) -> Option<Timestamp> {
match scalar {
ScalarValue::Utf8(Some(s)) => match Timestamp::from_str(s) {
ScalarValue::Utf8(Some(s)) => match Timestamp::from_str_utc(s) {
Ok(t) => Some(t),
Err(e) => {
logging::error!(e;"Failed to convert string literal {s} to timestamp");
Expand Down
13 changes: 8 additions & 5 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ mod dml;
mod show;
mod tql;

use std::str::FromStr;
use std::sync::Arc;

use catalog::CatalogManagerRef;
Expand Down Expand Up @@ -317,8 +316,8 @@ fn to_copy_database_request(
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

let start_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_START_KEY)?;
let end_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_END_KEY)?;
let start_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
let end_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_END_KEY, query_ctx)?;

let time_range = match (start_timestamp, end_timestamp) {
(Some(start), Some(end)) => TimestampRange::new(start, end),
Expand All @@ -338,10 +337,14 @@ fn to_copy_database_request(
}

/// Extracts timestamp from a [HashMap<String, String>] with given key.
fn extract_timestamp(map: &OptionMap, key: &str) -> Result<Option<Timestamp>> {
fn extract_timestamp(
map: &OptionMap,
key: &str,
query_ctx: &QueryContextRef,
) -> Result<Option<Timestamp>> {
map.get(key)
.map(|v| {
Timestamp::from_str(v)
Timestamp::from_str(v, Some(query_ctx.timezone()))
.map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
})
.transpose()
Expand Down
42 changes: 27 additions & 15 deletions src/query/src/optimizer/type_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;
use std::sync::Arc;

use common_time::timestamp::{TimeUnit, Timestamp};
use common_time::Timezone;
use datafusion::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue};
Expand Down Expand Up @@ -143,8 +141,8 @@ impl TypeConverter {
}

/// Retrieve the timezone from query context.
fn get_timezone(&self) -> Option<Arc<str>> {
Some(format!("{}", self.query_ctx.timezone()).into())
fn get_timezone(&self) -> Option<Timezone> {
Some(self.query_ctx.timezone())
}

fn cast_scalar_value(
Expand Down Expand Up @@ -297,15 +295,16 @@ fn timestamp_to_timestamp_ms_expr(val: i64, unit: TimeUnit) -> Expr {
Expr::Literal(ScalarValue::TimestampMillisecond(Some(timestamp), None))
}

fn string_to_timestamp_ms(string: &str, timezone: Option<Arc<str>>) -> Result<ScalarValue> {
let ts = Timestamp::from_str(string).map_err(|e| DataFusionError::External(Box::new(e)))?;
fn string_to_timestamp_ms(string: &str, timezone: Option<Timezone>) -> Result<ScalarValue> {
let ts = Timestamp::from_str(string, timezone)
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let value = Some(ts.value());
let scalar = match ts.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(value, timezone),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(value, timezone),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(value, timezone),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(value, timezone),
TimeUnit::Second => ScalarValue::TimestampSecond(value, None),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(value, None),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(value, None),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(value, None),
};
Ok(scalar)
}
Expand Down Expand Up @@ -335,8 +334,21 @@ mod tests {
);

assert_eq!(
string_to_timestamp_ms("2009-02-13 23:31:30Z", Some("Asia/Shanghai".into())).unwrap(),
ScalarValue::TimestampSecond(Some(1234567890), Some("Asia/Shanghai".into()))
string_to_timestamp_ms(
"2009-02-13 23:31:30",
Some(Timezone::from_tz_string("Asia/Shanghai").unwrap())
)
.unwrap(),
ScalarValue::TimestampSecond(Some(1234567890 - 8 * 3600), None)
);

assert_eq!(
string_to_timestamp_ms(
"2009-02-13 23:31:30",
Some(Timezone::from_tz_string("-8:00").unwrap())
)
.unwrap(),
ScalarValue::TimestampSecond(Some(1234567890 + 8 * 3600), None)
);
}

Expand Down Expand Up @@ -485,8 +497,8 @@ mod tests {
.unwrap();
let expected = String::from(
"Aggregate: groupBy=[[]], aggr=[[COUNT(column1)]]\
\n Filter: TimestampSecond(-28800, Some(\"UTC\")) <= column3\
\n Filter: column3 > TimestampSecond(-28800, Some(\"UTC\"))\
\n Filter: TimestampSecond(-28800, None) <= column3\
\n Filter: column3 > TimestampSecond(-28800, None)\
\n Values: (Int64(1), Float64(1), TimestampMillisecond(1, None))",
);
assert_eq!(format!("{}", transformed_plan.display_indent()), expected);
Expand Down

0 comments on commit 4358217

Please sign in to comment.