Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Remove code that now lives in the Encode module.

  • Loading branch information...
commit 29e4b163b3bfd7cd85196864c20f075512d35a0d 1 parent 25201dd
Liam Stewart authored
Showing with 2 additions and 88 deletions.
  1. +1 −28 lib/kafka/producer.rb
  2. +1 −60 spec/producer_spec.rb
View
29 lib/kafka/producer.rb
@@ -17,8 +17,6 @@ class Producer
include Kafka::IO
- PRODUCE_REQUEST_ID = Kafka::RequestType::PRODUCE
-
attr_accessor :topic, :partition
def initialize(options = {})
@@ -29,33 +27,8 @@ def initialize(options = {})
self.connect(self.host, self.port)
end
- def encode(message)
- if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
- ic = Iconv.new('UTF-8//IGNORE', 'UTF-8')
- [message.magic].pack("C") + [message.calculate_checksum].pack("N") + ic.iconv(message.payload.to_s)
- else
- [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s.force_encoding(Encoding::ASCII_8BIT)
- end
- end
-
- def encode_request(topic, partition, messages)
- message_set = Array(messages).collect { |message|
- encoded_message = self.encode(message)
- [encoded_message.length].pack("N") + encoded_message
- }.join("")
-
- request = [PRODUCE_REQUEST_ID].pack("n")
- topic = [topic.length].pack("n") + topic
- partition = [partition].pack("N")
- messages = [message_set.length].pack("N") + message_set
-
- data = request + topic + partition + messages
-
- return [data.length].pack("N") + data
- end
-
def send(messages)
- self.write(self.encode_request(self.topic, self.partition, messages))
+ self.write(Encoder.produce(self.topic, self.partition, messages))
end
def batch(&block)
View
61 spec/producer_spec.rb
@@ -25,10 +25,6 @@
end
describe "Kafka Producer" do
- it "should have a PRODUCE_REQUEST_ID" do
- Producer::PRODUCE_REQUEST_ID.should eql(0)
- end
-
it "should have a topic and a partition" do
@producer.should respond_to(:topic)
@producer.should respond_to(:partition)
@@ -47,61 +43,6 @@
@producer.host.should eql("localhost")
@producer.port.should eql(9092)
end
-
- describe "Message Encoding" do
- it "should encode a message" do
- message = Kafka::Message.new("alejandro")
- full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload
- @producer.encode(message).should eql(full_message)
- end
-
- it "should encode an empty message" do
- message = Kafka::Message.new()
- full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s
- @producer.encode(message).should eql(full_message)
- end
-
- it "should encode strings containing non-ASCII characters" do
- message = Kafka::Message.new("ümlaut")
- encoded = @producer.encode(message)
- data = [encoded.size].pack("N") + encoded
- message = Kafka::Message.parse_from(data).messages.first
- if RUBY_VERSION[0,3] == "1.8" # Use old iconv on Ruby 1.8 for encoding
- ic = Iconv.new('UTF-8//IGNORE', 'UTF-8')
- ic.iconv(message.payload).should eql("ümlaut")
- else
- message.payload.force_encoding(Encoding::UTF_8).should eql("ümlaut")
- end
- end
- end
-
- describe "Request Encoding" do
- it "should binary encode an empty request" do
- bytes = @producer.encode_request("test", 0, [])
- bytes.length.should eql(20)
- bytes.should eql("\000\000\000\020\000\000\000\004test\000\000\000\000\000\000\000\000")
- end
-
- it "should binary encode a request with a message, using a specific wire format" do
- message = Kafka::Message.new("ale")
- bytes = @producer.encode_request("test", 3, message)
- data_size = bytes[0, 4].unpack("N").shift
- request_id = bytes[4, 2].unpack("n").shift
- topic_length = bytes[6, 2].unpack("n").shift
- topic = bytes[8, 4]
- partition = bytes[12, 4].unpack("N").shift
- messages_length = bytes[16, 4].unpack("N").shift
- messages = bytes[20, messages_length]
-
- bytes.length.should eql(32)
- data_size.should eql(28)
- request_id.should eql(0)
- topic_length.should eql(4)
- topic.should eql("test")
- partition.should eql(3)
- messages_length.should eql(12)
- end
- end
end
it "should send messages" do
@@ -121,4 +62,4 @@
end
end
end
-end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.