diff --git a/src/query/pipeline/sinks/src/async_sink.rs b/src/query/pipeline/sinks/src/async_sink.rs index 63c152c5683dd..902510ac3d9c3 100644 --- a/src/query/pipeline/sinks/src/async_sink.rs +++ b/src/query/pipeline/sinks/src/async_sink.rs @@ -65,11 +65,20 @@ impl AsyncSinker { impl Drop for AsyncSinker { fn drop(&mut self) { - if !self.called_on_finish { - self.called_on_finish = true; + if !self.called_on_start || !self.called_on_finish { if let Some(mut inner) = self.inner.take() { - GlobalIORuntime::instance().spawn(async move { - let _ = inner.on_finish().await; + GlobalIORuntime::instance().spawn({ + let called_on_start = self.called_on_start; + let called_on_finish = self.called_on_finish; + async move { + if !called_on_start { + let _ = inner.on_start().await; + } + + if !called_on_finish { + let _ = inner.on_finish().await; + } + } }); } } diff --git a/src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs b/src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs index a43b4840b17ce..8919adfa826cd 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs @@ -55,6 +55,11 @@ impl AsyncSink for ExchangeWriterSink { const NAME: &'static str = "ExchangeWriterSink"; async fn on_start(&mut self) -> Result<()> { + info!( + "Start query:{:?}, fragment:{:?} exchange write.", + self.query_id, self.fragment + ); + let res = self.exchange.close_input().await; info!( "Started query:{:?}, fragment:{:?} exchange write. {}", @@ -64,6 +69,11 @@ impl AsyncSink for ExchangeWriterSink { } async fn on_finish(&mut self) -> Result<()> { + info!( + "Finish query:{:?}, fragment:{:?} exchange write.", + self.query_id, self.fragment + ); + let res = self.exchange.close_output().await; info!( "Finished query:{:?}, fragment:{:?} exchange write. {}", diff --git a/src/query/service/src/api/rpc/exchange/exchange_source_reader.rs b/src/query/service/src/api/rpc/exchange/exchange_source_reader.rs index 2f08252eafdbb..7d2d861ae847d 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_source_reader.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_source_reader.rs @@ -33,7 +33,6 @@ use crate::api::DataPacket; pub struct ExchangeSourceReader { finished: bool, - initialized: bool, query_id: String, fragment: usize, output: Arc, @@ -48,12 +47,13 @@ impl ExchangeSourceReader { query_id: String, fragment: usize, ) -> ProcessorPtr { + flight_exchange.dec_output_ref(); + ProcessorPtr::create(Box::new(ExchangeSourceReader { output, flight_exchange, finished: false, output_data: None, - initialized: false, fragment, query_id, })) @@ -98,20 +98,6 @@ impl Processor for ExchangeSourceReader { } async fn async_process(&mut self) -> common_exception::Result<()> { - if !self.initialized { - self.initialized = true; - info!( - "Start query:{:?}, fragment:{:?} exchange read.", - self.query_id, self.fragment - ); - let res = self.flight_exchange.close_output().await; - - info!( - "Started query:{:?}, fragment:{:?} exchange read. {}", - self.query_id, self.fragment, res - ); - } - if self.output_data.is_none() { if let Some(output_data) = self.flight_exchange.recv().await? { self.output_data = Some(output_data); diff --git a/src/query/service/src/api/rpc/flight_client.rs b/src/query/service/src/api/rpc/flight_client.rs index 3e79a0ab405ea..0530c0f1ccc98 100644 --- a/src/query/service/src/api/rpc/flight_client.rs +++ b/src/query/service/src/api/rpc/flight_client.rs @@ -231,7 +231,6 @@ impl FlightExchange { Either::Left((_, right)) => { debug_assert!(state.closed_both()); - // break 'loop_worker; tx.close(); drop(network_tx); response_tx.close(); @@ -467,6 +466,12 @@ impl FlightExchange { 'publisher_worker: loop { if channel_state.closed_both() { + while let Ok(response) = response_rx.try_recv() { + if network_tx.send(response).await.is_err() { + break 'publisher_worker; + } + } + break 'publisher_worker; } @@ -632,6 +637,12 @@ impl FlightExchangeRef { false } + pub fn dec_output_ref(&self) { + if !self.is_closed_response.fetch_or(true, Ordering::SeqCst) { + self.state.response_count.fetch_sub(1, Ordering::SeqCst); + } + } + pub async fn close_output(&self) -> bool { if self.is_closed_response.fetch_or(true, Ordering::SeqCst) { return false;