Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: inbound record distributor/store #24518

Merged
merged 8 commits into from
Dec 16, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
 * Processes incoming record callbacks (i.e. Fetch requests).
 * Implementations are expected to match callbacks against already-buffered records,
 * and to keep track of callbacks that still await data.
 */
class RecordCallbackProcessor {
public:
  virtual ~RecordCallbackProcessor() = default;

  // Process an inbound record callback by passing cached records to it
  // and (if needed) registering the callback.
  virtual void processCallback(const RecordCbSharedPtr& callback) PURE;

  // Remove the callback (usually invoked by the callback timing out downstream).
  virtual void removeCallback(const RecordCbSharedPtr& callback) PURE;
};

/**
* Manages (raw) Kafka consumers pointing to upstream Kafka clusters.
* It is expected to have only one instance of this object per mesh-filter type.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "contrib/kafka/filters/network/source/mesh/shared_consumer_manager_impl.h"

#include <functional>

#include "source/common/common/fmt.h"

#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h"
Expand Down Expand Up @@ -50,6 +52,25 @@ SharedConsumerManagerImpl::SharedConsumerManagerImpl(
: distributor_{std::make_unique<RecordDistributor>()}, configuration_{configuration},
thread_factory_{thread_factory}, consumer_factory_{consumer_factory} {}

void SharedConsumerManagerImpl::processCallback(const RecordCbSharedPtr& callback) {

// For every fetch topic, figure out the upstream cluster,
// create consumer if needed ...
const TopicToPartitionsMap interest = callback->interest();
for (const auto& fetch : interest) {
const std::string& topic = fetch.first;
registerConsumerIfAbsent(topic);
}

// ... and start processing.
distributor_->processCallback(callback);
}

// Thin delegate: the distributor owns the callback registry.
void SharedConsumerManagerImpl::removeCallback(const RecordCbSharedPtr& callback) {
  // Real work - let's remove the callback.
  distributor_->removeCallback(callback);
}

void SharedConsumerManagerImpl::registerConsumerIfAbsent(const std::string& topic) {
absl::MutexLock lock(&consumers_mutex_);
const auto it = topic_to_consumer_.find(topic);
Expand Down Expand Up @@ -86,13 +107,228 @@ size_t SharedConsumerManagerImpl::getConsumerCountForTest() const {

// RecordDistributor

bool RecordDistributor::waitUntilInterest(const std::string&, const int32_t) const {
// TODO (adam.kotwasinski) To implement in future commits.
// Main constructor - starts with no registered callbacks and no stored records.
// (Fixed: both definitions previously ended with a stray ';' - an empty
// declaration that trips -Wextra-semi / clang-tidy.)
RecordDistributor::RecordDistributor() : RecordDistributor({}, {}) {}

// Visible for testing - allows seeding the distributor with initial state.
RecordDistributor::RecordDistributor(const PartitionMap<RecordCbSharedPtr>& callbacks,
                                     const PartitionMap<InboundRecordSharedPtr>& records)
    : partition_to_callbacks_{callbacks}, stored_records_{records} {}

bool RecordDistributor::waitUntilInterest(const std::string& topic,
const int32_t timeout_ms) const {

auto distributor_has_interest = std::bind(&RecordDistributor::hasInterest, this, topic);
// Effectively this means "has an interest appeared within timeout".
// If not, we let the user know so they could do something else
// instead of being infinitely blocked.
bool can_poll = callbacks_mutex_.LockWhenWithTimeout(absl::Condition(&distributor_has_interest),
absl::Milliseconds(timeout_ms));
callbacks_mutex_.Unlock(); // Lock...WithTimeout always locks, so we need to unlock.
return can_poll;
}

bool RecordDistributor::hasInterest(const std::string& topic) const {
  // A topic is "interesting" if at least one of its partitions still has a
  // registered (i.e. not-yet-satisfied) callback waiting for records.
  for (const auto& [partition, callbacks] : partition_to_callbacks_) {
    if (partition.first == topic && !callbacks.empty()) {
      return true;
    }
  }
  return false;
}

// Routes a single freshly-consumed record: offer it to registered callbacks first,
// and buffer it only if nobody took it.
// XXX (adam.kotwasinski) Inefficient: locks acquired per record.
void RecordDistributor::receive(InboundRecordSharedPtr record) {

  const KafkaPartition kafka_partition = {record->topic_, record->partition_};

  // Whether this record has been consumed by any of the callbacks.
  // Because then we can safely throw it away instead of storing.
  bool consumed_by_callback = false;

  {
    absl::MutexLock lock(&callbacks_mutex_);
    // Note: operator[] may create an empty vector for this partition;
    // empty entries are cleaned up by doRemoveCallback.
    auto& callbacks = partition_to_callbacks_[kafka_partition];

    // Removal is deferred: we cannot erase from 'callbacks' while iterating over it.
    std::vector<RecordCbSharedPtr> satisfied_callbacks = {};

    // Typical case: there is some interest in records for given partition.
    // Notify the callback and remove it.
    for (const auto& callback : callbacks) {
      CallbackReply callback_status = callback->receive(record);
      switch (callback_status) {
      case CallbackReply::AcceptedAndFinished: {
        consumed_by_callback = true;
        // A callback is finally satisfied, it will never want more records.
        satisfied_callbacks.push_back(callback);
        break;
      }
      case CallbackReply::AcceptedAndWantMore: {
        consumed_by_callback = true;
        break;
      }
      case CallbackReply::Rejected: {
        break;
      }
      } /* switch */

      /* Some callback has taken the record - this is good, no more iterating. */
      if (consumed_by_callback) {
        break;
      }
    }

    // Still holding callbacks_mutex_, as doRemoveCallback requires it.
    for (const auto& callback : satisfied_callbacks) {
      doRemoveCallback(callback);
    }
  }

  // No one is interested in our record, so we are going to store it in a local cache.
  if (!consumed_by_callback) {
    absl::MutexLock lock(&stored_records_mutex_);
    auto& stored_records = stored_records_[kafka_partition];
    // XXX (adam.kotwasinski) Implement some kind of limit.
    stored_records.push_back(record);
    ENVOY_LOG(trace, "Stored record [{}]", record->toString());
  }
}

