From 9df61f1c89be44a8dcc7ea6a05ce2eda7f2cd8e8 Mon Sep 17 00:00:00 2001 From: Martin Kleppmann Date: Wed, 8 Aug 2012 18:01:33 -0700 Subject: [PATCH] Add consumer support for transparent gzip compression The Scala Kafka producer implementation has support for gzip-compressing individual messages or sets of consecutive messages, and the compression is transparent to the Scala Kafka consumer implementation. It's very convenient (just a matter of setting compression.codec=1 in the producer config). The compression codec of a message is indicated in the bottom two bits of the 'attributes' byte of messages with magic == 1. This means it's possible to add transparent compression support to consumers without any configuration. This commit adds compression support to the Ruby consumer. Because a compressed message may actually contain more than one message inside it (this makes compression more effective by grouping lots of small messages into one big message), I had to move some of the parsing logic from Kafka::Consumer to Kafka::Message. --- lib/kafka/consumer.rb | 21 ++----------- lib/kafka/message.rb | 70 ++++++++++++++++++++++++++++++++--------- spec/consumer_spec.rb | 33 -------------------- spec/message_spec.rb | 72 ++++++++++++++++++++++++++++++++++++++++--- spec/producer_spec.rb | 5 +-- 5 files changed, 129 insertions(+), 72 deletions(-) diff --git a/lib/kafka/consumer.rb b/lib/kafka/consumer.rb index e6adb5e..c4221ea 100644 --- a/lib/kafka/consumer.rb +++ b/lib/kafka/consumer.rb @@ -49,8 +49,9 @@ def loop(&block) def consume self.offset ||= fetch_latest_offset send_consume_request - data = read_data_response - parse_message_set_from(data) + message_set = Kafka::Message.parse_from(read_data_response) + self.offset += message_set.size + message_set.messages rescue SocketError nil end @@ -94,21 +95,5 @@ def encode_request(request_type, topic, partition, offset, max_size) max_size = [max_size].pack("N") request_type + topic + partition + offset + max_size end - - def parse_message_set_from(data) - messages = [] - processed = 0 - length = data.length - 4 - while (processed <= length) do - message_size = data[processed, 4].unpack("N").shift + 4 - message_data = data[processed, message_size] - break unless message_data.size == message_size - messages << Kafka::Message.parse_from(message_data) - processed += message_size - end - self.offset += processed - messages - end - end end diff --git a/lib/kafka/message.rb b/lib/kafka/message.rb index 6774731..958a50f 100644 --- a/lib/kafka/message.rb +++ b/lib/kafka/message.rb @@ -36,6 +36,7 @@ class Message BASIC_MESSAGE_HEADER = 'NC'.freeze VERSION_0_HEADER = 'N'.freeze VERSION_1_HEADER = 'CN'.freeze + COMPRESSION_CODEC_MASK = 0x03 attr_accessor :magic, :checksum, :payload @@ -53,23 +54,64 @@ def valid? self.checksum == calculate_checksum end - def self.parse_from(binary) - size, magic = binary.unpack(BASIC_MESSAGE_HEADER) - case magic - when 0 - checksum = binary[5, 4].unpack(VERSION_0_HEADER).shift # 5 = sizeof(length) + sizeof(magic) - payload = binary[9, size] # 9 = sizeof(length) + sizeof(magic) + sizeof(checksum) - Kafka::Message.new(payload, magic, checksum) + # Takes a byte string containing one or more messages; returns a MessageSet + # with the messages parsed from the string, and the number of bytes + # consumed from the string. + def self.parse_from(data) + messages = [] + bytes_processed = 0 - when 1 - attributes, checksum = binary[5, 5].unpack(VERSION_1_HEADER) - payload = binary[10, size] # 10 = sizeof(length) + sizeof(magic) + sizeof(attrs) + sizeof(checksum) - # TODO interpret attributes - Kafka::Message.new(payload, magic, checksum) + while bytes_processed <= data.length - 5 # 5 = size of BASIC_MESSAGE_HEADER + message_size, magic = data[bytes_processed, 5].unpack(BASIC_MESSAGE_HEADER) + break if bytes_processed + message_size + 4 > data.length # message is truncated - else - raise "Unsupported Kafka message version: magic number #{magic}" + case magic + when 0 + # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 ... + # | | | | + # | message_size |magic| checksum | payload ... + payload_size = message_size - 5 # 5 = sizeof(magic) + sizeof(checksum) + checksum = data[bytes_processed + 5, 4].unpack(VERSION_0_HEADER).shift + payload = data[bytes_processed + 9, payload_size] + messages << Kafka::Message.new(payload, magic, checksum) + + when 1 + # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 ... + # | | | | | + # | size |magic|attrs| checksum | payload ... + payload_size = message_size - 6 # 6 = sizeof(magic) + sizeof(attrs) + sizeof(checksum) + attributes, checksum = data[bytes_processed + 5, 5].unpack(VERSION_1_HEADER) + payload = data[bytes_processed + 10, payload_size] + + case attributes & COMPRESSION_CODEC_MASK + when 0 # a single uncompressed message + messages << Kafka::Message.new(payload, magic, checksum) + when 1 # a gzip-compressed message set -- parse recursively + uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read + message_set = parse_from(uncompressed) + raise 'malformed compressed message' if message_set.size != uncompressed.size + messages.concat(message_set.messages) + else + # https://cwiki.apache.org/confluence/display/KAFKA/Compression + # claims that 2 is for Snappy compression, but Kafka's Scala client + # implementation doesn't seem to support it yet, so I don't have + # a reference implementation to test against. + raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}" + end + + else + raise "Unsupported Kafka message version: magic number #{magic}" + end + + bytes_processed += message_size + 4 # 4 = sizeof(message_size) end + + MessageSet.new(bytes_processed, messages) end end + + # Encapsulates a list of Kafka messages (as Kafka::Message objects in the + # +messages+ attribute) and their total serialized size in bytes (the +size+ + # attribute). + class MessageSet < Struct.new(:size, :messages); end end diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index cc41b25..1a5a1a8 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -91,39 +91,6 @@ @consumer.send_consume_request.should eql(true) end - it "should parse a message set from bytes" do - bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" - message = @consumer.parse_message_set_from(bytes).first - message.payload.should eql("ale") - message.checksum.should eql(1120192889) - message.magic.should eql(0) - message.valid?.should eql(true) - end - - it "should skip an incomplete message at the end of the response" do - bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" - # incomplete message - bytes += [8].pack("N") - messages = @consumer.parse_message_set_from(bytes) - messages.size.should eql(1) - end - - it "should skip an incomplete message at the end of the response which has the same length as an empty message" do - bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" - # incomplete message because payload is missing - bytes += [8].pack("N") + [0].pack("C") + [1120192889].pack("N") - messages = @consumer.parse_message_set_from(bytes) - messages.size.should eql(1) - end - - it "should read empty messages correctly" do - # empty message - bytes = [5].pack("N") + [0].pack("C") + [0].pack("N") + "" - messages = @consumer.parse_message_set_from(bytes) - messages.size.should eql(1) - messages.first.payload.should eql("") - end - it "should consume messages" do @consumer.should_receive(:send_consume_request).and_return(true) @consumer.should_receive(:read_data_response).and_return("") diff --git a/spec/message_spec.rb b/spec/message_spec.rb index 0075be2..8b6a043 100644 --- a/spec/message_spec.rb +++ b/spec/message_spec.rb @@ -56,10 +56,12 @@ @message = Message.new("alejandro", 0, 66666666) # 66666666 is a funny checksum @message.valid?.should eql(false) end + end + describe "parsing" do it "should parse a version-0 message from bytes" do - bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale" - message = Kafka::Message.parse_from(bytes) + bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*') + message = Kafka::Message.parse_from(bytes).messages.first message.valid?.should eql(true) message.magic.should eql(0) message.checksum.should eql(1120192889) @@ -67,8 +69,8 @@ end it "should parse a version-1 message from bytes" do - bytes = [14, 1, 0, 755095536].pack('NCCN') + 'martin' - message = Kafka::Message.parse_from(bytes) + bytes = [12, 1, 0, 755095536, 'martin'].pack('NCCNa*') + message = Kafka::Message.parse_from(bytes).messages.first message.should be_valid message.magic.should == 1 message.checksum.should == 755095536 @@ -76,10 +78,70 @@ end it "should raise an error if the magic number is not recognised" do - bytes = [14, 2, 0, 755095536].pack('NCCN') + 'martin' # 2 = some future format that's not yet invented + bytes = [12, 2, 0, 755095536, 'martin'].pack('NCCNa*') # 2 = some future format that's not yet invented lambda { Kafka::Message.parse_from(bytes) }.should raise_error(RuntimeError, /Unsupported Kafka message version/) end + + it "should skip an incomplete message at the end of the response" do + bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*') + bytes += [8].pack('N') # incomplete message (only length, rest is truncated) + message_set = Message.parse_from(bytes) + message_set.messages.size.should == 1 + message_set.size.should == 12 # bytes consumed + end + + it "should skip an incomplete message at the end of the response which has the same length as an empty message" do + bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*') + bytes += [8, 0, 1120192889].pack('NCN') # incomplete message (payload is missing) + message_set = Message.parse_from(bytes) + message_set.messages.size.should == 1 + message_set.size.should == 12 # bytes consumed + end + + it "should read empty messages correctly" do + # empty message + bytes = [5, 0, 0, ''].pack('NCNa*') + messages = Message.parse_from(bytes).messages + messages.size.should == 1 + messages.first.payload.should == '' + end + + it "should parse a gzip-compressed message" do + compressed = 'H4sIAG0LI1AAA2NgYBBkZBB/9XN7YlJRYnJiCogCAH9lueQVAAAA'.unpack('m*').shift + bytes = [45, 1, 1, 1303540914, compressed].pack('NCCNa*') + message = Message.parse_from(bytes).messages.first + message.should be_valid + message.payload.should == 'abracadabra' + end + + it "should recursively parse nested compressed messages" do + uncompressed = [17, 1, 0, 401275319, 'abracadabra'].pack('NCCNa*') + uncompressed << [12, 1, 0, 2666930069, 'foobar'].pack('NCCNa*') + compressed_io = StringIO.new('') + Zlib::GzipWriter.new(compressed_io).tap{|gzip| gzip << uncompressed; gzip.close } + compressed = compressed_io.string + bytes = [compressed.size + 6, 1, 1, Zlib.crc32(compressed), compressed].pack('NCCNa*') + messages = Message.parse_from(bytes).messages + messages.map(&:payload).should == ['abracadabra', 'foobar'] + messages.map(&:valid?).should == [true, true] + end + + it "should support a mixture of compressed and uncompressed messages" do + compressed = 'H4sIAG0LI1AAA2NgYBBkZBB/9XN7YlJRYnJiCogCAH9lueQVAAAA'.unpack('m*').shift + bytes = [45, 1, 1, 1303540914, compressed].pack('NCCNa*') + bytes << [11, 1, 0, 907060870, 'hello'].pack('NCCNa*') + messages = Message.parse_from(bytes).messages + messages.map(&:payload).should == ['abracadabra', 'hello'] + messages.map(&:valid?).should == [true, true] + end + + it "should raise an error if the compression codec is not supported" do + bytes = [6, 1, 2, 0, ''].pack('NCCNa*') # 2 = Snappy codec + lambda { + Kafka::Message.parse_from(bytes) + }.should raise_error(RuntimeError, /Unsupported Kafka compression codec/) + end end end diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index 1b07097..6e0e597 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -65,11 +65,12 @@ message = Kafka::Message.new("ümlaut") encoded = @producer.encode(message) data = [encoded.size].pack("N") + encoded + message = Kafka::Message.parse_from(data).messages.first if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding ic = Iconv.new('UTF-8//IGNORE', 'UTF-8') - ic.iconv(Kafka::Message.parse_from(data).payload).should eql("ümlaut") + ic.iconv(message.payload).should eql("ümlaut") else - Kafka::Message.parse_from(data).payload.force_encoding(Encoding::UTF_8).should eql("ümlaut") + message.payload.force_encoding(Encoding::UTF_8).should eql("ümlaut") end end end