Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

786 lines (639 sloc) 29.562 kb
#!/usr/bin/python
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
import os
import sys
import subprocess
import shlex
import time
def output_from_command(command):
    """Run *command* and return its standard output as a list of lines.

    The command string is tokenised with shlex.split() and executed without
    a shell.  Standard error is captured and discarded.  The returned list
    is the captured stdout split on "\\n", so output that ends with a
    newline yields a trailing empty string.
    """
    proc = subprocess.Popen(shlex.split(command),
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)
    # communicate() both drains the pipes and waits for the child.  Calling
    # wait() first (as this helper used to) can deadlock once the child
    # fills a pipe buffer, because nobody is reading the other end.
    stdout_text = proc.communicate()[0]
    return stdout_text.split("\n")
class Test:
    """A single regression test run against a freshly started stonithd.

    A test is an ordered list of commands (each with an expected exit code
    and optional stdout checks) plus lists of strings that must, or must
    not, appear in the stderr output captured from the stonithd process.
    """

    def __init__(self, name, description, verbose = 0, with_cpg = 0):
        """Create an empty test.

        name        -- identifier used for selection and in result messages
        description -- human readable summary shown by the test listing
        verbose     -- when true, print commands and daemon output while running
        with_cpg    -- when true, stonithd is started with "-c" instead of
                       "-s" and the test is flagged as needing corosync
        """
        self.name = name
        self.description = description
        self.cmds = []
        self.verbose = verbose

        self.result_txt = ""
        self.cmd_tool_output = ""
        self.result_exitcode = 0

        self.stonith_options = "-s"
        self.enable_corosync = 0
        if with_cpg:
            self.stonith_options = "-c"
            self.enable_corosync = 1

        self.stonith_process = None
        self.stonith_output = ""
        self.stonith_patterns = []
        self.negative_stonith_patterns = []

        self.executed = 0
        # NOTE: an earlier revision ran "crm_resource --list-standards" here
        # and discarded the result; that per-test subprocess was dropped
        # because nothing ever read its output.

    def __new_cmd(self, cmd, args, exitcode, stdout_match = "", no_wait = 0, stdout_negative_match = "", kill=None):
        """Append one command description to the list of steps to execute."""
        self.cmds.append(
            {
                "cmd" : cmd,
                "kill" : kill,
                "args" : args,
                "expected_exitcode" : exitcode,
                "stdout_match" : stdout_match,
                "stdout_negative_match" : stdout_negative_match,
                "no_wait" : no_wait,
            }
        )

    def stop_pacemaker(self):
        """Forcefully kill any running pacemakerd (killall -9 -q)."""
        cmd = shlex.split("killall -9 -q pacemakerd")
        # communicate() waits for killall and drains the pipe in one step.
        subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()

    def start_environment(self):
        """Kill pacemakerd/stonithd, then start a private stonithd for this test."""
        ### make sure we are in full control here ###
        self.stop_pacemaker()

        cmd = shlex.split("killall -9 -q stonithd")
        subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()

        if self.verbose:
            print("Starting stonithd with %s" % self.stonith_options)

        # universal_newlines=True makes the captured daemon output text, so
        # the later "\n" split and substring matching work on str objects.
        self.stonith_process = subprocess.Popen(
            shlex.split("@CRM_DAEMON_DIR@/stonithd %s -V" % self.stonith_options),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True)

        # Fixed one second pause after launching the daemon before any
        # command is issued.
        time.sleep(1)

    def clean_environment(self):
        """Terminate stonithd (if running) and keep its stderr for pattern matching."""
        if self.stonith_process:
            self.stonith_process.terminate()
            # Index 1 is stderr; that is the stream the log patterns are
            # matched against.
            self.stonith_output = self.stonith_process.communicate()[1]
        self.stonith_process = None

        if self.verbose:
            print(self.stonith_output)

    def add_stonith_log_pattern(self, pattern):
        """Require *pattern* to appear in the stonithd output."""
        self.stonith_patterns.append(pattern)

    def add_stonith_negative_log_pattern(self, pattern):
        """Require *pattern* to be absent from the stonithd output."""
        self.negative_stonith_patterns.append(pattern)

    def add_cmd(self, cmd, args):
        """Add a step that must exit with status 0."""
        self.__new_cmd(cmd, args, 0, "")

    def add_cmd_no_wait(self, cmd, args):
        """Add a step that is launched but not waited for."""
        self.__new_cmd(cmd, args, 0, "", 1)

    def add_cmd_check_stdout(self, cmd, args, match, no_match = ""):
        """Add a step whose stdout must contain *match* and not contain *no_match*."""
        self.__new_cmd(cmd, args, 0, match, 0, no_match)

    def add_expected_fail_cmd(self, cmd, args, exitcode = 255):
        """Add a step that is expected to exit with the given non-zero status."""
        self.__new_cmd(cmd, args, exitcode, "")

    def get_exitcode(self):
        """Return 0 if the test passed (or has not failed yet), -1 otherwise."""
        return self.result_exitcode

    def print_result(self, filler):
        """Print the result line prefixed by *filler*."""
        print("%s%s" % (filler, self.result_txt))

    def run_cmd(self, args):
        """Execute one step and return the exit status to compare.

        Returns 0 immediately for no_wait steps, -2 when a stdout check
        fails, and the process exit status otherwise.
        """
        cmd = shlex.split(args['args'])
        cmd.insert(0, args['cmd'])

        if self.verbose:
            print("\n\nRunning: " + " ".join(cmd))
        test = subprocess.Popen(cmd,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)

        if args['kill']:
            if self.verbose:
                print("Also running: " + args['kill'])
            subprocess.Popen(shlex.split(args['kill']))

        if args['no_wait']:
            # Fire and forget: the step counts as successful right away.
            return 0

        # communicate() waits for the child while draining both pipes;
        # wait() followed by communicate() could block forever on a child
        # that produces more output than the pipe buffer holds.
        output = test.communicate()[0]
        exitcode = test.returncode

        if self.verbose:
            print(output)

        if args['stdout_match'] != "" and output.count(args['stdout_match']) == 0:
            exitcode = -2
            print("STDOUT string '%s' was not found in cmd output: %s" % (args['stdout_match'], output))

        if args['stdout_negative_match'] != "" and output.count(args['stdout_negative_match']) != 0:
            exitcode = -2
            print("STDOUT string '%s' was found in cmd output: %s" % (args['stdout_negative_match'], output))

        return exitcode

    def count_negative_matches(self, outline):
        """Return 1 if any negative pattern occurs in *outline*, else 0."""
        count = 0
        for line in self.negative_stonith_patterns:
            if outline.count(line):
                count = 1
                if self.verbose:
                    print("This pattern should not have matched = '%s" % (line))
        return count

    def match_stonith_patterns(self):
        """Check the captured stonithd output against the registered patterns.

        Every positive pattern must be matched by its own output line (one
        line can satisfy at most one pending pattern, so a pattern that is
        registered N times needs N matching lines).  Any line containing a
        negative pattern counts as a failure.  On failure result_txt and
        result_exitcode are updated.
        """
        # Only skip the scan when there is nothing at all to verify; tests
        # that register negative patterns only must still be checked.
        if not self.stonith_patterns and not self.negative_stonith_patterns:
            return

        # Work on a copy so the registered patterns are left intact.
        pending = list(self.stonith_patterns)
        total_patterns = len(pending)
        negative_matches = 0

        for line in self.stonith_output.split("\n"):
            negative_matches = negative_matches + self.count_negative_matches(line)
            for idx, pattern in enumerate(pending):
                if pattern in line:
                    del pending[idx]
                    break

        if len(pending) > 0 or negative_matches:
            if self.verbose:
                for p in pending:
                    print("Pattern Not Matched = '%s'" % p)

            self.result_txt = "FAILURE - '%s' failed. %d patterns out of %d not matched. %d negative matches." % (self.name, len(pending), total_patterns, negative_matches)
            self.result_exitcode = -1

    def run(self):
        """Run every step in order, then verify the daemon log patterns.

        Stops at the first step whose exit status differs from the expected
        one.  Returns the status of the last step that was executed.
        """
        res = 0
        i = 1
        self.start_environment()

        if self.verbose:
            print("\n--- START TEST - %s" % self.name)

        self.result_txt = "SUCCESS - '%s'" % (self.name)
        self.result_exitcode = 0
        try:
            for cmd in self.cmds:
                res = self.run_cmd(cmd)
                if res != cmd['expected_exitcode']:
                    print("Step %d FAILED - command returned %d, expected %d" % (i, res, cmd['expected_exitcode']))
                    # Report the command that actually ran.
                    self.result_txt = "FAILURE - '%s' failed at step %d. Command: %s %s" % (self.name, i, cmd['cmd'], cmd['args'])
                    self.result_exitcode = -1
                    break
                else:
                    if self.verbose:
                        print("Step %d SUCCESS" % (i))
                i = i + 1
        finally:
            # Always stop the daemon, even if launching a command raised.
            self.clean_environment()

        if self.result_exitcode == 0:
            self.match_stonith_patterns()

        print(self.result_txt)
        if self.verbose:
            print("--- END TEST - %s\n" % self.name)

        self.executed = 1
        return res
