Skip to content

Commit

Permalink
Transient Replication and Cheap Quorums, update existing tests
Browse files Browse the repository at this point in the history
Patch by Ariel Weisberg; Reviewed by Blake Eggleston for CASSANDRA-14404

Co-authored-by: Blake Eggleston <bdeggleston@gmail.com>
Co-authored-by: Alex Petrov <oleksandr.petrov@gmail.com>
  • Loading branch information
3 people committed Sep 1, 2018
1 parent 3d760e6 commit 4e1c055
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 16 deletions.
7 changes: 7 additions & 0 deletions byteman/failing_repair.btm
@@ -0,0 +1,7 @@
# Fault injection for repair tests: every call to
# RepairMessageVerbHandler.doVerb throws a RuntimeException at method entry.
RULE fail repairs
CLASS org.apache.cassandra.repair.RepairMessageVerbHandler
METHOD doVerb
AT ENTRY
# unconditional: the rule fires on every invocation
IF true
DO throw new RuntimeException("Repair failed");
ENDRULE
13 changes: 3 additions & 10 deletions byteman/read_repair/sorted_live_endpoints.btm
@@ -1,15 +1,8 @@
RULE sorted live endpoints
CLASS org.apache.cassandra.service.StorageProxy
METHOD getLiveSortedEndpoints
CLASS org.apache.cassandra.locator.SimpleSnitch
METHOD sortedByProximity
AT ENTRY
BIND ep1 = org.apache.cassandra.locator.InetAddressAndPort.getByName("127.0.0.1");
ep2 = org.apache.cassandra.locator.InetAddressAndPort.getByName("127.0.0.2");
ep3 = org.apache.cassandra.locator.InetAddressAndPort.getByName("127.0.0.3");
eps = new java.util.ArrayList();
IF true
DO
eps.add(ep1);
eps.add(ep2);
eps.add(ep3);
return eps;
return $unsortedAddress.sorted(java.util.Comparator.naturalOrder());
ENDRULE
2 changes: 1 addition & 1 deletion byteman/read_repair/stop_data_reads.btm
Expand Up @@ -4,7 +4,7 @@ CLASS org.apache.cassandra.db.ReadCommandVerbHandler
METHOD doVerb
# wait until command is declared locally. because generics
AFTER WRITE $command
# bail out if it's not a digest request
# bail out if it's a data request
IF NOT $command.isDigestQuery()
DO return;
ENDRULE
2 changes: 1 addition & 1 deletion byteman/read_repair/stop_digest_reads.btm
Expand Up @@ -4,7 +4,7 @@ CLASS org.apache.cassandra.db.ReadCommandVerbHandler
METHOD doVerb
# wait until command is declared locally. because generics
AFTER WRITE $command
# bail out if it's not a digest request
# bail out if it's a digest request
IF $command.isDigestQuery()
DO return;
ENDRULE
7 changes: 7 additions & 0 deletions byteman/slow_writes.btm
@@ -0,0 +1,7 @@
# Delay injection: every call to MutationVerbHandler.doVerb sleeps for
# 60000 ms (60 seconds) at method entry.
RULE slow mutations
CLASS org.apache.cassandra.db.MutationVerbHandler
METHOD doVerb
AT ENTRY
# unconditional: the rule fires on every invocation
IF true
DO Thread.sleep(60000);
ENDRULE
8 changes: 8 additions & 0 deletions byteman/stop_reads.btm
@@ -0,0 +1,8 @@
# block read command verb: ReadCommandVerbHandler.doVerb returns at method
# entry on every call.
# NOTE(review): the rule name below still says "mutations" although the target
# is the read command handler -- looks like a copy/paste leftover; confirm
# before renaming, since the name is part of the rule itself.
RULE disable mutations
CLASS org.apache.cassandra.db.ReadCommandVerbHandler
METHOD doVerb
AT ENTRY
IF true
DO return;
ENDRULE
8 changes: 8 additions & 0 deletions byteman/stop_rr_writes.btm
@@ -0,0 +1,8 @@
# block read repair verb: ReadRepairVerbHandler.doVerb returns at method
# entry on every call.
# NOTE(review): the rule name below says "mutations" while the target is the
# read repair handler -- presumably read repair writes are meant; confirm
# before renaming, since the name is part of the rule itself.
RULE disable mutations
CLASS org.apache.cassandra.db.ReadRepairVerbHandler
METHOD doVerb
AT ENTRY
IF true
DO return;
ENDRULE
8 changes: 8 additions & 0 deletions byteman/stop_writes.btm
@@ -0,0 +1,8 @@
# block mutation verb: MutationVerbHandler.doVerb returns at method entry on
# every call.
RULE disable mutations
CLASS org.apache.cassandra.db.MutationVerbHandler
METHOD doVerb
AT ENTRY
# unconditional: the rule fires on every invocation
IF true
DO return;
ENDRULE
7 changes: 7 additions & 0 deletions byteman/throw_on_digest.btm
@@ -0,0 +1,7 @@
# Fault injection: every call to ReadResponse.createDigestResponse throws a
# RuntimeException at method entry.
# NOTE(review): the rule name says "block", but the action throws rather than
# blocking or returning.
RULE block digest
CLASS org.apache.cassandra.db.ReadResponse
METHOD createDigestResponse
AT ENTRY
IF true
DO throw new RuntimeException("Digest response throws");
ENDRULE
44 changes: 41 additions & 3 deletions read_repair_test.py
@@ -1,14 +1,16 @@
from contextlib import contextmanager
import glob
import os
import time
import pytest
import logging
import subprocess
import typing

