Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/nodejs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export declare class Connection {
* Load data with stage attachment.
* The SQL can be `INSERT INTO tbl VALUES` or `REPLACE INTO tbl VALUES`.
*/
streamLoad(sql: string, data: Array<Array<string>>): Promise<ServerStats>
streamLoad(sql: string, data: Array<Array<string>>, method?: string | undefined | null): Promise<ServerStats>
/**
* Load file with stage attachment.
* The SQL can be `INSERT INTO tbl VALUES` or `REPLACE INTO tbl VALUES`.
Expand Down
7 changes: 6 additions & 1 deletion bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use std::sync::Arc;
use std::{collections::HashMap, path::Path};
use tokio_stream::StreamExt;

const TIMESTAMP_TIMEZONE_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.6f %z";

static VERSION: Lazy<String> = Lazy::new(|| {
let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
version.to_string()
Expand Down Expand Up @@ -320,6 +322,10 @@ impl ToNapiValue for Value<'_> {
let v = DateTime::<Tz>::try_from(inner).map_err(format_napi_error)?;
DateTime::to_napi_value(env, v)
}
databend_driver::Value::TimestampTz(dt) => {
let formatted = dt.format(TIMESTAMP_TIMEZONE_FORMAT);
String::to_napi_value(env, formatted.to_string())
}
databend_driver::Value::Date(_) => {
let inner = val.inner.clone();
let v = NaiveDate::try_from(inner).map_err(format_napi_error)?;
Expand Down Expand Up @@ -359,7 +365,6 @@ impl ToNapiValue for Value<'_> {
String::to_napi_value(env, s.to_string())
}
}
databend_driver::Value::TimestampTz(s) => String::to_napi_value(env, s.to_string()),
databend_driver::Value::Geometry(s) => String::to_napi_value(env, s.to_string()),
databend_driver::Value::Interval(s) => String::to_napi_value(env, s.to_string()),
databend_driver::Value::Geography(s) => String::to_napi_value(env, s.to_string()),
Expand Down
5 changes: 5 additions & 0 deletions bindings/nodejs/tests/binding.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ Then("Select types should be expected native types", async function () {
},
]);
}
// TimestampTz
if (!(DRIVER_VERSION > [0, 30, 3] && DB_VERSION >= [1, 2, 836])) {
const row = await this.conn.queryRow(`SELECT to_datetime_tz('2024-04-16 12:34:56.789 +0800'))`);
assert.deepEqual(row.values(), ["2024-04-16T12:34:56.789 +0800"]);
}
});

Then("Select numbers should iterate all rows", async function () {
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl<'py> IntoPyObject<'py> for Value {
t.into_bound_py_any(py)?
}
}
databend_driver::Value::TimestampTz(t) => t.into_bound_py_any(py)?,
databend_driver::Value::Date(_) => {
let d = NaiveDate::try_from(self.0)
.map_err(|e| PyException::new_err(format!("failed to convert date: {e}")))?;
Expand All @@ -113,7 +114,6 @@ impl<'py> IntoPyObject<'py> for Value {
let tuple = PyTuple::new(py, inner.into_iter().map(Value))?;
tuple.into_bound_py_any(py)?
}
databend_driver::Value::TimestampTz(s) => s.into_bound_py_any(py)?,
databend_driver::Value::Bitmap(s) => s.into_bound_py_any(py)?,
databend_driver::Value::Variant(s) => s.into_bound_py_any(py)?,
databend_driver::Value::Geometry(s) => s.into_bound_py_any(py)?,
Expand Down
22 changes: 20 additions & 2 deletions bindings/python/tests/blocking/steps/binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,34 @@ def _(context):
and DB_VERSION > (1, 2, 836)
and sys.version_info.minor >= 8
):
tz = "Asia/Shanghai"
if sys.version_info.minor >= 9:
from zoneinfo import ZoneInfo

tz_expected = ZoneInfo("Asia/Shanghai")
tz_expected = ZoneInfo(tz)
else:
tz_expected = timezone(timedelta(hours=8))
context.conn.exec("set timezone='Asia/Shanghai'")
context.conn.exec(f"set timezone='{tz}'")
row = context.conn.query_row("select to_datetime('2024-04-16 12:34:56.789')")
exp = datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=tz_expected)
assert row.values()[0] == exp, f"Tuple: {row.values()}"
context.conn.exec("set timezone='UTC'")

# wait for release 1.2.839
# if DB_VERSION >= (1, 2, 839):
# row = context.conn.query_row(
# f"settings(timezone='{tz}') select to_datetime('2024-04-16 12:34:56.789')"
# )
# exp = datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=tz_expected)
# assert row.values()[0] == exp, f"Tuple: {row.values()}"

tz_expected = timezone(timedelta(hours=6))
row = context.conn.query_row(
f"settings(timezone='{tz}') select to_timestamp_tz('2024-04-16 12:34:56.789 +0600')"
)
exp = datetime(2024, 4, 16, 12, 34, 56, 789000, tzinfo=tz_expected)
exp_bug = datetime(2024, 4, 16, 18, 34, 56, 789000, tzinfo=tz_expected)
assert row.values()[0] in (exp, exp_bug), f"Tuple: {row.values()[0]} {exp}"


@then("Select numbers should iterate all rows")
Expand Down
2 changes: 2 additions & 0 deletions sql/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub(crate) const ARROW_EXT_TYPE_GEOMETRY: &str = "Geometry";
pub(crate) const ARROW_EXT_TYPE_GEOGRAPHY: &str = "Geography";
pub(crate) const ARROW_EXT_TYPE_INTERVAL: &str = "Interval";
pub(crate) const ARROW_EXT_TYPE_VECTOR: &str = "Vector";
pub(crate) const ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE: &str = "TimestampTz";

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NumberDataType {
Expand Down Expand Up @@ -321,6 +322,7 @@ impl TryFrom<&Arc<ArrowField>> for Field {
ARROW_EXT_TYPE_GEOMETRY => DataType::Geometry,
ARROW_EXT_TYPE_GEOGRAPHY => DataType::Geography,
ARROW_EXT_TYPE_INTERVAL => DataType::Interval,
ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE => DataType::TimestampTz,
ARROW_EXT_TYPE_VECTOR => match f.data_type() {
ArrowDataType::FixedSizeList(field, dimension) => {
let dimension = match field.data_type() {
Expand Down
80 changes: 64 additions & 16 deletions sql/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::cursor_ext::{
collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
ReadNumberExt,
};
use crate::error::{ConvertError, Error, Result};
use crate::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType};
use arrow_buffer::i256;
use chrono::{DateTime, Datelike, LocalResult, NaiveDate, NaiveDateTime, TimeZone};
use chrono::{DateTime, Datelike, FixedOffset, LocalResult, NaiveDate, NaiveDateTime, TimeZone};
use chrono_tz::Tz;
use geozero::wkb::FromWkb;
use geozero::wkb::WkbDialect;
Expand All @@ -25,18 +31,12 @@ use std::hash::Hash;
use std::io::BufRead;
use std::io::Cursor;

use crate::cursor_ext::{
collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
ReadNumberExt,
};
use crate::error::{ConvertError, Error, Result};
use crate::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType};

use {
crate::schema::{
ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
ARROW_EXT_TYPE_VARIANT, ARROW_EXT_TYPE_VECTOR, EXTENSION_KEY,
ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE, ARROW_EXT_TYPE_VARIANT, ARROW_EXT_TYPE_VECTOR,
EXTENSION_KEY,
},
arrow_array::{
Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array,
Expand All @@ -56,6 +56,7 @@ const NULL_VALUE: &str = "NULL";
const TRUE_VALUE: &str = "1";
const FALSE_VALUE: &str = "0";
const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.6f";
const TIMESTAMP_TIMEZONE_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.6f %z";

#[derive(Clone, Debug, PartialEq)]
pub enum NumberValue {
Expand Down Expand Up @@ -84,7 +85,7 @@ pub enum Value {
Number(NumberValue),
/// Microseconds from 1970-01-01 00:00:00 UTC
Timestamp(i64, Tz),
TimestampTz(String),
TimestampTz(DateTime<FixedOffset>),
Date(i32),
Array(Vec<Value>),
Map(Vec<(Value, Value)>),
Expand Down Expand Up @@ -233,7 +234,11 @@ impl TryFrom<(&DataType, String, Tz)> for Value {
let ts = dt_with_tz.timestamp_micros();
Ok(Self::Timestamp(ts, tz))
}
DataType::TimestampTz => Ok(Self::TimestampTz(v)),
DataType::TimestampTz => {
let t =
DateTime::<FixedOffset>::parse_from_str(v.as_str(), TIMESTAMP_TIMEZONE_FORMAT)?;
Ok(Self::TimestampTz(t))
}
DataType::Date => Ok(Self::Date(
NaiveDate::parse_from_str(v.as_str(), "%Y-%m-%d")?.num_days_from_ce()
- DAYS_FROM_CE,
Expand Down Expand Up @@ -284,6 +289,34 @@ impl TryFrom<(&ArrowField, &Arc<dyn ArrowArray>, usize, Tz)> for Value {
None => Err(ConvertError::new("variant", format!("{array:?}")).into()),
}
}
ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE => {
    // A TimestampTz cell is read from a Decimal128 column. The low 64 bits are
    // decoded as microseconds since the Unix epoch and the bits above them as
    // the UTC offset in seconds.
    if field.is_nullable() && array.is_null(seq) {
        return Ok(Value::Null);
    }
    match array.as_any().downcast_ref::<Decimal128Array>() {
        Some(array) => {
            let v = array.value(seq);
            let ts = v as u64 as i64;
            let offset = (v >> 64) as i32;

            // Euclidean division keeps the sub-second remainder in
            // 0..1_000_000 even for instants before the epoch. Truncating
            // `/` and `%` would yield a negative remainder there, which
            // wraps when cast to `u32` and makes the conversion fail.
            let secs = ts.div_euclid(1_000_000);
            let nanos = (ts.rem_euclid(1_000_000) * 1000) as u32;
            let dt = match DateTime::from_timestamp(secs, nanos) {
                Some(t) => {
                    let off = FixedOffset::east_opt(offset).ok_or_else(|| {
                        Error::Parsing("invalid offset".to_string())
                    })?;
                    t.with_timezone(&off)
                }
                None => {
                    return Err(ConvertError::new("Datetime", format!("{v}")).into())
                }
            };
            Ok(Value::TimestampTz(dt))
        }
        // The column was not a Decimal128Array; report the type being decoded.
        None => Err(ConvertError::new("TimestampTz", format!("{array:?}")).into()),
    }
}
ARROW_EXT_TYPE_INTERVAL => {
if field.is_nullable() && array.is_null(seq) {
return Ok(Value::Null);
Expand Down Expand Up @@ -914,7 +947,6 @@ fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std:
| Value::Bitmap(s)
| Value::Variant(s)
| Value::Interval(s)
| Value::TimestampTz(s)
| Value::Geometry(s)
| Value::Geography(s) => {
if raw {
Expand Down Expand Up @@ -992,6 +1024,14 @@ fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std:
write!(f, "]")?;
Ok(())
}
Value::TimestampTz(dt) => {
let formatted = dt.format(TIMESTAMP_TIMEZONE_FORMAT);
if raw {
write!(f, "{formatted}")
} else {
write!(f, "'{formatted}'")
}
}
}
}

Expand Down Expand Up @@ -1852,9 +1892,9 @@ impl ValueDecoder {
fn read_timestamp_tz<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
let mut buf = Vec::new();
reader.read_quoted_text(&mut buf, b'\'')?;
Ok(Value::TimestampTz(unsafe {
String::from_utf8_unchecked(buf)
}))
let v = unsafe { std::str::from_utf8_unchecked(&buf) };
let t = DateTime::<FixedOffset>::parse_from_str(v, TIMESTAMP_TIMEZONE_FORMAT)?;
Ok(Value::TimestampTz(t))
}

fn read_bitmap<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
Expand Down Expand Up @@ -2034,6 +2074,11 @@ impl months_days_micros {
}
}

/// Newtype wrapper around a raw `i128`.
///
/// NOTE(review): presumably the packed form of a `TimestampTz` value; confirm
/// against the code that consumes this type.
#[repr(C)]
// Lower-case name, presumably kept to match the sibling `months_days_micros`
// type in this file; verify before renaming.
#[allow(non_camel_case_types)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct timestamp_tz(pub i128);

// From implementations for basic types to Value
impl From<&String> for Value {
fn from(s: &String) -> Self {
Expand Down Expand Up @@ -2233,7 +2278,10 @@ impl Value {
let dt = dt.with_timezone(tz);
format!("'{}'", dt.format(TIMESTAMP_FORMAT))
}
Value::TimestampTz(t) => format!("'{t}'"),
Value::TimestampTz(dt) => {
let formatted = dt.format(TIMESTAMP_TIMEZONE_FORMAT);
format!("'{formatted}'")
}
Value::Date(d) => {
let date = NaiveDate::from_num_days_from_ce_opt(*d + DAYS_FROM_CE).unwrap();
format!("'{}'", date.format("%Y-%m-%d"))
Expand Down
9 changes: 4 additions & 5 deletions tests/nox/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import nox
import os


def generate_params1():
for db_version in ["1.2.803", "1.2.791"]:
for body_format in ["arrow", "json"]:
Expand All @@ -23,6 +24,7 @@ def generate_params1():
continue
yield nox.param(db_version, body_format)


@nox.session
@nox.parametrize(["db_version", "body_format"], generate_params1())
def new_driver_with_old_servers(session, db_version, body_format):
Expand All @@ -40,7 +42,7 @@ def new_driver_with_old_servers(session, db_version, body_format):
"DATABEND_QUERY_VERSION": query_version,
"DATABEND_META_VERSION": query_version,
"DB_VERSION": db_version,
"BODY_FORMAT": body_format
"BODY_FORMAT": body_format,
}
session.run("make", "test-bindings-python", env=env)
session.run("make", "down")
Expand All @@ -61,9 +63,6 @@ def new_test_with_old_drivers(session, driver_version, body_format):
session.install("behave")
session.install(f"databend-driver=={driver_version}")
with session.chdir(".."):
env = {
"DRIVER_VERSION": driver_version,
"BODY_FORMAT": body_format
}
env = {"DRIVER_VERSION": driver_version, "BODY_FORMAT": body_format}
session.run("make", "test-bindings-python", env=env)
session.run("make", "down")
Loading