From 7c200a1e4e160bd00f881535b0a437da90828ad5 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Wed, 19 Feb 2025 16:37:09 +0300 Subject: [PATCH 1/3] Fix IPC arrow format issue --- crates/control_plane/Cargo.toml | 1 - crates/control_plane/src/models/mod.rs | 48 ++++-- crates/control_plane/src/service.rs | 15 +- crates/control_plane/src/utils.rs | 203 ++++++++++++++++++------- crates/nexus/src/http/dbt/handlers.rs | 11 +- 5 files changed, 198 insertions(+), 80 deletions(-) diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml index ac9ea17aa..f3c04bbbf 100644 --- a/crates/control_plane/Cargo.toml +++ b/crates/control_plane/Cargo.toml @@ -20,7 +20,6 @@ datafusion-functions-json = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion_iceberg = { workspace = true } -flatbuffers = { version = "24.3.25" } futures = { workspace = true } iceberg-rest-catalog = { workspace = true } iceberg-rust = { workspace = true } diff --git a/crates/control_plane/src/models/mod.rs b/crates/control_plane/src/models/mod.rs index 403ed99f7..e70331935 100644 --- a/crates/control_plane/src/models/mod.rs +++ b/crates/control_plane/src/models/mod.rs @@ -1,5 +1,5 @@ use arrow::array::RecordBatch; -use arrow::datatypes::{DataType, Field}; +use arrow::datatypes::{DataType, Field, TimeUnit}; use chrono::{NaiveDateTime, Utc}; use iceberg_rust::object_store::ObjectStoreBuilder; use object_store::aws::AmazonS3Builder; @@ -481,17 +481,25 @@ impl ColumnInfo { DataType::Date32 | DataType::Date64 => { column_info.r#type = "date".to_string(); } - DataType::Timestamp(_, _) => { + DataType::Timestamp(unit, _) => { column_info.r#type = "timestamp_ntz".to_string(); column_info.precision = Some(0); - column_info.scale = Some(9); + let scale = match unit { + TimeUnit::Second => 0, + TimeUnit::Millisecond => 3, + TimeUnit::Microsecond => 6, + TimeUnit::Nanosecond => 9, + }; + column_info.scale = Some(scale); } DataType::Binary => { column_info.r#type = 
"binary".to_string(); column_info.byte_length = Some(8_388_608); column_info.length = Some(8_388_608); } - _ => {} + _ => { + column_info.r#type = "text".to_string(); + } } column_info } @@ -704,16 +712,20 @@ mod tests { assert_eq!(column_info.name, "test_field"); assert_eq!(column_info.r#type, "date"); - let field = Field::new( - "test_field", - DataType::Timestamp(TimeUnit::Second, None), - false, - ); - let column_info = ColumnInfo::from_field(&field); - assert_eq!(column_info.name, "test_field"); - assert_eq!(column_info.r#type, "timestamp_ntz"); - assert_eq!(column_info.precision.unwrap(), 0); - assert_eq!(column_info.scale.unwrap(), 9); + let units = [ + (TimeUnit::Second, 0), + (TimeUnit::Millisecond, 3), + (TimeUnit::Microsecond, 6), + (TimeUnit::Nanosecond, 9), + ]; + for (unit, scale) in units { + let field = Field::new("test_field", DataType::Timestamp(unit, None), false); + let column_info = ColumnInfo::from_field(&field); + assert_eq!(column_info.name, "test_field"); + assert_eq!(column_info.r#type, "timestamp_ntz"); + assert_eq!(column_info.precision.unwrap(), 0); + assert_eq!(column_info.scale.unwrap(), scale); + } let field = Field::new("test_field", DataType::Binary, false); let column_info = ColumnInfo::from_field(&field); @@ -721,6 +733,14 @@ mod tests { assert_eq!(column_info.r#type, "binary"); assert_eq!(column_info.byte_length.unwrap(), 8_388_608); assert_eq!(column_info.length.unwrap(), 8_388_608); + + // Фтн щерук ензу + let field = Field::new("test_field", DataType::Utf8View, false); + let column_info = ColumnInfo::from_field(&field); + assert_eq!(column_info.name, "test_field"); + assert_eq!(column_info.r#type, "text"); + assert_eq!(column_info.byte_length, None); + assert_eq!(column_info.length, None); } #[tokio::test] diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs index d4ef3244a..e6fa41d58 100644 --- a/crates/control_plane/src/service.rs +++ b/crates/control_plane/src/service.rs @@ -2,7 +2,7 @@ 
use crate::error::{self, ControlPlaneError, ControlPlaneResult}; use crate::models::{ColumnInfo, Credentials, StorageProfile, StorageProfileCreateRequest}; use crate::models::{Warehouse, WarehouseCreateRequest}; use crate::repository::{StorageProfileRepository, WarehouseRepository}; -use crate::utils::convert_record_batches; +use crate::utils::{convert_record_batches, Config}; use arrow::record_batch::RecordBatch; use arrow_json::writer::JsonArray; use arrow_json::WriterBuilder; @@ -80,12 +80,14 @@ pub trait ControlService: Send + Sync { async fn create_session(&self, session_id: String) -> ControlPlaneResult<()>; async fn delete_session(&self, session_id: String) -> ControlPlaneResult<()>; + fn config(&self) -> &Config; } pub struct ControlServiceImpl { storage_profile_repo: Arc, warehouse_repo: Arc, df_sessions: Arc>>, + config: Config, } impl ControlServiceImpl { @@ -98,6 +100,7 @@ impl ControlServiceImpl { storage_profile_repo, warehouse_repo, df_sessions, + config: Config::default(), } } } @@ -327,8 +330,11 @@ impl ControlService for ControlServiceImpl { .context(crate::error::ExecutionSnafu)? 
.into_iter() .collect::>(); + + let serialization_format = self.config().dbt_serialization_format; // Add columns dbt metadata to each field - convert_record_batches(records).context(crate::error::DataFusionQuerySnafu { query }) + convert_record_batches(records, serialization_format) + .context(error::DataFusionQuerySnafu { query }) } #[tracing::instrument(level = "debug", skip(self))] @@ -538,12 +544,15 @@ impl ControlService for ControlServiceImpl { Ok(()) } + + fn config(&self) -> &Config { + &self.config + } } #[cfg(test)] #[allow(clippy::unwrap_used, clippy::expect_used)] mod tests { - use super::*; use crate::error::ControlPlaneError; use crate::models::{ diff --git a/crates/control_plane/src/utils.rs b/crates/control_plane/src/utils.rs index 5b553f53a..10b0a809d 100644 --- a/crates/control_plane/src/utils.rs +++ b/crates/control_plane/src/utils.rs @@ -1,6 +1,6 @@ use crate::models::ColumnInfo; use arrow::array::{ - Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + Array, Int64Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UnionArray, }; use arrow::datatypes::{Field, Schema, TimeUnit}; @@ -9,7 +9,45 @@ use chrono::DateTime; use datafusion::arrow::array::ArrayRef; use datafusion::arrow::datatypes::DataType; use datafusion::common::Result as DataFusionResult; +use std::fmt::Display; use std::sync::Arc; +use std::{env, fmt}; + +pub struct Config { + pub dbt_serialization_format: SerializationFormat, +} + +impl Default for Config { + fn default() -> Self { + Self { + dbt_serialization_format: SerializationFormat::new(), + } + } +} +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum SerializationFormat { + Arrow, + Json, +} + +impl Display for SerializationFormat { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Arrow => write!(f, "arrow"), + Self::Json => write!(f, "json"), + } + } +} + +impl SerializationFormat { + fn new() -> 
Self { + let var = env::var("DBT_SERIALIZATION_FORMAT").unwrap_or_else(|_| "json".to_string()); + match var.as_str() { + "arrow" => Self::Arrow, + _ => Self::Json, + } + } +} #[must_use] pub fn first_non_empty_type(union_array: &UnionArray) -> Option<(DataType, ArrayRef)> { @@ -25,6 +63,7 @@ pub fn first_non_empty_type(union_array: &UnionArray) -> Option<(DataType, Array pub fn convert_record_batches( records: Vec, + serialization_format: SerializationFormat, ) -> DataFusionResult<(Vec, Vec)> { let mut converted_batches = Vec::new(); let column_infos = ColumnInfo::from_batch(&records); @@ -54,7 +93,8 @@ pub fn convert_record_batches( } } DataType::Timestamp(unit, _) => { - let converted_column = convert_timestamp_to_struct(column, *unit); + let converted_column = + convert_timestamp_to_struct(column, *unit, serialization_format); fields.push( Field::new( field.name(), @@ -87,58 +127,95 @@ pub fn convert_record_batches( clippy::as_conversions, clippy::cast_possible_truncation )] -fn convert_timestamp_to_struct(column: &ArrayRef, unit: TimeUnit) -> ArrayRef { - let timestamps: Vec<_> = match unit { - TimeUnit::Second => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|x| { - x.map(|ts| { - let ts = DateTime::from_timestamp(ts, 0).unwrap(); - format!("{}", ts.timestamp()) - }) - }) - .collect(), - TimeUnit::Millisecond => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|x| { - x.map(|ts| { - let ts = DateTime::from_timestamp_millis(ts).unwrap(); - format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_millis()) - }) - }) - .collect(), - TimeUnit::Microsecond => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|x| { - x.map(|ts| { - let ts = DateTime::from_timestamp_micros(ts).unwrap(); - format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros()) - }) - }) - .collect(), - TimeUnit::Nanosecond => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|x| { - x.map(|ts| { - let ts = 
DateTime::from_timestamp_nanos(ts); - format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_nanos()) - }) - }) - .collect(), - }; - Arc::new(StringArray::from(timestamps)) as ArrayRef +fn convert_timestamp_to_struct( + column: &ArrayRef, + unit: TimeUnit, + ser: SerializationFormat, +) -> ArrayRef { + match ser { + SerializationFormat::Arrow => { + let timestamps: Vec<_> = match unit { + TimeUnit::Second => column + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect(), + TimeUnit::Millisecond => column + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect(), + TimeUnit::Microsecond => column + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect(), + TimeUnit::Nanosecond => column + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect(), + }; + Arc::new(Int64Array::from(timestamps)) as ArrayRef + } + SerializationFormat::Json => { + let timestamps: Vec<_> = match unit { + TimeUnit::Second => column + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|x| { + x.map(|ts| { + let ts = DateTime::from_timestamp(ts, 0).unwrap(); + format!("{}", ts.timestamp()) + }) + }) + .collect(), + TimeUnit::Millisecond => column + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|x| { + x.map(|ts| { + let ts = DateTime::from_timestamp_millis(ts).unwrap(); + format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_millis()) + }) + }) + .collect(), + TimeUnit::Microsecond => column + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|x| { + x.map(|ts| { + let ts = DateTime::from_timestamp_micros(ts).unwrap(); + format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros()) + }) + }) + .collect(), + TimeUnit::Nanosecond => column + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|x| { + x.map(|ts| { + let ts = DateTime::from_timestamp_nanos(ts); + format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_nanos()) + }) + }) + .collect(), + }; + Arc::new(StringArray::from(timestamps)) as ArrayRef + } + } } 
#[cfg(test)] @@ -209,7 +286,8 @@ mod tests { Arc::new(TimestampNanosecondArray::from(values)) as ArrayRef } }; - let result = convert_timestamp_to_struct(&timestamp_array, *unit); + let result = + convert_timestamp_to_struct(&timestamp_array, *unit, SerializationFormat::Json); let string_array = result.as_any().downcast_ref::().unwrap(); assert_eq!(string_array.len(), 2); assert_eq!(string_array.value(0), *expected); @@ -235,7 +313,8 @@ mod tests { ])) as ArrayRef; let batch = RecordBatch::try_new(schema, vec![int_array, timestamp_array]).unwrap(); let records = vec![batch]; - let (converted_batches, column_infos) = convert_record_batches(records).unwrap(); + let (converted_batches, column_infos) = + convert_record_batches(records.clone(), SerializationFormat::Json).unwrap(); let converted_batch = &converted_batches[0]; assert_eq!(converted_batches.len(), 1); @@ -255,5 +334,17 @@ mod tests { assert_eq!(column_infos[0].r#type, "fixed"); assert_eq!(column_infos[1].name, "timestamp_col"); assert_eq!(column_infos[1].r#type, "timestamp_ntz"); + + let (converted_batches, column_infos) = + convert_record_batches(records, SerializationFormat::Arrow).unwrap(); + let converted_batch = &converted_batches[0]; + let converted_timestamp_array = converted_batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(converted_timestamp_array.value(0), 1627846261); + assert!(converted_timestamp_array.is_null(1)); + assert_eq!(converted_timestamp_array.value(2), 1627846262); } } diff --git a/crates/nexus/src/http/dbt/handlers.rs b/crates/nexus/src/http/dbt/handlers.rs index d31badfda..eead04ec9 100644 --- a/crates/nexus/src/http/dbt/handlers.rs +++ b/crates/nexus/src/http/dbt/handlers.rs @@ -17,6 +17,7 @@ use axum::Json; use base64; use base64::engine::general_purpose::STANDARD as engine_base64; use base64::prelude::*; +use control_plane::utils::SerializationFormat; use flate2::read::GzDecoder; use regex::Regex; use snafu::ResultExt; @@ -24,9 +25,6 @@ use std::io::Read; use 
tracing::debug; use uuid::Uuid; -// TODO: move out as a configurable parameter -const SERIALIZATION_FORMAT: &str = "json"; // or "arrow" - // https://arrow.apache.org/docs/format/Columnar.html#buffer-alignment-and-padding // Buffer Alignment and Padding: Implementations are recommended to allocate memory // on aligned addresses (multiple of 8- or 64-bytes) and pad (overallocate) to a @@ -146,18 +144,19 @@ pub async fn query( records_to_json_string(&records)?.as_str() ); + let serialization_format = state.control_svc.config().dbt_serialization_format; let json_resp = Json(JsonResponse { data: Option::from(ResponseData { row_type: columns.into_iter().map(Into::into).collect(), - query_result_format: Option::from(String::from(SERIALIZATION_FORMAT)), - row_set: if SERIALIZATION_FORMAT == "json" { + query_result_format: Some(serialization_format.to_string()), + row_set: if serialization_format == SerializationFormat::Json { Option::from(ResponseData::rows_to_vec( records_to_json_string(&records)?.as_str(), )?) } else { None }, - row_set_base_64: if SERIALIZATION_FORMAT == "arrow" { + row_set_base_64: if serialization_format == SerializationFormat::Arrow { Option::from(records_to_arrow_string(&records)?) 
} else { None From 17c80667088dad93a357b4b597fc20e9fec6e8af Mon Sep 17 00:00:00 2001 From: osipovartem Date: Wed, 19 Feb 2025 16:44:15 +0300 Subject: [PATCH 2/3] Fix IPC arrow format issue --- crates/control_plane/src/models/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/control_plane/src/models/mod.rs b/crates/control_plane/src/models/mod.rs index e70331935..c8ee5cd0e 100644 --- a/crates/control_plane/src/models/mod.rs +++ b/crates/control_plane/src/models/mod.rs @@ -734,7 +734,7 @@ mod tests { assert_eq!(column_info.byte_length.unwrap(), 8_388_608); assert_eq!(column_info.length.unwrap(), 8_388_608); - // Фтн щерук ензу + // Any other type let field = Field::new("test_field", DataType::Utf8View, false); let column_info = ColumnInfo::from_field(&field); assert_eq!(column_info.name, "test_field"); From 41fb5a16db56c00da63c481e60aa7d2a4313647a Mon Sep 17 00:00:00 2001 From: osipovartem Date: Wed, 19 Feb 2025 18:10:38 +0300 Subject: [PATCH 3/3] Fix comments --- crates/control_plane/src/utils.rs | 76 ++++++++++++------------------- 1 file changed, 28 insertions(+), 48 deletions(-) diff --git a/crates/control_plane/src/utils.rs b/crates/control_plane/src/utils.rs index 10b0a809d..ed73ebb27 100644 --- a/crates/control_plane/src/utils.rs +++ b/crates/control_plane/src/utils.rs @@ -42,7 +42,7 @@ impl Display for SerializationFormat { impl SerializationFormat { fn new() -> Self { let var = env::var("DBT_SERIALIZATION_FORMAT").unwrap_or_else(|_| "json".to_string()); - match var.as_str() { + match var.to_lowercase().as_str() { "arrow" => Self::Arrow, _ => Self::Json, } @@ -122,6 +122,16 @@ pub fn convert_record_batches( Ok((converted_batches.clone(), column_infos)) } +macro_rules! 
downcast_and_iter { + ($column:expr, $array_type:ty) => { + $column + .as_any() + .downcast_ref::<$array_type>() + .unwrap() + .into_iter() + }; +} + #[allow( clippy::unwrap_used, clippy::as_conversions, @@ -135,40 +145,22 @@ fn convert_timestamp_to_struct( match ser { SerializationFormat::Arrow => { let timestamps: Vec<_> = match unit { - TimeUnit::Second => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .collect(), - TimeUnit::Millisecond => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .collect(), - TimeUnit::Microsecond => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .collect(), - TimeUnit::Nanosecond => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .collect(), + TimeUnit::Second => downcast_and_iter!(column, TimestampSecondArray).collect(), + TimeUnit::Millisecond => { + downcast_and_iter!(column, TimestampMillisecondArray).collect() + } + TimeUnit::Microsecond => { + downcast_and_iter!(column, TimestampMicrosecondArray).collect() + } + TimeUnit::Nanosecond => { + downcast_and_iter!(column, TimestampNanosecondArray).collect() + } }; Arc::new(Int64Array::from(timestamps)) as ArrayRef } SerializationFormat::Json => { let timestamps: Vec<_> = match unit { - TimeUnit::Second => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() + TimeUnit::Second => downcast_and_iter!(column, TimestampSecondArray) .map(|x| { x.map(|ts| { let ts = DateTime::from_timestamp(ts, 0).unwrap(); @@ -176,11 +168,7 @@ fn convert_timestamp_to_struct( }) }) .collect(), - TimeUnit::Millisecond => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() + TimeUnit::Millisecond => downcast_and_iter!(column, TimestampMillisecondArray) .map(|x| { x.map(|ts| { let ts = DateTime::from_timestamp_millis(ts).unwrap(); @@ -188,11 +176,7 @@ fn convert_timestamp_to_struct( }) }) .collect(), - TimeUnit::Microsecond => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() + TimeUnit::Microsecond => 
downcast_and_iter!(column, TimestampMicrosecondArray) .map(|x| { x.map(|ts| { let ts = DateTime::from_timestamp_micros(ts).unwrap(); @@ -200,11 +184,7 @@ fn convert_timestamp_to_struct( }) }) .collect(), - TimeUnit::Nanosecond => column - .as_any() - .downcast_ref::() - .unwrap() - .iter() + TimeUnit::Nanosecond => downcast_and_iter!(column, TimestampNanosecondArray) .map(|x| { x.map(|ts| { let ts = DateTime::from_timestamp_nanos(ts); @@ -335,7 +315,7 @@ mod tests { assert_eq!(column_infos[1].name, "timestamp_col"); assert_eq!(column_infos[1].r#type, "timestamp_ntz"); - let (converted_batches, column_infos) = + let (converted_batches, _) = convert_record_batches(records, SerializationFormat::Arrow).unwrap(); let converted_batch = &converted_batches[0]; let converted_timestamp_array = converted_batch @@ -343,8 +323,8 @@ mod tests { .as_any() .downcast_ref::() .unwrap(); - assert_eq!(converted_timestamp_array.value(0), 1627846261); + assert_eq!(converted_timestamp_array.value(0), 1_627_846_261); assert!(converted_timestamp_array.is_null(1)); - assert_eq!(converted_timestamp_array.value(2), 1627846262); + assert_eq!(converted_timestamp_array.value(2), 1_627_846_262); } }