kafka: add upstream-kafka-consumer #24431

Merged: 7 commits, Dec 11, 2022
42 changes: 42 additions & 0 deletions contrib/kafka/filters/network/source/mesh/BUILD
@@ -110,6 +110,18 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "inbound_record_lib",
srcs = [
],
hdrs = [
"inbound_record.h",
],
tags = ["skip_on_windows"],
deps = [
],
)

envoy_cc_library(
name = "outbound_record_lib",
srcs = [
@@ -153,6 +165,36 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "upstream_kafka_consumer_lib",
srcs = [
],
hdrs = [
"upstream_kafka_consumer.h",
],
tags = ["skip_on_windows"],
deps = [
":inbound_record_lib",
],
)

envoy_cc_library(
name = "upstream_kafka_consumer_impl_lib",
srcs = [
"upstream_kafka_consumer_impl.cc",
],
hdrs = [
"upstream_kafka_consumer_impl.h",
],
tags = ["skip_on_windows"],
deps = [
":librdkafka_utils_impl_lib",
":upstream_kafka_consumer_lib",
"//envoy/event:dispatcher_interface",
"//source/common/common:minimal_logger_lib",
],
)

envoy_cc_library(
name = "upstream_config_lib",
srcs = [
40 changes: 40 additions & 0 deletions contrib/kafka/filters/network/source/mesh/inbound_record.h
@@ -0,0 +1,40 @@
#pragma once

#include <memory>
#include <string>

#include "absl/strings/str_cat.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
 * Simple structure representing a record received from an upstream Kafka cluster.
*/
struct InboundRecord {

const std::string topic_;
const int32_t partition_;
const int64_t offset_;

// TODO (adam.kotwasinski) Get data in here in the next commits.

InboundRecord(std::string topic, int32_t partition, int64_t offset)
: topic_{topic}, partition_{partition}, offset_{offset} {};

// Used in logging.
std::string toString() const {
return absl::StrCat("[", topic_, "-", partition_, "/", offset_, "]");
}
};

using InboundRecordSharedPtr = std::shared_ptr<InboundRecord>;

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
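As a usage illustration only (not part of this change), a record built from this header renders compactly for logs. The topic name and numbers below are made up; ENVOY_LOG_MISC is Envoy's free-standing logging macro:

// Hypothetical usage sketch; topic/partition/offset values are illustrative.
#include "contrib/kafka/filters/network/source/mesh/inbound_record.h"
#include "source/common/common/logger.h"

void logSampleRecord() {
  using Envoy::Extensions::NetworkFilters::Kafka::Mesh::InboundRecord;
  const auto record = std::make_shared<InboundRecord>("my-topic", /* partition */ 0, /* offset */ 42);
  // toString() yields "[my-topic-0/42]".
  ENVOY_LOG_MISC(trace, "received {}", record->toString());
}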
23 changes: 23 additions & 0 deletions contrib/kafka/filters/network/source/mesh/librdkafka_utils.h
@@ -16,6 +16,23 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Used by librdkafka API.
using RdKafkaMessageRawPtr = RdKafka::Message*;

using RdKafkaMessagePtr = std::unique_ptr<RdKafka::Message>;

/**
* Helper class to wrap librdkafka consumer partition assignment.
 * This object has to live longer than any consumer that uses its "raw" data.
* On its own it does not expose any public API, as it is not intended to be interacted with.
*/
class ConsumerAssignment {
public:
virtual ~ConsumerAssignment() = default;
};

using ConsumerAssignmentConstPtr = std::unique_ptr<const ConsumerAssignment>;

/**
* Helper class responsible for creating librdkafka entities, so we can have mocks in tests.
*/
@@ -43,6 +60,12 @@ class LibRdKafkaUtils {

// In case of produce failures, we need to dispose of headers manually.
virtual void deleteHeaders(RdKafka::Headers* librdkafka_headers) const PURE;

// Assigns partitions to a consumer.
  // Impl: this method was extracted so that the raw-pointer vector does not appear in real code.
virtual ConsumerAssignmentConstPtr assignConsumerPartitions(RdKafka::KafkaConsumer& consumer,
const std::string& topic,
const int32_t partitions) const PURE;
};

using RawKafkaConfig = std::map<std::string, std::string>;
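A rough sketch of the intended call pattern for the new method (the consumer argument is an assumption, not part of this diff; error handling is elided). The key point is that the returned handle must outlive the consumer's use of the underlying topic-partition objects:

// Hypothetical call site; `consumer` is assumed to be an already-configured consumer.
#include "contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h"

void assignAllPartitions(RdKafka::KafkaConsumer& consumer) {
  using namespace Envoy::Extensions::NetworkFilters::Kafka::Mesh;
  const LibRdKafkaUtils& utils = LibRdKafkaUtilsImpl::getDefaultInstance();
  // Assign partitions 0..4 of "my-topic"; keep `assignment` alive while consuming.
  ConsumerAssignmentConstPtr assignment = utils.assignConsumerPartitions(consumer, "my-topic", 5);
  // ... consume; the TopicPartition objects stay valid until `assignment` is destroyed ...
}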
contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.cc
@@ -8,6 +8,30 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// ConsumerAssignmentImpl

class ConsumerAssignmentImpl : public ConsumerAssignment {
public:
ConsumerAssignmentImpl(std::vector<RdKafkaPartitionPtr>&& assignment)
: assignment_{std::move(assignment)} {};

// The assignment in a form that librdkafka likes.
RdKafkaPartitionVector raw() const;

private:
const std::vector<RdKafkaPartitionPtr> assignment_;
};

RdKafkaPartitionVector ConsumerAssignmentImpl::raw() const {
RdKafkaPartitionVector result;
for (const auto& tp : assignment_) {
result.push_back(tp.get()); // Raw pointer.
}
return result;
}

// LibRdKafkaUtils

RdKafka::Conf::ConfResult LibRdKafkaUtilsImpl::setConfProperty(RdKafka::Conf& conf,
const std::string& name,
const std::string& value,
@@ -51,6 +75,25 @@ void LibRdKafkaUtilsImpl::deleteHeaders(RdKafka::Headers* librdkafka_headers) const {
delete librdkafka_headers;
}

ConsumerAssignmentConstPtr LibRdKafkaUtilsImpl::assignConsumerPartitions(
RdKafka::KafkaConsumer& consumer, const std::string& topic, const int32_t partitions) const {

// Construct the topic-partition vector that we are going to store.
std::vector<RdKafkaPartitionPtr> assignment;
for (int partition = 0; partition < partitions; ++partition) {

// We consume records from the beginning of each partition.
const int64_t initial_offset = 0;
assignment.push_back(std::unique_ptr<RdKafka::TopicPartition>(
RdKafka::TopicPartition::create(topic, partition, initial_offset)));
}
auto result = std::make_unique<ConsumerAssignmentImpl>(std::move(assignment));

// Do the assignment.
consumer.assign(result->raw());
return result;
}

const LibRdKafkaUtils& LibRdKafkaUtilsImpl::getDefaultInstance() {
CONSTRUCT_ON_FIRST_USE(LibRdKafkaUtilsImpl);
}
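For contrast, a sketch of the same assignment done against librdkafka directly, without the wrapper (illustrative only; this is the manual-cleanup pattern that ConsumerAssignmentImpl's unique_ptr ownership avoids):

// Hypothetical direct-librdkafka equivalent; every raw TopicPartition must be freed by hand.
#include <vector>
#include "librdkafka/rdkafkacpp.h"

void assignManually(RdKafka::KafkaConsumer& consumer) {
  std::vector<RdKafka::TopicPartition*> raw_assignment;
  for (int partition = 0; partition < 5; ++partition) {
    raw_assignment.push_back(RdKafka::TopicPartition::create("my-topic", partition, 0));
  }
  consumer.assign(raw_assignment);
  // ... consume ...
  RdKafka::TopicPartition::destroy(raw_assignment); // Easy to forget, hence the wrapper.
}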
contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h
@@ -1,5 +1,7 @@
#pragma once

#include <vector>

#include "contrib/kafka/filters/network/source/mesh/librdkafka_utils.h"

namespace Envoy {
@@ -8,6 +10,9 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

using RdKafkaPartitionPtr = std::unique_ptr<RdKafka::TopicPartition>;
using RdKafkaPartitionVector = std::vector<RdKafka::TopicPartition*>;

/**
* Real implementation that just performs librdkafka operations.
*/
@@ -38,12 +43,15 @@ class LibRdKafkaUtilsImpl : public LibRdKafkaUtils {
// LibRdKafkaUtils
void deleteHeaders(RdKafka::Headers* librdkafka_headers) const override;

// LibRdKafkaUtils
ConsumerAssignmentConstPtr assignConsumerPartitions(RdKafka::KafkaConsumer& consumer,
const std::string& topic,
const int32_t partitions) const override;

// Default singleton accessor.
static const LibRdKafkaUtils& getDefaultInstance();
};

using RawKafkaConfig = std::map<std::string, std::string>;

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer.h
@@ -0,0 +1,53 @@
#pragma once

#include <memory>
#include <utility>
#include <vector>

#include "envoy/common/pure.h"

#include "contrib/kafka/filters/network/source/mesh/inbound_record.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
 * An entity that is interested in inbound records delivered by the Kafka consumer.
*/
class InboundRecordProcessor {
public:
virtual ~InboundRecordProcessor() = default;

/**
* Passes the record to the processor.
*/
virtual void receive(InboundRecordSharedPtr message) PURE;

/**
   * Blocks until there is interest in records in a given topic, or the timeout expires.
* Conceptually a thick condition variable.
* @return true if there was interest.
*/
virtual bool waitUntilInterest(const std::string& topic, const int32_t timeout_ms) const PURE;
};

using InboundRecordProcessorPtr = std::unique_ptr<InboundRecordProcessor>;

/**
* Kafka consumer pointing to some upstream Kafka cluster.
*/
class KafkaConsumer {
public:
virtual ~KafkaConsumer() = default;
};

using KafkaConsumerPtr = std::unique_ptr<KafkaConsumer>;

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
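To make the "thick condition variable" comment concrete, here is a minimal sketch of a processor implementation (the class name, members, and registerInterest hook are all assumptions; the real processor arrives in later commits). receive() buffers records, and waitUntilInterest() blocks a consumer thread until a downstream registers interest in the topic or the timeout elapses:

#include <set>
#include <string>
#include <vector>

#include "absl/synchronization/mutex.h"

#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer.h"

using namespace Envoy::Extensions::NetworkFilters::Kafka::Mesh; // Sketch-only shortcut.

// Hypothetical minimal processor; illustrative only, not part of this change.
class BufferingProcessor : public InboundRecordProcessor {
public:
  void receive(InboundRecordSharedPtr message) override {
    absl::MutexLock lock(&mutex_);
    buffer_.push_back(std::move(message));
  }

  bool waitUntilInterest(const std::string& topic, const int32_t timeout_ms) const override {
    absl::MutexLock lock(&mutex_);
    const auto has_interest = [this, &topic] { return interested_topics_.count(topic) > 0; };
    // The "thick condition variable": block until a downstream registers interest
    // in this topic, or give up after the timeout; true means interest appeared.
    return mutex_.AwaitWithTimeout(absl::Condition(&has_interest),
                                   absl::Milliseconds(timeout_ms));
  }

  // Hypothetical registration hook, called by whatever serves downstream fetches.
  void registerInterest(const std::string& topic) {
    absl::MutexLock lock(&mutex_);
    interested_topics_.insert(topic);
  }

private:
  mutable absl::Mutex mutex_;
  std::vector<InboundRecordSharedPtr> buffer_;
  std::set<std::string> interested_topics_;
};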