From 253d5f00fd7465db9801122a984d6e234b3891d4 Mon Sep 17 00:00:00 2001 From: Adrian Madrid Date: Thu, 26 Mar 2015 13:29:52 -0600 Subject: [PATCH] ... --- .rspec | 3 + Gemfile | 2 +- lib/tiki/torch.rb | 1 + lib/tiki/torch/consumer.rb | 21 ++++--- lib/tiki/torch/event.rb | 2 + lib/tiki/torch/logging.rb | 2 + lib/tiki/torch/publisher.rb | 18 ++++-- spec/integration/multiple_consumer_spec.rb | 12 ++-- spec/integration/simple_consumer_spec.rb | 16 ++++++ spec/spec_helper.rb | 25 +++++++++ spec/support/consumers.rb | 43 ++++++++++++++ spec/support/helpers.rb | 65 ++++++++++++++++++++++ tiki_torch.gemspec | 3 +- 13 files changed, 196 insertions(+), 17 deletions(-) diff --git a/.rspec b/.rspec index e69de29..efc5a2f 100644 --- a/.rspec +++ b/.rspec @@ -0,0 +1,3 @@ +--colour +--order rand +--require spec_helper \ No newline at end of file diff --git a/Gemfile b/Gemfile index 7396c4b..df0310d 100644 --- a/Gemfile +++ b/Gemfile @@ -1,6 +1,6 @@ source 'https://rubygems.org' -gem 'nsq-ruby', git: 'https://github.com/aemadrid/nsq-ruby.git', ref: '4fe4079' +gem 'nsq-ruby', git: 'https://github.com/aemadrid/nsq-ruby.git', ref: 'eb74b53eec' # Specify your gem's dependencies in tiki.gemspec gemspec diff --git a/lib/tiki/torch.rb b/lib/tiki/torch.rb index c18a291..622a4f9 100644 --- a/lib/tiki/torch.rb +++ b/lib/tiki/torch.rb @@ -1,5 +1,6 @@ require 'pathname' require 'set' +require 'thread_safe' require 'tiki/torch/version' require 'tiki/torch/core_ext' diff --git a/lib/tiki/torch/consumer.rb b/lib/tiki/torch/consumer.rb index 028479f..2b82098 100644 --- a/lib/tiki/torch/consumer.rb +++ b/lib/tiki/torch/consumer.rb @@ -48,8 +48,9 @@ def topic(name = nil) if name.nil? @topic || name.to_s.underscore else - raise "Invalid topic name [#{name}]" unless valid_topic_or_channel_name? name - @topic = name + new_name = "#{Torch.config.topic_prefix}#{name}" + raise "Invalid topic name [#{name}]" unless valid_topic_name? new_name + @topic = new_name end end @@ -57,7 +58,7 @@ def channel(name = nil) if name.nil? @channel || 'events' else - raise "Invalid channel name [#{name}]" unless valid_topic_or_channel_name? name + raise "Invalid channel name [#{name}]" unless valid_topic_name? name @channel = name end end @@ -223,11 +224,17 @@ def process(event) private - TOPIC_CHANNEL_NAME_RX = /^[\.a-zA-Z0-9_-]+(#ephemeral)?$/ + TOPIC_NAME_RX = /^[\.a-zA-Z0-9_-]+$/ + CHANNEL_NAME_RX = /^[\.a-zA-Z0-9_-]+(#ephemeral)?$/ - def valid_topic_or_channel_name?(name) - return false if name.size < 1 || name.size > 64 - !!name.match(TOPIC_CHANNEL_NAME_RX) + def valid_topic_name?(name) + return false if name.size < 1 || name.size > 32 + !!name.match(TOPIC_NAME_RX) + end + + def valid_channel_name?(name) + return false if name.size < 1 || name.size > 32 + !!name.match(CHANNEL_NAME_RX) end end diff --git a/lib/tiki/torch/event.rb b/lib/tiki/torch/event.rb index af8fc56..b75edc2 100644 --- a/lib/tiki/torch/event.rb +++ b/lib/tiki/torch/event.rb @@ -25,6 +25,8 @@ def message_id properties[:message_id] end + alias :id :message_id + def finish debug "Finishing ##{message_id} ..." res = message.finish diff --git a/lib/tiki/torch/logging.rb b/lib/tiki/torch/logging.rb index 8ce0ead..a794801 100644 --- a/lib/tiki/torch/logging.rb +++ b/lib/tiki/torch/logging.rb @@ -92,5 +92,7 @@ def logger @logger ||= Logger.new(STDOUT).tap { |x| x.level = Logger::INFO } end + logger + end end \ No newline at end of file diff --git a/lib/tiki/torch/publisher.rb b/lib/tiki/torch/publisher.rb index 1506c63..cd053eb 100644 --- a/lib/tiki/torch/publisher.rb +++ b/lib/tiki/torch/publisher.rb @@ -17,7 +17,8 @@ def initialize def publish(topic_name, payload = {}, properties = {}, code = Torch.config.transcoder_code) properties = Torch.config.default_message_properties.merge properties.dup encoded = Torch::Transcoder.encode payload, properties, code - get_or_set(topic_name).write encoded + full_name = full_topic_name topic_name + get_or_set(full_name).write encoded end def stop @@ -35,11 +36,12 @@ def stopped? private - def get_or_set(name) - key = name.to_s.underscore - @mutex.synchronize do + def get_or_set(key) + res = @mutex.synchronize do get(key) || set(key) end + debug_var :res, res + res end def get(key) @@ -50,6 +52,14 @@ def set(key) @producers[key] = ::Nsq::Producer.new Torch.config.producer_connection_options(key) end + def full_topic_name(name) + prefix = Torch.config.topic_prefix + new_name = name.to_s + return new_name if new_name.to_s.start_with? prefix + + "#{prefix}#{new_name}" + end + end extend self diff --git a/spec/integration/multiple_consumer_spec.rb b/spec/integration/multiple_consumer_spec.rb index 2ae4465..008122c 100644 --- a/spec/integration/multiple_consumer_spec.rb +++ b/spec/integration/multiple_consumer_spec.rb @@ -1,13 +1,17 @@ describe 'simple consumers', integration: true do - let(:consumer) { SimpleConsumer } + let(:consumers) { [MultipleFirstConsumer, MultipleSecondConsumer] } it 'receives multiple messages' do - (1..4).each { |nr| Tiki::Torch.publish consumer.topic, "a#{nr}" } + max = 4 + max.times { |nr| Tiki::Torch.publish consumers.first.topic, "a#{nr + 1}" } sleep 1 # Get all messages - expect($messages.payloads).to eq %w{ a1 a2 a3 a4 } + expect($messages.payloads.sort).to eq %w{ a1 a1 a2 a2 a3 a3 a4 a4 } # All different threads - expect($messages.thread_ids.uniq.size).to eq 4 + expect($messages.thread_ids.uniq.size).to eq $messages.size + # For one consumer + expect($messages.consumer_count(consumers.first.name)).to eq max + expect($messages.consumer_count(consumers.last.name)).to eq max end end diff --git a/spec/integration/simple_consumer_spec.rb b/spec/integration/simple_consumer_spec.rb index e69de29..ed8196a 100644 --- a/spec/integration/simple_consumer_spec.rb +++ b/spec/integration/simple_consumer_spec.rb @@ -0,0 +1,16 @@ +describe 'simple consumers', integration: true do + let(:consumer) { SimpleConsumer } + + it 'receives multiple messages' do + max = 4 + max.times { |nr| Tiki::Torch.publish consumer.topic, "a#{nr + 1}" } + sleep 1 + + # Got all messages + expect($messages.payloads).to eq %w{ a1 a2 a3 a4 } + # For one consumer + expect($messages.consumer_count(consumer.name)).to eq max + # All different threads + expect($messages.thread_ids.uniq.size).to eq $messages.size + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index e69de29..02a7f43 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -0,0 +1,25 @@ +require 'rubygems' +require 'bundler' + +ENV['APP_NAME'] = 'Alice' + +Bundler.require(:default, :development, :test) + +require 'tiki_torch' + +require 'support/helpers' +require 'support/consumers' + +RSpec.configure do |config| + config.mock_with :rspec do |mocks| + mocks.verify_partial_doubles = true + end + + config.before(:each, :integration => true) do + $messages = TestingHelpers::Messages.new + setup_torch + end + config.after(:each, :integration => true) do + take_down_torch + end +end \ No newline at end of file diff --git a/spec/support/consumers.rb b/spec/support/consumers.rb index e69de29..4384b92 100644 --- a/spec/support/consumers.rb +++ b/spec/support/consumers.rb @@ -0,0 +1,43 @@ +class SimpleConsumer < Tiki::Torch::Consumer + + topic 'test.single.events' + channel 'events' + max_in_flight 1 + + def process + debug "message : #{message.local_methods}" + debug "message : #{message.inspect}" + $messages.add self.class.name, event + sleep payload[:sleep_time] if payload.is_a?(Hash) && payload[:sleep_time] + end + +end + +class MultipleFirstConsumer < Tiki::Torch::Consumer + + topic 'test.multiple.events' + channel 'first' + max_in_flight 1 + + def process + $messages.add self.class.name, event + sleep payload[:sleep_time] if payload.is_a?(Hash) && payload[:sleep_time] + end + +end + +class MultipleSecondConsumer < Tiki::Torch::Consumer + + topic 'test.multiple.events' + channel 'second' + max_in_flight 1 + + def process + $messages.add self.class.name, event + sleep payload[:sleep_time] if payload.is_a?(Hash) && payload[:sleep_time] + end + +end + +Nsq::Message +Nsq::Connection \ No newline at end of file diff --git a/spec/support/helpers.rb b/spec/support/helpers.rb index e69de29..fff8e43 100644 --- a/spec/support/helpers.rb +++ b/spec/support/helpers.rb @@ -0,0 +1,65 @@ +module TestingHelpers + class Messages + + Message = Struct.new :consumer, :id, :payload, :properties, :thread_id + + attr_reader :all + + def initialize + @all = ThreadSafe::Array.new + end + + def size + all.size + end + + def add(class_name, event) + all << Message.new(class_name, event.message_id, event.payload, event.properties, Thread.current.object_id) + end + + def add_payload(class_name, payload) + all << Message.new(class_name, nil, payload, nil, Thread.current.object_id) + end + + def message_ids + all.map { |x| x.message_id } + end + + def payloads + all.map { |x| x.payload } + end + + def properties + all.map { |x| x.properties } + end + + def thread_ids + all.map { |x| x.thread_id } + end + + def consumer_count(class_name) + all.count { |x| x.consumer == class_name } + end + + end +end + +def setup_torch + Tiki::Torch.configure do |c| + if (lkd = ENV['NSLOOKUPD_ADDRESS']) + c.nsqlookupd = lkd + elsif (nsd = ENV['NSQD_ADDRESS']) + c.nsqd = nsd + else + c.nsqd = 'localhost:4150' + end + c.colorized = false + end + # Tiki::Torch.logger.level = Logger::DEBUG + Tiki::Torch.start_polling +end + +def take_down_torch + Tiki::Torch.shutdown +end + diff --git a/tiki_torch.gemspec b/tiki_torch.gemspec index da1df44..3e0489f 100644 --- a/tiki_torch.gemspec +++ b/tiki_torch.gemspec @@ -19,12 +19,13 @@ Gem::Specification.new do |spec| spec.require_paths = ['lib'] spec.add_dependency 'nsq-ruby' + spec.add_dependency 'thread_safe' spec.add_dependency 'concurrent-ruby' spec.add_dependency 'multi_json' spec.add_dependency 'pry' spec.add_dependency 'colorize' spec.add_dependency 'lifeguard' - spec.add_development_dependency 'rspec' + spec.add_development_dependency 'rspec', '~> 3.0' spec.add_development_dependency 'rake' end