Skip to content

Commit

Permalink
fix: timestamptz type mismatch (risingwavelabs#9090)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <tabvision@bupt.icu>
  • Loading branch information
tabVersion committed Apr 10, 2023
1 parent ab09d29 commit 5bb0911
Showing 1 changed file with 12 additions and 18 deletions.
30 changes: 12 additions & 18 deletions src/connector/src/sink/mod.rs
Expand Up @@ -299,8 +299,10 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value
(DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
json!(v.to_text())
}
(DataType::Timestamptz, ScalarRefImpl::Timestamp(v)) => {
json!(v.0.and_local_timezone(chrono::Utc).unwrap().to_rfc3339())
(DataType::Timestamptz, ScalarRefImpl::Int64(v)) => {
// risingwave's timestamp with timezone is stored in UTC and does not maintain the
// timezone info and the time is in microsecond.
json!(v)
}
(DataType::Time, ScalarRefImpl::Time(v)) => {
// todo: just ignore the nanos part to avoid leap second complex
Expand Down Expand Up @@ -341,11 +343,10 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value
}
json!(map)
}
_ => {
return Err(ArrayError::internal(format!(
"datum_to_json_object: unsupported data type {:?}, {:?}",
field.data_type, scalar_ref,
)));
(data_type, scalar_ref) => {
return Err(ArrayError::internal(
format!("datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}", field.name, data_type, scalar_ref),
));
}
};

Expand All @@ -356,6 +357,7 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value
mod tests {

use risingwave_common::types::{Interval, ScalarImpl, Time, Timestamp};
use risingwave_expr::vector_op::cast::str_with_time_zone_to_timestamptz;

use super::*;
#[test]
Expand Down Expand Up @@ -401,23 +403,15 @@ mod tests {

// https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/time/ZonedTimestamp.java
let tstz_str = "2018-01-26T18:30:09.453Z";
let tstz_value = datum_to_json_object(
let tstz_inner = str_with_time_zone_to_timestamptz(tstz_str).unwrap();
datum_to_json_object(
&Field {
data_type: DataType::Timestamptz,
..mock_field.clone()
},
Some(
ScalarImpl::Timestamp(
chrono::DateTime::parse_from_rfc3339(tstz_str)
.unwrap()
.naive_utc()
.into(),
)
.as_scalar_ref_impl(),
),
Some(ScalarImpl::Int64(tstz_inner).as_scalar_ref_impl()),
)
.unwrap();
chrono::DateTime::parse_from_rfc3339(tstz_value.as_str().unwrap_or_default()).unwrap();

let ts_value = datum_to_json_object(
&Field {
Expand Down

0 comments on commit 5bb0911

Please sign in to comment.