Skip to content

Commit

Permalink
Made stats sinks a statically registered component (#1506)
Browse files Browse the repository at this point in the history
This is part of a larger effort to make various implementations more flexible by allowing proprietary components to be statically registered with Envoy needing only to be linked to the binary at build time and configured at runtime #967 .

The user-visible configuration changes do involve a few deprecations: statsd_udp_ip_address and statsd_tcp_cluster_name will be deprecated, and their equivalents will be moved to a new stats_sinks array where the configurations for any statically registered sink will exist. As a part of this change, all integration tests were moved to the new configuration format, but a new configuration test was added to ensure that the deprecated format continues to work until it is removed in 1.5.0.
  • Loading branch information
mrice32 authored and htuch committed Sep 5, 2017
1 parent 79f4c6a commit 0702b29
Show file tree
Hide file tree
Showing 84 changed files with 433 additions and 127 deletions.
10 changes: 2 additions & 8 deletions include/envoy/server/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,9 @@ class Main {
virtual RateLimit::ClientFactory& rateLimitClientFactory() PURE;

/**
* @return Optional<std::string> the optional local/remote TCP statsd cluster to write to.
* This cluster must be defined via the cluster manager configuration.
* @return std::list<Stats::SinkPtr>& the list of stats sinks initialized from the configuration.
*/
virtual Optional<std::string> statsdTcpClusterName() PURE;

/**
* @return Network::Address::InstanceConstSharedPtr the optional UDP statsd address to write to.
*/
virtual Network::Address::InstanceConstSharedPtr statsdUdpIpAddress() PURE;
virtual std::list<Stats::SinkPtr>& statsSinks() PURE;

/**
* @return std::chrono::milliseconds the time interval between flushing to configured stat sinks.
Expand Down
1 change: 1 addition & 0 deletions source/exe/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ envoy_cc_library(
"//source/server/config/network:ratelimit_lib",
"//source/server/config/network:redis_proxy_lib",
"//source/server/config/network:tcp_proxy_lib",
"//source/server/config/stats:statsd_lib",
"//source/server/http:health_check_lib",
],
)
Expand Down
1 change: 0 additions & 1 deletion source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ envoy_cc_library(
"//source/common/router:rds_lib",
"//source/common/runtime:runtime_lib",
"//source/common/singleton:manager_impl_lib",
"//source/common/stats:statsd_lib",
"//source/common/upstream:cluster_manager_lib",
"//source/server/http:admin_lib",
],
Expand Down
24 changes: 24 additions & 0 deletions source/server/config/stats/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
licenses(["notice"]) # Apache 2

load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

envoy_package()

envoy_cc_library(
name = "statsd_lib",
srcs = ["statsd.cc"],
hdrs = ["statsd.h"],
external_deps = [
"envoy_bootstrap",
],
deps = [
"//include/envoy/registry",
"//source/common/network:address_lib",
"//source/common/stats:statsd_lib",
"//source/server:configuration_lib",
],
)
53 changes: 53 additions & 0 deletions source/server/config/stats/statsd.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include "server/config/stats/statsd.h"

#include <string>

#include "envoy/registry/registry.h"

#include "common/stats/statsd.h"

#include "api/bootstrap.pb.h"

namespace Envoy {
namespace Server {
namespace Configuration {

Stats::SinkPtr StatsdSinkFactory::createStatsSink(const Protobuf::Message& config,
Server::Instance& server) {

const auto& statsd_sink = dynamic_cast<const envoy::api::v2::StatsdSink&>(config);
switch (statsd_sink.statsd_specifier_case()) {
case envoy::api::v2::StatsdSink::kAddress: {
Network::Address::InstanceConstSharedPtr address =
Network::Utility::fromProtoAddress(statsd_sink.address());
ENVOY_LOG(info, "statsd UDP ip address: {}", address->asString());
return Stats::SinkPtr(
new Stats::Statsd::UdpStatsdSink(server.threadLocal(), std::move(address)));
break;
}
case envoy::api::v2::StatsdSink::kTcpClusterName:
ENVOY_LOG(info, "statsd TCP cluster: {}", statsd_sink.tcp_cluster_name());
return Stats::SinkPtr(new Stats::Statsd::TcpStatsdSink(
server.localInfo(), statsd_sink.tcp_cluster_name(), server.threadLocal(),
server.clusterManager(), server.stats()));
break;
default:
throw EnvoyException(
fmt::format("No tcp_cluster_name or address provided for {} Stats::Sink config", name()));
}
}

ProtobufTypes::MessagePtr StatsdSinkFactory::createEmptyConfigProto() {
return std::unique_ptr<envoy::api::v2::StatsdSink>(new envoy::api::v2::StatsdSink());
}

std::string StatsdSinkFactory::name() { return "envoy.statsd"; }

/**
* Static registration for the statsd sink factory. @see RegisterFactory.
*/
static Registry::RegisterFactory<StatsdSinkFactory, StatsSinkFactory> register_;

} // namespace Configuration
} // namespace Server
} // namespace Envoy
28 changes: 28 additions & 0 deletions source/server/config/stats/statsd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include <string>

