Navigation Menu

Skip to content

Commit

Permalink
Merged PYTHON-546
Browse files Browse the repository at this point in the history
  • Loading branch information
beltran committed Nov 17, 2017
2 parents 0cee88e + 4ceea3e commit 8d13b13
Show file tree
Hide file tree
Showing 5 changed files with 518 additions and 16 deletions.
23 changes: 22 additions & 1 deletion build.yaml
Expand Up @@ -58,6 +58,18 @@ schedules:
env_vars: |
EVENT_LOOP_MANAGER='twisted'
upgrade_tests:
schedule: adhoc
branches:
include: [master, python-546]
env_vars: |
EVENT_LOOP_MANAGER='libev'
JUST_UPGRADE=True
matrix:
exclude:
- python: [3.4, 3.6]
- cassandra: ['2.0', '2.1', '2.2', '3.0']

python:
- 2.7
- 3.4
Expand Down Expand Up @@ -90,6 +102,7 @@ build:
pip install -r test-requirements.txt
pip install nose-ignore-docstring
pip install nose-exclude
FORCE_CYTHON=False
if [[ $CYTHON == 'CYTHON' ]]; then
FORCE_CYTHON=True
Expand Down Expand Up @@ -117,6 +130,14 @@ build:
popd
echo "JUST_UPGRADE: $JUST_UPGRADE"
if [[ $JUST_UPGRADE == 'True' ]]; then
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=upgrade_results.xml tests/integration/upgrade || true
exit 0
fi
# Run the unit tests, this is not done in travis because
# it takes too much time for the whole matrix to build with cython
if [[ $CYTHON == 'CYTHON' ]]; then
Expand All @@ -138,6 +159,6 @@ build:
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CASSANDRA_VERSION=$CCM_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=standard_results.xml tests/integration/standard/ || true
echo "==========RUNNING LONG INTEGRATION TESTS=========="
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CASSANDRA_VERSION=$CCM_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --with-ignore-docstrings --with-xunit --xunit-file=long_results.xml tests/integration/long/ || true
EVENT_LOOP_MANAGER=$EVENT_LOOP_MANAGER CASSANDRA_VERSION=$CCM_CASSANDRA_VERSION VERIFY_CYTHON=$FORCE_CYTHON nosetests -s -v --logging-format="[%(levelname)s] %(asctime)s %(thread)d: %(message)s" --exclude-dir=tests/integration/long/upgrade --with-ignore-docstrings --with-xunit --xunit-file=long_results.xml tests/integration/long/ || true
- xunit:
- "*_results.xml"
32 changes: 22 additions & 10 deletions tests/integration/__init__.py
Expand Up @@ -31,9 +31,11 @@
from threading import Event
from subprocess import call
from itertools import groupby
import six

from cassandra import OperationTimedOut, ReadTimeout, ReadFailure, WriteTimeout, WriteFailure, AlreadyExists, \
InvalidRequest
from cassandra.cluster import NoHostAvailable

from cassandra.protocol import ConfigurationException

Expand Down Expand Up @@ -230,7 +232,7 @@ def get_unsupported_upper_protocol():
notpy3 = unittest.skipIf(sys.version_info >= (3, 0), "Test not applicable for Python 3.x runtime")
requiresmallclockgranularity = unittest.skipIf("Windows" in platform.system() or "async" in EVENT_LOOP_MANAGER,
"This test is not suitible for environments with large clock granularity")
requiressimulacron = unittest.skipIf(SIMULACRON_JAR is None, "Simulacron jar hasn't been specified")
requiressimulacron = unittest.skipIf(SIMULACRON_JAR is None or CASSANDRA_VERSION < "2.1", "Simulacron jar hasn't been specified or C* version is 2.0")


def wait_for_node_socket(node, timeout):
Expand Down Expand Up @@ -308,16 +310,22 @@ def is_current_cluster(cluster_name, node_counts):
return False


def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]):
def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[], set_keyspace=True, ccm_options=None,
configuration_options={}):
set_default_cass_ip()

if ccm_options is None:
ccm_options = CCM_KWARGS
cassandra_version = ccm_options.get('version', CASSANDRA_VERSION)

global CCM_CLUSTER
if USE_CASS_EXTERNAL:
if CCM_CLUSTER:
log.debug("Using external CCM cluster {0}".format(CCM_CLUSTER.name))
else:
log.debug("Using unnamed external cluster")
setup_keyspace(ipformat=ipformat, wait=False)
if set_keyspace and start:
setup_keyspace(ipformat=ipformat, wait=False)
return

