[WIP/bp/1.25] Kafka + deps #29172

Closed · wants to merge 4 commits
@@ -18,7 +18,17 @@ option (xds.annotations.v3.file_status).work_in_progress = true;
// Kafka Mesh :ref:`configuration overview <config_network_filters_kafka_mesh>`.
// [#extension: envoy.filters.network.kafka_mesh]

// [#next-free-field: 6]
message KafkaMesh {
enum ConsumerProxyMode {
// Records received are going to be distributed amongst downstream consumer connections.
// In this mode Envoy uses librdkafka consumers pointing at upstream Kafka clusters, which means that these
// consumers' positions are meaningful and affect what records are received from upstream.
// Users might want to take a look at these consumers' custom configuration to manage their auto-committing
// capabilities, as it will impact Envoy's behaviour in case of restarts.
StatefulConsumerProxy = 0;
}

// Envoy's host that's advertised to clients.
// Has the same meaning as corresponding Kafka broker properties.
// Usually equal to filter chain's listener config, but needs to be reachable by clients
@@ -33,8 +43,12 @@ message KafkaMesh {

// Rules that will decide which cluster gets which request.
repeated ForwardingRule forwarding_rules = 4;

// How the consumer proxying should behave - this relates mostly to Fetch request handling.
ConsumerProxyMode consumer_proxy_mode = 5;
}

// [#next-free-field: 6]
message KafkaClusterDefinition {
// Cluster name.
string cluster_name = 1 [(validate.rules).string = {min_len: 1}];
@@ -44,10 +58,14 @@ message KafkaClusterDefinition {

// Default number of partitions present in this cluster.
// This is especially important for clients that do not specify partition in their payloads and depend on this value for hashing.
// The same number of partitions is going to be used by upstream-pointing Kafka consumers for consumer proxying scenarios.
int32 partition_count = 3 [(validate.rules).int32 = {gt: 0}];

// Custom configuration passed to Kafka producer.
map<string, string> producer_config = 4;

// Custom configuration passed to Kafka consumer.
map<string, string> consumer_config = 5;
}

message ForwardingRule {
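
To make the new `consumer_config` field concrete: below is a minimal sketch of how such a map can be applied to a librdkafka consumer configuration. `RdKafka::Conf::create` and `Conf::set` are the actual librdkafka C++ API; the `buildConsumerConf` helper and its error handling are hypothetical, not part of this PR. An entry such as `enable.auto.commit` set to `false` is how a user would manage the auto-committing behaviour called out in the `ConsumerProxyMode` comment above.

```cpp
#include <map>
#include <memory>
#include <string>

#include "librdkafka/rdkafkacpp.h"

// Hypothetical helper (not part of this PR): turns the filter's
// `consumer_config` map into a librdkafka configuration object.
// An entry like {"enable.auto.commit", "false"} controls offset
// auto-committing, which affects what Envoy re-reads after a restart.
std::unique_ptr<RdKafka::Conf>
buildConsumerConf(const std::map<std::string, std::string>& consumer_config) {
  std::unique_ptr<RdKafka::Conf> conf(
      RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
  std::string errstr;
  for (const auto& [key, value] : consumer_config) {
    if (conf->set(key, value, errstr) != RdKafka::Conf::CONF_OK) {
      // A real implementation would reject the invalid property;
      // a sketch just skips it.
    }
  }
  return conf;
}
```
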
12 changes: 6 additions & 6 deletions bazel/repository_locations.bzl
@@ -1209,13 +1209,13 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "Kafka (source)",
project_desc = "Open-source distributed event streaming platform",
project_url = "https://kafka.apache.org",
version = "3.4.0",
sha256 = "9eeaf83ffddb85d253a2441a29ba6be0a563cd3d6eb9ddf0eeb8d6e2f49c0ef7",
version = "3.5.1",
sha256 = "9715589a02148fb21bc80d79f29763dbd371457bedcbbeab3db4f5c7fdd2d29c",
strip_prefix = "kafka-{version}/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/{version}.zip"],
use_category = ["dataplane_ext"],
extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"],
release_date = "2023-01-31",
release_date = "2023-07-14",
cpe = "cpe:2.3:a:apache:kafka:*",
license = "Apache-2.0",
license_url = "https://github.com/apache/kafka/blob/{version}/LICENSE",
@@ -1239,11 +1239,11 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "Kafka (server binary)",
project_desc = "Open-source distributed event streaming platform",
project_url = "https://kafka.apache.org",
version = "3.4.0",
sha256 = "67025feb03eb963a8852d4adc5b2810744f493a672c5992728955e38bed43da8",
version = "3.5.1",
sha256 = "f7b74d544023f2c0ec52a179de59975cb64e34ea03650d829328b407b560e4da",
strip_prefix = "kafka_2.13-{version}",
urls = ["https://archive.apache.org/dist/kafka/{version}/kafka_2.13-{version}.tgz"],
release_date = "2023-01-31",
release_date = "2023-07-21",
use_category = ["test_only"],
),
kafka_python_client = dict(
25 changes: 12 additions & 13 deletions contrib/kafka/filters/network/source/mesh/BUILD
@@ -25,6 +25,7 @@ envoy_cc_contrib_extension(
"//bazel:windows": [],
"//conditions:default": [
":filter_lib",
":shared_consumer_manager_impl_lib",
":upstream_config_lib",
":upstream_kafka_facade_lib",
],
@@ -41,6 +42,7 @@ envoy_cc_library(
deps = [
":abstract_command_lib",
":request_processor_lib",
":shared_consumer_manager_lib",
":upstream_config_lib",
":upstream_kafka_facade_lib",
"//contrib/kafka/filters/network/source:kafka_request_codec_lib",
@@ -68,7 +70,9 @@ envoy_cc_library(
":upstream_kafka_facade_lib",
"//contrib/kafka/filters/network/source:kafka_request_codec_lib",
"//contrib/kafka/filters/network/source:kafka_request_parser_lib",
"//contrib/kafka/filters/network/source/mesh:shared_consumer_manager_lib",
"//contrib/kafka/filters/network/source/mesh/command_handlers:api_versions_lib",
"//contrib/kafka/filters/network/source/mesh/command_handlers:fetch_lib",
"//contrib/kafka/filters/network/source/mesh/command_handlers:list_offsets_lib",
"//contrib/kafka/filters/network/source/mesh/command_handlers:metadata_lib",
"//contrib/kafka/filters/network/source/mesh/command_handlers:produce_lib",
@@ -88,6 +92,7 @@ envoy_cc_library(
deps = [
"//contrib/kafka/filters/network/source:kafka_response_lib",
"//contrib/kafka/filters/network/source:tagged_fields_lib",
"//envoy/event:dispatcher_interface",
],
)

@@ -112,32 +117,25 @@ envoy_cc_library(

envoy_cc_library(
name = "shared_consumer_manager_lib",
srcs = [
],
hdrs = [
"shared_consumer_manager.h",
],
srcs = [],
hdrs = ["shared_consumer_manager.h"],
tags = ["skip_on_windows"],
deps = [
":upstream_config_lib",
":upstream_kafka_consumer_lib",
"//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
name = "shared_consumer_manager_impl_lib",
srcs = [
"shared_consumer_manager_impl.cc",
],
hdrs = [
"shared_consumer_manager_impl.h",
],
srcs = ["shared_consumer_manager_impl.cc"],
hdrs = ["shared_consumer_manager_impl.h"],
tags = ["skip_on_windows"],
deps = [
":librdkafka_utils_lib",
":shared_consumer_manager_lib",
":upstream_config_lib",
":upstream_kafka_consumer_impl_lib",
":upstream_kafka_consumer_lib",
"//contrib/kafka/filters/network/source:kafka_types_lib",
"//source/common/common:minimal_logger_lib",
],
@@ -223,6 +221,7 @@ envoy_cc_library(
deps = [
":librdkafka_utils_impl_lib",
":upstream_kafka_consumer_lib",
"//contrib/kafka/filters/network/source:kafka_types_lib",
"//envoy/event:dispatcher_interface",
"//source/common/common:minimal_logger_lib",
],
@@ -1,5 +1,7 @@
#pragma once

#include "envoy/event/dispatcher.h"

#include "source/common/common/logger.h"

#include "contrib/kafka/filters/network/source/kafka_response.h"
@@ -57,18 +59,23 @@ class AbstractRequestListener {
// Notifies the listener that a new request has been received.
virtual void onRequest(InFlightRequestSharedPtr request) PURE;

// Notified the listener, that the request finally has an answer ready.
// Notifies the listener that the request finally has an answer ready.
// Usually this means that the request has been sent to upstream Kafka clusters and we got answers
// (unless it's something that could be responded to locally).
// IMPL: we do not need to pass the request here, as filters need to answer in-order.
// This means that we always need to check whether the first answer is ready, even if later
// ones have already finished.
virtual void onRequestReadyForAnswer() PURE;

// Accesses listener's dispatcher.
// Used by non-Envoy threads that need to communicate with listeners.
virtual Event::Dispatcher& dispatcher() PURE;
};

/**
* Helper base class for all in flight requests.
* Binds request to its origin filter.
* All the fields can be accessed only by the owning dispatcher thread.
*/
class BaseInFlightRequest : public InFlightRequest, protected Logger::Loggable<Logger::Id::kafka> {
public:
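
The in-order constraint in the comment above is worth a concrete illustration. A minimal self-contained sketch (simplified types, not the filter's actual code): answers may become ready out of order, but they are only ever sent from the front of the in-flight queue.

```cpp
#include <deque>
#include <memory>

// Illustrative stand-in for the real InFlightRequest interface.
struct InFlightRequest {
  virtual ~InFlightRequest() = default;
  virtual bool finished() const = 0; // Has an answer ready?
  virtual void send() = 0;           // Serialize + write downstream.
};

class Filter {
public:
  // Requests are queued in arrival order.
  void onRequest(std::shared_ptr<InFlightRequest> request) {
    in_flight_.push_back(std::move(request));
  }

  // Invoked when any request gets its answer: drain from the front only,
  // so responses go out in the same order the requests arrived.
  void onRequestReadyForAnswer() {
    while (!in_flight_.empty() && in_flight_.front()->finished()) {
      in_flight_.front()->send();
      in_flight_.pop_front();
    }
  }

private:
  std::deque<std::shared_ptr<InFlightRequest>> in_flight_;
};
```
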
90 changes: 90 additions & 0 deletions contrib/kafka/filters/network/source/mesh/command_handlers.md
@@ -0,0 +1,90 @@
# Command handlers

These simple diagrams show the main classes involved in providing the Kafka-mesh filter functionality.

Disclaimer: these are not UML diagrams in any shape or form.

## Basics

Raw data is processed by `RequestDecoder`, which notifies `RequestProcessor` on successful parse.
`RequestProcessor` then creates `InFlightRequest` instances that can be processed.

When an `InFlightRequest` is finally processed, it can generate an answer (`AbstractResponse`)
that is later serialized by `ResponseEncoder`.

```mermaid
graph TD;
InFlightRequest["<< abstract >> \n InFlightRequest"]
AbstractResponse["<< abstract >> \n AbstractResponse"]

KafkaMeshFilter <-.-> |"in-flight-reference\n(finish/abandon)"| InFlightRequest
KafkaMeshFilter --> |feeds| RequestDecoder
RequestDecoder --> |"notifies"| RequestProcessor
RequestProcessor --> |"creates"| InFlightRequest
InFlightRequest --> |"produces"| AbstractResponse

RequestHolder["...RequestHolder"]
RequestHolder --> |"subclass"| InFlightRequest

KafkaMeshFilter --> ResponseEncoder
ResponseEncoder -.-> |encodes| AbstractResponse
```
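
A compact sketch of this pipeline, with illustrative-only signatures (`InFlightRequestSharedPtr` and `computeAnswer` appear in the real code; the constructor and `onMessage` shape here are assumptions):

```cpp
#include <functional>
#include <memory>

// Illustrative-only pipeline, mirroring the diagram above.
struct AbstractResponse {
  virtual ~AbstractResponse() = default;
};

struct InFlightRequest {
  virtual ~InFlightRequest() = default;
  // Produces the answer once processing is done.
  virtual std::shared_ptr<AbstractResponse> computeAnswer() const = 0;
};
using InFlightRequestSharedPtr = std::shared_ptr<InFlightRequest>;

// Turns parsed requests into InFlightRequest instances and hands them
// to the listener (the filter), matching the "creates" edge above.
class RequestProcessor {
public:
  explicit RequestProcessor(std::function<void(InFlightRequestSharedPtr)> on_request)
      : on_request_(std::move(on_request)) {}

  // Invoked by the decoder on a successful parse; `request` stands in for
  // a concrete ...RequestHolder subclass chosen by request type.
  void onMessage(InFlightRequestSharedPtr request) { on_request_(std::move(request)); }

private:
  std::function<void(InFlightRequestSharedPtr)> on_request_;
};
```
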

## Produce

Produce request (`ProduceRequestHolder`) uses `UpstreamKafkaFacade` to get `RichKafkaProducer` instances that
correspond to its topics.
When the deliveries have finished (successfully or not - the upstream could have rejected the records for reasons
of its own), `RichKafkaProducer` notifies the `ProduceRequestHolder` that it has finished.
The request can then notify its parent (`KafkaMeshFilter`) that the response can be sent downstream.

```mermaid
graph TD;
KafkaMeshFilter <-.-> |"in-flight-reference\n(finish/abandon)"| ProduceRequestHolder
KafkaMeshFilter --> RP["RequestDecoder+RequestProcessor"]
RP --> |"creates"| ProduceRequestHolder
UpstreamKafkaFacade --> |"accesses (Envoy thread-local)"| ThreadLocalKafkaFacade
ThreadLocalKafkaFacade --> |"stores multiple"| RichKafkaProducer
RdKafkaProducer["<< librdkafka >>\nRdKafkaProducer"]
RichKafkaProducer --> |"wraps"| RdKafkaProducer
RichKafkaProducer -.-> |"in-flight-reference\n(delivery callback)"| ProduceRequestHolder
ProduceRequestHolder --> |uses| UpstreamKafkaFacade
ProduceRequestHolder -.-> |sends data to| RichKafkaProducer
```
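
A sketch of the "in-flight-reference (delivery callback)" edge above. `RdKafka::DeliveryReportCb::dr_cb` is the real librdkafka hook; the bookkeeping shown for `RichKafkaProducer` and the simplified `ProduceRequest` type are illustrative, not the actual implementation.

```cpp
#include <list>
#include <memory>

#include "librdkafka/rdkafkacpp.h"

// Simplified stand-in for the produce request abstraction.
class ProduceRequest {
public:
  virtual ~ProduceRequest() = default;
  // Returns true if this request was waiting for the given delivery.
  virtual bool accept(RdKafka::ErrorCode err, const RdKafka::Message& message) = 0;
};

// librdkafka invokes dr_cb() for every produced record; the producer
// wrapper routes the result to whichever in-flight produce request is
// still waiting for it.
class RichKafkaProducer : public RdKafka::DeliveryReportCb {
public:
  void track(std::shared_ptr<ProduceRequest> request) {
    unfinished_requests_.push_back(std::move(request));
  }

  // Delivery report callback (real librdkafka API); invoked during poll().
  void dr_cb(RdKafka::Message& message) override {
    for (auto it = unfinished_requests_.begin(); it != unfinished_requests_.end(); ++it) {
      if ((*it)->accept(message.err(), message)) {
        unfinished_requests_.erase(it); // Request got its answer.
        break;
      }
    }
  }

private:
  std::list<std::shared_ptr<ProduceRequest>> unfinished_requests_;
};
```
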

## Fetch

Fetch request (`FetchRequestHolder`) registers itself with `SharedConsumerManager` to be notified when records matching
its interests appear.
`SharedConsumerManager` maintains multiple `RichKafkaConsumer` instances (which means it keeps the Kafka consumer state)
that are responsible for polling records from upstream Kafka clusters.
Each `RichKafkaConsumer` is effectively a librdkafka `KafkaConsumer` plus its poller thread.
When `FetchRequestHolder` is finished with its processing (whether through record delivery or timeout), it uses an Envoy
`Dispatcher` to notify the parent filter.

```mermaid
graph TD;
    FRH["FetchRequestHolder"]
KafkaMeshFilter <-.-> |"in-flight-reference\n(finish/abandon)"| FRH
KafkaMeshFilter --> RP["RequestDecoder+RequestProcessor"]
RP --> |"creates"| FRH

RCP["<< interface >> \n RecordCallbackProcessor"]
SCM["SharedConsumerManager"]
SCM --> |subclass| RCP

KC["RichKafkaConsumer"]
FRH -.-> |registers itself with| SCM
SCM -.-> |provides records| FRH
    SCM --> |stores multiple| KC

LibrdKafkaConsumer["<< librdkafka >> \n KafkaConsumer"]
ConsumerPoller["<< thread >> \n consumer poller"]
KC --> |wraps| LibrdKafkaConsumer
KC --> |holds| ConsumerPoller
ConsumerPoller --> |polls from| LibrdKafkaConsumer

DSP["<< Envoy >> \n Dispatcher"]
KafkaMeshFilter --- DSP
FRH -.-> |notifies on finish| DSP
```
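
Finally, a sketch of the dispatcher hand-off at the bottom of the diagram. `Dispatcher` here is a simplified stand-in for Envoy's `Event::Dispatcher` (the interface behind the `envoy/event/dispatcher.h` include added by this PR); `post()` queues a callback to run on the thread that owns the listener.

```cpp
#include <functional>

// Simplified stand-in for Envoy's Event::Dispatcher: post() enqueues a
// callback that the owning worker thread executes later.
struct Dispatcher {
  virtual ~Dispatcher() = default;
  virtual void post(std::function<void()> callback) = 0;
};

// Mirrors the AbstractRequestListener additions in this PR.
struct AbstractRequestListener {
  virtual ~AbstractRequestListener() = default;
  virtual void onRequestReadyForAnswer() = 0;
  virtual Dispatcher& dispatcher() = 0; // Accessor added by this PR.
};

// The consumer poller thread must not touch filter state directly, so it
// posts the notification; onRequestReadyForAnswer() then runs on the
// listener's own worker thread. (The sketch ignores lifetime; a real
// implementation must ensure `listener` outlives the posted callback.)
void notifyWhenRecordsDelivered(AbstractRequestListener& listener) {
  listener.dispatcher().post([&listener]() { listener.onRequestReadyForAnswer(); });
}
```
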
@@ -45,6 +45,41 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "fetch_lib",
srcs = [
"fetch.cc",
],
hdrs = [
"fetch.h",
],
tags = ["skip_on_windows"],
deps = [
":fetch_record_converter_lib",
"//contrib/kafka/filters/network/source:kafka_request_parser_lib",
"//contrib/kafka/filters/network/source:kafka_response_parser_lib",
"//contrib/kafka/filters/network/source/mesh:abstract_command_lib",
"//contrib/kafka/filters/network/source/mesh:shared_consumer_manager_lib",
"//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
name = "fetch_record_converter_lib",
srcs = [
"fetch_record_converter.cc",
],
hdrs = [
"fetch_record_converter.h",
],
tags = ["skip_on_windows"],
deps = [
"//contrib/kafka/filters/network/source:kafka_response_parser_lib",
"//contrib/kafka/filters/network/source:serialization_lib",
"//contrib/kafka/filters/network/source/mesh:inbound_record_lib",
],
)

envoy_cc_library(
name = "list_offsets_lib",
srcs = [
@@ -44,12 +44,13 @@ AbstractResponseSharedPtr ApiVersionsRequestHolder::computeAnswer() const {
const int16_t error_code = 0;
const ApiVersion produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED,
MAX_PRODUCE_SUPPORTED};
const ApiVersion fetch_entry = {FETCH_REQUEST_API_KEY, 0, FETCH_REQUEST_MAX_VERSION};
const ApiVersion list_offsets_entry = {LIST_OFFSETS_REQUEST_API_KEY, MIN_LIST_OFFSETS_SUPPORTED,
MAX_LIST_OFFSETS_SUPPORTED};
const ApiVersion metadata_entry = {METADATA_REQUEST_API_KEY, MIN_METADATA_SUPPORTED,
MAX_METADATA_SUPPORTED};
const ApiVersionsResponse real_response = {error_code,
{produce_entry, list_offsets_entry, metadata_entry}};
const ApiVersionsResponse real_response = {
error_code, {produce_entry, fetch_entry, list_offsets_entry, metadata_entry}};

return std::make_shared<Response<ApiVersionsResponse>>(metadata, real_response);
}