#include "envoy/server/instance.h"

#include "server/configuration_impl.h"

namespace Envoy {
namespace Server {
namespace Configuration {

/**
* Config registration for the tcp statsd sink. @see StatsSinkFactory.
*/
class StatsdSinkFactory : Logger::Loggable<Logger::Id::config>, public StatsSinkFactory {
public:
// StatsSinkFactory
Stats::SinkPtr createStatsSink(const Protobuf::Message& config, Instance& server) override;

ProtobufTypes::MessagePtr createEmptyConfigProto() override;

std::string name() override;
};

} // namespace Configuration
} // namespace Server
} // namespace Envoy
44 changes: 25 additions & 19 deletions source/server/configuration_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,6 @@ void MainImpl::initialize(const envoy::api::v2::Bootstrap& bootstrap, Instance&
server.localInfo(), server.stats(), server.listenerManager()));
}

for (const auto& stats_sink : bootstrap.stats_sinks()) {
// TODO(mrice32): Add support for pluggable stats sinks.
ASSERT(stats_sink.name() == "envoy.statsd");
envoy::api::v2::StatsdSink statsd_sink;
MessageUtil::jsonConvert(stats_sink.config(), statsd_sink);

switch (statsd_sink.statsd_specifier_case()) {
case envoy::api::v2::StatsdSink::kAddress: {
statsd_udp_ip_address_ = Network::Utility::fromProtoAddress(statsd_sink.address());
break;
}
case envoy::api::v2::StatsdSink::kTcpClusterName:
statsd_tcp_cluster_name_.value(statsd_sink.tcp_cluster_name());
break;
default:
NOT_REACHED;
}
}

stats_flush_interval_ =
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(bootstrap, stats_flush_interval, 5000));

Expand All @@ -95,6 +76,8 @@ void MainImpl::initialize(const envoy::api::v2::Bootstrap& bootstrap, Instance&
} else {
ratelimit_client_factory_.reset(new RateLimit::NullFactoryImpl());
}

initializeStatsSinks(bootstrap, server);
}

void MainImpl::initializeTracers(const envoy::api::v2::Tracing& configuration, Instance& server) {
Expand Down Expand Up @@ -127,6 +110,29 @@ void MainImpl::initializeTracers(const envoy::api::v2::Tracing& configuration, I
}
}

void MainImpl::initializeStatsSinks(const envoy::api::v2::Bootstrap& bootstrap, Instance& server) {
ENVOY_LOG(info, "loading stats sink configuration");

for (const envoy::api::v2::StatsSink& sink_object : bootstrap.stats_sinks()) {
if (sink_object.name().empty()) {
throw EnvoyException(
"sink object does not have 'name' attribute to look up the implementation");
}

ProtobufTypes::String name = sink_object.name();
StatsSinkFactory* factory = Registry::FactoryRegistry<StatsSinkFactory>::getFactory(name);
if (factory != nullptr) {
ProtobufTypes::MessagePtr message = factory->createEmptyConfigProto();
if (sink_object.has_config()) {
MessageUtil::jsonConvert(sink_object.config(), *message);
}
stats_sinks_.emplace_back(factory->createStatsSink(*message, server));
} else {
throw EnvoyException(fmt::format("No Stats::Sink found for name: {}", name));
}
}
}

