diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java index d34a0962b1d3..2d5fdb101a9e 100644 --- a/src/java/org/apache/cassandra/schema/MigrationManager.java +++ b/src/java/org/apache/cassandra/schema/MigrationManager.java @@ -43,6 +43,7 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; public class MigrationManager @@ -139,13 +140,22 @@ public static boolean isReadyForBootstrap() public static void waitUntilReadyForBootstrap() { + Pair<InetAddress, CountDownLatch> task; CountDownLatch completionLatch; - while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null) + InetAddress endpoint; + EndpointState state; + while ((task = MigrationTask.getInflightTasks().poll()) != null) { try { - if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS)) - logger.error("Migration task failed to complete"); + completionLatch = task.right; + if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS)) { + logger.error("Migration task failed to complete, retrying"); + endpoint = task.left; + state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + scheduleSchemaPull(endpoint, state); + } + } catch (InterruptedException e) { diff --git a/src/java/org/apache/cassandra/schema/MigrationTask.java b/src/java/org/apache/cassandra/schema/MigrationTask.java index a785e17e46d3..b58a40f6cd31 100644 --- a/src/java/org/apache/cassandra/schema/MigrationTask.java +++ b/src/java/org/apache/cassandra/schema/MigrationTask.java @@ -36,13 +36,14 @@ import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; final class MigrationTask 
extends WrappedRunnable { private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class); - private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>(); + private static final ConcurrentLinkedQueue<Pair<InetAddress, CountDownLatch>> inflightTasks = new ConcurrentLinkedQueue<>(); private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS); @@ -53,7 +54,7 @@ final class MigrationTask extends WrappedRunnable this.endpoint = endpoint; } - static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks() + static ConcurrentLinkedQueue<Pair<InetAddress, CountDownLatch>> getInflightTasks() { return inflightTasks; } @@ -106,7 +107,7 @@ public boolean isLatencyForSnitch() // Only save the latches if we need bootstrap or are bootstrapping if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState())) - inflightTasks.offer(completionLatch); + inflightTasks.offer(Pair.create(endpoint, completionLatch)); MessagingService.instance().sendRR(message, endpoint, cb); }