void RecordDistributor::processCallback(const RecordCbSharedPtr& callback) {
ENVOY_LOG(trace, "Processing callback {}", callback->toString());

// Attempt to fulfill callback's requirements using the stored records.
bool fulfilled_at_startup = passRecordsToCallback(callback);

if (fulfilled_at_startup) {
// Early exit: callback was fulfilled with only stored records.
// What means it will not require anything anymore, and does not need to be registered.
ENVOY_LOG(trace, "No registration for callback {} due to successful early processing",
callback->toString());
return;
}

// Usual path: the request was not fulfilled at receive time (there were no stored messages).
// So we just register the callback.
TopicToPartitionsMap requested = callback->interest();
absl::MutexLock lock(&callbacks_mutex_);
for (const auto& topic_and_partitions : requested) {
const std::string topic = topic_and_partitions.first;
for (const int32_t partition : topic_and_partitions.second) {
const KafkaPartition kp = {topic, partition};
auto& partition_callbacks = partition_to_callbacks_[kp];
partition_callbacks.push_back(callback);
}
}
}

bool RecordDistributor::passRecordsToCallback(const RecordCbSharedPtr& callback) {
TopicToPartitionsMap requested = callback->interest();
absl::MutexLock lock(&stored_records_mutex_);

for (const auto& topic_and_partitions : requested) {
for (const int32_t partition : topic_and_partitions.second) {
const KafkaPartition kp = {topic_and_partitions.first, partition};
// Processing of given partition's records was enough for given callback.
const bool processing_finished = passPartitionRecordsToCallback(callback, kp);
if (processing_finished) {
return true;
}
}
}

// All the eligible records have been passed to callback, but it still wants more.
// So we are going to need to register it.
return false;
}

