Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…c appears to be broken
  • Loading branch information
rantav committed Apr 8, 2010
1 parent 0d48ab8 commit 4321125
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 8 deletions.
46 changes: 39 additions & 7 deletions src/main/java/me/prettyprint/cassandra/service/KeyspaceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ public void updateKnownHosts() throws TException {
*/
private void skipToNextHost(boolean isRetrySameHostAgain,
boolean invalidateAllConnectionsToCurrentHost)
throws IllegalStateException, PoolExhaustedException, Exception {
throws SkipHostException {
log.info("Skipping to next host. Current host is: {}", client.getUrl());
invalidate();
if (invalidateAllConnectionsToCurrentHost) {
Expand All @@ -617,12 +617,20 @@ private void skipToNextHost(boolean isRetrySameHostAgain,
getNextHost(client.getUrl(), client.getIp());
if (nextHost == null) {
log.error("Unable to find next host to skip to at {}", toString());
throw new TException("Unable to failover to next host");
throw new SkipHostException("Unable to failover to next host");
}


// assume they use the same port
client = clientPools.borrowClient(nextHost, client.getPort());
// assume all hosts in the ring use the same port (cassandra's API only provides IPs, not ports)
try {
client = clientPools.borrowClient(nextHost, client.getPort());
} catch (IllegalStateException e) {
throw new SkipHostException(e);
} catch (PoolExhaustedException e) {
throw new SkipHostException(e);
} catch (Exception e) {
throw new SkipHostException(e);
}
cassandra = client.getCassandra();
monitor.incCounter(Counter.SKIP_HOST_SUCCESS);
log.info("Skipped host. New host is: {}", client.getUrl());
Expand Down Expand Up @@ -679,9 +687,14 @@ private void operateWithFailover(Operation<?> op) throws InvalidRequestException
if (!isFirst) {
--retries;
}
boolean success = operateWithFailoverSingleIteration(op, stopWatch, retries, isFirst);
if (success) {
return;
try {
boolean success = operateWithFailoverSingleIteration(op, stopWatch, retries, isFirst);
if (success) {
return;
}
} catch (SkipHostException e) {
log.warn("Skip-host failed ", e);
// continue the loop to the next host.
}
isFirst = false;
}
Expand Down Expand Up @@ -857,4 +870,23 @@ public String toString() {
b.append(">");
return super.toString();
}

/**
* An internal implementation excption used to signal that the skip-host action has failed.
* @author Ran Tavory (ran@outbain.com)
*
*/
private static class SkipHostException extends Exception {

private static final long serialVersionUID = -6099636388926769255L;

public SkipHostException(String msg) {
super(msg);
}

public SkipHostException(Throwable t) {
super(t);
}

}
}
75 changes: 74 additions & 1 deletion src/test/java/me/prettyprint/cassandra/service/KeyspaceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,14 @@ public void testFailover() throws IllegalStateException, PoolExhaustedException,
}
}

/**
* Test case for bug http://github.com/rantav/hector/issues/closed#issue/8
* @throws IllegalStateException
* @throws PoolExhaustedException
* @throws Exception
*/
@Test
public void testFailover2() throws IllegalStateException, PoolExhaustedException, Exception {
public void testFailoverBug8() throws IllegalStateException, PoolExhaustedException, Exception {
CassandraClient h1client = mock(CassandraClient.class);
CassandraClient h2client = mock(CassandraClient.class);
Cassandra.Client h1cassandra = mock(Cassandra.Client.class);
Expand Down Expand Up @@ -745,6 +751,69 @@ public void testFailover2() throws IllegalStateException, PoolExhaustedException
when(clientPools.borrowClient("h1", 111)).thenReturn(h1client);
when(clientPools.borrowClient("h2", 111)).thenReturn(h2client);

FailoverPolicy failoverPolicy = FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE;
Keyspace ks = new KeyspaceImpl(h1client, keyspaceName, keyspaceDesc, consistencyLevel,
failoverPolicy, clientPools, monitor);

// fail the call, use a transport exception.
// This simulates a host we can connect to, but cannot perform operations on. The host is semi-
// down
doThrow(new TTransportException()).when(h1cassandra).insert(anyString(), anyString(),
(ColumnPath) anyObject(), (byte[]) anyObject(), anyLong(), anyInt());

ks.insert("key", cp, bytes("value"));

// Make sure the client is invalidated
verify(clientPools, times(2)).invalidateClient(h1client);

// make sure the next call is to h2
verify(h2client).getCassandra();

// Now run another insert on the same keyspace to make sure it can handle next writes.
ks.insert("key2", cp, bytes("value2"));
}

/**
* A test case for bug 14 http://github.com/rantav/hector/issues#issue/14
* A host goes down and can't even reconnect to it, so failover fails to skip to the next host.
* @throws Exception
* @throws PoolExhaustedException
* @throws IllegalStateException
*/
@Test
public void testFailoverBug14() throws IllegalStateException, PoolExhaustedException, Exception {
CassandraClient h1client = mock(CassandraClient.class);
CassandraClient h2client = mock(CassandraClient.class);
Cassandra.Client h1cassandra = mock(Cassandra.Client.class);
Cassandra.Client h2cassandra = mock(Cassandra.Client.class);
String keyspaceName = "Keyspace1";
Map<String, Map<String, String>> keyspaceDesc = new HashMap<String, Map<String, String>>();
Map<String, String> keyspace1Desc = new HashMap<String, String>();
keyspace1Desc.put(Keyspace.CF_TYPE, Keyspace.CF_TYPE_STANDARD);
keyspaceDesc.put("Standard1", keyspace1Desc);
int consistencyLevel = 1;
ColumnPath cp = new ColumnPath("Standard1", null, bytes("testFailover"));
CassandraClientPool clientPools = mock(CassandraClientPool.class);
CassandraClientMonitor monitor = mock(CassandraClientMonitor.class);

// The token map represents the list of available servers.
Map<String, String> tokenMap = new HashMap<String, String>();
tokenMap.put("t1", "h1");
tokenMap.put("t2", "h2");

when(h1client.getCassandra()).thenReturn(h1cassandra);
when(h2client.getCassandra()).thenReturn(h2cassandra);
when(h1client.getTokenMap(anyBoolean())).thenReturn(tokenMap);
when(h2client.getTokenMap(anyBoolean())).thenReturn(tokenMap);
when(h1client.getPort()).thenReturn(2);
when(h2client.getPort()).thenReturn(2);
when(h1client.getUrl()).thenReturn("h1");
when(h1client.getIp()).thenReturn("ip1");
when(h2client.getUrl()).thenReturn("h2");
when(h2client.getIp()).thenReturn("ip2");
when(clientPools.borrowClient("h1", 2)).thenReturn(h1client);
when(clientPools.borrowClient("h2", 2)).thenReturn(h2client);

FailoverPolicy failoverPolicy = FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE;
Keyspace ks = new KeyspaceImpl(h1client, keyspaceName, keyspaceDesc, consistencyLevel,
failoverPolicy, clientPools, monitor);
Expand All @@ -753,6 +822,10 @@ public void testFailover2() throws IllegalStateException, PoolExhaustedException
doThrow(new TTransportException()).when(h1cassandra).insert(anyString(), anyString(),
(ColumnPath) anyObject(), (byte[]) anyObject(), anyLong(), anyInt());

// And also fail the call to borrowClient when trying to borrow from this host again.
// This is actually simulation the host down permanently (well, until the test ends at least...)
doThrow(new TException()).when(clientPools).borrowClient("h1", 2);

ks.insert("key", cp, bytes("value"));

// Make sure the client is invalidated
Expand Down

0 comments on commit 4321125

Please sign in to comment.