Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thread local cache of overload action states #4090

Merged
merged 3 commits into from
Aug 12, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions include/envoy/server/overload_manager.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#pragma once

#include <unordered_map>

#include "envoy/common/pure.h"
#include "envoy/thread_local/thread_local.h"

namespace Envoy {
namespace Server {
Expand All @@ -21,6 +24,22 @@ enum class OverloadActionState {
*/
typedef std::function<void(OverloadActionState)> OverloadActionCb;

/**
* Thread-local cache of the current state of each configured overload action.
*/
class OverloadActionStateCache : public ThreadLocal::ThreadLocalObject {
public:
bool isActive(const std::string& action) const {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is eventually going to be called on the data path on HCM, I guess I'm wonder out loud whether we want to do a string hash/compare on new stream? It's probably lost in the noise of the overall overhead of new stream creation. An alternative would be to establish a map from the action strings to opaque integer tags that HCM can obtain at instantiation. Seems like micro-optimization, just curious about your thoughts here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea - changed to allow doing the map lookup at at initialization time.

auto it = actions_.find(action);
return it != actions_.end() && it->second == OverloadActionState::Active;
}

void setState(const std::string& action, OverloadActionState state) { actions_[action] = state; }

private:
std::unordered_map<std::string, OverloadActionState> actions_;
};

/**
* The OverloadManager protects the Envoy instance from being overwhelmed by client
* requests. It monitors a set of resources and notifies registered listeners if
Expand All @@ -46,6 +65,12 @@ class OverloadManager {
*/
virtual void registerForAction(const std::string& action, Event::Dispatcher& dispatcher,
OverloadActionCb callback) PURE;

/**
* Get the thread-local overload action state cache. Lookups in this cache can be used as
* an alternative to registering a callback for overload action state changes.
*/
virtual const OverloadActionStateCache& getOverloadActionStateCache() PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I might name this getOverLoadThreadLocalState(); "cache" is an overloaded concept and pointing out this is the thread-local variant makes the code a bit easier to read </bikeshed>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to ThreadLocalOverloadState

};

} // namespace Server
Expand Down
1 change: 1 addition & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ envoy_cc_library(
deps = [
"//include/envoy/server:overload_manager_interface",
"//include/envoy/stats:stats_interface",
"//include/envoy/thread_local:thread_local_interface",
"//source/common/common:logger_lib",
"//source/common/config:utility_lib",
"//source/server:resource_monitor_config_lib",
Expand Down
14 changes: 13 additions & 1 deletion source/server/overload_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ bool OverloadAction::isActive() const { return !fired_triggers_.empty(); }

OverloadManagerImpl::OverloadManagerImpl(
Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
ThreadLocal::SlotAllocator& slot_allocator,
const envoy::config::overload::v2alpha::OverloadManager& config)
: started_(false), dispatcher_(dispatcher),
: started_(false), dispatcher_(dispatcher), tls_(slot_allocator.allocateSlot()),
refresh_interval_(
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))) {
Configuration::ResourceMonitorFactoryContextImpl context(dispatcher);
Expand Down Expand Up @@ -125,6 +126,10 @@ OverloadManagerImpl::OverloadManagerImpl(
resource_to_actions_.insert(std::make_pair(resource, name));
}
}

tls_->set([](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<OverloadActionStateCache>();
});
}

void OverloadManagerImpl::start() {
Expand Down Expand Up @@ -157,6 +162,10 @@ void OverloadManagerImpl::registerForAction(const std::string& action,
std::forward_as_tuple(dispatcher, callback));
}

const OverloadActionStateCache& OverloadManagerImpl::getOverloadActionStateCache() {
return tls_->getTyped<OverloadActionStateCache>();
}

