Skip to content

Commit

Permalink
Retry timed-out migration tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrrr committed Mar 9, 2017
1 parent 1757e13 commit 463f3fe
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
16 changes: 13 additions & 3 deletions src/java/org/apache/cassandra/schema/MigrationManager.java
Expand Up @@ -43,6 +43,7 @@
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;

public class MigrationManager
Expand Down Expand Up @@ -139,13 +140,22 @@ public static boolean isReadyForBootstrap()

public static void waitUntilReadyForBootstrap()
{
Pair<InetAddress, CountDownLatch> task;
CountDownLatch completionLatch;
while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null)
InetAddress endpoint;
EndpointState state;
while ((task = MigrationTask.getInflightTasks().poll()) != null)
{
try
{
if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS))
logger.error("Migration task failed to complete");
completionLatch = task.right;
if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS)) {
logger.error("Migration task failed to complete, retrying");
endpoint = task.left;
state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
scheduleSchemaPull(endpoint, state);
}

}
catch (InterruptedException e)
{
Expand Down
7 changes: 4 additions & 3 deletions src/java/org/apache/cassandra/schema/MigrationTask.java
Expand Up @@ -36,13 +36,14 @@
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;

final class MigrationTask extends WrappedRunnable
{
private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);

private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>();
private static final ConcurrentLinkedQueue<Pair<InetAddress, CountDownLatch>> inflightTasks = new ConcurrentLinkedQueue<>();

private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);

Expand All @@ -53,7 +54,7 @@ final class MigrationTask extends WrappedRunnable
this.endpoint = endpoint;
}

static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
static ConcurrentLinkedQueue<Pair<InetAddress, CountDownLatch>> getInflightTasks()
{
return inflightTasks;
}
Expand Down Expand Up @@ -106,7 +107,7 @@ public boolean isLatencyForSnitch()

// Only save the latches if we need bootstrap or are bootstrapping
if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
inflightTasks.offer(completionLatch);
inflightTasks.offer(Pair.create(endpoint, completionLatch));

MessagingService.instance().sendRR(message, endpoint, cb);
}
Expand Down

0 comments on commit 463f3fe

Please sign in to comment.