void RecordDistributor::receive(InboundRecordSharedPtr) {
// TODO (adam.kotwasinski) To implement in future commits.
// Feeds one partition's buffered records into the callback.
// Returns true when the callback needs no further input (fully satisfied or rejected).
bool RecordDistributor::passPartitionRecordsToCallback(const RecordCbSharedPtr& callback,
                                                       const KafkaPartition& kafka_partition) {
  const auto it = stored_records_.find(kafka_partition);
  if (stored_records_.end() == it) {
    // This partition does not have any records buffered.
    return false;
  }

  auto& partition_records = it->second;
  ENVOY_LOG(trace, "Early notification for callback {}, as there are {} messages available",
            callback->toString(), partition_records.size());

  // Records are handed over one-by-one; erase() yields the next valid iterator,
  // so consumed records are dropped from the buffer as we go.
  bool processing_finished = false;
  for (auto record_it = partition_records.begin(); record_it != partition_records.end();) {
    const CallbackReply callback_status = callback->receive(*record_it);
    switch (callback_status) {
    case CallbackReply::AcceptedAndWantMore: {
      // Callback consumed the record, and wants more. We keep iterating.
      record_it = partition_records.erase(record_it);
      break;
    }
    case CallbackReply::AcceptedAndFinished: {
      // We had a callback that wanted records, and got all it wanted in the initial
      // processing (== everything it needed was buffered), so we won't need to register it.
      record_it = partition_records.erase(record_it);
      processing_finished = true;
      break;
    }
    case CallbackReply::Rejected: {
      // Our callback entered a terminal state in the meantime. We won't work with it anymore.
      processing_finished = true;
      break;
    }
    } /* switch */

    if (processing_finished) {
      // No more processing needed.
      break;
    }
  } /* for-stored-records */

  if (partition_records.empty()) {
    // The partition's buffer got drained - there is no reason to keep empty vectors.
    stored_records_.erase(it);
  }

  return processing_finished;
}

// Thread-safe entry point: takes the callbacks lock, then delegates the
// actual removal to doRemoveCallback.
void RecordDistributor::removeCallback(const RecordCbSharedPtr& callback) {
  absl::MutexLock lock(&callbacks_mutex_);
  doRemoveCallback(callback);
}

void RecordDistributor::doRemoveCallback(const RecordCbSharedPtr& callback) {
  ENVOY_LOG(trace, "Removing callback {}", callback->toString());
  // Scrub the callback from every partition it was registered for,
  // dropping partition entries that become empty along the way.
  auto it = partition_to_callbacks_.begin();
  while (it != partition_to_callbacks_.end()) {
    auto& registered = it->second;
    const auto new_end = std::remove(registered.begin(), registered.end(), callback);
    registered.erase(new_end, registered.end());
    if (registered.empty()) {
      it = partition_to_callbacks_.erase(it);
    } else {
      ++it;
    }
  }
}

// Just a helper function for tests.
template <typename T>
int32_t countForTest(const std::string& topic, const int32_t partition, PartitionMap<T> map) {
const auto it = map.find({topic, partition});
if (map.end() != it) {
return it->second.size();
} else {
return -1; // Tests are simpler to type if we do this instead of absl::optional.
}
}

// Test helper: number of callbacks registered for the partition (-1 if none).
int32_t RecordDistributor::getCallbackCountForTest(const std::string& topic,
                                                   const int32_t partition) const {
  absl::MutexLock lock(&callbacks_mutex_);
  return countForTest(topic, partition, partition_to_callbacks_);
}

// Test helper: number of records buffered for the partition (-1 if none).
int32_t RecordDistributor::getRecordCountForTest(const std::string& topic,
                                                 const int32_t partition) const {
  absl::MutexLock lock(&stored_records_mutex_);
  return countForTest(topic, partition, stored_records_);
}

} // namespace Mesh
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <map>
#include <vector>

