Kafka: Add broker-level metrics-collecting filter (#8188)
Signed-off-by: Adam Kotwasinski <adam.kotwasinski@gmail.com>
adamkotwasinski authored and Matt Klein committed Jan 6, 2020
1 parent 215983a commit a60f685
Showing 62 changed files with 2,754 additions and 256 deletions.
2 changes: 2 additions & 0 deletions api/BUILD
@@ -59,6 +59,7 @@ proto_library(
"//envoy/config/filter/network/dubbo_proxy/v2alpha1:pkg",
"//envoy/config/filter/network/ext_authz/v2:pkg",
"//envoy/config/filter/network/http_connection_manager/v2:pkg",
"//envoy/config/filter/network/kafka_broker/v2alpha1:pkg",
"//envoy/config/filter/network/local_rate_limit/v2alpha:pkg",
"//envoy/config/filter/network/local_rate_limit/v3alpha:pkg",
"//envoy/config/filter/network/mongo_proxy/v2:pkg",
@@ -137,6 +138,7 @@ proto_library(
"//envoy/extensions/filters/network/dubbo_proxy/v3alpha:pkg",
"//envoy/extensions/filters/network/ext_authz/v3alpha:pkg",
"//envoy/extensions/filters/network/http_connection_manager/v3alpha:pkg",
"//envoy/extensions/filters/network/kafka_broker/v3alpha:pkg",
"//envoy/extensions/filters/network/mongo_proxy/v3alpha:pkg",
"//envoy/extensions/filters/network/mysql_proxy/v3alpha:pkg",
"//envoy/extensions/filters/network/ratelimit/v3alpha:pkg",
25 changes: 0 additions & 25 deletions api/bazel/repositories.bzl
@@ -33,11 +33,6 @@ def api_dependencies():
name = "rules_proto",
locations = REPOSITORY_LOCATIONS,
)
envoy_http_archive(
name = "kafka_source",
locations = REPOSITORY_LOCATIONS,
build_file_content = KAFKASOURCE_BUILD_CONTENT,
)
envoy_http_archive(
name = "com_github_openzipkin_zipkinapi",
locations = REPOSITORY_LOCATIONS,
@@ -84,26 +79,6 @@ go_proto_library(
)
"""

KAFKASOURCE_BUILD_CONTENT = """
filegroup(
name = "request_protocol_files",
srcs = glob([
"*Request.json",
]),
visibility = ["//visibility:public"],
)
filegroup(
name = "response_protocol_files",
srcs = glob([
"*Response.json",
]),
visibility = ["//visibility:public"],
)
"""

ZIPKINAPI_BUILD_CONTENT = """
load("@envoy_api//bazel:api_build_system.bzl", "api_cc_py_proto_library")
7 changes: 0 additions & 7 deletions api/bazel/repository_locations.bzl
@@ -13,8 +13,6 @@ GOOGLEAPIS_SHA = "a45019af4d3290f02eaeb1ce10990166978c807cb33a9692141a076ba46d14"
PROMETHEUS_GIT_SHA = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c" # Nov 17, 2017
PROMETHEUS_SHA = "783bdaf8ee0464b35ec0c8704871e1e72afa0005c3f3587f65d9d6694bf3911b"

KAFKA_SOURCE_SHA = "ae7a1696c0a0302b43c5b21e515c37e6ecd365941f68a510a7e442eebddf39a1" # 2.2.0-rc2

UDPA_GIT_SHA = "edbea6a78f6d1ba34edc69c53a396b1d88d59651" # Dec 30, 2019
UDPA_SHA256 = "8cabd617b68354fa8b4adab8a031f80c10e2ea43f57d5f6210bc7b3ebb79b684"

@@ -60,11 +58,6 @@ REPOSITORY_LOCATIONS = dict(
strip_prefix = "rules_proto-" + RULES_PROTO_GIT_SHA + "",
urls = ["https://github.com/bazelbuild/rules_proto/archive/" + RULES_PROTO_GIT_SHA + ".tar.gz"],
),
kafka_source = dict(
sha256 = KAFKA_SOURCE_SHA,
strip_prefix = "kafka-2.2.0-rc2/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/2.2.0-rc2.zip"],
),
com_github_openzipkin_zipkinapi = dict(
sha256 = ZIPKINAPI_SHA256,
strip_prefix = "zipkin-api-" + ZIPKINAPI_RELEASE,
1 change: 1 addition & 0 deletions api/docs/BUILD
@@ -59,6 +59,7 @@ proto_library(
"//envoy/config/filter/network/dubbo_proxy/v2alpha1:pkg",
"//envoy/config/filter/network/ext_authz/v2:pkg",
"//envoy/config/filter/network/http_connection_manager/v2:pkg",
"//envoy/config/filter/network/kafka_broker/v2alpha1:pkg",
"//envoy/config/filter/network/local_rate_limit/v2alpha:pkg",
"//envoy/config/filter/network/mongo_proxy/v2:pkg",
"//envoy/config/filter/network/mysql_proxy/v1alpha1:pkg",
9 changes: 9 additions & 0 deletions api/envoy/config/filter/network/kafka_broker/v2alpha1/BUILD
@@ -0,0 +1,9 @@
# DO NOT EDIT. This file is generated by tools/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
    deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"],
)
21 changes: 21 additions & 0 deletions api/envoy/config/filter/network/kafka_broker/v2alpha1/kafka_broker.proto
@@ -0,0 +1,21 @@
syntax = "proto3";

package envoy.config.filter.network.kafka_broker.v2alpha1;

import "udpa/annotations/migrate.proto";
import "validate/validate.proto";

option java_package = "io.envoyproxy.envoy.config.filter.network.kafka_broker.v2alpha1";
option java_outer_classname = "KafkaBrokerProto";
option java_multiple_files = true;
option (udpa.annotations.file_migrate).move_to_package =
    "envoy.extensions.filters.network.kafka_broker.v3alpha";

// [#protodoc-title: Kafka Broker]
// Kafka Broker :ref:`configuration overview <config_network_filters_kafka_broker>`.
// [#extension: envoy.filters.network.kafka_broker]

message KafkaBroker {
  // The prefix to use when emitting :ref:`statistics <config_network_filters_kafka_broker_stats>`.
  string stat_prefix = 1 [(validate.rules).string = {min_bytes: 1}];
}
12 changes: 12 additions & 0 deletions api/envoy/extensions/filters/network/kafka_broker/v3alpha/BUILD
@@ -0,0 +1,12 @@
# DO NOT EDIT. This file is generated by tools/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
    deps = [
        "//envoy/config/filter/network/kafka_broker/v2alpha1:pkg",
        "@com_github_cncf_udpa//udpa/annotations:pkg",
    ],
)
23 changes: 23 additions & 0 deletions api/envoy/extensions/filters/network/kafka_broker/v3alpha/kafka_broker.proto
@@ -0,0 +1,23 @@
syntax = "proto3";

package envoy.extensions.filters.network.kafka_broker.v3alpha;

import "udpa/annotations/versioning.proto";

import "validate/validate.proto";

option java_package = "io.envoyproxy.envoy.extensions.filters.network.kafka_broker.v3alpha";
option java_outer_classname = "KafkaBrokerProto";
option java_multiple_files = true;

// [#protodoc-title: Kafka Broker]
// Kafka Broker :ref:`configuration overview <config_network_filters_kafka_broker>`.
// [#extension: envoy.filters.network.kafka_broker]

message KafkaBroker {
  option (udpa.annotations.versioning).previous_message_type =
      "envoy.config.filter.network.kafka_broker.v2alpha1.KafkaBroker";

  // The prefix to use when emitting :ref:`statistics <config_network_filters_kafka_broker_stats>`.
  string stat_prefix = 1 [(validate.rules).string = {min_bytes: 1}];
}
5 changes: 5 additions & 0 deletions bazel/BUILD
@@ -370,3 +370,8 @@ alias(
},
),
)

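# Convenience alias for the remote JDK 11 toolchain (the Kafka integration tests
# use it to run the Kafka broker binary).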
alias(
    name = "remote_jdk11",
    actual = "@bazel_tools//tools/jdk:remote_jdk11",
)
38 changes: 38 additions & 0 deletions bazel/repositories.bzl
@@ -159,6 +159,7 @@ def envoy_dependencies(skip_targets = []):
    _python_deps()
    _cc_deps()
    _go_deps(skip_targets)
    _kafka_deps()

    switched_rules_by_language(
        name = "com_google_googleapis_imports",
@@ -740,6 +741,43 @@ def _com_github_gperftools_gperftools():
actual = "@envoy//bazel/foreign_cc:gperftools",
)

def _kafka_deps():
    # This archive contains Kafka client source code.
    # We are using request/response message format files to generate parser code.
    KAFKASOURCE_BUILD_CONTENT = """
filegroup(
    name = "request_protocol_files",
    srcs = glob(["*Request.json"]),
    visibility = ["//visibility:public"],
)
filegroup(
    name = "response_protocol_files",
    srcs = glob(["*Response.json"]),
    visibility = ["//visibility:public"],
)
"""
    http_archive(
        name = "kafka_source",
        build_file_content = KAFKASOURCE_BUILD_CONTENT,
        **REPOSITORY_LOCATIONS["kafka_source"]
    )

    # This archive provides Kafka (and Zookeeper) binaries that are used during
    # Kafka integration tests.
    http_archive(
        name = "kafka_server_binary",
        build_file_content = BUILD_ALL_CONTENT,
        **REPOSITORY_LOCATIONS["kafka_server_binary"]
    )

    # This archive provides a Kafka client in Python, so we can use it to interact
    # with the Kafka server during integration tests.
    http_archive(
        name = "kafka_python_client",
        build_file_content = BUILD_ALL_CONTENT,
        **REPOSITORY_LOCATIONS["kafka_python_client"]
    )

def _foreign_cc_dependencies():
    _repository_impl("rules_foreign_cc")

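The *Request.json / *Response.json files matched by the filegroups above are Apache
Kafka's own protocol schema files; as the comment in _kafka_deps notes, they drive
parser code generation. A rough standalone sketch of that idea (this is not Envoy's
actual generator; it relies only on top-level keys present in the upstream schemas,
and the directory path is an assumption matching the strip_prefix used for
kafka_source):

    # List the request schemas that would feed a parser generator.
    import glob
    import json

    def list_request_schemas(message_dir):
        """Yield (api_key, name, valid_versions) for every *Request.json schema."""
        for path in sorted(glob.glob(message_dir + "/*Request.json")):
            with open(path) as f:
                # Kafka schema files contain // comments; strip them before parsing.
                text = "".join(line for line in f if not line.strip().startswith("//"))
            spec = json.loads(text)
            yield spec["apiKey"], spec["name"], spec["validVersions"]

    for api_key, name, versions in list_request_schemas(
            "kafka-2.2.0-rc2/clients/src/main/resources/common/message"):
        print(f"api_key={api_key:<3} {name} (versions {versions})")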
15 changes: 15 additions & 0 deletions bazel/repository_locations.bzl
@@ -303,4 +303,19 @@ REPOSITORY_LOCATIONS = dict(
        # 2019-11-19
        urls = ["https://github.com/protocolbuffers/upb/archive/8a3ae1ef3e3e3f26b45dec735c5776737fc7247f.tar.gz"],
    ),
    kafka_source = dict(
        sha256 = "ae7a1696c0a0302b43c5b21e515c37e6ecd365941f68a510a7e442eebddf39a1",  # 2.2.0-rc2
        strip_prefix = "kafka-2.2.0-rc2/clients/src/main/resources/common/message",
        urls = ["https://github.com/apache/kafka/archive/2.2.0-rc2.zip"],
    ),
    kafka_server_binary = dict(
        sha256 = "a009624fae678fa35968f945e18e45fbea9a30fa8080d5dcce7fdea726120027",
        strip_prefix = "kafka_2.12-2.2.0",
        urls = ["http://us.mirrors.quenda.co/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"],
    ),
    kafka_python_client = dict(
        sha256 = "81f24a5d297531495e0ccb931fbd6c4d1ec96583cf5a730579a3726e63f59c47",
        strip_prefix = "kafka-python-1.4.7",
        urls = ["https://github.com/dpkp/kafka-python/archive/1.4.7.tar.gz"],
    ),
)
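Bazel verifies every archive it fetches against the pinned digest recorded here. A
standalone sketch of the same check, reusing the kafka_python_client values above:

    # Fetch an archive and compare its SHA-256 digest against the pinned value.
    import hashlib
    import urllib.request

    def check_archive(url, expected_sha256):
        data = urllib.request.urlopen(url).read()
        actual = hashlib.sha256(data).hexdigest()
        if actual != expected_sha256:
            raise RuntimeError(f"digest mismatch for {url}: got {actual}")

    check_archive(
        "https://github.com/dpkp/kafka-python/archive/1.4.7.tar.gz",
        "81f24a5d297531495e0ccb931fbd6c4d1ec96583cf5a730579a3726e63f59c47",
    )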
86 changes: 86 additions & 0 deletions docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst
@@ -0,0 +1,86 @@
.. _config_network_filters_kafka_broker:

Kafka Broker filter
===================

The Apache Kafka broker filter decodes the client protocol for
`Apache Kafka <https://kafka.apache.org/>`_, both the requests and responses in the payload.
The message versions in `Kafka 2.0 <http://kafka.apache.org/20/protocol.html#protocol_api_keys>`_
are supported.
The filter attempts not to influence the communication between client and brokers, so messages
that cannot be decoded (because the Kafka client or broker is running a newer version than this
filter supports) are forwarded as-is.

* :ref:`v2 API reference <envoy_api_msg_config.filter.network.kafka_broker.v2alpha1.KafkaBroker>`
* This filter should be configured with the name *envoy.filters.network.kafka_broker*.

.. attention::

  The kafka_broker filter is experimental and is currently under active development.
  Capabilities will be expanded over time and the configuration structures are likely to change.

.. _config_network_filters_kafka_broker_config:

Configuration
-------------

The Kafka Broker filter should be chained with the TCP proxy filter as shown
in the configuration snippet below:


.. code-block:: yaml

  listeners:
  - address:
      socket_address:
        address: 127.0.0.1 # Host that Kafka clients should connect to.
        port_value: 19092  # Port that Kafka clients should connect to.
    filter_chains:
    - filters:
      - name: envoy.filters.network.kafka_broker
        config:
          stat_prefix: exampleprefix
      - name: envoy.tcp_proxy
        config:
          stat_prefix: tcp
          cluster: localkafka
  clusters:
  - name: localkafka
    connect_timeout: 0.25s
    type: strict_dns
    lb_policy: round_robin
    hosts:
    - socket_address:
        address: 127.0.0.1 # Kafka broker's host.
        port_value: 9092   # Kafka broker's port.

The Kafka broker needs to advertise the Envoy listener port instead of its own.

.. code-block:: text

  # Listener value needs to be equal to cluster value in Envoy config
  # (will receive payloads from Envoy).
  listeners=PLAINTEXT://127.0.0.1:9092
  # Advertised listener value needs to be equal to Envoy's listener
  # (will make clients discovering this broker talk to it through Envoy).
  advertised.listeners=PLAINTEXT://127.0.0.1:19092
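
A stock Kafka client can then be pointed at the Envoy listener. For example, a minimal
sketch using the kafka-python client (the topic name is arbitrary and is assumed to
exist or be auto-created; this snippet is an illustration, not part of the filter):

.. code-block:: python

  # Talk to Kafka through Envoy's listener (19092), not the broker port (9092).
  from kafka import KafkaConsumer, KafkaProducer

  producer = KafkaProducer(bootstrap_servers="127.0.0.1:19092")
  producer.send("example-topic", b"hello via envoy")
  producer.flush()

  consumer = KafkaConsumer(
      "example-topic",
      bootstrap_servers="127.0.0.1:19092",
      auto_offset_reset="earliest",
      consumer_timeout_ms=5000,
  )
  for record in consumer:
      print(record.value)
      break
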
.. _config_network_filters_kafka_broker_stats:

Statistics
----------

Every configured Kafka Broker filter has statistics rooted at *kafka.<stat_prefix>.*, with multiple
statistics per message type.

.. csv-table::
  :header: Name, Type, Description
  :widths: 1, 1, 2

  request.TYPE, Counter, Number of times a request of a particular type was received from a Kafka client
  request.unknown, Counter, Number of times a request whose format was not recognized by this filter was received
  request.failure, Counter, Number of times a request with an invalid format was received or another processing exception occurred
  response.TYPE, Counter, Number of times a response of a particular type was received from a Kafka broker
  response.TYPE_duration, Histogram, Response generation time in milliseconds
  response.unknown, Counter, Number of times a response whose format was not recognized by this filter was received
  response.failure, Counter, Number of times a response with an invalid format was received or another processing exception occurred
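
The counters can be read back through Envoy's admin interface. A minimal sketch (the
admin address 127.0.0.1:9901 and the prefix *exampleprefix* are assumptions matching
the configuration example above):

.. code-block:: python

  # Scrape this filter's statistics from the Envoy admin endpoint.
  import urllib.request

  url = "http://127.0.0.1:9901/stats?filter=kafka.exampleprefix"
  with urllib.request.urlopen(url) as resp:
      # Prints lines like "kafka.exampleprefix.request.TYPE: <count>".
      print(resp.read().decode())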
1 change: 1 addition & 0 deletions docs/root/configuration/listeners/network_filters/network_filters.rst
@@ -14,6 +14,7 @@ filters.
  client_ssl_auth_filter
  echo_filter
  ext_authz_filter
  kafka_broker_filter
  local_rate_limit_filter
  mongo_proxy_filter
  mysql_proxy_filter
2 changes: 2 additions & 0 deletions generated_api_shadow/BUILD
@@ -59,6 +59,7 @@ proto_library(
"//envoy/config/filter/network/dubbo_proxy/v2alpha1:pkg",
"//envoy/config/filter/network/ext_authz/v2:pkg",
"//envoy/config/filter/network/http_connection_manager/v2:pkg",
"//envoy/config/filter/network/kafka_broker/v2alpha1:pkg",
"//envoy/config/filter/network/local_rate_limit/v2alpha:pkg",
"//envoy/config/filter/network/local_rate_limit/v3alpha:pkg",
"//envoy/config/filter/network/mongo_proxy/v2:pkg",
@@ -137,6 +138,7 @@ proto_library(
"//envoy/extensions/filters/network/dubbo_proxy/v3alpha:pkg",
"//envoy/extensions/filters/network/ext_authz/v3alpha:pkg",
"//envoy/extensions/filters/network/http_connection_manager/v3alpha:pkg",
"//envoy/extensions/filters/network/kafka_broker/v3alpha:pkg",
"//envoy/extensions/filters/network/mongo_proxy/v3alpha:pkg",
"//envoy/extensions/filters/network/mysql_proxy/v3alpha:pkg",
"//envoy/extensions/filters/network/ratelimit/v3alpha:pkg",
25 changes: 0 additions & 25 deletions generated_api_shadow/bazel/repositories.bzl

Some generated files are not rendered by default.
