Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 22 additions & 32 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl DataExchangeManager {
match queries_coordinator.get_mut(&query_id) {
None => Err(ErrorCode::Internal("Query not exists.")),
Some(query_coordinator) => {
query_coordinator.fragment_exchanges.clear();
assert!(query_coordinator.fragment_exchanges.is_empty());
let injector = DefaultExchangeInjector::create();
let mut build_res =
query_coordinator.subscribe_fragment(&ctx, fragment_id, injector)?;
Expand Down Expand Up @@ -343,19 +343,19 @@ impl DataExchangeManager {

pub fn get_flight_sender(&self, params: &ExchangeParams) -> Result<Vec<FlightSender>> {
let queries_coordinator_guard = self.queries_coordinator.lock();
let queries_coordinator = unsafe { &*queries_coordinator_guard.deref().get() };
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };

match queries_coordinator.get(&params.get_query_id()) {
match queries_coordinator.get_mut(&params.get_query_id()) {
None => Err(ErrorCode::Internal("Query not exists.")),
Some(coordinator) => coordinator.get_flight_senders(params),
}
}

pub fn get_flight_receiver(&self, params: &ExchangeParams) -> Result<Vec<FlightReceiver>> {
let queries_coordinator_guard = self.queries_coordinator.lock();
let queries_coordinator = unsafe { &*queries_coordinator_guard.deref().get() };
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };

match queries_coordinator.get(&params.get_query_id()) {
match queries_coordinator.get_mut(&params.get_query_id()) {
None => Err(ErrorCode::Internal("Query not exists.")),
Some(coordinator) => coordinator.get_flight_receiver(params),
}
Expand Down Expand Up @@ -474,30 +474,25 @@ impl QueryCoordinator {
Ok(())
}

pub fn get_flight_senders(&self, params: &ExchangeParams) -> Result<Vec<FlightSender>> {
pub fn get_flight_senders(&mut self, params: &ExchangeParams) -> Result<Vec<FlightSender>> {
match params {
ExchangeParams::MergeExchange(params) => {
let mut exchanges = vec![];
for ((_target, fragment, role), exchange) in &self.fragment_exchanges {
if *fragment == params.fragment_id && *role == FLIGHT_SENDER {
exchanges.push(exchange.as_sender());
}
}

Ok(exchanges)
}
ExchangeParams::MergeExchange(params) => Ok(self
.fragment_exchanges
.drain_filter(|(_, f, r), _| f == &params.fragment_id && *r == FLIGHT_SENDER)
.map(|(_, v)| v.convert_to_sender())
.collect::<Vec<_>>()),
ExchangeParams::ShuffleExchange(params) => {
let mut exchanges = Vec::with_capacity(params.destination_ids.len());

for destination in &params.destination_ids {
exchanges.push(match destination == &params.executor_id {
true => Ok(FlightSender::create(async_channel::bounded(1).0)),
false => match self.fragment_exchanges.get(&(
false => match self.fragment_exchanges.remove(&(
destination.clone(),
params.fragment_id,
FLIGHT_SENDER,
)) {
Some(exchange_channel) => Ok(exchange_channel.as_sender()),
Some(exchange_channel) => Ok(exchange_channel.convert_to_sender()),
None => Err(ErrorCode::UnknownFragmentExchange(format!(
"Unknown fragment exchange channel, {}, {}",
destination, params.fragment_id
Expand All @@ -511,30 +506,25 @@ impl QueryCoordinator {
}
}

pub fn get_flight_receiver(&self, params: &ExchangeParams) -> Result<Vec<FlightReceiver>> {
pub fn get_flight_receiver(&mut self, params: &ExchangeParams) -> Result<Vec<FlightReceiver>> {
match params {
ExchangeParams::MergeExchange(params) => {
let mut exchanges = vec![];
for ((_target, fragment, role), exchange) in &self.fragment_exchanges {
if *fragment == params.fragment_id && *role == FLIGHT_RECEIVER {
exchanges.push(exchange.as_receiver());
}
}

Ok(exchanges)
}
ExchangeParams::MergeExchange(params) => Ok(self
.fragment_exchanges
.drain_filter(|(_, f, r), _| f == &params.fragment_id && *r == FLIGHT_RECEIVER)
.map(|(_, v)| v.convert_to_receiver())
.collect::<Vec<_>>()),
ExchangeParams::ShuffleExchange(params) => {
let mut exchanges = Vec::with_capacity(params.destination_ids.len());

for destination in &params.destination_ids {
exchanges.push(match destination == &params.executor_id {
true => Ok(FlightReceiver::create(async_channel::bounded(1).1)),
false => match self.fragment_exchanges.get(&(
false => match self.fragment_exchanges.remove(&(
destination.clone(),
params.fragment_id,
FLIGHT_RECEIVER,
)) {
Some(v) => Ok(v.as_receiver()),
Some(v) => Ok(v.convert_to_receiver()),
_ => Err(ErrorCode::UnknownFragmentExchange(format!(
"Unknown fragment flight receiver, {}, {}",
destination, params.fragment_id
Expand Down Expand Up @@ -696,7 +686,7 @@ impl QueryCoordinator {

let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;

self.fragment_exchanges.clear();
assert!(self.fragment_exchanges.is_empty());
let info_mut = self.info.as_mut().expect("Query info is None");
info_mut.query_executor = Some(executor.clone());

Expand Down
13 changes: 5 additions & 8 deletions src/query/service/src/api/rpc/exchange/exchange_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use common_pipeline_core::pipe::PipeItem;
use common_pipeline_core::processors::processor::ProcessorPtr;

use crate::api::rpc::exchange::exchange_params::ExchangeParams;
use crate::api::rpc::exchange::exchange_sink_writer::create_writer_item;
use crate::api::rpc::exchange::exchange_sink_writer::create_writer_items;
use crate::api::rpc::exchange::exchange_sink_writer::ExchangeWriterSink;
use crate::api::rpc::exchange::exchange_sorting::ExchangeSorting;
use crate::api::rpc::exchange::exchange_sorting::TransformExchangeSorting;
use crate::api::rpc::exchange::exchange_transform_shuffle::exchange_shuffle;
Expand Down Expand Up @@ -73,14 +73,11 @@ impl ExchangeSink {
)]));
}

pipeline.resize(1)?;
assert_eq!(flight_senders.len(), 1);
let flight_sender = flight_senders.remove(0);
pipeline.add_sink(|input| {
Ok(ProcessorPtr::create(ExchangeWriterSink::create(
input,
flight_sender.clone(),
)))
})
let item = create_writer_item(flight_senders.remove(0));
pipeline.add_pipe(Pipe::create(1, 0, vec![item]));
Ok(())
}
ExchangeParams::ShuffleExchange(params) => {
exchange_shuffle(params, pipeline)?;
Expand Down
19 changes: 11 additions & 8 deletions src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl AsyncSink for ExchangeWriterSink {
#[async_trait::unboxed_simple]
#[async_backtrace::framed]
async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
let mut serialize_meta = match data_block.take_meta() {
let serialize_meta = match data_block.take_meta() {
None => Err(ErrorCode::Internal(
"ExchangeWriterSink only recv ExchangeSerializeMeta.",
)),
Expand All @@ -63,14 +63,17 @@ impl AsyncSink for ExchangeWriterSink {
},
}?;

match serialize_meta.packet.take() {
None => Ok(false),
Some(packet) => match self.flight_sender.send(packet).await {
Ok(_) => Ok(false),
Err(error) if error.code() == ErrorCode::ABORTED_QUERY => Ok(true),
Err(error) => Err(error),
},
for packet in serialize_meta.packet {
if let Err(error) = self.flight_sender.send(packet).await {
if error.code() == ErrorCode::ABORTED_QUERY {
return Ok(true);
}

return Err(error);
}
}

Ok(false)
}
}

Expand Down
26 changes: 18 additions & 8 deletions src/query/service/src/api/rpc/exchange/exchange_source_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::api::DataPacket;
pub struct ExchangeSourceReader {
finished: bool,
output: Arc<OutputPort>,
output_data: Option<DataPacket>,
output_data: Vec<DataPacket>,
flight_receiver: FlightReceiver,
}

Expand All @@ -43,7 +43,7 @@ impl ExchangeSourceReader {
output,
flight_receiver,
finished: false,
output_data: None,
output_data: vec![],
}))
}
}
Expand Down Expand Up @@ -77,8 +77,9 @@ impl Processor for ExchangeSourceReader {
return Ok(Event::NeedConsume);
}

if let Some(data_packet) = self.output_data.take() {
let exchange_source_meta = ExchangeDeserializeMeta::create(data_packet);
if !self.output_data.is_empty() {
let packets = std::mem::take(&mut self.output_data);
let exchange_source_meta = ExchangeDeserializeMeta::create(packets);
self.output
.push_data(Ok(DataBlock::empty_with_meta(exchange_source_meta)));
}
Expand All @@ -88,11 +89,20 @@ impl Processor for ExchangeSourceReader {

#[async_backtrace::framed]
async fn async_process(&mut self) -> common_exception::Result<()> {
if self.output_data.is_none() {
if let Some(output_data) = self.flight_receiver.recv().await? {
self.output_data = Some(output_data);
return Ok(());
if self.output_data.is_empty() {
let mut dictionaries = Vec::new();

while let Some(output_data) = self.flight_receiver.recv().await? {
if !matches!(&output_data, DataPacket::Dictionary(_)) {
dictionaries.push(output_data);
self.output_data = dictionaries;
return Ok(());
}

dictionaries.push(output_data);
}

assert!(dictionaries.is_empty());
}

if !self.finished {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::sync::Arc;
use common_arrow::arrow::datatypes::Schema as ArrowSchema;
use common_arrow::arrow::io::flight::default_ipc_fields;
use common_arrow::arrow::io::flight::deserialize_batch;
use common_arrow::arrow::io::flight::deserialize_dictionary;
use common_arrow::arrow::io::ipc::read::Dictionaries;
use common_arrow::arrow::io::ipc::IpcSchema;
use common_exception::ErrorCode;
use common_exception::Result;
Expand Down Expand Up @@ -76,7 +78,7 @@ impl TransformExchangeDeserializer {
}))
}

fn recv_data(&self, fragment_data: FragmentData) -> Result<DataBlock> {
fn recv_data(&self, dict: Vec<DataPacket>, fragment_data: FragmentData) -> Result<DataBlock> {
const ROW_HEADER_SIZE: usize = std::mem::size_of::<u32>();

let meta = match bincode::deserialize(&fragment_data.get_meta()[ROW_HEADER_SIZE..]) {
Expand All @@ -93,11 +95,24 @@ impl TransformExchangeDeserializer {
return Ok(DataBlock::new_with_meta(vec![], 0, meta));
}

let mut dictionaries = Dictionaries::new();

for dict_packet in dict {
if let DataPacket::Dictionary(ff) = dict_packet {
deserialize_dictionary(
&ff,
&self.arrow_schema.fields,
&self.ipc_schema,
&mut dictionaries,
)?;
}
}

let batch = deserialize_batch(
&fragment_data.data,
&self.arrow_schema.fields,
&self.ipc_schema,
&Default::default(),
&dictionaries,
)?;

let data_block = DataBlock::from_arrow_chunk(&batch, &self.schema)?;
Expand Down Expand Up @@ -173,12 +188,14 @@ impl Processor for TransformExchangeDeserializer {
fn process(&mut self) -> Result<()> {
if let Some(mut data) = self.input_data.take() {
if let Some(block_meta) = data.take_meta() {
if let Some(exchange_meta) = ExchangeDeserializeMeta::downcast_from(block_meta) {
self.output_data = Some(match exchange_meta.packet.unwrap() {
if let Some(mut exchange_meta) = ExchangeDeserializeMeta::downcast_from(block_meta)
{
self.output_data = Some(match exchange_meta.packet.pop().unwrap() {
DataPacket::ErrorCode(v) => Err(v),
DataPacket::Dictionary(_) => unreachable!(),
DataPacket::FetchProgressAndPrecommit => unreachable!(),
DataPacket::ProgressAndPrecommit { .. } => unreachable!(),
DataPacket::FragmentData(v) => self.recv_data(v),
DataPacket::FragmentData(v) => self.recv_data(exchange_meta.packet, v),
}?);

return Ok(());
Expand All @@ -195,14 +212,12 @@ impl Processor for TransformExchangeDeserializer {
}

pub struct ExchangeDeserializeMeta {
pub packet: Option<DataPacket>,
pub packet: Vec<DataPacket>,
}

impl ExchangeDeserializeMeta {
pub fn create(packet: DataPacket) -> BlockMetaInfoPtr {
Box::new(ExchangeDeserializeMeta {
packet: Some(packet),
})
pub fn create(packet: Vec<DataPacket>) -> BlockMetaInfoPtr {
Box::new(ExchangeDeserializeMeta { packet })
}
}

Expand Down
31 changes: 15 additions & 16 deletions src/query/service/src/api/rpc/exchange/serde/exchange_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ use crate::api::FragmentData;

pub struct ExchangeSerializeMeta {
pub block_number: isize,
pub packet: Option<DataPacket>,
pub packet: Vec<DataPacket>,
}

impl ExchangeSerializeMeta {
pub fn create(block_number: isize, packet: Option<DataPacket>) -> BlockMetaInfoPtr {
pub fn create(block_number: isize, packet: Vec<DataPacket>) -> BlockMetaInfoPtr {
Box::new(ExchangeSerializeMeta {
packet,
block_number,
Expand Down Expand Up @@ -215,7 +215,8 @@ pub fn serialize_block(
) -> Result<DataBlock> {
if data_block.is_empty() && data_block.get_meta().is_none() {
return Ok(DataBlock::empty_with_meta(ExchangeSerializeMeta::create(
block_num, None,
block_num,
vec![],
)));
}

Expand All @@ -224,24 +225,22 @@ pub fn serialize_block(
bincode::serialize_into(&mut meta, &data_block.get_meta())
.map_err(|_| ErrorCode::BadBytes("block meta serialize error when exchange"))?;

let values = match data_block.is_empty() {
true => serialize_batch(&Chunk::new(vec![]), &[], options)?.1,
let (dict, values) = match data_block.is_empty() {
true => serialize_batch(&Chunk::new(vec![]), &[], options)?,
false => {
let chunks = data_block.try_into()?;
let (dicts, values) = serialize_batch(&chunks, ipc_field, options)?;

if !dicts.is_empty() {
return Err(ErrorCode::Unimplemented(
"DatabendQuery does not implement dicts.",
));
}

values
serialize_batch(&chunks, ipc_field, options)?
}
};

let mut packet = Vec::with_capacity(dict.len() + 1);

for dict_flight in dict {
packet.push(DataPacket::Dictionary(dict_flight));
}

packet.push(DataPacket::FragmentData(FragmentData::create(meta, values)));
Ok(DataBlock::empty_with_meta(ExchangeSerializeMeta::create(
block_num,
Some(DataPacket::FragmentData(FragmentData::create(meta, values))),
block_num, packet,
)))
}
5 changes: 3 additions & 2 deletions src/query/service/src/api/rpc/exchange/statistics_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ impl StatisticsReceiver {

let (tx, rx) = match (exchanges.remove(0), exchanges.remove(0)) {
(tx @ FlightExchange::Sender { .. }, rx @ FlightExchange::Receiver { .. }) => {
(tx.as_sender(), rx.as_receiver())
(tx.convert_to_sender(), rx.convert_to_receiver())
}
(rx @ FlightExchange::Receiver { .. }, tx @ FlightExchange::Sender { .. }) => {
(tx.as_sender(), rx.as_receiver())
(tx.convert_to_sender(), rx.convert_to_receiver())
}
_ => unreachable!(),
};
Expand Down Expand Up @@ -159,6 +159,7 @@ impl StatisticsReceiver {
Ok(None) => Ok(true),
Err(transport_error) => Err(transport_error),
Ok(Some(DataPacket::ErrorCode(error))) => Err(error),
Ok(Some(DataPacket::Dictionary(_))) => unreachable!(),
Ok(Some(DataPacket::FragmentData(_))) => unreachable!(),
Ok(Some(DataPacket::FetchProgressAndPrecommit)) => unreachable!(),
Ok(Some(DataPacket::ProgressAndPrecommit {
Expand Down
Loading