Add consumer support for transparent gzip compression #11

Merged: acrosa merged 1 commit on August 08, 2012

2 participants: Martin Kleppmann (ept), Alejandro Crosa (acrosa)

Martin Kleppmann (ept) commented on August 08, 2012:

The Scala Kafka producer implementation has support for gzip-compressing
individual messages or sets of consecutive messages, and the compression
is transparent to the Scala Kafka consumer implementation. It's very
convenient (just a matter of setting compression.codec=1 in the producer
config).

The compression codec of a message is indicated in the bottom two bits
of the 'attributes' byte of messages with magic == 1. This means it's
possible to add transparent compression support to consumers without any
configuration.
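
For illustration, here is a minimal Ruby sketch of how a consumer can read the codec out of a version-1 header (the mask value and the 10-byte header layout come from the diff below; the method name and usage are hypothetical):

    COMPRESSION_CODEC_MASK = 0x03 # bottom two bits of the attributes byte

    # 'head' is the first 10 bytes of a magic == 1 message on the wire:
    # 4-byte size, 1-byte magic, 1-byte attributes, 4-byte CRC32 checksum.
    def compression_codec(head)
      _size, magic, attributes = head.unpack('NCC')
      raise 'not a version-1 message' unless magic == 1
      attributes & COMPRESSION_CODEC_MASK # 0 = none, 1 = gzip
    end

    compression_codec([12, 1, 1, 0].pack('NCCN')) # => 1, i.e. gzip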

This commit adds compression support to the Ruby consumer. Because a
compressed message may actually contain more than one message inside it
(this makes compression more effective by grouping lots of small
messages into one big message), I had to move some of the parsing logic
from Kafka::Consumer to Kafka::Message.
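
The upshot for users of this Ruby client is that a consumer loop needs no changes to read compressed topics. A sketch in the style of the kafka-rb README usage (host/port/topic values are illustrative):

    require 'kafka'

    consumer = Kafka::Consumer.new(:host => 'localhost', :port => 9092, :topic => 'test')
    consumer.loop do |messages|
      messages.each do |message|
        # gzip-compressed message sets arrive here already decompressed
        # and split back into individual messages
        puts message.payload
      end
    end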

Martin Kleppmann added commit 9df61f1 — Add consumer support for transparent gzip compression (the commit message matches the description above)
Alejandro Crosa (acrosa) merged commit 9cc1220 and closed this pull request on August 08, 2012.

lib/kafka/consumer.rb (21 changes)
@@ -49,8 +49,9 @@ def loop(&block)
     def consume
       self.offset ||= fetch_latest_offset
       send_consume_request
-      data = read_data_response
-      parse_message_set_from(data)
+      message_set = Kafka::Message.parse_from(read_data_response)
+      self.offset += message_set.size
+      message_set.messages
     rescue SocketError
       nil
     end
@@ -94,21 +95,5 @@ def encode_request(request_type, topic, partition, offset, max_size)
       max_size     = [max_size].pack("N")
       request_type + topic + partition + offset + max_size
     end
-
-    def parse_message_set_from(data)
-      messages = []
-      processed = 0
-      length = data.length - 4
-      while (processed <= length) do
-        message_size = data[processed, 4].unpack("N").shift + 4
-        message_data = data[processed, message_size]
-        break unless message_data.size == message_size
-        messages << Kafka::Message.parse_from(message_data)
-        processed += message_size
-      end
-      self.offset += processed
-      messages
-    end
-
   end
 end
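
A note on the offset arithmetic in the consume hunk above: Kafka 0.7 consumer offsets are byte positions in the partition log, so the consumer must advance by the number of wire bytes consumed (MessageSet#size), not by the number of messages returned — after decompression, one wire message can expand into several. A toy example, assuming this patch is applied (the wire bytes and payloads are illustrative; checksums are computed rather than hardcoded):

    require 'kafka'
    require 'zlib'

    # Two version-0 messages on the wire; each occupies
    # 4 (size field) + 1 (magic) + 4 (checksum) + 3 (payload) = 12 bytes.
    wire  = [8, 0, Zlib.crc32('ale'), 'ale'].pack('NCNa*')
    wire += [8, 0, Zlib.crc32('foo'), 'foo'].pack('NCNa*')

    set = Kafka::Message.parse_from(wire)
    set.messages.map(&:payload) # => ["ale", "foo"]
    set.size                    # => 24, the amount to add to the consumer offset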
lib/kafka/message.rb (70 changes)
@@ -36,6 +36,7 @@ class Message
     BASIC_MESSAGE_HEADER = 'NC'.freeze
     VERSION_0_HEADER = 'N'.freeze
     VERSION_1_HEADER = 'CN'.freeze
+    COMPRESSION_CODEC_MASK = 0x03
 
     attr_accessor :magic, :checksum, :payload
 
@@ -53,23 +54,64 @@ def valid?
       self.checksum == calculate_checksum
     end
 
-    def self.parse_from(binary)
-      size, magic = binary.unpack(BASIC_MESSAGE_HEADER)
-      case magic
-      when 0
-        checksum = binary[5, 4].unpack(VERSION_0_HEADER).shift # 5 = sizeof(length) + sizeof(magic)
-        payload = binary[9, size] # 9 = sizeof(length) + sizeof(magic) + sizeof(checksum)
-        Kafka::Message.new(payload, magic, checksum)
+    # Takes a byte string containing one or more messages; returns a MessageSet
+    # with the messages parsed from the string, and the number of bytes
+    # consumed from the string.
+    def self.parse_from(data)
+      messages = []
+      bytes_processed = 0
 
-      when 1
-        attributes, checksum = binary[5, 5].unpack(VERSION_1_HEADER)
-        payload = binary[10, size] # 10 = sizeof(length) + sizeof(magic) + sizeof(attrs) + sizeof(checksum)
-        # TODO interpret attributes
-        Kafka::Message.new(payload, magic, checksum)
+      while bytes_processed <= data.length - 5 # 5 = size of BASIC_MESSAGE_HEADER
+        message_size, magic = data[bytes_processed, 5].unpack(BASIC_MESSAGE_HEADER)
+        break if bytes_processed + message_size + 4 > data.length # message is truncated
 
-      else
-        raise "Unsupported Kafka message version: magic number #{magic}"
+        case magic
+        when 0
+          # |  0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9      ...
+          # |                       |     |                       |
+          # |      message_size     |magic|        checksum       | payload ...
+          payload_size = message_size - 5 # 5 = sizeof(magic) + sizeof(checksum)
+          checksum = data[bytes_processed + 5, 4].unpack(VERSION_0_HEADER).shift
+          payload  = data[bytes_processed + 9, payload_size]
+          messages << Kafka::Message.new(payload, magic, checksum)
+
+        when 1
+          # |  0  |  1  |  2  |  3  |  4  |  5  |  6  |  7  |  8  |  9  | 10      ...
+          # |                       |     |     |                       |
+          # |         size          |magic|attrs|        checksum       | payload ...
+          payload_size = message_size - 6 # 6 = sizeof(magic) + sizeof(attrs) + sizeof(checksum)
+          attributes, checksum = data[bytes_processed + 5, 5].unpack(VERSION_1_HEADER)
+          payload = data[bytes_processed + 10, payload_size]
+
+          case attributes & COMPRESSION_CODEC_MASK
+          when 0 # a single uncompressed message
+            messages << Kafka::Message.new(payload, magic, checksum)
+          when 1 # a gzip-compressed message set -- parse recursively
+            uncompressed = Zlib::GzipReader.new(StringIO.new(payload)).read
+            message_set = parse_from(uncompressed)
+            raise 'malformed compressed message' if message_set.size != uncompressed.size
+            messages.concat(message_set.messages)
+          else
+            # https://cwiki.apache.org/confluence/display/KAFKA/Compression
+            # claims that 2 is for Snappy compression, but Kafka's Scala client
+            # implementation doesn't seem to support it yet, so I don't have
+            # a reference implementation to test against.
+            raise "Unsupported Kafka compression codec: #{attributes & COMPRESSION_CODEC_MASK}"
+          end
+
+        else
+          raise "Unsupported Kafka message version: magic number #{magic}"
+        end
+
+        bytes_processed += message_size + 4 # 4 = sizeof(message_size)
       end
+
+      MessageSet.new(bytes_processed, messages)
     end
   end
+
+  # Encapsulates a list of Kafka messages (as Kafka::Message objects in the
+  # +messages+ attribute) and their total serialized size in bytes (the +size+
+  # attribute).
+  class MessageSet < Struct.new(:size, :messages); end
 end
spec/consumer_spec.rb (33 changes)
@@ -91,39 +91,6 @@
       @consumer.send_consume_request.should eql(true)
     end
 
-    it "should parse a message set from bytes" do
-      bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
-      message = @consumer.parse_message_set_from(bytes).first
-      message.payload.should eql("ale")
-      message.checksum.should eql(1120192889)
-      message.magic.should eql(0)
-      message.valid?.should eql(true)
-    end
-
-    it "should skip an incomplete message at the end of the response" do
-      bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
-      # incomplete message
-      bytes += [8].pack("N")
-      messages = @consumer.parse_message_set_from(bytes)
-      messages.size.should eql(1)
-    end
-
-    it "should skip an incomplete message at the end of the response which has the same length as an empty message" do
-      bytes = [8].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
-      # incomplete message because payload is missing
-      bytes += [8].pack("N") + [0].pack("C") + [1120192889].pack("N")
-      messages = @consumer.parse_message_set_from(bytes)
-      messages.size.should eql(1)
-    end
-
-    it "should read empty messages correctly" do
-      # empty message
-      bytes = [5].pack("N") + [0].pack("C") + [0].pack("N") + ""
-      messages = @consumer.parse_message_set_from(bytes)
-      messages.size.should eql(1)
-      messages.first.payload.should eql("")
-    end
-
     it "should consume messages" do
       @consumer.should_receive(:send_consume_request).and_return(true)
       @consumer.should_receive(:read_data_response).and_return("")
spec/message_spec.rb (72 changes)
@@ -56,10 +56,12 @@
       @message = Message.new("alejandro", 0, 66666666) # 66666666 is a funny checksum
       @message.valid?.should eql(false)
     end
+  end
 
+  describe "parsing" do
     it "should parse a version-0 message from bytes" do
-      bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
-      message = Kafka::Message.parse_from(bytes)
+      bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*')
+      message = Kafka::Message.parse_from(bytes).messages.first
       message.valid?.should eql(true)
       message.magic.should eql(0)
       message.checksum.should eql(1120192889)
@@ -67,8 +69,8 @@
     end
 
     it "should parse a version-1 message from bytes" do
-      bytes = [14, 1, 0, 755095536].pack('NCCN') + 'martin'
-      message = Kafka::Message.parse_from(bytes)
+      bytes = [12, 1, 0, 755095536, 'martin'].pack('NCCNa*')
+      message = Kafka::Message.parse_from(bytes).messages.first
       message.should be_valid
       message.magic.should == 1
       message.checksum.should == 755095536
@@ -76,10 +78,70 @@
     end
 
     it "should raise an error if the magic number is not recognised" do
-      bytes = [14, 2, 0, 755095536].pack('NCCN') + 'martin' # 2 = some future format that's not yet invented
+      bytes = [12, 2, 0, 755095536, 'martin'].pack('NCCNa*') # 2 = some future format that's not yet invented
       lambda {
         Kafka::Message.parse_from(bytes)
       }.should raise_error(RuntimeError, /Unsupported Kafka message version/)
     end
+
+    it "should skip an incomplete message at the end of the response" do
+      bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*')
+      bytes += [8].pack('N') # incomplete message (only length, rest is truncated)
+      message_set = Message.parse_from(bytes)
+      message_set.messages.size.should == 1
+      message_set.size.should == 12 # bytes consumed
+    end
+
+    it "should skip an incomplete message at the end of the response which has the same length as an empty message" do
+      bytes = [8, 0, 1120192889, 'ale'].pack('NCNa*')
+      bytes += [8, 0, 1120192889].pack('NCN') # incomplete message (payload is missing)
+      message_set = Message.parse_from(bytes)
+      message_set.messages.size.should == 1
+      message_set.size.should == 12 # bytes consumed
+    end
+
+    it "should read empty messages correctly" do
+      # empty message
+      bytes = [5, 0, 0, ''].pack('NCNa*')
+      messages = Message.parse_from(bytes).messages
+      messages.size.should == 1
+      messages.first.payload.should == ''
+    end
+
+    it "should parse a gzip-compressed message" do
+      compressed = 'H4sIAG0LI1AAA2NgYBBkZBB/9XN7YlJRYnJiCogCAH9lueQVAAAA'.unpack('m*').shift
+      bytes = [45, 1, 1, 1303540914, compressed].pack('NCCNa*')
+      message = Message.parse_from(bytes).messages.first
+      message.should be_valid
+      message.payload.should == 'abracadabra'
+    end
+
+    it "should recursively parse nested compressed messages" do
+      uncompressed = [17, 1, 0, 401275319, 'abracadabra'].pack('NCCNa*')
+      uncompressed << [12, 1, 0, 2666930069, 'foobar'].pack('NCCNa*')
+      compressed_io = StringIO.new('')
+      Zlib::GzipWriter.new(compressed_io).tap{|gzip| gzip << uncompressed; gzip.close }
+      compressed = compressed_io.string
+      bytes = [compressed.size + 6, 1, 1, Zlib.crc32(compressed), compressed].pack('NCCNa*')
+      messages = Message.parse_from(bytes).messages
+      messages.map(&:payload).should == ['abracadabra', 'foobar']
+      messages.map(&:valid?).should == [true, true]
+    end
+
+    it "should support a mixture of compressed and uncompressed messages" do
+      compressed = 'H4sIAG0LI1AAA2NgYBBkZBB/9XN7YlJRYnJiCogCAH9lueQVAAAA'.unpack('m*').shift
+      bytes = [45, 1, 1, 1303540914, compressed].pack('NCCNa*')
+      bytes << [11, 1, 0, 907060870, 'hello'].pack('NCCNa*')
+      messages = Message.parse_from(bytes).messages
+      messages.map(&:payload).should == ['abracadabra', 'hello']
+      messages.map(&:valid?).should == [true, true]
+    end
+
+    it "should raise an error if the compression codec is not supported" do
+      bytes = [6, 1, 2, 0, ''].pack('NCCNa*') # 2 = Snappy codec
+      lambda {
+        Kafka::Message.parse_from(bytes)
+      }.should raise_error(RuntimeError, /Unsupported Kafka compression codec/)
+    end
   end
 end
spec/producer_spec.rb (5 changes)
@@ -65,11 +65,12 @@
         message = Kafka::Message.new("ümlaut")
         encoded = @producer.encode(message)
         data = [encoded.size].pack("N") + encoded
+        message = Kafka::Message.parse_from(data).messages.first
         if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
           ic = Iconv.new('UTF-8//IGNORE', 'UTF-8')
-          ic.iconv(Kafka::Message.parse_from(data).payload).should eql("ümlaut")
+          ic.iconv(message.payload).should eql("ümlaut")
         else
-          Kafka::Message.parse_from(data).payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
+          message.payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
         end
       end
     end
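
For reference, the base64 gzip fixtures in spec/message_spec.rb above can be regenerated along these lines, using only the Ruby standard library (a sketch, not the spec's literal fixture code; exact bytes will differ because gzip embeds a timestamp):

    require 'zlib'
    require 'stringio'

    # Inner message set: a single version-1, uncompressed message.
    # size field = 1 (magic) + 1 (attrs) + 4 (checksum) + 11 (payload) = 17.
    inner = [17, 1, 0, Zlib.crc32('abracadabra'), 'abracadabra'].pack('NCCNa*')

    # Gzip the inner set; the result becomes the outer message's payload.
    io = StringIO.new('')
    gz = Zlib::GzipWriter.new(io)
    gz << inner
    gz.close
    payload = io.string

    # Outer framing: a version-1 message whose codec bits are set to 1 (gzip).
    outer = [payload.size + 6, 1, 1, Zlib.crc32(payload), payload].pack('NCCNa*')
    [outer].pack('m') # base64, comparable to the fixture strings above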