Add consumer support for transparent gzip compression #11

Merged
merged 1 commit on Aug 9, 2012
21 changes: 3 additions & 18 deletions lib/kafka/consumer.rb
@@ -49,8 +49,9 @@ def loop(&block)
def consume
self.offset ||= fetch_latest_offset
send_consume_request
data = read_data_response
parse_message_set_from(data)
message_set = Kafka::Message.parse_from(read_data_response)
self.offset += message_set.size
message_set.messages
rescue SocketError
nil
end
@@ -94,21 +95,5 @@ def encode_request(request_type, topic, partition, offset, max_size)
max_size = [max_size].pack("N")
request_type + topic + partition + offset + max_size
end

def parse_message_set_from(data)
messages = []
processed = 0
length = data.length - 4
while (processed <= length) do
message_size = data[processed, 4].unpack("N").shift + 4
message_data = data[processed, message_size]
break unless message_data.size == message_size
messages << Kafka::Message.parse_from(message_data)
processed += message_size
end
self.offset += processed
messages
end

end
end
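
Reviewer note: a minimal usage sketch of the updated consumer, assuming the usual kafka-rb constructor options (`:host`, `:port`, `:topic` below are placeholder values). It relies only on the `Kafka::Consumer#loop`/`#consume` API shown in this diff; with this change the caller receives plain `Kafka::Message` objects even when the broker returns a gzip-compressed message set, and the offset advances by the number of bytes actually consumed.

```ruby
require 'kafka'

# Placeholder broker/topic settings for illustration only.
consumer = Kafka::Consumer.new(:host => 'localhost', :port => 9092, :topic => 'test')

# Each iteration calls consume, which parses the fetch response via
# Kafka::Message.parse_from and transparently expands compressed message sets.
consumer.loop do |messages|
  messages.each { |message| puts message.payload }
end
```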
70 changes: 56 additions & 14 deletions lib/kafka/message.rb
@@ -36,6 +36,7 @@ class Message
BASIC_MESSAGE_HEADER = 'NC'.freeze
VERSION_0_HEADER = 'N'.freeze
VERSION_1_HEADER = 'CN'.freeze
COMPRESSION_CODEC_MASK = 0x03

attr_accessor :magic, :checksum, :payload

@@ -53,23 +54,64 @@ def valid?
self.checksum == calculate_checksum
end

def self.parse_from(binary)
size, magic = binary.unpack(BASIC_MESSAGE_HEADER)
case magic
when 0
checksum = binary[5, 4].unpack(VERSION_0_HEADER).shift # 5 = sizeof(length) + sizeof(magic)
payload = binary[9, size] # 9 = sizeof(length) + sizeof(magic) + sizeof(checksum)
Kafka::Message.new(payload, magic, checksum)
# Takes a byte string containing one or more messages; returns a MessageSet
# with the messages parsed from the string, and the number of bytes
# consumed from the string.
def self.parse_from(data)
messages = []
bytes_processed = 0

when 1
attributes, checksum = binary[5, 5].unpack(VERSION_1_HEADER)
payload = binary[10, size] # 10 = sizeof(length) + sizeof(magic) + sizeof(attrs) + sizeof(checksum)
# TODO interpret attributes
Kafka::Message.new(payload, magic, checksum)
while bytes_processed <= data.length - 5 # 5 = size of BASIC_MESSAGE_HEADER
message_size, magic = data[bytes_processed, 5].unpack(BASIC_MESSAGE_HEADER)
break if bytes_processed + message_size + 4 > data.length # message is truncated

else
raise "Unsupported Kafka message version: magic number #{magic}"
case magic
when 0
# |  0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9 ...
# |                       |     |                       |
# |      message_size     |magic|        checksum       | payload ...
payload_size = message_size - 5 # 5 = sizeof(magic) + sizeof(checksum)
checksum = data[bytes_processed + 5, 4].unpack(VERSION_0_HEADER).shift
payload = data[bytes_processed + 9, payload_size]
messages << Kafka::Message.new(payload, magic, checksum)

when 1
# |  0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9  | 10 ...
# |                       |     |     |                       |
# |      message_size     |magic|attrs|        checksum       | payload ...
payload_size = message_size - 6 # 6 = sizeof(magic) + sizeof(attrs) + sizeof(checksum)
attributes, checksum = data[bytes_processed + 5, 5].unpack(VERSION_1_HEADER)
payload = data[bytes_processed + 10, payload_size]

case attributes & COMPRESSION_CODEC_MASK
when 0 # a single uncompressed message
messages << Kafka::Message.new(payload, magic, checksum)
when 1 # a gzip-compressed message set -- parse recursively
uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read
message_set = parse_from(uncompressed)
raise 'malformed compressed message' if message_set.size != uncompressed.size
messages.concat(message_set.messages)
else
# https://cwiki.apache.org/confluence/display/KAFKA/Compression
# claims that 2 is for Snappy compression, but Kafka's Scala client
# implementation doesn't seem to support it yet, so I don't have
# a reference implementation to test against.
raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}"
end

else
raise "Unsupported Kafka message version: magic number #{magic}"
end

bytes_processed += message_size + 4 # 4 = sizeof(message_size)
end

MessageSet.new(bytes_processed, messages)
end
end

# Encapsulates a list of Kafka messages (as Kafka::Message objects in the
# +messages+ attribute) and their total serialized size in bytes (the +size+
# attribute).
class MessageSet < Struct.new(:size, :messages); end
end
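
To illustrate the wire format described in the comments above, here is a small standalone sketch (not part of the diff) that hand-packs a version-1 message wrapping a gzip-compressed inner message set and runs it through `Kafka::Message.parse_from`. The payload string is arbitrary; the sketch assumes only the code added in this PR.

```ruby
require 'zlib'
require 'stringio'
require 'kafka'

# Inner, uncompressed message set: one version-1 message with codec 0.
payload = 'hello'
inner   = [payload.size + 6, 1, 0, Zlib.crc32(payload), payload].pack('NCCNa*')

# Gzip the inner set and wrap it in an outer version-1 message with codec 1.
io = StringIO.new('')
Zlib::GzipWriter.new(io).tap { |gz| gz << inner; gz.close }
compressed = io.string
outer = [compressed.size + 6, 1, 1, Zlib.crc32(compressed), compressed].pack('NCCNa*')

message_set = Kafka::Message.parse_from(outer)
message_set.messages.map(&:payload)  # => ["hello"]
message_set.size                     # => outer.size, i.e. bytes consumed
```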
33 changes: 0 additions & 33 deletions spec/consumer_spec.rb
@@ -91,39 +91,6 @@
@consumer.send_consume_request.should eql(true)
end

it "should parse a message set from bytes" do
bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
message = @consumer.parse_message_set_from(bytes).first
message.payload.should eql("ale")
message.checksum.should eql(1120192889)
message.magic.should eql(0)
message.valid?.should eql(true)
end

it "should skip an incomplete message at the end of the response" do
bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
# incomplete message
bytes += [8].pack("N")
messages = @consumer.parse_message_set_from(bytes)
messages.size.should eql(1)
end

it "should skip an incomplete message at the end of the response which has the same length as an empty message" do
bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
# incomplete message because payload is missing
bytes += [8].pack("N") + [0].pack("C") + [1120192889].pack("N")
messages = @consumer.parse_message_set_from(bytes)
messages.size.should eql(1)
end

it "should read empty messages correctly" do
# empty message
bytes = [5].pack("N") + [0].pack("C") + [0].pack("N") + ""
messages = @consumer.parse_message_set_from(bytes)
messages.size.should eql(1)
messages.first.payload.should eql("")
end

it "should consume messages" do
@consumer.should_receive(:send_consume_request).and_return(true)
@consumer.should_receive(:read_data_response).and_return("")
72 changes: 67 additions & 5 deletions spec/message_spec.rb
@@ -56,30 +56,92 @@
@message = Message.new("alejandro", 0, 66666666) # 66666666 is a funny checksum
@message.valid?.should eql(false)
end
end

describe "parsing" do
it "should parse a version-0 message from bytes" do
bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
message = Kafka::Message.parse_from(bytes)
bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*')
message = Kafka::Message.parse_from(bytes).messages.first
message.valid?.should eql(true)
message.magic.should eql(0)
message.checksum.should eql(1120192889)
message.payload.should eql("ale")
end

it "should parse a version-1 message from bytes" do
bytes = [14, 1, 0, 755095536].pack('NCCN') + 'martin'
message = Kafka::Message.parse_from(bytes)
bytes = [12, 1, 0, 755095536, 'martin'].pack('NCCNa*')
message = Kafka::Message.parse_from(bytes).messages.first
message.should be_valid
message.magic.should == 1
message.checksum.should == 755095536
message.payload.should == 'martin'
end

it "should raise an error if the magic number is not recognised" do
bytes = [14, 2, 0, 755095536].pack('NCCN') + 'martin' # 2 = some future format that's not yet invented
bytes = [12, 2, 0, 755095536, 'martin'].pack('NCCNa*') # 2 = some future format that's not yet invented
lambda {
Kafka::Message.parse_from(bytes)
}.should raise_error(RuntimeError, /Unsupported Kafka message version/)
end

it "should skip an incomplete message at the end of the response" do
bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*')
bytes += [8].pack('N') # incomplete message (only length, rest is truncated)
message_set = Message.parse_from(bytes)
message_set.messages.size.should == 1
message_set.size.should == 12 # bytes consumed
end

it "should skip an incomplete message at the end of the response which has the same length as an empty message" do
bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*')
bytes += [8, 0, 1120192889].pack('NCN') # incomplete message (payload is missing)
message_set = Message.parse_from(bytes)
message_set.messages.size.should == 1
message_set.size.should == 12 # bytes consumed
end

it "should read empty messages correctly" do
# empty message
bytes = [5, 0, 0, ''].pack('NCNa*')
messages = Message.parse_from(bytes).messages
messages.size.should == 1
messages.first.payload.should == ''
end

it "should parse a gzip-compressed message" do
compressed = 'H4sIAG0LI1AAA2NgYBBkZBB/9XN7YlJRYnJiCogCAH9lueQVAAAA'.unpack('m*').shift
bytes = [45, 1, 1, 1303540914, compressed].pack('NCCNa*')
message = Message.parse_from(bytes).messages.first
message.should be_valid
message.payload.should == 'abracadabra'
end

it "should recursively parse nested compressed messages" do
uncompressed = [17, 1, 0, 401275319, 'abracadabra'].pack('NCCNa*')
uncompressed << [12, 1, 0, 2666930069, 'foobar'].pack('NCCNa*')
compressed_io = StringIO.new('')
Zlib::GzipWriter.new(compressed_io).tap{|gzip| gzip << uncompressed; gzip.close }
compressed = compressed_io.string
bytes = [compressed.size + 6, 1, 1, Zlib.crc32(compressed), compressed].pack('NCCNa*')
messages = Message.parse_from(bytes).messages
messages.map(&:payload).should == ['abracadabra', 'foobar']
messages.map(&:valid?).should == [true, true]
end

it "should support a mixture of compressed and uncompressed messages" do
compressed = 'H4sIAG0LI1AAA2NgYBBkZBB/9XN7YlJRYnJiCogCAH9lueQVAAAA'.unpack('m*').shift
bytes = [45, 1, 1, 1303540914, compressed].pack('NCCNa*')
bytes << [11, 1, 0, 907060870, 'hello'].pack('NCCNa*')
messages = Message.parse_from(bytes).messages
messages.map(&:payload).should == ['abracadabra', 'hello']
messages.map(&:valid?).should == [true, true]
end

it "should raise an error if the compression codec is not supported" do
bytes = [6, 1, 2, 0, ''].pack('NCCNa*') # 2 = Snappy codec
lambda {
Kafka::Message.parse_from(bytes)
}.should raise_error(RuntimeError, /Unsupported Kafka compression codec/)
end
end
end
5 changes: 3 additions & 2 deletions spec/producer_spec.rb
@@ -65,11 +65,12 @@
message = Kafka::Message.new("ümlaut")
encoded = @producer.encode(message)
data = [encoded.size].pack("N") + encoded
message = Kafka::Message.parse_from(data).messages.first
if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
ic = Iconv.new('UTF-8//IGNORE', 'UTF-8')
ic.iconv(Kafka::Message.parse_from(data).payload).should eql("ümlaut")
ic.iconv(message.payload).should eql("ümlaut")
else
Kafka::Message.parse_from(data).payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
message.payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
end
end
end