diff --git a/contrib/kafka/filters/network/source/mesh/BUILD b/contrib/kafka/filters/network/source/mesh/BUILD
index bc6e78d46e57..71546a58bef0 100644
--- a/contrib/kafka/filters/network/source/mesh/BUILD
+++ b/contrib/kafka/filters/network/source/mesh/BUILD
@@ -110,6 +110,18 @@ envoy_cc_library(
     ],
 )
 
+envoy_cc_library(
+    name = "inbound_record_lib",
+    srcs = [
+    ],
+    hdrs = [
+        "inbound_record.h",
+    ],
+    tags = ["skip_on_windows"],
+    deps = [
+    ],
+)
+
 envoy_cc_library(
     name = "outbound_record_lib",
     srcs = [
@@ -153,6 +165,36 @@ envoy_cc_library(
     ],
 )
 
+envoy_cc_library(
+    name = "upstream_kafka_consumer_lib",
+    srcs = [
+    ],
+    hdrs = [
+        "upstream_kafka_consumer.h",
+    ],
+    tags = ["skip_on_windows"],
+    deps = [
+        ":inbound_record_lib",
+    ],
+)
+
+envoy_cc_library(
+    name = "upstream_kafka_consumer_impl_lib",
+    srcs = [
+        "upstream_kafka_consumer_impl.cc",
+    ],
+    hdrs = [
+        "upstream_kafka_consumer_impl.h",
+    ],
+    tags = ["skip_on_windows"],
+    deps = [
+        ":librdkafka_utils_impl_lib",
+        ":upstream_kafka_consumer_lib",
+        "//envoy/event:dispatcher_interface",
+        "//source/common/common:minimal_logger_lib",
+    ],
+)
+
 envoy_cc_library(
     name = "upstream_config_lib",
     srcs = [
diff --git a/contrib/kafka/filters/network/source/mesh/inbound_record.h b/contrib/kafka/filters/network/source/mesh/inbound_record.h
new file mode 100644
index 000000000000..83cd1d61fbcf
--- /dev/null
+++ b/contrib/kafka/filters/network/source/mesh/inbound_record.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "absl/strings/str_cat.h"
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+/**
+ * Simple structure representing a record received from the upstream Kafka cluster.
+ */
+struct InboundRecord {
+
+  const std::string topic_;
+  const int32_t partition_;
+  const int64_t offset_;
+
+  // TODO (adam.kotwasinski) Get data in here in the next commits.
+
+  InboundRecord(std::string topic, int32_t partition, int64_t offset)
+      : topic_{topic}, partition_{partition}, offset_{offset} {};
+
+  // Used in logging.
+  std::string toString() const {
+    return absl::StrCat("[", topic_, "-", partition_, "/", offset_, "]");
+  }
+};
+
+using InboundRecordSharedPtr = std::shared_ptr<InboundRecord>;
+
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy
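Note: a quick illustration (not part of the change) of what toString() above produces; the values are made up:

  InboundRecord record{"my-topic", 2, 42};
  record.toString(); // => "[my-topic-2/42]" (absl::StrCat of topic, partition and offset).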
diff --git a/contrib/kafka/filters/network/source/mesh/librdkafka_utils.h b/contrib/kafka/filters/network/source/mesh/librdkafka_utils.h
index be43db2ea351..70a2af9fd578 100644
--- a/contrib/kafka/filters/network/source/mesh/librdkafka_utils.h
+++ b/contrib/kafka/filters/network/source/mesh/librdkafka_utils.h
@@ -16,6 +16,23 @@ namespace NetworkFilters {
 namespace Kafka {
 namespace Mesh {
 
+// Used by librdkafka API.
+using RdKafkaMessageRawPtr = RdKafka::Message*;
+
+using RdKafkaMessagePtr = std::unique_ptr<RdKafka::Message>;
+
+/**
+ * Helper class to wrap librdkafka consumer partition assignment.
+ * This object has to outlive any consumer that uses its "raw" data.
+ * On its own it does not expose any public API, as it is not intended to be interacted with.
+ */
+class ConsumerAssignment {
+public:
+  virtual ~ConsumerAssignment() = default;
+};
+
+using ConsumerAssignmentConstPtr = std::unique_ptr<const ConsumerAssignment>;
+
 /**
  * Helper class responsible for creating librdkafka entities, so we can have mocks in tests.
  */
@@ -43,6 +60,12 @@ class LibRdKafkaUtils {
 
   // In case of produce failures, we need to dispose of headers manually.
   virtual void deleteHeaders(RdKafka::Headers* librdkafka_headers) const PURE;
+
+  // Assigns partitions to a consumer.
+  // Impl note: this method was extracted so that the raw-pointer vector does not appear in real
+  // code.
+  virtual ConsumerAssignmentConstPtr assignConsumerPartitions(RdKafka::KafkaConsumer& consumer,
+                                                              const std::string& topic,
+                                                              const int32_t partitions) const PURE;
 };
 
 using RawKafkaConfig = std::map<std::string, std::string>;
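Note: per the ConsumerAssignment comment above, the wrapper has to outlive the consumer that uses its raw data. A minimal sketch of the intended ownership pattern (names from this change; the wiring itself is illustrative):

  // Keep the handle alive next to the consumer that uses it.
  ConsumerAssignmentConstPtr assignment =
      utils.assignConsumerPartitions(consumer, "my-topic", 3);
  // ... the consumer polls, potentially referencing the raw TopicPartition pointers ...
  // Tear down the consumer (unassign/close) before dropping 'assignment'.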
diff --git a/contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.cc b/contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.cc
index 4ae2b7d96885..7747519ef84d 100644
--- a/contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.cc
+++ b/contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.cc
@@ -8,6 +8,30 @@ namespace NetworkFilters {
 namespace Kafka {
 namespace Mesh {
 
+// ConsumerAssignmentImpl
+
+class ConsumerAssignmentImpl : public ConsumerAssignment {
+public:
+  ConsumerAssignmentImpl(std::vector<RdKafkaPartitionPtr>&& assignment)
+      : assignment_{std::move(assignment)} {};
+
+  // The assignment in a form that librdkafka likes.
+  RdKafkaPartitionVector raw() const;
+
+private:
+  const std::vector<RdKafkaPartitionPtr> assignment_;
+};
+
+RdKafkaPartitionVector ConsumerAssignmentImpl::raw() const {
+  RdKafkaPartitionVector result;
+  for (const auto& tp : assignment_) {
+    result.push_back(tp.get()); // Raw pointer.
+  }
+  return result;
+}
+
+// LibRdKafkaUtils
+
 RdKafka::Conf::ConfResult LibRdKafkaUtilsImpl::setConfProperty(RdKafka::Conf& conf,
                                                                const std::string& name,
                                                                const std::string& value,
@@ -51,6 +75,25 @@ void LibRdKafkaUtilsImpl::deleteHeaders(RdKafka::Headers* librdkafka_headers) co
   delete librdkafka_headers;
 }
 
+ConsumerAssignmentConstPtr LibRdKafkaUtilsImpl::assignConsumerPartitions(
+    RdKafka::KafkaConsumer& consumer, const std::string& topic, const int32_t partitions) const {
+
+  // Construct the topic-partition vector that we are going to store.
+  std::vector<RdKafkaPartitionPtr> assignment;
+  for (int partition = 0; partition < partitions; ++partition) {
+
+    // We consume records from the beginning of each partition.
+    const int64_t initial_offset = 0;
+    assignment.push_back(std::unique_ptr<RdKafka::TopicPartition>(
+        RdKafka::TopicPartition::create(topic, partition, initial_offset)));
+  }
+  auto result = std::make_unique<ConsumerAssignmentImpl>(std::move(assignment));
+
+  // Do the assignment.
+  consumer.assign(result->raw());
+  return result;
+}
+
 const LibRdKafkaUtils& LibRdKafkaUtilsImpl::getDefaultInstance() {
   CONSTRUCT_ON_FIRST_USE(LibRdKafkaUtilsImpl);
 }
diff --git a/contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h b/contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h
index 0e0a5f4ee2cd..ae36b2b591de 100644
--- a/contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h
+++ b/contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h
@@ -1,5 +1,7 @@
 #pragma once
 
+#include <memory>
+
 #include "contrib/kafka/filters/network/source/mesh/librdkafka_utils.h"
 
 namespace Envoy {
@@ -8,6 +10,9 @@ namespace NetworkFilters {
 namespace Kafka {
 namespace Mesh {
 
+using RdKafkaPartitionPtr = std::unique_ptr<RdKafka::TopicPartition>;
+using RdKafkaPartitionVector = std::vector<RdKafka::TopicPartition*>;
+
 /**
  * Real implementation that just performs librdkafka operations.
  */
@@ -38,12 +43,15 @@ class LibRdKafkaUtilsImpl : public LibRdKafkaUtils {
   // LibRdKafkaUtils
   void deleteHeaders(RdKafka::Headers* librdkafka_headers) const override;
 
+  // LibRdKafkaUtils
+  ConsumerAssignmentConstPtr assignConsumerPartitions(RdKafka::KafkaConsumer& consumer,
+                                                      const std::string& topic,
+                                                      const int32_t partitions) const override;
+
   // Default singleton accessor.
   static const LibRdKafkaUtils& getDefaultInstance();
 };
 
-using RawKafkaConfig = std::map<std::string, std::string>;
-
 } // namespace Mesh
 } // namespace Kafka
 } // namespace NetworkFilters
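Note: for topic "my-topic" and partitions=3, the helper above hands librdkafka the list [my-topic-0/0, my-topic-1/0, my-topic-2/0] - every partition, starting at offset 0. The same effect using the raw librdkafka API (illustrative only; the change wraps the pointers in unique_ptr instead):

  std::vector<RdKafka::TopicPartition*> raw;
  for (int partition = 0; partition < 3; ++partition) {
    raw.push_back(RdKafka::TopicPartition::create("my-topic", partition, 0));
  }
  consumer.assign(raw);
  // ... consume ...
  consumer.unassign();
  RdKafka::TopicPartition::destroy(raw); // Free only after the consumer is done with them.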
diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer.h b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer.h
new file mode 100644
index 000000000000..3b7646de449c
--- /dev/null
+++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer.h
@@ -0,0 +1,53 @@
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "envoy/common/pure.h"
+
+#include "contrib/kafka/filters/network/source/mesh/inbound_record.h"
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+/**
+ * An entity that is interested in inbound records delivered by the Kafka consumer.
+ */
+class InboundRecordProcessor {
+public:
+  virtual ~InboundRecordProcessor() = default;
+
+  /**
+   * Passes the record to the processor.
+   */
+  virtual void receive(InboundRecordSharedPtr message) PURE;
+
+  /**
+   * Blocks until there is interest in records in a given topic, or the timeout expires.
+   * Conceptually a thick condition variable.
+   * @return true if there was interest.
+   */
+  virtual bool waitUntilInterest(const std::string& topic, const int32_t timeout_ms) const PURE;
+};
+
+using InboundRecordProcessorPtr = std::unique_ptr<InboundRecordProcessor>;
+
+/**
+ * Kafka consumer pointing to some upstream Kafka cluster.
+ */
+class KafkaConsumer {
+public:
+  virtual ~KafkaConsumer() = default;
+};
+
+using KafkaConsumerPtr = std::unique_ptr<KafkaConsumer>;
+
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy
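Note: waitUntilInterest is the throttling point - the consumer's worker thread parks there until some downstream party wants records. A minimal sketch of what an implementation could look like (hypothetical class; the interest flag would be flipped elsewhere, and a real implementation lands in later commits):

  class QueueingRecordProcessor : public InboundRecordProcessor {
  public:
    void receive(InboundRecordSharedPtr message) override {
      absl::MutexLock lock{&mutex_};
      queue_.push_back(std::move(message));
    }

    bool waitUntilInterest(const std::string&, const int32_t timeout_ms) const override {
      absl::MutexLock lock{&mutex_};
      // The "thick condition variable": block until interest appears or we time out.
      return mutex_.AwaitWithTimeout(absl::Condition(&interest_present_),
                                     absl::Milliseconds(timeout_ms));
    }

  private:
    mutable absl::Mutex mutex_;
    bool interest_present_ = false; // Hypothetical: set when a Fetch request arrives.
    std::vector<InboundRecordSharedPtr> queue_ ABSL_GUARDED_BY(mutex_);
  };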
+ ENVOY_LOG(trace, "No message received in consumer [{}]: {}/{}", topic_, message->err(), + RdKafka::err2str(message->err())); + return {}; + } +} + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h new file mode 100644 index 000000000000..a9b69c1b7cd2 --- /dev/null +++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h @@ -0,0 +1,74 @@ +#pragma once + +#include +#include +#include + +#include "envoy/event/dispatcher.h" +#include "envoy/thread/thread.h" + +#include "contrib/kafka/filters/network/source/mesh/librdkafka_utils.h" +#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +/** + * Combines the librdkafka consumer and its dedicated worker thread. + * The thread receives the records, and pushes them to the processor. + * The consumer is going to receive the records from all the partitions for the given topic. + */ +class RichKafkaConsumer : public KafkaConsumer, private Logger::Loggable { +public: + // Main constructor. + RichKafkaConsumer(InboundRecordProcessor& record_processor, Thread::ThreadFactory& thread_factory, + const std::string& topic, const int32_t partition_count, + const RawKafkaConfig& configuration); + + // Visible for testing (allows injection of LibRdKafkaUtils). + RichKafkaConsumer(InboundRecordProcessor& record_processor, Thread::ThreadFactory& thread_factory, + const std::string& topic, const int32_t partition_count, + const RawKafkaConfig& configuration, const LibRdKafkaUtils& utils); + + // More complex than usual - closes the real Kafka consumer and disposes of the assignment object. + ~RichKafkaConsumer() override; + +private: + // This method continuously fetches new records and passes them to processor. + // Does not finish until this object gets destroyed. + // Executed in the dedicated worker thread. + void runWorkerLoop(); + + // Uses internal consumer to receive records from upstream. + std::vector receiveRecordBatch(); + + // The record processor (provides info whether it wants records and consumes them). + InboundRecordProcessor& record_processor_; + + // The topic we are consuming from. + std::string topic_; + + // Real Kafka consumer (NOT thread-safe). + // All access to this thing happens in the worker thread. + std::unique_ptr consumer_; + + // Consumer's partition assignment. + ConsumerAssignmentConstPtr assignment_; + + // Flag controlling worker threads's execution. + std::atomic worker_thread_active_; + + // Real worker thread. + // Responsible for getting records from upstream Kafka with a consumer and passing these records + // to the processor. 
diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h
new file mode 100644
index 000000000000..a9b69c1b7cd2
--- /dev/null
+++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h
@@ -0,0 +1,74 @@
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <vector>
+
+#include "envoy/event/dispatcher.h"
+#include "envoy/thread/thread.h"
+
+#include "contrib/kafka/filters/network/source/mesh/librdkafka_utils.h"
+#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer.h"
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+/**
+ * Combines the librdkafka consumer and its dedicated worker thread.
+ * The thread receives the records and pushes them to the processor.
+ * The consumer is going to receive the records from all the partitions of the given topic.
+ */
+class RichKafkaConsumer : public KafkaConsumer, private Logger::Loggable<Logger::Id::kafka> {
+public:
+  // Main constructor.
+  RichKafkaConsumer(InboundRecordProcessor& record_processor, Thread::ThreadFactory& thread_factory,
+                    const std::string& topic, const int32_t partition_count,
+                    const RawKafkaConfig& configuration);
+
+  // Visible for testing (allows injection of LibRdKafkaUtils).
+  RichKafkaConsumer(InboundRecordProcessor& record_processor, Thread::ThreadFactory& thread_factory,
+                    const std::string& topic, const int32_t partition_count,
+                    const RawKafkaConfig& configuration, const LibRdKafkaUtils& utils);
+
+  // More complex than usual - closes the real Kafka consumer and disposes of the assignment
+  // object.
+  ~RichKafkaConsumer() override;
+
+private:
+  // This method continuously fetches new records and passes them to the processor.
+  // It does not finish until this object gets destroyed.
+  // Executed in the dedicated worker thread.
+  void runWorkerLoop();
+
+  // Uses the internal consumer to receive records from upstream.
+  std::vector<InboundRecordSharedPtr> receiveRecordBatch();
+
+  // The record processor (provides info on whether it wants records, and consumes them).
+  InboundRecordProcessor& record_processor_;
+
+  // The topic we are consuming from.
+  std::string topic_;
+
+  // Real Kafka consumer (NOT thread-safe).
+  // All access to it happens in the worker thread.
+  std::unique_ptr<RdKafka::KafkaConsumer> consumer_;
+
+  // Consumer's partition assignment.
+  ConsumerAssignmentConstPtr assignment_;
+
+  // Flag controlling the worker thread's execution.
+  std::atomic<bool> worker_thread_active_;
+
+  // Real worker thread.
+  // Responsible for getting records from upstream Kafka with the consumer and passing these
+  // records to the processor.
+  Thread::ThreadPtr worker_thread_;
+};
+
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy
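Note: wiring the class up takes a processor, a thread factory and raw librdkafka properties. An illustrative construction (bootstrap.servers and group.id are standard librdkafka settings; the values, 'processor' and 'thread_factory' are made up for the example):

  RawKafkaConfig config = {{"bootstrap.servers", "localhost:9092"}, {"group.id", "envoy-mesh"}};
  KafkaConsumerPtr consumer = std::make_unique<RichKafkaConsumer>(
      processor, thread_factory, "my-topic", 5, config);
  // The worker thread is polling already; destroying 'consumer' joins it
  // (bounded by INTEREST_TIMEOUT_MS + POLL_TIMEOUT_MS).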
diff --git a/contrib/kafka/filters/network/test/mesh/BUILD b/contrib/kafka/filters/network/test/mesh/BUILD
index fe6ef7ba66af..47c556411c0c 100644
--- a/contrib/kafka/filters/network/test/mesh/BUILD
+++ b/contrib/kafka/filters/network/test/mesh/BUILD
@@ -74,6 +74,17 @@ envoy_cc_test(
     ],
 )
 
+envoy_cc_test(
+    name = "upstream_kafka_consumer_impl_unit_test",
+    srcs = ["upstream_kafka_consumer_impl_unit_test.cc"],
+    tags = ["skip_on_windows"],
+    deps = [
+        ":kafka_mocks_lib",
+        "//contrib/kafka/filters/network/source/mesh:upstream_kafka_consumer_impl_lib",
+        "//test/test_common:thread_factory_for_test_lib",
+    ],
+)
+
 envoy_cc_test_library(
     name = "kafka_mocks_lib",
     srcs = [],
diff --git a/contrib/kafka/filters/network/test/mesh/kafka_mocks.h b/contrib/kafka/filters/network/test/mesh/kafka_mocks.h
index f3f91be76590..2e2a7efb758f 100644
--- a/contrib/kafka/filters/network/test/mesh/kafka_mocks.h
+++ b/contrib/kafka/filters/network/test/mesh/kafka_mocks.h
@@ -12,6 +12,8 @@ namespace Mesh {
 
 // This file defines all librdkafka-related mocks.
 
+class MockConsumerAssignment : public ConsumerAssignment {};
+
 class MockLibRdKafkaUtils : public LibRdKafkaUtils {
 public:
   MOCK_METHOD(RdKafka::Conf::ConfResult, setConfProperty,
@@ -25,6 +27,8 @@ class MockLibRdKafkaUtils : public LibRdKafkaUtils {
   MOCK_METHOD(RdKafka::Headers*, convertHeaders,
               ((const std::vector<std::pair<absl::string_view, absl::string_view>>&)), (const));
   MOCK_METHOD(void, deleteHeaders, (RdKafka::Headers * librdkafka_headers), (const));
+  MOCK_METHOD(ConsumerAssignmentConstPtr, assignConsumerPartitions,
+              (RdKafka::KafkaConsumer&, const std::string&, int32_t), (const));
 
   MockLibRdKafkaUtils() {
     ON_CALL(*this, convertHeaders(testing::_))
@@ -35,35 +39,9 @@ class MockLibRdKafkaUtils : public LibRdKafkaUtils {
   std::unique_ptr<RdKafka::Headers> headers_holder_{RdKafka::Headers::create()};
 };
 
-class MockKafkaProducer : public RdKafka::Producer {
+// Base class for mocks of librdkafka objects (they all derive virtually from RdKafka::Handle).
+class MockKafkaHandle : public virtual RdKafka::Handle {
 public:
-  // Producer API.
-  MOCK_METHOD(RdKafka::ErrorCode, produce,
-              (RdKafka::Topic*, int32_t, int, void*, size_t, const std::string*, void*), ());
-  MOCK_METHOD(RdKafka::ErrorCode, produce,
-              (RdKafka::Topic*, int32_t, int, void*, size_t, const void*, size_t, void*), ());
-  MOCK_METHOD(RdKafka::ErrorCode, produce,
-              (const std::string, int32_t, int, void*, size_t, const void*, size_t, int64_t,
-               void*),
-              ());
-  MOCK_METHOD(RdKafka::ErrorCode, produce,
-              (const std::string, int32_t, int, void*, size_t, const void*, size_t, int64_t,
-               RdKafka::Headers*, void*),
-              ());
-  MOCK_METHOD(RdKafka::ErrorCode, produce,
-              (RdKafka::Topic*, int32_t, const std::vector<char>*, const std::vector<char>*,
-               void*),
-              ());
-  MOCK_METHOD(RdKafka::ErrorCode, flush, (int), ());
-  MOCK_METHOD(RdKafka::ErrorCode, purge, (int), ());
-  MOCK_METHOD(RdKafka::Error*, init_transactions, (int), ());
-  MOCK_METHOD(RdKafka::Error*, begin_transaction, (), ());
-  MOCK_METHOD(RdKafka::Error*, send_offsets_to_transaction,
-              (const std::vector<RdKafka::TopicPartition*>&, const RdKafka::ConsumerGroupMetadata*,
-               int),
-              ());
-  MOCK_METHOD(RdKafka::Error*, commit_transaction, (int), ());
-  MOCK_METHOD(RdKafka::Error*, abort_transaction, (int), ());
-
-  // Handle API (unused by us).
   MOCK_METHOD(const std::string, name, (), (const));
   MOCK_METHOD(const std::string, memberid, (), (const));
   MOCK_METHOD(int, poll, (int), ());
@@ -94,6 +72,34 @@ class MockKafkaProducer : public RdKafka::Producer {
   MOCK_METHOD(void, mem_free, (void*), ());
 };
 
+class MockKafkaProducer : public RdKafka::Producer, public MockKafkaHandle {
+public:
+  MOCK_METHOD(RdKafka::ErrorCode, produce,
+              (RdKafka::Topic*, int32_t, int, void*, size_t, const std::string*, void*), ());
+  MOCK_METHOD(RdKafka::ErrorCode, produce,
+              (RdKafka::Topic*, int32_t, int, void*, size_t, const void*, size_t, void*), ());
+  MOCK_METHOD(RdKafka::ErrorCode, produce,
+              (const std::string, int32_t, int, void*, size_t, const void*, size_t, int64_t,
+               void*),
+              ());
+  MOCK_METHOD(RdKafka::ErrorCode, produce,
+              (const std::string, int32_t, int, void*, size_t, const void*, size_t, int64_t,
+               RdKafka::Headers*, void*),
+              ());
+  MOCK_METHOD(RdKafka::ErrorCode, produce,
+              (RdKafka::Topic*, int32_t, const std::vector<char>*, const std::vector<char>*,
+               void*),
+              ());
+  MOCK_METHOD(RdKafka::ErrorCode, flush, (int), ());
+  MOCK_METHOD(RdKafka::ErrorCode, purge, (int), ());
+  MOCK_METHOD(RdKafka::Error*, init_transactions, (int), ());
+  MOCK_METHOD(RdKafka::Error*, begin_transaction, (), ());
+  MOCK_METHOD(RdKafka::Error*, send_offsets_to_transaction,
+              (const std::vector<RdKafka::TopicPartition*>&, const RdKafka::ConsumerGroupMetadata*,
+               int),
+              ());
+  MOCK_METHOD(RdKafka::Error*, commit_transaction, (int), ());
+  MOCK_METHOD(RdKafka::Error*, abort_transaction, (int), ());
+};
+
 class MockKafkaMessage : public RdKafka::Message {
 public:
   MOCK_METHOD(std::string, errstr, (), (const));
@@ -117,6 +123,38 @@ class MockKafkaMessage : public RdKafka::Message {
   MOCK_METHOD(int32_t, broker_id, (), (const));
 };
 
+class MockKafkaConsumer : public RdKafka::KafkaConsumer, public MockKafkaHandle {
+public:
+  MOCK_METHOD(RdKafka::ErrorCode, assignment, (std::vector<RdKafka::TopicPartition*>&), ());
+  MOCK_METHOD(RdKafka::ErrorCode, subscription, (std::vector<std::string>&), ());
+  MOCK_METHOD(RdKafka::ErrorCode, subscribe, (const std::vector<std::string>&), ());
+  MOCK_METHOD(RdKafka::ErrorCode, unsubscribe, (), ());
+  MOCK_METHOD(RdKafka::ErrorCode, assign, (const std::vector<RdKafka::TopicPartition*>&), ());
+  MOCK_METHOD(RdKafka::ErrorCode, unassign, (), ());
+  MOCK_METHOD(RdKafka::Message*, consume, (int), ());
+  MOCK_METHOD(RdKafka::ErrorCode, commitSync, (), ());
+  MOCK_METHOD(RdKafka::ErrorCode, commitAsync, (), ());
+  MOCK_METHOD(RdKafka::ErrorCode, commitSync, (RdKafka::Message*), ());
+  MOCK_METHOD(RdKafka::ErrorCode, commitAsync, (RdKafka::Message*), ());
+  MOCK_METHOD(RdKafka::ErrorCode, commitSync, (std::vector<RdKafka::TopicPartition*>&), ());
+  MOCK_METHOD(RdKafka::ErrorCode, commitAsync, (const std::vector<RdKafka::TopicPartition*>&), ());
+  MOCK_METHOD(RdKafka::ErrorCode, commitSync, (RdKafka::OffsetCommitCb*), ());
+  MOCK_METHOD(RdKafka::ErrorCode, commitSync,
+              (std::vector<RdKafka::TopicPartition*>&, RdKafka::OffsetCommitCb*), ());
+  MOCK_METHOD(RdKafka::ErrorCode, committed, (std::vector<RdKafka::TopicPartition*>&, int), ());
+  MOCK_METHOD(RdKafka::ErrorCode, position, (std::vector<RdKafka::TopicPartition*>&), ());
+  MOCK_METHOD(RdKafka::ErrorCode, close, (), ());
+  MOCK_METHOD(RdKafka::ErrorCode, seek, (const RdKafka::TopicPartition&, int), ());
+  MOCK_METHOD(RdKafka::ErrorCode, offsets_store, (std::vector<RdKafka::TopicPartition*>&), ());
+  MOCK_METHOD(RdKafka::ConsumerGroupMetadata*, groupMetadata, (), ());
+  MOCK_METHOD(bool, assignment_lost, (), ());
+  MOCK_METHOD(std::string, rebalance_protocol, (), ());
+  MOCK_METHOD(RdKafka::Error*, incremental_assign, (const std::vector<RdKafka::TopicPartition*>&),
+              ());
+  MOCK_METHOD(RdKafka::Error*, incremental_unassign,
+              (const std::vector<RdKafka::TopicPartition*>&), ());
+};
+
 } // namespace Mesh
 } // namespace Kafka
 } // namespace NetworkFilters
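Note: the MockKafkaHandle extraction relies on virtual inheritance: RdKafka::Producer and RdKafka::KafkaConsumer both derive virtually from RdKafka::Handle, so mixing in MockKafkaHandle (another virtual Handle) leaves a single Handle subobject, and its mocked methods become the final overriders for both mock classes. The shape of the diamond, with generic names (illustrative, not librdkafka's actual declarations):

  struct Handle { virtual int poll(int) = 0; virtual ~Handle() = default; };
  struct Producer : public virtual Handle { virtual int produce() = 0; };
  struct MockHandle : public virtual Handle { int poll(int) override { return 0; } };
  // Single shared Handle subobject; MockHandle::poll satisfies Producer's pure virtual.
  struct MockProducer : public Producer, public MockHandle { int produce() override { return 0; } };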
diff --git a/contrib/kafka/filters/network/test/mesh/upstream_kafka_client_impl_unit_test.cc b/contrib/kafka/filters/network/test/mesh/upstream_kafka_client_impl_unit_test.cc
index ef21325959f0..10478d4892eb 100644
--- a/contrib/kafka/filters/network/test/mesh/upstream_kafka_client_impl_unit_test.cc
+++ b/contrib/kafka/filters/network/test/mesh/upstream_kafka_client_impl_unit_test.cc
@@ -33,8 +33,8 @@ class UpstreamKafkaClientTest : public testing::Test {
   NiceMock<MockLibRdKafkaUtils> kafka_utils_{};
   RawKafkaConfig config_ = {{"key1", "value1"}, {"key2", "value2"}};
 
-  std::unique_ptr<MockKafkaProducer> producer_ptr = std::make_unique<MockKafkaProducer>();
-  MockKafkaProducer& producer = *producer_ptr;
+  std::unique_ptr<MockKafkaProducer> producer_ptr_ = std::make_unique<MockKafkaProducer>();
+  MockKafkaProducer& producer_ = *producer_ptr_;
 
   std::shared_ptr<MockProduceFinishCb> origin_ = std::make_shared<MockProduceFinishCb>();
 
@@ -47,9 +47,9 @@ class UpstreamKafkaClientTest : public testing::Test {
     EXPECT_CALL(kafka_utils_, setConfDeliveryCallback(_, _, _))
         .WillOnce(Return(RdKafka::Conf::CONF_OK));
 
-    EXPECT_CALL(producer, poll(_)).Times(AnyNumber());
+    EXPECT_CALL(producer_, poll(_)).Times(AnyNumber());
     EXPECT_CALL(kafka_utils_, createProducer(_, _))
-        .WillOnce(Return(testing::ByMove(std::move(producer_ptr))));
+        .WillOnce(Return(testing::ByMove(std::move(producer_ptr_))));
     EXPECT_CALL(kafka_utils_, deleteHeaders(_)).Times(0);
   }
 
@@ -71,7 +71,7 @@ TEST_F(UpstreamKafkaClientTest, ShouldSendRecordsAndReceiveConfirmations) {
   RichKafkaProducer testee = {dispatcher_, thread_factory_, config_, kafka_utils_};
 
   // when, then - should send request without problems.
-  EXPECT_CALL(producer, produce("topic", 13, _, _, _, _, _, _, _, _))
+  EXPECT_CALL(producer_, produce("topic", 13, _, _, _, _, _, _, _, _))
       .Times(3)
       .WillRepeatedly(Return(RdKafka::ERR_NO_ERROR));
   const std::vector<std::string> payloads = {"value1", "value2", "value3"};
@@ -95,7 +95,7 @@ TEST_F(UpstreamKafkaClientTest, ShouldCheckCallbacksForDeliveries) {
   RichKafkaProducer testee = {dispatcher_, thread_factory_, config_, kafka_utils_};
 
   // when, then - should send request without problems.
-  EXPECT_CALL(producer, produce("topic", 13, _, _, _, _, _, _, _, _))
+  EXPECT_CALL(producer_, produce("topic", 13, _, _, _, _, _, _, _, _))
       .Times(2)
       .WillRepeatedly(Return(RdKafka::ERR_NO_ERROR));
   const std::vector<std::string> payloads = {"value1", "value2"};
@@ -122,7 +122,7 @@ TEST_F(UpstreamKafkaClientTest, ShouldHandleProduceFailures) {
   RichKafkaProducer testee = {dispatcher_, thread_factory_, config_, kafka_utils_};
 
   // when, then - if there are problems while sending, notify the source immediately.
-  EXPECT_CALL(producer, produce("topic", 13, _, _, _, _, _, _, _, _))
+  EXPECT_CALL(producer_, produce("topic", 13, _, _, _, _, _, _, _, _))
       .WillOnce(Return(RdKafka::ERR_LEADER_NOT_AVAILABLE));
   EXPECT_CALL(kafka_utils_, deleteHeaders(_));
   EXPECT_CALL(*origin_, accept(_)).WillOnce(Return(true));
@@ -149,7 +149,7 @@ TEST_F(UpstreamKafkaClientTest, ShouldHandleHeaderConversionFailures) {
   RichKafkaProducer testee = {dispatcher_, thread_factory_, config_, kafka_utils_};
 
   // when, then - producer was not interacted with, response was sent immediately.
-  EXPECT_CALL(producer, produce(_, _, _, _, _, _, _, _, _, _)).Times(0);
+  EXPECT_CALL(producer_, produce(_, _, _, _, _, _, _, _, _, _)).Times(0);
   EXPECT_CALL(*origin_, accept(_)).WillOnce(Return(true));
   testee.send(origin_, makeRecord("value"));
   EXPECT_EQ(testee.getUnfinishedRequestsForTest().size(), 0);
@@ -199,7 +199,7 @@ TEST_F(UpstreamKafkaClientTest, ShouldPollProducerForEventsUntilShutdown) {
   setupConstructorExpectations();
   absl::BlockingCounter counter{1};
 
-  EXPECT_CALL(producer, poll(_)).Times(AtLeast(1)).WillOnce([&counter]() {
+  EXPECT_CALL(producer_, poll(_)).Times(AtLeast(1)).WillOnce([&counter]() {
     counter.DecrementCount();
     return 0;
   });
diff --git a/contrib/kafka/filters/network/test/mesh/upstream_kafka_consumer_impl_unit_test.cc b/contrib/kafka/filters/network/test/mesh/upstream_kafka_consumer_impl_unit_test.cc
new file mode 100644
index 000000000000..1ae8ca8aae08
--- /dev/null
+++ b/contrib/kafka/filters/network/test/mesh/upstream_kafka_consumer_impl_unit_test.cc
@@ -0,0 +1,188 @@
+#include <functional>
+
+#include "test/test_common/thread_factory_for_test.h"
+
+#include "absl/synchronization/mutex.h"
+#include "contrib/kafka/filters/network/source/mesh/librdkafka_utils.h"
+#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h"
+#include "contrib/kafka/filters/network/test/mesh/kafka_mocks.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+using testing::_;
+using testing::AnyNumber;
+using testing::AtLeast;
+using testing::ByMove;
+using testing::Invoke;
+using testing::NiceMock;
+using testing::Return;
+using testing::ReturnNull;
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+class MockInboundRecordProcessor : public InboundRecordProcessor {
+public:
+  MOCK_METHOD(void, receive, (InboundRecordSharedPtr), ());
+  MOCK_METHOD(bool, waitUntilInterest, (const std::string&, const int32_t), (const));
+};
+
+class UpstreamKafkaConsumerTest : public testing::Test {
+protected:
+  Thread::ThreadFactory& thread_factory_ = Thread::threadFactoryForTest();
+  MockLibRdKafkaUtils kafka_utils_;
+  RawKafkaConfig config_ = {{"key1", "value1"}, {"key2", "value2"}};
+
+  std::unique_ptr<MockKafkaConsumer> consumer_ptr_ = std::make_unique<MockKafkaConsumer>();
+  MockKafkaConsumer& consumer_ = *consumer_ptr_;
+
+  MockInboundRecordProcessor record_processor_;
+
+  // Helper method - allows creation of RichKafkaConsumer without problems.
+  void setupConstructorExpectations() {
+    EXPECT_CALL(kafka_utils_, setConfProperty(_, "key1", "value1", _))
+        .WillOnce(Return(RdKafka::Conf::CONF_OK));
+    EXPECT_CALL(kafka_utils_, setConfProperty(_, "key2", "value2", _))
+        .WillOnce(Return(RdKafka::Conf::CONF_OK));
+    EXPECT_CALL(kafka_utils_, assignConsumerPartitions(_, "topic", 42));
+
+    // These two methods get called in the destructor.
+    EXPECT_CALL(consumer_, unassign());
+    EXPECT_CALL(consumer_, close());
+
+    EXPECT_CALL(kafka_utils_, createConsumer(_, _))
+        .WillOnce(Return(ByMove(std::move(consumer_ptr_))));
+  }
+
+  // Helper method - creates the testee with all the mocks injected.
+  KafkaConsumerPtr makeTestee() {
+    return std::make_unique<RichKafkaConsumer>(record_processor_, thread_factory_, "topic", 42,
+                                               config_, kafka_utils_);
+  }
+};
+
+// This handles situations when users pass bad config to the raw consumer.
+TEST_F(UpstreamKafkaConsumerTest, ShouldThrowIfSettingPropertiesFails) {
+  // given
+  EXPECT_CALL(kafka_utils_, setConfProperty(_, _, _, _))
+      .WillOnce(Return(RdKafka::Conf::CONF_INVALID));
+
+  // when, then - exception gets thrown during construction.
+  EXPECT_THROW(makeTestee(), EnvoyException);
+}
+
+TEST_F(UpstreamKafkaConsumerTest, ShouldThrowIfRawConsumerConstructionFails) {
+  // given
+  EXPECT_CALL(kafka_utils_, setConfProperty(_, _, _, _))
+      .WillRepeatedly(Return(RdKafka::Conf::CONF_OK));
+  EXPECT_CALL(kafka_utils_, createConsumer(_, _)).WillOnce(ReturnNull());
+
+  // when, then - exception gets thrown during construction.
+  EXPECT_THROW(makeTestee(), EnvoyException);
+}
+
+// Utility class for counting invocations / capturing invocation data.
+template <typename T> class Tracker {
+private:
+  mutable absl::Mutex mutex_;
+  int invocation_count_ ABSL_GUARDED_BY(mutex_) = 0;
+  T data_ ABSL_GUARDED_BY(mutex_);
+
+public:
+  // Stores the first value put inside.
+  void registerInvocation(const T& arg) {
+    absl::MutexLock lock{&mutex_};
+    if (0 == invocation_count_) {
+      data_ = arg;
+    }
+    invocation_count_++;
+  }
+
+  // Blocks until some value appears, and returns it.
+  T awaitFirstInvocation() const {
+    const auto cond = std::bind(&Tracker::hasInvocations, this, 1);
+    absl::MutexLock lock{&mutex_, absl::Condition(&cond)};
+    return data_;
+  }
+
+  // Blocks until N invocations have happened.
+  void awaitInvocations(const int n) const {
+    const auto cond = std::bind(&Tracker::hasInvocations, this, n);
+    absl::MutexLock lock{&mutex_, absl::Condition(&cond)};
+  }
+
+private:
+  bool hasInvocations(const int n) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
+    return invocation_count_ >= n;
+  }
+};
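Note: Tracker's blocking getters build on absl::Condition, which releases the mutex and parks the caller until the predicate becomes true. The same pattern in its simplest form (standalone sketch):

  absl::Mutex mutex;
  bool ready = false; // Set by another thread while holding 'mutex'.

  void awaitReady() {
    // Equivalent to awaitFirstInvocation's invocation-count predicate.
    absl::MutexLock lock{&mutex, absl::Condition(&ready)};
  }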
+
+// Utility method: creates a Kafka message with a given error code.
+// The result is freed by the worker thread (via RdKafkaMessagePtr in receiveRecordBatch).
+static RdKafkaMessageRawPtr makeMessage(const RdKafka::ErrorCode error_code) {
+  const auto result = new NiceMock<MockKafkaMessage>();
+  ON_CALL(*result, err()).WillByDefault(Return(error_code));
+  return result;
+}
+
+// Expected behaviour: if there is interest, then poll for records, and pass them to the
+// processor.
+TEST_F(UpstreamKafkaConsumerTest, ShouldReceiveRecordsFromKafkaConsumer) {
+  // given
+  setupConstructorExpectations();
+
+  EXPECT_CALL(record_processor_, waitUntilInterest(_, _))
+      .Times(AnyNumber())
+      .WillRepeatedly(Return(true));
+
+  EXPECT_CALL(consumer_, consume(_)).Times(AnyNumber()).WillRepeatedly([]() {
+    return makeMessage(RdKafka::ERR_NO_ERROR);
+  });
+
+  Tracker<InboundRecordSharedPtr> tracker;
+  EXPECT_CALL(record_processor_, receive(_))
+      .Times(AtLeast(1))
+      .WillRepeatedly(Invoke(&tracker, &Tracker<InboundRecordSharedPtr>::registerInvocation));
+
+  // when
+  const auto testee = makeTestee();
+
+  // then - the record processor got notified with a message.
+  const InboundRecordSharedPtr record = tracker.awaitFirstInvocation();
+  ASSERT_NE(record, nullptr);
+}
+
+// Expected behaviour: if there is no data, we send nothing to the processor.
+TEST_F(UpstreamKafkaConsumerTest, ShouldHandleNoDataGracefully) {
+  // given
+  setupConstructorExpectations();
+
+  Tracker<InboundRecordSharedPtr> tracker;
+  EXPECT_CALL(record_processor_, waitUntilInterest(_, _))
+      .Times(AnyNumber())
+      .WillRepeatedly([&tracker]() {
+        tracker.registerInvocation(nullptr);
+        return true;
+      });
+
+  EXPECT_CALL(consumer_, consume(_)).Times(AnyNumber()).WillRepeatedly([]() {
+    return makeMessage(RdKafka::ERR__TIMED_OUT);
+  });
+
+  // We do not expect to receive any meaningful records.
+  EXPECT_CALL(record_processor_, receive(_)).Times(0);
+
+  // when
+  const auto testee = makeTestee();
+
+  // then - we have run a few loops, but the processor was never interacted with.
+  tracker.awaitInvocations(13);
+}
+
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy