From 1076ab6d36535f35f8fc418d023ae33cdeefae84 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Sat, 10 Dec 2022 22:25:47 -0800 Subject: [PATCH 01/10] kafka: fetch record converter Signed-off-by: Adam Kotwasinski --- .../source/mesh/command_handlers/BUILD | 2 + .../fetch_record_converter.cc | 167 +++++++++++++++++- .../command_handlers/fetch_record_converter.h | 16 +- .../filters/network/source/serialization.h | 50 ++++++ 4 files changed, 229 insertions(+), 6 deletions(-) diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD b/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD index 7b0ee1d20800..1476fae6ab5e 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD @@ -75,7 +75,9 @@ envoy_cc_library( tags = ["skip_on_windows"], deps = [ "//contrib/kafka/filters/network/source:kafka_response_parser_lib", + "//contrib/kafka/filters/network/source:serialization_lib", "//contrib/kafka/filters/network/source/mesh:inbound_record_lib", + "//source/common/buffer:buffer_lib", ], ) diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc index a2c2d3a68669..58c1cad0ca04 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc @@ -1,20 +1,177 @@ #include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h" +#include "source/common/buffer/buffer_impl.h" + +#include "contrib/kafka/filters/network/source/serialization.h" + namespace Envoy { namespace Extensions { namespace NetworkFilters { namespace Kafka { namespace Mesh { +const FetchRecordConverter& FetchRecordConverterImpl::getDefaultInstance() { + CONSTRUCT_ON_FIRST_USE(FetchRecordConverterImpl); +} + std::vector -FetchRecordConverterImpl::convert(const InboundRecordsMap&) const { +FetchRecordConverterImpl::convert(const InboundRecordsMap& arg) const { + + // Compute record batches. + std::map record_batches; + for (const auto& partition_and_records : arg) { + const KafkaPartition& kp = partition_and_records.first; + const std::vector& partition_records = partition_and_records.second; + const Bytes batch = renderRecordBatch(partition_records); + record_batches[kp] = batch; + } + + // Transform our maps into the Kafka structs. + std::map> topic_to_frrpd; + for (const auto& record_batch : record_batches) { + const std::string& topic_name = record_batch.first.first; + const int32_t partition = record_batch.first.second; + + std::vector& frrpds = topic_to_frrpd[topic_name]; + const int16_t error_code = 0; + const int64_t high_watermark = 0; + const auto frrpd = FetchResponseResponsePartitionData{partition, error_code, high_watermark, + absl::make_optional(record_batch.second)}; + + frrpds.push_back(frrpd); + } - // TODO (adam.kotwasinski) This needs to be actually implemented. 
- return {}; + std::vector result; + for (const auto& partition_and_records : topic_to_frrpd) { + const std::string& topic_name = partition_and_records.first; + const auto ftr = FetchableTopicResponse{topic_name, partition_and_records.second}; + result.push_back(ftr); + } + return result; } -const FetchRecordConverter& FetchRecordConverterImpl::getDefaultInstance() { - CONSTRUCT_ON_FIRST_USE(FetchRecordConverterImpl); +Bytes FetchRecordConverterImpl::renderRecordBatch( + const std::vector& records) const { + + Bytes result = {}; + + // Base offset. + const int64_t base_offset = htobe64(0); + const unsigned char* base_offset_b = reinterpret_cast(&base_offset); + result.insert(result.end(), base_offset_b, base_offset_b + sizeof(base_offset)); + + // Batch length placeholder. + result.insert(result.end(), {0, 0, 0, 0}); + + // All other attributes (spans partitionLeaderEpoch .. baseSequence). + const std::vector zeros(45, 0); + result.insert(result.end(), zeros.begin(), zeros.end()); + + // Last offset delta. + // -1 means we always claim that we are at the beginning of partition. + const int32_t last_offset_delta = htobe32(-1); + const unsigned char* last_offset_delta_bytes = + reinterpret_cast(&last_offset_delta); + const auto last_offset_delta_pos = result.begin() + 8 + 4 + 11; + std::copy(last_offset_delta_bytes, last_offset_delta_bytes + sizeof(last_offset_delta), + last_offset_delta_pos); + + // Records (count). + const int32_t record_count = htobe32(records.size()); + const unsigned char* record_count_b = reinterpret_cast(&record_count); + result.insert(result.end(), record_count_b, record_count_b + sizeof(record_count)); + + // Records (data). + for (const auto& record : records) { + appendRecord(*record, result); + } + + // Set batch length. + const int32_t batch_len = htobe32(result.size() - (sizeof(base_offset) + sizeof(batch_len))); + const unsigned char* batch_len_bytes = reinterpret_cast(&batch_len); + std::copy(batch_len_bytes, batch_len_bytes + sizeof(batch_len), + result.begin() + sizeof(base_offset)); + + // Set magic. + constexpr uint32_t magic_offset = sizeof(base_offset) + sizeof(batch_len) + sizeof(int32_t); + result[magic_offset] = 2; + + // Compute and set CRC. + constexpr uint32_t crc_offset = magic_offset + 1; + const auto crc_data_start = result.data() + crc_offset + sizeof(int32_t); + const auto crc_data_len = result.size() - (crc_offset + sizeof(int32_t)); + const Bytes crc = renderCrc(crc_data_start, crc_data_len); + std::copy(crc.begin(), crc.end(), result.begin() + crc_offset); + + return result; +} + +void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& out) const { + + Buffer::OwnedImpl buffer; + + // attributes: int8 + constexpr int8_t attributes = 0; + buffer.add(&attributes, sizeof(int8_t)); + + // timestampDelta: varlong + constexpr int64_t timestamp_delta = 0; + Statics::writeVarlong(timestamp_delta, buffer); + + // offsetDelta: varint + const int32_t offset_delta = record.offset_; + Statics::writeVarint(offset_delta, buffer); + + // keyLength: varint + const int32_t key_length = 0; + Statics::writeVarint(key_length, buffer); + + // key: byte[] + // ??? + + // valueLen: varint + const int32_t value_length = 0; + Statics::writeVarint(value_length, buffer); + + // value: byte[] + // ??? + + // TODO (adam.kotwasinski) Headers are not supported yet. + const int32_t header_count = 0; + Statics::writeVarint(header_count, buffer); + + // XXX (adam.kotwasinski) This might be less than efficient. Improve it later. 
+ Buffer::OwnedImpl length_buffer; + Statics::writeVarint(buffer.length(), length_buffer); + buffer.prepend(length_buffer); + + // Finish: put buffer's contents into the 'out' variable. + const auto buf_len = buffer.length(); + void* linearized = buffer.linearize(buf_len); + unsigned char* raw = static_cast(linearized); + out.insert(out.end(), raw, raw + buf_len); +} + +Bytes FetchRecordConverterImpl::renderCrc(const unsigned char* data, const size_t len) const { + uint32_t crc = 0xFFFFFFFF; + for (size_t i = 0; i < len; i++) { + char ch = data[i]; + for (size_t j = 0; j < 8; j++) { + uint32_t b = (ch ^ crc) & 1; + crc >>= 1; + if (b) { + crc = crc ^ 0x82F63B78; + } + ch >>= 1; + } + } + crc = ~crc; + crc = htobe32(crc); + + Bytes result; + unsigned char* raw = reinterpret_cast(&crc); + result.insert(result.end(), raw, raw + sizeof(crc)); + return result; } } // namespace Mesh diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h index fe5103a8b814..46273274c354 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h @@ -31,13 +31,27 @@ class FetchRecordConverter { /** * Proper implementation. */ -class FetchRecordConverterImpl : public FetchRecordConverter { +class FetchRecordConverterImpl : public FetchRecordConverter, + private Logger::Loggable { public: // FetchRecordConverter std::vector convert(const InboundRecordsMap& arg) const override; // Default singleton accessor. static const FetchRecordConverter& getDefaultInstance(); + +private: + // Helper function: transform records from a partition into a record batch. + // See: https://kafka.apache.org/33/documentation.html#recordbatch + Bytes renderRecordBatch(const std::vector& records) const; + + // Helper function: append record to output array. + // See: https://kafka.apache.org/33/documentation.html#record + // https://github.com/apache/kafka/blob/3.3.2/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L164 + void appendRecord(const InboundRecord& record, Bytes& out) const; + + // Helper function: render CRC32C bytes from given input. + Bytes renderCrc(const unsigned char* data, const size_t len) const; }; } // namespace Mesh diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h index 29a9a01d3924..a95ea28f1092 100644 --- a/contrib/kafka/filters/network/source/serialization.h +++ b/contrib/kafka/filters/network/source/serialization.h @@ -1502,6 +1502,56 @@ uint32_t EncodingContext::encodeCompact(const NullableArray& arg, Buffer::Ins } } +class Statics { +public: + // org.apache.kafka.common.utils.ByteUtils.writeUnsignedVarint(int, ByteBuffer) + static uint32_t writeUnsignedVarint(const uint32_t& arg, Buffer::Instance& dst) { + uint32_t value = arg; + + uint32_t elements_with_1 = 0; + // As long as there are bits set on indexes 8 or higher (counting from 1). + while ((value & ~(0x7f)) != 0) { + // Save next 7-bit batch with highest bit set. + const uint8_t el = (value & 0x7f) | 0x80; + dst.add(&el, sizeof(uint8_t)); + value >>= 7; + elements_with_1++; + } + + // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. 
+ const uint8_t el = value; + dst.add(&el, sizeof(uint8_t)); + + return elements_with_1 + 1; + } + + // org.apache.kafka.common.utils.ByteUtils.writeVarint(int, ByteBuffer) + static uint32_t writeVarint(const int32_t arg, Buffer::Instance& dst) { + return writeUnsignedVarint((arg << 1) ^ (arg >> 31), dst); + } + + // org.apache.kafka.common.utils.ByteUtils.writeVarlong(long, ByteBuffer) + static uint32_t writeVarlong(const int64_t& arg, Buffer::Instance& dst) { + int64_t value = (arg << 1) ^ (arg >> 63); + + uint32_t elements_with_1 = 0; + // As long as there are bits set on indexes 8 or higher (counting from 1). + while ((value & ~(0x7f)) != 0) { + // Save next 7-bit batch with highest bit set. + const uint8_t el = (value & 0x7f) | 0x80; + dst.add(&el, sizeof(uint8_t)); + value >>= 7; + elements_with_1++; + } + + // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. + const uint8_t el = value; + dst.add(&el, sizeof(uint8_t)); + + return elements_with_1 + 1; + } +}; + } // namespace Kafka } // namespace NetworkFilters } // namespace Extensions From a418fccfd86adba59ee9978ea3f899c7d60b75f4 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Fri, 3 Feb 2023 15:20:52 -0800 Subject: [PATCH 02/10] kafka: add key/value fields to InboundRecord (== finally send the data downstream) Signed-off-by: Adam Kotwasinski --- .../kafka/filters/network/source/mesh/BUILD | 1 + .../fetch_record_converter.cc | 22 ++++++++++------ .../network/source/mesh/inbound_record.h | 26 ++++++++++++++++--- .../mesh/upstream_kafka_consumer_impl.cc | 18 ++++++++++++- .../shared_consumer_manager_impl_unit_test.cc | 2 +- 5 files changed, 56 insertions(+), 13 deletions(-) diff --git a/contrib/kafka/filters/network/source/mesh/BUILD b/contrib/kafka/filters/network/source/mesh/BUILD index 668995d99f14..35706ea86c75 100644 --- a/contrib/kafka/filters/network/source/mesh/BUILD +++ b/contrib/kafka/filters/network/source/mesh/BUILD @@ -224,6 +224,7 @@ envoy_cc_library( deps = [ ":librdkafka_utils_impl_lib", ":upstream_kafka_consumer_lib", + "//contrib/kafka/filters/network/source:kafka_types_lib", "//envoy/event:dispatcher_interface", "//source/common/common:minimal_logger_lib", ], diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc index 58c1cad0ca04..80efeaf281ff 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc @@ -123,18 +123,24 @@ void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& Statics::writeVarint(offset_delta, buffer); // keyLength: varint - const int32_t key_length = 0; - Statics::writeVarint(key_length, buffer); - // key: byte[] - // ??? + const absl::string_view key = record.key(); + if (!key.empty()) { + Statics::writeVarint(key.size(), buffer); + buffer.add(key); + } else { + Statics::writeVarint(-1, buffer); + } // valueLen: varint - const int32_t value_length = 0; - Statics::writeVarint(value_length, buffer); - // value: byte[] - // ??? + const absl::string_view value = record.value(); + if (!value.empty()) { + Statics::writeVarint(value.size(), buffer); + buffer.add(record.value()); + } else { + Statics::writeVarint(-1, buffer); + } // TODO (adam.kotwasinski) Headers are not supported yet. 
const int32_t header_count = 0; diff --git a/contrib/kafka/filters/network/source/mesh/inbound_record.h b/contrib/kafka/filters/network/source/mesh/inbound_record.h index 83cd1d61fbcf..f37228dbcddd 100644 --- a/contrib/kafka/filters/network/source/mesh/inbound_record.h +++ b/contrib/kafka/filters/network/source/mesh/inbound_record.h @@ -4,6 +4,8 @@ #include #include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "contrib/kafka/filters/network/source/kafka_types.h" namespace Envoy { namespace Extensions { @@ -20,10 +22,28 @@ struct InboundRecord { const int32_t partition_; const int64_t offset_; - // TODO (adam.kotwasinski) Get data in here in the next commits. + const NullableBytes key_; + const NullableBytes value_; - InboundRecord(std::string topic, int32_t partition, int64_t offset) - : topic_{topic}, partition_{partition}, offset_{offset} {}; + InboundRecord(std::string topic, int32_t partition, int64_t offset, NullableBytes key, + NullableBytes value) + : topic_{topic}, partition_{partition}, offset_{offset}, key_{key}, value_{value} {}; + + absl::string_view key() const { + if (key_) { + return {reinterpret_cast(key_->data()), key_->size()}; + } else { + return {}; + } + } + + absl::string_view value() const { + if (value_) { + return {reinterpret_cast(value_->data()), value_->size()}; + } else { + return {}; + } + } // Used in logging. std::string toString() const { diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc index 4c3dfaeab005..a6ba7a3e1d71 100644 --- a/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc +++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc @@ -1,5 +1,6 @@ #include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h" +#include "contrib/kafka/filters/network/source/kafka_types.h" #include "contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h" namespace Envoy { @@ -99,12 +100,27 @@ void RichKafkaConsumer::runWorkerLoop() { ENVOY_LOG(debug, "Worker thread for consumer [{}] finished", topic_); } +// Helper method, converts byte array. +static NullableBytes toBytes(const void* data, const size_t size) { + const unsigned char* as_char = static_cast(data); + if (data) { + Bytes bytes(as_char, as_char + size); + return {bytes}; + } else { + return absl::nullopt; + } +} + // Helper method, gets rid of librdkafka. 
static InboundRecordSharedPtr transform(RdKafkaMessagePtr arg) { const auto topic = arg->topic_name(); const auto partition = arg->partition(); const auto offset = arg->offset(); - return std::make_shared(topic, partition, offset); + + const NullableBytes key = toBytes(arg->key_pointer(), arg->key_len()); + const NullableBytes value = toBytes(arg->payload(), arg->len()); + + return std::make_shared(topic, partition, offset, key, value); } std::vector RichKafkaConsumer::receiveRecordBatch() { diff --git a/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc b/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc index a2f4ddba6dcb..74b80f1cd32d 100644 --- a/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc @@ -148,7 +148,7 @@ class RecordDistributorTest : public testing::Test { } InboundRecordSharedPtr makeRecord(const std::string& topic, const int32_t partition) { - return std::make_shared(topic, partition, 0); + return std::make_shared(topic, partition, 0, absl::nullopt, absl::nullopt); } }; From e4e697a2389f0bf49c7d8a4fb9e42a5a30fca407 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Fri, 3 Feb 2023 16:19:07 -0800 Subject: [PATCH 03/10] kafka: impl comment Signed-off-by: Adam Kotwasinski --- .../source/mesh/command_handlers/fetch_record_converter.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc index 80efeaf281ff..de08488dae82 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc @@ -122,6 +122,9 @@ void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& const int32_t offset_delta = record.offset_; Statics::writeVarint(offset_delta, buffer); + // Impl note: compared to requests/responses, records serialize byte arrays as varint length + + // bytes (and not length + 1, then bytes). So we cannot use EncodingContext from serialization.h. 
+ // keyLength: varint // key: byte[] const absl::string_view key = record.key(); From 838629337a7b7283c8cacd4fd928956c157338ea Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Fri, 3 Feb 2023 17:04:44 -0800 Subject: [PATCH 04/10] kafka: initial FRC unit tests Signed-off-by: Adam Kotwasinski --- .../network/test/mesh/command_handlers/BUILD | 9 ++++ .../fetch_record_converter_unit_test.cc | 48 +++++++++++++++++++ .../mesh/command_handlers/fetch_unit_test.cc | 4 +- 3 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD b/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD index 154b7bfb696d..c9ea469752f4 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD @@ -39,6 +39,15 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "fetch_record_converter_unit_test", + srcs = ["fetch_record_converter_unit_test.cc"], + tags = ["skip_on_windows"], + deps = [ + "//contrib/kafka/filters/network/source/mesh/command_handlers:fetch_record_converter_lib", + ], +) + envoy_cc_test( name = "list_offsets_unit_test", srcs = ["list_offsets_unit_test.cc"], diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc new file mode 100644 index 000000000000..96e7ccbb47f3 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc @@ -0,0 +1,48 @@ +#include + +#include "test/test_common/utility.h" + +#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { +namespace { + +TEST(FetchRecordConverterImpl, shouldProcessEmptyInput) { + // given + const FetchRecordConverter& testee = FetchRecordConverterImpl{}; + const InboundRecordsMap input = {}; + + // when + const auto result = testee.convert(input); + + // then + ASSERT_EQ(result.size(), 0); +} + +TEST(FetchRecordConverterImpl, shouldProcessInputWithNoRecords) { + // given + const FetchRecordConverter& testee = FetchRecordConverterImpl{}; + InboundRecordsMap input = {}; + input[{"aaa", 0}] = {}; + input[{"aaa", 1}] = {}; + input[{"aaa", 2}] = {}; + input[{"bbb", 0}] = {}; + + // when + const auto result = testee.convert(input); + + // then + ASSERT_EQ(result.size(), 2); // Number of unique topic names (not partitions). +} + +} // namespace +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc index 8a34d4a06ea1..2b65bce9c752 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc @@ -97,7 +97,9 @@ TEST_F(FetchUnitTest, ShouldCleanupAfterTimer) { } // Helper method to generate records. 
-InboundRecordSharedPtr makeRecord() { return std::make_shared("aaa", 0, 0); } +InboundRecordSharedPtr makeRecord() { + return std::make_shared("aaa", 0, 0, absl::nullopt, absl::nullopt); +} TEST_F(FetchUnitTest, ShouldReceiveRecords) { // given From f9cb25d5568f8a015ec414e138dcec420aa25099 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Mon, 6 Feb 2023 14:33:38 -0800 Subject: [PATCH 05/10] kafka: add missing tests Signed-off-by: Adam Kotwasinski --- .../fetch_record_converter.cc | 28 +++++++--- .../command_handlers/fetch_record_converter.h | 7 ++- .../network/source/mesh/inbound_record.h | 1 + .../fetch_record_converter_unit_test.cc | 56 ++++++++++++++++++- 4 files changed, 81 insertions(+), 11 deletions(-) diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc index de08488dae82..59c8f3a3bd4f 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc @@ -55,15 +55,15 @@ Bytes FetchRecordConverterImpl::renderRecordBatch( Bytes result = {}; - // Base offset. + // Base offset (bytes 0..7). const int64_t base_offset = htobe64(0); const unsigned char* base_offset_b = reinterpret_cast(&base_offset); result.insert(result.end(), base_offset_b, base_offset_b + sizeof(base_offset)); - // Batch length placeholder. + // Batch length placeholder (bytes 8..11). result.insert(result.end(), {0, 0, 0, 0}); - // All other attributes (spans partitionLeaderEpoch .. baseSequence). + // All other attributes (spans partitionLeaderEpoch .. baseSequence) (bytes 12..56). const std::vector zeros(45, 0); result.insert(result.end(), zeros.begin(), zeros.end()); @@ -76,12 +76,12 @@ Bytes FetchRecordConverterImpl::renderRecordBatch( std::copy(last_offset_delta_bytes, last_offset_delta_bytes + sizeof(last_offset_delta), last_offset_delta_pos); - // Records (count). + // Records (count) (bytes 57..60). const int32_t record_count = htobe32(records.size()); const unsigned char* record_count_b = reinterpret_cast(&record_count); result.insert(result.end(), record_count_b, record_count_b + sizeof(record_count)); - // Records (data). + // Records (data) (bytes 61+). for (const auto& record : records) { appendRecord(*record, result); } @@ -100,7 +100,7 @@ Bytes FetchRecordConverterImpl::renderRecordBatch( constexpr uint32_t crc_offset = magic_offset + 1; const auto crc_data_start = result.data() + crc_offset + sizeof(int32_t); const auto crc_data_len = result.size() - (crc_offset + sizeof(int32_t)); - const Bytes crc = renderCrc(crc_data_start, crc_data_len); + const Bytes crc = renderCrc32c(crc_data_start, crc_data_len); std::copy(crc.begin(), crc.end(), result.begin() + crc_offset); return result; @@ -161,7 +161,10 @@ void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& out.insert(out.end(), raw, raw + buf_len); } -Bytes FetchRecordConverterImpl::renderCrc(const unsigned char* data, const size_t len) const { +// XXX (adam.kotwasinski) Instead of computing it naively, either link against librdkafka's +// implementation or generate it. 
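+// For reference: the bit-by-bit loop below implements CRC-32C (Castagnoli polynomial 0x1EDC6F41);
+// 0x82F63B78 is its reversed, LSB-first representation.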
+// https://github.com/confluentinc/librdkafka/blob/v1.8.0/src/crc32c.c#L1 +uint32_t FetchRecordConverterImpl::computeCrc32c(const unsigned char* data, const size_t len) { uint32_t crc = 0xFFFFFFFF; for (size_t i = 0; i < len; i++) { char ch = data[i]; @@ -174,9 +177,16 @@ Bytes FetchRecordConverterImpl::renderCrc(const unsigned char* data, const size_ ch >>= 1; } } - crc = ~crc; - crc = htobe32(crc); + return ~crc; +} + +uint32_t FetchRecordConverterImpl::computeCrc32cForTest(const unsigned char* data, + const size_t len) { + return computeCrc32c(data, len); +} +Bytes FetchRecordConverterImpl::renderCrc32c(const unsigned char* data, const size_t len) const { + uint32_t crc = htobe32(computeCrc32c(data, len)); Bytes result; unsigned char* raw = reinterpret_cast(&crc); result.insert(result.end(), raw, raw + sizeof(crc)); diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h index 46273274c354..d1320a5e990c 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h @@ -40,6 +40,8 @@ class FetchRecordConverterImpl : public FetchRecordConverter, // Default singleton accessor. static const FetchRecordConverter& getDefaultInstance(); + static uint32_t computeCrc32cForTest(const unsigned char* data, const size_t len); + private: // Helper function: transform records from a partition into a record batch. // See: https://kafka.apache.org/33/documentation.html#recordbatch @@ -51,7 +53,10 @@ class FetchRecordConverterImpl : public FetchRecordConverter, void appendRecord(const InboundRecord& record, Bytes& out) const; // Helper function: render CRC32C bytes from given input. - Bytes renderCrc(const unsigned char* data, const size_t len) const; + Bytes renderCrc32c(const unsigned char* data, const size_t len) const; + + // Helper function: compute CRC32C. 
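+  // The raw checksum value is exposed to unit tests through computeCrc32cForTest above.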
+ static uint32_t computeCrc32c(const unsigned char* data, const size_t len); }; } // namespace Mesh diff --git a/contrib/kafka/filters/network/source/mesh/inbound_record.h b/contrib/kafka/filters/network/source/mesh/inbound_record.h index f37228dbcddd..ba8741c483bc 100644 --- a/contrib/kafka/filters/network/source/mesh/inbound_record.h +++ b/contrib/kafka/filters/network/source/mesh/inbound_record.h @@ -25,6 +25,7 @@ struct InboundRecord { const NullableBytes key_; const NullableBytes value_; + // XXX (adam.kotwasinski) const& InboundRecord(std::string topic, int32_t partition, int64_t offset, NullableBytes key, NullableBytes value) : topic_{topic}, partition_{partition}, offset_{offset}, key_{key}, value_{value} {}; diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc index 96e7ccbb47f3..48e98bc9bfb6 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc @@ -1,3 +1,4 @@ +#include #include #include "test/test_common/utility.h" @@ -34,10 +35,63 @@ TEST(FetchRecordConverterImpl, shouldProcessInputWithNoRecords) { input[{"bbb", 0}] = {}; // when - const auto result = testee.convert(input); + const std::vector result = testee.convert(input); // then ASSERT_EQ(result.size(), 2); // Number of unique topic names (not partitions). + const auto topic1 = + std::find_if(result.begin(), result.end(), [](auto x) { return "aaa" == x.topic_; }); + ASSERT_EQ(topic1->partitions_.size(), 3); + const auto topic2 = + std::find_if(result.begin(), result.end(), [](auto x) { return "bbb" == x.topic_; }); + ASSERT_EQ(topic2->partitions_.size(), 1); +} + +// Helper method to generate records. +InboundRecordSharedPtr makeRecord() { + const NullableBytes key = {Bytes(128)}; + const NullableBytes value = {Bytes(1024)}; + return std::make_shared("aaa", 0, 0, key, value); +} + +TEST(FetchRecordConverterImpl, shouldProcessRecords) { + // given + const FetchRecordConverter& testee = FetchRecordConverterImpl{}; + InboundRecordsMap input = {}; + input[{"aaa", 0}] = {makeRecord(), makeRecord(), makeRecord()}; + + // when + const std::vector result = testee.convert(input); + + // then + ASSERT_EQ(result.size(), 1); + const auto& partitions = result[0].partitions_; + ASSERT_EQ(partitions.size(), 1); + const NullableBytes& data = partitions[0].records_; + ASSERT_EQ(data.has_value(), true); + ASSERT_GT(data->size(), + 3 * (128 + 1024)); // Records carry some metadata so it should always pass. + + // then - check whether metadata really says we carry 3 records. + constexpr auto record_count_offset = 57; + const auto ptr = reinterpret_cast(data->data() + record_count_offset); + const uint32_t record_count = be32toh(*ptr); + ASSERT_EQ(record_count, 3); +} + +// Here we check whether our manual implementation really works. 
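+// The expected constants are plain CRC-32C checksums, so they can be regenerated with any
+// standard CRC-32C implementation, e.g. the Kafka utility referenced below.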
+// https://github.com/apache/kafka/blob/3.3.2/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java +TEST(FetchRecordConverterImpl, shouldComputeCrc32c) { + constexpr auto testee = &FetchRecordConverterImpl::computeCrc32cForTest; + + std::vector arg1 = {}; + ASSERT_EQ(testee(arg1.data(), arg1.size()), 0x00000000); + + std::vector arg2 = {0, 0, 0, 0}; + ASSERT_EQ(testee(arg2.data(), arg2.size()), 0x48674BC7); + + std::vector arg3 = {13, 42, 13, 42, 13, 42, 13, 42}; + ASSERT_EQ(testee(arg3.data(), arg3.size()), 0xDB56B80F); } } // namespace From b94818dcb7686761ac66210ccf8506fa75ecf8a8 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Mon, 6 Feb 2023 16:05:38 -0800 Subject: [PATCH 06/10] kafka: cleanup Signed-off-by: Adam Kotwasinski --- .../fetch_record_converter.cc | 24 ++++++++++++------- .../network/source/mesh/inbound_record.h | 5 ++-- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc index 59c8f3a3bd4f..f074e8960da0 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc @@ -50,6 +50,9 @@ FetchRecordConverterImpl::convert(const InboundRecordsMap& arg) const { return result; } +// Magic format introduced around Kafka 1.0.0 and still used with Kafka 3.3. +constexpr int8_t MAGIC = 2; + Bytes FetchRecordConverterImpl::renderRecordBatch( const std::vector& records) const { @@ -94,7 +97,7 @@ Bytes FetchRecordConverterImpl::renderRecordBatch( // Set magic. constexpr uint32_t magic_offset = sizeof(base_offset) + sizeof(batch_len) + sizeof(int32_t); - result[magic_offset] = 2; + result[magic_offset] = MAGIC; // Compute and set CRC. constexpr uint32_t crc_offset = magic_offset + 1; @@ -106,6 +109,14 @@ Bytes FetchRecordConverterImpl::renderRecordBatch( return result; } +// Helper method. +static void putBufferIntoBytes(Buffer::Instance& buffer, Bytes& out) { + const auto buf_len = buffer.length(); + void* linearized = buffer.linearize(buf_len); + unsigned char* raw = static_cast(linearized); + out.insert(out.end(), raw, raw + buf_len); +} + void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& out) const { Buffer::OwnedImpl buffer; @@ -149,16 +160,11 @@ void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& const int32_t header_count = 0; Statics::writeVarint(header_count, buffer); - // XXX (adam.kotwasinski) This might be less than efficient. Improve it later. + // Put length and contents of 'buffer' of into 'out'. Buffer::OwnedImpl length_buffer; Statics::writeVarint(buffer.length(), length_buffer); - buffer.prepend(length_buffer); - - // Finish: put buffer's contents into the 'out' variable. 
- const auto buf_len = buffer.length(); - void* linearized = buffer.linearize(buf_len); - unsigned char* raw = static_cast(linearized); - out.insert(out.end(), raw, raw + buf_len); + putBufferIntoBytes(length_buffer, out); + putBufferIntoBytes(buffer, out); } // XXX (adam.kotwasinski) Instead of computing it naively, either link against librdkafka's diff --git a/contrib/kafka/filters/network/source/mesh/inbound_record.h b/contrib/kafka/filters/network/source/mesh/inbound_record.h index ba8741c483bc..162202e569f7 100644 --- a/contrib/kafka/filters/network/source/mesh/inbound_record.h +++ b/contrib/kafka/filters/network/source/mesh/inbound_record.h @@ -25,9 +25,8 @@ struct InboundRecord { const NullableBytes key_; const NullableBytes value_; - // XXX (adam.kotwasinski) const& - InboundRecord(std::string topic, int32_t partition, int64_t offset, NullableBytes key, - NullableBytes value) + InboundRecord(const std::string& topic, const int32_t partition, const int64_t offset, + const NullableBytes& key, const NullableBytes& value) : topic_{topic}, partition_{partition}, offset_{offset}, key_{key}, value_{value} {}; absl::string_view key() const { From fa1c6384e8e50dce3d6617d5b3a5ae9983531f84 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Mon, 6 Feb 2023 16:32:30 -0800 Subject: [PATCH 07/10] kafka: get rid of buffer in FRC Signed-off-by: Adam Kotwasinski --- .../source/mesh/command_handlers/BUILD | 1 - .../fetch_record_converter.cc | 52 ++++++++----------- .../network/source/mesh/inbound_record.h | 21 +++----- .../filters/network/source/serialization.h | 14 ++--- 4 files changed, 35 insertions(+), 53 deletions(-) diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD b/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD index 1476fae6ab5e..508881604858 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD @@ -77,7 +77,6 @@ envoy_cc_library( "//contrib/kafka/filters/network/source:kafka_response_parser_lib", "//contrib/kafka/filters/network/source:serialization_lib", "//contrib/kafka/filters/network/source/mesh:inbound_record_lib", - "//source/common/buffer:buffer_lib", ], ) diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc index f074e8960da0..0e22f801d039 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc @@ -1,7 +1,5 @@ #include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h" -#include "source/common/buffer/buffer_impl.h" - #include "contrib/kafka/filters/network/source/serialization.h" namespace Envoy { @@ -109,62 +107,56 @@ Bytes FetchRecordConverterImpl::renderRecordBatch( return result; } -// Helper method. -static void putBufferIntoBytes(Buffer::Instance& buffer, Bytes& out) { - const auto buf_len = buffer.length(); - void* linearized = buffer.linearize(buf_len); - unsigned char* raw = static_cast(linearized); - out.insert(out.end(), raw, raw + buf_len); -} - void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& out) const { - Buffer::OwnedImpl buffer; + Bytes tmp = {}; + // This is not precise maths, as we could be over-reserving a little due to var-length fields. 
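+  // The estimate covers the attributes byte, timestamp delta and offset delta, plus the
+  // key/value/header sizes reported by the record itself.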
+ tmp.reserve(sizeof(int8_t) + sizeof(int64_t) + sizeof(int32_t) + record.dataLengthEstimate()); // attributes: int8 constexpr int8_t attributes = 0; - buffer.add(&attributes, sizeof(int8_t)); + tmp.push_back(static_cast(attributes)); // timestampDelta: varlong constexpr int64_t timestamp_delta = 0; - Statics::writeVarlong(timestamp_delta, buffer); + Statics::writeVarlong(timestamp_delta, tmp); // offsetDelta: varint const int32_t offset_delta = record.offset_; - Statics::writeVarint(offset_delta, buffer); + Statics::writeVarint(offset_delta, tmp); // Impl note: compared to requests/responses, records serialize byte arrays as varint length + // bytes (and not length + 1, then bytes). So we cannot use EncodingContext from serialization.h. // keyLength: varint // key: byte[] - const absl::string_view key = record.key(); - if (!key.empty()) { - Statics::writeVarint(key.size(), buffer); - buffer.add(key); + const NullableBytes& key = record.key_; + if (key.has_value()) { + Statics::writeVarint(key->size(), tmp); + tmp.insert(tmp.end(), key->begin(), key->end()); } else { - Statics::writeVarint(-1, buffer); + Statics::writeVarint(-1, tmp); } // valueLen: varint // value: byte[] - const absl::string_view value = record.value(); - if (!value.empty()) { - Statics::writeVarint(value.size(), buffer); - buffer.add(record.value()); + const NullableBytes& value = record.value_; + if (value.has_value()) { + Statics::writeVarint(value->size(), tmp); + tmp.insert(tmp.end(), value->begin(), value->end()); } else { - Statics::writeVarint(-1, buffer); + Statics::writeVarint(-1, tmp); } // TODO (adam.kotwasinski) Headers are not supported yet. const int32_t header_count = 0; - Statics::writeVarint(header_count, buffer); + Statics::writeVarint(header_count, tmp); + + // Put tmp's length into 'out'. + Statics::writeVarint(tmp.size(), out); - // Put length and contents of 'buffer' of into 'out'. - Buffer::OwnedImpl length_buffer; - Statics::writeVarint(buffer.length(), length_buffer); - putBufferIntoBytes(length_buffer, out); - putBufferIntoBytes(buffer, out); + // Put tmp's contents into 'out'. + out.insert(out.end(), tmp.begin(), tmp.end()); } // XXX (adam.kotwasinski) Instead of computing it naively, either link against librdkafka's diff --git a/contrib/kafka/filters/network/source/mesh/inbound_record.h b/contrib/kafka/filters/network/source/mesh/inbound_record.h index 162202e569f7..94c6e61a2eec 100644 --- a/contrib/kafka/filters/network/source/mesh/inbound_record.h +++ b/contrib/kafka/filters/network/source/mesh/inbound_record.h @@ -4,7 +4,6 @@ #include #include "absl/strings/str_cat.h" -#include "absl/strings/string_view.h" #include "contrib/kafka/filters/network/source/kafka_types.h" namespace Envoy { @@ -29,20 +28,12 @@ struct InboundRecord { const NullableBytes& key, const NullableBytes& value) : topic_{topic}, partition_{partition}, offset_{offset}, key_{key}, value_{value} {}; - absl::string_view key() const { - if (key_) { - return {reinterpret_cast(key_->data()), key_->size()}; - } else { - return {}; - } - } - - absl::string_view value() const { - if (value_) { - return {reinterpret_cast(value_->data()), value_->size()}; - } else { - return {}; - } + // Estimates how many bytes this record would take. + uint32_t dataLengthEstimate() const { + uint32_t result = 12; // Max key length, value lenght, header count (right now 0). + result += key_ ? key_->size() : 0; + result += value_ ? value_->size() : 0; + return result; } // Used in logging. 
diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h index a95ea28f1092..d5c114685a24 100644 --- a/contrib/kafka/filters/network/source/serialization.h +++ b/contrib/kafka/filters/network/source/serialization.h @@ -1505,7 +1505,7 @@ uint32_t EncodingContext::encodeCompact(const NullableArray& arg, Buffer::Ins class Statics { public: // org.apache.kafka.common.utils.ByteUtils.writeUnsignedVarint(int, ByteBuffer) - static uint32_t writeUnsignedVarint(const uint32_t& arg, Buffer::Instance& dst) { + static uint32_t writeUnsignedVarint(const uint32_t& arg, Bytes& dst) { uint32_t value = arg; uint32_t elements_with_1 = 0; @@ -1513,25 +1513,25 @@ class Statics { while ((value & ~(0x7f)) != 0) { // Save next 7-bit batch with highest bit set. const uint8_t el = (value & 0x7f) | 0x80; - dst.add(&el, sizeof(uint8_t)); + dst.push_back(el); value >>= 7; elements_with_1++; } // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. const uint8_t el = value; - dst.add(&el, sizeof(uint8_t)); + dst.push_back(el); return elements_with_1 + 1; } // org.apache.kafka.common.utils.ByteUtils.writeVarint(int, ByteBuffer) - static uint32_t writeVarint(const int32_t arg, Buffer::Instance& dst) { + static uint32_t writeVarint(const int32_t arg, Bytes& dst) { return writeUnsignedVarint((arg << 1) ^ (arg >> 31), dst); } // org.apache.kafka.common.utils.ByteUtils.writeVarlong(long, ByteBuffer) - static uint32_t writeVarlong(const int64_t& arg, Buffer::Instance& dst) { + static uint32_t writeVarlong(const int64_t& arg, Bytes& dst) { int64_t value = (arg << 1) ^ (arg >> 63); uint32_t elements_with_1 = 0; @@ -1539,14 +1539,14 @@ class Statics { while ((value & ~(0x7f)) != 0) { // Save next 7-bit batch with highest bit set. const uint8_t el = (value & 0x7f) | 0x80; - dst.add(&el, sizeof(uint8_t)); + dst.push_back(el); value >>= 7; elements_with_1++; } // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. const uint8_t el = value; - dst.add(&el, sizeof(uint8_t)); + dst.push_back(el); return elements_with_1 + 1; } From 84f2e9d24261af3c93ff8996e1b4171f5834eab0 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Mon, 6 Feb 2023 17:06:27 -0800 Subject: [PATCH 08/10] kafka: reuse varlen handling Signed-off-by: Adam Kotwasinski --- .../fetch_record_converter.cc | 24 ++--- .../filters/network/source/serialization.cc | 49 ++++++++++ .../filters/network/source/serialization.h | 93 ++++++------------- .../network/test/serialization_test.cc | 44 +++++++++ 4 files changed, 131 insertions(+), 79 deletions(-) diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc index 0e22f801d039..8b760b21dd94 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc @@ -94,11 +94,11 @@ Bytes FetchRecordConverterImpl::renderRecordBatch( result.begin() + sizeof(base_offset)); // Set magic. - constexpr uint32_t magic_offset = sizeof(base_offset) + sizeof(batch_len) + sizeof(int32_t); + const uint32_t magic_offset = sizeof(base_offset) + sizeof(batch_len) + sizeof(int32_t); result[magic_offset] = MAGIC; // Compute and set CRC. 
- constexpr uint32_t crc_offset = magic_offset + 1; + const uint32_t crc_offset = magic_offset + 1; const auto crc_data_start = result.data() + crc_offset + sizeof(int32_t); const auto crc_data_len = result.size() - (crc_offset + sizeof(int32_t)); const Bytes crc = renderCrc32c(crc_data_start, crc_data_len); @@ -114,16 +114,16 @@ void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& tmp.reserve(sizeof(int8_t) + sizeof(int64_t) + sizeof(int32_t) + record.dataLengthEstimate()); // attributes: int8 - constexpr int8_t attributes = 0; + const int8_t attributes = 0; tmp.push_back(static_cast(attributes)); // timestampDelta: varlong - constexpr int64_t timestamp_delta = 0; - Statics::writeVarlong(timestamp_delta, tmp); + const int64_t timestamp_delta = 0; + VarlenUtils::writeVarlong(timestamp_delta, tmp); // offsetDelta: varint const int32_t offset_delta = record.offset_; - Statics::writeVarint(offset_delta, tmp); + VarlenUtils::writeVarint(offset_delta, tmp); // Impl note: compared to requests/responses, records serialize byte arrays as varint length + // bytes (and not length + 1, then bytes). So we cannot use EncodingContext from serialization.h. @@ -132,28 +132,28 @@ void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& // key: byte[] const NullableBytes& key = record.key_; if (key.has_value()) { - Statics::writeVarint(key->size(), tmp); + VarlenUtils::writeVarint(key->size(), tmp); tmp.insert(tmp.end(), key->begin(), key->end()); } else { - Statics::writeVarint(-1, tmp); + VarlenUtils::writeVarint(-1, tmp); } // valueLen: varint // value: byte[] const NullableBytes& value = record.value_; if (value.has_value()) { - Statics::writeVarint(value->size(), tmp); + VarlenUtils::writeVarint(value->size(), tmp); tmp.insert(tmp.end(), value->begin(), value->end()); } else { - Statics::writeVarint(-1, tmp); + VarlenUtils::writeVarint(-1, tmp); } // TODO (adam.kotwasinski) Headers are not supported yet. const int32_t header_count = 0; - Statics::writeVarint(header_count, tmp); + VarlenUtils::writeVarint(header_count, tmp); // Put tmp's length into 'out'. - Statics::writeVarint(tmp.size(), out); + VarlenUtils::writeVarint(tmp.size(), out); // Put tmp's contents into 'out'. out.insert(out.end(), tmp.begin(), tmp.end()); diff --git a/contrib/kafka/filters/network/source/serialization.cc b/contrib/kafka/filters/network/source/serialization.cc index b78085fdbf18..45f2229d8892 100644 --- a/contrib/kafka/filters/network/source/serialization.cc +++ b/contrib/kafka/filters/network/source/serialization.cc @@ -220,6 +220,55 @@ NullableBytes NullableCompactBytesDeserializer::get() const { } } +namespace VarlenUtils { + +uint32_t writeUnsignedVarint(const uint32_t arg, Bytes& dst) { + uint32_t value = arg; + + uint32_t elements_with_1 = 0; + // As long as there are bits set on indexes 8 or higher (counting from 1). + while ((value & ~(0x7f)) != 0) { + // Save next 7-bit batch with highest bit set. + const uint8_t el = (value & 0x7f) | 0x80; + dst.push_back(el); + value >>= 7; + elements_with_1++; + } + + // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. + const uint8_t el = value; + dst.push_back(el); + + return elements_with_1 + 1; +} + +uint32_t writeVarint(const int32_t arg, Bytes& dst) { + uint32_t zz = (static_cast(arg) << 1) ^ (arg >> 31); // Zig-zag. 
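+  // Zig-zag keeps small magnitudes small (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...), so negative
+  // values do not expand to the maximum varint width.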
+ return writeUnsignedVarint(zz, dst); +} + +uint32_t writeVarlong(const int64_t arg, Bytes& dst) { + uint64_t value = (static_cast(arg) << 1) ^ (arg >> 63); // Zig-zag. + + uint32_t elements_with_1 = 0; + // As long as there are bits set on indexes 8 or higher (counting from 1). + while ((value & ~(0x7f)) != 0) { + // Save next 7-bit batch with highest bit set. + const uint8_t el = (value & 0x7f) | 0x80; + dst.push_back(el); + value >>= 7; + elements_with_1++; + } + + // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. + const uint8_t el = value; + dst.push_back(el); + + return elements_with_1 + 1; +} + +} // namespace VarlenUtils + } // namespace Kafka } // namespace NetworkFilters } // namespace Extensions diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h index d5c114685a24..711b45b3778c 100644 --- a/contrib/kafka/filters/network/source/serialization.h +++ b/contrib/kafka/filters/network/source/serialization.h @@ -943,6 +943,28 @@ class UuidDeserializer : public Deserializer { Int64Deserializer low_bytes_deserializer_; }; +// Variable length encoding utilities. +namespace VarlenUtils { + +/** + * Writes given unsigned int in variable-length encoding. + * Ref: org.apache.kafka.common.utils.ByteUtils.writeUnsignedVarint(int, ByteBuffer) + */ +uint32_t writeUnsignedVarint(const uint32_t arg, Bytes& dst); + +/** + * Writes given signed int in variable-length zig-zag encoding. + * Ref: org.apache.kafka.common.utils.ByteUtils.writeVarint(int, ByteBuffer) + */ +uint32_t writeVarint(const int32_t arg, Bytes& dst); + +/** + * Writes given long in variable-length zig-zag encoding. + * Ref: org.apache.kafka.common.utils.ByteUtils.writeVarlong(long, ByteBuffer) + */ +uint32_t writeVarlong(const int64_t arg, Bytes& dst); +} // namespace VarlenUtils + /** * Encodes provided argument in Kafka format. * In case of primitive types, this is done explicitly as per specification. @@ -1401,23 +1423,10 @@ inline uint32_t EncodingContext::encodeCompact(const int64_t& arg, Buffer::Insta */ template <> inline uint32_t EncodingContext::encodeCompact(const uint32_t& arg, Buffer::Instance& dst) { - uint32_t value = arg; - - uint32_t elements_with_1 = 0; - // As long as there are bits set on indexes 8 or higher (counting from 1). - while ((value & ~(0x7f)) != 0) { - // Save next 7-bit batch with highest bit set. - const uint8_t el = (value & 0x7f) | 0x80; - dst.add(&el, sizeof(uint8_t)); - value >>= 7; - elements_with_1++; - } - - // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. - const uint8_t el = value; - dst.add(&el, sizeof(uint8_t)); - - return elements_with_1 + 1; + std::vector tmp; + const uint32_t written = VarlenUtils::writeUnsignedVarint(arg, tmp); + dst.add(tmp.data(), written); + return written; } /** @@ -1502,56 +1511,6 @@ uint32_t EncodingContext::encodeCompact(const NullableArray& arg, Buffer::Ins } } -class Statics { -public: - // org.apache.kafka.common.utils.ByteUtils.writeUnsignedVarint(int, ByteBuffer) - static uint32_t writeUnsignedVarint(const uint32_t& arg, Bytes& dst) { - uint32_t value = arg; - - uint32_t elements_with_1 = 0; - // As long as there are bits set on indexes 8 or higher (counting from 1). - while ((value & ~(0x7f)) != 0) { - // Save next 7-bit batch with highest bit set. 
- const uint8_t el = (value & 0x7f) | 0x80; - dst.push_back(el); - value >>= 7; - elements_with_1++; - } - - // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. - const uint8_t el = value; - dst.push_back(el); - - return elements_with_1 + 1; - } - - // org.apache.kafka.common.utils.ByteUtils.writeVarint(int, ByteBuffer) - static uint32_t writeVarint(const int32_t arg, Bytes& dst) { - return writeUnsignedVarint((arg << 1) ^ (arg >> 31), dst); - } - - // org.apache.kafka.common.utils.ByteUtils.writeVarlong(long, ByteBuffer) - static uint32_t writeVarlong(const int64_t& arg, Bytes& dst) { - int64_t value = (arg << 1) ^ (arg >> 63); - - uint32_t elements_with_1 = 0; - // As long as there are bits set on indexes 8 or higher (counting from 1). - while ((value & ~(0x7f)) != 0) { - // Save next 7-bit batch with highest bit set. - const uint8_t el = (value & 0x7f) | 0x80; - dst.push_back(el); - value >>= 7; - elements_with_1++; - } - - // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element. - const uint8_t el = value; - dst.push_back(el); - - return elements_with_1 + 1; - } -}; - } // namespace Kafka } // namespace NetworkFilters } // namespace Extensions diff --git a/contrib/kafka/filters/network/test/serialization_test.cc b/contrib/kafka/filters/network/test/serialization_test.cc index b9264cf237e4..59aa0e567b96 100644 --- a/contrib/kafka/filters/network/test/serialization_test.cc +++ b/contrib/kafka/filters/network/test/serialization_test.cc @@ -568,6 +568,50 @@ TEST(TaggedFieldsDeserializer, ShouldConsumeCorrectAmountOfData) { serializeCompactThenDeserializeAndCheckEquality(value); } +// Just a helper to write shorter tests. +template Bytes toBytes(uint32_t fn(const T arg, Bytes& out), const T arg) { + Bytes res; + fn(arg, res); + return res; +} + +TEST(VarlenUtils, ShouldEncodeUnsignedVarInt) { + const auto testee = VarlenUtils::writeUnsignedVarint; + ASSERT_EQ(toBytes(testee, 0), Bytes({0x00})); + ASSERT_EQ(toBytes(testee, 1), Bytes({0x01})); + ASSERT_EQ(toBytes(testee, 127), Bytes({0x7f})); + ASSERT_EQ(toBytes(testee, 128), Bytes({0x80, 0x01})); + ASSERT_EQ(toBytes(testee, 2147483647), Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0x07})); + ASSERT_EQ(toBytes(testee, std::numeric_limits::max()), + Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0x0F})); +} + +TEST(VarlenUtils, ShouldEncodeSignedVarInt) { + const auto testee = VarlenUtils::writeVarint; + ASSERT_EQ(toBytes(testee, 0), Bytes({0x00})); + ASSERT_EQ(toBytes(testee, 1), Bytes({0x02})); + ASSERT_EQ(toBytes(testee, 63), Bytes({0x7e})); + ASSERT_EQ(toBytes(testee, 64), Bytes({0x80, 0x01})); + ASSERT_EQ(toBytes(testee, -1), Bytes({0x01})); + ASSERT_EQ(toBytes(testee, std::numeric_limits::min()), + Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0x0F})); + ASSERT_EQ(toBytes(testee, std::numeric_limits::max()), + Bytes({0xFE, 0xFF, 0xFF, 0xFF, 0x0F})); +} + +TEST(VarlenUtils, ShouldEncodeVarLong) { + const auto testee = VarlenUtils::writeVarlong; + ASSERT_EQ(toBytes(testee, 0), Bytes({0x00})); + ASSERT_EQ(toBytes(testee, 1), Bytes({0x02})); + ASSERT_EQ(toBytes(testee, 63), Bytes({0x7e})); + ASSERT_EQ(toBytes(testee, 64), Bytes({0x80, 0x01})); + ASSERT_EQ(toBytes(testee, -1), Bytes({0x01})); + ASSERT_EQ(toBytes(testee, std::numeric_limits::min()), + Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01})); + ASSERT_EQ(toBytes(testee, std::numeric_limits::max()), + Bytes({0xFE, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01})); +} + } // namespace SerializationTest } // namespace Kafka 
} // namespace NetworkFilters From 526168d7af7f829c52190bddefe71e9ab1a4eeb4 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Wed, 8 Feb 2023 17:35:41 -0800 Subject: [PATCH 09/10] kafka: unnecessary logger in FRC Signed-off-by: Adam Kotwasinski --- .../source/mesh/command_handlers/fetch_record_converter.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h index d1320a5e990c..e621adc2e7ff 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h @@ -31,8 +31,7 @@ class FetchRecordConverter { /** * Proper implementation. */ -class FetchRecordConverterImpl : public FetchRecordConverter, - private Logger::Loggable { +class FetchRecordConverterImpl : public FetchRecordConverter { public: // FetchRecordConverter std::vector convert(const InboundRecordsMap& arg) const override; From be182d4348b9d966e98e9d418d273040f4d1fde6 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Wed, 8 Feb 2023 17:38:40 -0800 Subject: [PATCH 10/10] kafka: proper IR size estimate (varints can have len = 5) Signed-off-by: Adam Kotwasinski --- contrib/kafka/filters/network/source/mesh/inbound_record.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/kafka/filters/network/source/mesh/inbound_record.h b/contrib/kafka/filters/network/source/mesh/inbound_record.h index 94c6e61a2eec..03ff1e19224a 100644 --- a/contrib/kafka/filters/network/source/mesh/inbound_record.h +++ b/contrib/kafka/filters/network/source/mesh/inbound_record.h @@ -30,7 +30,7 @@ struct InboundRecord { // Estimates how many bytes this record would take. uint32_t dataLengthEstimate() const { - uint32_t result = 12; // Max key length, value lenght, header count (right now 0). + uint32_t result = 15; // Max key length, value lenght, header count. result += key_ ? key_->size() : 0; result += value_ ? value_->size() : 0; return result;