Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cluster): try fix broken pipe or connect reset #9104

Merged
merged 2 commits into from Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions scripts/ci/ci-run-stateless-tests-cluster.sh
Expand Up @@ -15,5 +15,4 @@ cd "$SCRIPT_PATH/../../tests" || exit

echo "Starting databend-test"
# 13_0004_q4: https://github.com/datafuselabs/databend/issues/8107
# 13_0005_q5: https://github.com/datafuselabs/databend/issues/7986
./databend-test --mode 'cluster' --run-dir 0_stateless --skip '13_0004_q4' --skip '13_0005_q5'
./databend-test --mode 'cluster' --run-dir 0_stateless --skip '13_0004_q4'
Expand Up @@ -264,6 +264,7 @@ impl Processor for ExchangeTransform {
DataPacket::ProgressAndPrecommit { .. } => unreachable!(),
DataPacket::FetchProgressAndPrecommit => unreachable!(),
DataPacket::FragmentData(v) => self.on_recv_data(v),
DataPacket::ClosingClient => Ok(()),
};
}

Expand Down
Expand Up @@ -130,6 +130,7 @@ impl Processor for ExchangeSourceTransform {
DataPacket::FragmentData(v) => self.on_recv_data(v),
DataPacket::FetchProgressAndPrecommit => unreachable!(),
DataPacket::ProgressAndPrecommit { .. } => unreachable!(),
DataPacket::ClosingClient => Ok(()),
};
}

Expand Down
Expand Up @@ -139,6 +139,7 @@ impl StatisticsReceiver {
fn recv_data(ctx: &Arc<QueryContext>, recv_data: Result<Option<DataPacket>>) -> Result<bool> {
match recv_data {
Ok(None) => Ok(true),
Ok(Some(DataPacket::ClosingClient)) => Ok(true),
Err(transport_error) => Err(transport_error),
Ok(Some(DataPacket::ErrorCode(error))) => Err(error),
Ok(Some(DataPacket::FragmentData(_))) => unreachable!(),
Expand Down
10 changes: 10 additions & 0 deletions src/query/service/src/api/rpc/exchange/statistics_sender.rs
Expand Up @@ -90,6 +90,11 @@ impl StatisticsSender {
notified = right;
recv = Box::pin(flight_exchange.recv());

if matches!(command, DataPacket::ClosingClient) {
ctx.get_exchange_manager().shutdown_query(&query_id);
return;
}

if let Err(_cause) = Self::on_command(&ctx, command, &flight_exchange).await
{
ctx.get_exchange_manager().shutdown_query(&query_id);
Expand All @@ -100,6 +105,10 @@ impl StatisticsSender {
}

if let Ok(Some(command)) = flight_exchange.recv().await {
if matches!(command, DataPacket::ClosingClient) {
return;
}

if let Err(error) = Self::on_command(&ctx, command, &flight_exchange).await {
tracing::warn!("Statistics send has error, cause: {:?}.", error);
}
Expand Down Expand Up @@ -140,6 +149,7 @@ impl StatisticsSender {
})
.await
}
DataPacket::ClosingClient => unreachable!(),
}
}

Expand Down
44 changes: 27 additions & 17 deletions src/query/service/src/api/rpc/flight_client.rs
Expand Up @@ -135,10 +135,18 @@ impl FlightExchange {
) -> FlightExchange {
let mut streaming = streaming.into_inner();
let (tx, rx) = async_channel::bounded(1);

common_base::base::tokio::spawn(async move {
while let Some(message) = streaming.next().await {
if let Err(_cause) = tx.send(message).await {
break;
match message {
Ok(message) if DataPacket::is_closing_client(&message) => {
break;
}
other => {
if let Err(_c) = tx.send(other).await {
break;
}
}
}
}
});
Expand All @@ -159,8 +167,15 @@ impl FlightExchange {
let (tx, request_rx) = async_channel::bounded(1);
common_base::base::tokio::spawn(async move {
while let Some(message) = streaming.next().await {
if let Err(_cause) = tx.send(message).await {
break;
match message {
Ok(flight_data) if DataPacket::is_closing_client(&flight_data) => {
break;
}
other => {
if let Err(_cause) = tx.send(other).await {
break;
}
}
}
}
});
Expand Down Expand Up @@ -282,7 +297,9 @@ impl ClientFlightExchange {
if !self.is_closed_response.fetch_or(true, Ordering::SeqCst)
&& self.state.response_count.fetch_sub(1, Ordering::AcqRel) == 1
{
self.response_tx.close();
let _ = self
.response_tx
.send_blocking(FlightData::from(DataPacket::ClosingClient));
}
}
}
Expand All @@ -304,17 +321,8 @@ impl Clone for ClientFlightExchange {

impl Drop for ClientFlightExchange {
fn drop(&mut self) {
if !self.is_closed_request.fetch_or(true, Ordering::SeqCst)
&& self.state.request_count.fetch_sub(1, Ordering::AcqRel) == 1
{
self.request_rx.close();
}

if !self.is_closed_response.fetch_or(true, Ordering::SeqCst)
&& self.state.response_count.fetch_sub(1, Ordering::AcqRel) == 1
{
self.response_tx.close();
}
self.close_input();
self.close_output();
}
}

Expand Down Expand Up @@ -398,7 +406,9 @@ impl ServerFlightExchange {
if !self.is_closed_response.fetch_or(true, Ordering::SeqCst)
&& self.state.response_count.fetch_sub(1, Ordering::AcqRel) == 1
{
self.response_tx.close();
let _ = self
.response_tx
.send_blocking(Ok(FlightData::from(DataPacket::ClosingClient)));
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/query/service/src/api/rpc/packets/packet_data.rs
Expand Up @@ -56,6 +56,18 @@ pub enum DataPacket {
progress: Vec<ProgressInfo>,
precommit: Vec<PrecommitBlock>,
},
// NOTE: Unknown reason. This may be tonic's bug.
// when we use two-way streaming grpc for data exchange,
// if the client side is closed and the server side reads data immediately.
// we will get a broken pipe or connect reset error.
// we use the ClosingClient to notify the server side to close the connection for avoid errors.
ClosingClient,
}

impl DataPacket {
pub fn is_closing_client(data: &FlightData) -> bool {
data.app_metadata.last() == Some(&0x05)
}
}

impl From<DataPacket> for FlightData {
Expand Down Expand Up @@ -103,6 +115,12 @@ impl From<DataPacket> for FlightData {
app_metadata: vec![0x04],
}
}
DataPacket::ClosingClient => FlightData {
data_body: vec![],
data_header: vec![],
flight_descriptor: None,
app_metadata: vec![0x05],
},
}
}
}
Expand Down Expand Up @@ -155,6 +173,7 @@ impl TryFrom<FlightData> for DataPacket {
progress: progress_info,
})
}
0x05 => Ok(DataPacket::ClosingClient),
_ => Err(ErrorCode::BadBytes("Unknown flight data packet type.")),
}
}
Expand Down