Skip to content

Commit

Permalink
Use immutable map for Read/WriteFailure exception and fix flaky test
Browse files (browse the repository at this point in the history)
patch by Kevin Gallardo; reviewed by Benjamin Lerer for CASSANDRA-15543
  • Loading branch information
Kevin Gallardo authored and blerer committed Mar 26, 2020
1 parent b9c7a90 commit b617690
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.Map;

import com.google.common.collect.ImmutableMap;

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.locator.InetAddressAndPort;

Expand All @@ -28,7 +30,7 @@ public class ReadFailureException extends RequestFailureException

public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
{
super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, failureReasonByEndpoint);
super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
this.dataPresent = dataPresent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.cassandra.exceptions;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

Expand All @@ -33,21 +32,19 @@ public class RequestFailureException extends RequestExecutionException

protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
{
super(code, String.format("Operation failed - received %d responses and %d failures: %s",
received,
failureReasonByEndpoint.size(),
buildFailureString(failureReasonByEndpoint)));
super(code, buildErrorMessage(received, failureReasonByEndpoint));
this.consistency = consistency;
this.received = received;
this.blockFor = blockFor;
this.failureReasonByEndpoint = failureReasonByEndpoint;
}

// It is possible for the passed in failureReasonByEndpoint map
// to have new entries added after this exception is constructed
// (e.g. a delayed failure response from a replica). So to be safe
// we make a copy of the map at this point to ensure it will not be
// modified any further. Otherwise, there could be implications when
// we encode this map for transport.
this.failureReasonByEndpoint = new HashMap<>(failureReasonByEndpoint);
/**
 * Builds the detail message for a request failure.
 *
 * Declared static so it can be evaluated as an argument of the super(...) call in the constructor.
 *
 * @param received number of responses received, reported in the message
 * @param failures failure reason per endpoint; its size is reported as the failure count and its
 *                 contents are rendered by buildFailureString
 * @return the formatted message
 */
private static String buildErrorMessage(int received, Map<InetAddressAndPort, RequestFailureReason> failures)
{
return String.format("Operation failed - received %d responses and %d failures: %s",
received,
failures.size(),
buildFailureString(failures));
}

private static String buildFailureString(Map<InetAddressAndPort, RequestFailureReason> failures)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.Map;

import com.google.common.collect.ImmutableMap;

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.locator.InetAddressAndPort;
Expand All @@ -29,7 +31,7 @@ public class WriteFailureException extends RequestFailureException

public WriteFailureException(ConsistencyLevel consistency, int received, int blockFor, WriteType writeType, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
{
super(ExceptionCode.WRITE_FAILURE, consistency, received, blockFor, failureReasonByEndpoint);
super(ExceptionCode.WRITE_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
this.writeType = writeType;
}
}
13 changes: 10 additions & 3 deletions src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
package org.apache.cassandra.transport.messages;

import java.net.InetAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.CodecException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -89,15 +90,21 @@ public ErrorMessage decode(ByteBuf body, ProtocolVersion version)
// The number of failures is also present in protocol v5, but used instead to specify the size of the failure map
int failure = body.readInt();

Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint = new ConcurrentHashMap<>();
Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
{
ImmutableMap.Builder<InetAddressAndPort, RequestFailureReason> builder = ImmutableMap.builderWithExpectedSize(failure);
for (int i = 0; i < failure; i++)
{
InetAddress endpoint = CBUtil.readInetAddr(body);
RequestFailureReason failureReason = RequestFailureReason.fromCode(body.readUnsignedShort());
failureReasonByEndpoint.put(InetAddressAndPort.getByAddress(endpoint), failureReason);
builder.put(InetAddressAndPort.getByAddress(endpoint), failureReason);
}
failureReasonByEndpoint = builder.build();
}
else
{
failureReasonByEndpoint = Collections.emptyMap();
}

if (code == ExceptionCode.WRITE_FAILURE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
import static org.junit.Assert.fail;

public class SimpleReadWriteTest extends DistributedTestBase
{
Expand Down Expand Up @@ -144,7 +145,7 @@ public void readRepairTimeoutTest() throws Throwable
try
{
cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
Assert.fail("Read timeout expected but it did not occur");
fail("Read timeout expected but it did not occur");
}
catch (Exception ex)
{
Expand Down Expand Up @@ -228,19 +229,24 @@ public void writeWithSchemaDisagreement() throws Throwable
// Introduce schema disagreement
cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);

Exception thrown = null;
try
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
ConsistencyLevel.QUORUM);
fail("Should have failed because of schema disagreement.");
}
catch (RuntimeException e)
catch (Exception e)
{
thrown = e;
}
Assert.assertTrue(e instanceof RuntimeException);
RuntimeException re = ((RuntimeException) e);
// for some reason, we get weird errors when trying to check class directly
// I suppose it has to do with some classloader manipulation going on
Assert.assertTrue(re.getCause().getClass().toString().contains("WriteFailureException"));
// we may see 1 or 2 failures in here, because of the fail-fast behavior of AbstractWriteResponseHandler
Assert.assertTrue(re.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")
|| re.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));

Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
}
}
}

Expand All @@ -258,20 +264,23 @@ public void readWithSchemaDisagreement() throws Throwable
// Introduce schema disagreement
cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);

Exception thrown = null;
try
{
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
ConsistencyLevel.ALL),
row(1, 1, 1, null));
cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
fail("Should have failed because of schema disagreement.");
}
catch (Exception e)
{
thrown = e;
Assert.assertTrue(e instanceof RuntimeException);
RuntimeException re = ((RuntimeException) e);
// for some reason, we get weird errors when trying to check class directly
// I suppose it has to do with some classloader manipulation going on
Assert.assertTrue(re.getCause().getClass().toString().contains("ReadFailureException"));
// we may see 1 or 2 failures in here, because of the fail-fast behavior of ReadCallback
Assert.assertTrue(re.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2")
|| re.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
}

Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
}
}

Expand Down

0 comments on commit b617690

Please sign in to comment.