Skip to content

Commit

Permalink
rqflush asks for confirmation [see rq#218]
Browse files Browse the repository at this point in the history
  • Loading branch information
depaolim committed Feb 14, 2017
1 parent 8179258 commit c8d00f5
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 10 deletions.
33 changes: 25 additions & 8 deletions django_rq/management/commands/rqflush.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,42 @@
from django.core.management.base import BaseCommand

from django.utils.six.moves import input

from django_rq import get_queue


class Command(BaseCommand):
"""
Removes all queue jobs
Flushes the queue specified as argument
"""
help = __doc__

def add_arguments(self, parser):
parser.add_argument(
'--noinput', '--no-input', action='store_false',
dest='interactive', default=True,
help='Tells Django to NOT prompt the user for input of any kind.',
)
parser.add_argument('--queue', '-q', dest='queue', default='default',
help='Specify the queue [default]')

def handle(self, *args, **options):
"""
Queues the function given with the first argument with the
parameters given with the rest of the argument list.
"""
verbosity = int(options.get('verbosity', 1))
interactive = options['interactive']
queue = get_queue(options.get('queue'))
queue.empty()
if verbosity:
print('Queue "%s" cleaned' % queue.name)

if interactive:
confirm = input("""You have requested a flush the "%s" queue.
Are you sure you want to do this?
Type 'yes' to continue, or 'no' to cancel: """ % queue.name)
else:
confirm = 'yes'

if confirm == 'yes':
queue.empty()
if verbosity:
print('Queue "%s" flushed.' % queue.name)
else:
if verbosity:
print("Flush cancelled.")
31 changes: 29 additions & 2 deletions django_rq/tests/tests.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import sys

from django.contrib.auth.models import User
from django.core.management import call_command
from django.core.urlresolvers import reverse
Expand All @@ -11,6 +13,7 @@
from django.test import override_settings
except ImportError:
from django.test.utils import override_settings
from django.utils.six import StringIO
from django.conf import settings

from rq import get_current_job, Queue
Expand Down Expand Up @@ -78,6 +81,16 @@ def get_queue_index(name='default'):
return queue_index


def stub_stdin(testcase_inst, inputs):
stdin = sys.stdin

def cleanup():
sys.stdin = stdin

testcase_inst.addCleanup(cleanup)
sys.stdin = StringIO(inputs)


@override_settings(RQ={'AUTOCOMMIT': True})
class QueuesTest(TestCase):

Expand Down Expand Up @@ -308,15 +321,29 @@ def test_default_timeout(self):
def test_rqflush_default(self):
queue = get_queue()
queue.enqueue(divide, 42, 1)
call_command("rqflush", "--verbosity", "0")
call_command("rqflush", "--verbosity", "0", "--noinput")
self.assertFalse(queue.jobs)

def test_rqflush_test(self):
def test_rqflush_test3(self):
queue = get_queue("test3")
queue.enqueue(divide, 42, 1)
call_command("rqflush", "--queue", "test3", "--verbosity", "0", "--noinput")
self.assertFalse(queue.jobs)

def test_rqflush_test3_interactive_yes(self):
queue = get_queue("test3")
queue.enqueue(divide, 42, 1)
stub_stdin(self, "yes")
call_command("rqflush", "--queue", "test3", "--verbosity", "0")
self.assertFalse(queue.jobs)

def test_rqflush_test3_interactive_no(self):
queue = get_queue("test3")
queue.enqueue(divide, 42, 1)
stub_stdin(self, "no")
call_command("rqflush", "--queue", "test3", "--verbosity", "0")
self.assertTrue(queue.jobs)


@override_settings(RQ={'AUTOCOMMIT': True})
class DecoratorTest(TestCase):
Expand Down

0 comments on commit c8d00f5

Please sign in to comment.