if is_current_cluster(cluster_name, nodes):
Expand All @@ -331,20 +339,22 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]):
CCM_CLUSTER = CCMClusterFactory.load(path, cluster_name)
log.debug("Found existing CCM cluster, {0}; clearing.".format(cluster_name))
CCM_CLUSTER.clear()
CCM_CLUSTER.set_install_dir(**CCM_KWARGS)
CCM_CLUSTER.set_install_dir(**ccm_options)
CCM_CLUSTER.set_configuration_options(configuration_options)
except Exception:
ex_type, ex, tb = sys.exc_info()
log.warn("{0}: {1} Backtrace: {2}".format(ex_type.__name__, ex, traceback.extract_tb(tb)))
del tb

log.debug("Creating new CCM cluster, {0}, with args {1}".format(cluster_name, CCM_KWARGS))
CCM_CLUSTER = CCMCluster(path, cluster_name, **CCM_KWARGS)
log.debug("Creating new CCM cluster, {0}, with args {1}".format(cluster_name, ccm_options))
CCM_CLUSTER = CCMCluster(path, cluster_name, **ccm_options)
CCM_CLUSTER.set_configuration_options({'start_native_transport': True})
if CASSANDRA_VERSION >= '2.2':
if cassandra_version >= '2.2':
CCM_CLUSTER.set_configuration_options({'enable_user_defined_functions': True})
if CASSANDRA_VERSION >= '3.0':
if cassandra_version >= '3.0':
CCM_CLUSTER.set_configuration_options({'enable_scripted_user_defined_functions': True})
common.switch_cluster(path, cluster_name)
CCM_CLUSTER.set_configuration_options(configuration_options)
CCM_CLUSTER.populate(nodes, ipformat=ipformat)
try:
jvm_args = []
Expand All @@ -362,18 +372,20 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=[]):
# Added to wait for slow nodes to start up
for node in CCM_CLUSTER.nodes.values():
wait_for_node_socket(node, 120)
setup_keyspace(ipformat=ipformat)
if set_keyspace:
setup_keyspace(ipformat=ipformat)
except Exception:
log.exception("Failed to start CCM cluster; removing cluster.")

if os.name == "nt":
if CCM_CLUSTER:
for node in CCM_CLUSTER.nodes.itervalues():
for node in six.itervalues(CCM_CLUSTER.nodes):
os.system("taskkill /F /PID " + str(node.pid))
else:
call(["pkill", "-9", "-f", ".ccm"])
remove_cluster()
raise
return CCM_CLUSTER


def teardown_package():
Expand Down
11 changes: 6 additions & 5 deletions tests/integration/simulacron/test_policies.py
Expand Up @@ -21,7 +21,8 @@
from cassandra.query import SimpleStatement
from cassandra.policies import ConstantSpeculativeExecutionPolicy, RoundRobinPolicy, RetryPolicy, WriteType

from tests.integration import PROTOCOL_VERSION, greaterthancass21, requiressimulacron, SIMULACRON_JAR
from tests.integration import PROTOCOL_VERSION, greaterthancass21, requiressimulacron, SIMULACRON_JAR, \
CASSANDRA_VERSION
from tests.integration.simulacron.utils import start_and_prime_singledc, prime_query, \
stop_simulacron, NO_THEN, clear_queries

Expand All @@ -45,7 +46,7 @@ class SpecExecTest(unittest.TestCase):

@classmethod
def setUpClass(cls):
if SIMULACRON_JAR is None:
if SIMULACRON_JAR is None or CASSANDRA_VERSION < "2.1":
return

start_and_prime_singledc()
Expand All @@ -70,7 +71,7 @@ def setUpClass(cls):

@classmethod
def tearDownClass(cls):
if SIMULACRON_JAR is None:
if SIMULACRON_JAR is None or CASSANDRA_VERSION < "2.1":
return

