Skip to content

Commit

Permalink
Support cluster backends other then ccm when running dtests
Browse files Browse the repository at this point in the history
Patch by Jordan West; Reviewed by Ariel Weisberg for CASSANDRA-14449
  • Loading branch information
jrwest authored and aweisberg committed May 29, 2018
1 parent 0fd9a55 commit cf1e5a6
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 95 deletions.
106 changes: 26 additions & 80 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import platform
import copy
import inspect
import subprocess

from itertools import zip_longest

from dtest import running_in_docker, cleanup_docker_environment_before_test_execution
Expand All @@ -18,62 +18,14 @@
from psutil import virtual_memory

import netifaces as ni
import ccmlib.repository
from ccmlib.common import validate_install_dir, get_version_from_build, is_win
from ccmlib.common import validate_install_dir, is_win

from dtest_config import DTestConfig
from dtest_setup import DTestSetup
from dtest_setup_overrides import DTestSetupOverrides

logger = logging.getLogger(__name__)


class DTestConfig:
def __init__(self):
self.use_vnodes = True
self.use_off_heap_memtables = False
self.num_tokens = -1
self.data_dir_count = -1
self.force_execution_of_resource_intensive_tests = False
self.skip_resource_intensive_tests = False
self.cassandra_dir = None
self.cassandra_version = None
self.cassandra_version_from_build = None
self.delete_logs = False
self.execute_upgrade_tests = False
self.disable_active_log_watching = False
self.keep_test_dir = False
self.enable_jacoco_code_coverage = False
self.jemalloc_path = find_libjemalloc()

def setup(self, request):
self.use_vnodes = request.config.getoption("--use-vnodes")
self.use_off_heap_memtables = request.config.getoption("--use-off-heap-memtables")
self.num_tokens = request.config.getoption("--num-tokens")
self.data_dir_count = request.config.getoption("--data-dir-count-per-instance")
self.force_execution_of_resource_intensive_tests = request.config.getoption("--force-resource-intensive-tests")
self.skip_resource_intensive_tests = request.config.getoption("--skip-resource-intensive-tests")
if request.config.getoption("--cassandra-dir") is not None:
self.cassandra_dir = os.path.expanduser(request.config.getoption("--cassandra-dir"))
self.cassandra_version = request.config.getoption("--cassandra-version")

# There are times when we want to know the C* version we're testing against
# before we do any cluster. In the general case, we can't know that -- the
# test method could use any version it wants for self.cluster. However, we can
# get the version from build.xml in the C* repository specified by
# CASSANDRA_VERSION or CASSANDRA_DIR.
if self.cassandra_version is not None:
ccm_repo_cache_dir, _ = ccmlib.repository.setup(self.cassandra_version)
self.cassandra_version_from_build = get_version_from_build(ccm_repo_cache_dir)
elif self.cassandra_dir is not None:
self.cassandra_version_from_build = get_version_from_build(self.cassandra_dir)

self.delete_logs = request.config.getoption("--delete-logs")
self.execute_upgrade_tests = request.config.getoption("--execute-upgrade-tests")
self.disable_active_log_watching = request.config.getoption("--disable-active-log-watching")
self.keep_test_dir = request.config.getoption("--keep-test-dir")
self.enable_jacoco_code_coverage = request.config.getoption("--enable-jacoco-code-coverage")


def check_required_loopback_interfaces_available():
"""
We need at least 3 loopback interfaces configured to run almost all dtests. On Linux, loopback
Expand All @@ -88,7 +40,6 @@ def check_required_loopback_interfaces_available():
"On Mac you can create the required loopback interfaces by running "
"'for i in {1..9}; do sudo ifconfig lo0 alias 127.0.0.$i up; done;'")


def pytest_addoption(parser):
parser.addoption("--use-vnodes", action="store_true", default=False,
help="Determines wither or not to setup clusters using vnodes for tests")
Expand Down Expand Up @@ -143,6 +94,12 @@ def fixture_dtest_setup_overrides(dtest_config):
"""
return DTestSetupOverrides()

@pytest.fixture(scope='function')
def fixture_dtest_cluster_name():
"""
:return: The name to use for the running test's cluster
"""
return "test"

