Skip to content

Commit

Permalink
better tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kiorky committed Oct 11, 2012
1 parent 79c9064 commit 333f350
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 103 deletions.
7 changes: 4 additions & 3 deletions docs/CHANGES.rst
Expand Up @@ -4,9 +4,10 @@ Changelog

2.3 (unreleased)
----------------

- Nothing changed yet.

- better registry handling [kiorky]
- better jobrunner [kiorky]
- better tests [kiorky]
- make install and restart code more robust, again [kiorky]

2.2 (2012-10-11)
----------------
Expand Down
16 changes: 7 additions & 9 deletions src/collective/cron/setuphandlers.py
Expand Up @@ -11,36 +11,32 @@
from zope.event import notify

def setupVarious(context):
### XXX making ._p_jar appear ###
transaction.savepoint()

# Ordinarily, GenericSetup handlers check for the existence of XML files.
# Here, we are not parsing an XML file, but we use this text file as a
# flag to check that we actually meant for this import step to be run.
# The file is found in profiles/default.

if context.readDataFile('collectivecron_various.txt') is None:
return
### XXX making ._p_jar appear ###
transaction.savepoint()
# if the queue is not marked, initialize the crontab !
portal = getToolByName(
context.getSite(), 'portal_url').getPortalObject()
# initialiing manager will in turn intialize any
# queue manager who is in charge for initializing
# needed stuff for async jobs
oldsite = getSite()
setSite(portal)
#oldsite = getSite()
#setSite(portal)
crt = crontab.Crontab.load()
marker = getMultiAdapter(
(portal, crt),
i.ICrontabManager
)
crt.save()

setSite(oldsite)
#setSite(oldsite)

def setupCrons(context): # pragma: no cover
### XXX making ._p_jar appear ###
transaction.savepoint()
logger = logging.getLogger('collective.cron.exportCrons')

# Ordinarily, GenericSetup handlers check for the existence of XML files.
Expand All @@ -51,6 +47,8 @@ def setupCrons(context): # pragma: no cover
xml = context.readDataFile('crons.xml')
if xml is None:
return
### XXX making ._p_jar appear ###
transaction.savepoint()
# if the queue is not marked, initialize the crontab !
portal = getToolByName(
context.getSite(), 'portal_url').getPortalObject()
Expand Down
49 changes: 19 additions & 30 deletions src/collective/cron/testing.py
Expand Up @@ -114,8 +114,20 @@ def tearDownDatetime():
zcatesting._now = old__now
zcatesting.tearDownDatetime()

class Timed(Layer):
defaultBases=tuple()
def setUp(self):
setUpDatetime()

def testSetUp(self):
set_now(datetime.datetime(2008, 1, 1, 1, 0))

def tearDown(self):
tearDownDatetime()

class CollectiveCronLayer(base.AsyncLayer):
defaultBases = ((Timed(),) + base.AsyncLayer.defaultBases)

