Skip to content

Commit

Permalink
...
Browse files Browse the repository at this point in the history
  • Loading branch information
aemadrid committed Mar 26, 2015
1 parent bae1de3 commit 253d5f0
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .rspec
@@ -0,0 +1,3 @@
--colour
--order rand
--require spec_helper
2 changes: 1 addition & 1 deletion Gemfile
@@ -1,6 +1,6 @@
source 'https://rubygems.org'

gem 'nsq-ruby', git: 'https://github.com/aemadrid/nsq-ruby.git', ref: '4fe4079'
gem 'nsq-ruby', git: 'https://github.com/aemadrid/nsq-ruby.git', ref: 'eb74b53eec'

# Specify your gem's dependencies in tiki.gemspec
gemspec
1 change: 1 addition & 0 deletions lib/tiki/torch.rb
@@ -1,5 +1,6 @@
require 'pathname'
require 'set'
require 'thread_safe'
require 'tiki/torch/version'
require 'tiki/torch/core_ext'

Expand Down
21 changes: 14 additions & 7 deletions lib/tiki/torch/consumer.rb
Expand Up @@ -48,16 +48,17 @@ def topic(name = nil)
if name.nil?
@topic || name.to_s.underscore
else
raise "Invalid topic name [#{name}]" unless valid_topic_or_channel_name? name
@topic = name
new_name = "#{Torch.config.topic_prefix}#{name}"
raise "Invalid topic name [#{name}]" unless valid_topic_name? new_name
@topic = new_name
end
end

def channel(name = nil)
if name.nil?
@channel || 'events'
else
raise "Invalid channel name [#{name}]" unless valid_topic_or_channel_name? name
raise "Invalid channel name [#{name}]" unless valid_topic_name? name
@channel = name
end
end
Expand Down Expand Up @@ -223,11 +224,17 @@ def process(event)

private

