Skip to content

Commit

Permalink
kafka: add some tests
Browse files Browse the repository at this point in the history
Signed-off-by: Adam Kotwasinski <adam.kotwasinski@gmail.com>
  • Loading branch information
adamkotwasinski committed Dec 12, 2022
1 parent 207bd44 commit 9cd16ed
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 20 deletions.
1 change: 1 addition & 0 deletions contrib/kafka/filters/network/source/mesh/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ envoy_cc_library(
],
tags = ["skip_on_windows"],
deps = [
":librdkafka_utils_lib",
":shared_consumer_manager_lib",
":upstream_config_lib",
":upstream_kafka_consumer_impl_lib",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <map>
#include <vector>
#include <memory>
#include <string>

#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer.h"

Expand All @@ -13,7 +13,7 @@ namespace Mesh {

/**
* Manages (raw) Kafka consumers pointing to upstream Kafka clusters.
* It is expected to have only one instance of this object in the runtime.
* It is expected to have only one instance of this object per mesh-filter type.
*/
class SharedConsumerManager {
public:
Expand All @@ -23,7 +23,7 @@ class SharedConsumerManager {
virtual void registerConsumerIfAbsent(const std::string& topic) PURE;
};

using SharedConsumerManagerSharedPtr = std::shared_ptr<SharedConsumerManager>;
using SharedConsumerManagerPtr = std::unique_ptr<SharedConsumerManager>;

} // namespace Mesh
} // namespace Kafka
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#include "contrib/kafka/filters/network/source/mesh/shared_consumer_manager_impl.h"

#include <functional>

#include "source/common/common/fmt.h"

#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h"
Expand All @@ -12,10 +10,45 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// KafkaConsumerFactoryImpl

class KafkaConsumerFactoryImpl : public KafkaConsumerFactory {
public:
// KafkaConsumerFactory
KafkaConsumerPtr createConsumer(InboundRecordProcessor& record_processor,
Thread::ThreadFactory& thread_factory, const std::string& topic,
const int32_t partition_count,
const RawKafkaConfig& configuration) const override;

// Default singleton accessor.
static const KafkaConsumerFactory& getDefaultInstance();
};

KafkaConsumerPtr
KafkaConsumerFactoryImpl::createConsumer(InboundRecordProcessor& record_processor,
Thread::ThreadFactory& thread_factory,
const std::string& topic, const int32_t partition_count,
const RawKafkaConfig& configuration) const {
return std::make_unique<RichKafkaConsumer>(record_processor, thread_factory, topic,
partition_count, configuration);
}

const KafkaConsumerFactory& KafkaConsumerFactoryImpl::getDefaultInstance() {
CONSTRUCT_ON_FIRST_USE(KafkaConsumerFactoryImpl);
}

// SharedConsumerManagerImpl

SharedConsumerManagerImpl::SharedConsumerManagerImpl(
const UpstreamKafkaConfiguration& configuration, Thread::ThreadFactory& thread_factory)
: record_processor_{std::make_unique<SharedProcessor>()}, configuration_{configuration},
thread_factory_{thread_factory} {}
: SharedConsumerManagerImpl{configuration, thread_factory,
KafkaConsumerFactoryImpl::getDefaultInstance()} {}

SharedConsumerManagerImpl::SharedConsumerManagerImpl(
const UpstreamKafkaConfiguration& configuration, Thread::ThreadFactory& thread_factory,
const KafkaConsumerFactory& consumer_factory)
: distributor_{std::make_unique<RecordDistributor>()}, configuration_{configuration},
thread_factory_{thread_factory}, consumer_factory_{consumer_factory} {}

void SharedConsumerManagerImpl::registerConsumerIfAbsent(const std::string& topic) {
absl::MutexLock lock(&consumers_mutex_);
Expand All @@ -38,22 +71,27 @@ void SharedConsumerManagerImpl::registerNewConsumer(const std::string& topic) {
}

// Create the consumer and register it.
KafkaConsumerPtr new_consumer = std::make_unique<RichKafkaConsumer>(
*record_processor_, thread_factory_, topic, cluster_config->partition_count_,
KafkaConsumerPtr new_consumer = consumer_factory_.createConsumer(
*distributor_, thread_factory_, topic, cluster_config->partition_count_,
cluster_config->upstream_consumer_properties_);
ENVOY_LOG(debug, "Registering new Kafka consumer for topic [{}], consuming from cluster [{}]",
topic, cluster_config->name_);
topic_to_consumer_.emplace(topic, std::move(new_consumer));
}

// InboundRecordProcessor
bool SharedProcessor::waitUntilInterest(const std::string&, const int32_t) const {
size_t SharedConsumerManagerImpl::getConsumerCountForTest() const {
absl::MutexLock lock(&consumers_mutex_);
return topic_to_consumer_.size();
}

// RecordDistributor

bool RecordDistributor::waitUntilInterest(const std::string&, const int32_t) const {
// TODO (adam.kotwasinski) To implement in future commits.
return false;
}

// InboundRecordProcessor
void SharedProcessor::receive(InboundRecordSharedPtr) {
void RecordDistributor::receive(InboundRecordSharedPtr) {
// TODO (adam.kotwasinski) To implement in future commits.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "source/common/common/logger.h"

#include "contrib/kafka/filters/network/source/kafka_types.h"
#include "contrib/kafka/filters/network/source/mesh/librdkafka_utils.h"
#include "contrib/kafka/filters/network/source/mesh/shared_consumer_manager.h"
#include "contrib/kafka/filters/network/source/mesh/upstream_config.h"
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer.h"
Expand All @@ -22,8 +23,9 @@ namespace Mesh {
* In future:
* Processor implementation that stores received records (that had no interest), and callbacks
* waiting for records (that had no matching records delivered yet).
* Basically core of Fetch-handling business logic.
*/
class SharedProcessor : public InboundRecordProcessor {
class RecordDistributor : public InboundRecordProcessor {
public:
// InboundRecordProcessor
bool waitUntilInterest(const std::string& topic, const int32_t timeout_ms) const override;
Expand All @@ -32,29 +34,52 @@ class SharedProcessor : public InboundRecordProcessor {
void receive(InboundRecordSharedPtr message) override;
};

using SharedProcessorPtr = std::unique_ptr<SharedProcessor>;
using RecordDistributorPtr = std::unique_ptr<RecordDistributor>;

/**
* Implements SCM interface by maintaining a collection of Kafka consumers (one per topic).
* Injectable for tests.
*/
class KafkaConsumerFactory {
public:
virtual ~KafkaConsumerFactory() = default;

// Create a Kafka consumer.
virtual KafkaConsumerPtr createConsumer(InboundRecordProcessor& record_processor,
Thread::ThreadFactory& thread_factory,
const std::string& topic, const int32_t partition_count,
const RawKafkaConfig& configuration) const PURE;
};

/**
* Maintains a collection of Kafka consumers (one per topic).
*/
class SharedConsumerManagerImpl : public SharedConsumerManager,
private Logger::Loggable<Logger::Id::kafka> {
public:
// Main constructor.
SharedConsumerManagerImpl(const UpstreamKafkaConfiguration& configuration,
Thread::ThreadFactory& thread_factory);

// Visible for testing.
SharedConsumerManagerImpl(const UpstreamKafkaConfiguration& configuration,
Thread::ThreadFactory& thread_factory,
const KafkaConsumerFactory& consumer_factory);

// SharedConsumerManager
void registerConsumerIfAbsent(const std::string& topic) override;

size_t getConsumerCountForTest() const;

private:
// Mutates 'topic_to_consumer_'.
void registerNewConsumer(const std::string& topic)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(consumers_mutex_);

SharedProcessorPtr record_processor_;
RecordDistributorPtr distributor_;

const UpstreamKafkaConfiguration& configuration_;
Thread::ThreadFactory& thread_factory_;
const KafkaConsumerFactory& consumer_factory_;

mutable absl::Mutex consumers_mutex_;
std::map<std::string, KafkaConsumerPtr> topic_to_consumer_ ABSL_GUARDED_BY(consumers_mutex_);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#include "envoy/common/exception.h"
#include "envoy/thread/thread.h"

#include "contrib/kafka/filters/network/source/mesh/shared_consumer_manager_impl.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
Expand All @@ -7,17 +10,89 @@ namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {
namespace {

using testing::_;
using testing::Return;

class MockThreadFactory : public Thread::ThreadFactory {
public:
MOCK_METHOD(Thread::ThreadPtr, createThread, (std::function<void()>, Thread::OptionsOptConstRef));
MOCK_METHOD(Thread::ThreadId, currentThreadId, ());
};

class MockUpstreamKafkaConfiguration : public UpstreamKafkaConfiguration {
public:
MOCK_METHOD(absl::optional<ClusterConfig>, computeClusterConfigForTopic, (const std::string&),
(const));
MOCK_METHOD((std::pair<std::string, int32_t>), getAdvertisedAddress, (), (const));
};

class SCMTest : public testing::Test {
class MockKafkaConsumerFactory : public KafkaConsumerFactory {
public:
MOCK_METHOD(KafkaConsumerPtr, createConsumer,
(InboundRecordProcessor&, Thread::ThreadFactory&, const std::string&, const int32_t,
const RawKafkaConfig&),
(const));
};

class SharedConsumerManagerTest : public testing::Test {
protected:
MockThreadFactory thread_factory_;
MockUpstreamKafkaConfiguration configuration_;
MockKafkaConsumerFactory consumer_factory_;

std::unique_ptr<SharedConsumerManagerImpl> makeTestee() {
return std::make_unique<SharedConsumerManagerImpl>(configuration_, thread_factory_,
consumer_factory_);
}
};

TEST_F(SCMTest, ShouldTest) {
TEST_F(SharedConsumerManagerTest, ShouldRegisterTopicOnlyOnce) {
// given
const std::string topic1 = "topic1";
const std::string topic2 = "topic2";

const ClusterConfig cluster_config = {"cluster", 1, {}, {}};
EXPECT_CALL(configuration_, computeClusterConfigForTopic(topic1))
.WillOnce(Return(cluster_config));
EXPECT_CALL(configuration_, computeClusterConfigForTopic(topic2))
.WillOnce(Return(cluster_config));

EXPECT_CALL(consumer_factory_, createConsumer(_, _, _, _, _)).Times(2).WillRepeatedly([]() {
return nullptr;
});

auto testee = makeTestee();

// when
for (int i = 0; i < 3; ++i) {
testee->registerConsumerIfAbsent(topic1);
}
for (int i = 0; i < 3; ++i) {
testee->registerConsumerIfAbsent(topic2);
}

// then
ASSERT_EQ(testee->getConsumerCountForTest(), 2);
}

TEST_F(SharedConsumerManagerTest, ShouldHandleMissingConfig) {
// given
const std::string topic = "topic";

EXPECT_CALL(configuration_, computeClusterConfigForTopic(topic)).WillOnce(Return(absl::nullopt));

EXPECT_CALL(consumer_factory_, createConsumer(_, _, _, _, _)).Times(0);

auto testee = makeTestee();

// when, then - construction throws and nothing gets registered.
EXPECT_THROW(testee->registerConsumerIfAbsent(topic), EnvoyException);
ASSERT_EQ(testee->getConsumerCountForTest(), 0);
}

} // namespace
} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
Expand Down

0 comments on commit 9cd16ed

Please sign in to comment.