Skip to content

Commit

Permalink
prevent multiple concurrent HH to the same target
Browse files Browse the repository at this point in the history
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3681
  • Loading branch information
jbellis committed Jan 5, 2012
1 parent eca0c48 commit 0d09395
Showing 1 changed file with 57 additions and 29 deletions.
86 changes: 57 additions & 29 deletions src/java/org/apache/cassandra/db/HintedHandOffManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,24 @@ private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer
|| (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
}

private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException
private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException
{
Gossiper gossiper = Gossiper.instance;
int waited = 0;
// first, wait for schema to be gossiped.
while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) {
Thread.sleep(1000);
while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null)
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
waited += 1000;
if (waited > 2 * StorageService.RING_DELAY)
throw new RuntimeException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
throw new TimeoutException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
}
waited = 0;
// then wait for the correct schema version.
Expand All @@ -217,44 +225,65 @@ private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedExcep
while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
{
Thread.sleep(1000);
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
waited += 1000;
if (waited > 2 * StorageService.RING_DELAY)
throw new RuntimeException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
throw new TimeoutException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
}
logger_.debug("schema for {} matches local schema", endpoint);
return waited;
}

private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException
private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException
{
ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
try
{
if (hintStore.isEmpty())
return; // nothing to do, don't confuse users by logging a no-op handoff

logger_.debug("Checking remote({}) schema before delivering hints", endpoint);
int waited = waitForSchemaAgreement(endpoint);
// sleep a random amount to stagger handoff delivery from different replicas.
// (if we had to wait, then gossiper randomness took care of that for us already.)
if (waited == 0) {
// use a 'rounded' sleep interval because of a strange bug with windows: CASSANDRA-3375
int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30;
logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
Thread.sleep(sleep);
}

if (!FailureDetector.instance.isAlive(endpoint))
{
logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
return;
}
deliverHintsToEndpointInternal(endpoint);
}
finally
{
queuedDeliveries.remove(endpoint);
}
}

private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException
{
ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
if (hintStore.isEmpty())
return; // nothing to do, don't confuse users by logging a no-op handoff

logger_.debug("Checking remote({}) schema before delivering hints", endpoint);
int waited;
try
{
waited = waitForSchemaAgreement(endpoint);
}
catch (TimeoutException e)
{
return;
}
// sleep a random amount to stagger handoff delivery from different replicas.
// (if we had to wait, then gossiper randomness took care of that for us already.)
if (waited == 0)
{
// use a 'rounded' sleep interval because of a strange bug with windows: CASSANDRA-3375
int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30;
logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
Thread.sleep(sleep);
}

if (!FailureDetector.instance.isAlive(endpoint))
{
logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
return;
}

// 1. Get the key of the endpoint we need to handoff
// 2. For each column, deserialize the mutation and send it to the endpoint
Expand Down Expand Up @@ -341,8 +370,7 @@ private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, Di
}
}

logger_.info(String.format("Finished hinted handoff of %s rows to endpoint %s",
rowsReplayed, endpoint));
logger_.info(String.format("Finished hinted handoff of %s rows to endpoint %s", rowsReplayed, endpoint));
}

/**
Expand Down

0 comments on commit 0d09395

Please sign in to comment.