Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add support for magic byte == 1

  • Loading branch information...
commit fcaccada712659f9376f093341ffe1922fed5df8 1 parent f690403
@ept ept authored
Showing with 52 additions and 9 deletions.
  1. +35 −8 lib/kafka/message.rb
  2. +17 −1 spec/message_spec.rb
View
43 lib/kafka/message.rb
@@ -14,14 +14,28 @@
# limitations under the License.
module Kafka
- # A message. The format of an N byte message is the following:
- # 1 byte "magic" identifier to allow format changes
- # 4 byte CRC32 of the payload
- # N - 5 byte payload
+ # A message. The format of a message is as follows:
+ #
+ # 4 byte big-endian int: length of message in bytes (including the rest of
+ # the header, but excluding the length field itself)
+ # 1 byte: "magic" identifier (format version number)
+ #
+ # If the magic byte == 0, there is one more header field:
+ #
+ # 4 byte big-endian int: CRC32 checksum of the payload
+ #
+ # If the magic byte == 1, there are two more header fields:
+ #
+ # 1 byte: "attributes" (flags for compression, codec etc)
+ # 4 byte big-endian int: CRC32 checksum of the payload
+ #
+ # All following bytes are the payload.
class Message
MAGIC_IDENTIFIER_DEFAULT = 0
- MESSAGE_HEADER_FORMAT = 'NCN'.freeze
+ BASIC_MESSAGE_HEADER = 'NC'.freeze
+ VERSION_0_HEADER = 'N'.freeze
+ VERSION_1_HEADER = 'CN'.freeze
attr_accessor :magic, :checksum, :payload
@@ -40,9 +54,22 @@ def valid?
end
def self.parse_from(binary)
- size, magic, checksum = binary.unpack(MESSAGE_HEADER_FORMAT)
- payload = binary[9, size] # 5 = 1 + 4 is Magic + Checksum
- Kafka::Message.new(payload, magic, checksum)
+ size, magic = binary.unpack(BASIC_MESSAGE_HEADER)
+ case magic
+ when 0
+ checksum = binary[5, 4].unpack(VERSION_0_HEADER).shift # 5 = sizeof(length) + sizeof(magic)
+ payload = binary[9, size] # 9 = sizeof(length) + sizeof(magic) + sizeof(checksum)
+ Kafka::Message.new(payload, magic, checksum)
+
+ when 1
+ attributes, checksum = binary[5, 5].unpack(VERSION_1_HEADER)
+ payload = binary[10, size] # 10 = sizeof(length) + sizeof(magic) + sizeof(attrs) + sizeof(checksum)
+ # TODO interpret attributes
+ Kafka::Message.new(payload, magic, checksum)
+
+ else
+ raise "Unsupported Kafka message version: magic number #{magic}"
+ end
end
end
end
View
18 spec/message_spec.rb
@@ -57,7 +57,7 @@
@message.valid?.should eql(false)
end
- it "should parse a message from bytes" do
+ it "should parse a version-0 message from bytes" do
bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
message = Kafka::Message.parse_from(bytes)
message.valid?.should eql(true)
@@ -65,5 +65,21 @@
message.checksum.should eql(1120192889)
message.payload.should eql("ale")
end
+
+ it "should parse a version-1 message from bytes" do
+ bytes = [14, 1, 0, 755095536].pack('NCCN') + 'martin'
+ message = Kafka::Message.parse_from(bytes)
+ message.should be_valid
+ message.magic.should == 1
+ message.checksum.should == 755095536
+ message.payload.should == 'martin'
+ end
+
+ it "should raise an error if the magic number is not recognised" do
+ bytes = [14, 2, 0, 755095536].pack('NCCN') + 'martin' # 2 = some future format that's not yet invented
+ lambda {
+ Kafka::Message.parse_from(bytes)
+ }.should raise_error(RuntimeError, /Unsupported Kafka message version/)
+ end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.