Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions sql/src/value/arrow_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow_array::{
StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit};
use chrono::{DateTime, FixedOffset};
use chrono::{FixedOffset, TimeZone};
use chrono_tz::Tz;
use databend_client::schema::{
DecimalSize, ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
Expand Down Expand Up @@ -92,22 +92,16 @@ impl TryFrom<(&ArrowField, &Arc<dyn ArrowArray>, usize, Tz)> for Value {
match array.as_any().downcast_ref::<Decimal128Array>() {
Some(array) => {
let v = array.value(seq);
let ts = v as u64 as i64;
let unix_ts = v as u64 as i64;
let offset = (v >> 64) as i32;

let secs = ts / 1_000_000;
let nanos = ((ts % 1_000_000) * 1000) as u32;
let dt = match DateTime::from_timestamp(secs, nanos) {
Some(t) => {
let off = FixedOffset::east_opt(offset).ok_or_else(|| {
Error::Parsing("invalid offset".to_string())
})?;
t.with_timezone(&off)
}
None => {
return Err(ConvertError::new("Datetime", format!("{v}")).into())
}
};
let offset = FixedOffset::east_opt(offset)
.ok_or_else(|| Error::Parsing("invalid offset".to_string()))?;
let dt =
offset.timestamp_micros(unix_ts).single().ok_or_else(|| {
Error::Parsing(format!(
"Invalid timestamp_micros {unix_ts} for offset {offset}"
))
})?;
Ok(Value::TimestampTz(dt))
}
None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
Expand Down
24 changes: 8 additions & 16 deletions sql/src/value/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,10 @@ impl TryFrom<Value> for NaiveDateTime {
type Error = Error;
fn try_from(val: Value) -> Result<Self> {
match val {
Value::Timestamp(i, _tz) => {
let secs = i / 1_000_000;
let nanos = ((i % 1_000_000) * 1000) as u32;
match DateTime::from_timestamp(secs, nanos) {
Some(t) => Ok(t.naive_utc()),
None => Err(ConvertError::new("NaiveDateTime", format!("{val}")).into()),
}
}
Value::Timestamp(i, _tz) => match DateTime::from_timestamp_micros(i) {
Some(t) => Ok(t.naive_utc()),
None => Err(ConvertError::new("NaiveDateTime", format!("{val}")).into()),
},
_ => Err(ConvertError::new("NaiveDateTime", format!("{val}")).into()),
}
}
Expand All @@ -126,14 +122,10 @@ impl TryFrom<Value> for DateTime<Tz> {
type Error = Error;
fn try_from(val: Value) -> Result<Self> {
match val {
Value::Timestamp(i, tz) => {
let secs = i / 1_000_000;
let nanos = ((i % 1_000_000) * 1000) as u32;
match DateTime::from_timestamp(secs, nanos) {
Some(t) => Ok(tz.from_utc_datetime(&t.naive_utc())),
None => Err(ConvertError::new("Datetime", format!("{val}")).into()),
}
}
Value::Timestamp(i, tz) => match DateTime::from_timestamp_micros(i) {
Some(t) => Ok(tz.from_utc_datetime(&t.naive_utc())),
None => Err(ConvertError::new("Datetime", format!("{val}")).into()),
},
_ => Err(ConvertError::new("DateTime", format!("{val}")).into()),
}
}
Expand Down
7 changes: 1 addition & 6 deletions sql/src/value/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,7 @@ fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std:
}
}
Value::Timestamp(micros, _tz) => {
let (mut secs, mut nanos) = (*micros / 1_000_000, (*micros % 1_000_000) * 1_000);
if nanos < 0 {
secs -= 1;
nanos += 1_000_000_000;
}
let t = DateTime::from_timestamp(secs, nanos as _).unwrap_or_default();
let t = DateTime::from_timestamp_micros(*micros).unwrap_or_default();
let t = t.naive_utc();
if raw {
write!(f, "{}", t.format(TIMESTAMP_FORMAT))
Expand Down
Loading