InitialImpl::InitialImpl(const envoy::api::v2::Bootstrap& bootstrap) {
const auto& admin = bootstrap.admin();
admin_.access_log_path_ = admin.access_log_path();
Expand Down
41 changes: 35 additions & 6 deletions source/server/configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,37 @@ class HttpTracerFactory {
virtual std::string name() PURE;
};

/**
* Implemented for each Stats::Sink and registered via Registry::registerFactory() or
* the convenience class RegisterFactory.
*/
class StatsSinkFactory {
public:
virtual ~StatsSinkFactory() {}

/**
* Create a particular Stats::Sink implementation. If the implementation is unable to produce a
* Stats::Sink with the provided parameters, it should throw an EnvoyException. The returned
* pointer should always be valid.
* @param config supplies the custom proto configuration for the Stats::Sink
* @param server supplies the server instance
*/
virtual Stats::SinkPtr createStatsSink(const Protobuf::Message& config, Instance& server) PURE;

/**
* @return ProtobufTypes::MessagePtr create empty config proto message for v2. The filter
* config, which arrives in an opaque google.protobuf.Struct message, will be converted to
* JSON and then parsed into this empty proto.
*/
virtual ProtobufTypes::MessagePtr createEmptyConfigProto() PURE;

/**
* Returns the identifying name for a particular implementation of Stats::Sink produced by the
* factory.
*/
virtual std::string name() PURE;
};

/**
* Utilities for creating a filter chain for a network connection.
*/
Expand Down Expand Up @@ -88,10 +119,7 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
Upstream::ClusterManager& clusterManager() override { return *cluster_manager_; }
Tracing::HttpTracer& httpTracer() override { return *http_tracer_; }
RateLimit::ClientFactory& rateLimitClientFactory() override { return *ratelimit_client_factory_; }
Optional<std::string> statsdTcpClusterName() override { return statsd_tcp_cluster_name_; }
Network::Address::InstanceConstSharedPtr statsdUdpIpAddress() override {
return statsd_udp_ip_address_;
}
std::list<Stats::SinkPtr>& statsSinks() override { return stats_sinks_; }
std::chrono::milliseconds statsFlushInterval() override { return stats_flush_interval_; }
std::chrono::milliseconds wdMissTimeout() const override { return watchdog_miss_timeout_; }
std::chrono::milliseconds wdMegaMissTimeout() const override {
Expand All @@ -108,11 +136,12 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
*/
void initializeTracers(const envoy::api::v2::Tracing& configuration, Instance& server);

void initializeStatsSinks(const envoy::api::v2::Bootstrap& bootstrap, Instance& server);

std::unique_ptr<Upstream::ClusterManager> cluster_manager_;
std::unique_ptr<LdsApi> lds_api_;
Tracing::HttpTracerPtr http_tracer_;
Optional<std::string> statsd_tcp_cluster_name_;
Network::Address::InstanceConstSharedPtr statsd_udp_ip_address_;
std::list<Stats::SinkPtr> stats_sinks_;
RateLimit::ClientFactoryPtr ratelimit_client_factory_;
std::chrono::milliseconds stats_flush_interval_;
std::chrono::milliseconds watchdog_miss_timeout_;
Expand Down
24 changes: 4 additions & 20 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "common/router/rds_impl.h"
#include "common/runtime/runtime_impl.h"
#include "common/singleton/manager_impl.h"
#include "common/stats/statsd.h"
#include "common/upstream/cluster_manager_impl.h"

#include "server/configuration_impl.h"
Expand Down Expand Up @@ -132,7 +131,7 @@ void InstanceImpl::flushStats() {
server_stats_.days_until_first_cert_expiring_.set(
sslContextManager().daysUntilFirstCertExpires());

InstanceUtil::flushCountersAndGaugesToSinks(stat_sinks_, stats_store_);
InstanceUtil::flushCountersAndGaugesToSinks(config_->statsSinks(), stats_store_);
stat_flush_timer_->enableTimer(config_->statsFlushInterval());
}

Expand Down Expand Up @@ -228,7 +227,9 @@ void InstanceImpl::initialize(Options& options,
ENVOY_LOG(warn, "caught and eating SIGHUP. See documentation for how to hot restart.");
});

initializeStatSinks();
for (Stats::SinkPtr& sink : main_config->statsSinks()) {
stats_store_.addSink(*sink);
}

// Some of the stat sinks may need dispatcher support so don't flush until the main loop starts.
// Just setup the timer.
Expand Down Expand Up @@ -268,23 +269,6 @@ Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
}
}