void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure) {
auto action_range = resource_to_actions_.equal_range(resource);
std::for_each(action_range.first, action_range.second,
Expand All @@ -170,6 +179,9 @@ void OverloadManagerImpl::updateResourcePressure(const std::string& resource, do
is_active ? OverloadActionState::Active : OverloadActionState::Inactive;
ENVOY_LOG(info, "Overload action {} has become {}", action,
is_active ? "active" : "inactive");
tls_->runOnAllThreads([this, action, state] {
tls_->getTyped<OverloadActionStateCache>().setState(action, state);
});
auto callback_range = action_to_callbacks_.equal_range(action);
std::for_each(callback_range.first, callback_range.second,
[&](ActionToCallbackMap::value_type& cb_entry) {
Expand Down
4 changes: 4 additions & 0 deletions source/server/overload_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "envoy/server/resource_monitor.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats.h"
#include "envoy/thread_local/thread_local.h"

#include "common/common/logger.h"

Expand Down Expand Up @@ -50,12 +51,14 @@ class OverloadAction {
class OverloadManagerImpl : Logger::Loggable<Logger::Id::main>, public OverloadManager {
public:
OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
ThreadLocal::SlotAllocator& slot_allocator,
const envoy::config::overload::v2alpha::OverloadManager& config);

// Server::OverloadManager
void start() override;
void registerForAction(const std::string& action, Event::Dispatcher& dispatcher,
OverloadActionCb callback) override;
const OverloadActionStateCache& getOverloadActionStateCache() override;

private:
class Resource : public ResourceMonitor::Callbacks {
Expand Down Expand Up @@ -90,6 +93,7 @@ class OverloadManagerImpl : Logger::Loggable<Logger::Id::main>, public OverloadM

bool started_;
Event::Dispatcher& dispatcher_;
ThreadLocal::SlotPtr tls_;
const std::chrono::milliseconds refresh_interval_;
Event::TimerPtr timer_;
std::unordered_map<std::string, Resource> resources_;
Expand Down
8 changes: 4 additions & 4 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,6 @@ void InstanceImpl::initialize(Options& options,

loadServerFlags(initial_config.flagsPath());

// Initialize the overload manager early so other modules can register for actions.
overload_manager_.reset(
new OverloadManagerImpl(dispatcher(), stats(), bootstrap_.overload_manager()));

// Workers get created first so they register for thread local updates.
listener_manager_.reset(new ListenerManagerImpl(
*this, listener_component_factory_, worker_factory_, ProdSystemTimeSource::instance_));
Expand All @@ -259,6 +255,10 @@ void InstanceImpl::initialize(Options& options,
// whether it runs on the main thread or on workers can still use TLS.
thread_local_.registerThread(*dispatcher_, true);

// Initialize the overload manager early so other modules can register for actions.
overload_manager_.reset(
new OverloadManagerImpl(dispatcher(), stats(), threadLocal(), bootstrap_.overload_manager()));

// We can now initialize stats for threading.
stats_store_.initializeThreading(*dispatcher_, thread_local_);

Expand Down
1 change: 1 addition & 0 deletions test/mocks/server/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class MockOverloadManager : public OverloadManager {
MOCK_METHOD0(start, void());
MOCK_METHOD3(registerForAction, void(const std::string& action, Event::Dispatcher& dispatcher,
OverloadActionCb callback));
MOCK_METHOD0(getOverloadActionStateCache, const OverloadActionStateCache&());
};

class MockInstance : public Instance {
Expand Down
1 change: 1 addition & 0 deletions test/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ envoy_cc_test(
"//source/extensions/resource_monitors/common:factory_base_lib",
"//source/server:overload_manager_lib",
"//test/mocks/event:event_mocks",
"//test/mocks/thread_local:thread_local_mocks",
"//test/test_common:registry_lib",
"//test/test_common:utility_lib",
],
Expand Down
54 changes: 33 additions & 21 deletions test/server/overload_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "extensions/resource_monitors/common/factory_base.h"

#include "test/mocks/event/mocks.h"
#include "test/mocks/thread_local/mocks.h"
#include "test/test_common/registry.h"
#include "test/test_common/utility.h"

Expand Down Expand Up @@ -118,46 +119,55 @@ class OverloadManagerImplTest : public testing::Test {
)EOF";
}

std::unique_ptr<OverloadManagerImpl> createOverloadManager(const std::string& config) {
return std::make_unique<OverloadManagerImpl>(dispatcher_, stats_, thread_local_,
parseConfig(config));
}

FakeResourceMonitorFactory factory1_;
FakeResourceMonitorFactory factory2_;
Registry::InjectFactory<Configuration::ResourceMonitorFactory> register_factory1_;
Registry::InjectFactory<Configuration::ResourceMonitorFactory> register_factory2_;
NiceMock<Event::MockDispatcher> dispatcher_;
Stats::IsolatedStoreImpl stats_;
NiceMock<ThreadLocal::MockInstance> thread_local_;
Event::TimerCb timer_cb_;
};

TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) {
setDispatcherExpectation();

OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig()));
auto manager(createOverloadManager(getConfig()));
bool is_active = false;
int cb_count = 0;
manager.registerForAction("envoy.overload_actions.dummy_action", dispatcher_,
[&](OverloadActionState state) {
is_active = state == OverloadActionState::Active;
cb_count++;
});
manager.registerForAction("envoy.overload_actions.unknown_action", dispatcher_,
[&](OverloadActionState) { EXPECT_TRUE(false); });
manager.start();
manager->registerForAction("envoy.overload_actions.dummy_action", dispatcher_,
[&](OverloadActionState state) {
is_active = state == OverloadActionState::Active;
cb_count++;
});
manager->registerForAction("envoy.overload_actions.unknown_action", dispatcher_,
[&](OverloadActionState) { EXPECT_TRUE(false); });
manager->start();

Stats::Gauge& active_gauge = stats_.gauge("overload.envoy.overload_actions.dummy_action.active");
Stats::Gauge& pressure_gauge1 =
stats_.gauge("overload.envoy.resource_monitors.fake_resource1.pressure");
Stats::Gauge& pressure_gauge2 =
stats_.gauge("overload.envoy.resource_monitors.fake_resource2.pressure");
const OverloadActionStateCache& action_state = manager->getOverloadActionStateCache();

factory1_.monitor_->setPressure(0.5);
timer_cb_();
EXPECT_FALSE(is_active);
EXPECT_FALSE(action_state.isActive("envoy.overload_actions.dummy_action"));
EXPECT_EQ(0, cb_count);
EXPECT_EQ(0, active_gauge.value());
EXPECT_EQ(50, pressure_gauge1.value());

factory1_.monitor_->setPressure(0.95);
timer_cb_();
EXPECT_TRUE(is_active);
EXPECT_TRUE(action_state.isActive("envoy.overload_actions.dummy_action"));
EXPECT_EQ(1, cb_count);
EXPECT_EQ(1, active_gauge.value());
EXPECT_EQ(95, pressure_gauge1.value());
Expand All @@ -166,6 +176,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) {
factory1_.monitor_->setPressure(0.94);
timer_cb_();
EXPECT_TRUE(is_active);
EXPECT_TRUE(action_state.isActive("envoy.overload_actions.dummy_action"));
EXPECT_EQ(1, cb_count);
EXPECT_EQ(94, pressure_gauge1.value());

Expand All @@ -174,22 +185,24 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) {
factory2_.monitor_->setPressure(0.9);
timer_cb_();
EXPECT_TRUE(is_active);
EXPECT_TRUE(action_state.isActive("envoy.overload_actions.dummy_action"));
EXPECT_EQ(1, cb_count);
EXPECT_EQ(50, pressure_gauge1.value());
EXPECT_EQ(90, pressure_gauge2.value());

factory2_.monitor_->setPressure(0.4);
timer_cb_();
EXPECT_FALSE(is_active);
EXPECT_FALSE(action_state.isActive("envoy.overload_actions.dummy_action"));
EXPECT_EQ(2, cb_count);
EXPECT_EQ(0, active_gauge.value());
EXPECT_EQ(40, pressure_gauge2.value());
}

TEST_F(OverloadManagerImplTest, FailedUpdates) {
setDispatcherExpectation();
OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig()));
manager.start();
auto manager(createOverloadManager(getConfig()));
manager->start();
Stats::Counter& failed_updates =
stats_.counter("overload.envoy.resource_monitors.fake_resource1.failed_updates");

Expand All @@ -207,8 +220,8 @@ TEST_F(OverloadManagerImplTest, SkippedUpdates) {
Event::PostCb post_cb;
ON_CALL(dispatcher_, post(_)).WillByDefault(Invoke([&](Event::PostCb cb) { post_cb = cb; }));

OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig()));
manager.start();
auto manager(createOverloadManager(getConfig()));
manager->start();
Stats::Counter& skipped_updates =
stats_.counter("overload.envoy.resource_monitors.fake_resource1.skipped_updates");

Expand All @@ -233,8 +246,8 @@ TEST_F(OverloadManagerImplTest, DuplicateResourceMonitor) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Duplicate resource monitor .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException,
"Duplicate resource monitor .*");
}

TEST_F(OverloadManagerImplTest, DuplicateOverloadAction) {
Expand All @@ -247,8 +260,8 @@ TEST_F(OverloadManagerImplTest, DuplicateOverloadAction) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Duplicate overload action .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException,
"Duplicate overload action .*");
}

TEST_F(OverloadManagerImplTest, UnknownTrigger) {
Expand All @@ -264,8 +277,8 @@ TEST_F(OverloadManagerImplTest, UnknownTrigger) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Unknown trigger resource .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException,
"Unknown trigger resource .*");
}

TEST_F(OverloadManagerImplTest, DuplicateTrigger) {
Expand All @@ -290,8 +303,7 @@ TEST_F(OverloadManagerImplTest, DuplicateTrigger) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Duplicate trigger .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, "Duplicate trigger .*");
}
} // namespace
} // namespace Server
Expand Down