#include "envoy/thread/thread.h"

Expand All @@ -18,20 +19,66 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

template <typename T> using PartitionMap = std::map<KafkaPartition, std::vector<T>>;

/**
 * Processor implementation that stores received records (that had no interest),
 * and callbacks waiting for records (that had no matching records delivered yet).
 * Basically core of Fetch-handling business logic.
 */
class RecordDistributor : public InboundRecordProcessor {
class RecordDistributor : public RecordCallbackProcessor,
                          public InboundRecordProcessor,
                          private Logger::Loggable<Logger::Id::kafka> {
public:
  // Main constructor.
  RecordDistributor();

  // Visible for testing. Allows seeding the distributor with initial state.
  RecordDistributor(const PartitionMap<RecordCbSharedPtr>& callbacks,
                    const PartitionMap<InboundRecordSharedPtr>& records);

  // InboundRecordProcessor
  // Blocks (up to timeout_ms) until some callback becomes interested in the topic.
  bool waitUntilInterest(const std::string& topic, const int32_t timeout_ms) const override;

  // InboundRecordProcessor
  // Hands the record to an interested callback, or buffers it when there is no interest.
  void receive(InboundRecordSharedPtr message) override;

  // RecordCallbackProcessor
  void processCallback(const RecordCbSharedPtr& callback) override;

  // RecordCallbackProcessor
  void removeCallback(const RecordCbSharedPtr& callback) override;

  int32_t getCallbackCountForTest(const std::string& topic, const int32_t partition) const;

  int32_t getRecordCountForTest(const std::string& topic, const int32_t partition) const;

private:
  // Checks whether any of the callbacks stored right now are interested in the topic.
  bool hasInterest(const std::string& topic) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(callbacks_mutex_);

  // Helper function (passes all stored records to callback).
  bool passRecordsToCallback(const RecordCbSharedPtr& callback);

  // Helper function (passes partition records to callback).
  bool passPartitionRecordsToCallback(const RecordCbSharedPtr& callback,
                                      const KafkaPartition& kafka_partition)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(stored_records_mutex_);

  // Helper function (real callback removal).
  void doRemoveCallback(const RecordCbSharedPtr& callback)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(callbacks_mutex_);

  /**
   * Invariant - for every KafkaPartition, there may be callbacks for this partition,
   * or there may be records for this partition, but never both at the same time.
   */

  mutable absl::Mutex callbacks_mutex_;
  PartitionMap<RecordCbSharedPtr> partition_to_callbacks_ ABSL_GUARDED_BY(callbacks_mutex_);

  mutable absl::Mutex stored_records_mutex_;
  PartitionMap<InboundRecordSharedPtr> stored_records_ ABSL_GUARDED_BY(stored_records_mutex_);
};

using RecordDistributorPtr = std::unique_ptr<RecordDistributor>;
Expand All @@ -51,9 +98,10 @@ class KafkaConsumerFactory {
};

/**
* Maintains a collection of Kafka consumers (one per topic).
* Maintains a collection of Kafka consumers (one per topic) and the real distributor instance.
*/
class SharedConsumerManagerImpl : public SharedConsumerManager,
class SharedConsumerManagerImpl : public RecordCallbackProcessor,
public SharedConsumerManager,
private Logger::Loggable<Logger::Id::kafka> {
public:
// Main constructor.
Expand All @@ -65,6 +113,12 @@ class SharedConsumerManagerImpl : public SharedConsumerManager,
Thread::ThreadFactory& thread_factory,
const KafkaConsumerFactory& consumer_factory);

// RecordCallbackProcessor
void processCallback(const RecordCbSharedPtr& callback) override;

// RecordCallbackProcessor
void removeCallback(const RecordCbSharedPtr& callback) override;

// SharedConsumerManager
void registerConsumerIfAbsent(const std::string& topic) override;

Expand Down
Loading