Skip to content

Commit

Permalink
Merge pull request acrosa#11 from ept/compression
Browse files Browse the repository at this point in the history
Add consumer support for transparent gzip compression
  • Loading branch information
acrosa committed Aug 9, 2012
2 parents c00c82e + 9df61f1 commit 9cc1220
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 72 deletions.
21 changes: 3 additions & 18 deletions lib/kafka/consumer.rb
Expand Up @@ -49,8 +49,9 @@ def loop(&block)
# Fetches the next batch of messages from the broker.
#
# Lazily initialises the consumer's offset (from the broker's latest
# offset) on first use, issues a fetch request, parses the raw response
# into a message set, and advances the offset by the number of bytes
# consumed from the response.
#
# Returns an Array of Kafka::Message objects, or nil if the socket
# connection failed.
def consume
  self.offset ||= fetch_latest_offset
  send_consume_request
  parsed = Kafka::Message.parse_from(read_data_response)
  self.offset = self.offset + parsed.size
  parsed.messages
rescue SocketError
  nil
end
Expand Down Expand Up @@ -94,21 +95,5 @@ def encode_request(request_type, topic, partition, offset, max_size)
max_size = [max_size].pack("N")
request_type + topic + partition + offset + max_size
end

# Splits a raw fetch response into individual messages.
#
# On the wire each message is a 4-byte big-endian length prefix followed
# by that many bytes of message body; a truncated message at the tail of
# the buffer is ignored. Advances self.offset by the number of bytes
# actually consumed and returns the parsed messages as an Array.
def parse_message_set_from(data)
  parsed = []
  cursor = 0
  last_header_start = data.length - 4
  until cursor > last_header_start
    size = data[cursor, 4].unpack("N").first + 4 # 4 = size of length prefix
    chunk = data[cursor, size]
    break if chunk.size != size # truncated message at end of buffer
    parsed << Kafka::Message.parse_from(chunk)
    cursor += size
  end
  self.offset += cursor
  parsed
end

end
end
70 changes: 56 additions & 14 deletions lib/kafka/message.rb
Expand Up @@ -36,6 +36,7 @@ class Message
# Pack/unpack format strings for the wire headers (see Array#pack):
# N = 32-bit big-endian unsigned int, C = 8-bit unsigned int.
BASIC_MESSAGE_HEADER = 'NC'.freeze # message_size (N) + magic byte (C)
VERSION_0_HEADER = 'N'.freeze # checksum (N)
VERSION_1_HEADER = 'CN'.freeze # attributes (C) + checksum (N)
COMPRESSION_CODEC_MASK = 0x03 # low two bits of the attributes byte select the codec

attr_accessor :magic, :checksum, :payload

Expand All @@ -53,23 +54,64 @@ def valid?
self.checksum == calculate_checksum
end

# Takes a byte string containing one or more serialized messages.
# Returns a MessageSet whose +messages+ are the parsed Kafka::Message
# objects and whose +size+ is the number of bytes consumed from +data+.
# A truncated message at the end of the buffer is left unconsumed.
#
# Wire formats:
#   magic 0: | size (4) | magic (1) | checksum (4) | payload ...
#   magic 1: | size (4) | magic (1) | attrs (1) | checksum (4) | payload ...
# For magic 1, (attrs & COMPRESSION_CODEC_MASK) selects a codec:
# 0 = uncompressed single message; 1 = gzip-compressed message set,
# which is decompressed and parsed recursively.
#
# Raises RuntimeError on an unknown magic byte or compression codec,
# or if a compressed payload does not decompress to a whole number of
# messages.
def self.parse_from(data)
  messages = []
  bytes_processed = 0

  # Need at least the 5-byte basic header (size + magic) to continue.
  while bytes_processed <= data.length - 5
    message_size, magic = data[bytes_processed, 5].unpack(BASIC_MESSAGE_HEADER)
    break if bytes_processed + message_size + 4 > data.length # message is truncated

    case magic
    when 0
      # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 ...
      # |       message_size      |magic|  checksum   | payload ...
      payload_size = message_size - 5 # 5 = sizeof(magic) + sizeof(checksum)
      checksum = data[bytes_processed + 5, 4].unpack(VERSION_0_HEADER).shift
      payload = data[bytes_processed + 9, payload_size]
      messages << Kafka::Message.new(payload, magic, checksum)

    when 1
      # | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 ...
      # |          size           |magic|attrs|  checksum   | payload ...
      payload_size = message_size - 6 # 6 = sizeof(magic) + sizeof(attrs) + sizeof(checksum)
      attributes, checksum = data[bytes_processed + 5, 5].unpack(VERSION_1_HEADER)
      payload = data[bytes_processed + 10, payload_size]

      case attributes & COMPRESSION_CODEC_MASK
      when 0 # a single uncompressed message
        messages << Kafka::Message.new(payload, magic, checksum)
      when 1 # a gzip-compressed message set -- parse recursively
        uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read
        message_set = parse_from(uncompressed)
        raise 'malformed compressed message' if message_set.size != uncompressed.size
        messages.concat(message_set.messages)
      else
        # https://cwiki.apache.org/confluence/display/KAFKA/Compression
        # claims that 2 is for Snappy compression, but Kafka's Scala client
        # implementation doesn't seem to support it yet, so I don't have
        # a reference implementation to test against.
        raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}"
      end

    else
      raise "Unsupported Kafka message version: magic number #{magic}"
    end

    bytes_processed += message_size + 4 # 4 = sizeof(message_size)
  end

  MessageSet.new(bytes_processed, messages)
