Skip to content

Commit

Permalink
Merge 51cc2c4 into ff1de57
Browse files Browse the repository at this point in the history
  • Loading branch information
josephmancuso committed Oct 5, 2018
2 parents ff1de57 + 51cc2c4 commit 5580bf2
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 4 deletions.
51 changes: 51 additions & 0 deletions masonite/commands/QueueWorkCommand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
""" A QueueWorkCommand Command """

import inspect
import pickle

from cleo import Command

from config import queue
from masonite.exceptions import DriverLibraryNotFound


def callback(ch, method, properties, body):
from wsgi import container
job = pickle.loads(body)
if inspect.isclass(job):
job = container.resolve(job)
job.handle()
ch.basic_ack(delivery_tag=method.delivery_tag)


class QueueWorkCommand(Command):
"""
Start the queue worker
queue:work
{--c|channel=default : The channel to listen on the queue}
{--f|fair : Send jobs to queues that have no jobs instead of randomly selecting a queue}
"""

def handle(self):
try:
import pika
except ImportError:
raise DriverLibraryNotFound(
"Could not find the 'pika' library. Run pip install pika to fix this.")

connection = pika.BlockingConnection(pika.URLParameters('amqp://{}:{}@{}:{}/%2F'.format(
queue.DRIVERS['amqp']['username'], queue.DRIVERS['amqp']['password'], queue.DRIVERS['amqp']['host'], queue.DRIVERS['amqp']['port'],
)))
channel = connection.channel()

channel.queue_declare(queue=self.option('channel'), durable=True)

channel.basic_consume(callback,
queue=self.option('channel'))
if self.option('fair'):
channel.basic_qos(prefetch_count=1)

self.info(' [*] Waiting to process jobs on the "{}" channel. To exit press CTRL+C'.format(
self.option('channel')))
channel.start_consuming()
1 change: 1 addition & 0 deletions masonite/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .MigrateRollbackCommand import MigrateRollbackCommand
from .ModelCommand import ModelCommand
from .ProviderCommand import ProviderCommand
from .QueueWorkCommand import QueueWorkCommand
from .ServeCommand import ServeCommand
from .ViewCommand import ViewCommand
from .ValidatorCommand import ValidatorCommand
Expand Down
58 changes: 58 additions & 0 deletions masonite/drivers/QueueAmqpDriver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
""" Driver for AMQP support """

import pickle
import threading

from config import queue
from masonite.contracts import QueueContract
from masonite.drivers import BaseDriver
from masonite.exceptions import DriverLibraryNotFound

if 'amqp' in queue.DRIVERS:
listening_channel = queue.DRIVERS['amqp']['channel']
else:
listening_channel = 'default'


class QueueAmqpDriver(QueueContract, BaseDriver):

def __init__(self, Container):
"""Queue AMQP Driver
Arguments:
Container {masonite.app.App} -- The application container.
"""

try:
import pika
self.pika = pika
except ImportError:
raise DriverLibraryNotFound(
"Could not find the 'pika' library. Run pip install pika to fix this.")

# Start the connection
connection = self.pika.BlockingConnection(
self.pika.ConnectionParameters('localhost')
)

# Get the channel
self.channel = connection.channel()

# Declare what queue we are working with
self.channel.queue_declare(queue=listening_channel, durable=True)

def push(self, *objects):
"""Push objects onto the amqp stack.
Arguments:
objects {*args of objects} - This can be several objects as parameters into this method.
"""

for obj in objects:
# Publish to the channel for each object
self.channel.basic_publish(exchange='',
routing_key=listening_channel,
body=pickle.dumps(obj),
properties=self.pika.BasicProperties(
delivery_mode=2, # make message persistent
))
5 changes: 4 additions & 1 deletion masonite/drivers/QueueAsyncDriver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" Async Driver Method """

import threading
import inspect

from masonite.contracts.QueueContract import QueueContract
from masonite.drivers.BaseDriver import BaseDriver
Expand All @@ -27,7 +28,9 @@ def push(self, *objects):
"""

for obj in objects:
obj = self.container.resolve(obj)
if inspect.isclass(obj):
obj = self.container.resolve(obj)

thread = threading.Thread(
target=obj.dispatch(), args=(), kwargs={})
thread.start()
1 change: 1 addition & 0 deletions masonite/drivers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .MailLogDriver import MailLogDriver
from .MailTerminalDriver import MailTerminalDriver
from .QueueAsyncDriver import QueueAsyncDriver
from .QueueAmqpDriver import QueueAmqpDriver
from .SessionCookieDriver import SessionCookieDriver
from .SessionMemoryDriver import SessionMemoryDriver
from .UploadDiskDriver import UploadDiskDriver
Expand Down
5 changes: 3 additions & 2 deletions masonite/providers/AppProvider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
KeyCommand, MakeMigrationCommand,
MigrateCommand, MigrateRefreshCommand,
MigrateResetCommand, MigrateRollbackCommand,
ModelCommand, ProviderCommand, RoutesCommand,
SeedCommand, SeedRunCommand, ServeCommand,
ModelCommand, ProviderCommand, QueueWorkCommand,
RoutesCommand, SeedCommand, SeedRunCommand, ServeCommand,
TinkerCommand, ViewCommand, ValidatorCommand)

from masonite.exception_handler import ExceptionHandler
Expand Down Expand Up @@ -50,6 +50,7 @@ def register(self):
MigrateRollbackCommand())
self.app.bind('MasoniteModelCommand', ModelCommand())
self.app.bind('MasoniteProviderCommand', ProviderCommand())
self.app.bind('MasoniteQueueWorkCommand', QueueWorkCommand())
self.app.bind('MasoniteViewCommand', ViewCommand())
self.app.bind('MasoniteRoutesCommand', RoutesCommand())
self.app.bind('MasoniteServeCommand', ServeCommand())
Expand Down
3 changes: 2 additions & 1 deletion masonite/providers/QueueProvider.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
""" A RedirectionProvider Service Provider """

from config import queue
from masonite.drivers import QueueAsyncDriver
from masonite.drivers import QueueAsyncDriver, QueueAmqpDriver
from masonite.managers import QueueManager
from masonite.provider import ServiceProvider

Expand All @@ -12,6 +12,7 @@ class QueueProvider(ServiceProvider):

def register(self):
self.app.bind('QueueAsyncDriver', QueueAsyncDriver)
self.app.bind('QueueAmqpDriver', QueueAmqpDriver)
self.app.bind('QueueManager', QueueManager)
self.app.bind('QueueConfig', queue)

Expand Down

0 comments on commit 5580bf2

Please sign in to comment.