Skip to content

Commit

Permalink
remove reject queue
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Mar 7, 2018
1 parent c20e0cb commit 24a61f3
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 47 deletions.
5 changes: 1 addition & 4 deletions kuyruk/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@

class Heartbeat:

def __init__(self, connection, on_error=None, rejects=None):
def __init__(self, connection, on_error=None):
self._connection = connection
self._on_error = on_error
self._rejects = rejects
self._stop = threading.Event()
self._thread = threading.Thread(target=self._run)

Expand All @@ -25,8 +24,6 @@ def stop(self):

def _run(self):
while not self._stop.wait(1):
if self._rejects:
self._rejects.send_pending()
try:
try:
self._connection.send_heartbeat()
Expand Down
36 changes: 0 additions & 36 deletions kuyruk/reject.py

This file was deleted.

10 changes: 3 additions & 7 deletions kuyruk/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import amqp

from kuyruk import importer, signals
from kuyruk.reject import DelayedRejects
from kuyruk.heartbeat import Heartbeat
from kuyruk.exceptions import Reject, Discard, HeartbeatError

Expand Down Expand Up @@ -114,7 +113,6 @@ def run(self):

def _consume_messages(self):
with self.kuyruk.channel() as ch:
self._rejects = DelayedRejects(ch)
# Set prefetch count to 1. If we don't set this, RabbitMQ keeps
# sending messages while we are already working on a message.
ch.basic_qos(0, 1, True)
Expand All @@ -123,15 +121,13 @@ def _consume_messages(self):
self._consume_queues(ch)
logger.info('Consumer started')
self._main_loop(ch)
self._rejects.send_pending()

def _main_loop(self, ch):
while not self.shutdown_pending.is_set():
if self._max_load:
self._pause_or_resume(ch)

try:
self._rejects.send_pending()
ch.connection.heartbeat_tick()
ch.connection.drain_events(timeout=1)
except socket.timeout:
Expand Down Expand Up @@ -215,7 +211,7 @@ def _process_task(self, message, description, task, args, kwargs):
result = self._run_task(message.channel.connection, task, args, kwargs)
except Reject:
logger.warning('Task is rejected')
self._rejects.push(0, message.delivery_tag, requeue=True)
message.channel.basic_reject(message.delivery_tag, requeue=True)
except Discard:
logger.warning('Task is discarded')
message.channel.basic_reject(message.delivery_tag, requeue=False)
Expand Down Expand Up @@ -249,7 +245,7 @@ def _process_task(self, message, description, task, args, kwargs):
exc_info=exc_info,
worker=self,
queue=queue)
self._rejects.push(0, message.delivery_tag, requeue=False)
message.channel.basic_reject(message.delivery_tag, requeue=False)
if reply_to:
self._send_reply(reply_to, message.channel, None, exc_info)
else:
Expand All @@ -261,7 +257,7 @@ def _process_task(self, message, description, task, args, kwargs):
logger.debug("Task is processed")

def _run_task(self, connection, task, args, kwargs):
hb = Heartbeat(connection, self._on_heartbeat_error, rejects=self._rejects)
hb = Heartbeat(connection, self._on_heartbeat_error)
hb.start()

self.current_task = task
Expand Down

0 comments on commit 24a61f3

Please sign in to comment.