end
end

# Encapsulates a list of Kafka messages (as Kafka::Message objects in the
# +messages+ attribute) and their total serialized size in bytes (the +size+
# attribute).
#
# Defined by constant assignment rather than `class MessageSet <
# Struct.new(...)`: subclassing a Struct.new call creates a superfluous
# anonymous intermediate class (and a fresh one on every evaluation).
MessageSet = Struct.new(:size, :messages)
end
33 changes: 0 additions & 33 deletions spec/consumer_spec.rb
Expand Up @@ -91,39 +91,6 @@
@consumer.send_consume_request.should eql(true)
end

it "should parse a message set from bytes" do
  # one complete message: length(4) + magic(1) + checksum(4) + payload
  bytes = [8, 0, 1120192889, "ale"].pack("NCNa*")
  message = @consumer.parse_message_set_from(bytes).first
  message.payload.should eql("ale")
  message.checksum.should eql(1120192889)
  message.magic.should eql(0)
  message.valid?.should eql(true)
end

it "should skip an incomplete message at the end of the response" do
  complete = [8, 0, 1120192889, "ale"].pack("NCNa*")
  truncated = [8].pack("N") # only the length prefix survived
  messages = @consumer.parse_message_set_from(complete + truncated)
  messages.size.should eql(1)
end

it "should skip an incomplete message at the end of the response which has the same length as an empty message" do
  complete = [8, 0, 1120192889, "ale"].pack("NCNa*")
  truncated = [8, 0, 1120192889].pack("NCN") # header present, payload missing
  messages = @consumer.parse_message_set_from(complete + truncated)
  messages.size.should eql(1)
end

it "should read empty messages correctly" do
  bytes = [5, 0, 0, ""].pack("NCNa*") # message with a zero-length payload
  messages = @consumer.parse_message_set_from(bytes)
  messages.size.should eql(1)
  messages.first.payload.should eql("")
end

it "should consume messages" do
@consumer.should_receive(:send_consume_request).and_return(true)
@consumer.should_receive(:read_data_response).and_return("")
Expand Down
72 changes: 67 additions & 5 deletions spec/message_spec.rb
Expand Up @@ -56,30 +56,92 @@
@message = Message.new("alejandro", 0, 66666666) # 66666666 is a funny checksum
@message.valid?.should eql(false)
end
end

describe "parsing" do
it "should parse a version-0 message from bytes" do
  # length(4) + magic(1) + checksum(4) + payload
  bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*')
  message = Kafka::Message.parse_from(bytes).messages.first
  message.valid?.should eql(true)
  message.magic.should eql(0)
  message.checksum.should eql(1120192889)
  message.payload.should eql("ale")
end

it "should parse a version-1 message from bytes" do
  # length(4) + magic(1) + attrs(1) + checksum(4) + payload
  bytes = [12, 1, 0, 755095536, 'martin'].pack('NCCNa*')
  message = Kafka::Message.parse_from(bytes).messages.first
  message.should be_valid
  message.magic.should == 1
  message.checksum.should == 755095536
  message.payload.should == 'martin'
