-
Notifications
You must be signed in to change notification settings - Fork 0
/
Scheduler.py
172 lines (163 loc) · 7.81 KB
/
Scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# Copyright (C) 2012-2020 MIPS Tech LLC
# Written by Matthew Fortune <matthew.fortune@imgtec.com> and
# Daniel Sanders <daniel.sanders@imgtec.com>
# This file is part of Overtest.
#
# Overtest is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# Overtest is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with overtest; see the file COPYING. If not, write to the Free
# Software Foundation, 51 Franklin Street - Fifth Floor, Boston, MA
# 02110-1301, USA.
import time
import signal
from OvertestExceptions import *
class Scheduler:
"""
The scheduler is responsible for the logic in choosing which
test to run from what testrun. It also drives the running of tests.
"""
def __init__(self, host, threadnumber):
self.threadnumber = threadnumber
self.host = host
self.quit = False
self.exitcode = 0
signal.signal(signal.SIGTERM, self.termHandler)
signal.signal(signal.SIGHUP, self.termHandler)
def logHelper(self, message):
self.host.logHelper(self.threadnumber, message)
def logDB(self, message):
self.host.logDB(self.threadnumber, message)
def termHandler(self, signum, frame):
"""
Handle a sigterm gracefully
"""
self.quit = True
self.exitcode = 2
self.logHelper("SIGTERM: Finishing current test and exiting")
def run(self):
"""
Continuously find and select tests to run.
This function must be highly resilient to concurrency issues.
"""
try:
nothingToDo = False
archiverHadWork = False
self.quit = False
while not self.quit:
if self.host.shouldThreadExit(self.threadnumber):
self.logHelper("Thread %u exiting by database request"%self.threadnumber)
self.logDB("Thread %u exiting by database request"%self.threadnumber)
self.quit = True
break
# Check for 'aborting' testruns, and mark them aborted as appropriate
self.host.checkAbortedTestruns()
# Find all testruns that need archiving
# Need a new flag in testrunaction to show when it is archived.
testruns = self.host.getTestrunsToArchiveOrDelete(self.logDB)
archiverHadWork = False
for testrun in testruns:
task = True
# WORK NEEDED: Put the line below back in after having hte testruntoarchiveordelete to ensure that at least one
# task executed on this host (or no host).
#archiverHadWork = True
# Determine if this is a deletion or an archive
# and mark the testrun as archiving or deleting as appropriate
(ignoreme, delete) = testrun.setArchiveOrDeleteInProgress()
while task != None:
# Find a task to archive and mark it as started
# This is an atomic operation that uses the testrun table as a LOCK
task = testrun.getNextTaskToArchive(self.host.getHostID())
if task != None:
try:
try:
# The task will delete its work directory and perform any custom cleanup
# When deleting it will also delete its results area
task.archiveChecked(delete)
except KeyboardInterrupt, e:
testrun.logHelper("Archive interrupted, aborting")
self.logDB("Interrupt received, shutting down")
self.quit = True
raise
except Exception, e:
testrun.logHelper("Exception raised, archive code unsafe: A%u.py"%task.actionid)
self.logDB("Exception raised, archive code unsafe: A%u.py\n%s"%(task.actionid, formatExceptionInfo()))
finally:
testrun.setTaskArchived(task.testrunactionid)
del task
task = True
# When the archive is complete the testrun can be marked as archived
testrun.archiveComplete()
# Testrun is no longer a valid handle (it may have been deleted)
testrun = None
# Find all testruns (in order) that need tasks executing
testruns = self.host.getNextTestruns(self.logDB)
if not archiverHadWork and (nothingToDo or len(testruns) == 0):
time.sleep(5)
nothingToDo = True
for testrun in testruns:
# Find a new task and mark it as started
# This is an atomic operation that uses the testrun table as a LOCK
task = testrun.getNextTask(self.host.getHostID())
if task == None:
continue
try:
nothingToDo = False
# Run the task specific actions the runChecked wrapper
# handles standard overtest exceptions
try:
# Wrap the following in a try except block to handle result submission errors
# This is to deal with internal consistency. It 'should' only fail if multiple
# versions of the same action exist in a testrun!
try:
if task.runChecked():
testrun.submit(task.actionid, True)
except TaskRunErrorException, e:
# Assert failure
testrun.submit(e.actionid, False)
# Abort the whole testrun. Any task failure is a full failure
testrun.setAborting(lock = True)
testrun.logHelper("Task failed... Testrun aborted")
except ResultSubmissionException, e:
testrun.submit(task.actionid, False, {"__OVT_EXCEPT__":"Failed to submit result: %s"%formatExceptionInfo()})
testrun.logHelper("Task failed to submit result")
except TestrunAbortedException, e:
testrun.submit(task.actionid, False, {"Aborted":"User abort or another task failed"})
testrun.logHelper("Task aborted due to testrun abort")
except KeyboardInterrupt, e:
testrun.submit(task.actionid, False, {"__OVT_EXCEPT__":"Keyboard interrupt"})
testrun.setAborting(lock = True)
testrun.logHelper("Run interrupted, aborting")
self.logDB("Interrupt received, shutting down")
self.quit = True
raise
except Exception, e:
testrun.submit(task.actionid, False, {"__OVT_EXCEPT__":formatExceptionInfo()})
testrun.setAborting(lock = True)
testrun.logHelper("Exception raised, module run code unsafe: A%u.py"%task.actionid)
self.logDB("Exception raised, module run code unsafe: A%u.py\n%s"%(task.actionid, formatExceptionInfo()))
except ResultSubmissionException, e:
# The task will not get a result which is bad but nothing can be done. It will
# be marked as complete below so should not be run in an endless loop
# This is a catastrophic testrun failure and suggests inconsistency in the overtest internals
testrun.setAborting(lock = True)
testrun.logHelper("Failed to submit result for action: %u"%(task.actionid))
testrun.logDB("Failed to submit result for action: %u\n%s"%(task.actionid, formatExceptionInfo()))
finally:
# Mark the task as complete
testrun.setTaskEnded(task.testrunactionid)
del task
# Start from the first testrun again. This permits priority ordering
break
except KeyboardInterrupt, e:
self.logHelper("Scheduler Interrupted. Exiting")
sys.exit(1)
sys.exit(self.exitcode)