Skip to content

Commit

Permalink
Closes #2326 for master branch
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Jan 12, 2015
1 parent 8184205 commit fab0b11
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
20 changes: 12 additions & 8 deletions celery/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,17 +432,22 @@ def _strip_prefix(self, key):
return bytes_to_str(key[len(prefix):])
return bytes_to_str(key)

def _filter_ready(self, values, READY_STATES=states.READY_STATES):
for k, v in values:
if v is not None:
v = self.decode_result(v)
if v['status'] in READY_STATES:
yield k, v

def _mget_to_results(self, values, keys):
if hasattr(values, 'items'):
# client returns dict so mapping preserved.
return dict((self._strip_prefix(k), self.decode_result(v))
for k, v in items(values)
if v is not None)
return dict((self._strip_prefix(k), v)
for k, v in self._filter_ready(items(values)))
else:
# client returns list so need to recreate mapping.
return dict((bytes_to_str(keys[i]), self.decode_result(value))
for i, value in enumerate(values)
if value is not None)
return dict((bytes_to_str(keys[i]), v)
for i, v in self._filter_ready(enumerate(values)))

def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
READY_STATES=states.READY_STATES):
Expand All @@ -469,8 +474,7 @@ def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
cache.update(r)
ids.difference_update(set(bytes_to_str(v) for v in r))
for key, value in items(r):
if value['status'] in READY_STATES:
yield bytes_to_str(key), value
yield bytes_to_str(key), value
if timeout and iterations * interval >= timeout:
raise TimeoutError('Operation timed out ({0})'.format(timeout))
time.sleep(interval) # don't busy loop.
Expand Down
2 changes: 1 addition & 1 deletion celery/worker/autoscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def body(self):
self.maybe_scale()
sleep(1.0)

def _maybe_scale(self):
def _maybe_scale(self, req=None):
procs = self.processes
cur = min(self.qty, self.max_concurrency)
if cur > procs:
Expand Down

0 comments on commit fab0b11

Please sign in to comment.