void InstanceImpl::initializeStatSinks() {
if (config_->statsdUdpIpAddress()) {
ENVOY_LOG(info, "statsd UDP ip address: {}", config_->statsdUdpIpAddress()->asString());
stat_sinks_.emplace_back(
new Stats::Statsd::UdpStatsdSink(thread_local_, config_->statsdUdpIpAddress()));
stats_store_.addSink(*stat_sinks_.back());
}

if (config_->statsdTcpClusterName().valid()) {
ENVOY_LOG(info, "statsd TCP cluster: {}", config_->statsdTcpClusterName().value());
stat_sinks_.emplace_back(
new Stats::Statsd::TcpStatsdSink(*local_info_, config_->statsdTcpClusterName().value(),
thread_local_, config_->clusterManager(), stats_store_));
stats_store_.addSink(*stat_sinks_.back());
}
}

void InstanceImpl::loadServerFlags(const Optional<std::string>& flags_path) {
if (!flags_path.valid()) {
return;
Expand Down
2 changes: 0 additions & 2 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
void flushStats();
void initialize(Options& options, Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory);
void initializeStatSinks();
void loadServerFlags(const Optional<std::string>& flags_path);
uint64_t numConnections();
void startWorkers();
Expand All @@ -148,7 +147,6 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
const time_t start_time_;
time_t original_start_time_;
Stats::StoreRoot& stats_store_;
std::list<Stats::SinkPtr> stat_sinks_;
ServerStats server_stats_;
ThreadLocal::Instance& thread_local_;
Api::ApiPtr api_;
Expand Down
2 changes: 1 addition & 1 deletion test/common/access_log/access_log_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace Envoy {
using testing::Return;
using testing::_;

namespace Envoy {
namespace AccessLog {

TEST(AccessLogManagerImpl, reopenAllFiles) {
Expand Down
3 changes: 2 additions & 1 deletion test/common/common/utility_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace Envoy {
using testing::ContainerEq;

namespace Envoy {

TEST(StringUtil, atoul) {
uint64_t out;
EXPECT_FALSE(StringUtil::atoul("123b", out));
Expand Down
2 changes: 1 addition & 1 deletion test/common/dynamo/dynamo_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace Envoy {
using testing::NiceMock;
using testing::Return;
using testing::ReturnRef;
using testing::_;

namespace Envoy {
namespace Dynamo {

class DynamoFilterTest : public testing::Test {
Expand Down
2 changes: 1 addition & 1 deletion test/common/dynamo/dynamo_utility_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace Envoy {
using testing::_;

namespace Envoy {
namespace Dynamo {

TEST(DynamoUtility, PartitionIdStatString) {
Expand Down
2 changes: 1 addition & 1 deletion test/common/event/dispatcher_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace Envoy {
using testing::InSequence;

namespace Envoy {
namespace Event {

class TestDeferredDeletable : public DeferredDeletable {
Expand Down
Loading

0 comments on commit 0702b29

Please sign in to comment.