class Tests:
    """Registry of all regression tests plus the shared corosync environment handling."""

    def __init__(self, verbose = 0):
        """Create an empty registry.

        If /etc/corosync/corosync.conf does not exist, remember that a
        configuration has to be generated (and later removed) by this run.
        """
        self.tests = []
        self.verbose = verbose
        self.autogen_corosync_cfg = 0
        if not os.path.exists("/etc/corosync/corosync.conf"):
            self.autogen_corosync_cfg = 1

    def new_test(self, name, description, with_cpg = 0):
        """Create a Test, register it and return it so steps can be added."""
        test = Test(name, description, self.verbose, with_cpg)
        self.tests.append(test)
        return test

    def print_list(self):
        """Print the name and description of every registered test."""
        print("\n==== %d TESTS FOUND ====" % (len(self.tests)))
        print("%35s - %s" % ("TEST NAME", "TEST DESCRIPTION"))
        print("%35s - %s" % ("--------------------", "--------------------"))
        for test in self.tests:
            print("%35s - %s" % (test.name, test.description))
        print("==== END OF LIST ====\n")

    def start_corosync(self):
        """Launch corosync, wait for the launcher to return, then pause 10 seconds."""
        if self.verbose:
            print("Starting corosync")

        test = subprocess.Popen("corosync", stdout=subprocess.PIPE)
        test.wait()
        time.sleep(10)

    def stop_corosync(self):
        """Forcefully kill any running corosync (killall -9 -q)."""
        cmd = shlex.split("killall -9 -q corosync")
        test = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        test.wait()

    def run_single(self, name):
        """Run the first test whose name equals *name*, if there is one."""
        for test in self.tests:
            if test.name == name:
                test.run()
                break

    def run_tests_matching(self, pattern):
        """Run every test whose name contains *pattern*."""
        for test in self.tests:
            if test.name.count(pattern) != 0:
                test.run()

    def run_cpg_only(self):
        """Run only the tests flagged as needing corosync."""
        for test in self.tests:
            if test.enable_corosync:
                test.run()

    def run_no_cpg(self):
        """Run only the tests that are not flagged as needing corosync."""
        for test in self.tests:
            if not test.enable_corosync:
                test.run()

    def run_tests(self):
        """Run every registered test."""
        for test in self.tests:
            test.run()

    def exit(self):
        """Exit the process: status -1 if any executed test failed, else 0."""
        for test in self.tests:
            if test.executed == 0:
                continue

            if test.get_exitcode() != 0:
                sys.exit(-1)

        sys.exit(0)

    def print_results(self):
        """Print the failed tests followed by pass/fail totals for executed tests."""
        failures = 0
        success = 0

        print("\n\n======= FINAL RESULTS ==========")
        print("\n--- FAILURE RESULTS:")
        for test in self.tests:
            if test.executed == 0:
                continue

            if test.get_exitcode() != 0:
                failures = failures + 1
                test.print_result(" ")
            else:
                success = success + 1

        if failures == 0:
            print(" None")

        print("\n--- TOTALS\n Pass:%d\n Fail:%d\n" % (success, failures))

    def build_api_sanity_tests(self):
        """Register the stonith-test client API sanity checks (standalone and cpg)."""
        verbose_arg = ""
        if self.verbose:
            verbose_arg = "-V"

        test = self.new_test("standalone_low_level_api_test", "Sanity test client api in standalone mode.")
        test.add_cmd("@CRM_DAEMON_DIR@/stonith-test", "-t %s" % (verbose_arg))

        test = self.new_test("cpg_low_level_api_test", "Sanity test client api using mainloop and cpg.", 1)
        test.add_cmd("@CRM_DAEMON_DIR@/stonith-test", "-m %s" % (verbose_arg))

    def build_custom_timeout_tests(self):
        """Register tests checking the total timeout computed from per-device timeouts."""
        # custom timeout without topology
        test = self.new_test("cpg_custom_timeout_1",
                "Verify per device timeouts work as expected without using topology.", 1)
        test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")
        test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3\" -o \"pcmk_off_timeout=1\"")
        test.add_cmd("stonith_admin", "-R false2 -a fence_false -o \"pcmk_host_list=node3\" -o \"pcmk_off_timeout=4\"")
        test.add_cmd("stonith_admin", "-F node3 -t 2")
        # timeout is 2+1+4 = 7
        test.add_stonith_log_pattern("remote op timeout set to 7")

        # custom timeout _WITH_ topology
        test = self.new_test("cpg_custom_timeout_2",
                "Verify per device timeouts work as expected _WITH_ topology.", 1)
        test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")
        test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3\" -o \"pcmk_off_timeout=1\"")
        test.add_cmd("stonith_admin", "-R false2 -a fence_false -o \"pcmk_host_list=node3\" -o \"pcmk_off_timeout=4000\"")
        test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
        test.add_cmd("stonith_admin", "-r node3 -i 2 -v true1")
        test.add_cmd("stonith_admin", "-r node3 -i 3 -v false2")
        test.add_cmd("stonith_admin", "-F node3 -t 2")
        # timeout is 2+1+4000 = 4003
        test.add_stonith_log_pattern("remote op timeout set to 4003")

    def build_fence_merge_tests(self):
        """Register tests checking that overlapping identical fencing requests are merged."""
        ### Simple test that overlapping fencing operations get merged
        test = self.new_test("cpg_custom_merge_single",
                "Verify overlapping identical fencing operations are merged, no fencing levels used.", 1)
        test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node3\"")
        test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3\" ")
        test.add_cmd("stonith_admin", "-R false2 -a fence_false -o \"pcmk_host_list=node3\"")
        test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
        test.add_cmd("stonith_admin", "-F node3 -t 10")
        ### one merger will happen
        test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
        ### the pattern below signifies that both the original and duplicate operation completed
        test.add_stonith_log_pattern("Operation off of node3 by")
        test.add_stonith_log_pattern("Operation off of node3 by")

        ### Test that multiple mergers occur
        test = self.new_test("cpg_custom_merge_multiple",
                "Verify multiple overlapping identical fencing operations are merged", 1)
        test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node3\"")
        test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3\" ")
        test.add_cmd("stonith_admin", "-R false2 -a fence_false -o \"pcmk_host_list=node3\"")
        test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
        test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
        test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
        test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
        test.add_cmd("stonith_admin", "-F node3 -t 10")
        ### 4 mergers should occur
        test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
        test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
        test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
        test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
        ### the pattern below signifies that both the original and duplicate operation completed
        test.add_stonith_log_pattern("Operation off of node3 by")
        test.add_stonith_log_pattern("Operation off of node3 by")
        test.add_stonith_log_pattern("Operation off of node3 by")
        test.add_stonith_log_pattern("Operation off of node3 by")
        test.add_stonith_log_pattern("Operation off of node3 by")

        ### Test that multiple mergers occur with topologies used
        test = self.new_test("cpg_custom_merge_with_topology",
                "Verify multiple overlapping identical fencing operations are merged with fencing levels.", 1)
        test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node3\"")
        test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3\" ")
        test.add_cmd("stonith_admin", "-R false2 -a fence_false -o \"pcmk_host_list=node3\"")
        test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
        test.add_cmd("stonith_admin", "-r node3 -i 1 -v false2")
        test.add_cmd("stonith_admin", "-r node3 -i 2 -v true1")
        test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
        test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
        test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
        test.add_cmd_no_wait("stonith_admin", "-F node3 -t 10")
        test.add_cmd("stonith_admin", "-F node3 -t 10")
        ### 4 mergers should occur
        test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
        test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
        test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
        test.add_stonith_log_pattern("Merging stonith action off for node node3 originating from client")
        ### the pattern below signifies that both the original and duplicate operation completed
        test.add_stonith_log_pattern("Operation off of node3 by")
        test.add_stonith_log_pattern("Operation off of node3 by")
        test.add_stonith_log_pattern("Operation off of node3 by")
        test.add_stonith_log_pattern("Operation off of node3 by")
        test.add_stonith_log_pattern("Operation off of node3 by")

        test = self.new_test("cpg_custom_no_merge",
                "Verify differing fencing operations are not merged", 1)
        test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node3 node2\"")
        test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3 node2\" ")
        test.add_cmd("stonith_admin", "-R false2 -a fence_false -o \"pcmk_host_list=node3 node2\"")
        test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
        test.add_cmd("stonith_admin", "-r node3 -i 1 -v false2")
        test.add_cmd("stonith_admin", "-r node3 -i 2 -v true1")
        test.add_cmd_no_wait("stonith_admin", "-F node2 -t 10")
        test.add_cmd("stonith_admin", "-F node3 -t 10")
        test.add_stonith_negative_log_pattern("Merging stonith action off for node node3 originating from client")

    def build_standalone_tests(self):
        """Register the core device/topology tests.

        Most scenarios are registered twice, once with the "standalone"
        prefix and once with the "cpg" prefix; the topology and history
        scenarios are registered for the cpg variant only.
        """
        test_types = [
            {
                "prefix" : "standalone" ,
                "use_cpg" : 0,
            },
            {
                "prefix" : "cpg" ,
                "use_cpg" : 1,
            },
        ]

        # test what happens when all devices timeout
        for test_type in test_types:
            test = self.new_test("%s_fence_multi_device_failure" % test_type["prefix"],
                    "Verify that all devices timeout, a fencing failure is returned.", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R false2 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R false3 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_expected_fail_cmd("stonith_admin", "-F node3 -t 2", 194)

            if test_type["use_cpg"] == 1:
                test.add_stonith_log_pattern("remote op timeout set to 6")

            test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: -62")
            test.add_stonith_log_pattern("for host 'node3' with device 'false2' returned: -62")
            test.add_stonith_log_pattern("for host 'node3' with device 'false3' returned: -62")

        # test what happens when multiple devices can fence a node, but the first device fails.
        for test_type in test_types:
            test = self.new_test("%s_fence_device_failure_rollover" % test_type["prefix"],
                    "Verify that when one fence device fails for a node, the others are tried.", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R false2 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-F node3 -t 2")

            if test_type["use_cpg"] == 1:
                test.add_stonith_log_pattern("remote op timeout set to 6")

        # simple topology test for one device
        for test_type in test_types:
            if test_type["use_cpg"] == 0:
                continue

            test = self.new_test("%s_topology_simple" % test_type["prefix"],
                    "Verify all fencing devices at a level are used.", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R true -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")

            test.add_cmd("stonith_admin", "-r node3 -i 1 -v true")
            test.add_cmd("stonith_admin", "-F node3 -t 2")

            test.add_stonith_log_pattern("remote op timeout set to 2")
            test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0")

        # test what happens when the first fencing level has multiple devices.
        for test_type in test_types:
            if test_type["use_cpg"] == 0:
                continue

            test = self.new_test("%s_topology_device_fails" % test_type["prefix"],
                    "Verify if one device in a level fails, the other is tried.", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R false -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R true -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")

            test.add_cmd("stonith_admin", "-r node3 -i 1 -v false")
            test.add_cmd("stonith_admin", "-r node3 -i 2 -v true")
            test.add_cmd("stonith_admin", "-F node3 -t 20")

            test.add_stonith_log_pattern("remote op timeout set to 4")
            test.add_stonith_log_pattern("for host 'node3' with device 'false' returned: -62")
            test.add_stonith_log_pattern("for host 'node3' with device 'true' returned: 0")

        # test what happens when the first fencing level fails.
        for test_type in test_types:
            if test_type["use_cpg"] == 0:
                continue

            test = self.new_test("%s_topology_multi_level_fails" % test_type["prefix"],
                    "Verify if one level fails, the next leve is tried.", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R true2 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R true3 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R true4 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R false2 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")

            test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
            test.add_cmd("stonith_admin", "-r node3 -i 1 -v true1")
            test.add_cmd("stonith_admin", "-r node3 -i 2 -v true2")
            test.add_cmd("stonith_admin", "-r node3 -i 2 -v false2")
            test.add_cmd("stonith_admin", "-r node3 -i 3 -v true3")
            test.add_cmd("stonith_admin", "-r node3 -i 3 -v true4")

            test.add_cmd("stonith_admin", "-F node3 -t 2")

            test.add_stonith_log_pattern("remote op timeout set to 12")
            test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: -62")
            test.add_stonith_log_pattern("for host 'node3' with device 'false2' returned: -62")
            test.add_stonith_log_pattern("for host 'node3' with device 'true3' returned: 0")
            test.add_stonith_log_pattern("for host 'node3' with device 'true4' returned: 0")

        # Test what happens if multiple fencing levels are defined, and then the first one is removed.
        for test_type in test_types:
            if test_type["use_cpg"] == 0:
                continue

            test = self.new_test("%s_topology_level_removal" % test_type["prefix"],
                    "Verify level removal works.", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R true2 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R true3 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R true4 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R false2 -a fence_false -o \"pcmk_host_list=node1 node2 node3\"")

            test.add_cmd("stonith_admin", "-r node3 -i 1 -v false1")
            test.add_cmd("stonith_admin", "-r node3 -i 1 -v true1")

            test.add_cmd("stonith_admin", "-r node3 -i 2 -v true2")
            test.add_cmd("stonith_admin", "-r node3 -i 2 -v false2")

            test.add_cmd("stonith_admin", "-r node3 -i 3 -v true3")
            test.add_cmd("stonith_admin", "-r node3 -i 3 -v true4")

            # Now remove level 2, verify none of the devices in level two are hit.
            test.add_cmd("stonith_admin", "-d node3 -i 2")

            test.add_cmd("stonith_admin", "-F node3 -t 20")

            test.add_stonith_log_pattern("remote op timeout set to 8")
            test.add_stonith_log_pattern("for host 'node3' with device 'false1' returned: -62")
            test.add_stonith_negative_log_pattern("for host 'node3' with device 'false2' returned: -62")
            test.add_stonith_negative_log_pattern("for host 'node3' with device 'false2' returned: -1001")
            test.add_stonith_log_pattern("for host 'node3' with device 'true3' returned: 0")
            test.add_stonith_log_pattern("for host 'node3' with device 'true4' returned: 0")

        # test the stonith builds the correct list of devices that can fence a node.
        for test_type in test_types:
            test = self.new_test("%s_list_devices" % test_type["prefix"],
                    "Verify list of devices that can fence a node is correct", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3\"")
            test.add_cmd("stonith_admin", "-R true2 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")
            test.add_cmd("stonith_admin", "-R true3 -a fence_true -o \"pcmk_host_list=node1 node2 node3\"")

            test.add_cmd_check_stdout("stonith_admin", "-l node1 -V", "true2", "true1")
            test.add_cmd_check_stdout("stonith_admin", "-l node1 -V", "true3", "true1")

        # simple test of device monitor
        for test_type in test_types:
            test = self.new_test("%s_monitor" % test_type["prefix"],
                    "Verify device is reachable", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3\"")
            test.add_cmd("stonith_admin", "-R false1 -a fence_false -o \"pcmk_host_list=node3\"")

            test.add_cmd("stonith_admin", "-Q true1")
            test.add_cmd("stonith_admin", "-Q false1")
            test.add_expected_fail_cmd("stonith_admin", "-Q true2", 237)

        # simple register test
        for test_type in test_types:
            test = self.new_test("%s_register" % test_type["prefix"],
                    "Verify devices can be registered and un-registered", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3\"")

            test.add_cmd("stonith_admin", "-Q true1")

            test.add_cmd("stonith_admin", "-D true1")

            test.add_expected_fail_cmd("stonith_admin", "-Q true1", 237)

        # simple reboot test
        for test_type in test_types:
            test = self.new_test("%s_reboot" % test_type["prefix"],
                    "Verify devices can be rebooted", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3\"")

            test.add_cmd("stonith_admin", "-B node3 -t 2")

            test.add_cmd("stonith_admin", "-D true1")

            test.add_expected_fail_cmd("stonith_admin", "-Q true1", 237)

        # test fencing history.
        for test_type in test_types:
            if test_type["use_cpg"] == 0:
                continue
            test = self.new_test("%s_fence_history" % test_type["prefix"],
                    "Verify last fencing operation is returned.", test_type["use_cpg"])
            test.add_cmd("stonith_admin", "-R true1 -a fence_true -o \"pcmk_host_list=node3\"")

            test.add_cmd("stonith_admin", "-F node3 -t 2 -V")

            test.add_cmd_check_stdout("stonith_admin", "-H node3", "was able to turn off node node3", "")

    def setup_environment(self, use_corosync):
        """Prepare the host for a run.

        Writes a minimal corosync.conf when none existed at construction
        time and corosync is wanted, restarts corosync when requested, and
        copies the fence_false / fence_true agents into /usr/sbin.
        """
        if self.autogen_corosync_cfg and use_corosync:
            # The configuration text is kept flush-left because it is
            # passed verbatim to the shell here-document below.
            corosync_conf = ("""
totem {
version: 2
crypto_cipher: none
crypto_hash: none
nodeid: 101
secauth: off
interface {
ttl: 1
ringnumber: 0
mcastport: 6666
mcastaddr: 226.94.1.1
bindnetaddr: 127.0.0.1
}
}
logging {
debug: off
fileline: off
to_syslog: no
to_stderr: no
syslog_facility: daemon
timestamp: on
to_logfile: yes
logfile: /var/log/corosync.log
logfile_priority: info
}
""")
            os.system("cat <<-END >>/etc/corosync/corosync.conf\n%s\nEND" % (corosync_conf))

        if use_corosync:
            ### make sure we are in control ###
            self.stop_corosync()
            self.start_corosync()

        os.system("cp /usr/share/pacemaker/tests/cts/fence_false /usr/sbin/fence_false")
        os.system("cp /usr/share/pacemaker/tests/cts/fence_true /usr/sbin/fence_true")

    def cleanup_environment(self, use_corosync):
        """Undo setup_environment().

        Stops corosync when it was in use, dumps and deletes
        /var/log/corosync.log in verbose mode, and removes the
        corosync.conf if this run generated it.
        """
        if use_corosync:
            self.stop_corosync()

            if self.verbose and os.path.exists('/var/log/corosync.log'):
                print("Daemon output")
                # The with-block closes the log before it is removed; the
                # handle used to be left open.
                with open('/var/log/corosync.log', 'r') as f:
                    for line in f:
                        print(line.strip())
                os.remove('/var/log/corosync.log')

        if self.autogen_corosync_cfg:
            os.system("rm -f /etc/corosync/corosync.conf")
class TestOptions:
    """Command line options for the regression runner, stored in self.options."""

    def __init__(self):
        """Initialise every option to its default value."""
        self.options = {}
        self.options['list-tests'] = 0
        self.options['run-all'] = 1
        self.options['run-only'] = ""
        self.options['run-only-pattern'] = ""
        self.options['verbose'] = 0
        self.options['invalid-arg'] = ""
        self.options['cpg-only'] = 0
        self.options['no-cpg'] = 0
        self.options['show-usage'] = 0

    def build_options(self, argv):
        """Parse *argv* (program name first) into self.options.

        Unrecognised arguments are ignored.  If -r/--run-only or
        -p/--run-only-pattern is given without a value, the flag is stored
        in options['invalid-arg'] and options['show-usage'] is set instead
        of raising IndexError.
        """
        # Flags that simply switch an option on.
        switches = {
            "-h": 'show-usage', "--help": 'show-usage',
            "-l": 'list-tests', "--list-tests": 'list-tests',
            "-V": 'verbose', "--verbose": 'verbose',
            "-n": 'no-cpg', "--no-cpg": 'no-cpg',
            "-c": 'cpg-only', "--cpg-only": 'cpg-only',
        }
        # Flags that consume the following argument as their value.
        valued = {
            "-r": 'run-only', "--run-only": 'run-only',
            "-p": 'run-only-pattern', "--run-only-pattern": 'run-only-pattern',
        }

        args = argv[1:]
        i = 0
        while i < len(args):
            arg = args[i]
            if arg in switches:
                self.options[switches[arg]] = 1
            elif arg in valued:
                if i + 1 < len(args):
                    self.options[valued[arg]] = args[i + 1]
                    i = i + 1  # skip the value that was just consumed
                else:
                    # Value is missing: remember the offending flag and ask
                    # for the usage text rather than crashing.
                    self.options['invalid-arg'] = arg
                    self.options['show-usage'] = 1
            i = i + 1

    def show_usage(self):
        """Print the command line help text."""
        print("usage: " + sys.argv[0] + " [options]")
        print("If no options are provided, all tests will run")
        print("Options:")
        print("\t [--help | -h] Show usage")
        print("\t [--list-tests | -l] Print out all registered tests.")
        print("\t [--cpg-only | -c] Only run tests that require corosync.")
        print("\t [--no-cpg | -n] Only run tests that do not require corosync")
        print("\t [--run-only | -r 'testname'] Run a specific test")
        print("\t [--verbose | -V] Verbose output")
        print("\t [--run-only-pattern | -p 'string'] Run only tests containing the string value")
        print("\n\tExample: Run only the test 'start_stop'")
        print("\t\t python ./regression.py --run-only start_stop")
        print("\n\tExample: Run only the tests with the string 'systemd' present in them")
        print("\t\t python ./regression.py --run-only-pattern systemd")
def main(argv):
    """Parse options, register all tests, run the selected ones and exit.

    --list-tests and --help are handled before any environment setup and
    exit with status 0.  Otherwise the process exits through Tests.exit()
    once the selected tests have run.
    """
    o = TestOptions()
    o.build_options(argv)

    tests = Tests(o.options['verbose'])
    tests.build_standalone_tests()
    tests.build_custom_timeout_tests()
    tests.build_api_sanity_tests()
    tests.build_fence_merge_tests()

    if o.options['list-tests']:
        tests.print_list()
        sys.exit(0)
    elif o.options['show-usage']:
        o.show_usage()
        sys.exit(0)

    print("Starting ...")

    # corosync is used unless the caller asked for the non-cpg tests only.
    use_corosync = 1
    if o.options['no-cpg']:
        use_corosync = 0

    try:
        tests.setup_environment(use_corosync)

        # Selection precedence: pattern, single name, no-cpg, cpg-only, all.
        if o.options['run-only-pattern'] != "":
            tests.run_tests_matching(o.options['run-only-pattern'])
        elif o.options['run-only'] != "":
            tests.run_single(o.options['run-only'])
        elif o.options['no-cpg']:
            tests.run_no_cpg()
        elif o.options['cpg-only']:
            tests.run_cpg_only()
        else:
            tests.run_tests()

        tests.print_results()
    finally:
        # Always undo the environment changes (corosync, generated config),
        # even when a test run aborts with an exception.
        tests.cleanup_environment(use_corosync)

    tests.exit()

if __name__=="__main__":
    main(sys.argv)
Jump to Line
Something went wrong with that request. Please try again.