From 8458fd7c15aca7f3ec53651c83b83d34046bd820 Mon Sep 17 00:00:00 2001 From: Joseph Mancuso Date: Wed, 21 Aug 2019 19:56:19 -0400 Subject: [PATCH] added blocking support --- masonite/drivers/queue/QueueAsyncDriver.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/masonite/drivers/queue/QueueAsyncDriver.py b/masonite/drivers/queue/QueueAsyncDriver.py index 383d5da91..32e03492b 100644 --- a/masonite/drivers/queue/QueueAsyncDriver.py +++ b/masonite/drivers/queue/QueueAsyncDriver.py @@ -2,17 +2,17 @@ import inspect import os +from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor, + as_completed) from masonite.app import App from masonite.contracts import QueueContract from masonite.drivers import BaseQueueDriver from masonite.exceptions import QueueException -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures import ProcessPoolExecutor -from masonite.helpers import config +from masonite.helpers import HasColoredCommands, config -class QueueAsyncDriver(BaseQueueDriver, QueueContract): +class QueueAsyncDriver(BaseQueueDriver, HasColoredCommands, QueueContract): """Queue Aysnc Driver.""" def __init__(self, app: App): @@ -63,13 +63,21 @@ def push(self, *objects, args=(), kwargs={}, **options): # Set processor to either use threads or processes processor = self._get_processor(mode=mode, max_workers=workers) - + is_blocking = config('queue.drivers.async.blocking', False) + # with processor as executor: + ran = [] # with processor as executor: for obj in objects: obj = self.container.resolve(obj) if inspect.isclass(obj) else obj try: - processor.submit( + future = processor.submit( getattr(obj, callback), *args, **kwargs) except AttributeError: # Could be wanting to call only a method asyncronously - processor.submit(fn=obj, *args, **kwargs) + future = processor.submit(fn=obj, *args, **kwargs) + + ran.append(future) + + if is_blocking: + for job in as_completed(ran): + self.info("Job Ran: {}".format(job))