Skip to content

Commit

Permalink
Merge 544dcc6 into 8a8db0f
Browse files Browse the repository at this point in the history
  • Loading branch information
josephmancuso committed Dec 2, 2018
2 parents 8a8db0f + 544dcc6 commit 6502784
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 35 deletions.
Empty file added config/__init__.py
Empty file.
10 changes: 9 additions & 1 deletion config/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@
|
"""

DRIVERS = {}
DRIVERS = {
'amqp': {
'username': 'guest',
'password': 'guest',
'host': 'localhost',
'port': '5672',
'channel': 'default',
}
}
8 changes: 7 additions & 1 deletion masonite/commands/QueueWorkCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@ def callback(ch, method, properties, body):
job = pickle.loads(body)
obj = job['obj']
args = job['args']
callback = job['callback']
if inspect.isclass(obj):
obj = container.resolve(obj)
obj.handle(*args)

try:
getattr(obj, callback)(*args)
except AttributeError:
obj(*args)

ch.basic_ack(delivery_tag=method.delivery_tag)


Expand Down
4 changes: 2 additions & 2 deletions masonite/drivers/QueueAmqpDriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _publish(self, body):
delivery_mode=2, # make message persistent
))

def push(self, *objects, args=()):
def push(self, *objects, args=(), callback='handle'):
"""Push objects onto the amqp stack.
Arguments:
Expand All @@ -70,7 +70,7 @@ def push(self, *objects, args=()):
for obj in objects:
# Publish to the channel for each object
try:
self._publish({'obj': obj, 'args': args})
self._publish({'obj': obj, 'args': args, 'callback': callback})
except self.pika.exceptions.ConnectionClosed:
self._connect()
self._publish({'obj': obj, 'args': args})
12 changes: 9 additions & 3 deletions masonite/drivers/QueueAsyncDriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, app: App):
"""
self.container = app

def push(self, *objects, args=()):
def push(self, *objects, args=(), callback='handle'):
"""Push objects onto the async stack.
Arguments:
Expand All @@ -29,6 +29,12 @@ def push(self, *objects, args=()):
if inspect.isclass(obj):
obj = self.container.resolve(obj)

thread = threading.Thread(
target=obj.handle, args=args, kwargs={})
try:
thread = threading.Thread(
target=getattr(obj, callback), args=args, kwargs={})
except AttributeError:
# Could be wanting to call only a method asyncronously
thread = threading.Thread(
target=obj, args=args, kwargs={})

thread.start()
2 changes: 1 addition & 1 deletion masonite/info.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Module for specifying the Masonite version in a central location."""

VERSION = '2.1.0b3.post7'
VERSION = '2.1.0'
2 changes: 2 additions & 0 deletions masonite/queues/ShouldQueue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class ShouldQueue:
pass
Empty file added tests/__init__.py
Empty file.
Empty file added tests/queues/__init__.py
Empty file.
27 changes: 0 additions & 27 deletions tests/queues/test_async_driver.py

This file was deleted.

55 changes: 55 additions & 0 deletions tests/queues/test_drivers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from masonite.app import App
from masonite.drivers import QueueAsyncDriver, QueueAmqpDriver
from masonite.managers import QueueManager
from config import queue

from masonite.queues.Queueable import Queueable
import os
from masonite.environment import LoadEnvironment, env

LoadEnvironment()


class Job(Queueable):

def handle(self):
print('sending from job handled')
return 'test'

class Random(Queueable):

def send(self):
print('sending from random send method')
return 'test'

def handle(self):
print('sending from random handle method')
return 'test'


class TestAsyncDriver:

def setup_method(self):
self.app = App()

self.app.bind('QueueAsyncDriver', QueueAsyncDriver)
self.app.bind('QueueAmqpDriver', QueueAmqpDriver)
self.app.bind('QueueConfig', queue)
self.app.bind('Queueable', Queueable)
self.app.bind('Container', self.app)
self.app.bind('QueueManager', QueueManager(self.app))
self.drivers = ['async']
if env('RUN_AMQP'):
self.drivers.append('amqp')

def test_async_driver_pushes_to_queue(self):
for driver in self.drivers:
assert self.app.make('QueueManager').driver(driver).push(Job) is None

def test_async_driver_can_run_any_callback_method(self):
for driver in self.drivers:
assert self.app.make('QueueManager').driver(driver).push(Random, callback="send") is None

def test_async_driver_can_run_any_method(self):
for driver in self.drivers:
assert self.app.make('QueueManager').driver(driver).push(Random().send) is None

0 comments on commit 6502784

Please sign in to comment.