from cassandra import ConsistencyLevel, WriteTimeout, ReadTimeout
from cassandra.cluster import Session
from cassandra.query import SimpleStatement
from ccmlib.node import Node
from ccmlib.node import Node, handle_external_tool_process
from pytest import raises

from dtest import Tester, create_ks
Expand All @@ -20,6 +22,37 @@
since = pytest.mark.since
logger = logging.getLogger(__name__)

def byteman_validate(node, script, verbose=False, opts=None):
    """Run Byteman's script checker on *script* and fail if it reports errors.

    Launches ``org.jboss.byteman.check.TestScript`` in a separate JVM
    (``$JAVA_HOME/bin/java``) with the Byteman jar and the ``build``
    directory of the node's install dir on the classpath, then scans the
    tool's standard output for the string ``ERROR``.

    Args:
        node: object providing ``get_install_dir()`` and ``byteman_port``.
        script: path of the ``.btm`` rule script to check.
        verbose: when true, pass ``-v`` to the checker (unless *opts*
            already contains it) and print its output if no error is found.
        opts: optional extra command-line arguments; they are forwarded to
            the checker ahead of the script path.

    Raises:
        AssertionError: the checker output contains ``ERROR``.
        IndexError: no ``byteman-[0-9]*.jar`` under ``build/lib/jars``.
        KeyError: ``JAVA_HOME`` is not set in the environment.
        subprocess.CalledProcessError: the JVM exits with a non-zero status.
    """
    opts = list(opts) if opts else []
    install_dir = node.get_install_dir()
    java = os.path.join(os.environ['JAVA_HOME'], 'bin', 'java')
    classpath = ':'.join([
        glob.glob(os.path.join(install_dir, 'build', 'lib', 'jars', 'byteman-[0-9]*.jar'))[0],
        os.path.join(install_dir, 'build', '*'),
    ])

    byteman_cmd = [
        java,
        '-cp', classpath,
        'org.jboss.byteman.check.TestScript',
        # NOTE(review): the checker's -p flag looks like a package option
        # rather than a port; kept as in the original -- TODO confirm.
        '-p', str(node.byteman_port),
    ]
    # Forward caller-supplied options; previously they were accepted but
    # never reached the command line.
    byteman_cmd.extend(opts)
    if verbose and '-v' not in opts:
        byteman_cmd.append('-v')
    byteman_cmd.append(script)

    out = subprocess.check_output(byteman_cmd)
    if isinstance(out, bytes):
        out = out.decode()

    has_errors = 'ERROR' in out
    if verbose and not has_errors:
        print(out)

    assert not has_errors, "byteman script didn't compile\n" + out


class TestReadRepair(Tester):

Expand Down Expand Up @@ -382,8 +415,9 @@ def fixture_set_cluster_settings(self, fixture_dtest_setup):
'dynamic_snitch': False,
'write_request_timeout_in_ms': 500,
'read_request_timeout_in_ms': 500})
cluster.populate(3, install_byteman=True, debug=True).start(wait_for_binary_proto=True,
jvm_args=['-XX:-PerfDisableSharedMem'])
cluster.populate(3, install_byteman=True, debug=True)
byteman_validate(cluster.nodelist()[0], './byteman/read_repair/sorted_live_endpoints.btm', verbose=True)
cluster.start(wait_for_binary_proto=True, jvm_args=['-XX:-PerfDisableSharedMem'])
session = fixture_dtest_setup.patient_exclusive_cql_connection(cluster.nodelist()[0], timeout=2)

session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}")
Expand Down Expand Up @@ -623,6 +657,10 @@ def test_quorum_requirement_on_speculated_read(self):
@contextmanager
def _byteman_cycle(nodes, scripts):
script_path = lambda name: './byteman/read_repair/' + name + '.btm'

for script in scripts:
byteman_validate(nodes[0], script_path(script))

for node in nodes:
assert isinstance(node, Node)
for name in scripts:
Expand Down
3 changes: 2 additions & 1 deletion repair_tests/repair_test.py
Expand Up @@ -1166,7 +1166,8 @@ def _test_failure_during_repair(self, phase, initiator=False):
"Session completed with the following error",
"Repair session .* for range .* failed with error",
"Sync failed between .* and .*",
"failed to send a stream message/file to peer"
"failed to send a stream message/file to peer",
"failed to send a stream message/data to peer"
]

# Disable hinted handoff and set batch commit log so this doesn't
Expand Down

0 comments on commit 4e1c055

Please sign in to comment.