diff --git a/api/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto b/api/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto
index 4a906cdeebc6..68c71f296ee3 100644
--- a/api/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto
+++ b/api/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto
@@ -18,7 +18,17 @@ option (xds.annotations.v3.file_status).work_in_progress = true;
 // Kafka Mesh :ref:`configuration overview <config_network_filters_kafka_mesh>`.
 // [#extension: envoy.filters.network.kafka_mesh]
 
+// [#next-free-field: 6]
 message KafkaMesh {
+  enum ConsumerProxyMode {
+    // Records received are going to be distributed amongst downstream consumer connections.
+    // In this mode Envoy uses librdkafka consumers pointing at upstream Kafka clusters, which means that these
+    // consumers' position is meaningful and affects what records are received from upstream.
+    // Users might want to take a look at these consumers' custom configuration to manage their auto-committing
+    // capabilities, as it will impact Envoy's behaviour in case of restarts.
+    StatefulConsumerProxy = 0;
+  }
+
   // Envoy's host that's advertised to clients.
   // Has the same meaning as corresponding Kafka broker properties.
   // Usually equal to filter chain's listener config, but needs to be reachable by clients
@@ -33,8 +43,12 @@ message KafkaMesh {
 
   // Rules that will decide which cluster gets which request.
   repeated ForwardingRule forwarding_rules = 4;
+
+  // How the consumer proxying should behave - this relates mostly to Fetch request handling.
+  ConsumerProxyMode consumer_proxy_mode = 5;
 }
 
+// [#next-free-field: 6]
 message KafkaClusterDefinition {
   // Cluster name.
   string cluster_name = 1 [(validate.rules).string = {min_len: 1}];
@@ -44,10 +58,14 @@ message KafkaClusterDefinition {
 
   // Default number of partitions present in this cluster.
   // This is especially important for clients that do not specify partition in their payloads and depend on this value for hashing.
+  // The same number of partitions is going to be used by upstream-pointing Kafka consumers for consumer proxying scenarios.
   int32 partition_count = 3 [(validate.rules).int32 = {gt: 0}];
 
   // Custom configuration passed to Kafka producer.
   map<string, string> producer_config = 4;
+
+  // Custom configuration passed to Kafka consumer.
+ map consumer_config = 5; } message ForwardingRule { diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index b6ce7584fba0..5a98e3816af6 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -1209,13 +1209,13 @@ REPOSITORY_LOCATIONS_SPEC = dict( project_name = "Kafka (source)", project_desc = "Open-source distributed event streaming platform", project_url = "https://kafka.apache.org", - version = "3.4.0", - sha256 = "9eeaf83ffddb85d253a2441a29ba6be0a563cd3d6eb9ddf0eeb8d6e2f49c0ef7", + version = "3.5.1", + sha256 = "9715589a02148fb21bc80d79f29763dbd371457bedcbbeab3db4f5c7fdd2d29c", strip_prefix = "kafka-{version}/clients/src/main/resources/common/message", urls = ["https://github.com/apache/kafka/archive/{version}.zip"], use_category = ["dataplane_ext"], extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"], - release_date = "2023-01-31", + release_date = "2023-07-14", cpe = "cpe:2.3:a:apache:kafka:*", license = "Apache-2.0", license_url = "https://github.com/apache/kafka/blob/{version}/LICENSE", @@ -1239,11 +1239,11 @@ REPOSITORY_LOCATIONS_SPEC = dict( project_name = "Kafka (server binary)", project_desc = "Open-source distributed event streaming platform", project_url = "https://kafka.apache.org", - version = "3.4.0", - sha256 = "67025feb03eb963a8852d4adc5b2810744f493a672c5992728955e38bed43da8", + version = "3.5.1", + sha256 = "f7b74d544023f2c0ec52a179de59975cb64e34ea03650d829328b407b560e4da", strip_prefix = "kafka_2.13-{version}", urls = ["https://archive.apache.org/dist/kafka/{version}/kafka_2.13-{version}.tgz"], - release_date = "2023-01-31", + release_date = "2023-07-21", use_category = ["test_only"], ), kafka_python_client = dict( diff --git a/contrib/kafka/filters/network/source/mesh/BUILD b/contrib/kafka/filters/network/source/mesh/BUILD index cfc0b024924f..7ce691225bdf 100644 --- a/contrib/kafka/filters/network/source/mesh/BUILD +++ b/contrib/kafka/filters/network/source/mesh/BUILD @@ -25,6 +25,7 @@ envoy_cc_contrib_extension( "//bazel:windows": [], "//conditions:default": [ ":filter_lib", + ":shared_consumer_manager_impl_lib", ":upstream_config_lib", ":upstream_kafka_facade_lib", ], @@ -41,6 +42,7 @@ envoy_cc_library( deps = [ ":abstract_command_lib", ":request_processor_lib", + ":shared_consumer_manager_lib", ":upstream_config_lib", ":upstream_kafka_facade_lib", "//contrib/kafka/filters/network/source:kafka_request_codec_lib", @@ -68,7 +70,9 @@ envoy_cc_library( ":upstream_kafka_facade_lib", "//contrib/kafka/filters/network/source:kafka_request_codec_lib", "//contrib/kafka/filters/network/source:kafka_request_parser_lib", + "//contrib/kafka/filters/network/source/mesh:shared_consumer_manager_lib", "//contrib/kafka/filters/network/source/mesh/command_handlers:api_versions_lib", + "//contrib/kafka/filters/network/source/mesh/command_handlers:fetch_lib", "//contrib/kafka/filters/network/source/mesh/command_handlers:list_offsets_lib", "//contrib/kafka/filters/network/source/mesh/command_handlers:metadata_lib", "//contrib/kafka/filters/network/source/mesh/command_handlers:produce_lib", @@ -88,6 +92,7 @@ envoy_cc_library( deps = [ "//contrib/kafka/filters/network/source:kafka_response_lib", "//contrib/kafka/filters/network/source:tagged_fields_lib", + "//envoy/event:dispatcher_interface", ], ) @@ -112,32 +117,25 @@ envoy_cc_library( envoy_cc_library( name = "shared_consumer_manager_lib", - srcs = [ - ], - hdrs = [ - "shared_consumer_manager.h", - ], + srcs = [], + hdrs = 
["shared_consumer_manager.h"], tags = ["skip_on_windows"], deps = [ + ":upstream_config_lib", ":upstream_kafka_consumer_lib", - "//source/common/common:minimal_logger_lib", ], ) envoy_cc_library( name = "shared_consumer_manager_impl_lib", - srcs = [ - "shared_consumer_manager_impl.cc", - ], - hdrs = [ - "shared_consumer_manager_impl.h", - ], + srcs = ["shared_consumer_manager_impl.cc"], + hdrs = ["shared_consumer_manager_impl.h"], tags = ["skip_on_windows"], deps = [ - ":librdkafka_utils_lib", ":shared_consumer_manager_lib", ":upstream_config_lib", ":upstream_kafka_consumer_impl_lib", + ":upstream_kafka_consumer_lib", "//contrib/kafka/filters/network/source:kafka_types_lib", "//source/common/common:minimal_logger_lib", ], @@ -223,6 +221,7 @@ envoy_cc_library( deps = [ ":librdkafka_utils_impl_lib", ":upstream_kafka_consumer_lib", + "//contrib/kafka/filters/network/source:kafka_types_lib", "//envoy/event:dispatcher_interface", "//source/common/common:minimal_logger_lib", ], diff --git a/contrib/kafka/filters/network/source/mesh/abstract_command.h b/contrib/kafka/filters/network/source/mesh/abstract_command.h index e7ee458d1e7d..6de8c6ac4b45 100644 --- a/contrib/kafka/filters/network/source/mesh/abstract_command.h +++ b/contrib/kafka/filters/network/source/mesh/abstract_command.h @@ -1,5 +1,7 @@ #pragma once +#include "envoy/event/dispatcher.h" + #include "source/common/common/logger.h" #include "contrib/kafka/filters/network/source/kafka_response.h" @@ -57,18 +59,23 @@ class AbstractRequestListener { // Notifies the listener that a new request has been received. virtual void onRequest(InFlightRequestSharedPtr request) PURE; - // Notified the listener, that the request finally has an answer ready. + // Notifies the listener, that the request finally has an answer ready. // Usually this means that the request has been sent to upstream Kafka clusters and we got answers // (unless it's something that could be responded to locally). // IMPL: we do not need to pass request here, as filters need to answer in-order. // What means that we always need to check if first answer is ready, even if the latter are // already finished. virtual void onRequestReadyForAnswer() PURE; + + // Accesses listener's dispatcher. + // Used by non-Envoy threads that need to communicate with listeners. + virtual Event::Dispatcher& dispatcher() PURE; }; /** * Helper base class for all in flight requests. * Binds request to its origin filter. + * All the fields can be accessed only by the owning dispatcher thread. */ class BaseInFlightRequest : public InFlightRequest, protected Logger::Loggable { public: diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers.md b/contrib/kafka/filters/network/source/mesh/command_handlers.md new file mode 100644 index 000000000000..3918193b941a --- /dev/null +++ b/contrib/kafka/filters/network/source/mesh/command_handlers.md @@ -0,0 +1,90 @@ +# Command handlers + +These simple diagrams show what are the main classes involved in providing Kafka-mesh filter functionality. + +Disclaimer: these are not UML diagrams in any shape or form. + +## Basics + +Raw data is processed by `RequestDecoder`, which notifies `RequestProcessor` on successful parse. +`RequestProcessor` then creates `InFlightRequest` instances that can be processed. + +When an `InFlightRequest` is finally processed, it can generate an answer (`AbstractResponse`) +that is later serialized by `ResponseEncoder`. 
+
+```mermaid
+graph TD;
+    InFlightRequest["<< abstract >> \n InFlightRequest"]
+    AbstractResponse["<< abstract >> \n AbstractResponse"]
+
+    KafkaMeshFilter <-.-> |"in-flight-reference\n(finish/abandon)"| InFlightRequest
+    KafkaMeshFilter --> |feeds| RequestDecoder
+    RequestDecoder --> |"notifies"| RequestProcessor
+    RequestProcessor --> |"creates"| InFlightRequest
+    InFlightRequest --> |"produces"| AbstractResponse
+
+    RequestHolder["...RequestHolder"]
+    RequestHolder --> |"subclass"| InFlightRequest
+
+    KafkaMeshFilter --> ResponseEncoder
+    ResponseEncoder -.-> |encodes| AbstractResponse
+```
+
+## Produce
+
+Produce request (`ProduceRequestHolder`) uses `UpstreamKafkaFacade` to get `RichKafkaProducer` instances that
+correspond to its topics.
+When the deliveries have finished (successfully or not - the upstream could have rejected the records for
+reasons of its own), `RichKafkaProducer` notifies the `ProduceRequestHolder` that it has finished.
+The request can then notify its parent (`KafkaMeshFilter`) that the response can be sent downstream.
+
+```mermaid
+graph TD;
+    KafkaMeshFilter <-.-> |"in-flight-reference\n(finish/abandon)"| ProduceRequestHolder
+    KafkaMeshFilter --> RP["RequestDecoder+RequestProcessor"]
+    RP --> |"creates"| ProduceRequestHolder
+    UpstreamKafkaFacade --> |"accesses (Envoy thread-local)"| ThreadLocalKafkaFacade
+    ThreadLocalKafkaFacade --> |"stores multiple"| RichKafkaProducer
+    RdKafkaProducer["<< librdkafka >>\nRdKafkaProducer"]
+    RichKafkaProducer --> |"wraps"| RdKafkaProducer
+    RichKafkaProducer -.-> |"in-flight-reference\n(delivery callback)"| ProduceRequestHolder
+    ProduceRequestHolder --> |uses| UpstreamKafkaFacade
+    ProduceRequestHolder -.-> |sends data to| RichKafkaProducer
+```
+
+## Fetch
+
+Fetch request (`FetchRequestHolder`) registers itself with `SharedConsumerManager` to be notified when records matching
+its interests appear.
+`SharedConsumerManager` maintains multiple `RichKafkaConsumer` instances (which means it keeps the Kafka consumer state)
+that are responsible for polling records from upstream Kafka clusters.
+Each `RichKafkaConsumer` is effectively a librdkafka `KafkaConsumer` together with its poller thread.
+When `FetchRequestHolder` is finished with its processing (whether through record delivery or timeout), it uses an Envoy
+`Dispatcher` to notify the parent filter.
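+
+The record-delivery contract between the request and the consumer manager can be sketched as below.
+This is a trimmed-down illustration: `records_` and `wanted_record_count_` are made-up stand-ins for the
+real per-partition `messages_` map and the record-count limit in `fetch.cc`.
+
+```cpp
+// Hypothetical, simplified version of FetchRequestHolder::receive().
+CallbackReply receive(InboundRecordSharedPtr record) {
+  absl::MutexLock lock(&state_mutex_);
+  if (finished_) {
+    // Request already answered (e.g. by timeout) - the record stays with the consumer manager.
+    return CallbackReply::Rejected;
+  }
+  records_.push_back(record);
+  if (records_.size() < wanted_record_count_) {
+    return CallbackReply::AcceptedAndWantMore;
+  }
+  finished_ = true;
+  // We might be on a Kafka-consumer thread, so hand off to the filter's dispatcher
+  // instead of touching the filter directly.
+  dispatcher_.post([this]() { filter_.onRequestReadyForAnswer(); });
+  return CallbackReply::AcceptedAndFinished;
+}
+```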
+ +```mermaid +graph TD; + FRH["FechRequestHolder"] + KafkaMeshFilter <-.-> |"in-flight-reference\n(finish/abandon)"| FRH + KafkaMeshFilter --> RP["RequestDecoder+RequestProcessor"] + RP --> |"creates"| FRH + + RCP["<< interface >> \n RecordCallbackProcessor"] + SCM["SharedConsumerManager"] + SCM --> |subclass| RCP + + KC["RichKafkaConsumer"] + FRH -.-> |registers itself with| SCM + SCM -.-> |provides records| FRH + SCM --> |stores mutliple| KC + + LibrdKafkaConsumer["<< librdkafka >> \n KafkaConsumer"] + ConsumerPoller["<< thread >> \n consumer poller"] + KC --> |wraps| LibrdKafkaConsumer + KC --> |holds| ConsumerPoller + ConsumerPoller --> |polls from| LibrdKafkaConsumer + + DSP["<< Envoy >> \n Dispatcher"] + KafkaMeshFilter --- DSP + FRH -.-> |notifies on finish| DSP +``` diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD b/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD index be021a214a3a..508881604858 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD @@ -45,6 +45,41 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "fetch_lib", + srcs = [ + "fetch.cc", + ], + hdrs = [ + "fetch.h", + ], + tags = ["skip_on_windows"], + deps = [ + ":fetch_record_converter_lib", + "//contrib/kafka/filters/network/source:kafka_request_parser_lib", + "//contrib/kafka/filters/network/source:kafka_response_parser_lib", + "//contrib/kafka/filters/network/source/mesh:abstract_command_lib", + "//contrib/kafka/filters/network/source/mesh:shared_consumer_manager_lib", + "//source/common/common:minimal_logger_lib", + ], +) + +envoy_cc_library( + name = "fetch_record_converter_lib", + srcs = [ + "fetch_record_converter.cc", + ], + hdrs = [ + "fetch_record_converter.h", + ], + tags = ["skip_on_windows"], + deps = [ + "//contrib/kafka/filters/network/source:kafka_response_parser_lib", + "//contrib/kafka/filters/network/source:serialization_lib", + "//contrib/kafka/filters/network/source/mesh:inbound_record_lib", + ], +) + envoy_cc_library( name = "list_offsets_lib", srcs = [ diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/api_versions.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/api_versions.cc index 3f50ed73de38..4a2c09b3b32d 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/api_versions.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/api_versions.cc @@ -44,12 +44,13 @@ AbstractResponseSharedPtr ApiVersionsRequestHolder::computeAnswer() const { const int16_t error_code = 0; const ApiVersion produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED, MAX_PRODUCE_SUPPORTED}; + const ApiVersion fetch_entry = {FETCH_REQUEST_API_KEY, 0, FETCH_REQUEST_MAX_VERSION}; const ApiVersion list_offsets_entry = {LIST_OFFSETS_REQUEST_API_KEY, MIN_LIST_OFFSETS_SUPPORTED, MAX_LIST_OFFSETS_SUPPORTED}; const ApiVersion metadata_entry = {METADATA_REQUEST_API_KEY, MIN_METADATA_SUPPORTED, MAX_METADATA_SUPPORTED}; - const ApiVersionsResponse real_response = {error_code, - {produce_entry, list_offsets_entry, metadata_entry}}; + const ApiVersionsResponse real_response = { + error_code, {produce_entry, fetch_entry, list_offsets_entry, metadata_entry}}; return std::make_shared>(metadata, real_response); } diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch.cc new file mode 100644 index 000000000000..241000933c32 --- 
/dev/null +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch.cc @@ -0,0 +1,188 @@ +#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch.h" + +#include + +#include "source/common/common/fmt.h" + +#include "absl/synchronization/mutex.h" +#include "contrib/kafka/filters/network/source/external/responses.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +FetchRequestHolder::FetchRequestHolder(AbstractRequestListener& filter, + RecordCallbackProcessor& consumer_manager, + const std::shared_ptr> request) + : FetchRequestHolder{filter, consumer_manager, request, + FetchRecordConverterImpl::getDefaultInstance()} {} + +FetchRequestHolder::FetchRequestHolder(AbstractRequestListener& filter, + RecordCallbackProcessor& consumer_manager, + const std::shared_ptr> request, + const FetchRecordConverter& converter) + : BaseInFlightRequest{filter}, consumer_manager_{consumer_manager}, request_{request}, + dispatcher_{filter.dispatcher()}, converter_{converter} {} + +// XXX (adam.kotwasinski) This should be made configurable in future. +constexpr uint32_t FETCH_TIMEOUT_MS = 5000; + +static Event::TimerPtr registerTimeoutCallback(Event::Dispatcher& dispatcher, + const Event::TimerCb callback, + const int32_t timeout) { + auto event = dispatcher.createTimer(callback); + event->enableTimer(std::chrono::milliseconds(timeout)); + return event; +} + +void FetchRequestHolder::startProcessing() { + const TopicToPartitionsMap requested_topics = interest(); + + { + absl::MutexLock lock(&state_mutex_); + for (const auto& topic_and_partitions : requested_topics) { + const std::string& topic_name = topic_and_partitions.first; + for (const int32_t partition : topic_and_partitions.second) { + // This makes sure that all requested KafkaPartitions are tracked, + // so then output generation is simpler. + messages_[{topic_name, partition}] = {}; + } + } + } + + const auto self_reference = shared_from_this(); + consumer_manager_.processCallback(self_reference); + + Event::TimerCb callback = [this]() -> void { + // Fun fact: if the request is degenerate (no partitions requested), + // this will ensure it gets processed. + markFinishedByTimer(); + }; + timer_ = registerTimeoutCallback(dispatcher_, callback, FETCH_TIMEOUT_MS); +} + +TopicToPartitionsMap FetchRequestHolder::interest() const { + TopicToPartitionsMap result; + const std::vector& topics = request_->data_.topics_; + for (const FetchTopic& topic : topics) { + const std::string topic_name = topic.topic_; + const std::vector partitions = topic.partitions_; + for (const FetchPartition& partition : partitions) { + result[topic_name].push_back(partition.partition_); + } + } + return result; +} + +// This method is called by a Envoy-worker thread. +void FetchRequestHolder::markFinishedByTimer() { + ENVOY_LOG(trace, "Request {} timed out", toString()); + bool doCleanup = false; + { + absl::MutexLock lock(&state_mutex_); + timer_ = nullptr; + if (!finished_) { + finished_ = true; + doCleanup = true; + } + } + if (doCleanup) { + cleanup(true); + } +} + +// XXX (adam.kotwasinski) This should be made configurable in future. +// Right now the Fetch request is going to send up to 3 records. +// In future this should transform into some kind of method that's invoked inside 'receive' calls, +// as Kafka can have limits on records per partition. 
+constexpr int32_t MINIMAL_MSG_CNT = 3; + +// This method is called by: +// - Kafka-consumer thread - when have the records delivered, +// - dispatcher thread - when we start processing and check whether anything was cached. +CallbackReply FetchRequestHolder::receive(InboundRecordSharedPtr message) { + absl::MutexLock lock(&state_mutex_); + if (!finished_) { + // Store a new record. + const KafkaPartition kp = {message->topic_, message->partition_}; + messages_[kp].push_back(message); + + // Count all the records currently stored within this request. + uint32_t current_messages = 0; + for (const auto& e : messages_) { + current_messages += e.second.size(); + } + + if (current_messages < MINIMAL_MSG_CNT) { + // We can consume more in future. + return CallbackReply::AcceptedAndWantMore; + } else { + // We have all we needed, we can finish processing. + finished_ = true; + cleanup(false); + return CallbackReply::AcceptedAndFinished; + } + } else { + // This fetch request has finished processing, so it will not accept a record. + return CallbackReply::Rejected; + } +} + +std::string FetchRequestHolder::toString() const { + return fmt::format("[Fetch id={}]", request_->request_header_.correlation_id_); +} + +void FetchRequestHolder::cleanup(bool unregister) { + ENVOY_LOG(trace, "Cleanup starting for {}", toString()); + if (unregister) { + const auto self_reference = shared_from_this(); + consumer_manager_.removeCallback(self_reference); + } + + // Our request is ready and can be sent downstream. + // However, the caller here could be a Kafka-consumer worker thread (not an Envoy worker one), + // so we need to use dispatcher to notify the filter that we are finished. + auto notifyCallback = [this]() -> void { + timer_ = nullptr; + filter_.onRequestReadyForAnswer(); + }; + // Impl note: usually this will be invoked by non-Envoy thread, + // so let's not optimize that this might be invoked by dispatcher callback. + dispatcher_.post(notifyCallback); + ENVOY_LOG(trace, "Cleanup finished for {}", toString()); +} + +bool FetchRequestHolder::finished() const { + absl::MutexLock lock(&state_mutex_); + return finished_; +} + +void FetchRequestHolder::abandon() { + ENVOY_LOG(trace, "Abandoning {}", toString()); + // We remove the timeout-callback and unregister this request so no deliveries happen to it. 
+ timer_ = nullptr; + const auto self_reference = shared_from_this(); + consumer_manager_.removeCallback(self_reference); + BaseInFlightRequest::abandon(); +} + +AbstractResponseSharedPtr FetchRequestHolder::computeAnswer() const { + const auto& header = request_->request_header_; + const ResponseMetadata metadata = {header.api_key_, header.api_version_, header.correlation_id_}; + + std::vector responses; + { + absl::MutexLock lock(&state_mutex_); + responses = converter_.convert(messages_); + } + const FetchResponse data = {responses}; + return std::make_shared>(metadata, data); +} + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch.h b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch.h new file mode 100644 index 000000000000..ac11a45f6ad1 --- /dev/null +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch.h @@ -0,0 +1,89 @@ +#pragma once + +#include "envoy/event/dispatcher.h" +#include "envoy/event/timer.h" + +#include "absl/synchronization/mutex.h" +#include "contrib/kafka/filters/network/source/external/requests.h" +#include "contrib/kafka/filters/network/source/mesh/abstract_command.h" +#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h" +#include "contrib/kafka/filters/network/source/mesh/inbound_record.h" +#include "contrib/kafka/filters/network/source/mesh/shared_consumer_manager.h" +#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +class FetchRequestHolder : public BaseInFlightRequest, + public RecordCb, + public std::enable_shared_from_this { +public: + FetchRequestHolder(AbstractRequestListener& filter, RecordCallbackProcessor& consumer_manager, + const std::shared_ptr> request); + + // Visible for testing. + FetchRequestHolder(AbstractRequestListener& filter, RecordCallbackProcessor& consumer_manager, + const std::shared_ptr>, + const FetchRecordConverter& converter); + + // AbstractInFlightRequest + void startProcessing() override; + + // AbstractInFlightRequest + bool finished() const override; + + // AbstractInFlightRequest + void abandon() override; + + // AbstractInFlightRequest + AbstractResponseSharedPtr computeAnswer() const override; + + // Invoked by timer as this requests's time runs out. + // It is possible that this request has already been finished (there was data to send), + // then this method does nothing. + void markFinishedByTimer(); + + // RecordCb + CallbackReply receive(InboundRecordSharedPtr message) override; + + // RecordCb + TopicToPartitionsMap interest() const override; + + // RecordCb + std::string toString() const override; + +private: + // Invoked internally when we want to mark this Fetch request as done. + // This means: we are no longer interested in future messages and might need to unregister + // ourselves. + void cleanup(bool unregister); + + // Provides access to upstream-pointing consumers. + RecordCallbackProcessor& consumer_manager_; + // Original request. + const std::shared_ptr> request_; + + mutable absl::Mutex state_mutex_; + // Whether this request has finished processing and is ready for sending upstream. + bool finished_ ABSL_GUARDED_BY(state_mutex_) = false; + // The messages to send downstream. 
+ std::map> + messages_ ABSL_GUARDED_BY(state_mutex_); + + // Filter's dispatcher. + Event::Dispatcher& dispatcher_; + // Timeout timer (invalidated when request is finished). + Event::TimerPtr timer_; + + // Translates librdkafka objects into bytes to be sent downstream. + const FetchRecordConverter& converter_; +}; + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc new file mode 100644 index 000000000000..8b760b21dd94 --- /dev/null +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc @@ -0,0 +1,198 @@ +#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h" + +#include "contrib/kafka/filters/network/source/serialization.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +const FetchRecordConverter& FetchRecordConverterImpl::getDefaultInstance() { + CONSTRUCT_ON_FIRST_USE(FetchRecordConverterImpl); +} + +std::vector +FetchRecordConverterImpl::convert(const InboundRecordsMap& arg) const { + + // Compute record batches. + std::map record_batches; + for (const auto& partition_and_records : arg) { + const KafkaPartition& kp = partition_and_records.first; + const std::vector& partition_records = partition_and_records.second; + const Bytes batch = renderRecordBatch(partition_records); + record_batches[kp] = batch; + } + + // Transform our maps into the Kafka structs. + std::map> topic_to_frrpd; + for (const auto& record_batch : record_batches) { + const std::string& topic_name = record_batch.first.first; + const int32_t partition = record_batch.first.second; + + std::vector& frrpds = topic_to_frrpd[topic_name]; + const int16_t error_code = 0; + const int64_t high_watermark = 0; + const auto frrpd = FetchResponseResponsePartitionData{partition, error_code, high_watermark, + absl::make_optional(record_batch.second)}; + + frrpds.push_back(frrpd); + } + + std::vector result; + for (const auto& partition_and_records : topic_to_frrpd) { + const std::string& topic_name = partition_and_records.first; + const auto ftr = FetchableTopicResponse{topic_name, partition_and_records.second}; + result.push_back(ftr); + } + return result; +} + +// Magic format introduced around Kafka 1.0.0 and still used with Kafka 3.3. +constexpr int8_t MAGIC = 2; + +Bytes FetchRecordConverterImpl::renderRecordBatch( + const std::vector& records) const { + + Bytes result = {}; + + // Base offset (bytes 0..7). + const int64_t base_offset = htobe64(0); + const unsigned char* base_offset_b = reinterpret_cast(&base_offset); + result.insert(result.end(), base_offset_b, base_offset_b + sizeof(base_offset)); + + // Batch length placeholder (bytes 8..11). + result.insert(result.end(), {0, 0, 0, 0}); + + // All other attributes (spans partitionLeaderEpoch .. baseSequence) (bytes 12..56). + const std::vector zeros(45, 0); + result.insert(result.end(), zeros.begin(), zeros.end()); + + // Last offset delta. + // -1 means we always claim that we are at the beginning of partition. 
+ const int32_t last_offset_delta = htobe32(-1); + const unsigned char* last_offset_delta_bytes = + reinterpret_cast(&last_offset_delta); + const auto last_offset_delta_pos = result.begin() + 8 + 4 + 11; + std::copy(last_offset_delta_bytes, last_offset_delta_bytes + sizeof(last_offset_delta), + last_offset_delta_pos); + + // Records (count) (bytes 57..60). + const int32_t record_count = htobe32(records.size()); + const unsigned char* record_count_b = reinterpret_cast(&record_count); + result.insert(result.end(), record_count_b, record_count_b + sizeof(record_count)); + + // Records (data) (bytes 61+). + for (const auto& record : records) { + appendRecord(*record, result); + } + + // Set batch length. + const int32_t batch_len = htobe32(result.size() - (sizeof(base_offset) + sizeof(batch_len))); + const unsigned char* batch_len_bytes = reinterpret_cast(&batch_len); + std::copy(batch_len_bytes, batch_len_bytes + sizeof(batch_len), + result.begin() + sizeof(base_offset)); + + // Set magic. + const uint32_t magic_offset = sizeof(base_offset) + sizeof(batch_len) + sizeof(int32_t); + result[magic_offset] = MAGIC; + + // Compute and set CRC. + const uint32_t crc_offset = magic_offset + 1; + const auto crc_data_start = result.data() + crc_offset + sizeof(int32_t); + const auto crc_data_len = result.size() - (crc_offset + sizeof(int32_t)); + const Bytes crc = renderCrc32c(crc_data_start, crc_data_len); + std::copy(crc.begin(), crc.end(), result.begin() + crc_offset); + + return result; +} + +void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& out) const { + + Bytes tmp = {}; + // This is not precise maths, as we could be over-reserving a little due to var-length fields. + tmp.reserve(sizeof(int8_t) + sizeof(int64_t) + sizeof(int32_t) + record.dataLengthEstimate()); + + // attributes: int8 + const int8_t attributes = 0; + tmp.push_back(static_cast(attributes)); + + // timestampDelta: varlong + const int64_t timestamp_delta = 0; + VarlenUtils::writeVarlong(timestamp_delta, tmp); + + // offsetDelta: varint + const int32_t offset_delta = record.offset_; + VarlenUtils::writeVarint(offset_delta, tmp); + + // Impl note: compared to requests/responses, records serialize byte arrays as varint length + + // bytes (and not length + 1, then bytes). So we cannot use EncodingContext from serialization.h. + + // keyLength: varint + // key: byte[] + const NullableBytes& key = record.key_; + if (key.has_value()) { + VarlenUtils::writeVarint(key->size(), tmp); + tmp.insert(tmp.end(), key->begin(), key->end()); + } else { + VarlenUtils::writeVarint(-1, tmp); + } + + // valueLen: varint + // value: byte[] + const NullableBytes& value = record.value_; + if (value.has_value()) { + VarlenUtils::writeVarint(value->size(), tmp); + tmp.insert(tmp.end(), value->begin(), value->end()); + } else { + VarlenUtils::writeVarint(-1, tmp); + } + + // TODO (adam.kotwasinski) Headers are not supported yet. + const int32_t header_count = 0; + VarlenUtils::writeVarint(header_count, tmp); + + // Put tmp's length into 'out'. + VarlenUtils::writeVarint(tmp.size(), out); + + // Put tmp's contents into 'out'. + out.insert(out.end(), tmp.begin(), tmp.end()); +} + +// XXX (adam.kotwasinski) Instead of computing it naively, either link against librdkafka's +// implementation or generate it. 
+// https://github.com/confluentinc/librdkafka/blob/v1.8.0/src/crc32c.c#L1 +uint32_t FetchRecordConverterImpl::computeCrc32c(const unsigned char* data, const size_t len) { + uint32_t crc = 0xFFFFFFFF; + for (size_t i = 0; i < len; i++) { + char ch = data[i]; + for (size_t j = 0; j < 8; j++) { + uint32_t b = (ch ^ crc) & 1; + crc >>= 1; + if (b) { + crc = crc ^ 0x82F63B78; + } + ch >>= 1; + } + } + return ~crc; +} + +uint32_t FetchRecordConverterImpl::computeCrc32cForTest(const unsigned char* data, + const size_t len) { + return computeCrc32c(data, len); +} + +Bytes FetchRecordConverterImpl::renderCrc32c(const unsigned char* data, const size_t len) const { + uint32_t crc = htobe32(computeCrc32c(data, len)); + Bytes result; + unsigned char* raw = reinterpret_cast(&crc); + result.insert(result.end(), raw, raw + sizeof(crc)); + return result; +} + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h new file mode 100644 index 000000000000..e621adc2e7ff --- /dev/null +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h @@ -0,0 +1,65 @@ +#pragma once + +#include +#include +#include + +#include "contrib/kafka/filters/network/source/external/responses.h" +#include "contrib/kafka/filters/network/source/kafka_types.h" +#include "contrib/kafka/filters/network/source/mesh/inbound_record.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +using InboundRecordsMap = std::map>; + +/** + * Dependency injection class responsible for converting received records into serializable form + * that we can put into Fetch responses. + */ +class FetchRecordConverter { +public: + virtual ~FetchRecordConverter() = default; + + // Converts received records into the serialized form. + virtual std::vector convert(const InboundRecordsMap& arg) const PURE; +}; + +/** + * Proper implementation. + */ +class FetchRecordConverterImpl : public FetchRecordConverter { +public: + // FetchRecordConverter + std::vector convert(const InboundRecordsMap& arg) const override; + + // Default singleton accessor. + static const FetchRecordConverter& getDefaultInstance(); + + static uint32_t computeCrc32cForTest(const unsigned char* data, const size_t len); + +private: + // Helper function: transform records from a partition into a record batch. + // See: https://kafka.apache.org/33/documentation.html#recordbatch + Bytes renderRecordBatch(const std::vector& records) const; + + // Helper function: append record to output array. + // See: https://kafka.apache.org/33/documentation.html#record + // https://github.com/apache/kafka/blob/3.3.2/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L164 + void appendRecord(const InboundRecord& record, Bytes& out) const; + + // Helper function: render CRC32C bytes from given input. + Bytes renderCrc32c(const unsigned char* data, const size_t len) const; + + // Helper function: compute CRC32C. 
+ static uint32_t computeCrc32c(const unsigned char* data, const size_t len); +}; + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/source/mesh/config.cc b/contrib/kafka/filters/network/source/mesh/config.cc index 7c2a1f4e2474..d2c2501afb37 100644 --- a/contrib/kafka/filters/network/source/mesh/config.cc +++ b/contrib/kafka/filters/network/source/mesh/config.cc @@ -5,6 +5,8 @@ #include "envoy/stats/scope.h" #ifndef WIN32 +#include "contrib/kafka/filters/network/source/mesh/shared_consumer_manager.h" +#include "contrib/kafka/filters/network/source/mesh/shared_consumer_manager_impl.h" #include "contrib/kafka/filters/network/source/mesh/upstream_config.h" #include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h" #include "contrib/kafka/filters/network/source/mesh/filter.h" @@ -35,9 +37,15 @@ Network::FilterFactoryCb KafkaMeshConfigFactory::createFilterFactoryFromProtoTyp std::make_shared(*configuration, context.threadLocal(), context.api().threadFactory()); - return [configuration, upstream_kafka_facade](Network::FilterManager& filter_manager) -> void { - Network::ReadFilterSharedPtr filter = - std::make_shared(*configuration, *upstream_kafka_facade); + // Manager for consumers shared across downstream connections + // (connects us to upstream Kafka clusters). + const RecordCallbackProcessorSharedPtr shared_consumer_manager = + std::make_shared(*configuration, context.api().threadFactory()); + + return [configuration, upstream_kafka_facade, + shared_consumer_manager](Network::FilterManager& filter_manager) -> void { + Network::ReadFilterSharedPtr filter = std::make_shared( + *configuration, *upstream_kafka_facade, *shared_consumer_manager); filter_manager.addReadFilter(filter); }; #endif diff --git a/contrib/kafka/filters/network/source/mesh/filter.cc b/contrib/kafka/filters/network/source/mesh/filter.cc index 10c0ef796162..4f600a5e8989 100644 --- a/contrib/kafka/filters/network/source/mesh/filter.cc +++ b/contrib/kafka/filters/network/source/mesh/filter.cc @@ -15,9 +15,11 @@ namespace Kafka { namespace Mesh { KafkaMeshFilter::KafkaMeshFilter(const UpstreamKafkaConfiguration& configuration, - UpstreamKafkaFacade& upstream_kafka_facade) - : KafkaMeshFilter{std::make_shared(std::vector( - {std::make_shared(*this, configuration, upstream_kafka_facade)}))} {} + UpstreamKafkaFacade& upstream_kafka_facade, + RecordCallbackProcessor& record_callback_processor) + : KafkaMeshFilter{std::make_shared( + std::vector({std::make_shared( + *this, configuration, upstream_kafka_facade, record_callback_processor)}))} {} KafkaMeshFilter::KafkaMeshFilter(RequestDecoderSharedPtr request_decoder) : request_decoder_{request_decoder} {} @@ -90,6 +92,10 @@ void KafkaMeshFilter::onRequestReadyForAnswer() { } } +Event::Dispatcher& KafkaMeshFilter::dispatcher() { + return read_filter_callbacks_->connection().dispatcher(); +} + void KafkaMeshFilter::abandonAllInFlightRequests() { for (const auto& request : requests_in_flight_) { request->abandon(); diff --git a/contrib/kafka/filters/network/source/mesh/filter.h b/contrib/kafka/filters/network/source/mesh/filter.h index a6b4ec80cdd4..56239b6dc57c 100644 --- a/contrib/kafka/filters/network/source/mesh/filter.h +++ b/contrib/kafka/filters/network/source/mesh/filter.h @@ -1,6 +1,7 @@ #pragma once #include "envoy/common/time.h" +#include "envoy/event/dispatcher.h" #include "envoy/network/filter.h" #include "envoy/stats/scope.h" @@ -9,6 +10,7 
@@ #include "contrib/kafka/filters/network/source/external/requests.h" #include "contrib/kafka/filters/network/source/mesh/abstract_command.h" #include "contrib/kafka/filters/network/source/mesh/request_processor.h" +#include "contrib/kafka/filters/network/source/mesh/shared_consumer_manager.h" #include "contrib/kafka/filters/network/source/mesh/upstream_config.h" #include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h" #include "contrib/kafka/filters/network/source/request_codec.h" @@ -26,26 +28,7 @@ namespace Mesh { * Filter is going to maintain a list of in-flight-request so it can send responses when they * finish. * - * - * +----------------+ +-----------------------+ - * |RequestProcessor+----------------->AbstractInFlightRequest| - * +-------^--------+ +----^-----^------------+ - * | | | - * | | +-+------------------+ - * +-------+-------+ | |ProduceRequestHolder| - * |KafkaMeshFilter+-----------------------+ +-+------------------+ - * +-------+-------+ | - * | | - * | | - * +-------v-----------+ | - * |UpstreamKafkaFacade| |(for callback when finished) - * +-------+-----------+ | - * | | - * | | - * +-------v--------------+ +--------------v--+ +-----------------+ - * |<> +------->RichKafkaProducer+--->><> | - * |ThreadLocalKafkaFacade| +-----------------+ |RdKafka::Producer| - * +----------------------+ +-----------------+ + * See command_handlers.md for particular request interactions. **/ class KafkaMeshFilter : public Network::ReadFilter, public Network::ConnectionCallbacks, @@ -54,7 +37,8 @@ class KafkaMeshFilter : public Network::ReadFilter, public: // Main constructor. KafkaMeshFilter(const UpstreamKafkaConfiguration& configuration, - UpstreamKafkaFacade& upstream_kafka_facade); + UpstreamKafkaFacade& upstream_kafka_facade, + RecordCallbackProcessor& record_callback_processor); // Visible for testing. KafkaMeshFilter(RequestDecoderSharedPtr request_decoder); @@ -75,6 +59,7 @@ class KafkaMeshFilter : public Network::ReadFilter, // AbstractRequestListener void onRequest(InFlightRequestSharedPtr request) override; void onRequestReadyForAnswer() override; + Event::Dispatcher& dispatcher() override; std::list& getRequestsInFlightForTest(); diff --git a/contrib/kafka/filters/network/source/mesh/inbound_record.h b/contrib/kafka/filters/network/source/mesh/inbound_record.h index 83cd1d61fbcf..ecde30aaea0c 100644 --- a/contrib/kafka/filters/network/source/mesh/inbound_record.h +++ b/contrib/kafka/filters/network/source/mesh/inbound_record.h @@ -4,6 +4,7 @@ #include #include "absl/strings/str_cat.h" +#include "contrib/kafka/filters/network/source/kafka_types.h" namespace Envoy { namespace Extensions { @@ -20,10 +21,20 @@ struct InboundRecord { const int32_t partition_; const int64_t offset_; - // TODO (adam.kotwasinski) Get data in here in the next commits. + const NullableBytes key_; + const NullableBytes value_; - InboundRecord(std::string topic, int32_t partition, int64_t offset) - : topic_{topic}, partition_{partition}, offset_{offset} {}; + InboundRecord(const std::string& topic, const int32_t partition, const int64_t offset, + const NullableBytes& key, const NullableBytes& value) + : topic_{topic}, partition_{partition}, offset_{offset}, key_{key}, value_{value} {}; + + // Estimates how many bytes this record would take. + uint32_t dataLengthEstimate() const { + uint32_t result = 15; // Max key length, value length, header count. + result += key_ ? key_->size() : 0; + result += value_ ? value_->size() : 0; + return result; + } // Used in logging. 
std::string toString() const { diff --git a/contrib/kafka/filters/network/source/mesh/request_processor.cc b/contrib/kafka/filters/network/source/mesh/request_processor.cc index 6e47e3ede536..d63c2e672d6d 100644 --- a/contrib/kafka/filters/network/source/mesh/request_processor.cc +++ b/contrib/kafka/filters/network/source/mesh/request_processor.cc @@ -3,6 +3,7 @@ #include "envoy/common/exception.h" #include "contrib/kafka/filters/network/source/mesh/command_handlers/api_versions.h" +#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch.h" #include "contrib/kafka/filters/network/source/mesh/command_handlers/list_offsets.h" #include "contrib/kafka/filters/network/source/mesh/command_handlers/metadata.h" #include "contrib/kafka/filters/network/source/mesh/command_handlers/produce.h" @@ -15,9 +16,10 @@ namespace Mesh { RequestProcessor::RequestProcessor(AbstractRequestListener& origin, const UpstreamKafkaConfiguration& configuration, - UpstreamKafkaFacade& upstream_kafka_facade) - : origin_{origin}, configuration_{configuration}, upstream_kafka_facade_{ - upstream_kafka_facade} {} + UpstreamKafkaFacade& upstream_kafka_facade, + RecordCallbackProcessor& record_callback_processor) + : origin_{origin}, configuration_{configuration}, upstream_kafka_facade_{upstream_kafka_facade}, + record_callback_processor_{record_callback_processor} {} // Helper function. Throws a nice message. Filter will react by closing the connection. static void throwOnUnsupportedRequest(const std::string& reason, const RequestHeader& header) { @@ -30,6 +32,9 @@ void RequestProcessor::onMessage(AbstractRequestSharedPtr arg) { case PRODUCE_REQUEST_API_KEY: process(std::dynamic_pointer_cast>(arg)); break; + case FETCH_REQUEST_API_KEY: + process(std::dynamic_pointer_cast>(arg)); + break; case LIST_OFFSETS_REQUEST_API_KEY: process(std::dynamic_pointer_cast>(arg)); break; @@ -51,6 +56,11 @@ void RequestProcessor::process(const std::shared_ptr> re origin_.onRequest(res); } +void RequestProcessor::process(const std::shared_ptr> request) const { + auto res = std::make_shared(origin_, record_callback_processor_, request); + origin_.onRequest(res); +} + void RequestProcessor::process(const std::shared_ptr> request) const { auto res = std::make_shared(origin_, request); origin_.onRequest(res); diff --git a/contrib/kafka/filters/network/source/mesh/request_processor.h b/contrib/kafka/filters/network/source/mesh/request_processor.h index 515a55ee6412..c2f79234cdfe 100644 --- a/contrib/kafka/filters/network/source/mesh/request_processor.h +++ b/contrib/kafka/filters/network/source/mesh/request_processor.h @@ -4,6 +4,7 @@ #include "contrib/kafka/filters/network/source/external/requests.h" #include "contrib/kafka/filters/network/source/mesh/abstract_command.h" +#include "contrib/kafka/filters/network/source/mesh/shared_consumer_manager.h" #include "contrib/kafka/filters/network/source/mesh/upstream_config.h" #include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h" #include "contrib/kafka/filters/network/source/request_codec.h" @@ -20,7 +21,8 @@ namespace Mesh { class RequestProcessor : public RequestCallback, private Logger::Loggable { public: RequestProcessor(AbstractRequestListener& origin, const UpstreamKafkaConfiguration& configuration, - UpstreamKafkaFacade& upstream_kafka_facade); + UpstreamKafkaFacade& upstream_kafka_facade, + RecordCallbackProcessor& record_callback_processor); // RequestCallback void onMessage(AbstractRequestSharedPtr arg) override; @@ -28,6 +30,7 @@ class RequestProcessor 
: public RequestCallback, private Logger::Loggable> request) const; + void process(const std::shared_ptr> request) const; void process(const std::shared_ptr> request) const; void process(const std::shared_ptr> request) const; void process(const std::shared_ptr> request) const; @@ -35,6 +38,7 @@ class RequestProcessor : public RequestCallback, private Logger::Loggable; + /** * Manages (raw) Kafka consumers pointing to upstream Kafka clusters. * It is expected to have only one instance of this object per mesh-filter type. diff --git a/contrib/kafka/filters/network/source/mesh/shared_consumer_manager_impl.cc b/contrib/kafka/filters/network/source/mesh/shared_consumer_manager_impl.cc index e44006f3ea0a..edfee9120078 100644 --- a/contrib/kafka/filters/network/source/mesh/shared_consumer_manager_impl.cc +++ b/contrib/kafka/filters/network/source/mesh/shared_consumer_manager_impl.cc @@ -135,7 +135,7 @@ bool RecordDistributor::hasInterest(const std::string& topic) const { return false; } -// XXX (adam.kotwasinski) Inefficient: locks aquired per record. +// XXX (adam.kotwasinski) Inefficient: locks acquired per record. void RecordDistributor::receive(InboundRecordSharedPtr record) { const KafkaPartition kafka_partition = {record->topic_, record->partition_}; @@ -181,7 +181,7 @@ void RecordDistributor::receive(InboundRecordSharedPtr record) { } } - // Noone is interested in our record, so we are going to store it in a local cache. + // No-one is interested in our record, so we are going to store it in a local cache. if (!consumed_by_callback) { absl::MutexLock lock(&stored_records_mutex_); auto& stored_records = stored_records_[kafka_partition]; diff --git a/contrib/kafka/filters/network/source/mesh/upstream_config.cc b/contrib/kafka/filters/network/source/mesh/upstream_config.cc index 48b1fefe31b9..5b2b1d451453 100644 --- a/contrib/kafka/filters/network/source/mesh/upstream_config.cc +++ b/contrib/kafka/filters/network/source/mesh/upstream_config.cc @@ -15,6 +15,8 @@ namespace Mesh { using KafkaClusterDefinition = envoy::extensions::filters::network::kafka_mesh::v3alpha::KafkaClusterDefinition; using ForwardingRule = envoy::extensions::filters::network::kafka_mesh::v3alpha::ForwardingRule; +using KafkaMesh = envoy::extensions::filters::network::kafka_mesh::v3alpha::KafkaMesh; +using ConsumerProxyMode = KafkaMesh::ConsumerProxyMode; const std::string DEFAULT_CONSUMER_GROUP_ID = "envoy"; @@ -39,18 +41,21 @@ UpstreamKafkaConfigurationImpl::UpstreamKafkaConfigurationImpl(const KafkaMeshPr cluster_name)); } - // Upstream client configuration - use all the optional custom configs provided, and then use - // the target IPs. + // Upstream producer configuration. std::map producer_configs = { upstream_cluster_definition.producer_config().begin(), upstream_cluster_definition.producer_config().end()}; producer_configs["bootstrap.servers"] = upstream_cluster_definition.bootstrap_servers(); - // TODO (adam.kotwasinski) This needs to read configs just like producer does. - std::map consumer_configs = {}; + // Upstream consumer configuration. + std::map consumer_configs = { + upstream_cluster_definition.consumer_config().begin(), + upstream_cluster_definition.consumer_config().end()}; + if (consumer_configs.end() == consumer_configs.find("group.id")) { + // librdkafka consumer needs a group id, let's use a default one if nothing was provided. 
+ consumer_configs["group.id"] = DEFAULT_CONSUMER_GROUP_ID; + } consumer_configs["bootstrap.servers"] = upstream_cluster_definition.bootstrap_servers(); - // TODO (adam.kotwasinski) When configs are read, use this only if absent. - consumer_configs["group.id"] = DEFAULT_CONSUMER_GROUP_ID; ClusterConfig cluster_config = {cluster_name, upstream_cluster_definition.partition_count(), producer_configs, consumer_configs}; @@ -77,6 +82,9 @@ UpstreamKafkaConfigurationImpl::UpstreamKafkaConfigurationImpl(const KafkaMeshPr topic_prefix_to_cluster_config_[rule.topic_prefix()] = cluster_name_to_cluster_config[target_cluster]; } + + // The only mode we support right now - embedded librdkafka consumers. + ASSERT(config.consumer_proxy_mode() == KafkaMesh::StatefulConsumerProxy); } absl::optional diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc index 4c3dfaeab005..a6ba7a3e1d71 100644 --- a/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc +++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc @@ -1,5 +1,6 @@ #include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h" +#include "contrib/kafka/filters/network/source/kafka_types.h" #include "contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h" namespace Envoy { @@ -99,12 +100,27 @@ void RichKafkaConsumer::runWorkerLoop() { ENVOY_LOG(debug, "Worker thread for consumer [{}] finished", topic_); } +// Helper method, converts byte array. +static NullableBytes toBytes(const void* data, const size_t size) { + const unsigned char* as_char = static_cast(data); + if (data) { + Bytes bytes(as_char, as_char + size); + return {bytes}; + } else { + return absl::nullopt; + } +} + // Helper method, gets rid of librdkafka. static InboundRecordSharedPtr transform(RdKafkaMessagePtr arg) { const auto topic = arg->topic_name(); const auto partition = arg->partition(); const auto offset = arg->offset(); - return std::make_shared(topic, partition, offset); + + const NullableBytes key = toBytes(arg->key_pointer(), arg->key_len()); + const NullableBytes value = toBytes(arg->payload(), arg->len()); + + return std::make_shared(topic, partition, offset, key, value); } std::vector RichKafkaConsumer::receiveRecordBatch() { diff --git a/contrib/kafka/filters/network/source/protocol/generator.py b/contrib/kafka/filters/network/source/protocol/generator.py index 8aede752f2a9..d1417ecfc6b7 100755 --- a/contrib/kafka/filters/network/source/protocol/generator.py +++ b/contrib/kafka/filters/network/source/protocol/generator.py @@ -128,8 +128,10 @@ def parse_messages(self, input_files): amended = re.sub(r'-2147483648', 'INT32_MIN', without_empty_newlines) message_spec = json.loads(amended) api_key = message_spec['apiKey'] - message = self.parse_top_level_element(message_spec) - messages.append(message) + # (adam.kotwasinski) ConsumerGroupHeartbeat needs some more changes to parse. + if api_key not in [68]: + message = self.parse_top_level_element(message_spec) + messages.append(message) except Exception as e: print('could not process %s' % input_file) raise @@ -165,7 +167,7 @@ def parse_top_level_element(self, spec): # So let's parse them and store them in state. 
common_structs = spec.get('commonStructs') if common_structs is not None: - for common_struct in common_structs: + for common_struct in reversed(common_structs): common_struct_name = common_struct['name'] common_struct_versions = Statics.parse_version_string( common_struct['versions'], versions[-1]) diff --git a/contrib/kafka/filters/network/source/serialization.cc b/contrib/kafka/filters/network/source/serialization.cc index b78085fdbf18..45f2229d8892 100644 --- a/contrib/kafka/filters/network/source/serialization.cc +++ b/contrib/kafka/filters/network/source/serialization.cc @@ -220,6 +220,55 @@ NullableBytes NullableCompactBytesDeserializer::get() const { } } +namespace VarlenUtils { + +uint32_t writeUnsignedVarint(const uint32_t arg, Bytes& dst) { + uint32_t value = arg; + + uint32_t elements_with_1 = 0; + // As long as there are bits set on indexes 8 or higher (counting from 1). + while ((value & ~(0x7f)) != 0) { + // Save next 7-bit batch with highest bit set. + const uint8_t el = (value & 0x7f) | 0x80; + dst.push_back(el); + value >>= 7; + elements_with_1++; + } + + // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. + const uint8_t el = value; + dst.push_back(el); + + return elements_with_1 + 1; +} + +uint32_t writeVarint(const int32_t arg, Bytes& dst) { + uint32_t zz = (static_cast(arg) << 1) ^ (arg >> 31); // Zig-zag. + return writeUnsignedVarint(zz, dst); +} + +uint32_t writeVarlong(const int64_t arg, Bytes& dst) { + uint64_t value = (static_cast(arg) << 1) ^ (arg >> 63); // Zig-zag. + + uint32_t elements_with_1 = 0; + // As long as there are bits set on indexes 8 or higher (counting from 1). + while ((value & ~(0x7f)) != 0) { + // Save next 7-bit batch with highest bit set. + const uint8_t el = (value & 0x7f) | 0x80; + dst.push_back(el); + value >>= 7; + elements_with_1++; + } + + // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. + const uint8_t el = value; + dst.push_back(el); + + return elements_with_1 + 1; +} + +} // namespace VarlenUtils + } // namespace Kafka } // namespace NetworkFilters } // namespace Extensions diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h index 29a9a01d3924..711b45b3778c 100644 --- a/contrib/kafka/filters/network/source/serialization.h +++ b/contrib/kafka/filters/network/source/serialization.h @@ -943,6 +943,28 @@ class UuidDeserializer : public Deserializer { Int64Deserializer low_bytes_deserializer_; }; +// Variable length encoding utilities. +namespace VarlenUtils { + +/** + * Writes given unsigned int in variable-length encoding. + * Ref: org.apache.kafka.common.utils.ByteUtils.writeUnsignedVarint(int, ByteBuffer) + */ +uint32_t writeUnsignedVarint(const uint32_t arg, Bytes& dst); + +/** + * Writes given signed int in variable-length zig-zag encoding. + * Ref: org.apache.kafka.common.utils.ByteUtils.writeVarint(int, ByteBuffer) + */ +uint32_t writeVarint(const int32_t arg, Bytes& dst); + +/** + * Writes given long in variable-length zig-zag encoding. + * Ref: org.apache.kafka.common.utils.ByteUtils.writeVarlong(long, ByteBuffer) + */ +uint32_t writeVarlong(const int64_t arg, Bytes& dst); +} // namespace VarlenUtils + /** * Encodes provided argument in Kafka format. * In case of primitive types, this is done explicitly as per specification. 
@@ -1401,23 +1423,10 @@ inline uint32_t EncodingContext::encodeCompact(const int64_t& arg, Buffer::Insta */ template <> inline uint32_t EncodingContext::encodeCompact(const uint32_t& arg, Buffer::Instance& dst) { - uint32_t value = arg; - - uint32_t elements_with_1 = 0; - // As long as there are bits set on indexes 8 or higher (counting from 1). - while ((value & ~(0x7f)) != 0) { - // Save next 7-bit batch with highest bit set. - const uint8_t el = (value & 0x7f) | 0x80; - dst.add(&el, sizeof(uint8_t)); - value >>= 7; - elements_with_1++; - } - - // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. - const uint8_t el = value; - dst.add(&el, sizeof(uint8_t)); - - return elements_with_1 + 1; + std::vector tmp; + const uint32_t written = VarlenUtils::writeUnsignedVarint(arg, tmp); + dst.add(tmp.data(), written); + return written; } /** diff --git a/contrib/kafka/filters/network/test/mesh/BUILD b/contrib/kafka/filters/network/test/mesh/BUILD index ebb23eba960b..6dac555069e0 100644 --- a/contrib/kafka/filters/network/test/mesh/BUILD +++ b/contrib/kafka/filters/network/test/mesh/BUILD @@ -39,6 +39,7 @@ envoy_cc_test( tags = ["skip_on_windows"], deps = [ "//contrib/kafka/filters/network/source/mesh:request_processor_lib", + "//test/mocks/network:network_mocks", ], ) diff --git a/contrib/kafka/filters/network/test/mesh/abstract_command_unit_test.cc b/contrib/kafka/filters/network/test/mesh/abstract_command_unit_test.cc index 48661edf9751..34d725edae2f 100644 --- a/contrib/kafka/filters/network/test/mesh/abstract_command_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/abstract_command_unit_test.cc @@ -13,6 +13,7 @@ class MockAbstractRequestListener : public AbstractRequestListener { public: MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr)); MOCK_METHOD(void, onRequestReadyForAnswer, ()); + MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); }; class Testee : public BaseInFlightRequest { diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD b/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD index b65189eeff7b..c9ea469752f4 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD @@ -28,6 +28,26 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "fetch_unit_test", + srcs = ["fetch_unit_test.cc"], + tags = ["skip_on_windows"], + deps = [ + "//contrib/kafka/filters/network/source/mesh/command_handlers:fetch_lib", + "//test/mocks/network:network_mocks", + "//test/mocks/stats:stats_mocks", + ], +) + +envoy_cc_test( + name = "fetch_record_converter_unit_test", + srcs = ["fetch_record_converter_unit_test.cc"], + tags = ["skip_on_windows"], + deps = [ + "//contrib/kafka/filters/network/source/mesh/command_handlers:fetch_record_converter_lib", + ], +) + envoy_cc_test( name = "list_offsets_unit_test", srcs = ["list_offsets_unit_test.cc"], diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/api_versions_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/api_versions_unit_test.cc index 2a572bec507b..4d9cb58a6995 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/api_versions_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/api_versions_unit_test.cc @@ -13,6 +13,7 @@ class MockAbstractRequestListener : public AbstractRequestListener { public: MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr)); MOCK_METHOD(void, onRequestReadyForAnswer, ()); + 
MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); }; TEST(ApiVersionsTest, shouldBeAlwaysReadyForAnswer) { diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc new file mode 100644 index 000000000000..48e98bc9bfb6 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc @@ -0,0 +1,102 @@ +#include +#include + +#include "test/test_common/utility.h" + +#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { +namespace { + +TEST(FetchRecordConverterImpl, shouldProcessEmptyInput) { + // given + const FetchRecordConverter& testee = FetchRecordConverterImpl{}; + const InboundRecordsMap input = {}; + + // when + const auto result = testee.convert(input); + + // then + ASSERT_EQ(result.size(), 0); +} + +TEST(FetchRecordConverterImpl, shouldProcessInputWithNoRecords) { + // given + const FetchRecordConverter& testee = FetchRecordConverterImpl{}; + InboundRecordsMap input = {}; + input[{"aaa", 0}] = {}; + input[{"aaa", 1}] = {}; + input[{"aaa", 2}] = {}; + input[{"bbb", 0}] = {}; + + // when + const std::vector result = testee.convert(input); + + // then + ASSERT_EQ(result.size(), 2); // Number of unique topic names (not partitions). + const auto topic1 = + std::find_if(result.begin(), result.end(), [](auto x) { return "aaa" == x.topic_; }); + ASSERT_EQ(topic1->partitions_.size(), 3); + const auto topic2 = + std::find_if(result.begin(), result.end(), [](auto x) { return "bbb" == x.topic_; }); + ASSERT_EQ(topic2->partitions_.size(), 1); +} + +// Helper method to generate records. +InboundRecordSharedPtr makeRecord() { + const NullableBytes key = {Bytes(128)}; + const NullableBytes value = {Bytes(1024)}; + return std::make_shared("aaa", 0, 0, key, value); +} + +TEST(FetchRecordConverterImpl, shouldProcessRecords) { + // given + const FetchRecordConverter& testee = FetchRecordConverterImpl{}; + InboundRecordsMap input = {}; + input[{"aaa", 0}] = {makeRecord(), makeRecord(), makeRecord()}; + + // when + const std::vector result = testee.convert(input); + + // then + ASSERT_EQ(result.size(), 1); + const auto& partitions = result[0].partitions_; + ASSERT_EQ(partitions.size(), 1); + const NullableBytes& data = partitions[0].records_; + ASSERT_EQ(data.has_value(), true); + ASSERT_GT(data->size(), + 3 * (128 + 1024)); // Records carry some metadata so it should always pass. + + // then - check whether metadata really says we carry 3 records. + constexpr auto record_count_offset = 57; + const auto ptr = reinterpret_cast(data->data() + record_count_offset); + const uint32_t record_count = be32toh(*ptr); + ASSERT_EQ(record_count, 3); +} + +// Here we check whether our manual implementation really works. 
+// https://github.com/apache/kafka/blob/3.3.2/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java +TEST(FetchRecordConverterImpl, shouldComputeCrc32c) { + constexpr auto testee = &FetchRecordConverterImpl::computeCrc32cForTest; + + std::vector arg1 = {}; + ASSERT_EQ(testee(arg1.data(), arg1.size()), 0x00000000); + + std::vector arg2 = {0, 0, 0, 0}; + ASSERT_EQ(testee(arg2.data(), arg2.size()), 0x48674BC7); + + std::vector arg3 = {13, 42, 13, 42, 13, 42, 13, 42}; + ASSERT_EQ(testee(arg3.data(), arg3.size()), 0xDB56B80F); +} + +} // namespace +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc new file mode 100644 index 000000000000..2b65bce9c752 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc @@ -0,0 +1,188 @@ +#include "test/mocks/event/mocks.h" + +#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch.h" +#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { +namespace { + +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +class MockAbstractRequestListener : public AbstractRequestListener { +public: + MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr)); + MOCK_METHOD(void, onRequestReadyForAnswer, ()); + MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); +}; + +class MockRecordCallbackProcessor : public RecordCallbackProcessor { +public: + MOCK_METHOD(void, processCallback, (const RecordCbSharedPtr&)); + MOCK_METHOD(void, removeCallback, (const RecordCbSharedPtr&)); +}; + +class MockFetchRecordConverter : public FetchRecordConverter { +public: + MOCK_METHOD(std::vector, convert, (const InboundRecordsMap&), (const)); +}; + +class FetchUnitTest : public testing::Test { +protected: + constexpr static int64_t TEST_CORRELATION_ID = 123456; + + NiceMock filter_; + NiceMock dispatcher_; + NiceMock callback_processor_; + MockFetchRecordConverter converter_; + + FetchUnitTest() { ON_CALL(filter_, dispatcher).WillByDefault(ReturnRef(dispatcher_)); } + + std::shared_ptr makeTestee() { + const RequestHeader header = {FETCH_REQUEST_API_KEY, 0, TEST_CORRELATION_ID, absl::nullopt}; + // Our request refers to aaa-0, aaa-1, bbb-10, bbb-20. 
+ const FetchTopic t1 = {"aaa", {{0, 0, 0}, {1, 0, 0}}}; + const FetchTopic t2 = {"bbb", {{10, 0, 0}, {20, 0, 0}}}; + const FetchRequest data = {0, 0, 0, {t1, t2}}; + const auto message = std::make_shared>(header, data); + return std::make_shared(filter_, callback_processor_, message, converter_); + } +}; + +TEST_F(FetchUnitTest, ShouldRegisterCallbackAndTimer) { + // given + const auto testee = makeTestee(); + EXPECT_CALL(callback_processor_, processCallback(_)); + EXPECT_CALL(dispatcher_, createTimer_(_)); + + // when + testee->startProcessing(); + + // then + ASSERT_FALSE(testee->finished()); +} + +TEST_F(FetchUnitTest, ShouldReturnProperInterest) { + // given + const auto testee = makeTestee(); + + // when + const TopicToPartitionsMap result = testee->interest(); + + // then + const TopicToPartitionsMap expected = {{"aaa", {0, 1}}, {"bbb", {10, 20}}}; + ASSERT_EQ(result, expected); +} + +TEST_F(FetchUnitTest, ShouldCleanupAfterTimer) { + // given + const auto testee = makeTestee(); + testee->startProcessing(); + + EXPECT_CALL(callback_processor_, removeCallback(_)); + EXPECT_CALL(dispatcher_, post(_)); + + // when + testee->markFinishedByTimer(); + + // then + ASSERT_TRUE(testee->finished()); +} + +// Helper method to generate records. +InboundRecordSharedPtr makeRecord() { + return std::make_shared("aaa", 0, 0, absl::nullopt, absl::nullopt); +} + +TEST_F(FetchUnitTest, ShouldReceiveRecords) { + // given + const auto testee = makeTestee(); + testee->startProcessing(); + + // Will be invoked by the third record (delivery was finished). + EXPECT_CALL(dispatcher_, post(_)); + // It is invoker that removes the callback - not us. + EXPECT_CALL(callback_processor_, removeCallback(_)).Times(0); + + // when - 1 + const auto res1 = testee->receive(makeRecord()); + // then - first record got stored. + ASSERT_EQ(res1, CallbackReply::AcceptedAndWantMore); + ASSERT_FALSE(testee->finished()); + + // when - 2 + const auto res2 = testee->receive(makeRecord()); + // then - second record got stored. + ASSERT_EQ(res2, CallbackReply::AcceptedAndWantMore); + ASSERT_FALSE(testee->finished()); + + // when - 3 + const auto res3 = testee->receive(makeRecord()); + // then - third record got stored and no more will be accepted. + ASSERT_EQ(res3, CallbackReply::AcceptedAndFinished); + ASSERT_TRUE(testee->finished()); + + // when - 4 + const auto res4 = testee->receive(makeRecord()); + // then - fourth record was rejected. + ASSERT_EQ(res4, CallbackReply::Rejected); +} + +TEST_F(FetchUnitTest, ShouldRejectRecordsAfterTimer) { + // given + const auto testee = makeTestee(); + testee->startProcessing(); + testee->markFinishedByTimer(); + + // when + const auto res = testee->receive(makeRecord()); + + // then + ASSERT_EQ(res, CallbackReply::Rejected); +} + +TEST_F(FetchUnitTest, ShouldUnregisterItselfWhenAbandoned) { + // given + const auto testee = makeTestee(); + testee->startProcessing(); + + EXPECT_CALL(callback_processor_, removeCallback(_)); + + // when + testee->abandon(); + + // then - expectations are met. 
+} + +TEST_F(FetchUnitTest, ShouldComputeAnswer) { + // given + const auto testee = makeTestee(); + testee->startProcessing(); + + std::vector ftr = {{"aaa", {}}, {"bbb", {}}}; + EXPECT_CALL(converter_, convert(_)).WillOnce(Return(ftr)); + + // when + const AbstractResponseSharedPtr answer = testee->computeAnswer(); + + // then + ASSERT_EQ(answer->metadata_.correlation_id_, TEST_CORRELATION_ID); + const auto response = std::dynamic_pointer_cast>(answer); + ASSERT_TRUE(response); + const std::vector responses = response->data_.responses_; + ASSERT_EQ(responses, ftr); +} + +} // namespace +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/list_offsets_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/list_offsets_unit_test.cc index dc5bbd25ae19..4e3820876337 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/list_offsets_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/list_offsets_unit_test.cc @@ -13,6 +13,7 @@ class MockAbstractRequestListener : public AbstractRequestListener { public: MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr)); MOCK_METHOD(void, onRequestReadyForAnswer, ()); + MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); }; TEST(ListOffsetsTest, shouldBeAlwaysReadyForAnswer) { diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/metadata_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/metadata_unit_test.cc index 132946cd8401..e40aa2923e22 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/metadata_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/metadata_unit_test.cc @@ -16,6 +16,7 @@ class MockAbstractRequestListener : public AbstractRequestListener { public: MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr)); MOCK_METHOD(void, onRequestReadyForAnswer, ()); + MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); }; class MockUpstreamKafkaConfiguration : public UpstreamKafkaConfiguration { diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/produce_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/produce_unit_test.cc index f2e1abf4afa1..f98fec3fb5e1 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/produce_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/produce_unit_test.cc @@ -23,6 +23,7 @@ class MockAbstractRequestListener : public AbstractRequestListener { public: MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr)); MOCK_METHOD(void, onRequestReadyForAnswer, ()); + MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); }; class MockRecordExtractor : public RecordExtractor { @@ -53,7 +54,6 @@ class ProduceUnitTest : public testing::Test { // (as ProduceRequests with no topics/records make no sense). 
TEST_F(ProduceUnitTest, ShouldHandleProduceRequestWithNoRecords) { // given - MockRecordExtractor extractor; const std::vector records = {}; EXPECT_CALL(extractor_, extractRecords(_)).WillOnce(Return(records)); diff --git a/contrib/kafka/filters/network/test/mesh/filter_unit_test.cc b/contrib/kafka/filters/network/test/mesh/filter_unit_test.cc index 340859067759..fe73ab1cc9f3 100644 --- a/contrib/kafka/filters/network/test/mesh/filter_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/filter_unit_test.cc @@ -189,13 +189,21 @@ class MockUpstreamKafkaFacade : public UpstreamKafkaFacade { MOCK_METHOD(KafkaProducer&, getProducerForTopic, (const std::string&)); }; +class MockRecordCallbackProcessor : public RecordCallbackProcessor { +public: + MOCK_METHOD(void, processCallback, (const RecordCbSharedPtr&)); + MOCK_METHOD(void, removeCallback, (const RecordCbSharedPtr&)); +}; + TEST(Filter, ShouldBeConstructable) { // given MockUpstreamKafkaConfiguration configuration; MockUpstreamKafkaFacade upstream_kafka_facade; + MockRecordCallbackProcessor record_callback_processor; // when - KafkaMeshFilter filter = KafkaMeshFilter(configuration, upstream_kafka_facade); + KafkaMeshFilter filter = + KafkaMeshFilter(configuration, upstream_kafka_facade, record_callback_processor); // then - no exceptions. } diff --git a/contrib/kafka/filters/network/test/mesh/integration_test/kafka_mesh_integration_test.py b/contrib/kafka/filters/network/test/mesh/integration_test/kafka_mesh_integration_test.py index 53623b655f86..2712b3d6698f 100644 --- a/contrib/kafka/filters/network/test/mesh/integration_test/kafka_mesh_integration_test.py +++ b/contrib/kafka/filters/network/test/mesh/integration_test/kafka_mesh_integration_test.py @@ -205,6 +205,72 @@ def __verify_target_kafka_cluster( self.assertTrue(other_partition.topic not in consumer.topics()) consumer.close(False) + def test_consumer_stateful_proxy(self): + """ + This test verifies that consumer can receive messages through the mesh filter. + We are going to have messages in two topics: 'aaaconsumer' and 'bbbconsumer'. + The mesh filter is configured to process fetch requests for topics starting with 'a' (like 'aaaconsumer') + by consuming from the first cluster, and the ones starting with 'b' (so 'bbbconsumer') from the second one. + So in the end our consumers that point at Envoy should receive records from matching upstream Kafka clusters. + """ + + # Put the messages into upstream Kafka clusters. + partition1 = TopicPartition('aaaconsumer', 0) + count1 = 20 + partition2 = TopicPartition('bbbconsumer', 0) + count2 = 30 + self.__put_messages_into_upstream_kafka( + IntegrationTest.kafka_cluster1_address(), partition1, count1) + self.__put_messages_into_upstream_kafka( + IntegrationTest.kafka_cluster2_address(), partition2, count2) + + # Create Kafka consumers that point at Envoy. + consumer1 = KafkaConsumer(bootstrap_servers=IntegrationTest.kafka_envoy_address()) + consumer1.assign([partition1]) + consumer2 = KafkaConsumer(bootstrap_servers=IntegrationTest.kafka_envoy_address()) + consumer2.assign([partition2]) + + # Have the consumers receive the messages from Kafka clusters through Envoy. 
+ received1 = [] + received2 = [] + while (len(received1) < count1): + poll_result = consumer1.poll(timeout_ms=5000) + for records in poll_result.values(): + received1 += records + while (len(received2) < count2): + poll_result = consumer2.poll(timeout_ms=5000) + for records in poll_result.values(): + received2 += records + + # Verify that the messages sent have been received. + self.assertTrue(len(received1) == count1) + self.assertTrue(len(received2) == count2) + + # Cleanup + consumer1.close(False) + consumer2.close(False) + + def __put_messages_into_upstream_kafka(self, bootstrap_servers, partition, count): + """ + Helper method for putting messages into Kafka directly. + """ + producer = KafkaProducer(bootstrap_servers=bootstrap_servers) + + futures = [] + for _ in range(count): + message = Message() + future = producer.send( + key=message.key, + value=message.value, + headers=message.headers, + topic=partition.topic, + partition=partition.partition) + futures.append(future) + for future in futures: + offset = future.get().offset + print('Saved message at offset %s' % (offset)) + producer.close(True) + class MetricsHolder: """ diff --git a/contrib/kafka/filters/network/test/mesh/request_processor_unit_test.cc b/contrib/kafka/filters/network/test/mesh/request_processor_unit_test.cc index e01534a1fbe4..7f624eda4ad1 100644 --- a/contrib/kafka/filters/network/test/mesh/request_processor_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/request_processor_unit_test.cc @@ -1,7 +1,9 @@ +#include "test/mocks/event/mocks.h" #include "test/test_common/utility.h" #include "contrib/kafka/filters/network/source/mesh/abstract_command.h" #include "contrib/kafka/filters/network/source/mesh/command_handlers/api_versions.h" +#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch.h" #include "contrib/kafka/filters/network/source/mesh/command_handlers/list_offsets.h" #include "contrib/kafka/filters/network/source/mesh/command_handlers/metadata.h" #include "contrib/kafka/filters/network/source/mesh/command_handlers/produce.h" @@ -11,6 +13,8 @@ #include "gtest/gtest.h" using testing::_; +using testing::NiceMock; +using testing::ReturnRef; namespace Envoy { namespace Extensions { @@ -23,6 +27,14 @@ class MockAbstractRequestListener : public AbstractRequestListener { public: MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr)); MOCK_METHOD(void, onRequestReadyForAnswer, ()); + MOCK_METHOD(Event::Dispatcher&, dispatcher, ()); + + MockAbstractRequestListener() { + ON_CALL(*this, dispatcher).WillByDefault(ReturnRef(mock_dispatcher_)); + } + +private: + Event::MockDispatcher mock_dispatcher_; }; class MockUpstreamKafkaFacade : public UpstreamKafkaFacade { @@ -30,6 +42,12 @@ class MockUpstreamKafkaFacade : public UpstreamKafkaFacade { MOCK_METHOD(KafkaProducer&, getProducerForTopic, (const std::string&)); }; +class MockRecordCallbackProcessor : public RecordCallbackProcessor { +public: + MOCK_METHOD(void, processCallback, (const RecordCbSharedPtr&)); + MOCK_METHOD(void, removeCallback, (const RecordCbSharedPtr&)); +}; + class MockUpstreamKafkaConfiguration : public UpstreamKafkaConfiguration { public: MOCK_METHOD(absl::optional, computeClusterConfigForTopic, (const std::string&), @@ -39,10 +57,12 @@ class MockUpstreamKafkaConfiguration : public UpstreamKafkaConfiguration { class RequestProcessorTest : public testing::Test { protected: - MockAbstractRequestListener listener_; + NiceMock listener_; MockUpstreamKafkaConfiguration configuration_; MockUpstreamKafkaFacade 
upstream_kafka_facade_; - RequestProcessor testee_ = {listener_, configuration_, upstream_kafka_facade_}; + MockRecordCallbackProcessor record_callback_processor_; + RequestProcessor testee_ = {listener_, configuration_, upstream_kafka_facade_, + record_callback_processor_}; }; TEST_F(RequestProcessorTest, ShouldProcessProduceRequest) { @@ -61,6 +81,22 @@ TEST_F(RequestProcessorTest, ShouldProcessProduceRequest) { ASSERT_NE(std::dynamic_pointer_cast(capture), nullptr); } +TEST_F(RequestProcessorTest, ShouldProcessFetchRequest) { + // given + const RequestHeader header = {FETCH_REQUEST_API_KEY, 0, 0, absl::nullopt}; + const FetchRequest data = {0, 0, 0, {}}; + const auto message = std::make_shared>(header, data); + + InFlightRequestSharedPtr capture = nullptr; + EXPECT_CALL(listener_, onRequest(_)).WillOnce(testing::SaveArg<0>(&capture)); + + // when + testee_.onMessage(message); + + // then + ASSERT_NE(std::dynamic_pointer_cast(capture), nullptr); +} + TEST_F(RequestProcessorTest, ShouldProcessListOffsetsRequest) { // given const RequestHeader header = {LIST_OFFSETS_REQUEST_API_KEY, 0, 0, absl::nullopt}; diff --git a/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc b/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc index a2f4ddba6dcb..74b80f1cd32d 100644 --- a/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc @@ -148,7 +148,7 @@ class RecordDistributorTest : public testing::Test { } InboundRecordSharedPtr makeRecord(const std::string& topic, const int32_t partition) { - return std::make_shared(topic, partition, 0); + return std::make_shared(topic, partition, 0, absl::nullopt, absl::nullopt); } }; diff --git a/contrib/kafka/filters/network/test/mesh/upstream_config_unit_test.cc b/contrib/kafka/filters/network/test/mesh/upstream_config_unit_test.cc index 3ecb0b5fd679..7bcc0cc667f8 100644 --- a/contrib/kafka/filters/network/test/mesh/upstream_config_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/upstream_config_unit_test.cc @@ -142,6 +142,68 @@ advertised_port: 42 EXPECT_FALSE(res4.has_value()); } +TEST(UpstreamKafkaConfigurationTest, shouldBehaveProperlyWithCustomConfigs) { + // given + const std::string yaml = R"EOF( +advertised_host: mock_host +advertised_port: 42 +upstream_clusters: +- cluster_name: cluster1 + bootstrap_servers: s1 + partition_count : 1 + producer_config: + p1: "111" +- cluster_name: cluster2 + bootstrap_servers: s2 + partition_count : 2 + consumer_config: + p2: "222" + group.id: "custom-value" +forwarding_rules: +- target_cluster: cluster1 + topic_prefix: prefix1 +- target_cluster: cluster2 + topic_prefix: prefix2 + )EOF"; + KafkaMeshProtoConfig proto_config; + TestUtility::loadFromYamlAndValidate(yaml, proto_config); + const UpstreamKafkaConfiguration& testee = UpstreamKafkaConfigurationImpl{proto_config}; + + const ClusterConfig cluster1 = {"cluster1", + 1, + {{"bootstrap.servers", "s1"}, {"p1", "111"}}, + {{"bootstrap.servers", "s1"}, {"group.id", "envoy"}}}; + const ClusterConfig cluster2 = { + "cluster2", + 2, + {{"bootstrap.servers", "s2"}}, + {{"bootstrap.servers", "s2"}, {"group.id", "custom-value"}, {"p2", "222"}}}; + + // when, then (advertised address is returned properly) + const auto address = testee.getAdvertisedAddress(); + EXPECT_EQ(address.first, "mock_host"); + EXPECT_EQ(address.second, 42); + + // when, then (matching prefix with something more) + const auto 
res1 = testee.computeClusterConfigForTopic("prefix1somethingmore"); + ASSERT_TRUE(res1.has_value()); + EXPECT_EQ(*res1, cluster1); + + // when, then (matching prefix alone) + const auto res2 = testee.computeClusterConfigForTopic("prefix1"); + ASSERT_TRUE(res2.has_value()); + EXPECT_EQ(*res2, cluster1); + + // when, then (failing to match first rule, but then matching the second one) + const auto res3 = testee.computeClusterConfigForTopic("prefix2somethingmore"); + ASSERT_TRUE(res3.has_value()); + EXPECT_EQ(*res3, cluster2); + + // when, then (no rules match) + const auto res4 = testee.computeClusterConfigForTopic("someotherthing"); + EXPECT_FALSE(res4.has_value()); +} + } // namespace Mesh } // namespace Kafka } // namespace NetworkFilters diff --git a/contrib/kafka/filters/network/test/serialization_test.cc b/contrib/kafka/filters/network/test/serialization_test.cc index b9264cf237e4..59aa0e567b96 100644 --- a/contrib/kafka/filters/network/test/serialization_test.cc +++ b/contrib/kafka/filters/network/test/serialization_test.cc @@ -568,6 +568,50 @@ TEST(TaggedFieldsDeserializer, ShouldConsumeCorrectAmountOfData) { serializeCompactThenDeserializeAndCheckEquality(value); } +// Just a helper to write shorter tests. +template Bytes toBytes(uint32_t fn(const T arg, Bytes& out), const T arg) { + Bytes res; + fn(arg, res); + return res; +} + +TEST(VarlenUtils, ShouldEncodeUnsignedVarInt) { + const auto testee = VarlenUtils::writeUnsignedVarint; + ASSERT_EQ(toBytes(testee, 0), Bytes({0x00})); + ASSERT_EQ(toBytes(testee, 1), Bytes({0x01})); + ASSERT_EQ(toBytes(testee, 127), Bytes({0x7f})); + ASSERT_EQ(toBytes(testee, 128), Bytes({0x80, 0x01})); + ASSERT_EQ(toBytes(testee, 2147483647), Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0x07})); + ASSERT_EQ(toBytes(testee, std::numeric_limits::max()), + Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0x0F})); +} + +TEST(VarlenUtils, ShouldEncodeSignedVarInt) { + const auto testee = VarlenUtils::writeVarint; + ASSERT_EQ(toBytes(testee, 0), Bytes({0x00})); + ASSERT_EQ(toBytes(testee, 1), Bytes({0x02})); + ASSERT_EQ(toBytes(testee, 63), Bytes({0x7e})); + ASSERT_EQ(toBytes(testee, 64), Bytes({0x80, 0x01})); + ASSERT_EQ(toBytes(testee, -1), Bytes({0x01})); + ASSERT_EQ(toBytes(testee, std::numeric_limits::min()), + Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0x0F})); + ASSERT_EQ(toBytes(testee, std::numeric_limits::max()), + Bytes({0xFE, 0xFF, 0xFF, 0xFF, 0x0F})); +} + +TEST(VarlenUtils, ShouldEncodeVarLong) { + const auto testee = VarlenUtils::writeVarlong; + ASSERT_EQ(toBytes(testee, 0), Bytes({0x00})); + ASSERT_EQ(toBytes(testee, 1), Bytes({0x02})); + ASSERT_EQ(toBytes(testee, 63), Bytes({0x7e})); + ASSERT_EQ(toBytes(testee, 64), Bytes({0x80, 0x01})); + ASSERT_EQ(toBytes(testee, -1), Bytes({0x01})); + ASSERT_EQ(toBytes(testee, std::numeric_limits::min()), + Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01})); + ASSERT_EQ(toBytes(testee, std::numeric_limits::max()), + Bytes({0xFE, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01})); +} + } // namespace SerializationTest } // namespace Kafka } // namespace NetworkFilters diff --git a/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst b/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst index 78dcb5bceb3b..8d9f96258b8f 100644 --- a/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst +++ b/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst @@ -5,8 +5,8 @@ Kafka Broker filter The Apache Kafka broker filter decodes the client protocol for `Apache 
Kafka `_, both the requests and responses in the payload. -The message versions in `Kafka 3.4.0 `_ -are supported. +The message versions in `Kafka 3.5.1 `_ +are supported (apart from ConsumerGroupHeartbeat). The filter attempts not to influence the communication between client and brokers, so the messages that could not be decoded (due to Kafka client or broker running a newer version than supported by this filter) are forwarded as-is. diff --git a/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst b/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst index 8e91548d8c1e..472947348971 100644 --- a/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst +++ b/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst @@ -4,9 +4,15 @@ Kafka Mesh filter =================== The Apache Kafka mesh filter provides a facade for `Apache Kafka `_ -producers. Produce requests sent to this filter insance can be forwarded to one of multiple -clusters, depending on configured forwarding rules. Corresponding message versions from -Kafka 3.4.0 are supported. +clusters. + +It allows for processing of Produce (producer) and Fetch (consumer) requests sent by downstream +clients. + +The requests received by this filter instance can be forwarded to one of multiple clusters, +depending on the configured forwarding rules. + +Corresponding message versions from Kafka 3.5.1 are supported. * This filter should be configured with the type URL ``type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh``. * :ref:`v3 API reference ` @@ -60,6 +66,8 @@ to cluster depending on topic names. producer_config: acks: "1" linger.ms: "500" + consumer_config: + client.id: "my-envoy-consumer" forwarding_rules: - target_cluster: kafka_c1 topic_prefix: apples @@ -75,28 +83,58 @@ chain to capture the request processing metrics. Notes ----- -Given that this filter does its own processing of received requests, there are some changes -in behaviour compared to explicit connection to a Kafka cluster: -#. Only ProduceRequests with version 2 are supported (what means very old producers like 0.8 are - not going to be supported). -#. Python producers need to set API version of at least 1.0.0, so that the produce requests they - send are going to have records with magic equal to 2. +#. The records are sent/received using embedded + `librdkafka `_ producers/consumers. +#. librdkafka was compiled without ssl, lz4, gssapi, so related custom config options are + not supported. +#. Invalid custom configs are not detected at startup (only when the appropriate producers or consumers + are being initialised). Requests that would have referenced these clusters are going to close the + connection and fail. +#. Requests that reference topics that do not match any of the rules (see the routing sketch after + the producer proxy notes below) are going to close the connection and fail. This usually should + not happen (clients request metadata first, and should then fail with 'no broker available'), + but it is possible if someone tailors binary payloads over the connection. + +Producer proxy +-------------- + +#. The embedded librdkafka producers that point at upstream Kafka clusters are created + per Envoy worker thread (so throughput can be increased with the `--concurrency` option, + allowing requests to be processed by a larger number of producers). +#. Only ProduceRequests with version 2 are supported (which means very old producers like 0.8 + are not supported). +#. Python producers need to set an API version of at least 1.0.0, so that the produce requests + they send are going to have records with magic equal to 2. #. Downstream handling of Kafka producer 'acks' property is delegated to upstream client. E.g. if upstream client is configured to use acks=0 then the response is going to be sent to downstream client as soon as possible (even if they had non-zero acks!). #. As the filter splits single producer requests into separate records, it's possible that delivery of only some of these records fails. In that case, the response returned to upstream client is - a failure, however it is possible some of the records have been appended in target cluster. + a failure, however it is possible some of the records have been appended in the target cluster. #. Because of the splitting mentioned above, records are not necessarily appended one after another (as they do not get sent as single request to upstream). Users that want to avoid this scenario might want to take a look into downstream producer configs: 'linger.ms' and 'batch.size'. -#. Produce requests that reference to topics that do not match any of the rules are going to close - connection and fail. This usually should not happen (clients request metadata first, and they - should then fail with 'no broker available' first), but is possible if someone tailors binary - payloads over the connection. -#. librdkafka was compiled without ssl, lz4, gssapi, so related custom producer config options are - not supported. -#. Invalid custom producer configs are not found at startup (only when appropriate clusters are - being sent to). Requests that would have referenced these clusters are going to close connection - and fail.
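The notes above describe how a request's topics are matched against the configured forwarding rules by prefix, and the upstream_config_unit_test.cc changes earlier in this patch exercise exactly that matching. Below is a minimal, hypothetical sketch of the selection logic under the assumption that rules are evaluated in the order they are configured; `ForwardingRule` and `selectClusterForTopic` are illustrative names rather than the filter's actual types, and the second rule in `main` is made up for the example (only the `apples` rule appears in the sample config above).

```cpp
// Hypothetical sketch of first-match-wins, prefix-based routing of topics to upstream clusters.
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct ForwardingRule {
  std::string target_cluster;
  std::string topic_prefix;
};

// Returns the first rule whose prefix matches the topic; no match means the filter
// will close the connection, as described in the notes above.
std::optional<std::string> selectClusterForTopic(const std::vector<ForwardingRule>& rules,
                                                 const std::string& topic) {
  for (const auto& rule : rules) {
    if (topic.rfind(rule.topic_prefix, 0) == 0) { // Prefix match.
      return rule.target_cluster;
    }
  }
  return std::nullopt;
}

int main() {
  const std::vector<ForwardingRule> rules = {{"kafka_c1", "apples"}, {"kafka_c2", "bananas"}};
  std::cout << selectClusterForTopic(rules, "apples-topic").value_or("<none>") << "\n"; // kafka_c1
  std::cout << selectClusterForTopic(rules, "cherries").value_or("<none>") << "\n";     // <none>
}
```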
+ +Consumer proxy +-------------- + +#. Currently the consumer proxy supports only stateful proxying: Envoy uses upstream-pointing + librdkafka consumers to receive the records, and does so only when more data is requested. +#. Users might want to take a look at the consumers' config property *group.id* to manage the consumers' + offset committing behaviour (which is meaningful across Envoy restarts). +#. When requesting consumer position, the response always contains offset = 0 + (see *list_offsets.cc*). +#. Record offset information is provided, but record batch offset delta is not; + it has been observed that the Apache Kafka Java client is not going to update its position + despite receiving records (see *fetch_record_converter.cc*). +#. The Fetch response is sent downstream once it has collected at least 3 records (see *fetch.cc*). + The data about requested bytes etc. in the request is ignored by the current implementation. +#. The Fetch response is sent after it is considered to be fulfilled (see above) or the hardcoded + timeout of 5 seconds passes (see *fetch.cc*). The timeout specified in the request is ignored. + A simplified sketch of this fulfilment logic follows these notes. +#. The consumers are going to poll records from topics as long as there are incoming requests for + these topics, without considering partitions. + Users are encouraged to make sure all partitions are being consumed from, to avoid a situation + where e.g. only records from partition 0 are requested, but the proxy receives records + for partition 0 (which are sent downstream) and partition 1 (which are kept in memory until + someone shows interest in them).
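As a rough illustration of the fulfilment rules above (and of the CallbackReply transitions exercised by fetch_unit_test.cc earlier in this patch), here is a minimal sketch of the accumulate-or-time-out decision. The class, the record type, and the driver are simplified stand-ins, not the actual fetch.cc implementation, which additionally posts back to the filter's dispatcher and is unregistered from the record callback processor by its invoker.

```cpp
// Illustrative-only sketch of when a proxied Fetch is considered fulfilled.
#include <cstddef>
#include <cstdio>
#include <string>
#include <vector>

enum class CallbackReply { AcceptedAndWantMore, AcceptedAndFinished, Rejected };

class FetchAccumulatorSketch {
public:
  // The notes above describe a hardcoded threshold of 3 records (see fetch.cc).
  static constexpr std::size_t MinRecords = 3;

  CallbackReply receive(const std::string& record) {
    if (finished_) {
      return CallbackReply::Rejected; // Too late: the response was already sent downstream.
    }
    records_.push_back(record);
    if (records_.size() >= MinRecords) {
      finished_ = true; // Enough data collected: the answer can be computed and sent.
      return CallbackReply::AcceptedAndFinished;
    }
    return CallbackReply::AcceptedAndWantMore;
  }

  // Invoked when the hardcoded 5-second timeout fires before enough records arrive.
  void markFinishedByTimer() { finished_ = true; }

  bool finished() const { return finished_; }

private:
  bool finished_ = false;
  std::vector<std::string> records_;
};

int main() {
  FetchAccumulatorSketch fetch;
  std::printf("%d\n", static_cast<int>(fetch.receive("r1"))); // 0: AcceptedAndWantMore
  std::printf("%d\n", static_cast<int>(fetch.receive("r2"))); // 0: AcceptedAndWantMore
  std::printf("%d\n", static_cast<int>(fetch.receive("r3"))); // 1: AcceptedAndFinished
  std::printf("%d\n", static_cast<int>(fetch.receive("r4"))); // 2: Rejected
}
```

The same sequence of replies (accept, accept, accept-and-finish, reject, plus rejection after the timer) is what the ShouldReceiveRecords and ShouldRejectRecordsAfterTimer tests assert against the real implementation.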