def setUpZope(self, app, configurationContext):
base.AsyncLayer.setUpZope(self, app, configurationContext)
import collective.cron
Expand All @@ -139,6 +151,7 @@ def testTearDown(self):
transaction.commit()
def testSetUp(self):
base.LayerMixin.testSetUp(self)
transaction.commit()
self['cron'] = crontab.Cron(
name=u'testcron',
activated = True,
Expand All @@ -160,12 +173,16 @@ def testSetUp(self):

class IntegrationTesting(LayerMixin, base.IntegrationTesting):
def testTearDown(self):
transaction.commit()
LayerMixin.testTearDown(self)
transaction.commit()
base.IntegrationTesting.testTearDown(self)

def testSetUp(self):
base.IntegrationTesting.testSetUp(self)
transaction.commit()
LayerMixin.testSetUp(self)
transaction.commit()

class FunctionalTesting(LayerMixin, base.FunctionalTesting):
def testTearDown(self):
Expand All @@ -179,50 +196,22 @@ def testSetUp(self): # pragma: no cover
LayerMixin.testSetUp(self)
transaction.commit()

class TimedTesting(Layer):
def setUp(self):
setUpDatetime()

def tearDown(self):
tearDownDatetime()

class TimedFunctionalTesting(FunctionalTesting):
defaultBases = ((TimedTesting(),) + FunctionalTesting.defaultBases)

def testSetUp(self):
set_now(datetime.datetime(2008,1,1,1,1))
transaction.commit()
FunctionalTesting.testSetUp(self)
transaction.commit()

def testTearDown(self):
transaction.commit()
FunctionalTesting.testTearDown(self)
transaction.commit()

class SimpleLayer(Layer):
defaultBases = tuple()
defaultBases = (Timed(),)

COLLECTIVE_CRON_SIMPLE = SimpleLayer(name='CollectiveCron:Simple')
COLLECTIVE_CRON_INTEGRATION_TESTING = IntegrationTesting(
name = "CollectiveCron:Integration")
COLLECTIVE_CRON_FUNCTIONAL_TESTING = FunctionalTesting(
name = "CollectiveCron:Functional")
COLLECTIVE_CRON_TFUNCTIONAL_TESTING = TimedFunctionalTesting(
name = "CollectiveCron:TFunctional")
COLLECTIVE_CRON_SELENIUM_TESTING = FunctionalTesting(
bases = (SELENIUM_TESTING,
COLLECTIVE_CRON_FUNCTIONAL_TESTING,),
name = "CollectiveCron:Selenium")

"""
Register our layers as using async storage
"""
base.registerAsyncLayers(
[COLLECTIVE_CRON_FIXTURE,
COLLECTIVE_CRON_INTEGRATION_TESTING,
COLLECTIVE_CRON_FUNCTIONAL_TESTING,
COLLECTIVE_CRON_SELENIUM_TESTING]
COLLECTIVE_CRON_FUNCTIONAL_TESTING,]
)


Expand Down
31 changes: 10 additions & 21 deletions src/collective/cron/tests/base.py
Expand Up @@ -11,59 +11,48 @@

import unittest2 as unittest

from collective.cron.testing import (
COLLECTIVE_CRON_SIMPLE,
COLLECTIVE_CRON_FIXTURE as UNIT_TESTING,
COLLECTIVE_CRON_INTEGRATION_TESTING as INTEGRATION_TESTING,
COLLECTIVE_CRON_FUNCTIONAL_TESTING as FUNCTIONAL_TESTING,
COLLECTIVE_CRON_SELENIUM_TESTING as SELENIUM_TESTING,
)
from collective.cron import testing as t

from plone.app.async.tests import base
from plone.testing.z2 import Browser


class TestCase(base.AsyncTestCase):
layer = UNIT_TESTING
layer = t.COLLECTIVE_CRON_FIXTURE
def setUp(self):
super(TestCase, self).setUp()
setUpDatetime()
self.setRoles(['CollectiveCron'])
self.queue = self.layer['queue']
set_now(datetime.datetime(2008, 1, 1, 1, 1, tzinfo=pytz.UTC))
transaction.commit()

def tearDown(self):
noecho = [self.queue.remove(j)
for j in self.queue]
tearDownDatetime()
transaction.commit()
noecho = [self.queue.remove(j) for j in self.queue]
super(TestCase, self).tearDown()

class SimpleTestCase(TestCase):
layer = COLLECTIVE_CRON_SIMPLE
layer = t.COLLECTIVE_CRON_SIMPLE
def setUp(self):
setUpDatetime()
set_now(datetime.datetime(2008, 1, 1, 1, 1, tzinfo=pytz.UTC))
pass

def tearDown(self):
tearDownDatetime()
pass

