From a4cd818ef00f338704bb5d80f2a0cf73d9ac5ac6 Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko Date: Wed, 5 Feb 2014 07:32:24 +0000 Subject: [PATCH] Added consumer abstraction --- Gemfile | 1 + Gemfile.lock | 20 +++++++++++++------- lib/fluq/input/kafka.rb | 2 +- lib/fluq/kafka.rb | 5 +---- lib/fluq/kafka/consumer.rb | 13 +++++++++++++ 5 files changed, 29 insertions(+), 12 deletions(-) create mode 100644 lib/fluq/kafka/consumer.rb diff --git a/Gemfile b/Gemfile index 737cbea..c63f6fb 100644 --- a/Gemfile +++ b/Gemfile @@ -2,3 +2,4 @@ source "https://rubygems.org" gemspec gem "poseidon", git: "https://github.com/dim/poseidon.git" +gem "poseidon_cluster", git: "https://github.com/bsm/poseidon_cluster.git" diff --git a/Gemfile.lock b/Gemfile.lock index aa44ff0..2c196f2 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,3 +1,11 @@ +GIT + remote: https://github.com/bsm/poseidon_cluster.git + revision: 540110f33f3176ba980444509844d075fa11434a + specs: + poseidon_cluster (0.1.0) + poseidon + zk + GIT remote: https://github.com/dim/poseidon.git revision: f853e8f07a6f2ff4a520d3f7e1dac030453a18ed @@ -26,7 +34,7 @@ GEM term-ansicolor thor diff-lcs (1.2.5) - docile (1.1.2) + docile (1.1.3) fluq (0.8.0) celluloid-io (~> 0.15.0) multi_json @@ -38,9 +46,6 @@ GEM multi_json (1.8.4) nio4r (1.0.0) nio4r (1.0.0-java) - poseidon_cluster (0.0.5) - poseidon - zk rake (10.1.1) rest-client (1.6.7) mime-types (>= 1.16) @@ -49,9 +54,9 @@ GEM rspec-expectations (~> 2.14.0) rspec-mocks (~> 2.14.0) rspec-core (2.14.7) - rspec-expectations (2.14.4) + rspec-expectations (2.14.5) diff-lcs (>= 1.1.3, < 2.0) - rspec-mocks (2.14.4) + rspec-mocks (2.14.5) simplecov (0.8.2) docile (~> 1.1.0) multi_json @@ -64,7 +69,7 @@ GEM thor (0.18.1) timed_lru (0.3.1) timers (1.1.0) - tins (0.13.1) + tins (0.13.2) yard (0.8.7.3) zk (1.9.3) logging (~> 1.7.2) @@ -83,6 +88,7 @@ DEPENDENCIES coveralls fluq-kafka! poseidon! + poseidon_cluster! rake rspec yard diff --git a/lib/fluq/input/kafka.rb b/lib/fluq/input/kafka.rb index d204851..4f823f3 100644 --- a/lib/fluq/input/kafka.rb +++ b/lib/fluq/input/kafka.rb @@ -64,7 +64,7 @@ def process(partition, messages) protected def consumer - @consumer ||= ::Poseidon::ConsumerGroup.new config[:group], config[:brokers], config[:zookeepers], config[:topic], + @consumer ||= ::FluQ::Kafka::Consumer.new config[:group], config[:brokers], config[:zookeepers], config[:topic], min_bytes: config[:min_bytes], max_bytes: config[:max_bytes], max_wait_ms: config[:max_wait_ms] diff --git a/lib/fluq/kafka.rb b/lib/fluq/kafka.rb index b52b88f..cd4c055 100644 --- a/lib/fluq/kafka.rb +++ b/lib/fluq/kafka.rb @@ -1,8 +1,5 @@ require 'fluq' require 'fluq/kafka/version' -require 'poseidon_cluster' +require 'fluq/kafka/consumer' require 'fluq/input/kafka' -class Poseidon::Connection - TCPSocket = Celluloid::IO::TCPSocket -end diff --git a/lib/fluq/kafka/consumer.rb b/lib/fluq/kafka/consumer.rb new file mode 100644 index 0000000..fe493a1 --- /dev/null +++ b/lib/fluq/kafka/consumer.rb @@ -0,0 +1,13 @@ +require 'poseidon_cluster' + +class Poseidon::Connection + TCPSocket = Celluloid::IO::TCPSocket +end + +module FluQ + module Kafka + class Consumer < Poseidon::ConsumerGroup + end + end +end +