lukeredpath / beanstalk-messaging

beanstalk-messaging / test / beanstalk_queue_poller_test.rb
100644 224 lines (171 sloc) 8.139 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
require File.dirname(__FILE__) + '/test_helper'
 
class BeanstalkQueuePollerTest < Test::Unit::TestCase
  
  def setup
    @dev_null = stub_everything('output stream')
  end
  
  def test_should_output_messages_to_given_output_object
    message = "Hello"
    poller = Beanstalk::QueuePoller.new(anything, anything, output = stub('output'))
    output.expects(:puts).with(message)
    poller.puts(message)
  end
  
  def test_should_sleep_for_the_given_delay_until_a_non_stale_queue_was_found
    stale_queue = stub('stale queue', :stale? => true)
    fresh_queue = stub('fresh queue', :stale? => false)
    
    stale_manager = stub('manager with stale queues')
    unsuccessful_connection_attempts = [stale_queue, stale_queue, stale_queue]
    stale_manager.stubs(:reset_queue).returns(*(unsuccessful_connection_attempts + [fresh_queue]))
    
    delay = stub('delay')
    poller = Beanstalk::QueuePoller.new(stale_manager, delay, @dev_null)
    
    poller.expects(:sleep).with(delay).times(unsuccessful_connection_attempts.length)
    
    limit_looping(poller, iterations = 4)
    
    poller.load_queue!(:queue_name)
  end
  
  def test_should_set_instance_queue_when_a_non_stale_queue_is_found
    manager = stub('queue manager')
    manager.stubs(:reset_queue).with(:queue_name).returns(queue = stub('queue', :stale? => false))
    
    poller = Beanstalk::QueuePoller.new(manager, 1, @dev_null)
    poller.load_queue!(:queue_name)
    
    assert_equal queue, poller.queue
  end
  
  def test_poller_yields_the_message_from_pool
    queue = Beanstalk::Queue.new(pool = stub('pool'))
    queue.stubs(:number_of_pending_messages).returns(1)
 
    queued_message = stub('message')
    pool.stubs(:reserve).returns(queued_message)
    queued_message.expects(:delete)
    
    manager = stub('queue manager')
    manager.stubs(:reset_queue).with(:queue_name).returns(queue)
    
    poller = Beanstalk::QueuePoller.new(manager, 30, @dev_null)
    
    limit_looping(poller)
    
    poller.poll(:queue_name) do |message|
      assert_equal queued_message, message
    end
  end
  
  def test_poller_uses_the_specified_tube
    queue = Beanstalk::Queue.new(pool = stub('pool'))
    queue.stubs(:number_of_pending_messages).returns(0)
    queue.expects(:use_tube).with('tubename')
    
    manager = stub('queue manager')
    manager.stubs(:reset_queue).with(:queue_name).returns(queue)
 
    poller = Beanstalk::QueuePoller.new(manager, 30, @dev_null)
    limit_looping(poller)
    
    poller.poll(:queue_name, 'tubename') do
    end
  end
  
  def test_message_should_be_deleted_once_it_has_been_successfully_processed
    poller = Beanstalk::QueuePoller.new(nil, anything, @dev_null)
    poller.stubs(:queue).returns(queue = stub('queue'))
    queue.stubs(:number_of_pending_messages).returns(1)
    queue.stubs(:next_message).returns(stubbed_message = stub('message'))
    
    poller.retrieve_and_handle_message(:queue_name) do |message|
      assert_equal stubbed_message, message
      
      # Since this expectation doesn't exist until the very end of this block
      # we can be assured that the method has not been called up until now.
      stubbed_message.expects(:delete) # after this block finishes.
    end
  end
 
  def test_message_should_NOT_be_deleted_if_beanstalk_received_an_unexpected_response
    poller = Beanstalk::QueuePoller.new(nil, 0.1, @dev_null)
    poller.stubs(:queue).returns(queue = stub('queue'))
    queue.stubs(:number_of_pending_messages).returns(1)
    queue.stubs(:next_message).returns(stubbed_message = stub_everything('message'))
    
    stubbed_message.expects(:delete).never
 
    poller.retrieve_and_handle_message(:queue_name) do |message|
      assert_equal stubbed_message, message # just to be sure we're checking the right message
      raise Beanstalk::UnexpectedResponse.new("Oh no!")
    end
  end
  
  def test_message_should_be_released_if_a_message_was_found_but_a_beanstalk_error_was_found_anyway
    poller = Beanstalk::QueuePoller.new(nil, 0.1, @dev_null)
    poller.stubs(:queue).returns(queue = stub('queue'))
    queue.stubs(:number_of_pending_messages).returns(1)
    queue.stubs(:next_message).returns(stubbed_message = stub_everything('message'))
    
    poller.retrieve_and_handle_message(:queue_name) do |message|
      message.expects(:release) # after this block returns
      raise Beanstalk::UnexpectedResponse.new("Oh no!")
    end
  end
 
  def test_polling_should_wait_if_a_beanstalk_error_occurred_while_polling
    poller = Beanstalk::QueuePoller.new(nil, delay = stub('delay'), @dev_null)
    poller.stubs(:queue).returns(queue = stub('queue'))
    queue.stubs(:number_of_pending_messages).returns(1)
    queue.stubs(:next_message).returns(stubbed_message = stub_everything('message'))
    
    poller.retrieve_and_handle_message(:queue_name) do |message|
      poller.expects(:sleep).with(delay) # after this block returns
      raise Beanstalk::UnexpectedResponse.new("Oh no!")
    end
  end
 
  def test_polling_should_sleep_and_reload_the_queue_if_a_EOFerror_occurred_while_retrieving_the_next_message
    poller = Beanstalk::QueuePoller.new(nil, delay = stub('delay'), @dev_null)
    poller.stubs(:queue).returns(queue = stub('queue'))
    queue.stubs(:number_of_pending_messages).returns(1)
    queue.stubs(:next_message).raises(EOFError)
    
    poller.expects(:puts).with(regexp_matches(/Caught exception/))
    poller.expects(:sleep).with(delay) # after this block returns
    poller.expects(:load_queue!)
 
    poller.retrieve_and_handle_message(:queue_name) do |message|
      flunk "This block should never get called"
    end
  end
  
  def test_polling_should_sleep_and_reload_the_queue_if_a_connection_reset_error_occurred_while_retrieving_the_next_message
    poller = Beanstalk::QueuePoller.new(nil, delay = stub('delay'), @dev_null)
    poller.stubs(:queue).returns(queue = stub('queue'))
    queue.stubs(:number_of_pending_messages).returns(1)
    queue.stubs(:next_message).raises(Errno::ECONNRESET)
    
    poller.expects(:puts).with(regexp_matches(/Caught exception/))
    poller.expects(:sleep).with(delay) # after this block returns
    poller.expects(:load_queue!)
 
    poller.retrieve_and_handle_message(:queue_name) do |message|
      flunk "This block should never get called"
    end
  end
  
  def test_polling_should_sleep_and_reload_the_queue_if_a_connection_refused_error_occurred_while_retrieving_the_next_message
    poller = Beanstalk::QueuePoller.new(nil, delay = stub('delay'), @dev_null)
    poller.stubs(:queue).returns(queue = stub('queue'))
    queue.stubs(:number_of_pending_messages).returns(1)
    queue.stubs(:next_message).raises(Errno::ECONNREFUSED)
    
    poller.expects(:puts).with(regexp_matches(/Caught exception/))
    poller.expects(:sleep).with(delay) # after this block returns
    poller.expects(:load_queue!)
 
    poller.retrieve_and_handle_message(:queue_name) do |message|
      flunk "This block should never get called"
    end
  end
  
  private
  
  def limit_looping(object, iterations = 1)
    object.instance_eval %{
def loop
#{iterations}.times { yield }
end
}
  end
end
 
class BeanstalkQueuePollerWithBufferTest < Test::Unit::TestCase
  
  def setup
    @dev_null = stub_everything('output stream')
  end
  
  def test_should_collect_messages_into_a_buffer_then_yield_when_the_buffer_is_full
    queue = Beanstalk::Queue.new(pool = stub('pool'))
    queue.stubs(:number_of_pending_messages).returns(4)
 
    m1 = stub_everything('message 1')
    m2 = stub_everything('message 2')
    pool.stubs(:reserve).returns(m1, m2)
    
    manager = stub('queue manager')
    manager.stubs(:reset_queue).with(:queue_name).returns(queue)
    
    poller = Beanstalk::QueuePoller.new(manager, 30, @dev_null)
    
    limit_looping(poller)
    
    poller.poll_with_buffer(:queue_name, 2) do |message_buffer|
      assert_equal message_buffer[0], m1
      assert_equal message_buffer[1], m2
    end
  end
  
  private
  
  def limit_looping(object, iterations = 1)
    object.instance_eval %{
def loop
#{iterations}.times { yield }
end
}
  end
end