Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

feature/sqs #1

Merged
merged 11 commits into from
This page is out of date. Refresh to see the latest.
View
2  .rspec
@@ -1,2 +1,2 @@
--color
---format documentation
+--format Fuubar
View
45 README.md
@@ -66,6 +66,51 @@ def update
end
```
+##Queues
+By default the queue is an in-memory queue meaning that messages are shared per-process
+and any unprocessed messages are saved to a file when shutdown occurs.
+
+The **Redis queue** requires some configuration in your gemfile to keep the runtime dependencies to a minimum
+```ruby
+# Gemfile
+gem 'redis'
+gem 'micro_q'
+
+# config/initializers/micro_q.rb
+require 'redis'
+require 'micro_q'
+
+# when MicroQ starts simply use the redis queue
+MicroQ.configure do |config|
+ config.queue = MicroQ::Queue::Redis
+end
+```
+
+The **Amazon SQS (coming soon) queue** require some extra configuration in your gemfile.
+```ruby
+# Gemfile
+gem 'aws-sdk'
+gem 'micro_q'
+
+# config/initializers/micro_q.rb
+require 'aws-sdk'
+require 'micro_q'
+
+# when MicroQ starts simply use the sqs queue
+# this will take care of all other switchover for the system
+MicroQ.configure do |config|
+ config.queue = MicroQ::Queue::Sqs
+ config.aws = { :key => 'YOUR KEY', :secret => 'YOUR SECRET' }
+end
+
+**Note that when using the SQS Queue only the MicroQ's started via command-line will actually process messages**
+
+# Then just use the queues in your workers
+class SomeWorker
+ worker :queue => :critical
+end
+```
+
## Contributing
1. Fork it
View
11 bin/microq
@@ -0,0 +1,11 @@
+#!/usr/bin/env ruby
+
+require_relative '../lib/micro_q/cli'
+
+begin
+ MicroQ::CLI.run
+rescue => e
+ STDERR.puts e.message
+ STDERR.puts e.backtrace.join("\n")
+ exit 1
+end
View
5 lib/micro_q.rb
@@ -53,11 +53,15 @@ def self.clear
require 'micro_q/dsl'
require 'micro_q/worker'
require 'micro_q/queue'
+require 'micro_q/sqs_client'
+require 'micro_q/fetchers/sqs'
require 'micro_q/redis'
+require 'micro_q/inspect'
require 'micro_q/wrappers/action_mailer'
+
# add Class and Instance methods first then
# override with additional extensions
@@ -72,4 +76,5 @@ def self.clear
# There is a better way coming soon 2/18/13
at_exit do
MicroQ::Manager::Default.shutdown!
+ MicroQ::Queue::Sqs.shutdown!
end
View
60 lib/micro_q/cli.rb
@@ -0,0 +1,60 @@
+require 'slop'
+require 'micro_q'
+
+module MicroQ
+ class CLI
+ def self.run
+ @cli ||= new
+ @cli.parse
+ @cli.verify!
+ @cli.setup
+ end
+
+ def parse
+ opts = Slop.parse do
+ banner 'Usage: microq [options]'
+
+ on 'r=', 'The path to the rails application'
+ on 'require=', 'The path to the rails application'
+ on 'w=', 'The number of worker threads'
+ on 'workers=', 'The number of worker threads'
+ end
+
+ @workers = opts[:workers] || opts[:w]
+ @require = opts[:require] || opts[:r]
+ end
+
+ def verify!
+ raise "Need a valid path to a rails application, you gave us #{@require}\n" unless /environment\.rb/ === @require || File.exist?("#{@require}/config/application.rb")
+ end
+
+ def setup
+ puts 'Requiring rails...'
+ require 'rails'
+
+ puts 'Requiring rails application...'
+ if File.directory?(@require)
+ require File.expand_path("#{@require}/config/environment.rb")
+ else
+ require @require
+ end
+
+ aws_keys = MicroQ.config.aws.try(:keys) || []
+ raise 'SQS mode requires an aws :key and :secret see https://github.com/bnorton/micro_q/wiki/Named-Queues' unless aws_keys.include?(:key) && aws_keys.include?(:secret)
+
+ MicroQ.configure do |config|
+ config.queue = MicroQ::Queue::Sqs # set workers after since this must set workers=0 internally
+ config.workers = @workers.to_i if @workers
+ config['worker_mode?'] = true
+ end
+
+ puts "Running micro_q in SQS mode with #{MicroQ.config.workers} workers... Hit ctl+c to stop...\n"
+ MicroQ.start
+
+ sleep
+ rescue Interrupt
+ puts 'Exiting via interrupt'
+ exit(1)
+ end
+ end
+end
View
15 lib/micro_q/config.rb
@@ -13,11 +13,13 @@ def initialize
'workers' => 5,
'timeout' => 120,
'interval' => 5,
+ 'env' => defined?(Rails) ? Rails.env : 'development',
'middleware' => Middleware::Chain.new,
'manager' => Manager::Default,
'worker' => Worker::Standard,
'queue' => Queue::Default,
'statistics' => Statistics::Default,
+ 'aws' => {},
'redis_pool' => { :size => 15, :timeout => 1 },
'redis' => { :host => 'localhost', :port => 6379 }
}
@@ -31,6 +33,19 @@ def [](key)
@data[key.to_s]
end
+ def queue=(q)
+ if q == Queue::Sqs
+ require 'aws-sdk'
+
+ @data['sqs?'] = true
+ @data['workers'] = 0
+ end
+
+ @data['queue'] = q
+ rescue LoadError
+ raise "Looks you need `gem 'aws-sdk'` in your Gemfile to use the SQS queue."
+ end
+
def method_missing(method, *args)
case method
when /(.+)=$/ then @data[$1] = args.first
View
37 lib/micro_q/fetchers/sqs.rb
@@ -0,0 +1,37 @@
+module MicroQ
+ module Fetcher
+ class Sqs
+ include Celluloid
+ attr_reader :name
+
+ def initialize(name, manager)
+ @name = name.to_s
+ @manager = manager
+ end
+
+ def start
+ defer do
+ client.messages.tap do |messages|
+ @manager.receive_messages!(messages) if messages.any?
+ end
+ end
+
+ after(2) { start }
+ end
+
+ def add_message(message, time=nil)
+ message['run_at'] = time.to_f if time
+
+ defer do
+ client.messages_create(message)
+ end
+ end
+
+ private
+
+ def client
+ @client ||= MicroQ::SqsClient.new(name)
+ end
+ end
+ end
+end
View
17 lib/micro_q/inspect.rb
@@ -0,0 +1,17 @@
+module MicroQ
+ module Inspect
+ def self.included(base)
+ base.send(:include, InstanceMethods)
+ end
+
+ module InstanceMethods
+ def inspect
+ "#<#{self.class.name}: #{self.object_id}>"
+ end
+ end
+ end
+end
+
+['Worker::Standard', 'Queue::Default', 'Queue::Redis', 'Queue::Sqs', 'Manager::Default', 'Fetcher::Sqs'].each do |postfix|
+ MicroQ::Util.constantize("MicroQ::#{postfix}").send(:include, MicroQ::Inspect)
+end
View
10 lib/micro_q/manager/default.rb
@@ -23,6 +23,8 @@ class Default
attr_reader :queue, :workers
def start
+ return if queue_only?
+
count = workers.size
if (messages = queue.dequeue(count)).any?
@@ -53,6 +55,7 @@ def reinitialize(*)
end
@busy ||= []
+ @workers ||= []
build_missing_workers
end
@@ -61,9 +64,10 @@ def reinitialize(*)
# Don't shrink the pool if the config changes
def build_missing_workers
- @workers ||= []
+ return if queue_only?
workers.select!(&:alive?)
+ @busy.select!(&:alive?)
missing_worker_count.times do
workers << MicroQ.config.worker.new_link(current_actor)
@@ -78,6 +82,10 @@ def kill_all
(@workers + @busy).each {|w| w.terminate if w.alive? }
end
+ def queue_only?
+ @queue_only ||= MicroQ.config.sqs? && !MicroQ.config.worker_mode?
+ end
+
def self.shutdown?
!!@shutdown
end
View
1  lib/micro_q/queue.rb
@@ -1,5 +1,6 @@
require 'micro_q/queue/default'
require 'micro_q/queue/redis'
+require 'micro_q/queue/sqs'
##
# The Queueing interface
View
85 lib/micro_q/queue/sqs.rb
@@ -0,0 +1,85 @@
+module MicroQ
+ module Queue
+ class Sqs
+ include Celluloid
+
+ exit_handler :build_missing_fetchers
+
+ attr_accessor :messages
+ attr_reader :fetchers, :entries, :later
+
+ def initialize
+ @lock = Mutex.new
+
+ @messages, @entries, @later = [], [], []
+ @fetcher_map = {}
+
+ build_missing_fetchers
+ end
+
+ def push(item)
+ async.sync_push(item)
+ end
+
+ def sync_push(item, options={})
+ item, options = MicroQ::Util.stringify(item, options)
+ item['class'] = item['class'].to_s
+
+ MicroQ.middleware.client.call(item, options) do
+ args, queue_name = [item], verify_queue(item['queue'].to_s)
+
+ if (time = options['when'])
+ args << time.to_f
+ end
+
+ @fetcher_map[queue_name].add_message(*args)
+ end
+ end
+
+ def receive_messages(*items)
+ @lock.synchronize do
+ (@messages += items).flatten!
+ end
+ end
+
+ def dequeue(limit=30)
+ return [] unless limit > 0 && messages.any?
+
+ @lock.synchronize do
+ limit.times.collect do
+ messages.pop
+ end.compact
+ end
+ end
+
+ def verify_queue(name)
+ QUEUES_KEYS.include?(name) ? name : 'default'
+ end
+
+ def self.shutdown!
+ @shutdown = true
+ end
+
+ private
+
+ def self.shutdown?
+ @shutdown
+ end
+
+ def build_missing_fetchers(*)
+ return if self.class.shutdown?
+
+ @fetchers = QUEUES_KEYS.map do |name|
+ ((existing = @fetcher_map[name]) && existing.alive? && existing) ||
+ MicroQ::Fetcher::Sqs.new_link(name, current_actor).tap do |fetcher|
+ @fetcher_map[name] = fetcher
+ fetcher.start!
+ end
+ end
+ end
+
+ QUEUES = { 'low' => 1, 'default' => 3, 'critical' => 5 }
+ QUEUES_KEYS = QUEUES.keys
+ end
+ end
+end
View
47 lib/micro_q/sqs_client.rb
@@ -0,0 +1,47 @@
+module MicroQ
+ class SqsClient
+ attr_reader :url
+
+ def initialize(name)
+ @name = "#{MicroQ.config.env}_#{name}"
+ @url = client.create_queue(:queue_name => @name)[:queue_url]
+ end
+
+ def messages
+ response = client.receive_message(
+ :queue_url => url,
+ :wait_time_seconds => 10,
+ :max_number_of_messages => 10,
+ :visibility_timeout => 5 * 60
+ )
+
+ ((response && response[:messages]) || []).collect do |message|
+ JSON.parse(message[:body]).merge(
+ 'sqs_id' => message[:message_id],
+ 'sqs_handle' => message[:receipt_handle],
+ 'sqs_queue' => url
+ )
+ end
+ end
+
+ def messages_create(message)
+ attrs = {
+ :queue_url => url,
+ :message_body => message.to_json
+ }
+
+ attrs[:delay_seconds] = (message['run_at'].to_i - Time.now.to_i) if message.key?('run_at')
+
+ client.send_message(attrs)[:message_id]
+ end
+
+ private
+
+ def client
+ @client ||= AWS::SQS::Client.new(
+ :access_key_id => MicroQ.config.aws[:key],
+ :secret_access_key => MicroQ.config.aws[:secret]
+ )
+ end
+ end
+end
View
2  lib/micro_q/version.rb
@@ -1,7 +1,7 @@
module MicroQ
MAJOR = 0
MINOR = 9
- POINT = 2
+ POINT = 4
VERSION = [MAJOR, MINOR, POINT].join('.')
end
View
3  micro_q.gemspec
@@ -19,7 +19,9 @@ Gem::Specification.new do |gem|
gem.add_dependency "celluloid", '~> 0.12.0'
gem.add_dependency "connection_pool"
+ gem.add_dependency "slop"
gem.add_development_dependency "rake"
+ gem.add_development_dependency "fuubar"
gem.add_development_dependency "rspec"
gem.add_development_dependency "timecop"
gem.add_development_dependency "psych"
@@ -27,4 +29,5 @@ Gem::Specification.new do |gem|
gem.add_development_dependency "actionmailer", "> 3.2.0"
gem.add_development_dependency "sqlite3-ruby"
gem.add_development_dependency "mock_redis"
+ gem.add_development_dependency "aws-sdk"
end
View
2  spec/helpers/queues_examples.rb
@@ -1,4 +1,4 @@
-shared_examples_for 'Queue#sync_push' do
+shared_examples 'Queue#sync_push' do
it 'should add to the entries' do
subject.sync_push(item)
View
47 spec/lib/config_spec.rb
@@ -36,6 +36,14 @@
subject.timeout.should == 120
end
+ it 'should have the default env' do
+ subject.env.should == 'development'
+ end
+
+ it 'should not be in sqs mode' do
+ subject.should_not be_sqs
+ end
+
it 'should have middleware chain' do
subject.middleware.class.should == MicroQ::Middleware::Chain
end
@@ -68,4 +76,43 @@
subject.statistics.should == MicroQ::Statistics::Default
end
end
+
+ describe 'when rails is defined' do
+ before do
+ module Rails end
+ def Rails.env; 'the-env' end
+ end
+
+ it 'should have the rails env' do
+ subject.env.should == 'the-env'
+ end
+ end
+
+ describe '#queue=' do
+ before do
+ subject.queue = 'blah-blah'
+ end
+
+ it 'should have the given queue' do
+ subject.queue.should == 'blah-blah'
+ end
+
+ describe 'when setting the SQS queue' do
+ before do
+ subject.queue = MicroQ::Queue::Sqs
+ end
+
+ it 'should have the given queue' do
+ subject.queue.should == MicroQ::Queue::Sqs
+ end
+
+ it 'should enable sqs mode' do
+ subject.sqs?.should == true
+ end
+
+ it 'should have zero workers' do
+ subject.workers.should == 0
+ end
+ end
+ end
end
View
81 spec/lib/fetchers/sqs_spec.rb
@@ -0,0 +1,81 @@
+require 'spec_helper'
+
+describe MicroQ::Fetcher::Sqs do
+ let(:queue) { mock(MicroQ::Queue::Sqs, :receive_messages! => nil) }
+
+ subject { MicroQ::Fetcher::Sqs.new(:low, queue) }
+
+ before do
+ @client = mock(MicroQ::SqsClient, :messages => [])
+ MicroQ::SqsClient.stub(:new => @client)
+ end
+
+ describe '.new' do
+ it 'should have the queue name' do
+ subject.name.should == 'low'
+ end
+ end
+
+ describe '#start' do
+ it 'should create an sqs client' do
+ MicroQ::SqsClient.should_receive(:new).with('low').and_return(@client)
+
+ subject.start
+ end
+
+ describe 'when called again' do
+ it 'should create an sqs client' do
+ MicroQ::SqsClient.should_receive(:new).and_return(@client)
+ subject.start
+
+ MicroQ::SqsClient.rspec_verify
+ MicroQ::SqsClient.rspec_reset
+
+ MicroQ::SqsClient.should_not_receive(:new)
+
+ subject.start
+ end
+ end
+
+ it 'should request messages from the queue' do
+ @client.should_receive(:messages)
+
+ subject.start
+ end
+
+ describe 'when there are messages in the queue' do
+ let(:messages) { 2.times.map {|i| mock("message_#{i}") }}
+
+ before do
+ @client.stub(:messages).and_return(messages)
+ end
+
+ it 'should hand of the messages to the manager' do
+ queue.should_receive(:receive_messages!).with(messages)
+
+ subject.start
+ end
+ end
+ end
+
+ describe '#add_message' do
+ let(:message) { {'class' => 'FooBar'} }
+ let(:add_message) { subject.add_message(message) }
+
+ it 'should create the message' do
+ @client.should_receive(:messages_create).with(message)
+
+ add_message
+ end
+
+ describe 'when the message has an associated time' do
+ let(:add_message) { subject.add_message(message, Time.now.to_i) }
+
+ it 'should send the time' do
+ @client.should_receive(:messages_create).with(message.merge('run_at' => Time.now.to_i))
+
+ add_message
+ end
+ end
+ end
+end
View
69 spec/lib/manager/default_spec.rb
@@ -82,6 +82,38 @@
subject.start
end
+
+ describe 'when the manager is in SQS mode' do
+ before do
+ MicroQ.config['sqs?'] = true
+ end
+
+ it 'should not perform the items' do
+ @queue.should_not_receive(:dequeue)
+ [@worker1, @worker2].each {|w| w.should_not_receive(:perform!) }
+
+ subject.start
+ end
+
+ describe 'when in worker mode' do
+ before do
+ MicroQ.config['worker_mode?'] = true
+ end
+
+ it 'should dequeue the number of free workers' do
+ @queue.should_receive(:dequeue).with(2)
+
+ subject.start
+ end
+
+ it 'should perform the items' do
+ @worker1.should_receive(:perform!).with(@other_item)
+ @worker2.should_receive(:perform!).with(@item)
+
+ subject.start
+ end
+ end
+ end
end
end
@@ -144,6 +176,43 @@
subject.workers.should == [@worker1, @new_worker2]
end
+
+ describe 'when a busy worker has died' do
+ before do
+ subject.wrapped_object.instance_variable_set(:@busy, [@worker2])
+ end
+
+ it 'should restart the dead worker' do
+ MicroQ::Worker::Standard.should_receive(:new_link).and_return(@new_worker2)
+
+ death.call
+ end
+
+ it 'should remove the worker from the busy list' do
+ death.call
+
+ subject.wrapped_object.instance_variable_get(:@busy).should == []
+ end
+
+ it 'should have the new worker' do
+ death.call
+
+ subject.workers.should == [@worker1, @new_worker2]
+ end
+ end
+
+ describe 'when in SQS mode' do
+ before do
+ MicroQ.config['sqs?'] = true
+ end
+
+ it 'should have the original items' do
+ death.call
+
+ subject.queue.should == @queue
+ subject.workers.should == [@worker1, @worker2]
+ end
+ end
end
end
end
View
5 spec/lib/queue/redis_spec.rb
@@ -65,11 +65,6 @@
describe '#dequeue' do
let(:item) { { 'class' => 'MyWorker', 'args' => [] } }
- class MyWorker
- def perform(*)
- end
- end
-
describe 'when there are entries' do
before do
subject.sync_push(item)
View
206 spec/lib/queue/sqs_spec.rb
@@ -0,0 +1,206 @@
+require 'spec_helper'
+
+describe MicroQ::Queue::Sqs do
+ let(:item) { { 'class' => 'MyWorker', 'args' => [4] } }
+
+ before do
+ @fetcher = mock('fetcher', :start! => true)
+ MicroQ::Fetcher::Sqs.stub(:new_link).and_return(@fetcher)
+ end
+
+ describe '.new' do
+ it 'should create three fetchers' do
+ MicroQ::Fetcher::Sqs.should_receive(:new_link).exactly(3).and_return(@fetcher)
+
+ subject
+ end
+
+ it 'should send the current actor along too' do
+ MicroQ::Fetcher::Sqs.should_receive(:new_link).exactly(3).with(anything, subject).and_return(@fetcher)
+
+ subject
+ end
+
+ it 'should start the fetcher' do
+ @fetcher.should_receive(:start!)
+
+ subject
+ end
+
+ it 'should have the fetchers' do
+ subject.fetchers.uniq.should == [@fetcher]
+ end
+ end
+
+ describe '#receive_messages' do
+ let(:messages) { 3.times.map {|i| mock("message_#{i}")} }
+
+ it 'should have no messages' do
+ subject.messages.should == []
+ end
+
+ describe 'when messages have given back' do
+ before do
+ subject.receive_messages(messages.first(1))
+ end
+
+ it 'should have the messages' do
+ subject.messages.should == [messages.first]
+ end
+
+ describe 'when more messages have been received' do
+ before do
+ subject.receive_messages(messages.last(2))
+ end
+
+ it 'should have the messages' do
+ subject.messages.should == messages
+ end
+ end
+ end
+ end
+
+ describe '#sync_push' do
+ before do
+ @fetchers = [:low, :default, :critical].collect do |name|
+ mock('MicroQ::Fetcher::Sqs : ' + name.to_s, :start! => nil).tap do |fetcher|
+ MicroQ::Fetcher::Sqs.stub(:new_link).with(name.to_s, anything).and_return(fetcher)
+ end
+ end
+ end
+
+ it 'should add to the entries' do
+ @fetchers[1].should_receive(:add_message).with(item)
+
+ subject.sync_push(item)
+ end
+
+ it 'should stringify the class' do
+ @fetchers[1].should_receive(:add_message).with(hash_including('class' => 'MyWorker'))
+
+ subject.sync_push(:class => MyWorker)
+ end
+
+ [:low, :default, :critical].each_with_index do |name, i|
+ describe "when the message has a queue named #{name}" do
+ before do
+ item['queue'] = name
+ end
+
+ it 'should create the message on the right queue' do
+ @fetchers[i].should_receive(:add_message).with(item)
+
+ subject.sync_push(item)
+ end
+ end
+ end
+
+ describe 'when given the \'when\' key' do
+ let(:worker) { [item, { 'when' => (Time.now + 100).to_i }] }
+
+ it 'should schedule the item for later' do
+ @fetchers[1].should_receive(:add_message).with(item, (Time.now + 100).to_i)
+
+ subject.sync_push(*worker)
+ end
+
+ it 'should process the middleware chain' do
+ MicroQ.middleware.client.should_receive(:call) do |payload, options|
+ payload['class'].should == 'MyWorker'
+ payload['args'].should == [4]
+ options['when'].should == (Time.now + 100).to_i
+ end
+
+ subject.sync_push(*worker)
+ end
+ end
+
+ describe 'when given the symbol :when key' do
+ let(:worker) { [item, { :when => (Time.now + 100).to_i }] }
+
+ it 'should schedule the item for later' do
+ @fetchers[1].should_receive(:add_message).with(item, (Time.now + 100).to_i)
+
+ subject.sync_push(*worker)
+ end
+ end
+
+ describe 'client middleware' do
+ it 'should process the middleware chain' do
+ MicroQ.middleware.client.should_receive(:call) do |payload, opts|
+ payload['class'].should == 'MyWorker'
+ payload['args'].should == [4]
+
+ opts['when'].should == 'now'
+ end
+
+ subject.sync_push(item, 'when' => 'now')
+ end
+ end
+ end
+
+ describe '#push' do
+ before do
+ @async = mock(Celluloid::ActorProxy)
+ subject.stub(:async).and_return(@async)
+ end
+
+ it 'should asynchronously push the item' do
+ @async.should_receive(:sync_push).with(item)
+
+ subject.push(item)
+ end
+ end
+
+ describe '#dequeue' do
+ let(:items) { 2.times.map {|i| { 'class' => 'SqsWorker', 'args' => [i] }} }
+ let(:item) { items.first }
+
+ class SqsWorker
+ def perform(*)
+ end
+ end
+
+ describe 'when there are messages' do
+ before do
+ subject.messages = items.map(&:dup)
+ end
+
+ it 'should return the limited number of items' do
+ subject.dequeue(1).should == [items.last]
+ end
+
+ it 'should remove the item from the list' do
+ subject.dequeue.should == items.reverse
+
+ subject.messages.should == []
+ end
+ end
+
+ describe 'when there are many messages' do
+ let(:messages) do
+ 5.times.collect {|i|
+ item.dup.tap {|it| it['args'] = [i]}
+ }
+ end
+
+ before do
+ subject.messages = messages.map(&:dup)
+ end
+
+ it 'should return up to the limit number of items' do
+ subject.dequeue(4).should == messages.last(4).reverse
+
+ subject.messages.should include(messages.first)
+ subject.dequeue.should == messages.first(1)
+ end
+
+ it 'should remove the items' do
+ subject.dequeue.should == messages.reverse
+ subject.dequeue.should == []
+
+ subject.messages.should == []
+ end
+ end
+ end
+end
View
166 spec/lib/sqs_client_spec.rb
@@ -0,0 +1,166 @@
+require 'spec_helper'
+
+describe MicroQ::SqsClient, :aws => true do
+ let(:url) { 'http://the.queue/' }
+
+ subject { MicroQ::SqsClient.new('low') }
+
+ before do
+ MicroQ.config.env = 'dev-env'
+
+ @client = mock(AWS::SQS::Client, :receive_message => {}, :create_queue => {:queue_url => url})
+ AWS::SQS::Client.stub(:new).and_return(@client)
+ end
+
+ describe '.new' do
+ it 'should create the queue' do
+ @client.should_receive(:create_queue).and_return({})
+
+ subject
+ end
+
+ it 'should prefix the name with the environment' do
+ @client.should_receive(:create_queue).with(
+ :queue_name => 'dev-env_low'
+ ).and_return({})
+
+ subject
+ end
+
+ it 'should have the url' do
+ subject.url.should == url
+ end
+
+ describe 'when the call fails' do
+ before do
+ @client.stub(:create_queue).and_raise
+ end
+
+ it 'should error' do
+ expect {
+ subject
+ }.to raise_error
+ end
+ end
+ end
+
+ describe '#messages' do
+ let(:messages) { subject.messages }
+
+ it 'should be an empty' do
+ messages.should == []
+ end
+
+ it 'should connect to the api for reading messages' do
+ @client.should_receive(:receive_message)
+
+ messages
+ end
+
+ it 'should have the queue url' do
+ @client.should_receive(:receive_message).with(hash_including(:queue_url => url))
+
+ messages
+ end
+
+ it 'should timeout after 10 seconds' do
+ @client.should_receive(:receive_message).with(hash_including(:wait_time_seconds => 10))
+
+ messages
+ end
+
+ it 'should request 10 items' do
+ @client.should_receive(:receive_message).with(hash_including(:max_number_of_messages => 10))
+
+ messages
+ end
+
+ it 'should make message available again after 5 minutes' do
+ @client.should_receive(:receive_message).with(hash_including(:visibility_timeout => 5 * 60))
+
+ messages
+ end
+
+ describe 'when messages are returned (body as json)' do
+ let(:response) do
+ { :messages => 3.times.map {|i|
+ {
+ :body => {:id => i}.to_json,
+ :message_id => "id:#{i*5}",
+ :receipt_handle => "hand:#{i+10}"
+ }
+ }}
+ end
+
+ before do
+ @client.stub(:receive_message).and_return(response)
+ end
+
+ it 'should return the messages' do
+ messages.map {|m| m['id']}.should == [0, 1, 2]
+ end
+
+ it 'should merge in the message id' do
+ messages.map {|m| m['sqs_id']}.should == ['id:0', 'id:5', 'id:10']
+ end
+
+ it 'should merge in the message id' do
+ messages.map {|m| m['sqs_handle']}.should == ['hand:10', 'hand:11', 'hand:12']
+ end
+
+ it 'should merge in the queue url' do
+ messages.map {|m| m['sqs_queue']}.uniq.should == [url]
+ end
+ end
+ end
+
+ describe '#messages_create' do
+ let(:message) { {:class => 'MyWorker', :args => ['hi']} }
+ let(:send_message) { subject.messages_create(message) }
+
+ before do
+ @response = {:message_id => '10'}
+ @client.stub(:send_message).and_return(@response)
+ end
+
+ it 'should send the message' do
+ @client.should_receive(:send_message).and_return(@response)
+
+ send_message
+ end
+
+ it 'should send it for the queue' do
+ @client.should_receive(:send_message).with(hash_including(:queue_url => url)).and_return(@response)
+
+ send_message
+ end
+
+ it 'should send it for the queue' do
+ @client.should_receive(:send_message).with(hash_including(:message_body => message.to_json)).and_return(@response)
+
+ send_message
+ end
+
+ it 'should not delay the message' do
+ @client.should_receive(:send_message).with(hash_excluding(:delay_seconds)).and_return(@response)
+
+ send_message
+ end
+
+ it 'should return the message id' do
+ send_message.should == '10'
+ end
+
+ describe 'when the message is to be run later' do
+ before do
+ message['run_at'] = (Time.now + 60 * 60)
+ end
+
+ it 'should set the delay for sqs' do
+ @client.should_receive(:send_message).with(hash_including(:delay_seconds => 60*60)).and_return(@response)
+
+ send_message
+ end
+ end
+ end
+end
View
24 spec/spec_helper.rb
@@ -13,17 +13,34 @@
Redis = MockRedis
end
+class MyWorker; end
+
RSpec.configure do |config|
config.treat_symbols_as_metadata_keys_with_true_values = true
config.run_all_when_everything_filtered = true
config.order = 'default'
+ config.before :all do
+ GC.disable
+ end
+
config.before :each do
MicroQ.send :clear
MicroQ.redis {|r| r.flushdb }
end
+ config.before :each, :aws => true do
+ require 'aws-sdk'
+ end
+
+ config.before :each, :middleware => true do
+ class WorkerClass; end
+
+ @worker = WorkerClass.new
+ @payload = { 'class' => 'WorkerClass', 'args' => [1, 2], 'queue' => 'a-queue'}
+ end
+
config.before :each, :active_record => true do
require 'active_record'
require 'sqlite3' # https://github.com/luislavena/sqlite3-ruby
@@ -52,11 +69,8 @@
@_db.rollback
end
- config.before :each, :middleware => true do
- class WorkerClass; end
-
- @worker = WorkerClass.new
- @payload = { 'class' => 'WorkerClass', 'args' => [1, 2], 'queue' => 'a-queue'}
+ config.after :all do
+ GC.enable
end
end
Something went wrong with that request. Please try again.