Permalink
Browse files

Cleaning up submissions

  • Loading branch information...
1 parent 2fd3b25 commit d0de330444fbbdb06bd2d8ca426309f46326288f Matthew Tai committed Oct 30, 2010
Showing with 32 additions and 32 deletions.
  1. +17 −13 gearman/client.py
  2. +6 −6 gearman/connection_manager.py
  3. +8 −12 gearman/connection_poller.py
  4. +1 −1 gearman/worker.py
View
@@ -63,32 +63,34 @@ def submit_multiple_requests(self, job_requests, wait_until_accepted=True, wait_
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
countdown_timer = gearman.util.CountdownTimer(poll_timeout)
- # We should always wait until our job is accepted, this should be fast
- self.wait_until_connection_established(poll_timeout=countdown_timer.time_remaining)
+ # Continue looping in case our job failed
+ while not countdown_timer.expired:
+ self.wait_until_connection_established(poll_timeout=countdown_timer.time_remaining)
- if wait_until_accepted and not countdown_timer.expired:
- processed_requests = self.wait_until_jobs_accepted(job_requests, poll_timeout=countdown_timer.time_remaining)
+ if wait_until_accepted and not countdown_timer.expired:
+ job_requests = self.wait_until_jobs_accepted(job_requests, poll_timeout=countdown_timer.time_remaining)
- # Optionally, we'll allow a user to wait until all jobs are complete with the same poll_timeout
- if wait_until_complete and not countdown_timer.expired:
- processed_requests = self.wait_until_jobs_completed(processed_requests, poll_timeout=countdown_timer.time_remaining)
+ # Optionally, we'll allow a user to wait until all jobs are complete with the same poll_timeout
+ if wait_until_complete and not countdown_timer.expired:
+ job_requests = self.wait_until_jobs_completed(job_requests, poll_timeout=countdown_timer.time_remaining)
- return processed_requests
+ return job_requests
def wait_until_jobs_accepted(self, job_requests, poll_timeout=None):
"""Go into a select loop until all our jobs have moved to STATE_PENDING"""
assert type(job_requests) in (list, tuple, set), "Expected multiple job requests, received 1?"
+ # Send off requests to create this job then poll and wait
+ for current_request in job_requests:
+ if current_request.state == JOB_UNKNOWN:
+ self.send_job_request(current_request)
+
def is_request_pending(current_request):
return bool(current_request.state == JOB_PENDING)
# Poll until we know we've gotten acknowledgement that our job's been accepted
# If our connection fails while we're waiting for it to be accepted, automatically retry right here
def continue_while_jobs_pending():
- for current_request in job_requests:
- if current_request.state == JOB_UNKNOWN:
- self.send_job_request(current_request)
-
return compat.any(is_request_pending(current_request) for current_request in job_requests)
self.poll_connections_until_stopped(continue_while_jobs_pending, timeout=poll_timeout)
@@ -220,8 +222,10 @@ def send_job_request(self, current_request):
if current_request.connection_attempts >= current_request.max_connection_attempts:
raise ExceededConnectionAttempts('Exceeded %d connection attempt(s) :: %r' % (current_request.max_connection_attempts, current_request))
- chosen_connection = self.choose_connection(current_request)
+ if current_request.state != JOB_UNKNOWN:
+ raise InvalidClientState
+ chosen_connection = self.choose_connection(current_request)
current_request.job.connection = chosen_connection
current_request.connection_attempts += 1
current_request.timed_out = False
@@ -139,11 +139,14 @@ def _setup_handler(self, current_handler):
def poll_connections_until_stopped(self, continue_polling_callback, timeout=None):
return self._connection_poller.poll_until_stopped(continue_polling_callback, timeout=timeout)
- def wait_until_connection_established(self, poll_timeout=None):
- # Poll to make sure we send out our request for a status update
+ def establish_connections(self):
for current_connection in self.connection_list:
if current_connection.disconnected:
- self.attempt_connect(current_connection)
+ current_connection.connect()
+
+ def wait_until_connection_established(self, poll_timeout=None):
+ # Poll to make sure we send out our request for a status update
+ self.establish_connections()
def continue_while_not_connected():
return not compat.any(current_connection.connected for current_connection in self.connection_list)
@@ -158,9 +161,6 @@ def continue_while_connections_alive():
return compat.all([current_connection.connected for current_connection in self.connection_list])
self.poll_connections_until_stopped(continue_while_connections_alive, timeout=poll_timeout)
-
- def attempt_connect(self, current_connection):
- current_connection.connect()
###################################
# Connection management functions #
@@ -84,9 +84,9 @@ def _poll_once(self, timeout=None):
try:
event_rd_conns, event_wr_conns, event_ex_conns = self._execute_select(check_rd_conns, check_wr_conns, check_ex_conns, timeout=timeout)
- actual_rd_conns |= event_rd_conns
- actual_wr_conns |= event_wr_conns
- actual_ex_conns |= event_ex_conns
+ actual_rd_conns |= set(event_rd_conns)
+ actual_wr_conns |= set(event_wr_conns)
+ actual_ex_conns |= set(event_ex_conns)
successful_select = True
except (select.error, ConnectionError):
@@ -111,26 +111,22 @@ def _poll_once(self, timeout=None):
def _execute_select(self, rd_conns, wr_conns, ex_conns, timeout=None):
"""Behave similar to select.select, except ignoring certain types of exceptions"""
- rd_set = set()
- wr_set = set()
- ex_set = set()
-
select_args = [rd_conns, wr_conns, ex_conns]
if timeout is not None:
select_args.append(timeout)
try:
rd_list, wr_list, er_list = select.select(*select_args)
- rd_set = set(rd_list)
- wr_set = set(wr_list)
- er_set = set(er_list)
-
except select.error, exc:
# Ignore interrupted system call, reraise anything else
if exc[0] != errno.EINTR:
raise
- return rd_set, wr_set, er_set
+ rd_list = []
+ wr_list = []
+ er_list = []
+
+ return rd_list, wr_list, er_list
def _handle_connection_activity(self, rd_connections, wr_connections, ex_connections):
"""Process all connection activity... executes all handle_* callbacks"""
View
@@ -68,11 +68,11 @@ def set_client_id(self, client_id):
def work(self, poll_timeout=POLL_TIMEOUT_IN_SECONDS):
"""Loop indefinitely, complete tasks from all connections."""
-
countdown_timer = gearman.util.CountdownTimer(poll_timeout)
# Shuffle our connections after the poll timeout
while True:
+ print "Entering work loop with %f" % countdown_timer.time_remaining
self.wait_until_connection_established(poll_timeout=countdown_timer.time_remaining)
self.wait_until_connection_lost(poll_timeout=countdown_timer.time_remaining)

0 comments on commit d0de330

Please sign in to comment.