Permalink
Browse files

automatic reconnect on socket errors

  • Loading branch information...
1 parent 2426543 commit 13dad52b64c303972680a02296c14a173691386e @tlossen tlossen committed Dec 6, 2011
Showing with 62 additions and 106 deletions.
  1. +1 −0 .gitignore
  2. +33 −35 Rakefile
  3. +0 −46 kafka-rb.gemspec
  4. +5 −2 lib/kafka.rb
  5. +4 −2 lib/kafka/consumer.rb
  6. +13 −12 lib/kafka/io.rb
  7. +6 −9 spec/io_spec.rb
View
@@ -1 +1,2 @@
pkg
+.rvmrc
View
@@ -19,38 +19,34 @@ require 'rubygems/specification'
require 'date'
require 'rspec/core/rake_task'
-GEM = 'kafka-rb'
-GEM_NAME = 'Kafka Client'
-GEM_VERSION = '0.0.5'
-AUTHORS = ['Alejandro Crosa']
-EMAIL = "alejandrocrosa@gmail.com"
-HOMEPAGE = "http://github.com/acrosa/kafka-rb"
-SUMMARY = "A Ruby client for the Kafka distributed publish/subscribe messaging service"
-DESCRIPTION = "kafka-rb allows you to produce and consume messages using the Kafka distributed publish/subscribe messaging service."
-
spec = Gem::Specification.new do |s|
- s.name = GEM
- s.version = GEM_VERSION
- s.platform = Gem::Platform::RUBY
- s.has_rdoc = true
+ s.name = %q{wooga-kafka-rb}
+ s.version = "0.0.7"
+
+ s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
+ s.authors = ["Alejandro Crosa", "Stefan Mees", "Tim Lossen"]
+ s.autorequire = %q{kafka-rb}
+ s.date = Time.now.strftime("%Y-%m-%d")
+ s.description = %q{kafka-rb allows you to produce and consume messages using the Kafka distributed publish/subscribe messaging service.}
s.extra_rdoc_files = ["LICENSE"]
- s.summary = SUMMARY
- s.description = DESCRIPTION
- s.authors = AUTHORS
- s.email = EMAIL
- s.homepage = HOMEPAGE
- s.add_development_dependency "rspec"
- s.require_path = 'lib'
- s.autorequire = GEM
- s.files = %w(LICENSE README.md Rakefile) + Dir.glob("{lib,tasks,spec}/**/*")
-end
+ s.files = ["LICENSE", "README.md", "Rakefile", "lib/kafka", "lib/kafka/batch.rb", "lib/kafka/consumer.rb", "lib/kafka/io.rb", "lib/kafka/message.rb", "lib/kafka/producer.rb", "lib/kafka/request_type.rb", "lib/kafka/error_codes.rb", "lib/kafka.rb", "spec/batch_spec.rb", "spec/consumer_spec.rb", "spec/io_spec.rb", "spec/kafka_spec.rb", "spec/message_spec.rb", "spec/producer_spec.rb", "spec/spec_helper.rb"]
+ s.homepage = %q{http://github.com/wooga/kafka-rb}
+ s.require_paths = ["lib"]
+ s.rubygems_version = %q{1.3.7}
+ s.summary = %q{A Ruby client for the Kafka distributed publish/subscribe messaging service}
-task :default => :spec
+ if s.respond_to? :specification_version then
+ current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
+ s.specification_version = 3
-desc "Run specs"
-RSpec::Core::RakeTask.new do |t|
- t.pattern = FileList['spec/**/*_spec.rb']
- t.rspec_opts = %w(-fs --color)
+ if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
+ s.add_development_dependency(%q<rspec>, [">= 0"])
+ else
+ s.add_dependency(%q<rspec>, [">= 0"])
+ end
+ else
+ s.add_dependency(%q<rspec>, [">= 0"])
+ end
end
Rake::GemPackageTask.new(spec) do |pkg|
@@ -62,15 +58,17 @@ task :install => [:package] do
sh %{sudo gem install pkg/#{GEM}-#{GEM_VERSION}}
end
-desc "create a gemspec file"
-task :make_spec do
- File.open("#{GEM}.gemspec", "w") do |file|
- file.puts spec.to_ruby
- end
-end
-
desc "Run all examples with RCov"
RSpec::Core::RakeTask.new(:rcov) do |t|
t.pattern = FileList['spec/**/*_spec.rb']
t.rcov = true
end
+
+desc "Run specs"
+RSpec::Core::RakeTask.new do |t|
+ t.pattern = FileList['spec/**/*_spec.rb']
+ t.rspec_opts = %w(-fs --color)
+end
+
+task :default => :spec
+
View
@@ -1,46 +0,0 @@
-# -*- encoding: utf-8 -*-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-Gem::Specification.new do |s|
- s.name = %q{kafka-rb}
- s.version = "0.0.6"
-
- s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
- s.authors = ["Alejandro Crosa"]
- s.autorequire = %q{kafka-rb}
- s.date = %q{2011-01-13}
- s.description = %q{kafka-rb allows you to produce and consume messages using the Kafka distributed publish/subscribe messaging service.}
- s.email = %q{alejandrocrosa@gmail.com}
- s.extra_rdoc_files = ["LICENSE"]
- s.files = ["LICENSE", "README.md", "Rakefile", "lib/kafka", "lib/kafka/batch.rb", "lib/kafka/consumer.rb", "lib/kafka/io.rb", "lib/kafka/message.rb", "lib/kafka/producer.rb", "lib/kafka/request_type.rb", "lib/kafka/error_codes.rb", "lib/kafka.rb", "spec/batch_spec.rb", "spec/consumer_spec.rb", "spec/io_spec.rb", "spec/kafka_spec.rb", "spec/message_spec.rb", "spec/producer_spec.rb", "spec/spec_helper.rb"]
- s.homepage = %q{http://github.com/acrosa/kafka-rb}
- s.require_paths = ["lib"]
- s.rubygems_version = %q{1.3.7}
- s.summary = %q{A Ruby client for the Kafka distributed publish/subscribe messaging service}
-
- if s.respond_to? :specification_version then
- current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
- s.specification_version = 3
-
- if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
- s.add_development_dependency(%q<rspec>, [">= 0"])
- else
- s.add_dependency(%q<rspec>, [">= 0"])
- end
- else
- s.add_dependency(%q<rspec>, [">= 0"])
- end
-end
View
@@ -4,9 +4,9 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,4 +27,7 @@
require File.join(File.dirname(__FILE__), "kafka", "consumer")
module Kafka
+
+ class SocketError < RuntimeError; end
+
end
View
@@ -50,6 +50,8 @@ def consume
send_consume_request
data = read_data_response
parse_message_set_from(data)
+ rescue SocketError
+ nil
end
def fetch_earliest_offset
@@ -72,8 +74,8 @@ def send_consume_request
end
def read_data_response
- data_length = socket.read(4).unpack("N").shift
- data = socket.read(data_length)
+ data_length = read(4).unpack("N").shift
+ data = read(data_length)
# TODO: inspect error code instead of skipping it
data[2, data.length]
end
View
@@ -24,30 +24,31 @@ def connect(host, port)
end
def reconnect
+ self.socket = TCPSocket.new(self.host, self.port)
+ rescue
self.disconnect
- self.socket = self.connect(self.host, self.port)
+ raise
end
def disconnect
self.socket.close rescue nil
self.socket = nil
end
+ def read(length)
+ self.socket.read(length) || raise(SocketError, "no data")
+ rescue
+ self.disconnect
+ raise SocketError, "cannot read: #{$!.message}"
+ end
+
def write(data)
self.reconnect unless self.socket
self.socket.write(data)
- rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
- self.reconnect
- self.socket.write(data) # retry
+ rescue
+ self.disconnect
+ raise SocketError, "cannot write: #{$!.message}"
end
- def read(length)
- begin
- self.socket.read(length)
- rescue Errno::EAGAIN
- self.disconnect
- raise Errno::EAGAIN, "Timeout reading from the socket"
- end
- end
end
end
View
@@ -55,15 +55,15 @@ class IOTest
it "should read from a socket" do
length = 200
- @mocked_socket.should_receive(:read).with(length).and_return(nil)
+ @mocked_socket.should_receive(:read).with(length).and_return("foo")
@io.read(length)
end
it "should disconnect on a timeout when reading from a socket (to aviod protocol desync state)" do
length = 200
@mocked_socket.should_receive(:read).with(length).and_raise(Errno::EAGAIN)
@io.should_receive(:disconnect)
- lambda { @io.read(length) }.should raise_error(Errno::EAGAIN)
+ lambda { @io.read(length) }.should raise_error(Kafka::SocketError)
end
it "should disconnect" do
@@ -73,18 +73,15 @@ class IOTest
end
it "should reconnect" do
- @mocked_socket.should_receive(:close)
- @io.should_receive(:connect)
+ TCPSocket.should_receive(:new)
@io.reconnect
end
- it "should reconnect on a broken pipe error" do
+ it "should disconnect on a broken pipe error" do
[Errno::ECONNABORTED, Errno::EPIPE, Errno::ECONNRESET].each do |error|
- @mocked_socket.should_receive(:write).exactly(:twice).and_raise(error)
+ @mocked_socket.should_receive(:write).exactly(:once).and_raise(error)
@mocked_socket.should_receive(:close).exactly(:once).and_return(nil)
- lambda {
- @io.write("some data to send")
- }.should raise_error(error)
+ lambda { @io.write("some data to send") }.should raise_error(Kafka::SocketError)
end
end
end

0 comments on commit 13dad52

Please sign in to comment.