Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #12 from liamstewart/ls/multi_producer

Multiproduce support
  • Loading branch information...
commit e2b7528b17c0eae2bd88b4a5420f1ac7081f1012 2 parents 9cc1220 + 29e4b16
@acrosa authored
View
3  lib/kafka.rb
@@ -20,10 +20,13 @@
require File.join(File.dirname(__FILE__), "kafka", "io")
require File.join(File.dirname(__FILE__), "kafka", "request_type")
+require File.join(File.dirname(__FILE__), "kafka", "encoder")
require File.join(File.dirname(__FILE__), "kafka", "error_codes")
require File.join(File.dirname(__FILE__), "kafka", "batch")
require File.join(File.dirname(__FILE__), "kafka", "message")
+require File.join(File.dirname(__FILE__), "kafka", "multi_producer")
require File.join(File.dirname(__FILE__), "kafka", "producer")
+require File.join(File.dirname(__FILE__), "kafka", "producer_request")
require File.join(File.dirname(__FILE__), "kafka", "consumer")
module Kafka
View
4 lib/kafka/consumer.rb
@@ -29,8 +29,8 @@ class Consumer
def initialize(options = {})
self.topic = options[:topic] || "test"
self.partition = options[:partition] || 0
- self.host = options[:host] || "localhost"
- self.port = options[:port] || 9092
+ self.host = options[:host] || HOST
+ self.port = options[:port] || PORT
self.offset = options[:offset]
self.max_size = options[:max_size] || MAX_SIZE
self.polling = options[:polling] || DEFAULT_POLLING_INTERVAL
View
61 lib/kafka/encoder.rb
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
module Kafka
  # Serializes messages and produce requests into the binary wire format
  # written to the broker socket.
  #
  # Every method returns a binary (ASCII-8BIT) String on Ruby 1.9+ and never
  # modifies the strings handed to it. All length prefixes are byte counts.
  module Encoder
    # Encodes a single message.
    #
    # Layout: 4-byte big-endian length, 1-byte magic, 4-byte big-endian
    # checksum, payload bytes. The length covers everything after itself.
    #
    # message - object responding to +magic+, +calculate_checksum+ and
    #           +payload+ (e.g. Kafka::Message).
    #
    # Returns the encoded String.
    def self.message(message)
      payload =
        if RUBY_VERSION[0, 3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
          Iconv.new('UTF-8//IGNORE', 'UTF-8').iconv(message.payload.to_s)
        else
          # Work on a copy: force_encoding mutates its receiver, which would
          # re-tag the caller's payload and raise on frozen strings.
          to_binary(message.payload.to_s)
        end
      data = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + payload

      [data.length].pack("N") + data
    end

    # Encodes one topic/partition block.
    #
    # Layout: 2-byte topic length, topic bytes, 4-byte partition,
    # 4-byte message-set length, then the concatenated encoded messages.
    #
    # topic     - topic name (converted with +to_s+).
    # partition - Integer partition number.
    # messages  - a single message or a collection of messages.
    #
    # Returns the encoded String.
    def self.message_block(topic, partition, messages)
      message_set = Array(messages).map { |message| self.message(message) }.join("")

      # A binary copy makes +length+ a byte count (a multi-byte topic would
      # otherwise get a too-short prefix) and keeps the concatenation below
      # from raising Encoding::CompatibilityError.
      topic_bytes = to_binary(topic.to_s)

      [topic_bytes.length].pack("n") + topic_bytes +
        [partition].pack("N") +
        [message_set.length].pack("N") + message_set
    end

    # Encodes a complete produce request for one topic/partition.
    #
    # Layout: 4-byte request length, 2-byte RequestType::PRODUCE, followed by
    # one block as produced by +message_block+.
    #
    # Returns the encoded String.
    def self.produce(topic, partition, messages)
      request = [RequestType::PRODUCE].pack("n")
      data = request + self.message_block(topic, partition, messages)

      [data.length].pack("N") + data
    end

    # Encodes a multiproduce request covering several topic/partitions.
    #
    # Layout: 4-byte request length, 2-byte RequestType::MULTIPRODUCE,
    # 2-byte block count, then one +message_block+ per request.
    #
    # producer_requests - a single object or a collection of objects
    #                     responding to +topic+, +partition+ and +messages+
    #                     (e.g. Kafka::ProducerRequest).
    #
    # Returns the encoded String.
    def self.multiproduce(producer_requests)
      part_set = Array(producer_requests).map { |req|
        self.message_block(req.topic, req.partition, req.messages)
      }

      request = [RequestType::MULTIPRODUCE].pack("n")
      parts = [part_set.length].pack("n") + part_set.join("")
      data = request + parts

      [data.length].pack("N") + data
    end

    # Returns a binary-tagged copy of +string+, leaving the original untouched.
    # Strings without +force_encoding+ (Ruby 1.8) are returned as they are.
    def self.to_binary(string)
      return string unless string.respond_to?(:force_encoding)

      string.dup.force_encoding(Encoding::ASCII_8BIT)
    end
    private_class_method :to_binary
  end
end
View
3  lib/kafka/io.rb
@@ -16,6 +16,9 @@ module Kafka
module IO
attr_accessor :socket, :host, :port
+ HOST = "localhost"
+ PORT = 9092
+
def connect(host, port)
raise ArgumentError, "No host or port specified" unless host && port
self.host = host
View
2  lib/kafka/message.rb
@@ -42,7 +42,7 @@ class Message
def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil)
self.magic = magic
- self.payload = payload
+ self.payload = payload || ""
self.checksum = checksum || self.calculate_checksum
end
View
34 lib/kafka/multi_producer.rb
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
module Kafka
  # Producer that is not tied to a single topic/partition: the destination is
  # chosen per call, and several topic/partitions can be written with one
  # multiproduce request.
  class MultiProducer
    include Kafka::IO

    # options - Hash with optional :host and :port; the Kafka::IO defaults
    #           HOST and PORT are used for whichever is missing.
    #
    # Connects as part of construction.
    def initialize(options={})
      self.host = options[:host] || HOST
      self.port = options[:port] || PORT
      connect(host, port)
    end

    # Writes a produce request for one topic.
    #
    # NOTE: this shadows Object#send on instances of this class.
    #
    # topic    - topic name.
    # messages - a single message or a collection of messages.
    # options  - Hash; :partition picks the partition (0 when absent).
    #
    # Returns whatever +write+ returns.
    def send(topic, messages, options={})
      encoded = Encoder.produce(topic, options[:partition] || 0, messages)
      write(encoded)
    end

    # Writes a single multiproduce request built from +producer_requests+.
    #
    # Returns whatever +write+ returns.
    def multi_send(producer_requests)
      encoded = Encoder.multiproduce(producer_requests)
      write(encoded)
    end
  end