"""
Not exactly sure why :\ but, this fixture needs to be scoped to function level and not
Expand Down Expand Up @@ -304,17 +261,31 @@ def reset_environment_vars(initial_environment):
os.environ.update(initial_environment)
os.environ['PYTEST_CURRENT_TEST'] = pytest_current_test

@pytest.fixture(scope='function')
def fixture_dtest_create_cluster_func():
"""
:return: A function whose sole argument is a DTestSetup instance and returns an
object that operates with the same interface as ccmlib.Cluster.
"""
return DTestSetup.create_ccm_cluster

@pytest.fixture(scope='function', autouse=False)
def fixture_dtest_setup(request, dtest_config, fixture_dtest_setup_overrides, fixture_logging_setup):
def fixture_dtest_setup(request,
dtest_config,
fixture_dtest_setup_overrides,
fixture_logging_setup,
fixture_dtest_cluster_name,
fixture_dtest_create_cluster_func):
if running_in_docker():
cleanup_docker_environment_before_test_execution()

# do all of our setup operations to get the enviornment ready for the actual test
# to run (e.g. bring up a cluster with the necessary config, populate variables, etc)
initial_environment = copy.deepcopy(os.environ)
dtest_setup = DTestSetup(dtest_config=dtest_config, setup_overrides=fixture_dtest_setup_overrides)
dtest_setup.initialize_cluster()
dtest_setup = DTestSetup(dtest_config=dtest_config,
setup_overrides=fixture_dtest_setup_overrides,
cluster_name=fixture_dtest_cluster_name)
dtest_setup.initialize_cluster(fixture_dtest_create_cluster_func)

if not dtest_config.disable_active_log_watching:
dtest_setup.log_watch_thread = dtest_setup.begin_active_log_watch()
Expand Down Expand Up @@ -451,10 +422,6 @@ def pytest_collection_modifyitems(items, config):
logger.debug("has sufficient resources? %s" % sufficient_system_resources_resource_intensive)

for item in items:
# set a timeout for all tests, it may be overwritten at the test level with an additional marker
if not item.get_marker("timeout"):
item.add_marker(pytest.mark.timeout(60*15))

deselect_test = False

if item.get_marker("resource_intensive"):
Expand Down Expand Up @@ -503,24 +470,3 @@ def pytest_collection_modifyitems(items, config):
config.hook.pytest_deselected(items=deselected_items)
items[:] = selected_items


# Determine the location of the libjemalloc jar so that we can specify it
# through environment variables when start Cassandra. This reduces startup
# time, making the dtests run faster.
def find_libjemalloc():
if is_win():
# let the normal bat script handle finding libjemalloc
return ""

this_dir = os.path.dirname(os.path.realpath(__file__))
script = os.path.join(this_dir, "findlibjemalloc.sh")
try:
p = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if stderr or not stdout:
return "-" # tells C* not to look for libjemalloc
else:
return stdout
except Exception as exc:
print("Failed to run script to prelocate libjemalloc ({}): {}".format(script, exc))
return ""
77 changes: 77 additions & 0 deletions dtest_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import subprocess
import os
import ccmlib.repository

from ccmlib.common import is_win, get_version_from_build

class DTestConfig:
def __init__(self):
self.use_vnodes = True
self.use_off_heap_memtables = False
self.num_tokens = -1
self.data_dir_count = -1
self.force_execution_of_resource_intensive_tests = False
self.skip_resource_intensive_tests = False
self.cassandra_dir = None
self.cassandra_version = None
self.cassandra_version_from_build = None
self.delete_logs = False
self.execute_upgrade_tests = False
self.disable_active_log_watching = False
self.keep_test_dir = False
self.enable_jacoco_code_coverage = False
self.jemalloc_path = find_libjemalloc()

def setup(self, request):
self.use_vnodes = request.config.getoption("--use-vnodes")
self.use_off_heap_memtables = request.config.getoption("--use-off-heap-memtables")
self.num_tokens = request.config.getoption("--num-tokens")
self.data_dir_count = request.config.getoption("--data-dir-count-per-instance")
self.force_execution_of_resource_intensive_tests = request.config.getoption("--force-resource-intensive-tests")
self.skip_resource_intensive_tests = request.config.getoption("--skip-resource-intensive-tests")
if request.config.getoption("--cassandra-dir") is not None:
self.cassandra_dir = os.path.expanduser(request.config.getoption("--cassandra-dir"))
self.cassandra_version = request.config.getoption("--cassandra-version")

self.cassandra_version_from_build = self.get_version_from_build()

self.delete_logs = request.config.getoption("--delete-logs")
self.execute_upgrade_tests = request.config.getoption("--execute-upgrade-tests")
self.disable_active_log_watching = request.config.getoption("--disable-active-log-watching")
self.keep_test_dir = request.config.getoption("--keep-test-dir")
self.enable_jacoco_code_coverage = request.config.getoption("--enable-jacoco-code-coverage")

def get_version_from_build(self):
# There are times when we want to know the C* version we're testing against
# before we do any cluster. In the general case, we can't know that -- the
# test method could use any version it wants for self.cluster. However, we can
# get the version from build.xml in the C* repository specified by
# CASSANDRA_VERSION or CASSANDRA_DIR.
if self.cassandra_version is not None:
ccm_repo_cache_dir, _ = ccmlib.repository.setup(self.cassandra_version)
return get_version_from_build(ccm_repo_cache_dir)
elif self.cassandra_dir is not None:
return get_version_from_build(self.cassandra_dir)



# Determine the location of the libjemalloc jar so that we can specify it
# through environment variables when start Cassandra. This reduces startup
# time, making the dtests run faster.
def find_libjemalloc():
if is_win():
# let the normal bat script handle finding libjemalloc
return ""

this_dir = os.path.dirname(os.path.realpath(__file__))
script = os.path.join(this_dir, "findlibjemalloc.sh")
try:
p = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if stderr or not stdout:
return "-" # tells C* not to look for libjemalloc
else:
return stdout
except Exception as exc:
print("Failed to run script to prelocate libjemalloc ({}): {}".format(script, exc))
return ""
37 changes: 22 additions & 15 deletions dtest_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ def retry_till_success(fun, *args, **kwargs):


class DTestSetup:
def __init__(self, dtest_config=None, setup_overrides=None):
def __init__(self, dtest_config=None, setup_overrides=None, cluster_name="test"):
self.dtest_config = dtest_config
self.setup_overrides = setup_overrides
self.cluster_name = cluster_name
self.ignore_log_patterns = []
self.cluster = None
self.cluster_options = []
Expand All @@ -69,6 +70,8 @@ def __init__(self, dtest_config=None, setup_overrides=None):
self.log_watch_thread = None
self.last_test_dir = "last_test_dir"
self.jvm_args = []
self.create_cluster_func = None
self.iterations = 0

def get_test_path(self):
test_path = tempfile.mkdtemp(prefix='dtest-')
Expand Down Expand Up @@ -145,7 +148,7 @@ def copy_logs(self, directory=None, name=None):
os.mkdir(directory)
logs = [(node.name, node.logfilename(), node.debuglogfilename(), node.gclogfilename(),
node.compactionlogfilename())
for node in list(self.cluster.nodes.values())]
for node in self.cluster.nodelist()]
if len(logs) is not 0:
basedir = str(int(time.time() * 1000)) + '_' + str(id(self))
logdir = os.path.join(directory, basedir)
Expand Down Expand Up @@ -382,7 +385,7 @@ def cleanup_and_replace_cluster(self):

self.cleanup_cluster()
self.test_path = self.get_test_path()
self.initialize_cluster()
self.initialize_cluster(self.create_cluster_func)

def init_default_config(self):
# the failure detector can be quite slow in such tests with quick start/stop
Expand Down Expand Up @@ -438,25 +441,27 @@ def maybe_setup_jacoco(self, cluster_name='test'):
else:
logger.debug("Jacoco agent not found or is not file. Execution will not be recorded.")

def create_ccm_cluster(self, name):
logger.debug("cluster ccm directory: " + self.test_path)
version = self.dtest_config.cassandra_version

@staticmethod
def create_ccm_cluster(dtest_setup):
logger.debug("cluster ccm directory: " + dtest_setup.test_path)
version = dtest_setup.dtest_config.cassandra_version

if version:
cluster = Cluster(self.test_path, name, cassandra_version=version)
cluster = Cluster(dtest_setup.test_path, dtest_setup.cluster_name, cassandra_version=version)
else:
cluster = Cluster(self.test_path, name, cassandra_dir=self.dtest_config.cassandra_dir)
cluster = Cluster(dtest_setup.test_path, dtest_setup.cluster_name, cassandra_dir=dtest_setup.dtest_config.cassandra_dir)

if self.dtest_config.use_vnodes:
cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': self.dtest_config.num_tokens})
if dtest_setup.dtest_config.use_vnodes:
cluster.set_configuration_options(values={'initial_token': None, 'num_tokens': dtest_setup.dtest_config.num_tokens})
else:
cluster.set_configuration_options(values={'num_tokens': None})

if self.dtest_config.use_off_heap_memtables:
if dtest_setup.dtest_config.use_off_heap_memtables:
cluster.set_configuration_options(values={'memtable_allocation_type': 'offheap_objects'})

cluster.set_datadir_count(self.dtest_config.data_dir_count)
cluster.set_environment_variable('CASSANDRA_LIBJEMALLOC', self.dtest_config.jemalloc_path)
cluster.set_datadir_count(dtest_setup.dtest_config.data_dir_count)
cluster.set_environment_variable('CASSANDRA_LIBJEMALLOC', dtest_setup.dtest_config.jemalloc_path)

return cluster

Expand All @@ -474,7 +479,7 @@ def set_cluster_log_levels(self):
log_level = logging.root.level
self.cluster.set_log_level(log_level)

def initialize_cluster(self):
def initialize_cluster(self, create_cluster_func):
"""
This method is responsible for initializing and configuring a ccm
cluster for the next set of tests. This can be called for two
Expand All @@ -487,7 +492,9 @@ def initialize_cluster(self):
"""
# connections = []
# cluster_options = []
self.cluster = self.create_ccm_cluster(name='test')
self.iterations += 1
self.create_cluster_func = create_cluster_func
self.cluster = self.create_cluster_func(self)
self.init_default_config()
self.maybe_setup_jacoco()
self.set_cluster_log_levels()
Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ junit_suite_name = Cassandra dtests
log_print = True
log_level = INFO
log_format = %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s
timeout = 900

0 comments on commit cf1e5a6

Please sign in to comment.