Skip to content

Commit

Permalink
Merge 95e6c55 into 5f5f41b
Browse files Browse the repository at this point in the history
  • Loading branch information
João Dias Conde Azevedo committed Oct 26, 2020
2 parents 5f5f41b + 95e6c55 commit e278545
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions src/appier/queuing.py
Expand Up @@ -40,6 +40,7 @@
import json
import uuid
import heapq
import functools

from . import amqp
from . import legacy
Expand Down Expand Up @@ -179,7 +180,9 @@ def push(self, value, priority = None, identify = False):
reverse = False
)
body = self._dump(value)
self.channel.basic_publish(

self._add_callback_threadsafe(
self.channel.basic_publish,
exchange = "",
routing_key = self.name,
body = body,
Expand All @@ -203,7 +206,7 @@ def handler(channel, method, properties, body):
result = (priority, identifier, value) if full else value
ack = lambda: self.ack(delivery_tag = method.delivery_tag)
nack = lambda: self.nack(delivery_tag = method.delivery_tag)
callback(result) if auto_ack else callback(result, ack, nack)
callback(result) if auto_ack else callback(result, ack, nack)

self.channel.basic_consume(
queue = self.name,
Expand Down Expand Up @@ -259,3 +262,12 @@ def _build(self):
)
self._dumper = getattr(self, "_dump_" + self.encoder)
self._loader = getattr(self, "_load_" + self.encoder)

def _add_callback_threadsafe(self, callback, *args, **kwargs):
self.connection.add_callback_threadsafe(
functools.partial(
callback,
*args,
**kwargs
)
)

0 comments on commit e278545

Please sign in to comment.