Skip to content

Commit

Permalink
added blocking support
Browse files Browse the repository at this point in the history
  • Loading branch information
josephmancuso committed Aug 21, 2019
1 parent ae65825 commit 8458fd7
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions masonite/drivers/queue/QueueAsyncDriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@

import inspect
import os
from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
as_completed)

from masonite.app import App
from masonite.contracts import QueueContract
from masonite.drivers import BaseQueueDriver
from masonite.exceptions import QueueException
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from masonite.helpers import config
from masonite.helpers import HasColoredCommands, config


class QueueAsyncDriver(BaseQueueDriver, QueueContract):
class QueueAsyncDriver(BaseQueueDriver, HasColoredCommands, QueueContract):
"""Queue Aysnc Driver."""

def __init__(self, app: App):
Expand Down Expand Up @@ -63,13 +63,21 @@ def push(self, *objects, args=(), kwargs={}, **options):

# Set processor to either use threads or processes
processor = self._get_processor(mode=mode, max_workers=workers)

is_blocking = config('queue.drivers.async.blocking', False)
# with processor as executor:
ran = []
# with processor as executor:
for obj in objects:
obj = self.container.resolve(obj) if inspect.isclass(obj) else obj
try:
processor.submit(
future = processor.submit(
getattr(obj, callback), *args, **kwargs)
except AttributeError:
# Could be wanting to call only a method asyncronously
processor.submit(fn=obj, *args, **kwargs)
future = processor.submit(fn=obj, *args, **kwargs)

ran.append(future)

if is_blocking:
for job in as_completed(ran):
self.info("Job Ran: {}".format(job))

0 comments on commit 8458fd7

Please sign in to comment.