Skip to content

Commit

Permalink
sentry: add exception handling for main tasks (#2032)
Browse files Browse the repository at this point in the history
This PR introduces some additional exception handling in the following modules of sentry:

* run loop of all the Sentry main tasks (i.e. StatusManager, Discovery...): these are long-running tasks which usually should never terminate (unless the user requests a graceful exit), so here we just catch any system exception to log a critical error and relaunch it. As a result, we will see what is the root cause of the error forcing the program to exit, currently it's just a somewhat obscure co_await: operation cancelled error
* rlpx::Peer: during some tests I've found that sometimes a std::nested_exception gets raised, which we can handle properly. Moreover, I've seen Peer::post_message rarely failing to spawn Peer::send_message, thus causing the program to terminate, so I've added a warning to at least detect this situation (it seems to me like an unavoidable race between peer close and message spawing in different sub-tasks) and avoid program termination
* rlpx:: Server: again, sometimes async_accept raises a boost::system::system_error with error code boost::system::errc::invalid_argument, which we need to understand but seems to self-heal in some way
  • Loading branch information
canepat committed May 23, 2024
1 parent 7623880 commit 8415d10
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 20 deletions.
11 changes: 10 additions & 1 deletion silkworm/sentry/discovery/discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,16 @@ Discovery::~Discovery() {
}

Task<void> Discovery::run() {
return p_impl_->run();
try {
return p_impl_->run();
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "Discovery::run unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "Discovery::run unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<std::vector<Discovery::PeerCandidate>> Discovery::request_peer_candidates(
Expand Down
27 changes: 26 additions & 1 deletion silkworm/sentry/rlpx/peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@ Task<void> Peer::handle() {
}
log::Error("sentry") << "Peer::handle system_error: " << ex.what();
throw;
} catch (const std::nested_exception& ne) {
try {
ne.rethrow_nested();
} catch (const DisconnectedError&) {
log::Debug("sentry") << "Peer::handle nested disconnection error";
auto reason = disconnect_reason_.get().value_or(DisconnectReason::DisconnectRequested);
disconnect_reason_.set({reason});
co_return;
} catch (const boost::system::system_error& ex) {
if (is_fatal_network_error(ex)) {
log::Debug("sentry") << "Peer::handle nested network error: " << ex.what();
auto reason = disconnect_reason_.get().value_or(DisconnectReason::NetworkError);
disconnect_reason_.set({reason});
co_return;
} else if (ex.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "Peer::handle nested cancellation";
co_return;
}
log::Error("sentry") << "Peer::handle nested system_error: " << ex.what();
throw;
}
} catch (const std::exception& ex) {
log::Error("sentry") << "Peer::handle exception: " << ex.what();
throw;
Expand Down Expand Up @@ -313,7 +334,11 @@ void Peer::close() {
}

void Peer::post_message(const std::shared_ptr<Peer>& peer, const Message& message) {
peer->send_message_tasks_.spawn(peer->strand_, Peer::send_message(peer, message));
try {
peer->send_message_tasks_.spawn(peer->strand_, Peer::send_message(peer, message));
} catch (const concurrency::TaskGroup::SpawnAfterCloseError&) {
log::Warning("sentry") << "Peer::post_message cannot spawn send_message after close";
}
}

Task<void> Peer::send_message(std::shared_ptr<Peer> peer, Message message) {
Expand Down
12 changes: 11 additions & 1 deletion silkworm/sentry/rlpx/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,17 @@ Task<void> Server::run(
while (acceptor.is_open()) {
auto client_executor = executor_pool.any_executor();
SocketStream stream{client_executor};
co_await acceptor.async_accept(stream.socket(), use_awaitable);
try {
co_await acceptor.async_accept(stream.socket(), use_awaitable);
} catch (const boost::system::system_error& ex) {
if (ex.code() == boost::system::errc::invalid_argument) {
log::Error("sentry") << "Sentry RLPx server got invalid_argument on accept port=" << port_;
continue;
} else {
log::Critical("sentry") << "Sentry RLPx server unexpected end [" + std::string{ex.what()} + "]";
throw;
}
}

auto remote_endpoint = stream.socket().remote_endpoint();
log::Debug("sentry") << "rlpx::Server client connected from " << remote_endpoint;
Expand Down
82 changes: 68 additions & 14 deletions silkworm/sentry/sentry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,24 @@ Task<void> SentryImpl::run_tasks() {
co_await status_manager_.wait_for_status();
log::Info("sentry") << "Sentry received initial status message";

co_await (
run_status_manager() &&
run_server() &&
run_discovery() &&
run_peer_manager() &&
run_message_sender() &&
run_message_receiver() &&
run_peer_manager_api() &&
run_peer_discovery_feedback());
try {
co_await (
run_status_manager() &&
run_server() &&
run_discovery() &&
run_peer_manager() &&
run_message_sender() &&
run_message_receiver() &&
run_peer_manager_api() &&
run_peer_discovery_feedback());
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "Sentry run_tasks unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "Sentry run_tasks unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

std::unique_ptr<rlpx::Protocol> SentryImpl::make_protocol() {
Expand Down Expand Up @@ -234,23 +243,68 @@ Task<void> SentryImpl::run_discovery() {
}

Task<void> SentryImpl::run_peer_manager() {
return peer_manager_.run(rlpx_server_, discovery_, make_protocol(), client_factory());
try {
return peer_manager_.run(rlpx_server_, discovery_, make_protocol(), client_factory());
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "run_peer_manager unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "run_peer_manager unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<void> SentryImpl::run_message_sender() {
return message_sender_.run(peer_manager_);
try {
return message_sender_.run(peer_manager_);
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "run_message_sender unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "run_message_sender unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<void> SentryImpl::run_message_receiver() {
return MessageReceiver::run(message_receiver_, peer_manager_);
try {
return MessageReceiver::run(message_receiver_, peer_manager_);
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "run_message_receiver unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "run_message_receiver unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<void> SentryImpl::run_peer_manager_api() {
return PeerManagerApi::run(peer_manager_api_);
try {
return PeerManagerApi::run(peer_manager_api_);
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "run_peer_manager_api unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "run_peer_manager_api unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<void> SentryImpl::run_peer_discovery_feedback() {
return PeerDiscoveryFeedback::run(peer_discovery_feedback_, peer_manager_, discovery_);
try {
return PeerDiscoveryFeedback::run(peer_discovery_feedback_, peer_manager_, discovery_);
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "run_peer_discovery_feedback unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "run_peer_discovery_feedback unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Task<void> SentryImpl::run_grpc_server() {
Expand Down
15 changes: 12 additions & 3 deletions silkworm/sentry/status_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,18 @@ Task<void> StatusManager::wait_for_status() {
}

Task<void> StatusManager::run() {
// loop until wait_for_status() throws a cancelled exception
while (true) {
co_await wait_for_status();
try {
// loop until wait_for_status() throws a cancelled exception
while (true) {
co_await wait_for_status();
}
} catch (const boost::system::system_error& se) {
if (se.code() == boost::system::errc::operation_canceled) {
log::Debug("sentry") << "StatusManager::run unexpected end [operation_canceled]";
} else {
log::Critical("sentry") << "StatusManager::run unexpected end [" + std::string{se.what()} + "]";
}
throw se;
}
}

Expand Down

0 comments on commit 8415d10

Please sign in to comment.