Skip to content

Commit

Permalink
api listener: add shutdown method and call during server termination (#…
Browse files Browse the repository at this point in the history
…9959)

Description: this PR adds a shutdown method to the ApiListener and calls it where appropriate during server termination. Previously there would be a crash due to use after free of objects in thread local storage by streams in the ApiListener. Funny enough the flakes reported in #9746 happened due to this.
Risk Level: low
Testing: new unit and integration test. Without appropriate termination the new integration test repros the stacktrace reported in #9746.

Signed-off-by: Jose Nino <jnino@lyft.com>
  • Loading branch information
junr03 committed Feb 8, 2020
1 parent bbdc33e commit 9105aea
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 4 deletions.
7 changes: 7 additions & 0 deletions include/envoy/server/api_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ class ApiListener {
*/
virtual absl::string_view name() const PURE;

/**
* Shutdown the ApiListener. This is an interrupt, not a drain. In other words, calling this
* function results in termination of all active streams vs. draining where no new streams are
* allowed, but already existing streams are allowed to finish.
*/
virtual void shutdown() PURE;

/**
* @return the Type of the ApiListener.
*/
Expand Down
14 changes: 14 additions & 0 deletions source/server/api_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ ApiListenerImplBase::ApiListenerImplBase(const envoy::config::listener::v3::List
factory_context_(parent_.server_, config_, *this, *global_scope_, *listener_scope_),
read_callbacks_(SyntheticReadCallbacks(*this)) {}

void ApiListenerImplBase::SyntheticReadCallbacks::SyntheticConnection::raiseConnectionEvent(
Network::ConnectionEvent event) {
for (Network::ConnectionCallbacks* callback : callbacks_) {
callback->onEvent(event);
}
}

HttpApiListener::HttpApiListener(const envoy::config::listener::v3::Listener& config,
ListenerManagerImpl& parent, const std::string& name)
: ApiListenerImplBase(config, parent, name) {
Expand All @@ -44,5 +51,12 @@ Http::ApiListenerOptRef HttpApiListener::http() {
return Http::ApiListenerOptRef(std::ref(*http_connection_manager_));
}

void HttpApiListener::shutdown() {
// The Http::ConnectionManagerImpl is a callback target for the read_callback_.connection_. By
// raising connection closure, Http::ConnectionManagerImpl::onEvent is fired. In that case the
// Http::ConnectionManagerImpl will reset any ActiveStreams it has.
read_callbacks_.connection_.raiseConnectionEvent(Network::ConnectionEvent::RemoteClose);
}

} // namespace Server
} // namespace Envoy
10 changes: 9 additions & 1 deletion source/server/api_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class ApiListenerImplBase : public ApiListener,
: parent_(parent), stream_info_(parent_.parent_.factory_context_.timeSource()),
options_(std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>()) {}

void raiseConnectionEvent(Network::ConnectionEvent event);

// Network::FilterManager
void addWriteFilter(Network::WriteFilterSharedPtr) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
Expand All @@ -84,7 +86,9 @@ class ApiListenerImplBase : public ApiListener,
bool initializeReadFilters() override { return true; }

// Network::Connection
void addConnectionCallbacks(Network::ConnectionCallbacks&) override {}
void addConnectionCallbacks(Network::ConnectionCallbacks& cb) override {
callbacks_.push_back(&cb);
}
void addBytesSentCallback(Network::Connection::BytesSentCb) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
Expand Down Expand Up @@ -129,6 +133,7 @@ class ApiListenerImplBase : public ApiListener,
SyntheticReadCallbacks& parent_;
StreamInfo::StreamInfoImpl stream_info_;
Network::ConnectionSocket::OptionsSharedPtr options_;
std::list<Network::ConnectionCallbacks*> callbacks_;
};

ApiListenerImplBase& parent_;
Expand Down Expand Up @@ -157,6 +162,9 @@ class HttpApiListener : public ApiListenerImplBase {
// ApiListener
ApiListener::Type type() const override { return ApiListener::Type::HttpApiListener; }
Http::ApiListenerOptRef http() override;
void shutdown() override;

Network::ReadFilterCallbacks& readCallbacksForTest() { return read_callbacks_; }

private:
// Need to store the factory due to the shared_ptrs that need to be kept alive: date provider,
Expand Down
7 changes: 7 additions & 0 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,13 @@ void InstanceImpl::terminate() {

// Shutdown all the workers now that the main dispatch loop is done.
if (listener_manager_ != nullptr) {
// Also shutdown the listener manager's ApiListener, if there is one, which runs on the main
// thread. This needs to happen ahead of calling thread_local_.shutdown() below to prevent any
// objects in the ApiListener destructor to reference any objects in thread local storage.
if (listener_manager_->apiListener().has_value()) {
listener_manager_->apiListener()->get().shutdown();
}

listener_manager_->stopWorkers();
}

Expand Down
32 changes: 30 additions & 2 deletions test/integration/api_listener_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class ApiListenerIntegrationTest : public BaseIntegrationTest,
bootstrap.mutable_static_resources()->add_listeners()->MergeFrom(
Server::parseListenerFromV2Yaml(api_listener_config()));
});
BaseIntegrationTest::initialize();
}

