Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions src/query/pipeline/sinks/src/async_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,20 @@ impl<T: AsyncSink + 'static> AsyncSinker<T> {

impl<T: AsyncSink + 'static> Drop for AsyncSinker<T> {
fn drop(&mut self) {
if !self.called_on_finish {
self.called_on_finish = true;
if !self.called_on_start || !self.called_on_finish {
if let Some(mut inner) = self.inner.take() {
GlobalIORuntime::instance().spawn(async move {
let _ = inner.on_finish().await;
GlobalIORuntime::instance().spawn({
let called_on_start = self.called_on_start;
let called_on_finish = self.called_on_finish;
async move {
if !called_on_start {
let _ = inner.on_start().await;
}

if !called_on_finish {
let _ = inner.on_finish().await;
}
}
});
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ impl AsyncSink for ExchangeWriterSink {
const NAME: &'static str = "ExchangeWriterSink";

async fn on_start(&mut self) -> Result<()> {
info!(
"Start query:{:?}, fragment:{:?} exchange write.",
self.query_id, self.fragment
);

let res = self.exchange.close_input().await;
info!(
"Started query:{:?}, fragment:{:?} exchange write. {}",
Expand All @@ -64,6 +69,11 @@ impl AsyncSink for ExchangeWriterSink {
}

async fn on_finish(&mut self) -> Result<()> {
info!(
"Finish query:{:?}, fragment:{:?} exchange write.",
self.query_id, self.fragment
);

let res = self.exchange.close_output().await;
info!(
"Finished query:{:?}, fragment:{:?} exchange write. {}",
Expand Down
18 changes: 2 additions & 16 deletions src/query/service/src/api/rpc/exchange/exchange_source_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::api::DataPacket;

pub struct ExchangeSourceReader {
finished: bool,
initialized: bool,
query_id: String,
fragment: usize,
output: Arc<OutputPort>,
Expand All @@ -48,12 +47,13 @@ impl ExchangeSourceReader {
query_id: String,
fragment: usize,
) -> ProcessorPtr {
flight_exchange.dec_output_ref();

ProcessorPtr::create(Box::new(ExchangeSourceReader {
output,
flight_exchange,
finished: false,
output_data: None,
initialized: false,
fragment,
query_id,
}))
Expand Down Expand Up @@ -98,20 +98,6 @@ impl Processor for ExchangeSourceReader {
}

async fn async_process(&mut self) -> common_exception::Result<()> {
if !self.initialized {
self.initialized = true;
info!(
"Start query:{:?}, fragment:{:?} exchange read.",
self.query_id, self.fragment
);
let res = self.flight_exchange.close_output().await;

info!(
"Started query:{:?}, fragment:{:?} exchange read. {}",
self.query_id, self.fragment, res
);
}

if self.output_data.is_none() {
if let Some(output_data) = self.flight_exchange.recv().await? {
self.output_data = Some(output_data);
Expand Down
13 changes: 12 additions & 1 deletion src/query/service/src/api/rpc/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ impl FlightExchange {
Either::Left((_, right)) => {
debug_assert!(state.closed_both());

// break 'loop_worker;
tx.close();
drop(network_tx);
response_tx.close();
Expand Down Expand Up @@ -467,6 +466,12 @@ impl FlightExchange {

'publisher_worker: loop {
if channel_state.closed_both() {
while let Ok(response) = response_rx.try_recv() {
if network_tx.send(response).await.is_err() {
break 'publisher_worker;
}
}

break 'publisher_worker;
}

Expand Down Expand Up @@ -632,6 +637,12 @@ impl FlightExchangeRef {
false
}

pub fn dec_output_ref(&self) {
if !self.is_closed_response.fetch_or(true, Ordering::SeqCst) {
self.state.response_count.fetch_sub(1, Ordering::SeqCst);
}
}

pub async fn close_output(&self) -> bool {
if self.is_closed_response.fetch_or(true, Ordering::SeqCst) {
return false;
Expand Down