Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#[macro_use]
extern crate napi_derive;

use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use databend_driver::LoadMethod;
use napi::{bindgen_prelude::*, Env};
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -316,9 +316,17 @@ impl ToNapiValue for Value<'_> {
databend_driver::Value::Number(n) => {
NumberValue::to_napi_value(env, NumberValue(n.clone()))
}
databend_driver::Value::Timestamp(dt) => DateTime::to_napi_value(env, *dt),
databend_driver::Value::Timestamp(dt) => {
let mut js_date = std::ptr::null_mut();
let millis = dt.timestamp().as_millisecond() as f64;
check_status!(
unsafe { sys::napi_create_date(env, millis, &mut js_date) },
"Failed to convert jiff timestamp into napi value",
)?;
Ok(js_date)
}
databend_driver::Value::TimestampTz(dt) => {
let formatted = dt.format(TIMESTAMP_TIMEZONE_FORMAT);
let formatted = dt.strftime(TIMESTAMP_TIMEZONE_FORMAT);
String::to_napi_value(env, formatted.to_string())
}
databend_driver::Value::Date(_) => {
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ctor = "0.2"
env_logger = "0.11.8"
http = "1.0"
once_cell = "1.21"
pyo3 = { version = "0.24.2", features = ["extension-module", "chrono", "chrono-tz"] }
pyo3 = { version = "0.24.2", features = ["extension-module", "chrono", "chrono-tz", "jiff-02"] }
pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"] }
tokio = "1.44"

Expand Down
24 changes: 21 additions & 3 deletions bindings/python/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use tokio_stream::StreamExt;

use crate::exceptions::map_error_to_exception;
use crate::utils::wait_for_future;
#[cfg(feature = "cp38")]
use databend_driver::{self, zoned_to_chrono_datetime, zoned_to_chrono_fixed_offset};

pub static VERSION: Lazy<String> = Lazy::new(|| {
let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
Expand Down Expand Up @@ -80,15 +82,31 @@ impl<'py> IntoPyObject<'py> for Value {
databend_driver::Value::Timestamp(dt) => {
#[cfg(feature = "cp38")]
{
// chrono_tz -> PyDateTime isn't implemented for Python < 3.9 (no zoneinfo).
dt.with_timezone(&dt.offset().fix()).into_bound_py_any(py)?
let chrono_dt = zoned_to_chrono_datetime(&dt).map_err(|e| {
PyException::new_err(format!("failed to convert timestamp: {e}"))
})?;
chrono_dt
.with_timezone(&chrono_dt.offset().fix())
.into_bound_py_any(py)?
}
#[cfg(not(feature = "cp38"))]
{
dt.into_bound_py_any(py)?
}
}
databend_driver::Value::TimestampTz(t) => t.into_bound_py_any(py)?,
databend_driver::Value::TimestampTz(t) => {
#[cfg(feature = "cp38")]
{
let chrono_dt = zoned_to_chrono_fixed_offset(&t).map_err(|e| {
PyException::new_err(format!("failed to convert timestamp_tz: {e}"))
})?;
chrono_dt.into_bound_py_any(py)?
}
#[cfg(not(feature = "cp38"))]
{
t.into_bound_py_any(py)?
}
}
databend_driver::Value::Date(_) => {
let d = NaiveDate::try_from(self.0)
.map_err(|e| PyException::new_err(format!("failed to convert date: {e}")))?;
Expand Down
4 changes: 3 additions & 1 deletion driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ pub use databend_driver_core::rows::{
Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats,
};
pub use databend_driver_core::value::Interval;
pub use databend_driver_core::value::{NumberValue, Value};
pub use databend_driver_core::value::{
zoned_to_chrono_datetime, zoned_to_chrono_fixed_offset, NumberValue, Value,
};

pub use databend_driver_macros::serde_bend;
pub use databend_driver_macros::TryFromRow;
Expand Down
1 change: 1 addition & 0 deletions sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ itertools = "0.14"
lexical-core = "1.0.5"
memchr = "2.7"
roaring = { version = "0.10.12", features = ["serde"] }
jiff = "0.2.10"
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
url = { version = "2.5", default-features = false }
6 changes: 6 additions & 0 deletions sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ impl From<serde_json::Error> for Error {
}
}

impl From<jiff::Error> for Error {
fn from(e: jiff::Error) -> Self {
Error::Parsing(e.to_string())
}
}

impl From<hex::FromHexError> for Error {
fn from(e: hex::FromHexError) -> Self {
Error::Parsing(e.to_string())
Expand Down
38 changes: 18 additions & 20 deletions sql/src/value/arrow_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use arrow_array::{
StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit};
use chrono::{FixedOffset, LocalResult, TimeZone};
use databend_client::schema::{
DecimalSize, ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
Expand All @@ -33,6 +32,7 @@ use databend_client::schema::{
};
use databend_client::ResultFormatSettings;
use ethnum::i256;
use jiff::{tz, Timestamp};
use jsonb::RawJsonb;

/// The in-memory representation of the MonthDayMicros variant of the "Interval" logical type.
Expand Down Expand Up @@ -103,15 +103,14 @@ impl
let v = array.value(seq);
let unix_ts = v as u64 as i64;
let offset = (v >> 64) as i32;
let offset = FixedOffset::east_opt(offset)
.ok_or_else(|| Error::Parsing("invalid offset".to_string()))?;
let dt =
offset.timestamp_micros(unix_ts).single().ok_or_else(|| {
Error::Parsing(format!(
"Invalid timestamp_micros {unix_ts} for offset {offset}"
))
})?;
Ok(Value::TimestampTz(dt))
let offset = tz::Offset::from_seconds(offset).map_err(|e| {
Error::Parsing(format!("invalid offset: {offset}, {e}"))
})?;
let time_zone = tz::TimeZone::fixed(offset);
let timestamp = Timestamp::from_microsecond(unix_ts).map_err(|e| {
Error::Parsing(format!("Invalid timestamp_micros {unix_ts}: {e}"))
})?;
Ok(Value::TimestampTz(timestamp.to_zoned(time_zone)))
}
None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
}
Expand Down Expand Up @@ -347,16 +346,15 @@ impl
let ts = array.value(seq);
match tz {
None => {
let ltz = settings.timezone;
let dt = match ltz.timestamp_micros(ts) {
LocalResult::Single(dt) => dt,
LocalResult::None => {
return Err(Error::Parsing(format!(
"time {ts} not exists in timezone {ltz}"
)))
}
LocalResult::Ambiguous(dt1, _dt2) => dt1,
};
let timestamp = Timestamp::from_microsecond(ts).map_err(|e| {
Error::Parsing(format!("Invalid timestamp_micros {ts}: {e}"))
})?;
let tz_name = settings.timezone.name();
let dt = timestamp.in_tz(tz_name).map_err(|e| {
Error::Parsing(format!(
"Invalid timezone {tz_name} for timestamp {ts}: {e}"
))
})?;
Ok(Value::Timestamp(dt))
}
Some(tz) => Err(ConvertError::new("timestamp", format!("{array:?}"))
Expand Down
7 changes: 3 additions & 4 deletions sql/src/value/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::{DateTime, FixedOffset};
use chrono_tz::Tz;
use databend_client::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType};
use ethnum::i256;
use jiff::Zoned;

// Thu 1970-01-01 is R.D. 719163
pub(crate) const DAYS_FROM_CE: i32 = 719_163;
Expand Down Expand Up @@ -48,8 +47,8 @@ pub enum Value {
String(String),
Number(NumberValue),
/// Microseconds from 1970-01-01 00:00:00 UTC
Timestamp(DateTime<Tz>),
TimestampTz(DateTime<FixedOffset>),
Timestamp(Zoned),
TimestampTz(Zoned),
Date(i32),
Array(Vec<Value>),
Map(Vec<(Value, Value)>),
Expand Down
66 changes: 59 additions & 7 deletions sql/src/value/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, TimeZone};
use chrono::{
DateTime, Datelike, FixedOffset, LocalResult, NaiveDate, NaiveDateTime, TimeZone, Utc,
};
use chrono_tz::Tz;
use std::collections::HashMap;
use std::hash::Hash;

use crate::error::{ConvertError, Error, Result};

use super::{NumberValue, Value, DAYS_FROM_CE};
use jiff::{tz::TimeZone as JiffTimeZone, Timestamp, Zoned};

impl TryFrom<Value> for bool {
type Error = Error;
Expand Down Expand Up @@ -70,11 +73,62 @@ impl_try_from_number_value!(i64);
impl_try_from_number_value!(f32);
impl_try_from_number_value!(f64);

fn unix_micros_from_zoned(zdt: &Zoned) -> i64 {
zdt.timestamp().as_microsecond()
}

fn naive_datetime_from_micros(micros: i64) -> Result<NaiveDateTime> {
DateTime::<Utc>::from_timestamp_micros(micros)
.map(|dt| dt.naive_utc())
.ok_or_else(|| Error::Parsing(format!("invalid unix timestamp {micros}")))
}

pub fn zoned_to_chrono_datetime(zdt: &Zoned) -> Result<DateTime<Tz>> {
let tz_name = zdt.time_zone().iana_name().ok_or_else(|| {
ConvertError::new(
"DateTime",
"timestamp does not contain an IANA time zone".to_string(),
)
})?;
let tz: Tz = tz_name.parse().map_err(|_| {
ConvertError::new(
"DateTime",
format!("invalid time zone identifier {tz_name}"),
)
})?;
let micros = unix_micros_from_zoned(zdt);
match tz.timestamp_micros(micros) {
LocalResult::Single(dt) => Ok(dt),
LocalResult::Ambiguous(dt, _) => Ok(dt),
LocalResult::None => Err(Error::Parsing(format!(
"time {micros} not exists in timezone {tz_name}"
))),
}
}

pub fn zoned_to_chrono_fixed_offset(zdt: &Zoned) -> Result<DateTime<FixedOffset>> {
let offset_seconds = zdt.offset().seconds();
let offset = FixedOffset::east_opt(offset_seconds)
.ok_or_else(|| Error::Parsing(format!("invalid offset {offset_seconds}")))?;
let micros = unix_micros_from_zoned(zdt);
let naive = naive_datetime_from_micros(micros)?;
Ok(DateTime::<FixedOffset>::from_naive_utc_and_offset(
naive, offset,
))
}

fn zoned_from_naive_datetime(naive_dt: &NaiveDateTime) -> Zoned {
let micros = naive_dt.and_utc().timestamp_micros();
let timestamp = Timestamp::from_microsecond(micros)
.expect("NaiveDateTime out of range for Timestamp conversion");
timestamp.to_zoned(JiffTimeZone::UTC)
}

impl TryFrom<Value> for NaiveDateTime {
type Error = Error;
fn try_from(val: Value) -> Result<Self> {
match val {
Value::Timestamp(dt) => Ok(dt.naive_utc()),
Value::Timestamp(dt) => naive_datetime_from_micros(unix_micros_from_zoned(&dt)),
_ => Err(ConvertError::new("NaiveDateTime", format!("{val}")).into()),
}
}
Expand All @@ -84,7 +138,7 @@ impl TryFrom<Value> for DateTime<Tz> {
type Error = Error;
fn try_from(val: Value) -> Result<Self> {
match val {
Value::Timestamp(dt) => Ok(dt),
Value::Timestamp(dt) => zoned_to_chrono_datetime(&dt),
_ => Err(ConvertError::new("DateTime", format!("{val}")).into()),
}
}
Expand Down Expand Up @@ -426,15 +480,13 @@ impl From<&NaiveDate> for Value {

impl From<NaiveDateTime> for Value {
fn from(naive_dt: NaiveDateTime) -> Self {
let dt = Tz::UTC.from_local_datetime(&naive_dt).unwrap();
Value::Timestamp(dt)
Value::Timestamp(zoned_from_naive_datetime(&naive_dt))
}
}

impl From<&NaiveDateTime> for Value {
fn from(naive_dt: &NaiveDateTime) -> Self {
let dt = Tz::UTC.from_local_datetime(naive_dt).unwrap();
Value::Timestamp(dt)
Value::Timestamp(zoned_from_naive_datetime(naive_dt))
}
}

Expand Down
4 changes: 2 additions & 2 deletions sql/src/value/format/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ impl Value {
}
}
Value::Timestamp(dt) => {
let formatted = dt.format(TIMESTAMP_FORMAT);
let formatted = dt.strftime(TIMESTAMP_FORMAT);
if raw {
write!(f, "{formatted}")
} else {
write!(f, "'{formatted}'")
}
}
Value::TimestampTz(dt) => {
let formatted = dt.format(TIMESTAMP_TIMEZONE_FORMAT);
let formatted = dt.strftime(TIMESTAMP_TIMEZONE_FORMAT);
if raw {
write!(f, "{formatted}")
} else {
Expand Down
2 changes: 1 addition & 1 deletion sql/src/value/format/into_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl TryFrom<Value> for String {
})?;
Ok(date.format("%Y-%m-%d").to_string())
}
Value::Timestamp(dt) => Ok(dt.format(TIMESTAMP_FORMAT).to_string()),
Value::Timestamp(dt) => Ok(dt.strftime(TIMESTAMP_FORMAT).to_string()),
_ => Err(ConvertError::new("string", format!("{val:?}")).into()),
}
}
Expand Down
5 changes: 2 additions & 3 deletions sql/src/value/format/result_encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,11 @@ impl Value {
Self::write_string(bytes, s, raw);
}
Value::Timestamp(dt) => {
let s = format!("{}", dt.format(TIMESTAMP_FORMAT));
let s = dt.strftime(TIMESTAMP_FORMAT).to_string();
Self::write_string(bytes, &s, raw);
}
Value::TimestampTz(dt) => {
let formatted = dt.format(TIMESTAMP_TIMEZONE_FORMAT);
let s = format!("{}", formatted);
let s = dt.strftime(TIMESTAMP_TIMEZONE_FORMAT).to_string();
Self::write_string(bytes, &s, raw);
}
Value::Date(i) => {
Expand Down
4 changes: 2 additions & 2 deletions sql/src/value/format/to_sql_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ impl Value {
Value::String(s) => format!("'{}'", s),
Value::Number(n) => n.to_string(),
Value::Timestamp(dt) => {
format!("'{}'", dt.format(TIMESTAMP_FORMAT))
format!("'{}'", dt.strftime(TIMESTAMP_FORMAT))
}
Value::TimestampTz(dt) => {
let formatted = dt.format(TIMESTAMP_TIMEZONE_FORMAT);
let formatted = dt.strftime(TIMESTAMP_TIMEZONE_FORMAT);
format!("'{formatted}'")
}
Value::Date(d) => {
Expand Down
3 changes: 2 additions & 1 deletion sql/src/value/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ mod interval;
mod string_decoder;

pub use base::{NumberValue, Value};
pub use convert::{zoned_to_chrono_datetime, zoned_to_chrono_fixed_offset};
pub use interval::Interval;

use base::{DAYS_FROM_CE, TIMESTAMP_TIMEZONE_FORMAT};
use base::{DAYS_FROM_CE, TIMESTAMP_FORMAT, TIMESTAMP_TIMEZONE_FORMAT};
pub use format::FormatOptions;
Loading
Loading