
kafka: fetch record converter #25300

Merged
10 commits merged on Feb 10, 2023
1 change: 1 addition & 0 deletions contrib/kafka/filters/network/source/mesh/BUILD
@@ -224,6 +224,7 @@ envoy_cc_library(
deps = [
":librdkafka_utils_impl_lib",
":upstream_kafka_consumer_lib",
"//contrib/kafka/filters/network/source:kafka_types_lib",
"//envoy/event:dispatcher_interface",
"//source/common/common:minimal_logger_lib",
],
@@ -75,6 +75,7 @@ envoy_cc_library(
tags = ["skip_on_windows"],
deps = [
"//contrib/kafka/filters/network/source:kafka_response_parser_lib",
"//contrib/kafka/filters/network/source:serialization_lib",
"//contrib/kafka/filters/network/source/mesh:inbound_record_lib",
],
)
@@ -1,20 +1,194 @@
#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h"

#include "contrib/kafka/filters/network/source/serialization.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

const FetchRecordConverter& FetchRecordConverterImpl::getDefaultInstance() {
CONSTRUCT_ON_FIRST_USE(FetchRecordConverterImpl);
}

std::vector<FetchableTopicResponse>
-FetchRecordConverterImpl::convert(const InboundRecordsMap&) const {
FetchRecordConverterImpl::convert(const InboundRecordsMap& arg) const {

// Compute record batches.
std::map<KafkaPartition, Bytes> record_batches;
for (const auto& partition_and_records : arg) {
const KafkaPartition& kp = partition_and_records.first;
const std::vector<InboundRecordSharedPtr>& partition_records = partition_and_records.second;
const Bytes batch = renderRecordBatch(partition_records);
record_batches[kp] = batch;
}

-// TODO (adam.kotwasinski) This needs to be actually implemented.
-return {};
// Transform our maps into the Kafka structs.
std::map<std::string, std::vector<FetchResponseResponsePartitionData>> topic_to_frrpd;
for (const auto& record_batch : record_batches) {
const std::string& topic_name = record_batch.first.first;
const int32_t partition = record_batch.first.second;

std::vector<FetchResponseResponsePartitionData>& frrpds = topic_to_frrpd[topic_name];
const int16_t error_code = 0;
const int64_t high_watermark = 0;
const auto frrpd = FetchResponseResponsePartitionData{partition, error_code, high_watermark,
absl::make_optional(record_batch.second)};

frrpds.push_back(frrpd);
}

std::vector<FetchableTopicResponse> result;
for (const auto& partition_and_records : topic_to_frrpd) {
const std::string& topic_name = partition_and_records.first;
const auto ftr = FetchableTopicResponse{topic_name, partition_and_records.second};
result.push_back(ftr);
}
return result;
}

-const FetchRecordConverter& FetchRecordConverterImpl::getDefaultInstance() {
-CONSTRUCT_ON_FIRST_USE(FetchRecordConverterImpl);
// Magic format (v2), introduced with Kafka 0.11 and still used as of Kafka 3.3.
constexpr int8_t MAGIC = 2;

Bytes FetchRecordConverterImpl::renderRecordBatch(
const std::vector<InboundRecordSharedPtr>& records) const {

Bytes result = {};

// Base offset (bytes 0..7).
const int64_t base_offset = htobe64(0);
const unsigned char* base_offset_b = reinterpret_cast<const unsigned char*>(&base_offset);
result.insert(result.end(), base_offset_b, base_offset_b + sizeof(base_offset));

// Batch length placeholder (bytes 8..11).
result.insert(result.end(), {0, 0, 0, 0});

// All other attributes (spans partitionLeaderEpoch .. baseSequence) (bytes 12..56).
const std::vector zeros(45, 0);
result.insert(result.end(), zeros.begin(), zeros.end());

// Last offset delta.
// -1 means we always claim that we are at the beginning of the partition.
const int32_t last_offset_delta = htobe32(-1);
const unsigned char* last_offset_delta_bytes =
reinterpret_cast<const unsigned char*>(&last_offset_delta);
const auto last_offset_delta_pos = result.begin() + 8 + 4 + 11;
std::copy(last_offset_delta_bytes, last_offset_delta_bytes + sizeof(last_offset_delta),
last_offset_delta_pos);

// Records (count) (bytes 57..60).
const int32_t record_count = htobe32(records.size());
const unsigned char* record_count_b = reinterpret_cast<const unsigned char*>(&record_count);
result.insert(result.end(), record_count_b, record_count_b + sizeof(record_count));

// Records (data) (bytes 61+).
for (const auto& record : records) {
appendRecord(*record, result);
}

// Set batch length.
const int32_t batch_len = htobe32(result.size() - (sizeof(base_offset) + sizeof(batch_len)));
const unsigned char* batch_len_bytes = reinterpret_cast<const unsigned char*>(&batch_len);
std::copy(batch_len_bytes, batch_len_bytes + sizeof(batch_len),
result.begin() + sizeof(base_offset));

// Set magic.
const uint32_t magic_offset = sizeof(base_offset) + sizeof(batch_len) + sizeof(int32_t);
result[magic_offset] = MAGIC;

// Compute and set CRC.
const uint32_t crc_offset = magic_offset + 1;
const auto crc_data_start = result.data() + crc_offset + sizeof(int32_t);
const auto crc_data_len = result.size() - (crc_offset + sizeof(int32_t));
const Bytes crc = renderCrc32c(crc_data_start, crc_data_len);
std::copy(crc.begin(), crc.end(), result.begin() + crc_offset);

return result;
}
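
For reference, a sketch of the RecordBatch (magic = 2) byte layout that the hard-coded offsets above correspond to; field names follow the Kafka 3.3 protocol documentation linked from the header file, and the constant names below are illustrative only (they do not exist in the change):

// Byte offsets within the rendered batch; renderRecordBatch() writes these inline.
constexpr int kBaseOffsetOffset = 0;            // int64, always 0 here.
constexpr int kBatchLengthOffset = 8;           // int32, patched in after rendering.
constexpr int kPartitionLeaderEpochOffset = 12; // int32, zeroed.
constexpr int kMagicOffset = 16;                // int8, set to 2.
constexpr int kCrcOffset = 17;                  // uint32, CRC32C of bytes 21..end.
constexpr int kAttributesOffset = 21;           // int16, zeroed (no compression).
constexpr int kLastOffsetDeltaOffset = 23;      // int32, set to -1.
constexpr int kFirstTimestampOffset = 27;       // int64, zeroed.
constexpr int kMaxTimestampOffset = 35;         // int64, zeroed.
constexpr int kProducerIdOffset = 43;           // int64, zeroed.
constexpr int kProducerEpochOffset = 51;        // int16, zeroed.
constexpr int kBaseSequenceOffset = 53;         // int32, zeroed.
constexpr int kRecordCountOffset = 57;          // int32, number of records.
constexpr int kRecordsOffset = 61;              // First record starts here.

The 45 zero bytes written after the batch-length placeholder span partitionLeaderEpoch through baseSequence (bytes 12..56), which is why only the batch length, magic, lastOffsetDelta and CRC fields need to be patched in afterwards.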

void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& out) const {

Bytes tmp = {};
// This is not precise maths, as we could be over-reserving a little due to var-length fields.
tmp.reserve(sizeof(int8_t) + sizeof(int64_t) + sizeof(int32_t) + record.dataLengthEstimate());

// attributes: int8
const int8_t attributes = 0;
tmp.push_back(static_cast<unsigned char>(attributes));

// timestampDelta: varlong
const int64_t timestamp_delta = 0;
VarlenUtils::writeVarlong(timestamp_delta, tmp);

// offsetDelta: varint
const int32_t offset_delta = record.offset_;
VarlenUtils::writeVarint(offset_delta, tmp);

// Impl note: compared to requests/responses, records serialize byte arrays as varint length +
// bytes (and not length + 1, then bytes). So we cannot use EncodingContext from serialization.h.

// keyLength: varint
// key: byte[]
const NullableBytes& key = record.key_;
if (key.has_value()) {
VarlenUtils::writeVarint(key->size(), tmp);
tmp.insert(tmp.end(), key->begin(), key->end());
} else {
VarlenUtils::writeVarint(-1, tmp);
}

// valueLen: varint
// value: byte[]
const NullableBytes& value = record.value_;
if (value.has_value()) {
VarlenUtils::writeVarint(value->size(), tmp);
tmp.insert(tmp.end(), value->begin(), value->end());
} else {
VarlenUtils::writeVarint(-1, tmp);
}

// TODO (adam.kotwasinski) Headers are not supported yet.
const int32_t header_count = 0;
VarlenUtils::writeVarint(header_count, tmp);

// Put tmp's length into 'out'.
VarlenUtils::writeVarint(tmp.size(), out);

// Put tmp's contents into 'out'.
out.insert(out.end(), tmp.begin(), tmp.end());
}
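
To make the record wire format concrete, here is a worked example (my own, not taken from the change or its tests) of the bytes appendRecord() emits for a record with offset_ == 0, key "k", value "v" and no headers:

// length        : varint(8)  -> 0x10  (zig-zag of 8; counts the 8 bytes that follow)
// attributes    : int8       -> 0x00
// timestampDelta: varlong(0) -> 0x00
// offsetDelta   : varint(0)  -> 0x00
// keyLength     : varint(1)  -> 0x02  (zig-zag of 1)
// key           : 'k'        -> 0x6b
// valueLen      : varint(1)  -> 0x02
// value         : 'v'        -> 0x76
// headerCount   : varint(0)  -> 0x00
// Appended to 'out': 10 00 00 00 02 6b 02 76 00

A null key or value would instead be encoded as varint(-1), i.e. the single byte 0x01.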

// XXX (adam.kotwasinski) Instead of computing it naively, either link against librdkafka's
// implementation or generate it.
// https://github.com/confluentinc/librdkafka/blob/v1.8.0/src/crc32c.c#L1
uint32_t FetchRecordConverterImpl::computeCrc32c(const unsigned char* data, const size_t len) {
uint32_t crc = 0xFFFFFFFF;
for (size_t i = 0; i < len; i++) {
char ch = data[i];
for (size_t j = 0; j < 8; j++) {
uint32_t b = (ch ^ crc) & 1;
crc >>= 1;
if (b) {
crc = crc ^ 0x82F63B78;
}
ch >>= 1;
}
}
return ~crc;
}

uint32_t FetchRecordConverterImpl::computeCrc32cForTest(const unsigned char* data,
const size_t len) {
return computeCrc32c(data, len);
}
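
A quick way to sanity-check the bit-by-bit CRC implementation: the standard CRC-32C check value for the ASCII string "123456789" is 0xE3069283, so a test along the following lines (a sketch assuming a GoogleTest-based unit test, not the actual test added by this PR) should pass:

const std::string input = "123456789";
const uint32_t crc = FetchRecordConverterImpl::computeCrc32cForTest(
    reinterpret_cast<const unsigned char*>(input.data()), input.size());
EXPECT_EQ(0xE3069283, crc); // Well-known CRC-32C ("Castagnoli") check value.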

Bytes FetchRecordConverterImpl::renderCrc32c(const unsigned char* data, const size_t len) const {
uint32_t crc = htobe32(computeCrc32c(data, len));
Bytes result;
unsigned char* raw = reinterpret_cast<unsigned char*>(&crc);
result.insert(result.end(), raw, raw + sizeof(crc));
return result;
}

} // namespace Mesh
@@ -38,6 +38,24 @@ class FetchRecordConverterImpl : public FetchRecordConverter {

// Default singleton accessor.
static const FetchRecordConverter& getDefaultInstance();

static uint32_t computeCrc32cForTest(const unsigned char* data, const size_t len);

private:
// Helper function: transform records from a partition into a record batch.
// See: https://kafka.apache.org/33/documentation.html#recordbatch
Bytes renderRecordBatch(const std::vector<InboundRecordSharedPtr>& records) const;

// Helper function: append record to output array.
// See: https://kafka.apache.org/33/documentation.html#record
// https://github.com/apache/kafka/blob/3.3.2/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L164
void appendRecord(const InboundRecord& record, Bytes& out) const;

// Helper function: render CRC32C bytes from given input.
Bytes renderCrc32c(const unsigned char* data, const size_t len) const;

// Helper function: compute CRC32C.
static uint32_t computeCrc32c(const unsigned char* data, const size_t len);
};

} // namespace Mesh
17 changes: 14 additions & 3 deletions contrib/kafka/filters/network/source/mesh/inbound_record.h
@@ -4,6 +4,7 @@
#include <string>

#include "absl/strings/str_cat.h"
#include "contrib/kafka/filters/network/source/kafka_types.h"

namespace Envoy {
namespace Extensions {
@@ -20,10 +21,20 @@ struct InboundRecord {
const int32_t partition_;
const int64_t offset_;

-// TODO (adam.kotwasinski) Get data in here in the next commits.
const NullableBytes key_;
const NullableBytes value_;

-InboundRecord(std::string topic, int32_t partition, int64_t offset)
-: topic_{topic}, partition_{partition}, offset_{offset} {};
InboundRecord(const std::string& topic, const int32_t partition, const int64_t offset,
const NullableBytes& key, const NullableBytes& value)
: topic_{topic}, partition_{partition}, offset_{offset}, key_{key}, value_{value} {};

// Estimates how many bytes this record would take.
uint32_t dataLengthEstimate() const {
uint32_t result = 15; // Max varint sizes of key length, value length, and header count (5 bytes each).
result += key_ ? key_->size() : 0;
result += value_ ? value_->size() : 0;
return result;
}

// Used in logging.
std::string toString() const {
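For illustration, a minimal usage sketch of the extended InboundRecord (the topic name, offsets, and payloads below are made-up values; NullableBytes is assumed to be absl::optional<Bytes>, as used elsewhere in the filter):

const NullableBytes key = Bytes{'k'};
const NullableBytes value = Bytes{'v'};
const InboundRecord record{"t1", /* partition = */ 0, /* offset = */ 42, key, value};
const InboundRecord tombstone{"t1", 0, 43, key, absl::nullopt};
// dataLengthEstimate() = 15 bytes of worst-case varint overhead plus payload sizes:
// 17 for 'record' (15 + 1 + 1), 16 for 'tombstone' (15 + 1 + 0).
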
@@ -1,5 +1,6 @@
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h"

#include "contrib/kafka/filters/network/source/kafka_types.h"
#include "contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h"

namespace Envoy {
@@ -99,12 +100,27 @@ void RichKafkaConsumer::runWorkerLoop() {
ENVOY_LOG(debug, "Worker thread for consumer [{}] finished", topic_);
}

// Helper method: converts a raw byte buffer into NullableBytes (null input maps to absl::nullopt).
static NullableBytes toBytes(const void* data, const size_t size) {
const unsigned char* as_char = static_cast<const unsigned char*>(data);
if (data) {
Bytes bytes(as_char, as_char + size);
return {bytes};
} else {
return absl::nullopt;
}
}

// Helper method: converts a librdkafka message into our librdkafka-agnostic InboundRecord.
static InboundRecordSharedPtr transform(RdKafkaMessagePtr arg) {
const auto topic = arg->topic_name();
const auto partition = arg->partition();
const auto offset = arg->offset();
-return std::make_shared<InboundRecord>(topic, partition, offset);

const NullableBytes key = toBytes(arg->key_pointer(), arg->key_len());
const NullableBytes value = toBytes(arg->payload(), arg->len());

return std::make_shared<InboundRecord>(topic, partition, offset, key, value);
}

std::vector<InboundRecordSharedPtr> RichKafkaConsumer::receiveRecordBatch() {
49 changes: 49 additions & 0 deletions contrib/kafka/filters/network/source/serialization.cc
@@ -220,6 +220,55 @@ NullableBytes NullableCompactBytesDeserializer::get() const {
}
}

namespace VarlenUtils {

uint32_t writeUnsignedVarint(const uint32_t arg, Bytes& dst) {
uint32_t value = arg;

uint32_t elements_with_1 = 0;
// As long as there are bits set on indexes 8 or higher (counting from 1).
while ((value & ~(0x7f)) != 0) {
// Save next 7-bit batch with highest bit set.
const uint8_t el = (value & 0x7f) | 0x80;
dst.push_back(el);
value >>= 7;
elements_with_1++;
}

// After the loop has finished, we are certain that bit 8 = 0, so we can just add final element.
const uint8_t el = value;
dst.push_back(el);

return elements_with_1 + 1;
}

uint32_t writeVarint(const int32_t arg, Bytes& dst) {
uint32_t zz = (static_cast<uint32_t>(arg) << 1) ^ (arg >> 31); // Zig-zag.
return writeUnsignedVarint(zz, dst);
}

uint32_t writeVarlong(const int64_t arg, Bytes& dst) {
uint64_t value = (static_cast<uint64_t>(arg) << 1) ^ (arg >> 63); // Zig-zag.

uint32_t elements_with_1 = 0;
// As long as there are bits set on indexes 8 or higher (counting from 1).
while ((value & ~(0x7f)) != 0) {
// Save next 7-bit batch with highest bit set.
const uint8_t el = (value & 0x7f) | 0x80;
dst.push_back(el);
value >>= 7;
elements_with_1++;
}

// After the loop has finished, we are certain that bit 8 = 0, so we can just add final element.
const uint8_t el = value;
dst.push_back(el);

return elements_with_1 + 1;
}

} // namespace VarlenUtils
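
For reference, a short sketch (my own examples, not part of the change) of the encodings these helpers produce; writeVarint and writeVarlong zig-zag the argument first so that small negative numbers stay short:

Bytes buffer;
VarlenUtils::writeUnsignedVarint(300, buffer); // appends AC 02 (7-bit groups, least significant first).
VarlenUtils::writeVarint(0, buffer);           // zig-zag(0) = 0, appends 00.
VarlenUtils::writeVarint(-1, buffer);          // zig-zag(-1) = 1, appends 01.
VarlenUtils::writeVarint(300, buffer);         // zig-zag(300) = 600, appends D8 04.
VarlenUtils::writeVarlong(-2, buffer);         // zig-zag(-2) = 3, appends 03.
// buffer now holds: AC 02 00 01 D8 04 03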

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions