Permalink
Browse files

Merge pull request #42 from hashbrowncipher/server_dead

Retry gracefully when a server restarts
  • Loading branch information...
2 parents 9be238d + 79b8c44 commit a5f9a574023a069cb54ba9641df1d9686c2fb0f3 @klange klange committed Jun 17, 2013
Showing with 31 additions and 6 deletions.
  1. +4 −1 gearman/client_handler.py
  2. +0 −3 gearman/connection_manager.py
  3. +27 −2 tests/client_tests.py
View
5 gearman/client_handler.py
@@ -69,7 +69,10 @@ def recv_job_created(self, job_handle):
if not self.requests_awaiting_handles:
raise InvalidClientState('Received a job_handle with no pending requests')
- # If our client got a JOB_CREATED, our request now has a server handle
+ """The gearman server returns JOB_CREATED responses in order
+ in response to SUBMIT_JOB_* requests. We therefore know that the
+ handle we just got cooresponds to the first request awaiting a
+ handle"""
current_request = self.requests_awaiting_handles.popleft()
self._assert_request_state(current_request, JOB_PENDING)
View
3 gearman/connection_manager.py
@@ -201,9 +201,6 @@ def poll_connections_until_stopped(self, submitted_connections, callback_fxn, ti
any_activity = compat.any([read_connections, write_connections, dead_connections])
- # Do not retry dead connections on the next iteration of the loop, as we closed them in handle_error
- submitted_connections -= dead_connections
-
callback_ok = callback_fxn(any_activity)
connection_ok = compat.any(current_connection.connected for current_connection in submitted_connections)
View
29 tests/client_tests.py
@@ -1,4 +1,5 @@
import collections
+import mock
import random
import unittest
@@ -131,7 +132,7 @@ def fail_then_create_jobs(rx_conns, wr_conns, ex_conns):
current_request.max_connection_attempts = self.connection_manager.expected_failures + 1
current_request.state = JOB_UNKNOWN
- accepted_jobs = self.connection_manager.wait_until_jobs_accepted([current_request])
+ self.connection_manager.wait_until_jobs_accepted([current_request])
self.assertEquals(current_request.state, JOB_CREATED)
self.assertEquals(current_request.connection_attempts, current_request.max_connection_attempts)
@@ -144,6 +145,31 @@ def fail_then_create_jobs(rx_conns, wr_conns, ex_conns):
self.assertEquals(current_request.state, JOB_UNKNOWN)
self.assertEquals(current_request.connection_attempts, current_request.max_connection_attempts)
+ def test_retry_flaked_connection(self):
+ flaky_connection = MockGearmanConnection()
+ flaky_connection._fail_on_read = True
+ flaky_connection.poll_count = 0
+
+ expected_job = self.generate_job()
+
+ self.connection_manager.connection_list = [flaky_connection]
+
+ def poll_connections_once(connections, timeout=None):
+ flaky_connection.poll_count += 1
+ if flaky_connection.poll_count == 1:
+ flaky_connection._fail_on_read = False
+ return set(), set(), set([flaky_connection])
+ else:
+ handler = self.connection_manager.connection_to_handler_map[flaky_connection]
+ handler.recv_command(
+ GEARMAN_COMMAND_JOB_CREATED,
+ job_handle=expected_job.handle)
+ return set([flaky_connection]), set(), set()
+
+ with mock.patch.object(self.connection_manager, 'poll_connections_once', poll_connections_once):
+ job_request = self.connection_manager.submit_job(expected_job.task, expected_job.data, unique=expected_job.unique, background=True, priority=PRIORITY_LOW, wait_until_complete=False, max_retries=1)
+ self.assertEqual(job_request.state, JOB_CREATED)
+
def test_multiple_fg_job_submission(self):
submitted_job_count = 5
expected_job_list = [self.generate_job() for _ in xrange(submitted_job_count)]
@@ -426,7 +452,6 @@ def test_work_fail(self):
current_request = self.generate_job_request()
job_handle = current_request.job.handle
- new_data = str(random.random())
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_FAIL, job_handle=job_handle)
self.assertEqual(current_request.state, JOB_FAILED)

0 comments on commit a5f9a57

Please sign in to comment.