From b2853b806002163d15f214b97da96ff54902f308 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 11 Apr 2023 14:15:17 +0800 Subject: [PATCH 1/4] refactor(cluster): prepare exchange dictionary flight data --- .../src/api/rpc/exchange/exchange_manager.rs | 54 ++++++++----------- .../src/api/rpc/exchange/exchange_sink.rs | 13 ++--- .../api/rpc/exchange/statistics_receiver.rs | 4 +- .../src/api/rpc/exchange/statistics_sender.rs | 4 +- .../service/src/api/rpc/flight_client.rs | 34 ++++++------ src/query/service/src/lib.rs | 1 + 6 files changed, 49 insertions(+), 61 deletions(-) diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index 1e0c30abdd6e3..657323c2e3366 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -312,7 +312,7 @@ impl DataExchangeManager { match queries_coordinator.get_mut(&query_id) { None => Err(ErrorCode::Internal("Query not exists.")), Some(query_coordinator) => { - query_coordinator.fragment_exchanges.clear(); + assert!(query_coordinator.fragment_exchanges.is_empty()); let injector = DefaultExchangeInjector::create(); let mut build_res = query_coordinator.subscribe_fragment(&ctx, fragment_id, injector)?; @@ -343,9 +343,9 @@ impl DataExchangeManager { pub fn get_flight_sender(&self, params: &ExchangeParams) -> Result> { let queries_coordinator_guard = self.queries_coordinator.lock(); - let queries_coordinator = unsafe { &*queries_coordinator_guard.deref().get() }; + let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; - match queries_coordinator.get(¶ms.get_query_id()) { + match queries_coordinator.get_mut(¶ms.get_query_id()) { None => Err(ErrorCode::Internal("Query not exists.")), Some(coordinator) => coordinator.get_flight_senders(params), } @@ -353,9 +353,9 @@ impl DataExchangeManager { pub fn get_flight_receiver(&self, params: &ExchangeParams) -> Result> { let queries_coordinator_guard = self.queries_coordinator.lock(); - let queries_coordinator = unsafe { &*queries_coordinator_guard.deref().get() }; + let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() }; - match queries_coordinator.get(¶ms.get_query_id()) { + match queries_coordinator.get_mut(¶ms.get_query_id()) { None => Err(ErrorCode::Internal("Query not exists.")), Some(coordinator) => coordinator.get_flight_receiver(params), } @@ -474,30 +474,25 @@ impl QueryCoordinator { Ok(()) } - pub fn get_flight_senders(&self, params: &ExchangeParams) -> Result> { + pub fn get_flight_senders(&mut self, params: &ExchangeParams) -> Result> { match params { - ExchangeParams::MergeExchange(params) => { - let mut exchanges = vec![]; - for ((_target, fragment, role), exchange) in &self.fragment_exchanges { - if *fragment == params.fragment_id && *role == FLIGHT_SENDER { - exchanges.push(exchange.as_sender()); - } - } - - Ok(exchanges) - } + ExchangeParams::MergeExchange(params) => Ok(self + .fragment_exchanges + .drain_filter(|(_, f, r), _| f == ¶ms.fragment_id && *r == FLIGHT_SENDER) + .map(|(_, v)| v.convert_to_sender()) + .collect::>()), ExchangeParams::ShuffleExchange(params) => { let mut exchanges = Vec::with_capacity(params.destination_ids.len()); for destination in ¶ms.destination_ids { exchanges.push(match destination == ¶ms.executor_id { true => Ok(FlightSender::create(async_channel::bounded(1).0)), - false => match self.fragment_exchanges.get(&( + false => match self.fragment_exchanges.remove(&( destination.clone(), params.fragment_id, FLIGHT_SENDER, )) { - Some(exchange_channel) => Ok(exchange_channel.as_sender()), + Some(exchange_channel) => Ok(exchange_channel.convert_to_sender()), None => Err(ErrorCode::UnknownFragmentExchange(format!( "Unknown fragment exchange channel, {}, {}", destination, params.fragment_id @@ -511,30 +506,25 @@ impl QueryCoordinator { } } - pub fn get_flight_receiver(&self, params: &ExchangeParams) -> Result> { + pub fn get_flight_receiver(&mut self, params: &ExchangeParams) -> Result> { match params { - ExchangeParams::MergeExchange(params) => { - let mut exchanges = vec![]; - for ((_target, fragment, role), exchange) in &self.fragment_exchanges { - if *fragment == params.fragment_id && *role == FLIGHT_RECEIVER { - exchanges.push(exchange.as_receiver()); - } - } - - Ok(exchanges) - } + ExchangeParams::MergeExchange(params) => Ok(self + .fragment_exchanges + .drain_filter(|(_, f, r), _| f == ¶ms.fragment_id && *r == FLIGHT_RECEIVER) + .map(|(_, v)| v.convert_to_receiver()) + .collect::>()), ExchangeParams::ShuffleExchange(params) => { let mut exchanges = Vec::with_capacity(params.destination_ids.len()); for destination in ¶ms.destination_ids { exchanges.push(match destination == ¶ms.executor_id { true => Ok(FlightReceiver::create(async_channel::bounded(1).1)), - false => match self.fragment_exchanges.get(&( + false => match self.fragment_exchanges.remove(&( destination.clone(), params.fragment_id, FLIGHT_RECEIVER, )) { - Some(v) => Ok(v.as_receiver()), + Some(v) => Ok(v.convert_to_receiver()), _ => Err(ErrorCode::UnknownFragmentExchange(format!( "Unknown fragment flight receiver, {}, {}", destination, params.fragment_id @@ -696,7 +686,7 @@ impl QueryCoordinator { let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?; - self.fragment_exchanges.clear(); + assert!(self.fragment_exchanges.is_empty()); let info_mut = self.info.as_mut().expect("Query info is None"); info_mut.query_executor = Some(executor.clone()); diff --git a/src/query/service/src/api/rpc/exchange/exchange_sink.rs b/src/query/service/src/api/rpc/exchange/exchange_sink.rs index 2757b1930e14e..6b22db5fecc23 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_sink.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_sink.rs @@ -23,8 +23,8 @@ use common_pipeline_core::pipe::PipeItem; use common_pipeline_core::processors::processor::ProcessorPtr; use crate::api::rpc::exchange::exchange_params::ExchangeParams; +use crate::api::rpc::exchange::exchange_sink_writer::create_writer_item; use crate::api::rpc::exchange::exchange_sink_writer::create_writer_items; -use crate::api::rpc::exchange::exchange_sink_writer::ExchangeWriterSink; use crate::api::rpc::exchange::exchange_sorting::ExchangeSorting; use crate::api::rpc::exchange::exchange_sorting::TransformExchangeSorting; use crate::api::rpc::exchange::exchange_transform_shuffle::exchange_shuffle; @@ -73,14 +73,11 @@ impl ExchangeSink { )])); } + pipeline.resize(1)?; assert_eq!(flight_senders.len(), 1); - let flight_sender = flight_senders.remove(0); - pipeline.add_sink(|input| { - Ok(ProcessorPtr::create(ExchangeWriterSink::create( - input, - flight_sender.clone(), - ))) - }) + let item = create_writer_item(flight_senders.remove(0)); + pipeline.add_pipe(Pipe::create(1, 0, vec![item])); + Ok(()) } ExchangeParams::ShuffleExchange(params) => { exchange_shuffle(params, pipeline)?; diff --git a/src/query/service/src/api/rpc/exchange/statistics_receiver.rs b/src/query/service/src/api/rpc/exchange/statistics_receiver.rs index 0ca7b477ad97d..2c07867755568 100644 --- a/src/query/service/src/api/rpc/exchange/statistics_receiver.rs +++ b/src/query/service/src/api/rpc/exchange/statistics_receiver.rs @@ -56,10 +56,10 @@ impl StatisticsReceiver { let (tx, rx) = match (exchanges.remove(0), exchanges.remove(0)) { (tx @ FlightExchange::Sender { .. }, rx @ FlightExchange::Receiver { .. }) => { - (tx.as_sender(), rx.as_receiver()) + (tx.convert_to_sender(), rx.convert_to_receiver()) } (rx @ FlightExchange::Receiver { .. }, tx @ FlightExchange::Sender { .. }) => { - (tx.as_sender(), rx.as_receiver()) + (tx.convert_to_sender(), rx.convert_to_receiver()) } _ => unreachable!(), }; diff --git a/src/query/service/src/api/rpc/exchange/statistics_sender.rs b/src/query/service/src/api/rpc/exchange/statistics_sender.rs index a353144ada2b2..a55042e8c7016 100644 --- a/src/query/service/src/api/rpc/exchange/statistics_sender.rs +++ b/src/query/service/src/api/rpc/exchange/statistics_sender.rs @@ -48,10 +48,10 @@ impl StatisticsSender { let (tx, rx) = match (exchanges.remove(0), exchanges.remove(0)) { (tx @ FlightExchange::Sender { .. }, rx @ FlightExchange::Receiver { .. }) => { - (tx.as_sender(), rx.as_receiver()) + (tx.convert_to_sender(), rx.convert_to_receiver()) } (rx @ FlightExchange::Receiver { .. }, tx @ FlightExchange::Sender { .. }) => { - (tx.as_sender(), rx.as_receiver()) + (tx.convert_to_sender(), rx.convert_to_receiver()) } _ => unreachable!(), }; diff --git a/src/query/service/src/api/rpc/flight_client.rs b/src/query/service/src/api/rpc/flight_client.rs index 558c8f4b0079e..b482f3b680ad0 100644 --- a/src/query/service/src/api/rpc/flight_client.rs +++ b/src/query/service/src/api/rpc/flight_client.rs @@ -201,17 +201,17 @@ pub struct FlightSender { tx: Sender>, } -impl Clone for FlightSender { - fn clone(&self) -> Self { - self.state.strong_count.fetch_add(1, Ordering::SeqCst); - - FlightSender { - tx: self.tx.clone(), - state: self.state.clone(), - dropped: AtomicBool::new(false), - } - } -} +// impl Clone for FlightSender { +// fn clone(&self) -> Self { +// self.state.strong_count.fetch_add(1, Ordering::SeqCst); +// +// FlightSender { +// tx: self.tx.clone(), +// state: self.state.clone(), +// dropped: AtomicBool::new(false), +// } +// } +// } impl Drop for FlightSender { fn drop(&mut self) { @@ -288,14 +288,14 @@ impl FlightExchange { } } - pub fn as_sender(&self) -> FlightSender { + pub fn convert_to_sender(self) -> FlightSender { match self { FlightExchange::Sender { state, sender } => { state.strong_count.fetch_add(1, Ordering::SeqCst); FlightSender { - tx: sender.clone(), - state: state.clone(), + state, + tx: sender, dropped: AtomicBool::new(false), } } @@ -303,14 +303,14 @@ impl FlightExchange { } } - pub fn as_receiver(&self) -> FlightReceiver { + pub fn convert_to_receiver(self) -> FlightReceiver { match self { FlightExchange::Receiver { state, receiver } => { state.strong_count.fetch_add(1, Ordering::SeqCst); FlightReceiver { - rx: receiver.clone(), - state: state.clone(), + state, + rx: receiver, dropped: AtomicBool::new(false), } } diff --git a/src/query/service/src/lib.rs b/src/query/service/src/lib.rs index 336956916d029..341a0380a52ba 100644 --- a/src/query/service/src/lib.rs +++ b/src/query/service/src/lib.rs @@ -28,6 +28,7 @@ #![feature(cursor_remaining)] #![feature(vec_into_raw_parts)] #![feature(associated_type_bounds)] +#![feature(hash_drain_filter)] extern crate core; From 1ba7a24c7f9eb1f2a49105fe50923bbf9a97836e Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 11 Apr 2023 14:33:48 +0800 Subject: [PATCH 2/4] refactor(cluster): remove uesless code --- .../service/src/api/rpc/flight_client.rs | 110 ++---------------- 1 file changed, 10 insertions(+), 100 deletions(-) diff --git a/src/query/service/src/api/rpc/flight_client.rs b/src/query/service/src/api/rpc/flight_client.rs index b482f3b680ad0..3baf3c3cef84d 100644 --- a/src/query/service/src/api/rpc/flight_client.rs +++ b/src/query/service/src/api/rpc/flight_client.rs @@ -14,10 +14,6 @@ use std::convert::TryInto; use std::error::Error; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::sync::Arc; use async_channel::Receiver; use async_channel::Sender; @@ -156,24 +152,12 @@ impl FlightClient { } pub struct FlightReceiver { - state: Arc, - dropped: AtomicBool, rx: Receiver>, } -impl Drop for FlightReceiver { - fn drop(&mut self) { - self.close(); - } -} - impl FlightReceiver { pub fn create(rx: Receiver>) -> FlightReceiver { - FlightReceiver { - rx, - state: State::create(), - dropped: AtomicBool::new(false), - } + FlightReceiver { rx } } #[async_backtrace::framed] @@ -186,46 +170,17 @@ impl FlightReceiver { } pub fn close(&self) { - #[allow(clippy::collapsible_if)] - if !self.dropped.fetch_or(true, Ordering::SeqCst) { - if self.state.strong_count.fetch_sub(1, Ordering::SeqCst) == 1 { - self.rx.close(); - } - } + self.rx.close(); } } pub struct FlightSender { - state: Arc, - dropped: AtomicBool, tx: Sender>, } -// impl Clone for FlightSender { -// fn clone(&self) -> Self { -// self.state.strong_count.fetch_add(1, Ordering::SeqCst); -// -// FlightSender { -// tx: self.tx.clone(), -// state: self.state.clone(), -// dropped: AtomicBool::new(false), -// } -// } -// } - -impl Drop for FlightSender { - fn drop(&mut self) { - self.close(); - } -} - impl FlightSender { pub fn create(tx: Sender>) -> FlightSender { - FlightSender { - state: State::create(), - dropped: AtomicBool::new(false), - tx, - } + FlightSender { tx } } #[async_backtrace::framed] @@ -240,80 +195,35 @@ impl FlightSender { } pub fn close(&self) { - #[allow(clippy::collapsible_if)] - if !self.dropped.fetch_or(true, Ordering::SeqCst) { - if self.state.strong_count.fetch_sub(1, Ordering::SeqCst) == 1 { - self.tx.close(); - } - } - } -} - -pub struct State { - strong_count: AtomicUsize, -} - -impl State { - pub fn create() -> Arc { - Arc::new(State { - strong_count: AtomicUsize::new(0), - }) + self.tx.close(); } } pub enum FlightExchange { Dummy, - Receiver { - state: Arc, - receiver: Receiver>, - }, - Sender { - state: Arc, - sender: Sender>, - }, + Receiver(Receiver>), + Sender(Sender>), } impl FlightExchange { pub fn create_sender(sender: Sender>) -> FlightExchange { - FlightExchange::Sender { - sender, - state: State::create(), - } + FlightExchange::Sender(sender) } pub fn create_receiver(receiver: Receiver>) -> FlightExchange { - FlightExchange::Receiver { - receiver, - state: State::create(), - } + FlightExchange::Receiver(receiver) } pub fn convert_to_sender(self) -> FlightSender { match self { - FlightExchange::Sender { state, sender } => { - state.strong_count.fetch_add(1, Ordering::SeqCst); - - FlightSender { - state, - tx: sender, - dropped: AtomicBool::new(false), - } - } + FlightExchange::Sender(tx) => FlightSender { tx }, _ => unreachable!(), } } pub fn convert_to_receiver(self) -> FlightReceiver { match self { - FlightExchange::Receiver { state, receiver } => { - state.strong_count.fetch_add(1, Ordering::SeqCst); - - FlightReceiver { - state, - rx: receiver, - dropped: AtomicBool::new(false), - } - } + FlightExchange::Receiver(rx) => FlightReceiver { rx }, _ => unreachable!(), } } From 00388f833f742b4032e81ad549f24fd66b45274c Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 11 Apr 2023 21:04:54 +0800 Subject: [PATCH 3/4] refactor(cluster): add dictionary serialize --- .../api/rpc/exchange/exchange_sink_writer.rs | 19 +++++++----- .../exchange/serde/exchange_deserializer.rs | 1 + .../rpc/exchange/serde/exchange_serializer.rs | 31 +++++++++---------- .../api/rpc/exchange/statistics_receiver.rs | 1 + .../src/api/rpc/exchange/statistics_sender.rs | 1 + .../service/src/api/rpc/flight_client.rs | 2 +- .../src/api/rpc/packets/packet_data.rs | 31 ++++++++++--------- 7 files changed, 47 insertions(+), 39 deletions(-) diff --git a/src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs b/src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs index eb1b6ec561579..ca2f803c648f5 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs @@ -51,7 +51,7 @@ impl AsyncSink for ExchangeWriterSink { #[async_trait::unboxed_simple] #[async_backtrace::framed] async fn consume(&mut self, mut data_block: DataBlock) -> Result { - let mut serialize_meta = match data_block.take_meta() { + let serialize_meta = match data_block.take_meta() { None => Err(ErrorCode::Internal( "ExchangeWriterSink only recv ExchangeSerializeMeta.", )), @@ -63,14 +63,17 @@ impl AsyncSink for ExchangeWriterSink { }, }?; - match serialize_meta.packet.take() { - None => Ok(false), - Some(packet) => match self.flight_sender.send(packet).await { - Ok(_) => Ok(false), - Err(error) if error.code() == ErrorCode::ABORTED_QUERY => Ok(true), - Err(error) => Err(error), - }, + for packet in serialize_meta.packet { + if let Err(error) = self.flight_sender.send(packet).await { + if error.code() == ErrorCode::ABORTED_QUERY { + return Ok(true); + } + + return Err(error); + } } + + Ok(false) } } diff --git a/src/query/service/src/api/rpc/exchange/serde/exchange_deserializer.rs b/src/query/service/src/api/rpc/exchange/serde/exchange_deserializer.rs index 5a80d52b4ce19..8d908d11b021e 100644 --- a/src/query/service/src/api/rpc/exchange/serde/exchange_deserializer.rs +++ b/src/query/service/src/api/rpc/exchange/serde/exchange_deserializer.rs @@ -176,6 +176,7 @@ impl Processor for TransformExchangeDeserializer { if let Some(exchange_meta) = ExchangeDeserializeMeta::downcast_from(block_meta) { self.output_data = Some(match exchange_meta.packet.unwrap() { DataPacket::ErrorCode(v) => Err(v), + DataPacket::Dictionary(_) => unreachable!(), DataPacket::FetchProgressAndPrecommit => unreachable!(), DataPacket::ProgressAndPrecommit { .. } => unreachable!(), DataPacket::FragmentData(v) => self.recv_data(v), diff --git a/src/query/service/src/api/rpc/exchange/serde/exchange_serializer.rs b/src/query/service/src/api/rpc/exchange/serde/exchange_serializer.rs index 032b9d9097f2d..755a54493fa1e 100644 --- a/src/query/service/src/api/rpc/exchange/serde/exchange_serializer.rs +++ b/src/query/service/src/api/rpc/exchange/serde/exchange_serializer.rs @@ -46,11 +46,11 @@ use crate::api::FragmentData; pub struct ExchangeSerializeMeta { pub block_number: isize, - pub packet: Option, + pub packet: Vec, } impl ExchangeSerializeMeta { - pub fn create(block_number: isize, packet: Option) -> BlockMetaInfoPtr { + pub fn create(block_number: isize, packet: Vec) -> BlockMetaInfoPtr { Box::new(ExchangeSerializeMeta { packet, block_number, @@ -215,7 +215,8 @@ pub fn serialize_block( ) -> Result { if data_block.is_empty() && data_block.get_meta().is_none() { return Ok(DataBlock::empty_with_meta(ExchangeSerializeMeta::create( - block_num, None, + block_num, + vec![], ))); } @@ -224,24 +225,22 @@ pub fn serialize_block( bincode::serialize_into(&mut meta, &data_block.get_meta()) .map_err(|_| ErrorCode::BadBytes("block meta serialize error when exchange"))?; - let values = match data_block.is_empty() { - true => serialize_batch(&Chunk::new(vec![]), &[], options)?.1, + let (dict, values) = match data_block.is_empty() { + true => serialize_batch(&Chunk::new(vec![]), &[], options)?, false => { let chunks = data_block.try_into()?; - let (dicts, values) = serialize_batch(&chunks, ipc_field, options)?; - - if !dicts.is_empty() { - return Err(ErrorCode::Unimplemented( - "DatabendQuery does not implement dicts.", - )); - } - - values + serialize_batch(&chunks, ipc_field, options)? } }; + let mut packet = Vec::with_capacity(dict.len() + 1); + + for dict_flight in dict { + packet.push(DataPacket::Dictionary(dict_flight)); + } + + packet.push(DataPacket::FragmentData(FragmentData::create(meta, values))); Ok(DataBlock::empty_with_meta(ExchangeSerializeMeta::create( - block_num, - Some(DataPacket::FragmentData(FragmentData::create(meta, values))), + block_num, packet, ))) } diff --git a/src/query/service/src/api/rpc/exchange/statistics_receiver.rs b/src/query/service/src/api/rpc/exchange/statistics_receiver.rs index 2c07867755568..4f8e9e502f77a 100644 --- a/src/query/service/src/api/rpc/exchange/statistics_receiver.rs +++ b/src/query/service/src/api/rpc/exchange/statistics_receiver.rs @@ -159,6 +159,7 @@ impl StatisticsReceiver { Ok(None) => Ok(true), Err(transport_error) => Err(transport_error), Ok(Some(DataPacket::ErrorCode(error))) => Err(error), + Ok(Some(DataPacket::Dictionary(_))) => unreachable!(), Ok(Some(DataPacket::FragmentData(_))) => unreachable!(), Ok(Some(DataPacket::FetchProgressAndPrecommit)) => unreachable!(), Ok(Some(DataPacket::ProgressAndPrecommit { diff --git a/src/query/service/src/api/rpc/exchange/statistics_sender.rs b/src/query/service/src/api/rpc/exchange/statistics_sender.rs index a55042e8c7016..6ea6380c6ea79 100644 --- a/src/query/service/src/api/rpc/exchange/statistics_sender.rs +++ b/src/query/service/src/api/rpc/exchange/statistics_sender.rs @@ -146,6 +146,7 @@ impl StatisticsSender { ) -> Result<()> { match command { DataPacket::ErrorCode(_) => unreachable!(), + DataPacket::Dictionary(_) => unreachable!(), DataPacket::FragmentData(_) => unreachable!(), DataPacket::ProgressAndPrecommit { .. } => unreachable!(), DataPacket::FetchProgressAndPrecommit => { diff --git a/src/query/service/src/api/rpc/flight_client.rs b/src/query/service/src/api/rpc/flight_client.rs index 3baf3c3cef84d..1f60a73f1d68d 100644 --- a/src/query/service/src/api/rpc/flight_client.rs +++ b/src/query/service/src/api/rpc/flight_client.rs @@ -185,7 +185,7 @@ impl FlightSender { #[async_backtrace::framed] pub async fn send(&self, data: DataPacket) -> Result<()> { - if let Err(_cause) = self.tx.send(Ok(FlightData::from(data))).await { + if let Err(_cause) = self.tx.send(Ok(FlightData::try_from(data)?)).await { return Err(ErrorCode::AbortedQuery( "Aborted query, because the remote flight channel is closed.", )); diff --git a/src/query/service/src/api/rpc/packets/packet_data.rs b/src/query/service/src/api/rpc/packets/packet_data.rs index 4bb80f5f9b328..36efaed60faf3 100644 --- a/src/query/service/src/api/rpc/packets/packet_data.rs +++ b/src/query/service/src/api/rpc/packets/packet_data.rs @@ -14,6 +14,7 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::vec; use byteorder::BigEndian; use byteorder::ReadBytesExt; @@ -49,6 +50,7 @@ impl Debug for FragmentData { pub enum DataPacket { ErrorCode(ErrorCode), + Dictionary(FlightData), FragmentData(FragmentData), FetchProgressAndPrecommit, ProgressAndPrecommit { @@ -57,9 +59,11 @@ pub enum DataPacket { }, } -impl From for FlightData { - fn from(packet: DataPacket) -> Self { - match packet { +impl TryFrom for FlightData { + type Error = ErrorCode; + + fn try_from(packet: DataPacket) -> Result { + Ok(match packet { DataPacket::ErrorCode(error) => { error!("Got error code data packet: {:?}", error); FlightData::from(error) @@ -76,23 +80,17 @@ impl From for FlightData { precommit, } => { let mut data_body = vec![]; - data_body - .write_u64::(progress.len() as u64) - .unwrap(); - data_body - .write_u64::(precommit.len() as u64) - .unwrap(); + data_body.write_u64::(progress.len() as u64)?; + data_body.write_u64::(precommit.len() as u64)?; // Progress. - // TODO(winter): remove unwrap. for progress_info in progress { - progress_info.write(&mut data_body).unwrap(); + progress_info.write(&mut data_body)?; } // Pre-commit. - // TODO(winter): remove unwrap. for precommit_block in precommit { - precommit_block.write(&mut data_body).unwrap(); + precommit_block.write(&mut data_body)?; } FlightData { @@ -102,7 +100,11 @@ impl From for FlightData { app_metadata: vec![0x04], } } - } + DataPacket::Dictionary(mut flight_data) => { + flight_data.app_metadata.push(0x05); + flight_data + } + }) } } @@ -154,6 +156,7 @@ impl TryFrom for DataPacket { progress: progress_info, }) } + 0x05 => Ok(DataPacket::Dictionary(flight_data)), _ => Err(ErrorCode::BadBytes("Unknown flight data packet type.")), } } From 41301f331cea9f56c475bf186eeb222f768ba578 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 11 Apr 2023 21:53:09 +0800 Subject: [PATCH 4/4] refactor(cluster): add dictionary deserialize --- .../rpc/exchange/exchange_source_reader.rs | 26 +++++++++----- .../exchange/serde/exchange_deserializer.rs | 34 +++++++++++++------ 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/src/query/service/src/api/rpc/exchange/exchange_source_reader.rs b/src/query/service/src/api/rpc/exchange/exchange_source_reader.rs index fdba7a5aa076b..a4c2072cb3246 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_source_reader.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_source_reader.rs @@ -33,7 +33,7 @@ use crate::api::DataPacket; pub struct ExchangeSourceReader { finished: bool, output: Arc, - output_data: Option, + output_data: Vec, flight_receiver: FlightReceiver, } @@ -43,7 +43,7 @@ impl ExchangeSourceReader { output, flight_receiver, finished: false, - output_data: None, + output_data: vec![], })) } } @@ -77,8 +77,9 @@ impl Processor for ExchangeSourceReader { return Ok(Event::NeedConsume); } - if let Some(data_packet) = self.output_data.take() { - let exchange_source_meta = ExchangeDeserializeMeta::create(data_packet); + if !self.output_data.is_empty() { + let packets = std::mem::take(&mut self.output_data); + let exchange_source_meta = ExchangeDeserializeMeta::create(packets); self.output .push_data(Ok(DataBlock::empty_with_meta(exchange_source_meta))); } @@ -88,11 +89,20 @@ impl Processor for ExchangeSourceReader { #[async_backtrace::framed] async fn async_process(&mut self) -> common_exception::Result<()> { - if self.output_data.is_none() { - if let Some(output_data) = self.flight_receiver.recv().await? { - self.output_data = Some(output_data); - return Ok(()); + if self.output_data.is_empty() { + let mut dictionaries = Vec::new(); + + while let Some(output_data) = self.flight_receiver.recv().await? { + if !matches!(&output_data, DataPacket::Dictionary(_)) { + dictionaries.push(output_data); + self.output_data = dictionaries; + return Ok(()); + } + + dictionaries.push(output_data); } + + assert!(dictionaries.is_empty()); } if !self.finished { diff --git a/src/query/service/src/api/rpc/exchange/serde/exchange_deserializer.rs b/src/query/service/src/api/rpc/exchange/serde/exchange_deserializer.rs index 8d908d11b021e..0743704bf352b 100644 --- a/src/query/service/src/api/rpc/exchange/serde/exchange_deserializer.rs +++ b/src/query/service/src/api/rpc/exchange/serde/exchange_deserializer.rs @@ -20,6 +20,8 @@ use std::sync::Arc; use common_arrow::arrow::datatypes::Schema as ArrowSchema; use common_arrow::arrow::io::flight::default_ipc_fields; use common_arrow::arrow::io::flight::deserialize_batch; +use common_arrow::arrow::io::flight::deserialize_dictionary; +use common_arrow::arrow::io::ipc::read::Dictionaries; use common_arrow::arrow::io::ipc::IpcSchema; use common_exception::ErrorCode; use common_exception::Result; @@ -76,7 +78,7 @@ impl TransformExchangeDeserializer { })) } - fn recv_data(&self, fragment_data: FragmentData) -> Result { + fn recv_data(&self, dict: Vec, fragment_data: FragmentData) -> Result { const ROW_HEADER_SIZE: usize = std::mem::size_of::(); let meta = match bincode::deserialize(&fragment_data.get_meta()[ROW_HEADER_SIZE..]) { @@ -93,11 +95,24 @@ impl TransformExchangeDeserializer { return Ok(DataBlock::new_with_meta(vec![], 0, meta)); } + let mut dictionaries = Dictionaries::new(); + + for dict_packet in dict { + if let DataPacket::Dictionary(ff) = dict_packet { + deserialize_dictionary( + &ff, + &self.arrow_schema.fields, + &self.ipc_schema, + &mut dictionaries, + )?; + } + } + let batch = deserialize_batch( &fragment_data.data, &self.arrow_schema.fields, &self.ipc_schema, - &Default::default(), + &dictionaries, )?; let data_block = DataBlock::from_arrow_chunk(&batch, &self.schema)?; @@ -173,13 +188,14 @@ impl Processor for TransformExchangeDeserializer { fn process(&mut self) -> Result<()> { if let Some(mut data) = self.input_data.take() { if let Some(block_meta) = data.take_meta() { - if let Some(exchange_meta) = ExchangeDeserializeMeta::downcast_from(block_meta) { - self.output_data = Some(match exchange_meta.packet.unwrap() { + if let Some(mut exchange_meta) = ExchangeDeserializeMeta::downcast_from(block_meta) + { + self.output_data = Some(match exchange_meta.packet.pop().unwrap() { DataPacket::ErrorCode(v) => Err(v), DataPacket::Dictionary(_) => unreachable!(), DataPacket::FetchProgressAndPrecommit => unreachable!(), DataPacket::ProgressAndPrecommit { .. } => unreachable!(), - DataPacket::FragmentData(v) => self.recv_data(v), + DataPacket::FragmentData(v) => self.recv_data(exchange_meta.packet, v), }?); return Ok(()); @@ -196,14 +212,12 @@ impl Processor for TransformExchangeDeserializer { } pub struct ExchangeDeserializeMeta { - pub packet: Option, + pub packet: Vec, } impl ExchangeDeserializeMeta { - pub fn create(packet: DataPacket) -> BlockMetaInfoPtr { - Box::new(ExchangeDeserializeMeta { - packet: Some(packet), - }) + pub fn create(packet: Vec) -> BlockMetaInfoPtr { + Box::new(ExchangeDeserializeMeta { packet }) } }