cls.cluster.shutdown()
Expand Down Expand Up @@ -181,7 +182,7 @@ def on_write_timeout(self, query, consistency, write_type,
class RetryPolicyTets(unittest.TestCase):
@classmethod
def setUpClass(cls):
if SIMULACRON_JAR is None:
if SIMULACRON_JAR is None or CASSANDRA_VERSION < "2.1":
return
start_and_prime_singledc()

Expand All @@ -191,7 +192,7 @@ def setUpClass(cls):

@classmethod
def tearDownClass(cls):
if SIMULACRON_JAR is None:
if SIMULACRON_JAR is None or CASSANDRA_VERSION < "2.1":
return
cls.cluster.shutdown()
stop_simulacron()
Expand Down
189 changes: 189 additions & 0 deletions tests/integration/upgrade/__init__.py
@@ -0,0 +1,189 @@
# Copyright 2013-2017 DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from tests.integration import CCM_KWARGS, use_cluster, remove_cluster, MockLoggingHandler
from tests.integration import setup_keyspace

from cassandra.cluster import Cluster
from cassandra import cluster

from collections import namedtuple
from functools import wraps
import logging
from threading import Thread, Event
from ccmlib.node import TimeoutError
import time
import logging

try:
import unittest2 as unittest
except ImportError:
import unittest # noqa


def setup_module():
remove_cluster()


UPGRADE_CLUSTER_NAME = "upgrade_cluster"
UpgradePath = namedtuple('UpgradePath', ('name', 'starting_version', 'upgrade_version', 'configuration_options'))

log = logging.getLogger(__name__)


class upgrade_paths(object):
"""
Decorator used to specify the upgrade paths for a particular method
"""
def __init__(self, paths):
self.paths = paths

def __call__(self, method):
@wraps(method)
def wrapper(*args, **kwargs):
for path in self.paths:
self_from_decorated = args[0]
log.debug('setting up {path}'.format(path=path))
self_from_decorated.UPGRADE_PATH = path
self_from_decorated._upgrade_step_setup()
method(*args, **kwargs)
log.debug('tearing down {path}'.format(path=path))
self_from_decorated._upgrade_step_teardown()
return wrapper


class UpgradeBase(unittest.TestCase):
"""
Base class for the upgrade tests. The _setup method
will clean the environment and start the appropriate C* version according
to the upgrade path. The upgrade can be done in a different thread using the
start_upgrade upgrade_method (this would be the most realistic scenario)
or node by node, waiting for the upgrade to happen, using _upgrade_one_node method
"""
UPGRADE_PATH = None
start_cluster = True
set_keyspace = True

@classmethod
def setUpClass(cls):
cls.logger_handler = MockLoggingHandler()
logger = logging.getLogger(cluster.__name__)
logger.addHandler(cls.logger_handler)

def _upgrade_step_setup(self):
"""
This is not the regular _setUp method because it will be called from
the decorator instead of letting nose handle it.
This setup method will start a cluster with the right version according
to the variable UPGRADE_PATH.
"""
remove_cluster()
self.cluster = use_cluster(UPGRADE_CLUSTER_NAME + self.UPGRADE_PATH.name, [3],
ccm_options=self.UPGRADE_PATH.starting_version, set_keyspace=self.set_keyspace,
configuration_options=self.UPGRADE_PATH.configuration_options)
self.nodes = self.cluster.nodelist()
self.last_node_upgraded = None
self.upgrade_done = Event()
self.upgrade_thread = None

if self.start_cluster:
setup_keyspace()

self.cluster_driver = Cluster()
self.session = self.cluster_driver.connect()
self.logger_handler.reset()

def _upgrade_step_teardown(self):
"""
special tearDown method called by the decorator after the method has ended
"""
if self.upgrade_thread:
self.upgrade_thread.join(timeout=5)
self.upgrade_thread = None

if self.start_cluster:
self.cluster_driver.shutdown()

def start_upgrade(self, time_node_upgrade):
"""
Starts the upgrade in a different thread
"""
log.debug('Starting upgrade in new thread')
self.upgrade_thread = Thread(target=self._upgrade, args=(time_node_upgrade,))
self.upgrade_thread.start()

def _upgrade(self, time_node_upgrade):
"""
Starts the upgrade in the same thread
"""
start_time = time.time()
for node in self.nodes:
self.upgrade_node(node)
end_time = time.time()
time_to_upgrade = end_time - start_time
if time_node_upgrade > time_to_upgrade:
time.sleep(time_node_upgrade - time_to_upgrade)
self.upgrade_done.set()

def is_upgraded(self):
"""
Returns True if the upgrade has finished and False otherwise
"""
return self.upgrade_done.is_set()

def wait_for_upgrade(self, timeout=None):
"""
Waits until the upgrade has completed
"""
self.upgrade_done.wait(timeout=timeout)

def upgrade_node(self, node):
"""
Upgrades only one node. Return True if the upgrade
has finished and False otherwise
"""
node.drain()
node.stop(gently=True)

node.set_install_dir(**self.UPGRADE_PATH.upgrade_version)

# There must be a cleaner way of doing this, but it's necessary here
# to call the private method from cluster __update_topology_files
self.cluster._Cluster__update_topology_files()
try:
node.start(wait_for_binary_proto=True, wait_other_notice=True)
except TimeoutError:
self.fail("Error starting C* node while upgrading")

return True


class UpgradeBaseAuth(UpgradeBase):
"""
Base class of authentication test, the authentication parameters for
C* still have to be specified within the upgrade path variable
"""
start_cluster = False
set_keyspace = False


def _upgrade_step_setup(self):
"""
We sleep here for the same reason as we do in test_authentication.py:
there seems to be some race, with some versions of C* taking longer to
get the auth (and default user) setup. Sleep here to give it a chance
"""
super(UpgradeBaseAuth, self)._upgrade_step_setup()
time.sleep(10)

0 comments on commit 8d13b13

Please sign in to comment.