Skip to content

Commit

Permalink
kafka: add upstream-kafka-consumer (#24431)
Browse files Browse the repository at this point in the history
Signed-off-by: Adam Kotwasinski <adam.kotwasinski@gmail.com>
Signed-off-by: Ryan Northey <ryan@synca.io>
  • Loading branch information
adamkotwasinski authored and phlax committed Jun 2, 2023
1 parent 76fa8bd commit fad1110
Show file tree
Hide file tree
Showing 12 changed files with 694 additions and 39 deletions.
42 changes: 42 additions & 0 deletions contrib/kafka/filters/network/source/mesh/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "inbound_record_lib",
srcs = [
],
hdrs = [
"inbound_record.h",
],
tags = ["skip_on_windows"],
deps = [
],
)

envoy_cc_library(
name = "outbound_record_lib",
srcs = [
Expand Down Expand Up @@ -152,6 +164,36 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "upstream_kafka_consumer_lib",
srcs = [
],
hdrs = [
"upstream_kafka_consumer.h",
],
tags = ["skip_on_windows"],
deps = [
":inbound_record_lib",
],
)

envoy_cc_library(
name = "upstream_kafka_consumer_impl_lib",
srcs = [
"upstream_kafka_consumer_impl.cc",
],
hdrs = [
"upstream_kafka_consumer_impl.h",
],
tags = ["skip_on_windows"],
deps = [
":librdkafka_utils_impl_lib",
":upstream_kafka_consumer_lib",
"//envoy/event:dispatcher_interface",
"//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
name = "upstream_config_lib",
srcs = [
Expand Down
40 changes: 40 additions & 0 deletions contrib/kafka/filters/network/source/mesh/inbound_record.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include <memory>
#include <string>

#include "absl/strings/str_cat.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
* Simple structure representing the record received from upstream Kafka cluster.
*/
struct InboundRecord {

const std::string topic_;
const int32_t partition_;
const int64_t offset_;

// TODO (adam.kotwasinski) Get data in here in the next commits.

InboundRecord(std::string topic, int32_t partition, int64_t offset)
: topic_{topic}, partition_{partition}, offset_{offset} {};

// Used in logging.
std::string toString() const {
return absl::StrCat("[", topic_, "-", partition_, "/", offset_, "]");
}
};

using InboundRecordSharedPtr = std::shared_ptr<InboundRecord>;

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
23 changes: 23 additions & 0 deletions contrib/kafka/filters/network/source/mesh/librdkafka_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Used by librdkafka API.
using RdKafkaMessageRawPtr = RdKafka::Message*;

using RdKafkaMessagePtr = std::unique_ptr<RdKafka::Message>;

/**
* Helper class to wrap librdkafka consumer partition assignment.
* This object has to live longer than whatever consumer that uses its "raw" data.
* On its own it does not expose any public API, as it is not intended to be interacted with.
*/
class ConsumerAssignment {
public:
virtual ~ConsumerAssignment() = default;
};

using ConsumerAssignmentConstPtr = std::unique_ptr<const ConsumerAssignment>;

/**
* Helper class responsible for creating librdkafka entities, so we can have mocks in tests.
*/
Expand Down Expand Up @@ -43,6 +60,12 @@ class LibRdKafkaUtils {

// In case of produce failures, we need to dispose of headers manually.
virtual void deleteHeaders(RdKafka::Headers* librdkafka_headers) const PURE;

// Assigns partitions to a consumer.
// Impl: this method was extracted so that raw-pointer vector does not appear in real code.
virtual ConsumerAssignmentConstPtr assignConsumerPartitions(RdKafka::KafkaConsumer& consumer,
const std::string& topic,
const int32_t partitions) const PURE;
};

using RawKafkaConfig = std::map<std::string, std::string>;
Expand Down
43 changes: 43 additions & 0 deletions contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,30 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// ConsumerAssignmentImpl

class ConsumerAssignmentImpl : public ConsumerAssignment {
public:
ConsumerAssignmentImpl(std::vector<RdKafkaPartitionPtr>&& assignment)
: assignment_{std::move(assignment)} {};

// The assignment in a form that librdkafka likes.
RdKafkaPartitionVector raw() const;

private:
const std::vector<RdKafkaPartitionPtr> assignment_;
};

RdKafkaPartitionVector ConsumerAssignmentImpl::raw() const {
RdKafkaPartitionVector result;
for (const auto& tp : assignment_) {
result.push_back(tp.get()); // Raw pointer.
}
return result;
}

// LibRdKafkaUtils

RdKafka::Conf::ConfResult LibRdKafkaUtilsImpl::setConfProperty(RdKafka::Conf& conf,
const std::string& name,
const std::string& value,
Expand Down Expand Up @@ -51,6 +75,25 @@ void LibRdKafkaUtilsImpl::deleteHeaders(RdKafka::Headers* librdkafka_headers) co
delete librdkafka_headers;
}

ConsumerAssignmentConstPtr LibRdKafkaUtilsImpl::assignConsumerPartitions(
RdKafka::KafkaConsumer& consumer, const std::string& topic, const int32_t partitions) const {

// Construct the topic-partition vector that we are going to store.
std::vector<RdKafkaPartitionPtr> assignment;
for (int partition = 0; partition < partitions; ++partition) {

// We consume records from the beginning of each partition.
const int64_t initial_offset = 0;
assignment.push_back(std::unique_ptr<RdKafka::TopicPartition>(
RdKafka::TopicPartition::create(topic, partition, initial_offset)));
}
auto result = std::make_unique<ConsumerAssignmentImpl>(std::move(assignment));

// Do the assignment.
consumer.assign(result->raw());
return result;
}

const LibRdKafkaUtils& LibRdKafkaUtilsImpl::getDefaultInstance() {
CONSTRUCT_ON_FIRST_USE(LibRdKafkaUtilsImpl);
}
Expand Down
12 changes: 10 additions & 2 deletions contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <vector>

#include "contrib/kafka/filters/network/source/mesh/librdkafka_utils.h"

namespace Envoy {
Expand All @@ -8,6 +10,9 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

using RdKafkaPartitionPtr = std::unique_ptr<RdKafka::TopicPartition>;
using RdKafkaPartitionVector = std::vector<RdKafka::TopicPartition*>;

/**
* Real implementation that just performs librdkafka operations.
*/
Expand Down Expand Up @@ -38,12 +43,15 @@ class LibRdKafkaUtilsImpl : public LibRdKafkaUtils {
// LibRdKafkaUtils
void deleteHeaders(RdKafka::Headers* librdkafka_headers) const override;

// LibRdKafkaUtils
ConsumerAssignmentConstPtr assignConsumerPartitions(RdKafka::KafkaConsumer& consumer,
const std::string& topic,
const int32_t partitions) const override;

// Default singleton accessor.
static const LibRdKafkaUtils& getDefaultInstance();
};

using RawKafkaConfig = std::map<std::string, std::string>;

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#pragma once

#include <memory>
#include <utility>
#include <vector>

#include "envoy/common/pure.h"

#include "contrib/kafka/filters/network/source/mesh/inbound_record.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
* An entity that is interested in inbound records delivered by Kafka consumer.
*/
class InboundRecordProcessor {
public:
virtual ~InboundRecordProcessor() = default;

/**
* Passes the record to the processor.
*/
virtual void receive(InboundRecordSharedPtr message) PURE;

/**
* Blocks until there is interest in records in a given topic, or timeout expires.
* Conceptually a thick condition variable.
* @return true if there was interest.
*/
virtual bool waitUntilInterest(const std::string& topic, const int32_t timeout_ms) const PURE;
};

using InboundRecordProcessorPtr = std::unique_ptr<InboundRecordProcessor>;

/**
* Kafka consumer pointing to some upstream Kafka cluster.
*/
class KafkaConsumer {
public:
virtual ~KafkaConsumer() = default;
};

using KafkaConsumerPtr = std::unique_ptr<KafkaConsumer>;

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Loading

0 comments on commit fad1110

Please sign in to comment.