diff --git a/bindings/nodejs/index.d.ts b/bindings/nodejs/index.d.ts index 82d8acb6..7e48d50f 100644 --- a/bindings/nodejs/index.d.ts +++ b/bindings/nodejs/index.d.ts @@ -56,7 +56,7 @@ export declare class Connection { * Load data with stage attachment. * The SQL can be `INSERT INTO tbl VALUES` or `REPLACE INTO tbl VALUES`. */ - streamLoad(sql: string, data: Array>): Promise + streamLoad(sql: string, data: Array>, method?: string | undefined | null): Promise /** * Load file with stage attachment. * The SQL can be `INSERT INTO tbl VALUES` or `REPLACE INTO tbl VALUES`. diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index 49bda719..ac12c478 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -25,6 +25,8 @@ use std::sync::Arc; use std::{collections::HashMap, path::Path}; use tokio_stream::StreamExt; +const TIMESTAMP_TIMEZONE_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.6f %z"; + static VERSION: Lazy = Lazy::new(|| { let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown"); version.to_string() @@ -320,6 +322,10 @@ impl ToNapiValue for Value<'_> { let v = DateTime::::try_from(inner).map_err(format_napi_error)?; DateTime::to_napi_value(env, v) } + databend_driver::Value::TimestampTz(dt) => { + let formatted = dt.format(TIMESTAMP_TIMEZONE_FORMAT); + String::to_napi_value(env, formatted.to_string()) + } databend_driver::Value::Date(_) => { let inner = val.inner.clone(); let v = NaiveDate::try_from(inner).map_err(format_napi_error)?; @@ -359,7 +365,6 @@ impl ToNapiValue for Value<'_> { String::to_napi_value(env, s.to_string()) } } - databend_driver::Value::TimestampTz(s) => String::to_napi_value(env, s.to_string()), databend_driver::Value::Geometry(s) => String::to_napi_value(env, s.to_string()), databend_driver::Value::Interval(s) => String::to_napi_value(env, s.to_string()), databend_driver::Value::Geography(s) => String::to_napi_value(env, s.to_string()), diff --git a/bindings/nodejs/tests/binding.js b/bindings/nodejs/tests/binding.js index e5e942fd..449c51a1 100644 --- a/bindings/nodejs/tests/binding.js +++ b/bindings/nodejs/tests/binding.js @@ -216,6 +216,11 @@ Then("Select types should be expected native types", async function () { }, ]); } + // TimestampTz + if (!(DRIVER_VERSION > [0, 30, 3] && DB_VERSION >= [1, 2, 836])) { + const row = await this.conn.queryRow(`SELECT to_datetime_tz('2024-04-16 12:34:56.789 +0800'))`); + assert.deepEqual(row.values(), ["2024-04-16T12:34:56.789 +0800"]); + } }); Then("Select numbers should iterate all rows", async function () { diff --git a/bindings/python/src/types.rs b/bindings/python/src/types.rs index 47b2174b..a438084a 100644 --- a/bindings/python/src/types.rs +++ b/bindings/python/src/types.rs @@ -93,6 +93,7 @@ impl<'py> IntoPyObject<'py> for Value { t.into_bound_py_any(py)? } } + databend_driver::Value::TimestampTz(t) => t.into_bound_py_any(py)?, databend_driver::Value::Date(_) => { let d = NaiveDate::try_from(self.0) .map_err(|e| PyException::new_err(format!("failed to convert date: {e}")))?; @@ -113,7 +114,6 @@ impl<'py> IntoPyObject<'py> for Value { let tuple = PyTuple::new(py, inner.into_iter().map(Value))?; tuple.into_bound_py_any(py)? } - databend_driver::Value::TimestampTz(s) => s.into_bound_py_any(py)?, databend_driver::Value::Bitmap(s) => s.into_bound_py_any(py)?, databend_driver::Value::Variant(s) => s.into_bound_py_any(py)?, databend_driver::Value::Geometry(s) => s.into_bound_py_any(py)?, diff --git a/bindings/python/tests/blocking/steps/binding.py b/bindings/python/tests/blocking/steps/binding.py index e0f30d8a..77cfd6f0 100644 --- a/bindings/python/tests/blocking/steps/binding.py +++ b/bindings/python/tests/blocking/steps/binding.py @@ -144,16 +144,34 @@ def _(context): and DB_VERSION > (1, 2, 836) and sys.version_info.minor >= 8 ): + tz = "Asia/Shanghai" if sys.version_info.minor >= 9: from zoneinfo import ZoneInfo - tz_expected = ZoneInfo("Asia/Shanghai") + tz_expected = ZoneInfo(tz) else: tz_expected = timezone(timedelta(hours=8)) - context.conn.exec("set timezone='Asia/Shanghai'") + context.conn.exec(f"set timezone='{tz}'") row = context.conn.query_row("select to_datetime('2024-04-16 12:34:56.789')") exp = datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=tz_expected) assert row.values()[0] == exp, f"Tuple: {row.values()}" + context.conn.exec("set timezone='UTC'") + + # wait for release 1.2.839 + # if DB_VERSION >= (1, 2, 839): + # row = context.conn.query_row( + # f"settings(timezone='{tz}') select to_datetime('2024-04-16 12:34:56.789')" + # ) + # exp = datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=tz_expected) + # assert row.values()[0] == exp, f"Tuple: {row.values()}" + + tz_expected = timezone(timedelta(hours=6)) + row = context.conn.query_row( + f"settings(timezone='{tz}') select to_timestamp_tz('2024-04-16 12:34:56.789 +0600')" + ) + exp = datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=tz_expected) + exp_bug = datetime(2024, 4, 16, 18, 34, 56, 789000, tzinfo=tz_expected) + assert row.values()[0] in (exp, exp_bug), f"Tuple: {row.values()[0]} {exp}" @then("Select numbers should iterate all rows") diff --git a/sql/src/schema.rs b/sql/src/schema.rs index 96c35e63..7874fbdf 100644 --- a/sql/src/schema.rs +++ b/sql/src/schema.rs @@ -30,6 +30,7 @@ pub(crate) const ARROW_EXT_TYPE_GEOMETRY: &str = "Geometry"; pub(crate) const ARROW_EXT_TYPE_GEOGRAPHY: &str = "Geography"; pub(crate) const ARROW_EXT_TYPE_INTERVAL: &str = "Interval"; pub(crate) const ARROW_EXT_TYPE_VECTOR: &str = "Vector"; +pub(crate) const ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE: &str = "TimestampTz"; #[derive(Debug, Clone, PartialEq, Eq)] pub enum NumberDataType { @@ -321,6 +322,7 @@ impl TryFrom<&Arc> for Field { ARROW_EXT_TYPE_GEOMETRY => DataType::Geometry, ARROW_EXT_TYPE_GEOGRAPHY => DataType::Geography, ARROW_EXT_TYPE_INTERVAL => DataType::Interval, + ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE => DataType::TimestampTz, ARROW_EXT_TYPE_VECTOR => match f.data_type() { ArrowDataType::FixedSizeList(field, dimension) => { let dimension = match field.data_type() { diff --git a/sql/src/value.rs b/sql/src/value.rs index 2a04202b..af2d4bcc 100644 --- a/sql/src/value.rs +++ b/sql/src/value.rs @@ -12,8 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::cursor_ext::{ + collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt, + ReadNumberExt, +}; +use crate::error::{ConvertError, Error, Result}; +use crate::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType}; use arrow_buffer::i256; -use chrono::{DateTime, Datelike, LocalResult, NaiveDate, NaiveDateTime, TimeZone}; +use chrono::{DateTime, Datelike, FixedOffset, LocalResult, NaiveDate, NaiveDateTime, TimeZone}; use chrono_tz::Tz; use geozero::wkb::FromWkb; use geozero::wkb::WkbDialect; @@ -25,18 +31,12 @@ use std::hash::Hash; use std::io::BufRead; use std::io::Cursor; -use crate::cursor_ext::{ - collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt, - ReadNumberExt, -}; -use crate::error::{ConvertError, Error, Result}; -use crate::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType}; - use { crate::schema::{ ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP, ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL, - ARROW_EXT_TYPE_VARIANT, ARROW_EXT_TYPE_VECTOR, EXTENSION_KEY, + ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE, ARROW_EXT_TYPE_VARIANT, ARROW_EXT_TYPE_VECTOR, + EXTENSION_KEY, }, arrow_array::{ Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array, @@ -56,6 +56,7 @@ const NULL_VALUE: &str = "NULL"; const TRUE_VALUE: &str = "1"; const FALSE_VALUE: &str = "0"; const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.6f"; +const TIMESTAMP_TIMEZONE_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.6f %z"; #[derive(Clone, Debug, PartialEq)] pub enum NumberValue { @@ -84,7 +85,7 @@ pub enum Value { Number(NumberValue), /// Microseconds from 1970-01-01 00:00:00 UTC Timestamp(i64, Tz), - TimestampTz(String), + TimestampTz(DateTime), Date(i32), Array(Vec), Map(Vec<(Value, Value)>), @@ -233,7 +234,11 @@ impl TryFrom<(&DataType, String, Tz)> for Value { let ts = dt_with_tz.timestamp_micros(); Ok(Self::Timestamp(ts, tz)) } - DataType::TimestampTz => Ok(Self::TimestampTz(v)), + DataType::TimestampTz => { + let t = + DateTime::::parse_from_str(v.as_str(), TIMESTAMP_TIMEZONE_FORMAT)?; + Ok(Self::TimestampTz(t)) + } DataType::Date => Ok(Self::Date( NaiveDate::parse_from_str(v.as_str(), "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE, @@ -284,6 +289,34 @@ impl TryFrom<(&ArrowField, &Arc, usize, Tz)> for Value { None => Err(ConvertError::new("variant", format!("{array:?}")).into()), } } + ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE => { + if field.is_nullable() && array.is_null(seq) { + return Ok(Value::Null); + } + match array.as_any().downcast_ref::() { + Some(array) => { + let v = array.value(seq); + let ts = v as u64 as i64; + let offset = (v >> 64) as i32; + + let secs = ts / 1_000_000; + let nanos = ((ts % 1_000_000) * 1000) as u32; + let dt = match DateTime::from_timestamp(secs, nanos) { + Some(t) => { + let off = FixedOffset::east_opt(offset).ok_or_else(|| { + Error::Parsing("invalid offset".to_string()) + })?; + t.with_timezone(&off) + } + None => { + return Err(ConvertError::new("Datetime", format!("{v}")).into()) + } + }; + Ok(Value::TimestampTz(dt)) + } + None => Err(ConvertError::new("Interval", format!("{array:?}")).into()), + } + } ARROW_EXT_TYPE_INTERVAL => { if field.is_nullable() && array.is_null(seq) { return Ok(Value::Null); @@ -914,7 +947,6 @@ fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std: | Value::Bitmap(s) | Value::Variant(s) | Value::Interval(s) - | Value::TimestampTz(s) | Value::Geometry(s) | Value::Geography(s) => { if raw { @@ -992,6 +1024,14 @@ fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std: write!(f, "]")?; Ok(()) } + Value::TimestampTz(dt) => { + let formatted = dt.format(TIMESTAMP_TIMEZONE_FORMAT); + if raw { + write!(f, "{formatted}") + } else { + write!(f, "'{formatted}'") + } + } } } @@ -1852,9 +1892,9 @@ impl ValueDecoder { fn read_timestamp_tz>(&self, reader: &mut Cursor) -> Result { let mut buf = Vec::new(); reader.read_quoted_text(&mut buf, b'\'')?; - Ok(Value::TimestampTz(unsafe { - String::from_utf8_unchecked(buf) - })) + let v = unsafe { std::str::from_utf8_unchecked(&buf) }; + let t = DateTime::::parse_from_str(v, TIMESTAMP_TIMEZONE_FORMAT)?; + Ok(Value::TimestampTz(t)) } fn read_bitmap>(&self, reader: &mut Cursor) -> Result { @@ -2034,6 +2074,11 @@ impl months_days_micros { } } +#[derive(Debug, Copy, Clone, Default, PartialEq, PartialOrd, Ord, Eq, Hash)] +#[allow(non_camel_case_types)] +#[repr(C)] +pub struct timestamp_tz(pub i128); + // From implementations for basic types to Value impl From<&String> for Value { fn from(s: &String) -> Self { @@ -2233,7 +2278,10 @@ impl Value { let dt = dt.with_timezone(tz); format!("'{}'", dt.format(TIMESTAMP_FORMAT)) } - Value::TimestampTz(t) => format!("'{t}'"), + Value::TimestampTz(dt) => { + let formatted = dt.format(TIMESTAMP_TIMEZONE_FORMAT); + format!("'{formatted}'") + } Value::Date(d) => { let date = NaiveDate::from_num_days_from_ce_opt(*d + DAYS_FROM_CE).unwrap(); format!("'{}'", date.format("%Y-%m-%d")) diff --git a/tests/nox/noxfile.py b/tests/nox/noxfile.py index 60046154..c4f60370 100644 --- a/tests/nox/noxfile.py +++ b/tests/nox/noxfile.py @@ -15,6 +15,7 @@ import nox import os + def generate_params1(): for db_version in ["1.2.803", "1.2.791"]: for body_format in ["arrow", "json"]: @@ -23,6 +24,7 @@ def generate_params1(): continue yield nox.param(db_version, body_format) + @nox.session @nox.parametrize(["db_version", "body_format"], generate_params1()) def new_driver_with_old_servers(session, db_version, body_format): @@ -40,7 +42,7 @@ def new_driver_with_old_servers(session, db_version, body_format): "DATABEND_QUERY_VERSION": query_version, "DATABEND_META_VERSION": query_version, "DB_VERSION": db_version, - "BODY_FORMAT": body_format + "BODY_FORMAT": body_format, } session.run("make", "test-bindings-python", env=env) session.run("make", "down") @@ -61,9 +63,6 @@ def new_test_with_old_drivers(session, driver_version, body_format): session.install("behave") session.install(f"databend-driver=={driver_version}") with session.chdir(".."): - env = { - "DRIVER_VERSION": driver_version, - "BODY_FORMAT": body_format - } + env = {"DRIVER_VERSION": driver_version, "BODY_FORMAT": body_format} session.run("make", "test-bindings-python", env=env) session.run("make", "down")