From 9759416560abef7472787583083d880c99feb124 Mon Sep 17 00:00:00 2001
From: Alex Petrov
Date: Fri, 14 Sep 2018 14:32:31 +0200
Subject: [PATCH] Transient Replication and Cheap Quorums tests

Patch by Blake Eggleston, Alex Petrov, Ariel Weisberg; Reviewed by Blake Eggleston for CASSANDRA-14404

Co-authored-by: Blake Eggleston
Co-authored-by: Ariel Weisberg
---
 transient_replication_ring_test.py | 502 ++++++++++++++++++++++
 transient_replication_test.py | 653 +++++++++++++++++++++++++++++
 2 files changed, 1155 insertions(+)
 create mode 100644 transient_replication_ring_test.py
 create mode 100644 transient_replication_test.py

diff --git a/transient_replication_ring_test.py b/transient_replication_ring_test.py
new file mode 100644
index 0000000000..a3b596ebe9
--- /dev/null
+++ b/transient_replication_ring_test.py
@@ -0,0 +1,502 @@
+import logging
+import types
+
+from cassandra import ConsistencyLevel
+from cassandra.query import SimpleStatement
+from ccmlib.node import Node
+
+from dtest import Tester
+from tools.assertions import assert_all
+
+from flaky import flaky
+
+from cassandra.metadata import BytesToken, OrderedDict
+import pytest
+from itertools import chain
+from tools.misc import new_node
+
+logging.getLogger('cassandra').setLevel(logging.CRITICAL)
+
+NODELOCAL = 11
+
+def jmx_start(to_start, **kwargs):
+    kwargs['jvm_args'] = kwargs.get('jvm_args', []) + ['-XX:-PerfDisableSharedMem']
+    to_start.start(**kwargs)
+
+
+def gen_expected(*values):
+    return [["%05d" % i, i, i] for i in chain(*values)]
+
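+# Illustration only (hypothetical values, not used by the tests): gen_expected
+# flattens ranges of values into the [pk, ck, value] rows asserted below, with
+# pk being the zero-padded string form of the value, matching insert_row:
+#
+#   gen_expected(range(0, 2), range(10, 12))
+#   == [['00000', 0, 0], ['00001', 1, 1], ['00010', 10, 10], ['00011', 11, 11]]
+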
+
+def repair_nodes(nodes):
+    for node in nodes:
+        node.nodetool('repair -pr')
+
+def cleanup_nodes(nodes):
+    for node in nodes:
+        node.nodetool('cleanup')
+
+def patch_start(startable):
+    old_start = startable.start
+
+    def new_start(self, *args, **kwargs):
+        kwargs['jvm_args'] = kwargs.get('jvm_args', []) + ['-XX:-PerfDisableSharedMem',
+                                                           '-Dcassandra.enable_nodelocal_queries=true']
+        return old_start(*args, **kwargs)
+
+    startable.start = types.MethodType(new_start, startable)
+
+class TestTransientReplicationRing(Tester):
+
+    keyspace = "ks"
+    table = "tbl"
+
+    def select(self):
+        return "SELECT * from %s.%s" % (self.keyspace, self.table)
+
+    def select_statement(self):
+        return SimpleStatement(self.select(), consistency_level=NODELOCAL)
+
+    def point_select(self):
+        return "SELECT * from %s.%s where pk = %%s" % (self.keyspace, self.table)
+
+    def point_select_statement(self):
+        return SimpleStatement(self.point_select(), consistency_level=NODELOCAL)
+
+    def check_expected(self, sessions, expected, nodes=None, cleanup=False):
+        """Check that each node has the expected values present"""
+        nodes = nodes if nodes is not None else [None] * 1000
+        for idx, session, expect, node in zip(range(0, 1000), sessions, expected, nodes):
+            print("Checking idx " + str(idx))
+            print(str([row for row in session.execute(self.select_statement())]))
+            if cleanup:
+                node.nodetool('cleanup')
+            assert_all(session,
+                       self.select(),
+                       expect,
+                       cl=NODELOCAL)
+
+    def check_replication(self, sessions, exactly=None, gte=None, lte=None):
+        """Assert that the test values are replicated a required number of times"""
+        for i in range(0, 40):
+            count = 0
+            for session in sessions:
+                for row in session.execute(self.point_select_statement(), ["%05d" % i]):
+                    count += 1
+            if exactly:
+                assert count == exactly, "Count for %05d should be exactly %d" % (i, exactly)
+            if gte:
+                assert count >= gte, "Count for %05d should be >= %d" % (i, gte)
+            if lte:
+                assert count <= lte, "Count for %05d should be <= %d" % (i, lte)
+
+    @pytest.fixture
+    def cheap_quorums(self):
+        return False
+
+    @pytest.fixture(scope='function', autouse=True)
+    def setup_cluster(self, fixture_dtest_setup):
+        self.tokens = ['00010', '00020', '00030']
+
+        patch_start(self.cluster)
+        self.cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
+                                                       'num_tokens': 1,
+                                                       'commitlog_sync_period_in_ms': 500,
+                                                       'enable_transient_replication': True,
+                                                       'partitioner': 'org.apache.cassandra.dht.OrderPreservingPartitioner'})
+        print("CLUSTER INSTALL DIR: ")
+        print(self.cluster.get_install_dir())
+        self.cluster.populate(3, tokens=self.tokens, debug=True, install_byteman=True)
+        self.cluster.start(wait_other_notice=True, wait_for_binary_proto=True, jvm_args=['-Dcassandra.enable_nodelocal_queries=true'])
+
+        # enable perf shared memory and NODELOCAL queries on node restarts
+        for node in self.cluster.nodelist():
+            patch_start(node)
+            print(node.logfilename())
+
+        self.nodes = self.cluster.nodelist()
+        self.node1, self.node2, self.node3 = self.nodes
+        session = self.exclusive_cql_connection(self.node3)
+
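+        # 'datacenter1': '3/1' reads as "three replicas, one of them transient":
+        # every range gets two full replicas plus one transient replica that
+        # only holds data while it is unrepaired or a full replica is down.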
+        replication_params = OrderedDict()
+        replication_params['class'] = 'NetworkTopologyStrategy'
+        replication_params['datacenter1'] = '3/1'
+        replication_params = ', '.join("'%s': '%s'" % (k, v) for k, v in replication_params.items())
+
+        session.execute("CREATE KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_params))
+        print("CREATE KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_params))
+        self.create_table(session)
+
+    def create_table(self, session, never_speculate=False):
+        if never_speculate:
+            session.execute("CREATE TABLE %s.%s (pk varchar, ck int, value int, PRIMARY KEY (pk, ck)) WITH speculative_retry = 'NEVER' AND read_repair = 'NONE'" % (self.keyspace, self.table))
+        else:
+            session.execute("CREATE TABLE %s.%s (pk varchar, ck int, value int, PRIMARY KEY (pk, ck)) WITH read_repair = 'NONE'" % (self.keyspace, self.table))
+        print(str(self.node1.run_cqlsh("describe table %s.%s" % (self.keyspace, self.table))))
+
+    def quorum(self, session, stmt_str):
+        return session.execute(SimpleStatement(stmt_str, consistency_level=ConsistencyLevel.QUORUM))
+
+    def insert_row(self, pk, ck, value, session=None, node=None):
+        session = session or self.exclusive_cql_connection(node or self.node1)
+        #token = BytesToken.from_key(pack('>i', pk)).value
+        #assert token < BytesToken.from_string(self.tokens[0]).value or BytesToken.from_string(self.tokens[-1]).value < token  # primary replica should be node1
+        #TODO: Is QUORUM really right? Maybe we want ALL with retries, since we really
+        #don't want data missing from a full replica unless it is intentional
+        self.quorum(session, "INSERT INTO %s.%s (pk, ck, value) VALUES ('%05d', %s, %s)" % (self.keyspace, self.table, pk, ck, value))
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_bootstrap_and_cleanup(self):
+        """Test bootstrapping a new node across a mix of repaired and unrepaired data"""
+        main_session = self.patient_cql_connection(self.node1)
+        self.table = 'tbl2'
+        self.create_table(main_session, never_speculate=True)
+        nodes = [self.node1, self.node2, self.node3]
+
+        for i in range(0, 40, 2):
+            self.insert_row(i, i, i, main_session)
+
+        sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3]]
+
+        expected = [gen_expected(range(0, 11, 2), range(22, 40, 2)),
+                    gen_expected(range(0, 22, 2), range(32, 40, 2)),
+                    gen_expected(range(12, 31, 2))]
+        self.check_expected(sessions, expected)
+
+        # Make sure at least a little data is repaired; this shouldn't move data anywhere
+        repair_nodes(nodes)
+
+        self.check_expected(sessions, expected)
+
+        # Ensure that there is at least some transient data around; if it's missing
+        # after bootstrap, we know we failed to get it from the transient replica
+        # that lost the range entirely
+        nodes[1].stop(wait_other_notice=True)
+
+        for i in range(1, 40, 2):
+            self.insert_row(i, i, i, main_session)
+
+        nodes[1].start(wait_for_binary_proto=True, wait_other_notice=True)
+
+        sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3]]
+
+        expected = [gen_expected(range(0, 11), range(11, 20, 2), range(21, 40)),
+                    gen_expected(range(0, 21, 2), range(32, 40, 2)),
+                    gen_expected(range(1, 11, 2), range(11, 31), range(31, 40, 2))]
+
+        # Every node should have some of its fully replicated data, and nodes 1 and 2 should have some transient data
+        self.check_expected(sessions, expected)
+
+        node4 = new_node(self.cluster, bootstrap=True, token='00040')
+        patch_start(node4)
+        nodes.append(node4)
+        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+
+        expected.append(gen_expected(range(11, 20, 2), range(21, 40)))
+        sessions.append(self.exclusive_cql_connection(node4))
+
+        # Because repair was never run and nodes had transient data, node4 will have data for its transient ranges (streamed from node1, 11-20)
+        assert_all(sessions[3],
+                   self.select(),
+                   expected[3],
+                   cl=NODELOCAL)
+
+        # Node1 no longer transiently replicates 11-20, so cleanup will clean it up
+        # Node1 also now transiently replicates 21-30 and half the values in that range were repaired
+        expected[0] = gen_expected(range(0, 11), range(21, 30, 2), range(31, 40))
+        # Node2 is still missing data since it was down during some insertions; it also lost a range (31-40)
+        expected[1] = gen_expected(range(0, 21, 2))
+        expected[2] = gen_expected(range(1, 11, 2), range(11, 31))
+
+        # Cleanup should only have an effect if a node lost a range entirely, or started
+        # to transiently replicate it and the data was repaired
+        self.check_expected(sessions, expected, nodes, cleanup=True)
+
+        repair_nodes(nodes)
+
+        expected = [gen_expected(range(0, 11), range(31, 40)),
+                    gen_expected(range(0, 21)),
+                    gen_expected(range(11, 31)),
+                    gen_expected(range(21, 40))]
+
+        self.check_expected(sessions, expected, nodes, cleanup=True)
+
+        # Every value should be replicated exactly 2 times
+        self.check_replication(sessions, exactly=2)
+
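+    # Ring layout used by the move tests below (OrderPreservingPartitioner over
+    # the zero-padded keys): node1=00010, node2=00020, node3=00030, and node4
+    # joins at 00040. With RF '3/1' each range lands on two full replicas and
+    # one transient replica, e.g. keys 00000-00010 are full on node1/node2 and
+    # transient on node3; this is what the per-node expected lists encode.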
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def move_test(self, move_token, expected_after_move, expected_after_repair):
+        """Helper method to run a move test cycle"""
+        node4 = new_node(self.cluster, bootstrap=True, token='00040')
+        patch_start(node4)
+        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+        main_session = self.patient_cql_connection(self.node1)
+        self.table = 'tbl2'
+        self.create_table(main_session, never_speculate=True)
+        nodes = [self.node1, self.node2, self.node3, node4]
+
+        for i in range(0, 40, 2):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        # Make sure at least a little data is repaired
+        repair_nodes(nodes)
+
+        # Ensure that there is at least some transient data around; if it's missing
+        # after the move, we know we failed to get it from the transient replica
+        # that lost the range entirely
+        nodes[1].stop(wait_other_notice=True)
+
+        for i in range(1, 40, 2):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        nodes[1].start(wait_for_binary_proto=True, wait_other_notice=True)
+        sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3, node4]]
+
+        expected = [gen_expected(range(0, 11), range(31, 40)),
+                    gen_expected(range(0, 21, 2)),
+                    gen_expected(range(1, 11, 2), range(11, 31)),
+                    gen_expected(range(11, 20, 2), range(21, 40))]
+        self.check_expected(sessions, expected)
+        self.check_replication(sessions, exactly=2)
+
+        nodes[0].nodetool('move %s' % move_token)
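+        # Moving node1's token shifts range ownership, but replicas keep their
+        # old transient/unrepaired data, so until repair runs each value may
+        # exist on 2-3 nodes even after cleanup; only repair followed by cleanup
+        # settles the ring back to exactly 2 copies per value.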
+        cleanup_nodes(nodes)
+
+        self.check_replication(sessions, gte=2, lte=3)
+        self.check_expected(sessions, expected=expected_after_move)
+
+        repair_nodes(nodes)
+
+        self.check_expected(sessions, expected_after_repair, nodes, cleanup=True)
+        self.check_replication(sessions, exactly=2)
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_move_forwards_between_and_cleanup(self):
+        """Test moving a node forwards past a neighbor token"""
+        move_token = '00025'
+        expected_after_move = [gen_expected(range(0, 26), range(31, 40, 2)),
+                               gen_expected(range(0, 21, 2), range(31, 40)),
+                               gen_expected(range(1, 11, 2), range(11, 21, 2), range(21, 31)),
+                               gen_expected(range(21, 26, 2), range(26, 40))]
+        expected_after_repair = [gen_expected(range(0, 26)),
+                                 gen_expected(range(0, 21), range(31, 40)),
+                                 gen_expected(range(21, 31)),
+                                 gen_expected(range(26, 40))]
+        self.move_test(move_token, expected_after_move, expected_after_repair)
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_move_forwards_and_cleanup(self):
+        """Test moving a node forwards without going past a neighbor token"""
+        move_token = '00015'
+        expected_after_move = [gen_expected(range(0, 16), range(31, 40)),
+                               gen_expected(range(0, 21, 2)),
+                               gen_expected(range(1, 16, 2), range(16, 31)),
+                               gen_expected(range(17, 20, 2), range(21, 40))]
+        expected_after_repair = [gen_expected(range(0, 16), range(31, 40)),
+                                 gen_expected(range(0, 21)),
+                                 gen_expected(range(16, 31)),
+                                 gen_expected(range(21, 40))]
+        self.move_test(move_token, expected_after_move, expected_after_repair)
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_move_backwards_between_and_cleanup(self):
+        """Test moving a node backwards past its preceding neighbor's token"""
+        move_token = '00035'
+        expected_after_move = [gen_expected(range(1, 21, 2), range(21, 36)),
+                               gen_expected(range(0, 21, 2), range(36, 40)),
+                               gen_expected(range(0, 31), range(37, 40, 2)),
+                               gen_expected(range(21, 30, 2), range(31, 40))]
+        expected_after_repair = [gen_expected(range(21, 36)),
+                                 gen_expected(range(0, 21), range(36, 40)),
+                                 gen_expected(range(0, 31)),
+                                 gen_expected(range(31, 40))]
+        self.move_test(move_token, expected_after_move, expected_after_repair)
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_move_backwards_and_cleanup(self):
+        """Test moving a node backwards without moving past a neighbor token"""
+        move_token = '00005'
+        expected_after_move = [gen_expected(range(0, 6), range(31, 40)),
+                               gen_expected(range(0, 21, 2)),
+                               gen_expected(range(1, 6, 2), range(6, 31)),
+                               gen_expected(range(7, 20, 2), range(21, 40))]
+        expected_after_repair = [gen_expected(range(0, 6), range(31, 40)),
+                                 gen_expected(range(0, 21)),
+                                 gen_expected(range(6, 31)),
+                                 gen_expected(range(21, 40))]
+        self.move_test(move_token, expected_after_move, expected_after_repair)
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_decommission(self):
+        """Test that decommissioning a node correctly streams out all the data"""
+        node4 = new_node(self.cluster, bootstrap=True, token='00040')
+        patch_start(node4)
+        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+        main_session = self.patient_cql_connection(self.node1)
+        self.table = 'tbl2'
+        self.create_table(main_session, never_speculate=True)
+        nodes = [self.node1, self.node2, self.node3, node4]
+
+        for i in range(0, 40, 2):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        # Make sure at least a little data is repaired
+        repair_nodes(nodes)
+
+        # Ensure that there is at least some transient data around; if it's missing
+        # after decommission, we know we failed to get it from the transient replica
+        # that lost the range entirely
+        nodes[1].stop(wait_other_notice=True)
+
+        for i in range(1, 40, 2):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        nodes[1].start(wait_for_binary_proto=True, wait_other_notice=True)
+        sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3, node4]]
+
+        expected = [gen_expected(range(0, 11), range(31, 40)),
+                    gen_expected(range(0, 21, 2)),
+                    gen_expected(range(1, 11, 2), range(11, 31)),
+                    gen_expected(range(11, 20, 2), range(21, 40))]
+
+        self.check_expected(sessions, expected)
+
+        # node4 has transient data we want to see streamed out on decommission
+        nodes[3].nodetool('decommission')
+
+        nodes = nodes[:-1]
+        sessions = sessions[:-1]
+
+        expected = [gen_expected(range(0, 11), range(11, 21, 2), range(21, 40)),
+                    gen_expected(range(0, 21, 2), range(21, 30, 2), range(31, 40)),
+                    gen_expected(range(1, 11, 2), range(11, 31), range(31, 40, 2))]
+
+        cleanup_nodes(nodes)
+
+        self.check_replication(sessions, gte=2, lte=3)
+        self.check_expected(sessions, expected)
+
+        repair_nodes(nodes)
+
+        # There should be no transient data anywhere
+        expected = [gen_expected(range(0, 11), range(21, 40)),
+                    gen_expected(range(0, 21), range(31, 40)),
+                    gen_expected(range(11, 31))]
+
+        self.check_expected(sessions, expected, nodes, cleanup=True)
+        self.check_replication(sessions, exactly=2)
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_remove(self):
+        """Test a mix of ring change operations across a mix of transient and repaired/unrepaired data"""
+        node4 = new_node(self.cluster, bootstrap=True, token='00040')
+        patch_start(node4)
+        node4.start(wait_for_binary_proto=True, wait_other_notice=True)
+        main_session = self.patient_cql_connection(self.node1)
+        self.table = 'tbl2'
+        self.create_table(main_session, never_speculate=True)
+        nodes = [self.node1, self.node2, self.node3]
+
+        # We want the node being removed to have no data on it, so nodetool removenode
+        # always gets all the necessary data from the survivors
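+        # The fixed-offset slice below pulls the 36-character host ID out of
+        # `nodetool info` output. A sturdier sketch (assuming the usual
+        # "ID : <uuid>" line format, and `import re` at the top of the file):
+        #
+        #   m = re.search(r'ID\s*:\s*([0-9a-f\-]{36})', node4.nodetool('info').stdout)
+        #   node4_id = m.group(1)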
+        node4_id = node4.nodetool('info').stdout[25:61]
+        node4.stop(wait_other_notice=True)
+
+        for i in range(0, 40):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3]]
+
+        expected = [gen_expected(range(0, 11), range(21, 40)),
+                    gen_expected(range(0, 21), range(31, 40)),
+                    gen_expected(range(11, 31))]
+
+        # Every node should have some of its fully replicated data, and nodes 1 and 2 should have some transient data
+        self.check_expected(sessions, expected)
+
+        nodes[0].nodetool('removenode ' + node4_id)
+
+        # Give streaming time to occur; it's asynchronous from removenode completing at other nodes
+        import time
+        time.sleep(15)
+
+        # Everyone should have everything
+        expected = [gen_expected(range(0, 40)),
+                    gen_expected(range(0, 40)),
+                    gen_expected(range(0, 40))]
+
+        self.check_replication(sessions, exactly=3)
+        self.check_expected(sessions, expected)
+        repair_nodes(nodes)
+        cleanup_nodes(nodes)
+
+        self.check_replication(sessions, exactly=2)
+
+        expected = [gen_expected(range(0, 11), range(21, 40)),
+                    gen_expected(range(0, 21), range(31, 40)),
+                    gen_expected(range(11, 31))]
+        self.check_expected(sessions, expected)
+
+    @flaky(max_runs=1)
+    @pytest.mark.no_vnodes
+    def test_replace(self):
+        """Test that a replacement node fetches all the data it needs"""
+        main_session = self.patient_cql_connection(self.node1)
+        self.table = 'tbl2'
+        self.create_table(main_session, never_speculate=True)
+
+        # We want the node being replaced to have no data on it, so the replacement definitely fetches all the data
+        self.node2.stop(wait_other_notice=True)
+
+        for i in range(0, 40):
+            print("Inserting " + str(i))
+            self.insert_row(i, i, i, main_session)
+
+        replacement_address = self.node2.address()
+        self.cluster.remove(self.node2)
+        self.node2 = Node('replacement', cluster=self.cluster, auto_bootstrap=True,
+                          thrift_interface=None, storage_interface=(replacement_address, 7000),
+                          jmx_port='7400', remote_debug_port='0', initial_token=None, binary_interface=(replacement_address, 9042))
+        patch_start(self.node2)
+        nodes = [self.node1, self.node2, self.node3]
+        self.cluster.add(self.node2, False, data_center='datacenter1')
+        jvm_args = ["-Dcassandra.replace_address=%s" % replacement_address,
+                    "-Dcassandra.ring_delay_ms=10000",
+                    "-Dcassandra.broadcast_interval_ms=10000"]
+        self.node2.start(jvm_args=jvm_args, wait_for_binary_proto=True, wait_other_notice=True)
+
+        sessions = [self.exclusive_cql_connection(node) for node in [self.node1, self.node2, self.node3]]
+
+        # Everyone should have everything
+        expected = [gen_expected(range(0, 40)),
+                    gen_expected(range(0, 40)),
+                    gen_expected(range(0, 40))]
+
+        self.check_replication(sessions, exactly=3)
+        self.check_expected(sessions, expected)
+
+        repair_nodes(nodes)
+        cleanup_nodes(nodes)
+
+        self.check_replication(sessions, exactly=2)
+
+        expected = [gen_expected(range(0, 11), range(21, 40)),
+                    gen_expected(range(0, 21), range(31, 40)),
+                    gen_expected(range(11, 31))]
+        self.check_expected(sessions, expected)
\ No newline at end of file
diff --git a/transient_replication_test.py b/transient_replication_test.py
new file mode 100644
index 0000000000..05716781bd
--- /dev/null
+++ b/transient_replication_test.py
@@ -0,0 +1,653 @@
+import re
+import logging
+import types
+from struct import pack
+from uuid import UUID
+
+from cassandra import ConsistencyLevel, InvalidRequest
+from cassandra.query import SimpleStatement
+from cassandra.protocol import ConfigurationException
+from ccmlib.node import Node
+
+from dtest import Tester +from tools.misc import ImmutableMapping +from tools.jmxutils import JolokiaAgent, make_mbean +from tools.data import rows_to_list +from tools.assertions import (assert_all, assert_invalid, assert_length_equal, + assert_none, assert_one, assert_unavailable) + +from cassandra.metadata import Murmur3Token, OrderedDict +import pytest + + +logging.getLogger('cassandra').setLevel(logging.CRITICAL) + +NODELOCAL = 11 +class SSTable(object): + + def __init__(self, name, repaired, pending_id): + self.name = name + self.repaired = repaired + self.pending_id = pending_id + + +class TableMetrics(object): + + def __init__(self, node, keyspace, table): + assert isinstance(node, Node) + self.jmx = JolokiaAgent(node) + self.write_latency_mbean = make_mbean("metrics", type="Table", name="WriteLatency", keyspace=keyspace, scope=table) + self.speculative_reads_mbean = make_mbean("metrics", type="Table", name="SpeculativeRetries", keyspace=keyspace, scope=table) + self.transient_writes_mbean = make_mbean("metrics", type="Table", name="TransientWrites", keyspace=keyspace, scope=table) + + @property + def write_count(self): + return self.jmx.read_attribute(self.write_latency_mbean, "Count") + + @property + def speculative_reads(self): + return self.jmx.read_attribute(self.speculative_reads_mbean, "Count") + + @property + def transient_writes(self): + return self.jmx.read_attribute(self.transient_writes_mbean, "Count") + + def start(self): + self.jmx.start() + + def stop(self): + self.jmx.stop() + + def __enter__(self): + """ For contextmanager-style usage. """ + self.start() + return self + + def __exit__(self, exc_type, value, traceback): + """ For contextmanager-style usage. """ + self.stop() + + +class StorageProxy(object): + + def __init__(self, node): + assert isinstance(node, Node) + self.node = node + self.jmx = JolokiaAgent(node) + self.mbean = make_mbean("db", type="StorageProxy") + + def start(self): + self.jmx.start() + + def stop(self): + self.jmx.stop() + + @property + def blocking_read_repair(self): + return self.jmx.read_attribute(self.mbean, "ReadRepairRepairedBlocking") + + @property + def speculated_data_request(self): + return self.jmx.read_attribute(self.mbean, "ReadRepairSpeculatedRequest") + + @property + def speculated_data_repair(self): + return self.jmx.read_attribute(self.mbean, "ReadRepairSpeculatedRepair") + + def __enter__(self): + """ For contextmanager-style usage. """ + self.start() + return self + + def __exit__(self, exc_type, value, traceback): + """ For contextmanager-style usage. """ + self.stop() + +class StorageService(object): + + def __init__(self, node): + assert isinstance(node, Node) + self.node = node + self.jmx = JolokiaAgent(node) + self.mbean = make_mbean("db", type="StorageService") + + def start(self): + self.jmx.start() + + def stop(self): + self.jmx.stop() + + def get_replicas(self, ks, cf, key): + return self.jmx.execute_method(self.mbean, "getNaturalEndpointsWithPort(java.lang.String,java.lang.String,java.lang.String,boolean)", [ks, cf, key, True]) + + def __enter__(self): + """ For contextmanager-style usage. """ + self.start() + return self + + def __exit__(self, exc_type, value, traceback): + """ For contextmanager-style usage. 
""" + self.stop() + +def patch_start(startable): + old_start = startable.start + + def new_start(self, *args, **kwargs): + kwargs['jvm_args'] = kwargs.get('jvm_args', []) + ['-XX:-PerfDisableSharedMem', + '-Dcassandra.enable_nodelocal_queries=true'] + return old_start(*args, **kwargs) + + startable.start = types.MethodType(new_start, startable) + return startable + +def get_sstable_data(cls, node, keyspace): + _sstable_name = re.compile('SSTable: (.+)') + _repaired_at = re.compile('Repaired at: (\d+)') + _pending_repair = re.compile('Pending repair: (\-\-|null|[a-f0-9\-]+)') + + out = node.run_sstablemetadata(keyspace=keyspace).stdout + + def matches(pattern): + return filter(None, [pattern.match(l) for l in out.decode("utf-8").split('\n')]) + names = [m.group(1) for m in matches(_sstable_name)] + repaired_times = [int(m.group(1)) for m in matches(_repaired_at)] + + def uuid_or_none(s): + return None if s == 'null' or s == '--' else UUID(s) + pending_repairs = [uuid_or_none(m.group(1)) for m in matches(_pending_repair)] + assert names + assert repaired_times + assert pending_repairs + assert len(names) == len(repaired_times) == len(pending_repairs) + return [SSTable(*a) for a in zip(names, repaired_times, pending_repairs)] + + +class TransientReplicationBase(Tester): + + keyspace = "ks" + table = "tbl" + + @pytest.fixture + def cheap_quorums(self): + return False + + def populate(self): + self.cluster.populate(3, tokens=self.tokens, debug=True, install_byteman=True) + + def set_nodes(self): + self.node1, self.node2, self.node3 = self.nodes + + # Make sure digest is not attempted against the transient node + self.node3.byteman_submit(['./byteman/throw_on_digest.btm']) + + + def replication_factor(self): + return '3/1' + + def tokens(self): + return [0, 1, 2] + + def setup_schema(self): + session = self.exclusive_cql_connection(self.node1) + replication_params = OrderedDict() + replication_params['class'] = 'NetworkTopologyStrategy' + replication_params['datacenter1'] = self.replication_factor() + replication_params = ', '.join("'%s': '%s'" % (k, v) for k, v in replication_params.items()) + session.execute("CREATE KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_params)) + session.execute("CREATE TABLE %s.%s (pk int, ck int, value int, PRIMARY KEY (pk, ck)) WITH speculative_retry = 'NEVER' AND read_repair = 'NONE'" % (self.keyspace, self.table)) + + @pytest.fixture(scope='function', autouse=True) + def setup_cluster(self, fixture_dtest_setup): + self.tokens = self.tokens() + + patch_start(self.cluster) + self.cluster.set_configuration_options(values={'hinted_handoff_enabled': False, + 'num_tokens': 1, + 'commitlog_sync_period_in_ms': 500, + 'enable_transient_replication': True, + 'dynamic_snitch': False}) + self.populate() + self.cluster.start(wait_other_notice=True, wait_for_binary_proto=True) + + self.nodes = [patch_start(node) for node in self.cluster.nodelist()] + self.set_nodes() + + session = self.exclusive_cql_connection(self.node3) + self.setup_schema() + + def assert_has_sstables(self, node, flush=False, compact=False): + if flush: + node.flush() + if compact: + node.nodetool(' '.join(['compact', self.keyspace, self.table])) + + sstables = node.get_sstables(self.keyspace, self.table) + assert sstables + + def assert_has_no_sstables(self, node, flush=False, compact=False): + if flush: + node.flush() + if compact: + node.nodetool(' '.join(['compact', self.keyspace, self.table])) + + sstables = node.get_sstables(self.keyspace, self.table) + assert not sstables + + def 
quorum(self, session, stmt_str): + return session.execute(SimpleStatement(stmt_str, consistency_level=ConsistencyLevel.QUORUM)) + + def nodelocal(self, session, stmt_str): + return session.execute(SimpleStatement(stmt_str, consistency_level=NODELOCAL)) + + def assert_local_rows(self, node, rows, ignore_order=False): + assert_all(self.exclusive_cql_connection(node), + "SELECT * FROM %s.%s" % (self.keyspace, self.table), + rows, + cl=NODELOCAL, + ignore_order=ignore_order) + + def insert_row(self, pk, ck, value, session=None, node=None): + session = session or self.exclusive_cql_connection(node or self.node1) + token = Murmur3Token.from_key(pack('>i', pk)).value + assert token < self.tokens[0] or self.tokens[-1] < token # primary replica should be node1 + self.quorum(session, "INSERT INTO %s.%s (pk, ck, value) VALUES (%s, %s, %s)" % (self.keyspace, self.table, pk, ck, value)) + + def delete_row(self, pk, ck, session=None, node=None): + session = session or self.exclusive_cql_connection(node or self.node1) + token = Murmur3Token.from_key(pack('>i', pk)).value + assert token < self.tokens[0] or self.tokens[-1] < token # primary replica should be node1 + self.quorum(session, "DELETE FROM %s.%s WHERE pk = %s AND ck = %s" % (self.keyspace, self.table, pk, ck)) + + def read_as_list(self, query, session=None, node=None): + session = session or self.exclusive_cql_connection(node or self.node1) + return rows_to_list(self.quorum(session, query)) + + def table_metrics(self, node): + return TableMetrics(node, self.keyspace, self.table) + + def split(self, arr): + arr1 = [] + arr2 = [] + for idx, item in enumerate(arr): + if idx % 2 == 0: + arr1.append(item) + else: + arr2.append(item) + return (arr1, arr2) + + def generate_rows(self, partitions, rows): + return [[pk, ck, pk+ck] for ck in range(rows) for pk in range(partitions)] + + +class TestTransientReplication(TransientReplicationBase): + + @pytest.mark.no_vnodes + def test_transient_noop_write(self): + """ If both full replicas are available, nothing should be written to the transient replica """ + for node in self.nodes: + self.assert_has_no_sstables(node) + + tm = lambda n: self.table_metrics(n) + with tm(self.node1) as tm1, tm(self.node2) as tm2, tm(self.node3) as tm3: + assert tm1.write_count == 0 + assert tm2.write_count == 0 + assert tm3.write_count == 0 + self.insert_row(1, 1, 1) + assert tm1.write_count == 1 + assert tm2.write_count == 1 + assert tm3.write_count == 0 + + self.assert_has_sstables(self.node1, flush=True) + self.assert_has_sstables(self.node2, flush=True) + self.assert_has_no_sstables(self.node3, flush=True) + + @pytest.mark.no_vnodes + def test_transient_write(self): + """ If write can't succeed on full replica, it's written to the transient node instead """ + for node in self.nodes: + self.assert_has_no_sstables(node) + + tm = lambda n: self.table_metrics(n) + with tm(self.node1) as tm1, tm(self.node2) as tm2, tm(self.node3) as tm3: + self.insert_row(1, 1, 1) + # Stop writes to the other full node + self.node2.byteman_submit(['./byteman/stop_writes.btm']) + self.insert_row(1, 2, 2) + + # node1 should contain both rows + self.assert_local_rows(self.node1, + [[1, 1, 1], + [1, 2, 2]]) + + # write couldn't succeed on node2, so it has only the first row + self.assert_local_rows(self.node2, + [[1, 1, 1]]) + + # transient replica should hold only the second row + self.assert_local_rows(self.node3, + [[1, 2, 2]]) + + @pytest.mark.no_vnodes + def test_transient_full_merge_read(self): + """ When reading, transient replica should serve 
rows that the failed full replica is missing """
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+        self.insert_row(1, 1, 1)
+        # Stop writes to the other full node
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.insert_row(1, 2, 2)
+
+        # Stop the full node that has both rows
+        self.node1.stop()
+
+        # Whether we're reading from the full node or from the transient node, we should get consistent results
+        for node in [self.node2, self.node3]:
+            assert_all(self.exclusive_cql_connection(node),
+                       "SELECT * FROM %s.%s" % (self.keyspace, self.table),
+                       [[1, 1, 1],
+                        [1, 2, 2]],
+                       cl=ConsistencyLevel.QUORUM)
+
+    @pytest.mark.no_vnodes
+    def test_srp(self):
+        """ Short read protection should work when tombstones from one replica eliminate rows from another """
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+        self.insert_row(1, 1, 1)
+        self.insert_row(1, 2, 2)
+
+        # Stop writes to the other full node
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.delete_row(1, 1, node=self.node1)
+
+        # Stop the full node that has the tombstone
+        self.node1.stop()
+
+        # Whether we're reading from the full node or from the transient node, we should get consistent results
+        assert_all(self.exclusive_cql_connection(self.node3),
+                   "SELECT * FROM %s.%s LIMIT 1" % (self.keyspace, self.table),
+                   [[1, 2, 2]],
+                   cl=ConsistencyLevel.QUORUM)
+
+    @pytest.mark.no_vnodes
+    def test_transient_full_merge_read_with_delete_transient_coordinator(self):
+        self._test_transient_full_merge_read_with_delete(self.node3)
+
+    @pytest.mark.no_vnodes
+    def test_transient_full_merge_read_with_delete_full_coordinator(self):
+        self._test_transient_full_merge_read_with_delete(self.node2)
+
+    def _test_transient_full_merge_read_with_delete(self, coordinator):
+        """ When reading, the transient replica's rows and the full replica's tombstones should merge """
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+        self.insert_row(1, 1, 1)
+        self.insert_row(1, 2, 2)
+        # Stop writes to the other full node
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.delete_row(1, 2)
+
+        self.assert_local_rows(self.node3,
+                               [])
+        # Stop the full node that has the tombstone
+        self.node1.stop()
+
+        assert_all(self.exclusive_cql_connection(coordinator),
+                   "SELECT * FROM %s.%s" % (self.keyspace, self.table),
+                   [[1, 1, 1]],
+                   cl=ConsistencyLevel.QUORUM)
+
+    def _test_speculative_write_repair_cycle(self, primary_range, optimized_repair, repair_coordinator, expect_node3_data):
+        """
+        If one of the full replicas is not available, data should be written to the
+        transient replica, but removed after incremental repair
+        """
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        tm = lambda n: self.table_metrics(n)
+        with tm(self.node1) as tm1, tm(self.node2) as tm2, tm(self.node3) as tm3:
+            assert tm1.write_count == 0
+            assert tm2.write_count == 0
+            assert tm3.write_count == 0
+            self.insert_row(1, 1, 1)
+            assert tm1.write_count == 1
+            assert tm2.write_count == 0
+            assert tm3.write_count == 1
+
+        self.assert_has_sstables(self.node1, flush=True)
+        self.assert_has_no_sstables(self.node2, flush=True)
+        self.assert_has_sstables(self.node3, flush=True)
+
+        repair_opts = ['repair', self.keyspace]
+        if primary_range: repair_opts.append('-pr')
+        if optimized_repair: repair_opts.append('-os')
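+        # Transient replicas only retain data that is not yet repaired: once
+        # incremental repair marks the range repaired on both full replicas,
+        # the transient copy on node3 can be discarded, which is why the
+        # callers below pass expect_node3_data=False.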
+        repair_coordinator.nodetool(' '.join(repair_opts))
+
+        self.assert_has_sstables(self.node1, compact=True)
+        self.assert_has_sstables(self.node2, compact=True)
+        if expect_node3_data:
+            self.assert_has_sstables(self.node3, compact=True)
+        else:
+            self.assert_has_no_sstables(self.node3, compact=True)
+
+    @pytest.mark.no_vnodes
+    def test_speculative_write_repair_cycle(self):
+        """ Incremental repair from the full replica should remove data on node3 """
+        self._test_speculative_write_repair_cycle(primary_range=False,
+                                                  optimized_repair=False,
+                                                  repair_coordinator=self.node1,
+                                                  expect_node3_data=False)
+
+    @pytest.mark.no_vnodes
+    def test_primary_range_repair(self):
+        """ Primary range incremental repair from the full replica should remove data on node3 """
+        self._test_speculative_write_repair_cycle(primary_range=True,
+                                                  optimized_repair=False,
+                                                  repair_coordinator=self.node1,
+                                                  expect_node3_data=False)
+
+    @pytest.mark.no_vnodes
+    def test_optimized_primary_range_repair(self):
+        """ Optimized primary range incremental repair from the full replica should remove data on node3 """
+        self._test_speculative_write_repair_cycle(primary_range=True,
+                                                  optimized_repair=True,
+                                                  repair_coordinator=self.node1,
+                                                  expect_node3_data=False)
+
+    @pytest.mark.no_vnodes
+    def test_transient_incremental_repair(self):
+        """ Transiently replicated ranges should be skipped when coordinating repairs """
+        self._test_speculative_write_repair_cycle(primary_range=True,
+                                                  optimized_repair=False,
+                                                  repair_coordinator=self.node1,
+                                                  expect_node3_data=False)
+
+    @pytest.mark.no_vnodes
+    def test_cheap_quorums(self):
+        """ Writes shouldn't make it to transient nodes """
+        session = self.exclusive_cql_connection(self.node1)
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+        tm = lambda n: self.table_metrics(n)
+
+        with tm(self.node1) as tm1, tm(self.node2) as tm2, tm(self.node3) as tm3:
+            assert tm1.write_count == 0
+            assert tm2.write_count == 0
+            assert tm3.write_count == 0
+            self.insert_row(1, 1, 1, session=session)
+            assert tm1.write_count == 1
+            assert tm2.write_count == 1
+            assert tm3.write_count == 0
+
+    @pytest.mark.no_vnodes
+    def test_speculative_write(self):
+        """ If a full replica isn't responding, we should send the write to the transient replica """
+        session = self.exclusive_cql_connection(self.node1)
+        self.node2.byteman_submit(['./byteman/slow_writes.btm'])
+
+        self.insert_row(1, 1, 1, session=session)
+        self.assert_local_rows(self.node1, [[1, 1, 1]])
+        self.assert_local_rows(self.node2, [])
+        self.assert_local_rows(self.node3, [[1, 1, 1]])
+
+    @pytest.mark.no_vnodes
+    def test_full_repair_from_full_replica(self):
+        """ Full repairs shouldn't replicate data to transient replicas """
+        session = self.exclusive_cql_connection(self.node1)
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+        self.insert_row(1, 1, 1, session=session)
+
+        self.assert_has_sstables(self.node1, flush=True)
+        self.assert_has_sstables(self.node2, flush=True)
+        self.assert_has_no_sstables(self.node3, flush=True)
+
+        self.node1.nodetool(' '.join(['repair', self.keyspace, '-full']))
+
+        self.assert_has_sstables(self.node1, flush=True)
+        self.assert_has_sstables(self.node2, flush=True)
+        self.assert_has_no_sstables(self.node3, flush=True)
+
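+    # When the sstable assertions in these repair tests fail, the
+    # get_sstable_data helper defined above can show the repaired /
+    # pending-repair state behind them, e.g.:
+    #
+    #   for s in get_sstable_data(self.node1, self.keyspace):
+    #       print(s.name, s.repaired, s.pending_id)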
+    @pytest.mark.no_vnodes
+    def test_full_repair_from_transient_replica(self):
+        """ Full repairs shouldn't replicate data to transient replicas """
+        session = self.exclusive_cql_connection(self.node1)
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+        self.insert_row(1, 1, 1, session=session)
+
+        self.assert_has_sstables(self.node1, flush=True)
+        self.assert_has_sstables(self.node2, flush=True)
+        self.assert_has_no_sstables(self.node3, flush=True)
+
+        self.node3.nodetool(' '.join(['repair', self.keyspace, '-full']))
+
+        self.assert_has_sstables(self.node1, flush=True)
+        self.assert_has_sstables(self.node2, flush=True)
+        self.assert_has_no_sstables(self.node3, flush=True)
+
+    @pytest.mark.skip(reason="Doesn't test quite the right combination of forbidden RF changes right now")
+    def test_keyspace_rf_changes(self):
+        """ Attempting to change the replication factor of a transient keyspace should throw an exception """
+        session = self.exclusive_cql_connection(self.node1)
+        replication_params = OrderedDict()
+        replication_params['class'] = 'NetworkTopologyStrategy'
+        assert self.replication_factor() == '3/1'
+        replication_params['datacenter1'] = '5/2'
+        replication_params = ', '.join("'%s': '%s'" % (k, v) for k, v in replication_params.items())
+        with pytest.raises(ConfigurationException):
+            session.execute("ALTER KEYSPACE %s WITH REPLICATION={%s}" % (self.keyspace, replication_params))
+
+    def test_disabled_read_repair(self):
+        """ Shouldn't allow creating tables on a transient keyspace without read repair disabled """
+        session = self.exclusive_cql_connection(self.node1)
+        with pytest.raises(InvalidRequest):
+            session.execute("CREATE TABLE %s.tbl2 (pk int, ck int, value int, PRIMARY KEY (pk, ck))" % self.keyspace)
+
+        with pytest.raises(InvalidRequest):
+            session.execute("ALTER TABLE %s.%s WITH read_repair = 'BLOCKING'" % (self.keyspace, self.table))
+
+
+class TestTransientReplicationSpeculativeQueries(TransientReplicationBase):
+
+    @pytest.mark.no_vnodes
+    def test_always_speculate(self):
+        """ Reads with speculative_retry = 'ALWAYS' should be consistent on every replica, even with a full replica down """
+        session = self.exclusive_cql_connection(self.node1)
+        session.execute("ALTER TABLE %s.%s WITH speculative_retry = 'ALWAYS';" % (self.keyspace, self.table))
+        self.insert_row(1, 1, 1)
+        # Stop writes to the other full node
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.insert_row(1, 2, 2)
+
+        for node in self.nodes:
+            assert_all(self.exclusive_cql_connection(node),
+                       "SELECT * FROM %s.%s WHERE pk = 1" % (self.keyspace, self.table),
+                       [[1, 1, 1],
+                        [1, 2, 2]],
+                       cl=ConsistencyLevel.QUORUM)
+
+    @pytest.mark.no_vnodes
+    def test_custom_speculate(self):
+        """ Reads with a percentile speculative_retry should be consistent on every replica, even with a full replica down """
+        session = self.exclusive_cql_connection(self.node1)
+        session.execute("ALTER TABLE %s.%s WITH speculative_retry = '99.99PERCENTILE';" % (self.keyspace, self.table))
+        self.insert_row(1, 1, 1)
+        # Stop writes to the other full node
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.insert_row(1, 2, 2)
+
+        for node in self.nodes:
+            assert_all(self.exclusive_cql_connection(node),
+                       "SELECT * FROM %s.%s WHERE pk = 1" % (self.keyspace, self.table),
+                       [[1, 1, 1],
+                        [1, 2, 2]],
+                       cl=ConsistencyLevel.QUORUM)
+
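+# With NetworkTopologyStrategy, 'datacenter1': '5/2' declares five replicas,
+# two of which are transient: three full replicas (node1-node3 here) and two
+# transient ones (node4/node5), which is what the per-node row assertions in
+# the test below reflect.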
+class TestMultipleTransientNodes(TransientReplicationBase):
+
+    def populate(self):
+        self.cluster.populate(5, tokens=self.tokens, debug=True, install_byteman=True)
+
+    def set_nodes(self):
+        self.node1, self.node2, self.node3, self.node4, self.node5 = self.nodes
+
+    def replication_factor(self):
+        return '5/2'
+
+    def tokens(self):
+        return [0, 1, 2, 3, 4]
+
+    @pytest.mark.no_vnodes
+    def test_transient_full_merge_read(self):
+        """ When reading, transient replicas should serve rows that the failed full replica is missing """
+        for node in self.nodes:
+            self.assert_has_no_sstables(node)
+
+        self.insert_row(1, 1, 1)
+        # Stop writes to the other full node
+        self.node2.byteman_submit(['./byteman/stop_writes.btm'])
+        self.insert_row(1, 2, 2)
+
+        self.assert_local_rows(self.node1,
+                               [[1, 1, 1],
+                                [1, 2, 2]])
+        self.assert_local_rows(self.node2,
+                               [[1, 1, 1]])
+        self.assert_local_rows(self.node3,
+                               [[1, 1, 1],
+                                [1, 2, 2]])
+        self.assert_local_rows(self.node4,
+                               [[1, 2, 2]])
+        self.assert_local_rows(self.node5,
+                               [[1, 2, 2]])
+        # Stop the full node that has both rows
+        self.node1.stop()
+
+        # Whether we're reading from a full node or from a transient node, we should get consistent results
+        for node in [self.node2, self.node3, self.node4, self.node5]:
+            assert_all(self.exclusive_cql_connection(node),
+                       "SELECT * FROM %s.%s" % (self.keyspace, self.table),
+                       [[1, 1, 1],
+                        [1, 2, 2]],
+                       cl=ConsistencyLevel.QUORUM)