Browse files

Merge pull request #11 from ept/compression

Add consumer support for transparent gzip compression
  • Loading branch information...
2 parents c00c82e + 9df61f1 commit 9cc122063929b366d5e001e27245b50bcc7d71e8 @acrosa committed Aug 8, 2012
Showing with 129 additions and 72 deletions.
  1. +3 −18 lib/kafka/consumer.rb
  2. +56 −14 lib/kafka/message.rb
  3. +0 −33 spec/consumer_spec.rb
  4. +67 −5 spec/message_spec.rb
  5. +3 −2 spec/producer_spec.rb
View
21 lib/kafka/consumer.rb
@@ -49,8 +49,9 @@ def loop(&block)
def consume
self.offset ||= fetch_latest_offset
send_consume_request
- data = read_data_response
- parse_message_set_from(data)
+ message_set = Kafka::Message.parse_from(read_data_response)
+ self.offset += message_set.size
+ message_set.messages
rescue SocketError
nil
end
@@ -94,21 +95,5 @@ def encode_request(request_type, topic, partition, offset, max_size)
max_size = [max_size].pack("N")
request_type + topic + partition + offset + max_size
end
-
- def parse_message_set_from(data)
- messages = []
- processed = 0
- length = data.length - 4
- while (processed <= length) do
- message_size = data[processed, 4].unpack("N").shift + 4
- message_data = data[processed, message_size]
- break unless message_data.size == message_size
- messages << Kafka::Message.parse_from(message_data)
- processed += message_size
- end
- self.offset += processed
- messages
- end
-
end
end
View
70 lib/kafka/message.rb
@@ -36,6 +36,7 @@ class Message
BASIC_MESSAGE_HEADER = 'NC'.freeze
VERSION_0_HEADER = 'N'.freeze
VERSION_1_HEADER = 'CN'.freeze
+ COMPRESSION_CODEC_MASK = 0x03
attr_accessor :magic, :checksum, :payload
@@ -53,23 +54,64 @@ def valid?
self.checksum == calculate_checksum
end
- def self.parse_from(binary)
- size, magic = binary.unpack(BASIC_MESSAGE_HEADER)
- case magic
- when 0
- checksum = binary[5, 4].unpack(VERSION_0_HEADER).shift # 5 = sizeof(length) + sizeof(magic)
- payload = binary[9, size] # 9 = sizeof(length) + sizeof(magic) + sizeof(checksum)
- Kafka::Message.new(payload, magic, checksum)
+ # Takes a byte string containing zero or more serialized messages; returns a
+ # MessageSet holding the messages parsed from the string together with the
+ # number of bytes consumed. A truncated message at the end is left unconsumed.
+ def self.parse_from(data)
+ messages = []
+ bytes_processed = 0
- when 1
- attributes, checksum = binary[5, 5].unpack(VERSION_1_HEADER)
- payload = binary[10, size] # 10 = sizeof(length) + sizeof(magic) + sizeof(attrs) + sizeof(checksum)
- # TODO interpret attributes
- Kafka::Message.new(payload, magic, checksum)
+ while bytes_processed <= data.length - 5 # 5 = size of BASIC_MESSAGE_HEADER
+ message_size, magic = data[bytes_processed, 5].unpack(BASIC_MESSAGE_HEADER)
+ break if bytes_processed + message_size + 4 > data.length # message is truncated
- else
- raise "Unsupported Kafka message version: magic number #{magic}"
+ case magic
+ when 0
+ # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 ...
+ # | | | |
+ # | message_size |magic| checksum | payload ...
+ payload_size = message_size - 5 # 5 = sizeof(magic) + sizeof(checksum)
+ checksum = data[bytes_processed + 5, 4].unpack(VERSION_0_HEADER).shift
+ payload = data[bytes_processed + 9, payload_size]
+ messages << Kafka::Message.new(payload, magic, checksum)
+
+ when 1
+ # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 ...
+ # | | | | |
+ # | size |magic|attrs| checksum | payload ...
+ payload_size = message_size - 6 # 6 = sizeof(magic) + sizeof(attrs) + sizeof(checksum)
+ attributes, checksum = data[bytes_processed + 5, 5].unpack(VERSION_1_HEADER)
+ payload = data[bytes_processed + 10, payload_size]
+
+ case attributes & COMPRESSION_CODEC_MASK
+ when 0 # a single uncompressed message
+ messages << Kafka::Message.new(payload, magic, checksum)
+ when 1 # a gzip-compressed message set -- parse recursively
+ uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read
+ message_set = parse_from(uncompressed)
+ raise 'malformed compressed message' if message_set.size != uncompressed.size
+ messages.concat(message_set.messages)
+ else
+ # https://cwiki.apache.org/confluence/display/KAFKA/Compression
+ # claims that 2 is for Snappy compression, but Kafka's Scala client
+ # implementation doesn't seem to support it yet, so I don't have
+ # a reference implementation to test against.
+ raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}"
+ end
+
+ else
+ raise "Unsupported Kafka message version: magic number #{magic}"
+ end
+
+ bytes_processed += message_size + 4 # 4 = sizeof(message_size)
end
+
+ MessageSet.new(bytes_processed, messages)
end
end
+
+ # Encapsulates a list of Kafka messages (as Kafka::Message objects in the
+ # +messages+ attribute) and their total serialized size in bytes (the +size+
+ # attribute).
+ class MessageSet < Struct.new(:size, :messages); end
end
View
33 spec/consumer_spec.rb
@@ -91,39 +91,6 @@
@consumer.send_consume_request.should eql(true)
end
- it "should parse a message set from bytes" do
- bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
- message = @consumer.parse_message_set_from(bytes).first
- message.payload.should eql("ale")
- message.checksum.should eql(1120192889)
- message.magic.should eql(0)
- message.valid?.should eql(true)
- end
-
- it "should skip an incomplete message at the end of the response" do
- bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
- # incomplete message
- bytes += [8].pack("N")
- messages = @consumer.parse_message_set_from(bytes)
- messages.size.should eql(1)
- end
-
- it "should skip an incomplete message at the end of the response which has the same length as an empty message" do
- bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
- # incomplete message because payload is missing
- bytes += [8].pack("N") + [0].pack("C") + [1120192889].pack("N")
- messages = @consumer.parse_message_set_from(bytes)
- messages.size.should eql(1)
- end
-
- it "should read empty messages correctly" do
- # empty message
- bytes = [5].pack("N") + [0].pack("C") + [0].pack("N") + ""
- messages = @consumer.parse_message_set_from(bytes)
- messages.size.should eql(1)
- messages.first.payload.should eql("")
- end
-
it "should consume messages" do
@consumer.should_receive(:send_consume_request).and_return(true)
@consumer.should_receive(:read_data_response).and_return("")
View
72 spec/message_spec.rb
@@ -56,30 +56,92 @@
@message = Message.new("alejandro", 0, 66666666) # 66666666 is a funny checksum
@message.valid?.should eql(false)
end
+ end
+ describe "parsing" do
it "should parse a version-0 message from bytes" do
- bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
- message = Kafka::Message.parse_from(bytes)
+ bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*')
+ message = Kafka::Message.parse_from(bytes).messages.first
message.valid?.should eql(true)
message.magic.should eql(0)
message.checksum.should eql(1120192889)
message.payload.should eql("ale")
end
it "should parse a version-1 message from bytes" do
- bytes = [14, 1, 0, 755095536].pack('NCCN') + 'martin'
- message = Kafka::Message.parse_from(bytes)
+ bytes = [12, 1, 0, 755095536, 'martin'].pack('NCCNa*')
+ message = Kafka::Message.parse_from(bytes).messages.first
message.should be_valid
message.magic.should == 1
message.checksum.should == 755095536
message.payload.should == 'martin'
end
it "should raise an error if the magic number is not recognised" do
- bytes = [14, 2, 0, 755095536].pack('NCCN') + 'martin' # 2 = some future format that's not yet invented
+ bytes = [12, 2, 0, 755095536, 'martin'].pack('NCCNa*') # 2 = some future format that's not yet invented
lambda {
Kafka::Message.parse_from(bytes)
}.should raise_error(RuntimeError, /Unsupported Kafka message version/)
end
+
+ it "should skip an incomplete message at the end of the response" do
+ bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*')
+ bytes += [8].pack('N') # incomplete message (only length, rest is truncated)
+ message_set = Message.parse_from(bytes)
+ message_set.messages.size.should == 1
+ message_set.size.should == 12 # bytes consumed
+ end
+
+ it "should skip an incomplete message at the end of the response which has the same length as an empty message" do
+ bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*')
+ bytes += [8, 0, 1120192889].pack('NCN') # incomplete message (payload is missing)
+ message_set = Message.parse_from(bytes)
+ message_set.messages.size.should == 1
+ message_set.size.should == 12 # bytes consumed
+ end
+
+ it "should read empty messages correctly" do
+ # empty message
+ bytes = [5, 0, 0, ''].pack('NCNa*')
+ messages = Message.parse_from(bytes).messages
+ messages.size.should == 1
+ messages.first.payload.should == ''
+ end
+
+ it "should parse a gzip-compressed message" do
+ compressed = 'H4sIAG0LI1AAA2NgYBBkZBB/9XN7YlJRYnJiCogCAH9lueQVAAAA'.unpack('m*').shift
+ bytes = [45, 1, 1, 1303540914, compressed].pack('NCCNa*')
+ message = Message.parse_from(bytes).messages.first
+ message.should be_valid
+ message.payload.should == 'abracadabra'
+ end
+
+ it "should recursively parse nested compressed messages" do
+ uncompressed = [17, 1, 0, 401275319, 'abracadabra'].pack('NCCNa*')
+ uncompressed << [12, 1, 0, 2666930069, 'foobar'].pack('NCCNa*')
+ compressed_io = StringIO.new('')
+ Zlib::GzipWriter.new(compressed_io).tap{|gzip| gzip << uncompressed; gzip.close }
+ compressed = compressed_io.string
+ bytes = [compressed.size + 6, 1, 1, Zlib.crc32(compressed), compressed].pack('NCCNa*')
+ messages = Message.parse_from(bytes).messages
+ messages.map(&:payload).should == ['abracadabra', 'foobar']
+ messages.map(&:valid?).should == [true, true]
+ end
+
+ it "should support a mixture of compressed and uncompressed messages" do
+ compressed = 'H4sIAG0LI1AAA2NgYBBkZBB/9XN7YlJRYnJiCogCAH9lueQVAAAA'.unpack('m*').shift
+ bytes = [45, 1, 1, 1303540914, compressed].pack('NCCNa*')
+ bytes << [11, 1, 0, 907060870, 'hello'].pack('NCCNa*')
+ messages = Message.parse_from(bytes).messages
+ messages.map(&:payload).should == ['abracadabra', 'hello']
+ messages.map(&:valid?).should == [true, true]
+ end
+
+ it "should raise an error if the compression codec is not supported" do
+ bytes = [6, 1, 2, 0, ''].pack('NCCNa*') # 2 = Snappy codec
+ lambda {
+ Kafka::Message.parse_from(bytes)
+ }.should raise_error(RuntimeError, /Unsupported Kafka compression codec/)
+ end
end
end
View
5 spec/producer_spec.rb
@@ -65,11 +65,12 @@
message = Kafka::Message.new("ümlaut")
encoded = @producer.encode(message)
data = [encoded.size].pack("N") + encoded
+ message = Kafka::Message.parse_from(data).messages.first
if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
ic = Iconv.new('UTF-8//IGNORE', 'UTF-8')
- ic.iconv(Kafka::Message.parse_from(data).payload).should eql("ümlaut")
+ ic.iconv(message.payload).should eql("ümlaut")
else
- Kafka::Message.parse_from(data).payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
+ message.payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
end
end
end

0 comments on commit 9cc1220

Please sign in to comment.