Skip to content

Commit

Permalink
fix(cluster): support finished for writer sink
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Feb 13, 2023
1 parent bab0072 commit 74d0c5f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 13 deletions.
15 changes: 13 additions & 2 deletions src/query/pipeline/sinks/src/async_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ pub trait AsyncSink: Send {
}

#[unboxed_simple]
async fn consume(&mut self, data_block: DataBlock) -> Result<()>;
async fn consume(&mut self, data_block: DataBlock) -> Result<bool>;
}

pub struct AsyncSinker<T: AsyncSink + 'static> {
inner: T,
finished: bool,
input: Arc<InputPort>,
input_data: Option<DataBlock>,
called_on_start: bool,
Expand All @@ -53,6 +54,7 @@ impl<T: AsyncSink + 'static> AsyncSinker<T> {
ProcessorPtr::create(Box::new(AsyncSinker {
inner,
input,
finished: false,
input_data: None,
called_on_start: false,
called_on_finish: false,
Expand All @@ -79,6 +81,15 @@ impl<T: AsyncSink + 'static> Processor for AsyncSinker<T> {
return Ok(Event::Async);
}

if self.finished {
if !self.called_on_finish {
return Ok(Event::Async);
}

self.input.finish();
return Ok(Event::Finished);
}

if self.input.is_finished() {
return match !self.called_on_finish {
true => Ok(Event::Async),
Expand All @@ -103,7 +114,7 @@ impl<T: AsyncSink + 'static> Processor for AsyncSinker<T> {
self.called_on_start = true;
self.inner.on_start().await?;
} else if let Some(data_block) = self.input_data.take() {
self.inner.consume(data_block).await?;
self.finished = self.inner.consume(data_block).await?;
} else if !self.called_on_finish {
self.called_on_finish = true;
self.inner.on_finish().await?;
Expand Down
5 changes: 3 additions & 2 deletions src/query/pipeline/sinks/src/union_receive_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ impl AsyncSink for UnionReceiveSink {
}

#[unboxed_simple]
async fn consume(&mut self, data_block: DataBlock) -> Result<()> {
async fn consume(&mut self, data_block: DataBlock) -> Result<bool> {
if let Some(sender) = self.sender.as_ref() {
if sender.send(data_block).await.is_err() {
return Err(ErrorCode::Internal("UnionReceiveSink sender failed"));
};
}
Ok(())

Ok(false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl AsyncSink for ExchangeWriterSink {
}

#[async_trait::unboxed_simple]
async fn consume(&mut self, mut data_block: DataBlock) -> Result<()> {
async fn consume(&mut self, mut data_block: DataBlock) -> Result<bool> {
let packet = match data_block.take_meta() {
None => Err(ErrorCode::Internal(
"ExchangeWriterSink only recv ExchangeSerializeMeta.",
Expand All @@ -64,8 +64,11 @@ impl AsyncSink for ExchangeWriterSink {
},
}?;

self.exchange.send(packet).await?;
Ok(())
match self.exchange.send(packet).await {
Ok(_) => Ok(false),
Err(error) if error.code() == ErrorCode::ABORTED_QUERY => Ok(true),
Err(error) => Err(error),
}
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/query/service/src/api/rpc/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl FlightExchange {
streaming: Request<Streaming<FlightData>>,
response_tx: Sender<Result<FlightData, Status>>,
) -> FlightExchange {
let mut streaming = streaming.into_inner();
let streaming = streaming.into_inner();
let state = Arc::new(ChannelState::create());
let rx = Self::listen_request(state.clone(), response_tx.clone(), streaming);

Expand All @@ -150,7 +150,7 @@ impl FlightExchange {

pub fn from_client(
response_tx: Sender<FlightData>,
mut streaming: Streaming<FlightData>,
streaming: Streaming<FlightData>,
) -> FlightExchange {
let state = Arc::new(ChannelState::create());
let rx = Self::listen_request(state.clone(), response_tx.clone(), streaming);
Expand All @@ -164,7 +164,7 @@ impl FlightExchange {
})
}

fn listen_request<ResponseT>(
fn listen_request<ResponseT: Send + 'static>(
state: Arc<ChannelState>,
response: Sender<ResponseT>,
mut streaming: Streaming<FlightData>,
Expand All @@ -182,10 +182,10 @@ impl FlightExchange {
return;
}
other => {
if let Some(status) = other {
if let Some(error) = match_for_io_error(&status) {
if let Err(status) = &other {
if let Some(error) = match_for_io_error(status) {
{
let may_recv_error = state.may_recv_error.lock();
let mut may_recv_error = state.may_recv_error.lock();
*may_recv_error = Some(std::io::Error::new(error.kind(), ""));
}

Expand Down

0 comments on commit 74d0c5f

Please sign in to comment.