Skip to content
This repository
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 33 lines (29 sloc) 1.224 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
from datetime import datetime
from tests import PyResTests, Basic, TestProcess, ReturnAllArgsJob
from pyres.job import Job
class JobTests(PyResTests):
    """Exercise ``pyres.job.Job``: reserving, performing and failing jobs.

    ``self.resq`` and ``self.redis`` are fixtures supplied by ``PyResTests``.
    All checks go through the TestCase assertion methods rather than bare
    ``assert`` statements, so they still run under ``python -O`` and report
    both operands when they fail.
    """

    def test_reserve(self):
        # A reserved job remembers the queue it came from and carries the
        # payload that was enqueued.
        self.resq.enqueue(Basic, "test1")
        job = Job.reserve('basic', self.resq)
        self.assertEqual(job._queue, 'basic')
        self.assertTrue(job._payload)
        # The timestamp is set at enqueue time, so the expected payload
        # reads it back from the job instead of hard-coding a value.
        expected_payload = {
            'class': 'tests.Basic',
            'args': ['test1'],
            'enqueue_timestamp': job.enqueue_timestamp,
        }
        self.assertEqual(job._payload, expected_payload)

    def test_perform(self):
        # Reserve one job from 'basic' and one from 'high', then run both.
        self.resq.enqueue(Basic, "test1")
        job = Job.reserve('basic', self.resq)
        self.resq.enqueue(TestProcess)
        job2 = Job.reserve('high', self.resq)
        self.assertEqual(job.perform(), "name:test1")
        self.assertTrue(job2.perform())

    def test_fail(self):
        # Failing a job appends exactly one entry to the 'resque:failed' list.
        self.resq.enqueue(Basic, "test1")
        job = Job.reserve('basic', self.resq)
        self.assertEqual(self.redis.llen('resque:failed'), 0)
        job.fail("problem")
        self.assertEqual(self.redis.llen('resque:failed'), 1)

    def test_date_arg_type(self):
        # A datetime argument must come back from perform() equal to the
        # value that was enqueued.
        # NOTE(review): microseconds are zeroed, presumably because the
        # payload encoding does not keep sub-second precision -- confirm.
        dt = datetime.now().replace(microsecond=0)
        self.resq.enqueue(ReturnAllArgsJob, dt)
        job = Job.reserve('basic', self.resq)
        result = job.perform()
        self.assertEqual(result[0], dt)
Something went wrong with that request. Please try again.