diff --git a/contrib/kafka/filters/network/source/mesh/BUILD b/contrib/kafka/filters/network/source/mesh/BUILD
index 668995d99f14..35706ea86c75 100644
--- a/contrib/kafka/filters/network/source/mesh/BUILD
+++ b/contrib/kafka/filters/network/source/mesh/BUILD
@@ -224,6 +224,7 @@ envoy_cc_library(
     deps = [
         ":librdkafka_utils_impl_lib",
         ":upstream_kafka_consumer_lib",
+        "//contrib/kafka/filters/network/source:kafka_types_lib",
        "//envoy/event:dispatcher_interface",
         "//source/common/common:minimal_logger_lib",
     ],
diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD b/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD
index 7b0ee1d20800..508881604858 100644
--- a/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD
+++ b/contrib/kafka/filters/network/source/mesh/command_handlers/BUILD
@@ -75,6 +75,7 @@ envoy_cc_library(
     tags = ["skip_on_windows"],
     deps = [
         "//contrib/kafka/filters/network/source:kafka_response_parser_lib",
+        "//contrib/kafka/filters/network/source:serialization_lib",
         "//contrib/kafka/filters/network/source/mesh:inbound_record_lib",
     ],
 )
diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc
index a2c2d3a68669..8b760b21dd94 100644
--- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc
+++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.cc
@@ -1,20 +1,194 @@
 #include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h"
 
+#include "contrib/kafka/filters/network/source/serialization.h"
+
 namespace Envoy {
 namespace Extensions {
 namespace NetworkFilters {
 namespace Kafka {
 namespace Mesh {
 
+const FetchRecordConverter& FetchRecordConverterImpl::getDefaultInstance() {
+  CONSTRUCT_ON_FIRST_USE(FetchRecordConverterImpl);
+}
+
 std::vector<FetchableTopicResponse>
-FetchRecordConverterImpl::convert(const InboundRecordsMap&) const {
+FetchRecordConverterImpl::convert(const InboundRecordsMap& arg) const {
+
+  // Compute record batches.
+  std::map<KafkaPartition, Bytes> record_batches;
+  for (const auto& partition_and_records : arg) {
+    const KafkaPartition& kp = partition_and_records.first;
+    const std::vector<InboundRecordSharedPtr>& partition_records = partition_and_records.second;
+    const Bytes batch = renderRecordBatch(partition_records);
+    record_batches[kp] = batch;
+  }
 
-  // TODO (adam.kotwasinski) This needs to be actually implemented.
-  return {};
+  // Transform our maps into the Kafka structs.
+  std::map<std::string, std::vector<FetchResponseResponsePartitionData>> topic_to_frrpd;
+  for (const auto& record_batch : record_batches) {
+    const std::string& topic_name = record_batch.first.first;
+    const int32_t partition = record_batch.first.second;
+
+    std::vector<FetchResponseResponsePartitionData>& frrpds = topic_to_frrpd[topic_name];
+    const int16_t error_code = 0;
+    const int64_t high_watermark = 0;
+    const auto frrpd = FetchResponseResponsePartitionData{partition, error_code, high_watermark,
+                                                          absl::make_optional(record_batch.second)};
+
+    frrpds.push_back(frrpd);
+  }
+
+  std::vector<FetchableTopicResponse> result;
+  for (const auto& partition_and_records : topic_to_frrpd) {
+    const std::string& topic_name = partition_and_records.first;
+    const auto ftr = FetchableTopicResponse{topic_name, partition_and_records.second};
+    result.push_back(ftr);
+  }
+  return result;
 }
 
-const FetchRecordConverter& FetchRecordConverterImpl::getDefaultInstance() {
-  CONSTRUCT_ON_FIRST_USE(FetchRecordConverterImpl);
+// Magic format introduced around Kafka 1.0.0 and still used with Kafka 3.3.
+constexpr int8_t MAGIC = 2;
+
+Bytes FetchRecordConverterImpl::renderRecordBatch(
+    const std::vector<InboundRecordSharedPtr>& records) const {
+
+  Bytes result = {};
+
+  // Base offset (bytes 0..7).
+  const int64_t base_offset = htobe64(0);
+  const unsigned char* base_offset_b = reinterpret_cast<const unsigned char*>(&base_offset);
+  result.insert(result.end(), base_offset_b, base_offset_b + sizeof(base_offset));
+
+  // Batch length placeholder (bytes 8..11).
+  result.insert(result.end(), {0, 0, 0, 0});
+
+  // All other attributes (spans partitionLeaderEpoch .. baseSequence) (bytes 12..56).
+  const std::vector<unsigned char> zeros(45, 0);
+  result.insert(result.end(), zeros.begin(), zeros.end());
+
+  // Last offset delta.
+  // -1 means we always claim that we are at the beginning of the partition.
+  const int32_t last_offset_delta = htobe32(-1);
+  const unsigned char* last_offset_delta_bytes =
+      reinterpret_cast<const unsigned char*>(&last_offset_delta);
+  const auto last_offset_delta_pos = result.begin() + 8 + 4 + 11;
+  std::copy(last_offset_delta_bytes, last_offset_delta_bytes + sizeof(last_offset_delta),
+            last_offset_delta_pos);
+
+  // Records (count) (bytes 57..60).
+  const int32_t record_count = htobe32(records.size());
+  const unsigned char* record_count_b = reinterpret_cast<const unsigned char*>(&record_count);
+  result.insert(result.end(), record_count_b, record_count_b + sizeof(record_count));
+
+  // Records (data) (bytes 61+).
+  for (const auto& record : records) {
+    appendRecord(*record, result);
+  }
+
+  // Set batch length.
+  const int32_t batch_len = htobe32(result.size() - (sizeof(base_offset) + sizeof(batch_len)));
+  const unsigned char* batch_len_bytes = reinterpret_cast<const unsigned char*>(&batch_len);
+  std::copy(batch_len_bytes, batch_len_bytes + sizeof(batch_len),
+            result.begin() + sizeof(base_offset));
+
+  // Set magic.
+  const uint32_t magic_offset = sizeof(base_offset) + sizeof(batch_len) + sizeof(int32_t);
+  result[magic_offset] = MAGIC;
+
+  // Compute and set CRC.
+  const uint32_t crc_offset = magic_offset + 1;
+  const auto crc_data_start = result.data() + crc_offset + sizeof(int32_t);
+  const auto crc_data_len = result.size() - (crc_offset + sizeof(int32_t));
+  const Bytes crc = renderCrc32c(crc_data_start, crc_data_len);
+  std::copy(crc.begin(), crc.end(), result.begin() + crc_offset);
+
+  return result;
+}
+
+void FetchRecordConverterImpl::appendRecord(const InboundRecord& record, Bytes& out) const {
+
+  Bytes tmp = {};
+  // This is not precise maths, as we could be over-reserving a little due to var-length fields.
+  tmp.reserve(sizeof(int8_t) + sizeof(int64_t) + sizeof(int32_t) + record.dataLengthEstimate());
+
+  // attributes: int8
+  const int8_t attributes = 0;
+  tmp.push_back(static_cast<unsigned char>(attributes));
+
+  // timestampDelta: varlong
+  const int64_t timestamp_delta = 0;
+  VarlenUtils::writeVarlong(timestamp_delta, tmp);
+
+  // offsetDelta: varint
+  const int32_t offset_delta = record.offset_;
+  VarlenUtils::writeVarint(offset_delta, tmp);
+
+  // Impl note: compared to requests/responses, records serialize byte arrays as varint length +
+  // bytes (and not length + 1, then bytes). So we cannot use EncodingContext from serialization.h.
+
+  // keyLength: varint
+  // key: byte[]
+  const NullableBytes& key = record.key_;
+  if (key.has_value()) {
+    VarlenUtils::writeVarint(key->size(), tmp);
+    tmp.insert(tmp.end(), key->begin(), key->end());
+  } else {
+    VarlenUtils::writeVarint(-1, tmp);
+  }
+
+  // valueLen: varint
+  // value: byte[]
+  const NullableBytes& value = record.value_;
+  if (value.has_value()) {
+    VarlenUtils::writeVarint(value->size(), tmp);
+    tmp.insert(tmp.end(), value->begin(), value->end());
+  } else {
+    VarlenUtils::writeVarint(-1, tmp);
+  }
+
+  // TODO (adam.kotwasinski) Headers are not supported yet.
+  const int32_t header_count = 0;
+  VarlenUtils::writeVarint(header_count, tmp);
+
+  // Put tmp's length into 'out'.
+  VarlenUtils::writeVarint(tmp.size(), out);
+
+  // Put tmp's contents into 'out'.
+  out.insert(out.end(), tmp.begin(), tmp.end());
+}
+
+// XXX (adam.kotwasinski) Instead of computing it naively, either link against librdkafka's
+// implementation or generate it.
+// https://github.com/confluentinc/librdkafka/blob/v1.8.0/src/crc32c.c#L1
+uint32_t FetchRecordConverterImpl::computeCrc32c(const unsigned char* data, const size_t len) {
+  uint32_t crc = 0xFFFFFFFF;
+  for (size_t i = 0; i < len; i++) {
+    char ch = data[i];
+    for (size_t j = 0; j < 8; j++) {
+      uint32_t b = (ch ^ crc) & 1;
+      crc >>= 1;
+      if (b) {
+        crc = crc ^ 0x82F63B78;
+      }
+      ch >>= 1;
+    }
+  }
+  return ~crc;
+}
+
+uint32_t FetchRecordConverterImpl::computeCrc32cForTest(const unsigned char* data,
+                                                        const size_t len) {
+  return computeCrc32c(data, len);
+}
+
+Bytes FetchRecordConverterImpl::renderCrc32c(const unsigned char* data, const size_t len) const {
+  uint32_t crc = htobe32(computeCrc32c(data, len));
+  Bytes result;
+  unsigned char* raw = reinterpret_cast<unsigned char*>(&crc);
+  result.insert(result.end(), raw, raw + sizeof(crc));
+  return result;
 }
 
 } // namespace Mesh
diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h
index fe5103a8b814..e621adc2e7ff 100644
--- a/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h
+++ b/contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h
@@ -38,6 +38,24 @@ class FetchRecordConverterImpl : public FetchRecordConverter {
 
   // Default singleton accessor.
   static const FetchRecordConverter& getDefaultInstance();
+
+  static uint32_t computeCrc32cForTest(const unsigned char* data, const size_t len);
+
+private:
+  // Helper function: transform records from a partition into a record batch.
+  // See: https://kafka.apache.org/33/documentation.html#recordbatch
+  Bytes renderRecordBatch(const std::vector<InboundRecordSharedPtr>& records) const;
+
+  // Helper function: append record to output array.
+  // See: https://kafka.apache.org/33/documentation.html#record
+  // https://github.com/apache/kafka/blob/3.3.2/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L164
+  void appendRecord(const InboundRecord& record, Bytes& out) const;
+
+  // Helper function: render CRC32C bytes from given input.
+  Bytes renderCrc32c(const unsigned char* data, const size_t len) const;
+
+  // Helper function: compute CRC32C.
+  static uint32_t computeCrc32c(const unsigned char* data, const size_t len);
 };
 
 } // namespace Mesh
diff --git a/contrib/kafka/filters/network/source/mesh/inbound_record.h b/contrib/kafka/filters/network/source/mesh/inbound_record.h
index 83cd1d61fbcf..03ff1e19224a 100644
--- a/contrib/kafka/filters/network/source/mesh/inbound_record.h
+++ b/contrib/kafka/filters/network/source/mesh/inbound_record.h
@@ -4,6 +4,7 @@
 #include <string>
 
 #include "absl/strings/str_cat.h"
+#include "contrib/kafka/filters/network/source/kafka_types.h"
 
 namespace Envoy {
 namespace Extensions {
@@ -20,10 +21,20 @@ struct InboundRecord {
   const int32_t partition_;
   const int64_t offset_;
 
-  // TODO (adam.kotwasinski) Get data in here in the next commits.
+  const NullableBytes key_;
+  const NullableBytes value_;
 
-  InboundRecord(std::string topic, int32_t partition, int64_t offset)
-      : topic_{topic}, partition_{partition}, offset_{offset} {};
+  InboundRecord(const std::string& topic, const int32_t partition, const int64_t offset,
+                const NullableBytes& key, const NullableBytes& value)
+      : topic_{topic}, partition_{partition}, offset_{offset}, key_{key}, value_{value} {};
+
+  // Estimates how many bytes this record would take.
+  uint32_t dataLengthEstimate() const {
+    uint32_t result = 15; // Max key length, value length, header count.
+    result += key_ ? key_->size() : 0;
+    result += value_ ? value_->size() : 0;
+    return result;
+  }
 
   // Used in logging.
   std::string toString() const {
diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc
index 4c3dfaeab005..a6ba7a3e1d71 100644
--- a/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc
+++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.cc
@@ -1,5 +1,6 @@
 #include "contrib/kafka/filters/network/source/mesh/upstream_kafka_consumer_impl.h"
 
+#include "contrib/kafka/filters/network/source/kafka_types.h"
 #include "contrib/kafka/filters/network/source/mesh/librdkafka_utils_impl.h"
 
 namespace Envoy {
@@ -99,12 +100,27 @@ void RichKafkaConsumer::runWorkerLoop() {
   ENVOY_LOG(debug, "Worker thread for consumer [{}] finished", topic_);
 }
 
+// Helper method, converts byte array.
+static NullableBytes toBytes(const void* data, const size_t size) {
+  const unsigned char* as_char = static_cast<const unsigned char*>(data);
+  if (data) {
+    Bytes bytes(as_char, as_char + size);
+    return {bytes};
+  } else {
+    return absl::nullopt;
+  }
+}
+
 // Helper method, gets rid of librdkafka.
 static InboundRecordSharedPtr transform(RdKafkaMessagePtr arg) {
   const auto topic = arg->topic_name();
   const auto partition = arg->partition();
   const auto offset = arg->offset();
-  return std::make_shared<InboundRecord>(topic, partition, offset);
+
+  const NullableBytes key = toBytes(arg->key_pointer(), arg->key_len());
+  const NullableBytes value = toBytes(arg->payload(), arg->len());
+
+  return std::make_shared<InboundRecord>(topic, partition, offset, key, value);
 }
 
 std::vector<InboundRecordSharedPtr> RichKafkaConsumer::receiveRecordBatch() {
diff --git a/contrib/kafka/filters/network/source/serialization.cc b/contrib/kafka/filters/network/source/serialization.cc
index b78085fdbf18..45f2229d8892 100644
--- a/contrib/kafka/filters/network/source/serialization.cc
+++ b/contrib/kafka/filters/network/source/serialization.cc
@@ -220,6 +220,55 @@ NullableBytes NullableCompactBytesDeserializer::get() const {
   }
 }
 
+namespace VarlenUtils {
+
+uint32_t writeUnsignedVarint(const uint32_t arg, Bytes& dst) {
+  uint32_t value = arg;
+
+  uint32_t elements_with_1 = 0;
+  // As long as there are bits set on indexes 8 or higher (counting from 1).
+  while ((value & ~(0x7f)) != 0) {
+    // Save next 7-bit batch with highest bit set.
+    const uint8_t el = (value & 0x7f) | 0x80;
+    dst.push_back(el);
+    value >>= 7;
+    elements_with_1++;
+  }
+
+  // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element.
+  const uint8_t el = value;
+  dst.push_back(el);
+
+  return elements_with_1 + 1;
+}
+
+uint32_t writeVarint(const int32_t arg, Bytes& dst) {
+  uint32_t zz = (static_cast<uint32_t>(arg) << 1) ^ (arg >> 31); // Zig-zag.
+  return writeUnsignedVarint(zz, dst);
+}
+
+uint32_t writeVarlong(const int64_t arg, Bytes& dst) {
+  uint64_t value = (static_cast<uint64_t>(arg) << 1) ^ (arg >> 63); // Zig-zag.
+
+  uint32_t elements_with_1 = 0;
+  // As long as there are bits set on indexes 8 or higher (counting from 1).
+  while ((value & ~(0x7f)) != 0) {
+    // Save next 7-bit batch with highest bit set.
+    const uint8_t el = (value & 0x7f) | 0x80;
+    dst.push_back(el);
+    value >>= 7;
+    elements_with_1++;
+  }
+
+  // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element.
+  const uint8_t el = value;
+  dst.push_back(el);
+
+  return elements_with_1 + 1;
+}
+
+} // namespace VarlenUtils
+
 } // namespace Kafka
 } // namespace NetworkFilters
 } // namespace Extensions
diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h
index 29a9a01d3924..711b45b3778c 100644
--- a/contrib/kafka/filters/network/source/serialization.h
+++ b/contrib/kafka/filters/network/source/serialization.h
@@ -943,6 +943,28 @@ class UuidDeserializer : public Deserializer<Uuid> {
   Int64Deserializer low_bytes_deserializer_;
 };
 
+// Variable length encoding utilities.
+namespace VarlenUtils {
+
+/**
+ * Writes given unsigned int in variable-length encoding.
+ * Ref: org.apache.kafka.common.utils.ByteUtils.writeUnsignedVarint(int, ByteBuffer)
+ */
+uint32_t writeUnsignedVarint(const uint32_t arg, Bytes& dst);
+
+/**
+ * Writes given signed int in variable-length zig-zag encoding.
+ * Ref: org.apache.kafka.common.utils.ByteUtils.writeVarint(int, ByteBuffer)
+ */
+uint32_t writeVarint(const int32_t arg, Bytes& dst);
+
+/**
+ * Writes given long in variable-length zig-zag encoding.
+ * Ref: org.apache.kafka.common.utils.ByteUtils.writeVarlong(long, ByteBuffer)
+ */
+uint32_t writeVarlong(const int64_t arg, Bytes& dst);
+} // namespace VarlenUtils
+
 /**
  * Encodes provided argument in Kafka format.
 * In case of primitive types, this is done explicitly as per specification.
@@ -1401,23 +1423,10 @@ inline uint32_t EncodingContext::encodeCompact(const int64_t& arg, Buffer::Insta
  */
 template <>
 inline uint32_t EncodingContext::encodeCompact(const uint32_t& arg, Buffer::Instance& dst) {
-  uint32_t value = arg;
-
-  uint32_t elements_with_1 = 0;
-  // As long as there are bits set on indexes 8 or higher (counting from 1).
-  while ((value & ~(0x7f)) != 0) {
-    // Save next 7-bit batch with highest bit set.
-    const uint8_t el = (value & 0x7f) | 0x80;
-    dst.add(&el, sizeof(uint8_t));
-    value >>= 7;
-    elements_with_1++;
-  }
-
-  // After the loop has finished, we are certain that bit 8 = 0, so we can just add final element.
-  const uint8_t el = value;
-  dst.add(&el, sizeof(uint8_t));
-
-  return elements_with_1 + 1;
+  std::vector<unsigned char> tmp;
+  const uint32_t written = VarlenUtils::writeUnsignedVarint(arg, tmp);
+  dst.add(tmp.data(), written);
+  return written;
 }
 
 /**
diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD b/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD
index 154b7bfb696d..c9ea469752f4 100644
--- a/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD
+++ b/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD
@@ -39,6 +39,15 @@ envoy_cc_test(
     ],
 )
 
+envoy_cc_test(
+    name = "fetch_record_converter_unit_test",
+    srcs = ["fetch_record_converter_unit_test.cc"],
+    tags = ["skip_on_windows"],
+    deps = [
+        "//contrib/kafka/filters/network/source/mesh/command_handlers:fetch_record_converter_lib",
+    ],
+)
+
 envoy_cc_test(
     name = "list_offsets_unit_test",
     srcs = ["list_offsets_unit_test.cc"],
diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc
new file mode 100644
index 000000000000..48e98bc9bfb6
--- /dev/null
+++ b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_record_converter_unit_test.cc
@@ -0,0 +1,102 @@
+#include <algorithm>
+#include <vector>
+
+#include "test/test_common/utility.h"
+
+#include "contrib/kafka/filters/network/source/mesh/command_handlers/fetch_record_converter.h"
+#include "gtest/gtest.h"
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+namespace {
+
+TEST(FetchRecordConverterImpl, shouldProcessEmptyInput) {
+  // given
+  const FetchRecordConverter& testee = FetchRecordConverterImpl{};
+  const InboundRecordsMap input = {};
+
+  // when
+  const auto result = testee.convert(input);
+
+  // then
+  ASSERT_EQ(result.size(), 0);
+}
+
+TEST(FetchRecordConverterImpl, shouldProcessInputWithNoRecords) {
+  // given
+  const FetchRecordConverter& testee = FetchRecordConverterImpl{};
+  InboundRecordsMap input = {};
+  input[{"aaa", 0}] = {};
+  input[{"aaa", 1}] = {};
+  input[{"aaa", 2}] = {};
+  input[{"bbb", 0}] = {};
+
+  // when
+  const std::vector<FetchableTopicResponse> result = testee.convert(input);
+
+  // then
+  ASSERT_EQ(result.size(), 2); // Number of unique topic names (not partitions).
+  const auto topic1 =
+      std::find_if(result.begin(), result.end(), [](auto x) { return "aaa" == x.topic_; });
+  ASSERT_EQ(topic1->partitions_.size(), 3);
+  const auto topic2 =
+      std::find_if(result.begin(), result.end(), [](auto x) { return "bbb" == x.topic_; });
+  ASSERT_EQ(topic2->partitions_.size(), 1);
+}
+
+// Helper method to generate records.
+InboundRecordSharedPtr makeRecord() {
+  const NullableBytes key = {Bytes(128)};
+  const NullableBytes value = {Bytes(1024)};
+  return std::make_shared<InboundRecord>("aaa", 0, 0, key, value);
+}
+
+TEST(FetchRecordConverterImpl, shouldProcessRecords) {
+  // given
+  const FetchRecordConverter& testee = FetchRecordConverterImpl{};
+  InboundRecordsMap input = {};
+  input[{"aaa", 0}] = {makeRecord(), makeRecord(), makeRecord()};
+
+  // when
+  const std::vector<FetchableTopicResponse> result = testee.convert(input);
+
+  // then
+  ASSERT_EQ(result.size(), 1);
+  const auto& partitions = result[0].partitions_;
+  ASSERT_EQ(partitions.size(), 1);
+  const NullableBytes& data = partitions[0].records_;
+  ASSERT_EQ(data.has_value(), true);
+  ASSERT_GT(data->size(),
+            3 * (128 + 1024)); // Records carry some metadata so it should always pass.
+
+  // then - check whether metadata really says we carry 3 records.
+  constexpr auto record_count_offset = 57;
+  const auto ptr = reinterpret_cast<const uint32_t*>(data->data() + record_count_offset);
+  const uint32_t record_count = be32toh(*ptr);
+  ASSERT_EQ(record_count, 3);
+}
+
+// Here we check whether our manual implementation really works.
+// https://github.com/apache/kafka/blob/3.3.2/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java
+TEST(FetchRecordConverterImpl, shouldComputeCrc32c) {
+  constexpr auto testee = &FetchRecordConverterImpl::computeCrc32cForTest;
+
+  std::vector<unsigned char> arg1 = {};
+  ASSERT_EQ(testee(arg1.data(), arg1.size()), 0x00000000);
+
+  std::vector<unsigned char> arg2 = {0, 0, 0, 0};
+  ASSERT_EQ(testee(arg2.data(), arg2.size()), 0x48674BC7);
+
+  std::vector<unsigned char> arg3 = {13, 42, 13, 42, 13, 42, 13, 42};
+  ASSERT_EQ(testee(arg3.data(), arg3.size()), 0xDB56B80F);
+}
+
+} // namespace
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy
diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc
index 8a34d4a06ea1..2b65bce9c752 100644
--- a/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc
+++ b/contrib/kafka/filters/network/test/mesh/command_handlers/fetch_unit_test.cc
@@ -97,7 +97,9 @@ TEST_F(FetchUnitTest, ShouldCleanupAfterTimer) {
 }
 
 // Helper method to generate records.
-InboundRecordSharedPtr makeRecord() { return std::make_shared<InboundRecord>("aaa", 0, 0); }
+InboundRecordSharedPtr makeRecord() {
+  return std::make_shared<InboundRecord>("aaa", 0, 0, absl::nullopt, absl::nullopt);
+}
 
 TEST_F(FetchUnitTest, ShouldReceiveRecords) {
   // given
diff --git a/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc b/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc
index a2f4ddba6dcb..74b80f1cd32d 100644
--- a/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc
+++ b/contrib/kafka/filters/network/test/mesh/shared_consumer_manager_impl_unit_test.cc
@@ -148,7 +148,7 @@ class RecordDistributorTest : public testing::Test {
   }
 
   InboundRecordSharedPtr makeRecord(const std::string& topic, const int32_t partition) {
-    return std::make_shared<InboundRecord>(topic, partition, 0);
+    return std::make_shared<InboundRecord>(topic, partition, 0, absl::nullopt, absl::nullopt);
   }
 };
diff --git a/contrib/kafka/filters/network/test/serialization_test.cc b/contrib/kafka/filters/network/test/serialization_test.cc
index b9264cf237e4..59aa0e567b96 100644
--- a/contrib/kafka/filters/network/test/serialization_test.cc
+++ b/contrib/kafka/filters/network/test/serialization_test.cc
@@ -568,6 +568,50 @@ TEST(TaggedFieldsDeserializer, ShouldConsumeCorrectAmountOfData) {
   serializeCompactThenDeserializeAndCheckEquality(value);
 }
 
+// Just a helper to write shorter tests.
+template <typename T> Bytes toBytes(uint32_t fn(const T arg, Bytes& out), const T arg) {
+  Bytes res;
+  fn(arg, res);
+  return res;
+}
+
+TEST(VarlenUtils, ShouldEncodeUnsignedVarInt) {
+  const auto testee = VarlenUtils::writeUnsignedVarint;
+  ASSERT_EQ(toBytes(testee, 0), Bytes({0x00}));
+  ASSERT_EQ(toBytes(testee, 1), Bytes({0x01}));
+  ASSERT_EQ(toBytes(testee, 127), Bytes({0x7f}));
+  ASSERT_EQ(toBytes(testee, 128), Bytes({0x80, 0x01}));
+  ASSERT_EQ(toBytes(testee, 2147483647), Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0x07}));
+  ASSERT_EQ(toBytes(testee, std::numeric_limits<uint32_t>::max()),
+            Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0x0F}));
+}
+
+TEST(VarlenUtils, ShouldEncodeSignedVarInt) {
+  const auto testee = VarlenUtils::writeVarint;
+  ASSERT_EQ(toBytes(testee, 0), Bytes({0x00}));
+  ASSERT_EQ(toBytes(testee, 1), Bytes({0x02}));
+  ASSERT_EQ(toBytes(testee, 63), Bytes({0x7e}));
+  ASSERT_EQ(toBytes(testee, 64), Bytes({0x80, 0x01}));
+  ASSERT_EQ(toBytes(testee, -1), Bytes({0x01}));
+  ASSERT_EQ(toBytes(testee, std::numeric_limits<int32_t>::min()),
+            Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0x0F}));
+  ASSERT_EQ(toBytes(testee, std::numeric_limits<int32_t>::max()),
+            Bytes({0xFE, 0xFF, 0xFF, 0xFF, 0x0F}));
+}
+
+TEST(VarlenUtils, ShouldEncodeVarLong) {
+  const auto testee = VarlenUtils::writeVarlong;
+  ASSERT_EQ(toBytes(testee, 0), Bytes({0x00}));
+  ASSERT_EQ(toBytes(testee, 1), Bytes({0x02}));
+  ASSERT_EQ(toBytes(testee, 63), Bytes({0x7e}));
+  ASSERT_EQ(toBytes(testee, 64), Bytes({0x80, 0x01}));
+  ASSERT_EQ(toBytes(testee, -1), Bytes({0x01}));
+  ASSERT_EQ(toBytes(testee, std::numeric_limits<int64_t>::min()),
+            Bytes({0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01}));
+  ASSERT_EQ(toBytes(testee, std::numeric_limits<int64_t>::max()),
+            Bytes({0xFE, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01}));
+}
+
 } // namespace SerializationTest
 } // namespace Kafka
 } // namespace NetworkFilters