TOPIC_CHANNEL_NAME_RX = /^[\.a-zA-Z0-9_-]+(#ephemeral)?$/
TOPIC_NAME_RX = /^[\.a-zA-Z0-9_-]+$/
CHANNEL_NAME_RX = /^[\.a-zA-Z0-9_-]+(#ephemeral)?$/

def valid_topic_or_channel_name?(name)
return false if name.size < 1 || name.size > 64
!!name.match(TOPIC_CHANNEL_NAME_RX)
def valid_topic_name?(name)
return false if name.size < 1 || name.size > 32
!!name.match(TOPIC_NAME_RX)
end

def valid_channel_name?(name)
return false if name.size < 1 || name.size > 32
!!name.match(CHANNEL_NAME_RX)
end

end
Expand Down
2 changes: 2 additions & 0 deletions lib/tiki/torch/event.rb
Expand Up @@ -25,6 +25,8 @@ def message_id
properties[:message_id]
end

alias :id :message_id

def finish
debug "Finishing ##{message_id} ..."
res = message.finish
Expand Down
2 changes: 2 additions & 0 deletions lib/tiki/torch/logging.rb
Expand Up @@ -92,5 +92,7 @@ def logger
@logger ||= Logger.new(STDOUT).tap { |x| x.level = Logger::INFO }
end

logger

end
end
18 changes: 14 additions & 4 deletions lib/tiki/torch/publisher.rb
Expand Up @@ -17,7 +17,8 @@ def initialize
def publish(topic_name, payload = {}, properties = {}, code = Torch.config.transcoder_code)
properties = Torch.config.default_message_properties.merge properties.dup
encoded = Torch::Transcoder.encode payload, properties, code
get_or_set(topic_name).write encoded
full_name = full_topic_name topic_name
get_or_set(full_name).write encoded
end

def stop
Expand All @@ -35,11 +36,12 @@ def stopped?

private

def get_or_set(name)
key = name.to_s.underscore
@mutex.synchronize do
def get_or_set(key)
res = @mutex.synchronize do
get(key) || set(key)
end
debug_var :res, res
res
end

def get(key)
Expand All @@ -50,6 +52,14 @@ def set(key)
@producers[key] = ::Nsq::Producer.new Torch.config.producer_connection_options(key)
end

def full_topic_name(name)
prefix = Torch.config.topic_prefix
new_name = name.to_s
return new_name if new_name.to_s.start_with? prefix

"#{prefix}#{new_name}"
end

end

extend self
Expand Down
12 changes: 8 additions & 4 deletions spec/integration/multiple_consumer_spec.rb
@@ -1,13 +1,17 @@
describe 'simple consumers', integration: true do
let(:consumer) { SimpleConsumer }
let(:consumers) { [MultipleFirstConsumer, MultipleSecondConsumer] }

it 'receives multiple messages' do
(1..4).each { |nr| Tiki::Torch.publish consumer.topic, "a#{nr}" }
max = 4
max.times { |nr| Tiki::Torch.publish consumers.first.topic, "a#{nr + 1}" }
sleep 1

# Get all messages
expect($messages.payloads).to eq %w{ a1 a2 a3 a4 }
expect($messages.payloads.sort).to eq %w{ a1 a1 a2 a2 a3 a3 a4 a4 }
# All different threads
expect($messages.thread_ids.uniq.size).to eq 4
expect($messages.thread_ids.uniq.size).to eq $messages.size
# For one consumer
expect($messages.consumer_count(consumers.first.name)).to eq max
expect($messages.consumer_count(consumers.last.name)).to eq max
end
end
16 changes: 16 additions & 0 deletions spec/integration/simple_consumer_spec.rb
@@ -0,0 +1,16 @@
describe 'simple consumers', integration: true do
let(:consumer) { SimpleConsumer }

it 'receives multiple messages' do
max = 4
max.times { |nr| Tiki::Torch.publish consumer.topic, "a#{nr + 1}" }
sleep 1

# Got all messages
expect($messages.payloads).to eq %w{ a1 a2 a3 a4 }
# For one consumer
expect($messages.consumer_count(consumer.name)).to eq max
# All different threads
expect($messages.thread_ids.uniq.size).to eq $messages.size
end
end
25 changes: 25 additions & 0 deletions spec/spec_helper.rb
@@ -0,0 +1,25 @@
require 'rubygems'
require 'bundler'

ENV['APP_NAME'] = 'Alice'

Bundler.require(:default, :development, :test)

require 'tiki_torch'

require 'support/helpers'
require 'support/consumers'

RSpec.configure do |config|
config.mock_with :rspec do |mocks|
mocks.verify_partial_doubles = true
end

config.before(:each, :integration => true) do
$messages = TestingHelpers::Messages.new
setup_torch
end
config.after(:each, :integration => true) do
take_down_torch
end
end
43 changes: 43 additions & 0 deletions spec/support/consumers.rb
@@ -0,0 +1,43 @@
class SimpleConsumer < Tiki::Torch::Consumer

topic 'test.single.events'
channel 'events'
max_in_flight 1

def process
debug "message : #{message.local_methods}"
debug "message : #{message.inspect}"
$messages.add self.class.name, event
sleep payload[:sleep_time] if payload.is_a?(Hash) && payload[:sleep_time]
end

end

class MultipleFirstConsumer < Tiki::Torch::Consumer

topic 'test.multiple.events'
channel 'first'
max_in_flight 1

def process
$messages.add self.class.name, event
sleep payload[:sleep_time] if payload.is_a?(Hash) && payload[:sleep_time]
end

end

class MultipleSecondConsumer < Tiki::Torch::Consumer

topic 'test.multiple.events'
channel 'second'
max_in_flight 1

def process
$messages.add self.class.name, event
sleep payload[:sleep_time] if payload.is_a?(Hash) && payload[:sleep_time]
end

end

Nsq::Message
Nsq::Connection
65 changes: 65 additions & 0 deletions spec/support/helpers.rb
@@ -0,0 +1,65 @@
module TestingHelpers
class Messages

Message = Struct.new :consumer, :id, :payload, :properties, :thread_id

attr_reader :all

def initialize
@all = ThreadSafe::Array.new
end

def size
all.size
end

def add(class_name, event)
all << Message.new(class_name, event.message_id, event.payload, event.properties, Thread.current.object_id)
end

def add_payload(class_name, payload)
all << Message.new(class_name, nil, payload, nil, Thread.current.object_id)
end

def message_ids
all.map { |x| x.message_id }
end

def payloads
all.map { |x| x.payload }
end

def properties
all.map { |x| x.properties }
end

def thread_ids
all.map { |x| x.thread_id }
end

def consumer_count(class_name)
all.count { |x| x.consumer == class_name }
end

end
end

def setup_torch
Tiki::Torch.configure do |c|
if (lkd = ENV['NSLOOKUPD_ADDRESS'])
c.nsqlookupd = lkd
elsif (nsd = ENV['NSQD_ADDRESS'])
c.nsqd = nsd
else
c.nsqd = 'localhost:4150'
end
c.colorized = false
end
# Tiki::Torch.logger.level = Logger::DEBUG
Tiki::Torch.start_polling
end

def take_down_torch
Tiki::Torch.shutdown
end

3 changes: 2 additions & 1 deletion tiki_torch.gemspec
Expand Up @@ -19,12 +19,13 @@ Gem::Specification.new do |spec|
spec.require_paths = ['lib']

spec.add_dependency 'nsq-ruby'
spec.add_dependency 'thread_safe'
spec.add_dependency 'concurrent-ruby'
spec.add_dependency 'multi_json'
spec.add_dependency 'pry'
spec.add_dependency 'colorize'
spec.add_dependency 'lifeguard'

spec.add_development_dependency 'rspec'
spec.add_development_dependency 'rspec', '~> 3.0'
spec.add_development_dependency 'rake'
end

0 comments on commit 253d5f0

Please sign in to comment.