void TearDown() override {
Expand Down Expand Up @@ -84,6 +83,7 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, ApiListenerIntegrationTest,
TestUtility::ipTestParamsToString);

TEST_P(ApiListenerIntegrationTest, Basic) {
BaseIntegrationTest::initialize();
absl::Notification done;
test_server_->server().dispatcher().post([this, &done]() -> void {
ASSERT_TRUE(test_server_->server().listenerManager().apiListener().has_value());
Expand Down Expand Up @@ -111,5 +111,33 @@ TEST_P(ApiListenerIntegrationTest, Basic) {
ASSERT_TRUE(done.WaitForNotificationWithTimeout(absl::Seconds(1)));
}

TEST_P(ApiListenerIntegrationTest, DestroyWithActiveStreams) {
autonomous_allow_incomplete_streams_ = true;
BaseIntegrationTest::initialize();
absl::Notification done;

test_server_->server().dispatcher().post([this, &done]() -> void {
ASSERT_TRUE(test_server_->server().listenerManager().apiListener().has_value());
ASSERT_EQ("api_listener", test_server_->server().listenerManager().apiListener()->get().name());
ASSERT_TRUE(test_server_->server().listenerManager().apiListener()->get().http().has_value());
auto& http_api_listener =
test_server_->server().listenerManager().apiListener()->get().http()->get();

ON_CALL(stream_encoder_, getStream()).WillByDefault(ReturnRef(stream_encoder_.stream_));
auto& stream_decoder = http_api_listener.newStream(stream_encoder_);

// Send a headers-only request
stream_decoder.decodeHeaders(
Http::HeaderMapPtr(new Http::TestHeaderMapImpl{
{":method", "GET"}, {":path", "/api"}, {":scheme", "http"}, {":authority", "host"}}),
false);

done.Notify();
});
ASSERT_TRUE(done.WaitForNotificationWithTimeout(absl::Seconds(1)));
// The server should shutdown the ApiListener at the right time during server termination such
// that no crashes occur if termination happens when the ApiListener still has ongoing streams.
}

} // namespace
} // namespace Envoy
} // namespace Envoy
1 change: 1 addition & 0 deletions test/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ envoy_cc_test(
":utility_lib",
"//source/server:api_listener_lib",
"//source/server:listener_lib",
"//test/mocks/network:network_mocks",
"//test/mocks/server:server_mocks",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/listener/v3:pkg_cc_proto",
Expand Down
46 changes: 45 additions & 1 deletion test/server/api_listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "server/api_listener_impl.h"
#include "server/listener_manager_impl.h"

#include "test/mocks/network/mocks.h"
#include "test/mocks/server/mocks.h"
#include "test/server/utility.h"
#include "test/test_common/utility.h"
Expand Down Expand Up @@ -87,5 +88,48 @@ name: test_api_listener
"eds_cluster_config {\n eds_config {\n path: \"eds path\"\n }\n }\n}\n");
}

TEST_F(ApiListenerTest, HttpApiListenerShutdown) {
const std::string yaml = R"EOF(
name: test_api_listener
address:
socket_address:
address: 127.0.0.1
port_value: 1234
api_listener:
api_listener:
"@type": type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager
stat_prefix: hcm
route_config:
name: api_router
virtual_hosts:
- name: api
domains:
- "*"
routes:
- match:
prefix: "/"
route:
cluster: dynamic_forward_proxy_cluster
)EOF";

const envoy::config::listener::v3::Listener config = parseListenerFromV2Yaml(yaml);

auto http_api_listener = HttpApiListener(config, *listener_manager_, config.name());

ASSERT_EQ("test_api_listener", http_api_listener.name());
ASSERT_EQ(ApiListener::Type::HttpApiListener, http_api_listener.type());
ASSERT_TRUE(http_api_listener.http().has_value());

Network::MockConnectionCallbacks network_connection_callbacks;
// TODO(junr03): potentially figure out a way of unit testing this behavior without exposing a
// ForTest function.
http_api_listener.readCallbacksForTest().connection().addConnectionCallbacks(
network_connection_callbacks);

EXPECT_CALL(network_connection_callbacks, onEvent(Network::ConnectionEvent::RemoteClose));
// Shutting down the ApiListener should raise an event on all connection callback targets.
http_api_listener.shutdown();
}

} // namespace Server
} // namespace Envoy
} // namespace Envoy

0 comments on commit 9105aea

Please sign in to comment.