Skip to content

Commit

Permalink
Next Minor (#553)
Browse files Browse the repository at this point in the history
* added new config helper function

* flake8

* added docstrings

* flake8

* removed need for config prefix

* Add in container (#520)

* added contains method to container

* formatted and added tests

* bumped version

* Add ability to login using multiple columns (#521)

* add list to auth class

* adds ability to set a password column

* formatted

* fixed space in exception

* Adds route redirection (#545)

* adds a new redirect route class

* flake8

* fixes issue where a json null could throw an exception (#544)

* added ability to set status code in controller (#540)

* added ability to set status code in controller

* flake8

* added better exception when passing incorrect parameter type (#539)

* added better exception when passing incorrect parameter type

* fixed view test

* fix delete method when body length is 0 (#529)

* fix docstring for secure headers middleware (#548)

* bumped version

* Adds ability to use list for route params (#552)

* Adds ability to use list for route params

* added new assertion

* Add new data type to be done serializing after response into json.

* Fixing statement else if to elif

* Inicial code to test response.

* Assigment the serialized data to view variable to return just at end method.

* Add test to verify the return of Response.

* Wrapped serialized content by json() method from Response.

* Filling more code to verify why Response is come empty.

* minor tweaks for tests

* flake8

* added ability to specify routes in the initializer (#559)

* flake8

* Add migration message (#551)

* added ability to show message when migrations are unmigrated

* flake8 fixes

* fixed command

* flake8

* Adding statement to verify whether queue's driver is default.

* Adding tests do verify default queue driver. Closes #564

* reworked queue connection drivers (#563)

* fixed login method (#562)

* bumped version

* reworked queue connection drivers

* modified queues

* Masonite can now retry failed jobs 3 times and call a failed callback

* added queue route

* added failed job handling and job database table

* fixed queue command docstrings

* job failed methods now need to accept payload and error

* commit

* can now specify the channel you want to listen on

* flake8

* flake8

* removed queue failed command

* fixed command option descroption

* fixed failed and fair

* modified test

* added base queue driver

* only queueable classes will get requeud on fail

* flake8

* upgrades async driver

* updated async

* cleaned up class inheritance

* flake8

* fixed contract

* updated queue contract

* added better test for default

* bumped version number
  • Loading branch information
josephmancuso committed Feb 10, 2019
1 parent e99da6c commit 7f30c28
Show file tree
Hide file tree
Showing 32 changed files with 448 additions and 101 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
include masonite/snippets/exceptions/css/*
include masonite/snippets/exceptions/*
include masonite/snippets/*
include masonite/snippets/migrations/*
include masonite/snippets/scaffold/*
include masonite/snippets/auth/controllers/*
include masonite/snippets/auth/templates/auth/*
10 changes: 10 additions & 0 deletions app/http/controllers/TestController.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from masonite.exceptions import DebugException

from masonite.request import Request
from masonite import Queue
from app.jobs.TestJob import TestJob


class TestController:
Expand Down Expand Up @@ -32,6 +34,14 @@ def post_test(self):
def json(self):
return 'success'

def bad(self):
return 5/0

def session(self, request: Request):
request.session.set('test', 'value')
return 'session set'

def queue(self, queue: Queue):
# queue.driver('amqp').push(self.bad)
queue.driver('amqp').push(TestJob, channel='default')
return 'queued'
23 changes: 23 additions & 0 deletions app/jobs/TestJob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
""" A TestJob Queue Job """

from masonite.queues import Queueable


class TestJob(Queueable):
"""A TestJob Job
"""

def __init__(self):
"""A TestJob Constructor
"""

pass

def handle(self):
"""Logic to handle the job
"""

return 2/0

def failed(self, payload, error):
print('running a failed job hook')
22 changes: 22 additions & 0 deletions databases/migrations/2019_02_07_015506_create_failed_jobs_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from orator.migrations import Migration


class CreateFailedJobsTable(Migration):

def up(self):
"""
Run the migrations.
"""
with self.schema.create('failed_jobs') as table:
table.increments('id')
table.string('driver')
table.string('channel')
table.binary('payload')
table.timestamp('failed_at')
table.timestamps()

def down(self):
"""
Revert the migrations.
"""
self.schema.drop('failed_jobs')
2 changes: 1 addition & 1 deletion masonite/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def _find_parameter(self, parameter):
object -- Returns the object found in the container
"""
parameter = str(parameter)
if parameter is not 'self' and parameter in self.providers:
if parameter != 'self' and parameter in self.providers:
obj = self.providers[parameter]
self.fire_hook('resolve', parameter, obj)
return obj
Expand Down
18 changes: 18 additions & 0 deletions masonite/commands/QueueTableCommand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
""" A QueueTableCommand Command """


from cleo import Command

from masonite.helpers.filesystem import copy_migration


class QueueTableCommand(Command):
"""
Create migration files for the queue feature
queue:table
"""

def handle(self):
copy_migration('masonite/snippets/migrations/create_failed_jobs_table.py')
self.info('Migration created successfully')
58 changes: 15 additions & 43 deletions masonite/commands/QueueWorkCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,59 +5,31 @@

from cleo import Command

from config import queue
from masonite import Queue
from masonite.exceptions import DriverLibraryNotFound


def callback(ch, method, properties, body):
from wsgi import container
job = pickle.loads(body)
obj = job['obj']
args = job['args']
callback = job['callback']
if inspect.isclass(obj):
obj = container.resolve(obj)

try:
getattr(obj, callback)(*args)
except AttributeError:
obj(*args)

ch.basic_ack(delivery_tag=method.delivery_tag)


class QueueWorkCommand(Command):
"""
Start the queue worker
queue:work
{--c|channel=default : The channel to listen on the queue}
{--d|driver=default : Specify the driver you would like to connect to}
{--f|fair : Send jobs to queues that have no jobs instead of randomly selecting a queue}
{--failed : Run only the failed jobs}
"""

def handle(self):
try:
import pika
except ImportError:
raise DriverLibraryNotFound(
"Could not find the 'pika' library. Run pip install pika to fix this.")

connection = pika.BlockingConnection(pika.URLParameters('amqp://{}:{}@{}{}/{}'.format(
queue.DRIVERS['amqp']['username'],
queue.DRIVERS['amqp']['password'],
queue.DRIVERS['amqp']['host'],
':' + str(queue.DRIVERS['amqp']['port']) if 'port' in queue.DRIVERS['amqp'] and queue.DRIVERS['amqp']['port'] else '',
queue.DRIVERS['amqp']['vhost'] if 'vhost' in queue.DRIVERS['amqp'] and queue.DRIVERS['amqp']['vhost'] else '%2F'
)))
channel = connection.channel()

channel.queue_declare(queue=self.option('channel'), durable=True)

channel.basic_consume(callback,
queue=self.option('channel'))
if self.option('fair'):
channel.basic_qos(prefetch_count=1)

self.info(' [*] Waiting to process jobs on the "{}" channel. To exit press CTRL+C'.format(
self.option('channel')))
channel.start_consuming()
from wsgi import container

if self.option('driver') == 'default':
queue = container.make(Queue)
else:
queue = container.make(Queue).driver(self.option('driver'))

if self.option('failed'):
queue.run_failed_jobs()
return

queue.connect().consume(self.option('channel'), fair=self.option('fair'))
4 changes: 4 additions & 0 deletions masonite/commands/ServeCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from hupper.logger import DefaultLogger, LogLevel
from hupper.reloader import Reloader, find_default_monitor_factory
from cleo import Command
from masonite.helpers import has_unmigrated_migrations


class ServeCommand(Command):
Expand All @@ -19,6 +20,9 @@ class ServeCommand(Command):
"""

def handle(self):
if has_unmigrated_migrations():
self.comment("\nYou have unmigrated migrations. Run 'craft migrate' to migrate them\n")

if self.option('reload'):
logger = DefaultLogger(LogLevel.INFO)

Expand Down
1 change: 1 addition & 0 deletions masonite/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .ModelDocstringCommand import ModelDocstringCommand
from .ProviderCommand import ProviderCommand
from .QueueWorkCommand import QueueWorkCommand
from .QueueTableCommand import QueueTableCommand
from .ServeCommand import ServeCommand
from .ViewCommand import ViewCommand
from .ValidatorCommand import ValidatorCommand
Expand Down
22 changes: 21 additions & 1 deletion masonite/contracts/QueueContract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,25 @@
class QueueContract(ABC):

@abstractmethod
def push(self):
def push(self, *objects, args=(), callback='handle', ran=1, channel=None):
pass

@abstractmethod
def connect(self):
pass

@abstractmethod
def consume(self, channel, fair=False):
pass

@abstractmethod
def work(self):
pass

@abstractmethod
def run_failed_jobs(self):
pass

@abstractmethod
def add_to_failed_queue_table(self):
pass
50 changes: 50 additions & 0 deletions masonite/drivers/BaseQueueDriver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Base queue driver."""

import pickle

import pendulum

from config import queue
from masonite.drivers import BaseDriver
from masonite.helpers import HasColoredCommands

if 'amqp' in queue.DRIVERS:
listening_channel = queue.DRIVERS['amqp']['channel']
else:
listening_channel = 'default'


class BaseQueueDriver(BaseDriver, HasColoredCommands):

def add_to_failed_queue_table(self, payload):
from config.database import DB as schema
if schema.get_schema_builder().has_table('failed_jobs'):
schema.table('failed_jobs').insert({
'driver': 'amqp',
'channel': listening_channel,
'payload': pickle.dumps(payload),
'failed_at': pendulum.now()
})

def run_failed_jobs(self):
from config.database import DB as schema
try:
self.success('Attempting to send failed jobs back to the queue ...')
for job in schema.table('failed_jobs').get():
payload = pickle.loads(job.payload)
schema.table('failed_jobs').where('payload', job.payload).delete()
self.push(payload['obj'], args=payload['args'], callback=payload['callback'])
except Exception:
self.danger('Could not get the failed_jobs table')

def push(self, *objects, args=(), callback='handle', ran=1, channel=None):
raise NotImplementedError

def connect(self):
return self

def consume(self, channel, fair=False):
raise NotImplementedError('The {} driver does not implement consume'.format(self.__class__.__name__))

def work(self):
raise NotImplementedError('The {} driver does not implement work'.format(self.__class__.__name__))
Loading

0 comments on commit 7f30c28

Please sign in to comment.