Skip to content

Commit

Permalink
kafka: add broker-level metrics-collecting filter
Browse files Browse the repository at this point in the history
Signed-off-by: Adam Kotwasinski <adam.kotwasinski@gmail.com>
  • Loading branch information
adamkotwasinski committed Sep 13, 2019
1 parent 5551315 commit 8056d1e
Show file tree
Hide file tree
Showing 24 changed files with 897 additions and 26 deletions.
1 change: 1 addition & 0 deletions api/docs/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ proto_library(
"//envoy/config/filter/network/dubbo_proxy/v2alpha1:dubbo_proxy",
"//envoy/config/filter/network/ext_authz/v2:ext_authz",
"//envoy/config/filter/network/http_connection_manager/v2:http_connection_manager",
"//envoy/config/filter/network/kafka_broker/v2:kafka_broker",
"//envoy/config/filter/network/mongo_proxy/v2:mongo_proxy",
"//envoy/config/filter/network/rate_limit/v2:rate_limit",
"//envoy/config/filter/network/rbac/v2:rbac",
Expand Down
10 changes: 10 additions & 0 deletions api/envoy/config/filter/network/kafka_broker/v2/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
load("@envoy_api//bazel:api_build_system.bzl", "api_proto_library_internal", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package()

api_proto_library_internal(
name = "kafka_broker",
srcs = ["kafka_broker.proto"],
)
17 changes: 17 additions & 0 deletions api/envoy/config/filter/network/kafka_broker/v2/kafka_broker.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
syntax = "proto3";

package envoy.config.filter.network.kafka_broker.v2;

option java_outer_classname = "KafkaBrokerProto";
option java_multiple_files = true;
option java_package = "io.envoyproxy.envoy.config.filter.network.kafka_broker.v2";

import "validate/validate.proto";

// [#protodoc-title: Kafka Broker]
// Kafka Broker :ref:`configuration overview <config_network_filters_kafka_broker>`.

message KafkaBroker {
// The prefix to use when emitting :ref:`statistics <config_network_filters_kafka_broker_stats>`.
string stat_prefix = 1 [(validate.rules).string.min_bytes = 1];
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
.. _config_network_filters_kafka_broker:

Kafka Broker filter
===================

The Apache Kafka broker filter decodes the client protocol for
`Apache Kafka <https://kafka.apache.org/>`_. It decodes the requests and responses in the payload.
The message versions in `Kafka 2.0 <http://kafka.apache.org/20/protocol.html#protocol_api_keys>`_
are supported.

.. attention::

The kafka_broker filter is experimental and is currently under active development.
Capabilities will be expanded over time and the configuration structures are likely to change.

.. _config_network_filters_kafka_broker_config:

Configuration
-------------

The Kafka Broker filter should be chained with the TCP proxy filter as shown
in the configuration snippet below:

.. code-block:: yaml
filter_chains:
- filters:
- name: envoy.filters.network.kafka_broker
config:
stat_prefix: exampleprefix
- name: envoy.tcp_proxy
config:
stat_prefix: tcp
cluster: ...
.. _config_network_filters_kafka_broker_stats:

Statistics
----------

Every configured Kafka Broker filter has statistics rooted at *kafka.<stat_prefix>.*, with multiple
statistics per message type.

.. csv-table::
:header: Name, Type, Description
:widths: 1, 1, 2

request.TYPE, Counter, Number of times a request of particular type was received from Kafka client
request.failed_parse, Counter, Number of times a request could not be parsed
response.TYPE, Counter, Number of times a response of particular type was received from Kafka broker
response.TYPE_duration, Histogram, Response generation time in milliseconds
response.failed_parse, Counter, Number of times a response could not be parsed
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ filters.
client_ssl_auth_filter
echo_filter
ext_authz_filter
kafka_broker_filter
mongo_proxy_filter
mysql_proxy_filter
rate_limit_filter
Expand Down
4 changes: 1 addition & 3 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ EXTENSIONS = {
"envoy.filters.network.echo": "//source/extensions/filters/network/echo:config",
"envoy.filters.network.ext_authz": "//source/extensions/filters/network/ext_authz:config",
"envoy.filters.network.http_connection_manager": "//source/extensions/filters/network/http_connection_manager:config",
# NOTE: Kafka filter does not have a proper filter implemented right now. We are referencing to
# codec implementation that is going to be used by the filter.
"envoy.filters.network.kafka": "//source/extensions/filters/network/kafka:kafka_request_codec_lib",
"envoy.filters.network.kafka_broker": "//source/extensions/filters/network/kafka:kafka_broker_config_lib",
"envoy.filters.network.mongo_proxy": "//source/extensions/filters/network/mongo_proxy:config",
"envoy.filters.network.mysql_proxy": "//source/extensions/filters/network/mysql_proxy:config",
"envoy.filters.network.ratelimit": "//source/extensions/filters/network/ratelimit:config",
Expand Down
40 changes: 37 additions & 3 deletions source/extensions/filters/network/kafka/BUILD
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
licenses(["notice"]) # Apache 2

# Kafka network filter.
# Public docs: docs/root/configuration/network_filters/kafka_filter.rst
# Broker filter public docs: docs/root/configuration/network_filters/kafka_broker_filter.rst

load(
"//bazel:envoy_build_system.bzl",
Expand All @@ -11,6 +11,38 @@ load(

envoy_package()

envoy_cc_library(
name = "kafka_broker_config_lib",
srcs = ["broker/config.cc"],
hdrs = ["broker/config.h"],
deps = [
":kafka_broker_filter_lib",
"//source/extensions/filters/network:well_known_names",
"//source/extensions/filters/network/common:factory_base_lib",
"@envoy_api//envoy/config/filter/network/kafka_broker/v2:kafka_broker_cc",
],
)

envoy_cc_library(
name = "kafka_broker_filter_lib",
srcs = ["broker/filter.cc"],
hdrs = [
"broker/filter.h",
"broker/filter_impl.h",
"external/request_metrics.h",
"external/response_metrics.h",
],
deps = [
":kafka_request_codec_lib",
":kafka_response_codec_lib",
"//include/envoy/buffer:buffer_interface",
"//include/envoy/network:connection_interface",
"//include/envoy/network:filter_interface",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
name = "abstract_codec_lib",
srcs = [],
Expand Down Expand Up @@ -72,11 +104,12 @@ genrule(
outs = [
"external/requests.h",
"external/kafka_request_resolver.cc",
"external/request_metrics.h",
],
cmd = """
./$(location :kafka_protocol_code_generator_bin) request \
$(location external/requests.h) $(location external/kafka_request_resolver.cc) \
$(SRCS)
$(location external/request_metrics.h) $(SRCS)
""",
tools = [
":kafka_protocol_code_generator_bin",
Expand Down Expand Up @@ -133,11 +166,12 @@ genrule(
outs = [
"external/responses.h",
"external/kafka_response_resolver.cc",
"external/response_metrics.h",
],
cmd = """
./$(location :kafka_protocol_code_generator_bin) response \
$(location external/responses.h) $(location external/kafka_response_resolver.cc) \
$(SRCS)
$(location external/response_metrics.h) $(SRCS)
""",
tools = [
":kafka_protocol_code_generator_bin",
Expand Down
38 changes: 38 additions & 0 deletions source/extensions/filters/network/kafka/broker/config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include "extensions/filters/network/kafka/broker/config.h"

#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"
#include "envoy/stats/scope.h"

#include "extensions/filters/network/kafka/broker/filter.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Broker {

Network::FilterFactoryCb KafkaConfigFactory::createFilterFactoryFromProtoTyped(
const KafkaBrokerProtoConfig& proto_config, Server::Configuration::FactoryContext& context) {

ASSERT(!proto_config.stat_prefix().empty());

const std::string& stat_prefix = proto_config.stat_prefix();

return [&context, stat_prefix](Network::FilterManager& filter_manager) -> void {
Network::FilterSharedPtr filter =
std::make_shared<KafkaBrokerFilter>(context.scope(), context.timeSource(), stat_prefix);
filter_manager.addFilter(filter);
};
}

/**
* Static registration for the Kafka filter. @see RegisterFactory.
*/
REGISTER_FACTORY(KafkaConfigFactory, Server::Configuration::NamedNetworkFilterConfigFactory);

} // namespace Broker
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
34 changes: 34 additions & 0 deletions source/extensions/filters/network/kafka/broker/config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "envoy/config/filter/network/kafka_broker/v2/kafka_broker.pb.h"
#include "envoy/config/filter/network/kafka_broker/v2/kafka_broker.pb.validate.h"

#include "extensions/filters/network/common/factory_base.h"
#include "extensions/filters/network/well_known_names.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Broker {

using KafkaBrokerProtoConfig = envoy::config::filter::network::kafka_broker::v2::KafkaBroker;

/**
* Config registration for the Kafka filter.
*/
class KafkaConfigFactory : public Common::FactoryBase<KafkaBrokerProtoConfig> {
public:
KafkaConfigFactory() : FactoryBase(NetworkFilterNames::get().KafkaBroker) {}

private:
Network::FilterFactoryCb
createFilterFactoryFromProtoTyped(const KafkaBrokerProtoConfig& proto_config,
Server::Configuration::FactoryContext& context) override;
};

} // namespace Broker
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
59 changes: 59 additions & 0 deletions source/extensions/filters/network/kafka/broker/filter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#include "extensions/filters/network/kafka/broker/filter.h"

#include <sstream>

#include "envoy/buffer/buffer.h"
#include "envoy/network/connection.h"

#include "common/common/assert.h"
#include "common/common/utility.h"

#include "extensions/filters/network/kafka/broker/filter_impl.h"
#include "extensions/filters/network/kafka/external/request_metrics.h"
#include "extensions/filters/network/kafka/external/response_metrics.h"
#include "extensions/filters/network/kafka/kafka_request.h"
#include "extensions/filters/network/kafka/kafka_response.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Broker {

KafkaBrokerFilter::KafkaBrokerFilter(Stats::Scope& scope, TimeSource& time_source,
const std::string& stat_prefix)
: KafkaBrokerFilter{
std::make_shared<MetricTrackingCallback>(scope, time_source, stat_prefix)} {};

KafkaBrokerFilter::KafkaBrokerFilter(const KafkaCallbackSharedPtr& metrics_callback)
: response_decoder_{new ResponseDecoder({metrics_callback})},
request_decoder_{new RequestDecoder(
{std::make_shared<Forwarder>(*response_decoder_), metrics_callback})} {};

KafkaBrokerFilter::KafkaBrokerFilter(ResponseDecoderSharedPtr response_decoder,
RequestDecoderSharedPtr request_decoder)
: response_decoder_{response_decoder}, request_decoder_{request_decoder} {};

Network::FilterStatus KafkaBrokerFilter::onNewConnection() {
return Network::FilterStatus::Continue;
}

void KafkaBrokerFilter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks&) {}

Network::FilterStatus KafkaBrokerFilter::onData(Buffer::Instance& data, bool) {
ENVOY_LOG(trace, "data from Kafka client [{} request bytes]", data.length());
request_decoder_->onData(data);
return Network::FilterStatus::Continue;
}

Network::FilterStatus KafkaBrokerFilter::onWrite(Buffer::Instance& data, bool) {
ENVOY_LOG(trace, "data from Kafka broker [{} response bytes]", data.length());
response_decoder_->onData(data);
return Network::FilterStatus::Continue;
}

} // namespace Broker
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
61 changes: 61 additions & 0 deletions source/extensions/filters/network/kafka/broker/filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include <sstream>

#include "envoy/common/time.h"
#include "envoy/network/filter.h"
#include "envoy/stats/scope.h"

#include "common/common/logger.h"

#include "extensions/filters/network/kafka/parser.h"
#include "extensions/filters/network/kafka/request_codec.h"
#include "extensions/filters/network/kafka/response_codec.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Broker {

/**
* Utility class that merges both request & response callbacks.
*/
class KafkaCallback : public RequestCallback, public ResponseCallback {};

using KafkaCallbackSharedPtr = std::shared_ptr<KafkaCallback>;

/**
* Implementation of Kafka broker-level filter.
*/
class KafkaBrokerFilter : public Network::Filter, private Logger::Loggable<Logger::Id::kafka> {

public:
// Main constructor.
KafkaBrokerFilter(Stats::Scope& scope, TimeSource& time_source, const std::string& stat_prefix);

// Visible for testing only.
// Allows for injecting request and response decoders.
KafkaBrokerFilter(ResponseDecoderSharedPtr response_decoder,
RequestDecoderSharedPtr request_decoder);

// Network::ReadFilter
Network::FilterStatus onNewConnection() override;
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;
Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;

// Network::WriteFilter
Network::FilterStatus onWrite(Buffer::Instance& data, bool end_stream) override;

private:
KafkaBrokerFilter(const KafkaCallbackSharedPtr& callback);

ResponseDecoderSharedPtr response_decoder_;
RequestDecoderSharedPtr request_decoder_;
};

} // namespace Broker
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy

0 comments on commit 8056d1e

Please sign in to comment.