end
View
33 lib/kafka/producer.rb
@@ -17,45 +17,18 @@ class Producer
include Kafka::IO
- PRODUCE_REQUEST_ID = Kafka::RequestType::PRODUCE
-
attr_accessor :topic, :partition
def initialize(options = {})
self.topic = options[:topic] || "test"
self.partition = options[:partition] || 0
- self.host = options[:host] || "localhost"
- self.port = options[:port] || 9092
+ self.host = options[:host] || HOST
+ self.port = options[:port] || PORT
self.connect(self.host, self.port)
end
- def encode(message)
- if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
- ic = Iconv.new('UTF-8//IGNORE', 'UTF-8')
- [message.magic].pack("C") + [message.calculate_checksum].pack("N") + ic.iconv(message.payload.to_s)
- else
- [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s.force_encoding(Encoding::ASCII_8BIT)
- end
- end
-
- def encode_request(topic, partition, messages)
- message_set = Array(messages).collect { |message|
- encoded_message = self.encode(message)
- [encoded_message.length].pack("N") + encoded_message
- }.join("")
-
- request = [PRODUCE_REQUEST_ID].pack("n")
- topic = [topic.length].pack("n") + topic
- partition = [partition].pack("N")
- messages = [message_set.length].pack("N") + message_set
-
- data = request + topic + partition + messages
-
- return [data.length].pack("N") + data
- end
-
def send(messages)
- self.write(self.encode_request(self.topic, self.partition, messages))
+ self.write(Encoder.produce(self.topic, self.partition, messages))
end
def batch(&block)
View
26 lib/kafka/producer_request.rb
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
module Kafka
  # One entry of a multiproduce request: the messages destined for a given
  # topic and partition.
  class ProducerRequest
    attr_accessor :topic, :messages, :partition

    # topic    - topic name.
    # messages - a single message or a collection; stored as an Array.
    # options  - Hash; :partition selects the partition (0 when absent or nil).
    def initialize(topic, messages, options={})
      @topic     = topic
      @messages  = Array(messages)
      @partition = options[:partition] || 0
    end
  end
end
View
173 spec/encoder_spec.rb
@@ -0,0 +1,173 @@
+# encoding: utf-8
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+require File.dirname(__FILE__) + '/spec_helper'
+
describe Encoder do
  # Asserts that +bytes+ is the wire form of +message+: a 4-byte big-endian
  # length followed by the magic byte, the 4-byte checksum and the payload.
  def check_message(bytes, message)
    encoded = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload
    encoded = [encoded.length].pack("N") + encoded
    bytes.should == encoded
  end

  # Splits the fixed 8-byte multiproduce header into
  # [request length, request type, topic/partition count].
  def multiproduce_header(bytes)
    [
      bytes[0, 4].unpack("N").shift,
      bytes[4, 2].unpack("n").shift,
      bytes[6, 2].unpack("n").shift
    ]
  end

  # Reads the topic/partition block that starts at +offset+ and whose topic is
  # expected to be +topic_size+ bytes long. Returns
  # [topic length, topic, partition, messages length, messages data].
  def topic_partition_block(bytes, offset, topic_size)
    messages_length = bytes[offset + topic_size + 6, 4].unpack("N").shift
    [
      bytes[offset, 2].unpack("n").shift,
      bytes[offset + 2, topic_size],
      bytes[offset + 2 + topic_size, 4].unpack("N").shift,
      messages_length,
      bytes[offset + topic_size + 10, messages_length]
    ]
  end

  describe "Message Encoding" do
    it "should encode a message" do
      message = Kafka::Message.new("alejandro")
      check_message(described_class.message(message), message)
    end

    it "should encode an empty message" do
      message = Kafka::Message.new
      check_message(described_class.message(message), message)
    end

    it "should encode strings containing non-ASCII characters" do
      message = Kafka::Message.new("ümlaut")
      encoded = described_class.message(message)
      message = Kafka::Message.parse_from(encoded).messages.first
      if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
        ic = Iconv.new('UTF-8//IGNORE', 'UTF-8')
        ic.iconv(message.payload).should eql("ümlaut")
      else
        message.payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
      end
    end
  end

  describe "produce" do
    it "should binary encode an empty request" do
      bytes = described_class.produce("test", 0, [])
      bytes.length.should eql(20)
      bytes.should eql("\000\000\000\020\000\000\000\004test\000\000\000\000\000\000\000\000")
    end

    it "should binary encode a request with a message, using a specific wire format" do
      message = Kafka::Message.new("ale")
      bytes = described_class.produce("test", 3, message)
      data_size = bytes[0, 4].unpack("N").shift
      request_id = bytes[4, 2].unpack("n").shift
      topic_length = bytes[6, 2].unpack("n").shift
      topic = bytes[8, 4]
      partition = bytes[12, 4].unpack("N").shift
      messages_length = bytes[16, 4].unpack("N").shift
      messages = bytes[20, messages_length]

      bytes.length.should eql(32)
      data_size.should eql(28)
      request_id.should eql(0)
      topic_length.should eql(4)
      topic.should eql("test")
      partition.should eql(3)
      messages_length.should eql(12)
      # The message set was previously extracted but never verified.
      check_message(messages, message)
    end
  end

  describe "multiproduce" do
    it "encodes an empty request" do
      bytes = described_class.multiproduce([])
      bytes.length.should == 8
      bytes.should == "\x00\x00\x00\x04\x00\x03\x00\x00"
    end

    it "encodes a request with a single topic/partition" do
      message = Kafka::Message.new("ale")
      bytes = described_class.multiproduce(Kafka::ProducerRequest.new("test", message))

      req_length, req_type, tp_count = multiproduce_header(bytes)

      # The length prefix counts everything after its own four bytes.
      req_length.should == bytes.length - 4
      req_type.should == Kafka::RequestType::MULTIPRODUCE
      tp_count.should == 1

      topic_length, topic, partition, messages_length, messages_data =
        topic_partition_block(bytes, 8, 4)

      topic_length.should == 4
      topic.should == "test"
      partition.should == 0
      messages_length.should == 12
      check_message(messages_data, message)
    end

    it "encodes a request with a single topic/partition but multiple messages" do
      messages = [Kafka::Message.new("ale"), Kafka::Message.new("beer")]
      bytes = described_class.multiproduce(Kafka::ProducerRequest.new("test", messages))

      req_length, req_type, tp_count = multiproduce_header(bytes)

      req_length.should == bytes.length - 4
      req_type.should == Kafka::RequestType::MULTIPRODUCE
      tp_count.should == 1

      topic_length, topic, partition, messages_length, messages_data =
        topic_partition_block(bytes, 8, 4)

      topic_length.should == 4
      topic.should == "test"
      partition.should == 0
      messages_length.should == 25
      check_message(messages_data[0, 12], messages[0])
      check_message(messages_data[12, 13], messages[1])
    end

    it "encodes a request with multiple topic/partitions" do
      messages = [Kafka::Message.new("ale"), Kafka::Message.new("beer")]
      # Hash rockets keep this file parseable on Ruby 1.8, which the suite
      # still branches for above.
      bytes = described_class.multiproduce([
        Kafka::ProducerRequest.new("test", messages[0]),
        Kafka::ProducerRequest.new("topic", messages[1], :partition => 1)
      ])

      req_length, req_type, tp_count = multiproduce_header(bytes)

      req_length.should == bytes.length - 4
      req_type.should == Kafka::RequestType::MULTIPRODUCE
      tp_count.should == 2

      topic_length, topic, partition, messages_length, messages_data =
        topic_partition_block(bytes, 8, 4)

      topic_length.should == 4
      topic.should == "test"
      partition.should == 0
      messages_length.should == 12
      check_message(messages_data[0, 12], messages[0])

      # The second block starts right after the first: 8 + 2 + 4 + 4 + 4 + 12.
      topic_length, topic, partition, messages_length, messages_data =
        topic_partition_block(bytes, 34, 5)

      topic_length.should == 5
      topic.should == "topic"
      partition.should == 1
      messages_length.should == 13
      check_message(messages_data[0, 13], messages[1])
    end
  end
end
View
4 spec/message_spec.rb
@@ -40,6 +40,10 @@
@message.magic.should eql(1)
end
+ it "should have an empty payload by default" do
+ @message.payload.should == ""
+ end
+
it "should calculate the checksum (crc32 of a given message)" do
@message.payload = "ale"
@message.calculate_checksum.should eql(1120192889)
View
50 spec/multi_producer_spec.rb
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require File.dirname(__FILE__) + '/spec_helper'
+
describe MultiProducer do
  before(:each) do
    @mocked_socket = mock(TCPSocket)
    TCPSocket.stub!(:new).and_return(@mocked_socket) # don't use a real socket
  end

  # Group label fixed: it was copied from the Producer spec.
  describe "Kafka MultiProducer" do
    it "should set default host and port if none is specified" do
      subject.host.should eql("localhost")
      subject.port.should eql(9092)
    end

    it "sends single messages" do
      message = Kafka::Message.new("ale")
      encoded = Kafka::Encoder.produce("test", 0, message)

      # write is stubbed, so this checks the bytes handed to the socket layer
      # and that its return value is passed back to the caller.
      subject.should_receive(:write).with(encoded).and_return(encoded.length)
      # Hash rocket keeps the file parseable on Ruby 1.8.
      subject.send("test", message, :partition => 0).should == encoded.length
    end

    it "sends multiple messages" do
      messages = [Kafka::Message.new("ale"), Kafka::Message.new("beer")]
      reqs = [
        Kafka::ProducerRequest.new("topic", messages[0]),
        Kafka::ProducerRequest.new("topic", messages[1])
      ]
      encoded = Kafka::Encoder.multiproduce(reqs)

      subject.should_receive(:write).with(encoded).and_return(encoded.length)
      subject.multi_send(reqs).should == encoded.length
    end
  end
end
View
38 spec/producer_request_spec.rb
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require File.dirname(__FILE__) + '/spec_helper'
+
describe ProducerRequest do
  let(:message) { Kafka::Message.new }
  let(:req) { described_class.new("topic", message) }

  it "has a topic" do
    # Assert the value; the previous body assigned to req.topic and so could
    # never fail.
    req.topic.should == "topic"
  end

  it "has a set of messages" do
    req.messages.should == [message]
  end

  it "has a default partition" do
    req.partition.should == 0
  end

  it "can use a user-specified partition" do
    # Hash rocket keeps the file parseable on Ruby 1.8.
    req = described_class.new("topic", message, :partition => 42)
    req.partition.should == 42
  end
end
View
61 spec/producer_spec.rb
@@ -25,10 +25,6 @@
end
describe "Kafka Producer" do
- it "should have a PRODUCE_REQUEST_ID" do
- Producer::PRODUCE_REQUEST_ID.should eql(0)
- end
-
it "should have a topic and a partition" do
@producer.should respond_to(:topic)
@producer.should respond_to(:partition)
@@ -47,61 +43,6 @@
@producer.host.should eql("localhost")
@producer.port.should eql(9092)
end
-
- describe "Message Encoding" do
- it "should encode a message" do
- message = Kafka::Message.new("alejandro")
- full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload
- @producer.encode(message).should eql(full_message)
- end
-
- it "should encode an empty message" do
- message = Kafka::Message.new()
- full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s
- @producer.encode(message).should eql(full_message)
- end
-
- it "should encode strings containing non-ASCII characters" do
- message = Kafka::Message.new("ümlaut")
- encoded = @producer.encode(message)
- data = [encoded.size].pack("N") + encoded
- message = Kafka::Message.parse_from(data).messages.first
- if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
- ic = Iconv.new('UTF-8//IGNORE', 'UTF-8')
- ic.iconv(message.payload).should eql("ümlaut")
- else
- message.payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
- end
- end
- end
-
- describe "Request Encoding" do
- it "should binary encode an empty request" do
- bytes = @producer.encode_request("test", 0, [])
- bytes.length.should eql(20)
- bytes.should eql("\000\000\000\020\000\000\000\004test\000\000\000\000\000\000\000\000")
- end
-
- it "should binary encode a request with a message, using a specific wire format" do
- message = Kafka::Message.new("ale")
- bytes = @producer.encode_request("test", 3, message)
- data_size = bytes[0, 4].unpack("N").shift
- request_id = bytes[4, 2].unpack("n").shift
- topic_length = bytes[6, 2].unpack("n").shift
- topic = bytes[8, 4]
- partition = bytes[12, 4].unpack("N").shift
- messages_length = bytes[16, 4].unpack("N").shift
- messages = bytes[20, messages_length]
-
- bytes.length.should eql(32)
- data_size.should eql(28)
- request_id.should eql(0)
- topic_length.should eql(4)
- topic.should eql("test")
- partition.should eql(3)
- messages_length.should eql(12)
- end
- end
end
it "should send messages" do
@@ -121,4 +62,4 @@
end
end
end
-end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.