Skip to content
This repository

feature/sqs #1

Merged
merged 11 commits into from about 1 year ago

1 participant

Brian Norton
This page is out of date. Refresh to see the latest.
2  .rspec
... ...
@@ -1,2 +1,2 @@
1 1
 --color
2  
---format documentation
  2
+--format Fuubar
45  README.md
Source Rendered
@@ -66,6 +66,51 @@ def update
66 66
 end
67 67
 ```
68 68
 
  69
+##Queues
  70
+By default the queue is an in-memory queue meaning that messages are shared per-process
  71
+and any unprocessed messages are saved to a file when shutdown occurs.
  72
+
  73
+The **Redis queue** requires some configuration in your gemfile to keep the runtime dependencies to a minimum
  74
+```ruby
  75
+# Gemfile
  76
+gem 'redis'
  77
+gem 'micro_q'
  78
+
  79
+# config/initializers/micro_q.rb
  80
+require 'redis'
  81
+require 'micro_q'
  82
+
  83
+# when MicroQ starts simply use the redis queue
  84
+MicroQ.configure do |config|
  85
+  config.queue = MicroQ::Queue::Redis
  86
+end
  87
+```
  88
+
  89
+The **Amazon SQS (coming soon) queue** require some extra configuration in your gemfile.
  90
+```ruby
  91
+# Gemfile
  92
+gem 'aws-sdk'
  93
+gem 'micro_q'
  94
+
  95
+# config/initializers/micro_q.rb
  96
+require 'aws-sdk'
  97
+require 'micro_q'
  98
+
  99
+# when MicroQ starts simply use the sqs queue
  100
+# this will take care of all other switchover for the system
  101
+MicroQ.configure do |config|
  102
+  config.queue = MicroQ::Queue::Sqs
  103
+  config.aws = { :key => 'YOUR KEY', :secret => 'YOUR SECRET' }
  104
+end
  105
+
  106
+**Note that when using the SQS Queue only the MicroQ's started via command-line will actually process messages**
  107
+
  108
+# Then just use the queues in your workers
  109
+class SomeWorker
  110
+  worker :queue => :critical
  111
+end
  112
