From 4e1c05565aada57466b8edcdff43f1c7ebb7cd3e Mon Sep 17 00:00:00 2001 From: Ariel Weisberg Date: Fri, 22 Jun 2018 12:28:30 -0700 Subject: [PATCH] Transient Replication and Cheap Quorums, update existing tests Patch by Ariel Weisberg; Reviewed by Blake Eggleston for CASSANDRA-14404 Co-authored-by: Blake Eggleston Co-authored-by: Alex Petrov --- byteman/failing_repair.btm | 7 +++ byteman/read_repair/sorted_live_endpoints.btm | 13 ++---- byteman/read_repair/stop_data_reads.btm | 2 +- byteman/read_repair/stop_digest_reads.btm | 2 +- byteman/slow_writes.btm | 7 +++ byteman/stop_reads.btm | 8 ++++ byteman/stop_rr_writes.btm | 8 ++++ byteman/stop_writes.btm | 8 ++++ byteman/throw_on_digest.btm | 7 +++ read_repair_test.py | 44 +++++++++++++++++-- repair_tests/repair_test.py | 3 +- 11 files changed, 93 insertions(+), 16 deletions(-) create mode 100644 byteman/failing_repair.btm create mode 100644 byteman/slow_writes.btm create mode 100644 byteman/stop_reads.btm create mode 100644 byteman/stop_rr_writes.btm create mode 100644 byteman/stop_writes.btm create mode 100644 byteman/throw_on_digest.btm diff --git a/byteman/failing_repair.btm b/byteman/failing_repair.btm new file mode 100644 index 0000000000..ea82888019 --- /dev/null +++ b/byteman/failing_repair.btm @@ -0,0 +1,7 @@ +RULE fail repairs +CLASS org.apache.cassandra.repair.RepairMessageVerbHandler +METHOD doVerb +AT ENTRY +IF true +DO throw new RuntimeException("Repair failed"); +ENDRULE diff --git a/byteman/read_repair/sorted_live_endpoints.btm b/byteman/read_repair/sorted_live_endpoints.btm index 221e95873e..bfcfb1a9ad 100644 --- a/byteman/read_repair/sorted_live_endpoints.btm +++ b/byteman/read_repair/sorted_live_endpoints.btm @@ -1,15 +1,8 @@ RULE sorted live endpoints -CLASS org.apache.cassandra.service.StorageProxy -METHOD getLiveSortedEndpoints +CLASS org.apache.cassandra.locator.SimpleSnitch +METHOD sortedByProximity AT ENTRY -BIND ep1 = org.apache.cassandra.locator.InetAddressAndPort.getByName("127.0.0.1"); - 
ep2 = org.apache.cassandra.locator.InetAddressAndPort.getByName("127.0.0.2"); - ep3 = org.apache.cassandra.locator.InetAddressAndPort.getByName("127.0.0.3"); - eps = new java.util.ArrayList(); IF true DO - eps.add(ep1); - eps.add(ep2); - eps.add(ep3); - return eps; +return $unsortedAddress.sorted(java.util.Comparator.naturalOrder()); ENDRULE \ No newline at end of file diff --git a/byteman/read_repair/stop_data_reads.btm b/byteman/read_repair/stop_data_reads.btm index 9506abad50..905a110c40 100644 --- a/byteman/read_repair/stop_data_reads.btm +++ b/byteman/read_repair/stop_data_reads.btm @@ -4,7 +4,7 @@ CLASS org.apache.cassandra.db.ReadCommandVerbHandler METHOD doVerb # wait until command is declared locally. because generics AFTER WRITE $command -# bail out if it's not a digest request +# bail out if it's a data request IF NOT $command.isDigestQuery() DO return; ENDRULE diff --git a/byteman/read_repair/stop_digest_reads.btm b/byteman/read_repair/stop_digest_reads.btm index 92c54f610f..adb9b310fe 100644 --- a/byteman/read_repair/stop_digest_reads.btm +++ b/byteman/read_repair/stop_digest_reads.btm @@ -4,7 +4,7 @@ CLASS org.apache.cassandra.db.ReadCommandVerbHandler METHOD doVerb # wait until command is declared locally. 
because generics AFTER WRITE $command -# bail out if it's not a digest request +# bail out if it's a digest request IF $command.isDigestQuery() DO return; ENDRULE diff --git a/byteman/slow_writes.btm b/byteman/slow_writes.btm new file mode 100644 index 0000000000..a82dd0a2c2 --- /dev/null +++ b/byteman/slow_writes.btm @@ -0,0 +1,7 @@ +RULE slow mutations +CLASS org.apache.cassandra.db.MutationVerbHandler +METHOD doVerb +AT ENTRY +IF true +DO Thread.sleep(60000); +ENDRULE \ No newline at end of file diff --git a/byteman/stop_reads.btm b/byteman/stop_reads.btm new file mode 100644 index 0000000000..27beb7ca96 --- /dev/null +++ b/byteman/stop_reads.btm @@ -0,0 +1,8 @@ +# block read command verb +RULE disable mutations +CLASS org.apache.cassandra.db.ReadCommandVerbHandler +METHOD doVerb +AT ENTRY +IF true +DO return; +ENDRULE \ No newline at end of file diff --git a/byteman/stop_rr_writes.btm b/byteman/stop_rr_writes.btm new file mode 100644 index 0000000000..267980aeb4 --- /dev/null +++ b/byteman/stop_rr_writes.btm @@ -0,0 +1,8 @@ +# block read repair verb +RULE disable mutations +CLASS org.apache.cassandra.db.ReadRepairVerbHandler +METHOD doVerb +AT ENTRY +IF true +DO return; +ENDRULE \ No newline at end of file diff --git a/byteman/stop_writes.btm b/byteman/stop_writes.btm new file mode 100644 index 0000000000..fd55b33c11 --- /dev/null +++ b/byteman/stop_writes.btm @@ -0,0 +1,8 @@ +#block mutation verb +RULE disable mutations +CLASS org.apache.cassandra.db.MutationVerbHandler +METHOD doVerb +AT ENTRY +IF true +DO return; +ENDRULE diff --git a/byteman/throw_on_digest.btm b/byteman/throw_on_digest.btm new file mode 100644 index 0000000000..086da8d549 --- /dev/null +++ b/byteman/throw_on_digest.btm @@ -0,0 +1,7 @@ +RULE block digest +CLASS org.apache.cassandra.db.ReadResponse +METHOD createDigestResponse +AT ENTRY +IF true +DO throw new RuntimeException("Digest response throws"); +ENDRULE \ No newline at end of file diff --git a/read_repair_test.py b/read_repair_test.py index 
ad60238f6e..ae017635b9 100644 --- a/read_repair_test.py +++ b/read_repair_test.py @@ -1,14 +1,16 @@ from contextlib import contextmanager +import glob import os import time import pytest import logging +import subprocess import typing from cassandra import ConsistencyLevel, WriteTimeout, ReadTimeout from cassandra.cluster import Session from cassandra.query import SimpleStatement -from ccmlib.node import Node +from ccmlib.node import Node, handle_external_tool_process from pytest import raises from dtest import Tester, create_ks @@ -20,6 +22,37 @@ since = pytest.mark.since logger = logging.getLogger(__name__) +def byteman_validate(node, script, verbose=False, opts=None): + opts = opts or [] + cdir = node.get_install_dir() + byteman_cmd = [] + byteman_cmd.append(os.path.join(os.environ['JAVA_HOME'], + 'bin', + 'java')) + byteman_cmd.append('-cp') + jars = [ + glob.glob(os.path.join(cdir, 'build', 'lib', 'jars', 'byteman-[0-9]*.jar'))[0], + os.path.join(cdir, 'build', '*'), + ] + byteman_cmd.append(':'.join(jars)) + byteman_cmd.append('org.jboss.byteman.check.TestScript') + byteman_cmd.append('-p') + byteman_cmd.append(node.byteman_port) + if verbose and '-v' not in opts: + byteman_cmd.append('-v') + byteman_cmd.append(script) + # process = subprocess.Popen(byteman_cmd) + # out, err = process.communicate() + out = subprocess.check_output(byteman_cmd) + if (out is not None) and isinstance(out, bytes): + out = out.decode() + + has_errors = 'ERROR' in out + if verbose and not has_errors: + print (out) + + assert not has_errors, "byteman script didn't compile\n" + out + class TestReadRepair(Tester): @@ -382,8 +415,9 @@ def fixture_set_cluster_settings(self, fixture_dtest_setup): 'dynamic_snitch': False, 'write_request_timeout_in_ms': 500, 'read_request_timeout_in_ms': 500}) - cluster.populate(3, install_byteman=True, debug=True).start(wait_for_binary_proto=True, - jvm_args=['-XX:-PerfDisableSharedMem']) + cluster.populate(3, install_byteman=True, debug=True) + 
byteman_validate(cluster.nodelist()[0], './byteman/read_repair/sorted_live_endpoints.btm', verbose=True) + cluster.start(wait_for_binary_proto=True, jvm_args=['-XX:-PerfDisableSharedMem']) session = fixture_dtest_setup.patient_exclusive_cql_connection(cluster.nodelist()[0], timeout=2) session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}") @@ -623,6 +657,10 @@ def test_quorum_requirement_on_speculated_read(self): @contextmanager def _byteman_cycle(nodes, scripts): script_path = lambda name: './byteman/read_repair/' + name + '.btm' + + for script in scripts: + byteman_validate(nodes[0], script_path(script)) + for node in nodes: assert isinstance(node, Node) for name in scripts: diff --git a/repair_tests/repair_test.py b/repair_tests/repair_test.py index 3a9f9a7b9b..f5bfe28110 100644 --- a/repair_tests/repair_test.py +++ b/repair_tests/repair_test.py @@ -1166,7 +1166,8 @@ def _test_failure_during_repair(self, phase, initiator=False): "Session completed with the following error", "Repair session .* for range .* failed with error", "Sync failed between .* and .*", - "failed to send a stream message/file to peer" + "failed to send a stream message/file to peer", + "failed to send a stream message/data to peer" ] # Disable hinted handoff and set batch commit log so this doesn't