end

it "should raise an error if the magic number is not recognised" do
  bytes = [12, 2, 0, 755095536, 'martin'].pack('NCCNa*') # 2 = some future format that's not yet invented
  lambda {
    Kafka::Message.parse_from(bytes)
  }.should raise_error(RuntimeError, /Unsupported Kafka message version/)
end

it "should skip an incomplete message at the end of the response" do
  complete = [8, 0, 1120192889, 'ale'].pack('NCNa*')
  truncated = [8].pack('N') # only the length prefix survived
  message_set = Message.parse_from(complete + truncated)
  message_set.messages.size.should == 1
  message_set.size.should == 12 # bytes consumed
end

it "should skip an incomplete message at the end of the response which has the same length as an empty message" do
  complete = [8, 0, 1120192889, 'ale'].pack('NCNa*')
  truncated = [8, 0, 1120192889].pack('NCN') # header present, payload missing
  message_set = Message.parse_from(complete + truncated)
  message_set.messages.size.should == 1
  message_set.size.should == 12 # bytes consumed
end

it "should read empty messages correctly" do
  bytes = [5, 0, 0, ''].pack('NCNa*') # message with a zero-length payload
  parsed = Message.parse_from(bytes).messages
  parsed.size.should == 1
  parsed.first.payload.should == ''
end

it "should parse a gzip-compressed message" do
  # base64-decoded gzip data wrapped in a magic-1, codec-1 message
  gzipped = 'H4sIAG0LI1AAA2NgYBBkZBB/9XN7YlJRYnJiCogCAH9lueQVAAAA'.unpack('m*').first
  bytes = [45, 1, 1, 1303540914, gzipped].pack('NCCNa*')
  message = Message.parse_from(bytes).messages.first
  message.should be_valid
  message.payload.should == 'abracadabra'
end

it "should recursively parse nested compressed messages" do
  # two serialized messages, gzipped together into one outer message
  inner = [17, 1, 0, 401275319, 'abracadabra'].pack('NCCNa*') +
          [12, 1, 0, 2666930069, 'foobar'].pack('NCCNa*')
  io = StringIO.new('')
  gzip = Zlib::GzipWriter.new(io)
  gzip << inner
  gzip.close
  compressed = io.string
  bytes = [compressed.size + 6, 1, 1, Zlib.crc32(compressed), compressed].pack('NCCNa*')
  messages = Message.parse_from(bytes).messages
  messages.map(&:payload).should == ['abracadabra', 'foobar']
  messages.map(&:valid?).should == [true, true]
end

it "should support a mixture of compressed and uncompressed messages" do
  gzipped = 'H4sIAG0LI1AAA2NgYBBkZBB/9XN7YlJRYnJiCogCAH9lueQVAAAA'.unpack('m*').first
  bytes = [45, 1, 1, 1303540914, gzipped].pack('NCCNa*') +
          [11, 1, 0, 907060870, 'hello'].pack('NCCNa*')
  messages = Message.parse_from(bytes).messages
  messages.map(&:payload).should == ['abracadabra', 'hello']
  messages.map(&:valid?).should == [true, true]
end

it "should raise an error if the compression codec is not supported" do
  snappy_message = [6, 1, 2, 0, ''].pack('NCCNa*') # 2 = Snappy codec
  parsing = lambda { Kafka::Message.parse_from(snappy_message) }
  parsing.should raise_error(RuntimeError, /Unsupported Kafka compression codec/)
end
end
end
5 changes: 3 additions & 2 deletions spec/producer_spec.rb
Expand Up @@ -65,11 +65,12 @@
message = Kafka::Message.new("ümlaut")
encoded = @producer.encode(message)
data = [encoded.size].pack("N") + encoded
message = Kafka::Message.parse_from(data).messages.first
if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
ic = Iconv.new('UTF-8//IGNORE', 'UTF-8')
ic.iconv(Kafka::Message.parse_from(data).payload).should eql("ümlaut")
ic.iconv(message.payload).should eql("ümlaut")
else
Kafka::Message.parse_from(data).payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
message.payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
end
end
end
Expand Down

0 comments on commit 9cc1220

Please sign in to comment.