class IntegrationTestCase(TestCase):
"""Integration base TestCase."""
layer = INTEGRATION_TESTING
layer = t.COLLECTIVE_CRON_INTEGRATION_TESTING
def setUp(self):
TestCase.setUp(self)
self.crontab = self.layer['crontab']
self.cron = self.layer['cron']
self.crontab_manager = self.layer['crontab_manager']
self.cron_manager = self.layer['cron_manager']
self.cron_utils = self.layer['cron_utils']
transaction.commit()

class FunctionalTestCase(IntegrationTestCase):
"""Functionnal base TestCase."""
layer = FUNCTIONAL_TESTING
layer = t.COLLECTIVE_CRON_FUNCTIONAL_TESTING

class SeleniumTestCase(TestCase):
"""Functionnal base TestCase."""
layer = SELENIUM_TESTING
# vim:set et sts=4 ts=4 tw=80:
3 changes: 1 addition & 2 deletions src/collective/cron/tests/test_async_annotedqueue.py
Expand Up @@ -30,8 +30,7 @@ def tearDown(self):
[self.queue.remove(j) for j in self.queue]

def test_pasync_queue_annotations(self):
self.assertEquals(self.aqueue.annotations,
{'plone': ['/plone']})
self.assertEquals(self.aqueue.annotations, {'plone': ['/plone']})

def test_suite():
return unittest.defaultTestLoader.loadTestsFromName(__name__)
Expand Down
5 changes: 4 additions & 1 deletion src/collective/cron/tests/test_async_queue.py
Expand Up @@ -115,11 +115,13 @@ def test_pasync_queue_get_job_infos(self):

def test_pasync_queue_get_job_present(self):
# a job without begin_after, first of all
[self.layer['queue'].remove(a) for a in self.layer['queue']]
transaction.commit()
job, job_infos = self.make_simpleOne()
job1 = self.cqueue.get_job_present(job_infos)
self.assertTrue(job is job1)
self.queue.remove(job)
job1 = self.cqueue.get_job_present(job_infos)
job1 = self.cqueue.get_job_in_queue(job_infos)
self.assertTrue(job1 is None)

def test_pasync_queue_compare_job(self):
Expand Down Expand Up @@ -166,6 +168,7 @@ def test_pasync_queue_is_job_present(self):
self.assertFalse(self.cqueue.is_job_present())

def test_pasync_queue_get_job_status(self):
transaction.commit()
job, job_infos = self.make_simpleOne()
self.assertTrue(
self.cqueue.get_job_status()
Expand Down
2 changes: 1 addition & 1 deletion src/collective/cron/tests/test_cron.py
Expand Up @@ -507,7 +507,7 @@ def test_objects_cron_save(self):
cron.save()
lllen = len(crt.manager.crontab)
self.assertEquals(
crontab.json.loads(crt.manager.crontab[-1])['name'],
crontab.json.loads(crt.manager.crontab[0])['name'],
crondata["name"])
self.assertEquals(llen, lllen)
crondata["uid"] = "notyet"
Expand Down
6 changes: 3 additions & 3 deletions src/collective/cron/tests/test_doctests.py
Expand Up @@ -14,7 +14,7 @@
from collective.cron.tests.globals import *
from collective.cron.testing import (
COLLECTIVE_CRON_FUNCTIONAL_TESTING as FUNCTIONAL_TESTING,
COLLECTIVE_CRON_TFUNCTIONAL_TESTING as TFUNCTIONAL_TESTING,
# COLLECTIVE_CRON_TFUNCTIONAL_TESTING as TFUNCTIONAL_TESTING,
)
import unittest2 as unittest
import glob
Expand Down Expand Up @@ -42,8 +42,8 @@ def test_suite():
globs = globals()
for s in files:
layer = FUNCTIONAL_TESTING
if 'timed_' in s:
layer = TFUNCTIONAL_TESTING
#if 'timed_' in s:
# layer = FUNCTIONAL_TESTING
suite.addTests([
layered(
doctest.DocFileSuite(
Expand Down

0 comments on commit 333f350

Please sign in to comment.