Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

123 lines (109 sloc) 5.304 kB
from tests import PyResTests, Basic, TestProcess
from pyres import ResQ
from pyres.worker import Worker
from pyres.job import Job
import os
class ResQTests(PyResTests):
    """Tests for the ``ResQ`` queue API.

    The tests use ``self.resq`` (the object under test) and ``self.redis``
    (used to inspect the stored keys directly). Neither is created in this
    class; presumably both come from ``PyResTests`` - verify against the
    ``tests`` package.
    """

    def test_enqueue(self):
        """Enqueuing ``Basic`` three times yields three entries on ``basic``."""
        self.resq.enqueue(Basic, "test1")
        self.resq.enqueue(Basic, "test2", "moretest2args")
        ResQ._enqueue(Basic, "test3")
        assert self.redis.llen("resque:queue:basic") == 3
        assert self.redis.sismember('resque:queues', 'basic')

    def test_push(self):
        """``push`` appends encoded items to the named queue in call order."""
        self.resq.push('pushq', 'content-newqueue')
        self.resq.push('pushq', 'content2-newqueue')
        assert self.redis.llen('resque:queue:pushq') == 2
        assert self.redis.lindex('resque:queue:pushq', 0) == ResQ.encode('content-newqueue')
        assert self.redis.lindex('resque:queue:pushq', 1) == ResQ.encode('content2-newqueue')

    def test_pop(self):
        """``pop`` returns ``(queue, item)`` pairs in the order items were pushed.

        The queue argument is given once as a plain string and once as a
        one-element list; both forms are expected to work.
        """
        self.resq.push('pushq', 'content-newqueue')
        self.resq.push('pushq', 'content2-newqueue')
        assert self.redis.llen('resque:queue:pushq') == 2
        assert self.resq.pop('pushq') == ('pushq', 'content-newqueue')
        assert self.redis.llen('resque:queue:pushq') == 1
        assert self.resq.pop(['pushq']) == ('pushq', 'content2-newqueue')
        assert self.redis.llen('resque:queue:pushq') == 0

    def test_pop_two_queues(self):
        """``pop`` over several queues takes from the first listed non-empty one.

        Once every listed queue is empty, a ``pop`` with ``timeout=1`` is
        expected to return ``(None, None)``.
        """
        self.resq.push('pushq1', 'content-q1-1')
        self.resq.push('pushq1', 'content-q1-2')
        self.resq.push('pushq2', 'content-q2-1')
        assert self.redis.llen('resque:queue:pushq1') == 2
        assert self.redis.llen('resque:queue:pushq2') == 1

        # pushq1 is listed first and has items, so it is served first.
        assert self.resq.pop(['pushq1', 'pushq2']) == ('pushq1', 'content-q1-1')
        assert self.redis.llen('resque:queue:pushq1') == 1
        assert self.redis.llen('resque:queue:pushq2') == 1

        # With the order reversed, pushq2 is served first.
        assert self.resq.pop(['pushq2', 'pushq1']) == ('pushq2', 'content-q2-1')
        assert self.redis.llen('resque:queue:pushq1') == 1
        assert self.redis.llen('resque:queue:pushq2') == 0

        # pushq2 is now empty, so the pop falls through to pushq1.
        assert self.resq.pop(['pushq2', 'pushq1']) == ('pushq1', 'content-q1-2')
        assert self.redis.llen('resque:queue:pushq1') == 0
        assert self.redis.llen('resque:queue:pushq2') == 0

        # Nothing left anywhere.
        assert self.resq.pop(['pushq1', 'pushq2'], timeout=1) == (None, None)

    def test_peek(self):
        """``peek('basic', 0, 20)`` returns both enqueued entries."""
        self.resq.enqueue(Basic, "test1")
        self.resq.enqueue(Basic, "test2")
        assert len(self.resq.peek('basic', 0, 20)) == 2

    def test_size(self):
        """``size`` counts queued entries and is 0 for an unused queue name."""
        self.resq.enqueue(Basic, "test1")
        self.resq.enqueue(Basic, "test2")
        assert self.resq.size('basic') == 2
        assert self.resq.size('noq') == 0

    def test_redis_property(self):
        """Assigning an unsupported value to ``ResQ.redis`` raises."""
        rq = ResQ(server="localhost:6379")
        # Go through setattr so the assignment itself happens inside
        # assertRaises. Passing ``rq.redis`` as the callable would only read
        # the attribute and then call its current value with ``[Basic]``,
        # which never exercises assignment.
        # NOTE(review): assumes ``redis`` is a property whose setter rejects
        # values such as a list - confirm against ``pyres.ResQ``.
        # TODO: constructing ``ResQ`` from an existing Redis client
        # (``ResQ(server=Redis())``) is not exercised here.
        self.assertRaises(Exception, setattr, rq, 'redis', [Basic])

    def test_info(self):
        """``info`` reports queue, server and worker counts."""
        self.resq.enqueue(Basic, "test1")
        self.resq.enqueue(TestProcess)
        info = self.resq.info()
        assert info['queues'] == 2
        assert info['servers'] == ['localhost:6379']
        assert info['workers'] == 0

        # Registering a worker must show up in a fresh info() snapshot.
        worker = Worker(['basic'])
        worker.register_worker()
        info = self.resq.info()
        assert info['workers'] == 1

    def test_workers(self):
        """A registered worker is listed by ``workers``."""
        worker = Worker(['basic'])
        worker.register_worker()
        assert len(self.resq.workers()) == 1
        # TODO: also assert that Worker.find(name, self.resq) is in
        # self.resq.workers(), with name "<hostname>:<pid>:basic".

    def test_enqueue_from_string(self):
        """A job enqueued by dotted class path can be reserved and processed.

        After processing, no per-worker key and no failure counters are
        expected to be set.
        """
        self.resq.enqueue_from_string('tests.Basic', 'basic', 'test1')
        name = "%s:%s:%s" % (os.uname()[1], os.getpid(), 'basic')
        assert self.redis.llen("resque:queue:basic") == 1
        job = Job.reserve('basic', self.resq)
        worker = Worker(['basic'])
        worker.process(job)
        assert not self.redis.get('resque:worker:%s' % worker)
        assert not self.redis.get("resque:stat:failed")
        assert not self.redis.get("resque:stat:failed:%s" % name)

    def test_remove_queue(self):
        """``remove_queue`` drops the queue from every place it is tracked."""
        self.resq.enqueue_from_string('tests.Basic', 'basic', 'test1')
        assert 'basic' in self.resq._watched_queues
        assert self.redis.sismember('resque:queues', 'basic')
        assert self.redis.llen('resque:queue:basic') == 1

        self.resq.remove_queue('basic')
        assert 'basic' not in self.resq._watched_queues
        assert not self.redis.sismember('resque:queues', 'basic')
        assert not self.redis.exists('resque:queue:basic')

    def test_keys(self):
        """``keys`` lists ``queue:basic`` and ``queues`` after an enqueue."""
        self.resq.enqueue_from_string('tests.Basic', 'basic', 'test1')
        assert 'queue:basic' in self.resq.keys()
        assert 'queues' in self.resq.keys()

    def test_queues(self):
        """``queues`` lists each distinct queue name exactly once."""
        assert self.resq.queues() == []
        self.resq.enqueue_from_string('tests.Basic', 'basic', 'test1')
        assert len(self.resq.queues()) == 1
        # A second job on the same queue must not add a second entry.
        self.resq.enqueue_from_string('tests.Basic', 'basic', 'test1')
        assert len(self.resq.queues()) == 1
        self.resq.enqueue_from_string('tests.Basic', 'basic2', 'test1')
        assert len(self.resq.queues()) == 2
        assert 'test' not in self.resq.queues()
        assert 'basic' in self.resq.queues()

    def test_close(self):
        """``close`` can be called on the fixture's ``ResQ`` without raising."""
        self.resq.close()
Jump to Line
Something went wrong with that request. Please try again.