+```
  113
+
69 114
 ## Contributing
70 115
 
71 116
 1. Fork it
11  bin/microq
... ...
@@ -0,0 +1,11 @@
  1
+#!/usr/bin/env ruby
  2
+
  3
+require_relative '../lib/micro_q/cli'
  4
+
  5
+begin
  6
+  MicroQ::CLI.run
  7
+rescue => e
  8
+  STDERR.puts e.message
  9
+  STDERR.puts e.backtrace.join("\n")
  10
+  exit 1
  11
+end
5  lib/micro_q.rb
@@ -53,11 +53,15 @@ def self.clear
53 53
 require 'micro_q/dsl'
54 54
 require 'micro_q/worker'
55 55
 require 'micro_q/queue'
  56
+require 'micro_q/sqs_client'
56 57
 
  58
+require 'micro_q/fetchers/sqs'
57 59
 require 'micro_q/redis'
  60
+require 'micro_q/inspect'
58 61
 
59 62
 require 'micro_q/wrappers/action_mailer'
60 63
 
  64
+
61 65
 # add Class and Instance methods first then
62 66
 # override with additional extensions
63 67
 
@@ -72,4 +76,5 @@ def self.clear
72 76
 # There is a better way coming soon 2/18/13
73 77
 at_exit do
74 78
   MicroQ::Manager::Default.shutdown!
  79
+  MicroQ::Queue::Sqs.shutdown!
75 80
 end
60  lib/micro_q/cli.rb
... ...
@@ -0,0 +1,60 @@
  1
+require 'slop'
  2
+require 'micro_q'
  3
+
  4
+module MicroQ
  5
+  class CLI
  6
+    def self.run
  7
+      @cli ||= new
  8
+      @cli.parse
  9
+      @cli.verify!
  10
+      @cli.setup
  11
+    end
  12
+
  13
+    def parse
  14
+      opts = Slop.parse do
  15
+        banner 'Usage: microq [options]'
  16
+
  17
+        on 'r=', 'The path to the rails application'
  18
+        on 'require=', 'The path to the rails application'
  19
+        on 'w=', 'The number of worker threads'
  20
+        on 'workers=', 'The number of worker threads'
  21
+      end
  22
+
  23
+      @workers = opts[:workers] || opts[:w]
  24
+      @require = opts[:require] || opts[:r]
  25
+    end
  26
+
  27
+    def verify!
  28
+      raise "Need a valid path to a rails application, you gave us #{@require}\n" unless /environment\.rb/ === @require || File.exist?("#{@require}/config/application.rb")
  29
+    end
  30
+
  31
+    def setup
  32
+      puts 'Requiring rails...'
  33
+      require 'rails'
  34
+
  35
+      puts 'Requiring rails application...'
  36
+      if File.directory?(@require)
  37
+        require File.expand_path("#{@require}/config/environment.rb")
  38
+      else
  39
+        require @require
  40
+      end
  41
+
  42
+      aws_keys = MicroQ.config.aws.try(:keys) || []
  43
+      raise 'SQS mode requires an aws :key and :secret see https://github.com/bnorton/micro_q/wiki/Named-Queues' unless aws_keys.include?(:key) && aws_keys.include?(:secret)
  44
+
  45
+      MicroQ.configure do |config|
  46
+        config.queue = MicroQ::Queue::Sqs # set workers after since this must set workers=0 internally
  47
+        config.workers = @workers.to_i if @workers
  48
+        config['worker_mode?'] = true
  49
+      end
  50
+
  51
+      puts "Running micro_q in SQS mode with #{MicroQ.config.workers} workers... Hit ctl+c to stop...\n"
  52
+      MicroQ.start
  53
+
  54
+      sleep
  55
+    rescue Interrupt
  56
+      puts 'Exiting via interrupt'
  57
+      exit(1)
  58
+    end
  59
+  end
  60
+end
15  lib/micro_q/config.rb
@@ -13,11 +13,13 @@ def initialize
13 13
         'workers' => 5,
14 14
         'timeout' => 120,
15 15
         'interval' => 5,
  16
+        'env' => defined?(Rails) ? Rails.env : 'development',
16 17
         'middleware' => Middleware::Chain.new,
17 18
         'manager' => Manager::Default,
18 19
         'worker' => Worker::Standard,
19 20
         'queue' => Queue::Default,
20 21
         'statistics' => Statistics::Default,
  22
+        'aws' => {},
21 23
         'redis_pool' => { :size => 15, :timeout => 1 },
22 24
         'redis' => { :host => 'localhost', :port => 6379 }
23 25
       }
@@ -31,6 +33,19 @@ def [](key)
31 33
       @data[key.to_s]
32 34
     end
33 35
 
  36
+    def queue=(q)
  37
+      if q == Queue::Sqs
  38
+        require 'aws-sdk'
  39
+
  40
+        @data['sqs?'] = true
  41
+        @data['workers'] = 0
  42
+      end
  43
+
  44
+      @data['queue'] = q
  45
+    rescue LoadError
  46
+      raise "Looks you need `gem 'aws-sdk'` in your Gemfile to use the SQS queue."
  47
+    end
  48
+
34 49
     def method_missing(method, *args)
35 50
       case method
36 51
         when /(.+)=$/ then @data[$1] = args.first
37  lib/micro_q/fetchers/sqs.rb
... ...
@@ -0,0 +1,37 @@
  1
+module MicroQ
  2
+  module Fetcher
  3
+    class Sqs
  4
+      include Celluloid
  5
+      attr_reader :name
  6
+
  7
+      def initialize(name, manager)
  8
+        @name = name.to_s
  9
+        @manager = manager
  10
+      end
  11
+
  12
+      def start
  13
+        defer do
  14
+          client.messages.tap do |messages|
  15
+            @manager.receive_messages!(messages) if messages.any?
  16
+          end
  17
+        end
  18
+
  19
+        after(2) { start }
  20
+      end
  21
+
  22
+      def add_message(message, time=nil)
  23
+        message['run_at'] = time.to_f if time
  24
+
  25
+        defer do
  26
+          client.messages_create(message)
  27
+        end
  28
+      end
  29
+
  30
+      private
  31
+
  32
+      def client
  33
+        @client ||= MicroQ::SqsClient.new(name)
  34
+      end
  35
+    end
  36
+  end
  37
+end
17  lib/micro_q/inspect.rb
... ...
@@ -0,0 +1,17 @@
  1
+module MicroQ
  2
+  module Inspect
  3
+    def self.included(base)
  4
+      base.send(:include, InstanceMethods)
  5
+    end
  6
+
  7
+    module InstanceMethods
  8
+      def inspect
  9
+        "#<#{self.class.name}: #{self.object_id}>"
  10
+      end
  11
+    end
  12
+  end
  13
+end
  14
+
  15
+['Worker::Standard', 'Queue::Default', 'Queue::Redis', 'Queue::Sqs', 'Manager::Default', 'Fetcher::Sqs'].each do |postfix|
  16
+  MicroQ::Util.constantize("MicroQ::#{postfix}").send(:include, MicroQ::Inspect)
  17
+end
10  lib/micro_q/manager/default.rb
@@ -23,6 +23,8 @@ class Default
23 23
       attr_reader :queue, :workers
24 24
 
25 25
       def start
  26
+        return if queue_only?
  27
+
26 28
         count = workers.size
27 29
 
28 30
         if (messages = queue.dequeue(count)).any?
@@ -53,6 +55,7 @@ def reinitialize(*)
53 55
         end
54 56
 
55 57
         @busy ||= []
  58
+        @workers ||= []
56 59
 
57 60
         build_missing_workers
58 61
       end
@@ -61,9 +64,10 @@ def reinitialize(*)
61 64
 
62 65
       # Don't shrink the pool if the config changes
63 66
       def build_missing_workers
64  
-        @workers ||= []
  67
+        return if queue_only?
65 68
 
66 69
         workers.select!(&:alive?)
  70
+        @busy.select!(&:alive?)
67 71
 
68 72
         missing_worker_count.times do
69 73
           workers << MicroQ.config.worker.new_link(current_actor)
@@ -78,6 +82,10 @@ def kill_all
78 82
         (@workers + @busy).each {|w| w.terminate if w.alive? }
79 83
       end
80 84
 
  85
+      def queue_only?
  86
+        @queue_only ||= MicroQ.config.sqs? && !MicroQ.config.worker_mode?
  87
+      end
  88
+
81 89
       def self.shutdown?
82 90
         !!@shutdown
83 91
       end
1  lib/micro_q/queue.rb
... ...
@@ -1,5 +1,6 @@
1 1
 require 'micro_q/queue/default'
2 2
 require 'micro_q/queue/redis'
  3
+require 'micro_q/queue/sqs'
3 4
 
4 5
 ##
5 6
 # The Queueing interface
85  lib/micro_q/queue/sqs.rb
... ...
@@ -0,0 +1,85 @@
  1
+module MicroQ
  2
+  module Queue
  3
+    class Sqs
  4
+      include Celluloid
  5
+
  6
+      exit_handler :build_missing_fetchers
  7
+
  8
+      attr_accessor :messages
  9
+      attr_reader   :fetchers, :entries, :later
  10
+
  11
+      def initialize
  12
+        @lock = Mutex.new
  13
+
  14
+        @messages, @entries, @later = [], [], []
  15
+        @fetcher_map = {}
  16
+
  17
+        build_missing_fetchers
  18
+      end
  19
+
  20
+      def push(item)
  21
+        async.sync_push(item)
  22
+      end
  23
+
  24
+      def sync_push(item, options={})
  25
+        item, options = MicroQ::Util.stringify(item, options)
  26
+        item['class'] = item['class'].to_s
  27
+
  28
+        MicroQ.middleware.client.call(item, options) do
  29
+          args, queue_name = [item], verify_queue(item['queue'].to_s)
  30
+
  31
+          if (time = options['when'])
  32
+            args << time.to_f
  33
+          end
  34
+
  35
+          @fetcher_map[queue_name].add_message(*args)
  36
+        end
  37
+      end
  38
+
  39
+      def receive_messages(*items)
  40
+        @lock.synchronize do
  41
+          (@messages += items).flatten!
  42
+        end
  43
+      end
  44
+
  45
+      def dequeue(limit=30)
  46
+        return [] unless limit > 0 && messages.any?
  47
+
  48
+        @lock.synchronize do
  49
+          limit.times.collect do
  50
+            messages.pop
  51
+          end.compact
  52
+        end
  53
+      end
  54
+
  55
+      def verify_queue(name)
  56
+        QUEUES_KEYS.include?(name) ? name : 'default'
  57
+      end
  58
+
  59
+      def self.shutdown!
  60
+        @shutdown = true
  61
+      end
  62
+
  63
+      private
  64
+
  65
+      def self.shutdown?
  66
+        @shutdown
  67
+      end
  68
+
  69
+      def build_missing_fetchers(*)
  70
+        return if self.class.shutdown?
  71
+
  72
+        @fetchers = QUEUES_KEYS.map do |name|
  73
+          ((existing = @fetcher_map[name]) && existing.alive? && existing) ||
  74
+            MicroQ::Fetcher::Sqs.new_link(name, current_actor).tap do |fetcher|
  75
+              @fetcher_map[name] = fetcher
  76
+              fetcher.start!
  77
+            end
  78
+        end
  79
+      end
  80
+
  81
+      QUEUES = { 'low' => 1, 'default' => 3, 'critical' => 5 }
  82
+      QUEUES_KEYS = QUEUES.keys
  83
+    end
  84
+  end
  85
+end
47  lib/micro_q/sqs_client.rb
... ...
@@ -0,0 +1,47 @@
  1
+module MicroQ
  2
+  class SqsClient
  3
+    attr_reader :url
  4
+
  5
+    def initialize(name)
  6
+      @name = "#{MicroQ.config.env}_#{name}"
  7
+      @url  = client.create_queue(:queue_name => @name)[:queue_url]
  8
+    end
  9
+
  10
+    def messages
  11
+      response = client.receive_message(
  12
+        :queue_url => url,
  13
+        :wait_time_seconds => 10,
  14
+        :max_number_of_messages => 10,
  15
+        :visibility_timeout => 5 * 60
  16
+      )
  17
+
  18
+      ((response && response[:messages]) || []).collect do |message|
  19
+        JSON.parse(message[:body]).merge(
  20
+          'sqs_id' => message[:message_id],
  21
+          'sqs_handle' => message[:receipt_handle],
  22
+          'sqs_queue' => url
  23
+        )
  24
+      end
  25
+    end
  26
+
  27
+    def messages_create(message)
  28
+      attrs = {
  29
+        :queue_url => url,
  30
+        :message_body => message.to_json
  31
+      }
  32
+
  33
+      attrs[:delay_seconds] = (message['run_at'].to_i - Time.now.to_i) if message.key?('run_at')
  34
+
  35
+      client.send_message(attrs)[:message_id]
  36
+    end
  37
+
  38
+    private
  39
+
  40
+    def client
  41
+      @client ||= AWS::SQS::Client.new(
  42
+        :access_key_id => MicroQ.config.aws[:key],
  43
+        :secret_access_key => MicroQ.config.aws[:secret]
  44
+      )
  45
+    end
  46
+  end
  47
+end
2  lib/micro_q/version.rb
... ...
@@ -1,7 +1,7 @@
1 1
 module MicroQ
2 2
   MAJOR = 0
3 3
   MINOR = 9
4  
-  POINT = 2
  4
+  POINT = 4
5 5
 
6 6
   VERSION = [MAJOR, MINOR, POINT].join('.')
7 7
 end
3  micro_q.gemspec
@@ -19,7 +19,9 @@ Gem::Specification.new do |gem|
19 19
 
20 20
   gem.add_dependency             "celluloid", '~> 0.12.0'
21 21
   gem.add_dependency             "connection_pool"
  22
+  gem.add_dependency             "slop"
22 23
   gem.add_development_dependency "rake"
  24
+  gem.add_development_dependency "fuubar"
23 25
   gem.add_development_dependency "rspec"
24 26
   gem.add_development_dependency "timecop"
25 27
   gem.add_development_dependency "psych"
@@ -27,4 +29,5 @@ Gem::Specification.new do |gem|
27 29
   gem.add_development_dependency "actionmailer", "> 3.2.0"
28 30
   gem.add_development_dependency "sqlite3-ruby"
29 31
   gem.add_development_dependency "mock_redis"
  32
+  gem.add_development_dependency "aws-sdk"
30 33
 end
2  spec/helpers/queues_examples.rb
... ...
@@ -1,4 +1,4 @@
1  
-shared_examples_for 'Queue#sync_push' do
  1
+shared_examples 'Queue#sync_push' do
2 2
   it 'should add to the entries' do
3 3
     subject.sync_push(item)
4 4
 
47  spec/lib/config_spec.rb
@@ -36,6 +36,14 @@
36 36
       subject.timeout.should == 120
37 37
     end
38 38
 
  39
+    it 'should have the default env' do
  40
+      subject.env.should == 'development'
  41
+    end
  42
+
  43
+    it 'should not be in sqs mode' do
  44
+      subject.should_not be_sqs
  45
+    end
  46
+
39 47
     it 'should have middleware chain' do
40 48
       subject.middleware.class.should == MicroQ::Middleware::Chain
41 49
     end
@@ -68,4 +76,43 @@
68 76
       subject.statistics.should == MicroQ::Statistics::Default
69 77
     end
70 78
   end
  79
+
  80
+  describe 'when rails is defined' do
  81
+    before do
  82
+      module Rails end
  83
+      def Rails.env; 'the-env' end
  84
+    end
  85
+
  86
+    it 'should have the rails env' do
  87
+      subject.env.should == 'the-env'
  88
+    end
  89
+  end
  90
+
  91
+  describe '#queue=' do
  92
+    before do
  93
+      subject.queue = 'blah-blah'
  94
+    end
  95
+
  96
+    it 'should have the given queue' do
  97
+      subject.queue.should == 'blah-blah'
  98
+    end
  99
+
  100
+    describe 'when setting the SQS queue' do
  101
+      before do
  102
+        subject.queue = MicroQ::Queue::Sqs
  103
+      end
  104
+
  105
+      it 'should have the given queue' do
  106
+        subject.queue.should == MicroQ::Queue::Sqs
  107
+      end
  108
+
  109
+      it 'should enable sqs mode' do
  110
+        subject.sqs?.should == true
  111
+      end
  112
+
  113
+      it 'should have zero workers' do
  114
+        subject.workers.should == 0
  115
+      end
  116
+    end
  117
+  end
71 118
 end
81  spec/lib/fetchers/sqs_spec.rb
... ...
@@ -0,0 +1,81 @@
  1
+require 'spec_helper'
  2
+
  3
+describe MicroQ::Fetcher::Sqs do
  4
+  let(:queue) { mock(MicroQ::Queue::Sqs, :receive_messages! => nil) }
  5
+
  6
+  subject { MicroQ::Fetcher::Sqs.new(:low, queue) }
  7
+
  8
+  before do
  9
+    @client = mock(MicroQ::SqsClient, :messages => [])
  10
+    MicroQ::SqsClient.stub(:new => @client)
  11
+  end
  12
+
  13
+  describe '.new' do
  14
+    it 'should have the queue name' do
  15
+      subject.name.should == 'low'
  16
+    end
  17
+  end
  18
+
  19
+  describe '#start' do
  20
+    it 'should create an sqs client' do
  21
+      MicroQ::SqsClient.should_receive(:new).with('low').and_return(@client)
  22
+
  23
+      subject.start
  24
+    end
  25
+
  26
+    describe 'when called again' do
  27
+      it 'should create an sqs client' do
  28
+        MicroQ::SqsClient.should_receive(:new).and_return(@client)
  29
+        subject.start
  30
+
  31
+        MicroQ::SqsClient.rspec_verify
  32
+        MicroQ::SqsClient.rspec_reset
  33
+
  34
+        MicroQ::SqsClient.should_not_receive(:new)
  35
+
  36
+        subject.start
  37
+      end
  38
+    end
  39
+
  40
+    it 'should request messages from the queue' do
  41
+      @client.should_receive(:messages)
  42
+
  43
+      subject.start
  44
+    end
  45
+
  46
+    describe 'when there are messages in the queue' do
  47
+      let(:messages) { 2.times.map {|i| mock("message_#{i}") }}
  48
+
  49
+      before do
  50
+        @client.stub(:messages).and_return(messages)
  51
+      end
  52
+
  53
+      it 'should hand of the messages to the manager' do
  54
+        queue.should_receive(:receive_messages!).with(messages)
  55
+
  56
+        subject.start
  57
+      end
  58
+    end
  59
+  end
  60
+
  61
+  describe '#add_message' do
  62
+    let(:message) { {'class' => 'FooBar'} }
  63
+    let(:add_message) { subject.add_message(message) }
  64
+
  65
+    it 'should create the message' do
  66
+      @client.should_receive(:messages_create).with(message)
  67
+
  68
+      add_message
  69
+    end
  70
+
  71
+    describe 'when the message has an associated time' do
  72
+      let(:add_message) { subject.add_message(message, Time.now.to_i) }
  73
+
  74
+      it 'should send the time' do
  75
+        @client.should_receive(:messages_create).with(message.merge('run_at' => Time.now.to_i))
  76
+
  77
+        add_message
  78
+      end
  79
+    end
  80
+  end
  81
+end
69  spec/lib/manager/default_spec.rb
@@ -82,6 +82,38 @@
82 82
 
83 83
         subject.start
84 84
       end
  85
+
  86
+      describe 'when the manager is in SQS mode' do
  87
+        before do
  88
+          MicroQ.config['sqs?'] = true
  89
+        end
  90
+
  91
+        it 'should not perform the items' do
  92
+          @queue.should_not_receive(:dequeue)
  93
+          [@worker1, @worker2].each {|w| w.should_not_receive(:perform!) }
  94
+
  95
+          subject.start
  96
+        end
  97
+
  98
+        describe 'when in worker mode' do
  99
+          before do
  100
+            MicroQ.config['worker_mode?'] = true
  101
+          end
  102
+
  103
+          it 'should dequeue the number of free workers' do
  104
+            @queue.should_receive(:dequeue).with(2)
  105
+
  106
+            subject.start
  107
+          end
  108
+
  109
+          it 'should perform the items' do
  110
+            @worker1.should_receive(:perform!).with(@other_item)
  111
+            @worker2.should_receive(:perform!).with(@item)
  112
+
  113
+            subject.start
  114
+          end
  115
+        end
  116
+      end
85 117
     end
86 118
   end
87 119
 
@@ -144,6 +176,43 @@
144 176
 
145 177
         subject.workers.should == [@worker1, @new_worker2]
146 178
       end
  179
+
  180
+      describe 'when a busy worker has died' do
  181
+        before do
  182
+          subject.wrapped_object.instance_variable_set(:@busy, [@worker2])
  183
+        end
  184
+
  185
+        it 'should restart the dead worker' do
  186
+          MicroQ::Worker::Standard.should_receive(:new_link).and_return(@new_worker2)
  187
+
  188
+          death.call
  189
+        end
  190
+
  191
+        it 'should remove the worker from the busy list' do
  192
+          death.call
  193
+
  194
+          subject.wrapped_object.instance_variable_get(:@busy).should == []
  195
+        end
  196
+
  197
+        it 'should have the new worker' do
  198
+          death.call
  199
+
  200
+          subject.workers.should == [@worker1, @new_worker2]
  201
+        end
  202
+      end
  203
+
  204
+      describe 'when in SQS mode' do
  205
+        before do
  206
+          MicroQ.config['sqs?'] = true
  207
+        end
  208
+
  209
+        it 'should have the original items' do
  210
+          death.call
  211
+
  212
+          subject.queue.should == @queue
  213
+          subject.workers.should == [@worker1, @worker2]
  214
+        end
  215
+      end
147 216
     end
148 217
   end
149 218
 end
5  spec/lib/queue/redis_spec.rb
@@ -65,11 +65,6 @@
65 65
   describe '#dequeue' do
66 66
     let(:item) { { 'class' => 'MyWorker', 'args' => [] } }
67 67
 
68  
-    class MyWorker
69  
-      def perform(*)
70  
-      end
71  
-    end
72  
-
73 68
     describe 'when there are entries' do
74 69
       before do
75 70
         subject.sync_push(item)
206  spec/lib/queue/sqs_spec.rb
... ...
@@ -0,0 +1,206 @@
  1
+require 'spec_helper'
  2
+
  3
+describe MicroQ::Queue::Sqs do
  4
+  let(:item) { { 'class' => 'MyWorker', 'args' => [4] } }
  5
+
  6
+  before do
  7
+    @fetcher = mock('fetcher', :start! => true)
  8
+    MicroQ::Fetcher::Sqs.stub(:new_link).and_return(@fetcher)
  9
+  end
  10
+
  11
+  describe '.new' do
  12
+    it 'should create three fetchers' do
  13
+      MicroQ::Fetcher::Sqs.should_receive(:new_link).exactly(3).and_return(@fetcher)
  14
+
  15
+      subject
  16
+    end
  17
+
  18
+    it 'should send the current actor along too' do
  19
+      MicroQ::Fetcher::Sqs.should_receive(:new_link).exactly(3).with(anything, subject).and_return(@fetcher)
  20
+
  21
+      subject
  22
+    end
  23
+
  24
+    it 'should start the fetcher' do
  25
+      @fetcher.should_receive(:start!)
  26
+
  27
+      subject
  28
+    end
  29
+
  30
+    it 'should have the fetchers' do
  31
+      subject.fetchers.uniq.should == [@fetcher]
  32
+    end
  33
+  end
  34
+
  35
+  describe '#receive_messages' do
  36
+    let(:messages) { 3.times.map {|i| mock("message_#{i}")} }
  37
+
  38
+    it 'should have no messages' do
  39
+      subject.messages.should == []
  40
+    end
  41
+
  42
+    describe 'when messages have given back' do
  43
+      before do
  44
+        subject.receive_messages(messages.first(1))
  45
+      end
  46
+
  47
+      it 'should have the messages' do
  48
+        subject.messages.should == [messages.first]
  49
+      end
  50
+
  51
+      describe 'when more messages have been received' do
  52
+        before do
  53
+          subject.receive_messages(messages.last(2))
  54
+        end
  55
+
  56
+        it 'should have the messages' do
  57
+          subject.messages.should == messages
  58
+        end
  59
+      end
  60
+    end
  61
+  end
  62
+
  63
+  describe '#sync_push' do
  64
+    before do
  65
+      @fetchers = [:low, :default, :critical].collect do |name|
  66
+        mock('MicroQ::Fetcher::Sqs : ' + name.to_s, :start! => nil).tap do |fetcher|
  67
+          MicroQ::Fetcher::Sqs.stub(:new_link).with(name.to_s, anything).and_return(fetcher)
  68
+        end
  69
+      end
  70
+    end
  71
+
  72
+    it 'should add to the entries' do
  73
+      @fetchers[1].should_receive(:add_message).with(item)
  74
+
  75
+      subject.sync_push(item)
  76
+    end
  77
+
  78
+    it 'should stringify the class' do
  79
+      @fetchers[1].should_receive(:add_message).with(hash_including('class' => 'MyWorker'))
  80
+
  81
+      subject.sync_push(:class => MyWorker)
  82
+    end
  83
+
  84
+    [:low, :default, :critical].each_with_index do |name, i|
  85
+      describe "when the message has a queue named #{name}" do
  86
+        before do
  87
+          item['queue'] = name
  88
+        end
  89
+
  90
+        it 'should create the message on the right queue' do
  91
+          @fetchers[i].should_receive(:add_message).with(item)
  92
+
  93
+          subject.sync_push(item)
  94
+        end
  95
+      end
  96
+    end
  97
+
  98
+    describe 'when given the \'when\' key' do
  99
+      let(:worker) { [item, { 'when' => (Time.now + 100).to_i }] }
  100
+
  101
+      it 'should schedule the item for later' do
  102
+        @fetchers[1].should_receive(:add_message).with(item, (Time.now + 100).to_i)
  103
+
  104
+        subject.sync_push(*worker)
  105
+      end
  106
+
  107
+      it 'should process the middleware chain' do
  108
+        MicroQ.middleware.client.should_receive(:call) do |payload, options|
  109
+          payload['class'].should == 'MyWorker'
  110
+          payload['args'].should == [4]
  111
+          options['when'].should == (Time.now + 100).to_i
  112
+        end
  113
+
  114
+        subject.sync_push(*worker)
  115
+      end
  116
+    end
  117
+
  118
+    describe 'when given the symbol :when key' do
  119
+      let(:worker) { [item, { :when => (Time.now + 100).to_i }] }
  120
+
  121
+      it 'should schedule the item for later' do
  122
+        @fetchers[1].should_receive(:add_message).with(item, (Time.now + 100).to_i)
  123
+
  124
+        subject.sync_push(*worker)
  125
+      end
  126
+    end
  127
+
  128
+    describe 'client middleware' do
  129
+      it 'should process the middleware chain' do
  130
+        MicroQ.middleware.client.should_receive(:call) do |payload, opts|
  131
+          payload['class'].should == 'MyWorker'
  132
+          payload['args'].should == [4]
  133
+
  134
+          opts['when'].should == 'now'
  135
+        end
  136
+
  137
+        subject.sync_push(item, 'when' => 'now')
  138
+      end
  139
+    end
  140
+  end
  141
+
  142
+  describe '#push' do
  143
+    before do
  144
+      @async = mock(Celluloid::ActorProxy)
  145
+      subject.stub(:async).and_return(@async)
  146
+    end
  147
+
  148
+    it 'should asynchronously push the item' do
  149
+      @async.should_receive(:sync_push).with(item)
  150
+
  151
+      subject.push(item)
  152
+    end
  153
+  end
  154
+
  155
+  describe '#dequeue' do
  156
+    let(:items) { 2.times.map {|i| { 'class' => 'SqsWorker', 'args' => [i] }} }
  157
+    let(:item) { items.first }
  158
+
  159
+    class SqsWorker
  160
+      def perform(*)
  161
+      end
  162
+    end
  163
+
  164
+    describe 'when there are messages' do
  165
+      before do
  166
+        subject.messages = items.map(&:dup)
  167
+      end
  168
+
  169
+      it 'should return the limited number of items' do
  170
+        subject.dequeue(1).should == [items.last]
  171
+      end
  172
+
  173
+      it 'should remove the item from the list' do
  174
+        subject.dequeue.should == items.reverse
  175
+
  176
+        subject.messages.should == []
  177
+      end
  178
+    end
  179
+
  180
+    describe 'when there are many messages' do
  181
+      let(:messages) do
  182
+        5.times.collect {|i|
  183
+          item.dup.tap {|it| it['args'] = [i]}
  184
+        }
  185
+      end
  186
+
  187
+      before do
  188
+        subject.messages = messages.map(&:dup)
  189
+      end
  190
+
  191
+      it 'should return up to the limit number of items' do
  192
+        subject.dequeue(4).should == messages.last(4).reverse
  193
+
  194
+        subject.messages.should include(messages.first)
  195
+        subject.dequeue.should == messages.first(1)
  196
+      end
  197
+
  198
+      it 'should remove the items' do
  199
+        subject.dequeue.should == messages.reverse
  200
+        subject.dequeue.should == []
  201
+
  202
+        subject.messages.should == []
  203
+      end
  204
+    end
  205
+  end
  206
+end
166  spec/lib/sqs_client_spec.rb
... ...
@@ -0,0 +1,166 @@
  1
+require 'spec_helper'
  2
+
  3
+describe MicroQ::SqsClient, :aws => true do
  4
+  let(:url) { 'http://the.queue/' }
  5
+
  6
+  subject { MicroQ::SqsClient.new('low') }
  7
+
  8
+  before do
  9
+    MicroQ.config.env = 'dev-env'
  10
+
  11
+    @client = mock(AWS::SQS::Client, :receive_message => {}, :create_queue => {:queue_url => url})
  12
+    AWS::SQS::Client.stub(:new).and_return(@client)
  13
+  end
  14
+
  15
+  describe '.new' do
  16
+    it 'should create the queue' do
  17
+      @client.should_receive(:create_queue).and_return({})
  18
+
  19
+      subject
  20
+    end
  21
+
  22
+    it 'should prefix the name with the environment' do
  23
+      @client.should_receive(:create_queue).with(
  24
+        :queue_name => 'dev-env_low'
  25
+      ).and_return({})
  26
+
  27
+      subject
  28
+    end
  29
+
  30
+    it 'should have the url' do
  31
+      subject.url.should == url
  32
+    end
  33
+
  34
+    describe 'when the call fails' do
  35
+      before do
  36
+        @client.stub(:create_queue).and_raise
  37
+      end
  38
+
  39
+      it 'should error' do
  40
+        expect {
  41
+          subject
  42
+        }.to raise_error
  43
+      end
  44
+    end
  45
+  end
  46
+
  47
+  describe '#messages' do
  48
+    let(:messages) { subject.messages }
  49
+
  50
+    it 'should be an empty' do
  51
+      messages.should == []
  52
+    end
  53
+
  54
+    it 'should connect to the api for reading messages' do
  55
+      @client.should_receive(:receive_message)
  56
+
  57
+      messages
  58
+    end
  59
+
  60
+    it 'should have the queue url' do
  61
+      @client.should_receive(:receive_message).with(hash_including(:queue_url => url))
  62
+
  63
+      messages
  64
+    end
  65
+
  66
+    it 'should timeout after 10 seconds' do
  67
+      @client.should_receive(:receive_message).with(hash_including(:wait_time_seconds => 10))
  68
+
  69
+      messages
  70
+    end
  71
+
  72
+    it 'should request 10 items' do
  73
+      @client.should_receive(:receive_message).with(hash_including(:max_number_of_messages => 10))
  74
+
  75
+      messages
  76
+    end
  77
+
  78
+    it 'should make message available again after 5 minutes' do
  79
+      @client.should_receive(:receive_message).with(hash_including(:visibility_timeout => 5 * 60))
  80
+
  81
+      messages
  82
+    end
  83
+
  84
+    describe 'when messages are returned (body as json)' do
  85
+      let(:response) do
  86
+        { :messages => 3.times.map {|i|
  87
+          {
  88
+            :body => {:id => i}.to_json,
  89
+            :message_id => "id:#{i*5}",
  90
+            :receipt_handle => "hand:#{i+10}"
  91
+          }
  92
+        }}
  93
+      end
  94
+
  95
+      before do
  96
+        @client.stub(:receive_message).and_return(response)
  97
+      end
  98
+
  99
+      it 'should return the messages' do
  100
+        messages.map {|m| m['id']}.should == [0, 1, 2]
  101
+      end
  102
+
  103
+      it 'should merge in the message id' do
  104
+        messages.map {|m| m['sqs_id']}.should == ['id:0', 'id:5', 'id:10']
  105
+      end
  106
+
  107
+      it 'should merge in the message id' do
  108
+        messages.map {|m| m['sqs_handle']}.should == ['hand:10', 'hand:11', 'hand:12']
  109
+      end
  110
+
  111
+      it 'should merge in the queue url' do
  112
+        messages.map {|m| m['sqs_queue']}.uniq.should == [url]
  113
+      end
  114
+    end
  115
+  end
  116
+
  117
+  describe '#messages_create' do
  118
+    let(:message) { {:class => 'MyWorker', :args => ['hi']} }
  119
+    let(:send_message) { subject.messages_create(message) }
  120
+
  121
+    before do
  122
+      @response = {:message_id => '10'}
  123
+      @client.stub(:send_message).and_return(@response)
  124
+    end
  125
+
  126
+    it 'should send the message' do
  127
+      @client.should_receive(:send_message).and_return(@response)
  128
+
  129
+      send_message
  130
+    end
  131
+
  132
+    it 'should send it for the queue' do
  133
+      @client.should_receive(:send_message).with(hash_including(:queue_url => url)).and_return(@response)
  134
+
  135
+      send_message
  136
+    end
  137
+
  138
+    it 'should send it for the queue' do
  139
+      @client.should_receive(:send_message).with(hash_including(:message_body => message.to_json)).and_return(@response)
  140
+
  141
+      send_message
  142
+    end
  143
+
  144
+    it 'should not delay the message' do
  145
+      @client.should_receive(:send_message).with(hash_excluding(:delay_seconds)).and_return(@response)
  146
+
  147
+      send_message
  148
+    end
  149
+
  150
+    it 'should return the message id' do
  151
+      send_message.should == '10'
  152
+    end
  153
+
  154
+    describe 'when the message is to be run later' do
  155
+      before do
  156
+        message['run_at'] = (Time.now + 60 * 60)
  157
+      end
  158
+
  159
+      it 'should set the delay for sqs' do
  160
+        @client.should_receive(:send_message).with(hash_including(:delay_seconds => 60*60)).and_return(@response)
  161
+
  162
+        send_message
  163
+      end
  164
+    end
  165
+  end
  166
+end
24  spec/spec_helper.rb
@@ -13,17 +13,34 @@
13 13
   Redis = MockRedis
14 14
 end
15 15
 
  16
+class MyWorker; end
  17
+
16 18
 RSpec.configure do |config|
17 19
   config.treat_symbols_as_metadata_keys_with_true_values = true
18 20
   config.run_all_when_everything_filtered = true
19 21
 
20 22
   config.order = 'default'
21 23
 
  24
+  config.before :all do
  25
+    GC.disable
  26
+  end
  27
+
22 28
   config.before :each do
23 29
     MicroQ.send :clear
24 30
     MicroQ.redis {|r| r.flushdb }
25 31