Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Make connection attempt resilient #59

Open
wants to merge 7 commits into from

4 participants

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 31, 2013
  1. @jturmel

    Add support for SUBMIT_JOB_EPOCH

    jturmel authored
    * submit_job now accepts a 'when_to_run' parameter to take advantage of
      Gearman's ability to schedule jobs to be run in the future
Commits on Aug 2, 2013
  1. @jturmel

    Gearman tweaks to fix bug

    jturmel authored
Commits on Oct 2, 2013
  1. @kmorey

    Revert "Gearman tweaks to fix bug"

    kmorey authored
    This reverts commit 2ea5ca7.
  2. @kmorey

    Merge branch 'upstream/master'

    kmorey authored
  3. @kmorey
  4. @chrisvaughn

    Merge pull request #1 from lifechurch/km/read_deadlocks

    chrisvaughn authored
    Handle read deadlocks
Commits on Apr 8, 2014
  1. @bbelyeu
This page is out of date. Refresh to see the latest.
View
2  gearman/__init__.py
@@ -2,7 +2,7 @@
Gearman API - Client, worker, and admin client interfaces
"""
-__version__ = '2.0.2'
+__version__ = '2.0.2-lctv'
from gearman.admin_client import GearmanAdminClient
from gearman.client import GearmanClient
View
26 gearman/client.py
@@ -32,16 +32,22 @@ def __init__(self, host_list=None, random_unique_bytes=RANDOM_UNIQUE_BYTES):
# Ignores the fact if a request has been bound to a connection or not
self.request_to_rotating_connection_queue = weakref.WeakKeyDictionary(compat.defaultdict(collections.deque))
- def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
+ def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, when_to_run=None, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
"""Submit a single job to any gearman server"""
- job_info = dict(task=task, data=data, unique=unique, priority=priority)
+ assert (when_to_run is None or
+ (when_to_run and
+ priority == PRIORITY_NONE and
+ not background)
+ ), "priority and background cannot be set with when_to_run"
+
+ job_info = dict(task=task, data=data, unique=unique, priority=priority, when_to_run=when_to_run)
completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, poll_timeout=poll_timeout)
return gearman.util.unlist(completed_job_list)
def submit_multiple_jobs(self, jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
"""Takes a list of jobs_to_submit with dicts of
- {'task': task, 'data': data, 'unique': unique, 'priority': priority}
+ {'task': task, 'data': data, 'unique': unique, 'priority': priority, 'when_to_run': when_to_run}
"""
assert type(jobs_to_submit) in (list, tuple, set), "Expected multiple jobs, received 1?"
@@ -165,18 +171,26 @@ def continue_while_status_not_updated(any_activity):
return job_requests
def _create_request_from_dictionary(self, job_info, background=False, max_retries=0):
- """Takes a dictionary with fields {'task': task, 'unique': unique, 'data': data, 'priority': priority, 'background': background}"""
+ """Takes a dictionary with fields {'task': task, 'unique': unique, 'data': data, 'priority': priority, 'when_to_run': when_to_run, 'background': background}"""
# Make sure we have a unique identifier for ALL our tasks
job_unique = job_info.get('unique')
if not job_unique:
job_unique = os.urandom(self.random_unique_bytes).encode('hex')
- current_job = self.job_class(connection=None, handle=None, task=job_info['task'], unique=job_unique, data=job_info['data'])
+ run_later = False
+ if job_info.get('when_to_run'):
+ job_info['when_to_run'] = str(job_info.get('when_to_run'))
+ run_later = True
+ # run_later jobs always are in the background, so set to True
+ background = True
+ job_info['background'] = background
+
+ current_job = self.job_class(connection=None, handle=None, task=job_info['task'], unique=job_unique, when_to_run=job_info.get('when_to_run'), data=job_info['data'])
initial_priority = job_info.get('priority', PRIORITY_NONE)
max_attempts = max_retries + 1
- current_request = self.job_request_class(current_job, initial_priority=initial_priority, background=background, max_attempts=max_attempts)
+ current_request = self.job_request_class(current_job, initial_priority=initial_priority, background=background, run_later=run_later, max_attempts=max_attempts)
return current_request
def establish_request_connection(self, current_request):
View
9 gearman/client_handler.py
@@ -6,7 +6,7 @@
from gearman.command_handler import GearmanCommandHandler
from gearman.constants import JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import InvalidClientState
-from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority
+from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority_run_later
gearman_logger = logging.getLogger(__name__)
@@ -29,10 +29,13 @@ def send_job_request(self, current_request):
gearman_job = current_request.job
# Handle the I/O for requesting a job - determine which COMMAND we need to send
- cmd_type = submit_cmd_for_background_priority(current_request.background, current_request.priority)
+ cmd_type = submit_cmd_for_background_priority_run_later(current_request.background, current_request.priority, current_request.run_later)
outbound_data = self.encode_data(gearman_job.data)
- self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, data=outbound_data)
+ if current_request.run_later:
+ self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, when_to_run=gearman_job.when_to_run, data=outbound_data)
+ else:
+ self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, data=outbound_data)
# Once this command is sent, our request needs to wait for a handle
current_request.state = JOB_PENDING
View
15 gearman/connection_manager.py
@@ -129,7 +129,7 @@ def poll_connections_once(self, poller, connection_map, timeout=None):
# a timeout of -1 when used with epoll will block until there
# is activity. Select does not support negative timeouts, so this
# is translated to a timeout=None when falling back to select
- timeout = timeout or -1
+ timeout = timeout or -1
readable = set()
writable = set()
@@ -194,7 +194,7 @@ def poll_connections_until_stopped(self, submitted_connections, callback_fxn, ti
connection_ok = compat.any(current_connection.connected for current_connection in submitted_connections)
poller = gearman.io.get_connection_poller()
if connection_ok:
- self._register_connections_with_poller(submitted_connections,
+ self._register_connections_with_poller(submitted_connections,
poller)
connection_map = dict([(c.fileno(), c) for c in
submitted_connections if c.connected])
@@ -204,10 +204,21 @@ def poll_connections_until_stopped(self, submitted_connections, callback_fxn, ti
if time_remaining == 0.0:
break
+ # if we have writes to do, do those first
+ dead_writes = set()
+ for current_connection in submitted_connections:
+ if current_connection.writable():
+ try:
+ self.handle_write(current_connection)
+ except ConnectionError:
+ # this dead connection will be closed in handle_connection_activity later
+ dead_writes.add(current_connection)
+
# Do a single robust select and handle all connection activity
read_connections, write_connections, dead_connections = self.poll_connections_once(poller, connection_map, timeout=time_remaining)
# Handle reads and writes and close all of the dead connections
+ dead_connections |= dead_writes
read_connections, write_connections, dead_connections = self.handle_connection_activity(read_connections, write_connections, dead_connections)
any_activity = compat.any([read_connections, write_connections, dead_connections])
View
14 gearman/job.py
@@ -3,27 +3,29 @@
class GearmanJob(object):
"""Represents the basics of a job... used in GearmanClient / GearmanWorker to represent job states"""
- def __init__(self, connection, handle, task, unique, data):
+ def __init__(self, connection, handle, task, unique, data, when_to_run):
self.connection = connection
self.handle = handle
self.task = task
self.unique = unique
self.data = data
+ self.when_to_run = when_to_run
def to_dict(self):
- return dict(task=self.task, job_handle=self.handle, unique=self.unique, data=self.data)
+ return dict(task=self.task, job_handle=self.handle, unique=self.unique, data=self.data, when_to_run=self.when_to_run)
def __repr__(self):
- return '<GearmanJob connection/handle=(%r, %r), task=%s, unique=%s, data=%r>' % (self.connection, self.handle, self.task, self.unique, self.data)
+ return '<GearmanJob connection/handle=(%r, %r), task=%s, unique=%s, data=%r, when_to_run=%d>' % (self.connection, self.handle, self.task, self.unique, self.data, self.when_to_run)
class GearmanJobRequest(object):
"""Represents a job request... used in GearmanClient to represent job states"""
- def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, max_attempts=1):
+ def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, run_later=False, max_attempts=3):
self.gearman_job = gearman_job
self.priority = initial_priority
self.background = background
+ self.run_later = run_later
self.connection_attempts = 0
self.max_connection_attempts = max_attempts
@@ -79,5 +81,5 @@ def complete(self):
return actually_complete
def __repr__(self):
- formatted_representation = '<GearmanJobRequest task=%r, unique=%r, priority=%r, background=%r, state=%r, timed_out=%r>'
- return formatted_representation % (self.job.task, self.job.unique, self.priority, self.background, self.state, self.timed_out)
+ formatted_representation = '<GearmanJobRequest task=%r, unique=%r, when_to_run=%r, priority=%r, background=%r, run_later=%r, state=%r, timed_out=%r>'
+ return formatted_representation % (self.job.task, self.job.unique, self.job.when_to_run, self.priority, self.background, self.run_later, self.state, self.timed_out)
View
23 gearman/protocol.py
@@ -50,6 +50,8 @@
GEARMAN_COMMAND_SUBMIT_JOB_LOW = 33
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG = 34
+GEARMAN_COMMAND_SUBMIT_JOB_EPOCH = 36
+
# Fake command code
GEARMAN_COMMAND_TEXT_COMMAND = 9999
@@ -95,6 +97,8 @@
GEARMAN_COMMAND_SUBMIT_JOB_LOW: ['task', 'unique', 'data'],
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: ['task', 'unique', 'data'],
+ GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: ['task', 'unique', 'when_to_run', 'data'],
+
# Fake gearman command
GEARMAN_COMMAND_TEXT_COMMAND: ['raw_text']
}
@@ -140,6 +144,8 @@
GEARMAN_COMMAND_SUBMIT_JOB_LOW: 'GEARMAN_COMMAND_SUBMIT_JOB_LOW',
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: 'GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG',
+ GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: 'GEARMAN_COMMAND_SUBMIT_JOB_EPOCH',
+
GEARMAN_COMMAND_TEXT_COMMAND: 'GEARMAN_COMMAND_TEXT_COMMAND'
}
@@ -152,16 +158,17 @@
def get_command_name(cmd_type):
return GEARMAN_COMMAND_TO_NAME.get(cmd_type, cmd_type)
-def submit_cmd_for_background_priority(background, priority):
+def submit_cmd_for_background_priority_run_later(background, priority, run_later):
cmd_type_lookup = {
- (True, PRIORITY_NONE): GEARMAN_COMMAND_SUBMIT_JOB_BG,
- (True, PRIORITY_LOW): GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
- (True, PRIORITY_HIGH): GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
- (False, PRIORITY_NONE): GEARMAN_COMMAND_SUBMIT_JOB,
- (False, PRIORITY_LOW): GEARMAN_COMMAND_SUBMIT_JOB_LOW,
- (False, PRIORITY_HIGH): GEARMAN_COMMAND_SUBMIT_JOB_HIGH
+ (True, PRIORITY_NONE, False): GEARMAN_COMMAND_SUBMIT_JOB_BG,
+ (True, PRIORITY_LOW, False): GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
+ (True, PRIORITY_HIGH, False): GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
+ (False, PRIORITY_NONE, False): GEARMAN_COMMAND_SUBMIT_JOB,
+ (False, PRIORITY_LOW, False): GEARMAN_COMMAND_SUBMIT_JOB_LOW,
+ (False, PRIORITY_HIGH, False): GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
+ (True, PRIORITY_NONE, True): GEARMAN_COMMAND_SUBMIT_JOB_EPOCH
}
- lookup_tuple = (background, priority)
+ lookup_tuple = (background, priority, run_later)
cmd_type = cmd_type_lookup[lookup_tuple]
return cmd_type
View
6 gearman/worker.py
@@ -210,7 +210,7 @@ def send_job_warning(self, current_job, data, poll_timeout=None):
def create_job(self, command_handler, job_handle, task, unique, data):
"""Create a new job using our self.job_class"""
current_connection = self.handler_to_connection_map[command_handler]
- return self.job_class(current_connection, job_handle, task, unique, data)
+ return self.job_class(current_connection, job_handle, task, unique, data, None)
def on_job_execute(self, current_job):
try:
@@ -248,10 +248,10 @@ def set_job_lock(self, command_handler, lock):
self.command_handler_holding_job_lock = None
return True
-
+
def has_job_lock(self):
return bool(self.command_handler_holding_job_lock is not None)
-
+
def check_job_lock(self, command_handler):
"""Check to see if we hold the job lock"""
return bool(self.command_handler_holding_job_lock == command_handler)
View
6 tests/_core_testing.py
@@ -78,15 +78,15 @@ def setup_command_handler(self):
self.command_handler = self.connection_manager.connection_to_handler_map[self.connection]
def generate_job(self):
- return self.job_class(self.connection, handle=str(random.random()), task='__test_ability__', unique=str(random.random()), data=str(random.random()))
+ return self.job_class(self.connection, handle=str(random.random()), task='__test_ability__', unique=str(random.random()), data=str(random.random()), when_to_run=None)
def generate_job_dict(self):
current_job = self.generate_job()
return current_job.to_dict()
- def generate_job_request(self, priority=PRIORITY_NONE, background=False):
+ def generate_job_request(self, priority=PRIORITY_NONE, when_to_run=None, background=False):
job_handle = str(random.random())
- current_job = self.job_class(connection=self.connection, handle=job_handle, task='__test_ability__', unique=str(random.random()), data=str(random.random()))
+ current_job = self.job_class(connection=self.connection, handle=job_handle, task='__test_ability__', unique=str(random.random()), data=str(random.random()), when_to_run=when_to_run)
current_request = self.job_request_class(current_job, initial_priority=priority, background=background)
self.assertEqual(current_request.state, JOB_UNKNOWN)
View
4 tests/client_tests.py
@@ -7,7 +7,7 @@
from gearman.constants import PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW, JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import ExceededConnectionAttempts, ServerUnavailable, InvalidClientState
-from gearman.protocol import submit_cmd_for_background_priority, GEARMAN_COMMAND_STATUS_RES, GEARMAN_COMMAND_GET_STATUS, GEARMAN_COMMAND_JOB_CREATED, \
+from gearman.protocol import submit_cmd_for_background_priority_run_later, GEARMAN_COMMAND_STATUS_RES, GEARMAN_COMMAND_GET_STATUS, GEARMAN_COMMAND_JOB_CREATED, \
GEARMAN_COMMAND_WORK_STATUS, GEARMAN_COMMAND_WORK_FAIL, GEARMAN_COMMAND_WORK_COMPLETE, GEARMAN_COMMAND_WORK_DATA, GEARMAN_COMMAND_WORK_WARNING
from tests._core_testing import _GearmanAbstractTest, MockGearmanConnectionManager, MockGearmanConnection
@@ -312,7 +312,7 @@ def test_send_job_request(self):
queued_request = self.command_handler.requests_awaiting_handles.popleft()
self.assertEqual(queued_request, current_request)
- expected_cmd_type = submit_cmd_for_background_priority(background, priority)
+ expected_cmd_type = submit_cmd_for_background_priority_run_later(background, priority, False)
self.assert_sent_command(expected_cmd_type, task=gearman_job.task, data=gearman_job.data, unique=gearman_job.unique)
def test_get_status_of_job(self):
Something went wrong with that request. Please try again.