Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes;
Expand Down Expand Up @@ -73,9 +75,6 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.apache.ignite.configuration.ConfigurationChangeException;
Expand Down Expand Up @@ -282,10 +281,8 @@ public DistributionZoneManager(

nodesAttributes = new ConcurrentHashMap<>();

executor = new ScheduledThreadPoolExecutor(
Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
new ThreadPoolExecutor.DiscardPolicy()
executor = createZoneManagerExecutor(
new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG)
);

// It's safe to leak with partially initialised object here, because rebalanceEngine is only accessible through this or by
Expand Down Expand Up @@ -361,7 +358,7 @@ public void stop() throws Exception {
metaStorageManager.unregisterWatch(topologyWatchListener);
metaStorageManager.unregisterWatch(dataNodesWatchListener);

shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
shutdownAndAwaitTermination(executor, 10, SECONDS);
}

/**
Expand Down Expand Up @@ -1610,6 +1607,12 @@ static class ZoneState {
/** Schedule task for a scale down process. */
private ScheduledFuture<?> scaleDownTask;

/** The delay for the scale up task. */
private long scaleUpTaskDelay;

/** The delay for the scale down task. */
private long scaleDownTaskDelay;

/**
* Map that stores pairs revision -> {@link Augmentation} for a zone. With this map we can track which nodes
* should be added or removed in the processes of scale up or scale down. Revision helps to track visibility of the events
Expand Down Expand Up @@ -1644,56 +1647,64 @@ ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap() {
}

/**
* Reschedules existing scale up task, if it is not started yet, or schedules new one, if the current task cannot be canceled.
* Reschedules existing scale up task, if it is not started yet and the delay of this task is not immediate,
* or schedules new one, if the current task cannot be canceled.
*
* @param delay Delay to start runnable in seconds.
* @param runnable Custom logic to run.
*/
synchronized void rescheduleScaleUp(long delay, Runnable runnable) {
if (scaleUpTask != null) {
scaleUpTask.cancel(false);
}
stopScaleUp();

scaleUpTask = executor.schedule(runnable, delay, TimeUnit.SECONDS);
scaleUpTask = executor.schedule(runnable, delay, SECONDS);

scaleUpTaskDelay = delay;
}

/**
* Reschedules existing scale down task, if it is not started yet, or schedules new one, if the current task cannot be canceled.
* Reschedules existing scale down task, if it is not started yet and the delay of this task is not immediate,
* or schedules new one, if the current task cannot be canceled.
*
* @param delay Delay to start runnable in seconds.
* @param runnable Custom logic to run.
*/
synchronized void rescheduleScaleDown(long delay, Runnable runnable) {
if (scaleDownTask != null) {
scaleDownTask.cancel(false);
}
stopScaleDown();

scaleDownTask = executor.schedule(runnable, delay, TimeUnit.SECONDS);
scaleDownTask = executor.schedule(runnable, delay, SECONDS);

scaleDownTaskDelay = delay;
}

/**
* Cancels task for scale up and scale down.
* Cancels task for scale up and scale down. Used on {@link ZonesConfigurationListener#onDelete(ConfigurationNotificationEvent)}.
* Not need to check {@code scaleUpTaskDelay} and {@code scaleDownTaskDelay} because after timer stopping on zone delete event
* the data nodes value will be updated.
*/
synchronized void stopTimers() {
stopScaleUp();
if (scaleUpTask != null) {
scaleUpTask.cancel(false);
}

stopScaleDown();
if (scaleDownTask != null) {
scaleDownTask.cancel(false);
}
}

/**
* Cancels task for scale up.
* Cancels task for scale up if it is not started yet and the delay of this task is not immediate.
*/
synchronized void stopScaleUp() {
if (scaleUpTask != null) {
if (scaleUpTask != null && scaleUpTaskDelay > 0) {
scaleUpTask.cancel(false);
}
}

/**
* Cancels task for scale down.
* Cancels task for scale down if it is not started yet and the delay of this task is not immediate.
*/
synchronized void stopScaleDown() {
if (scaleDownTask != null) {
if (scaleDownTask != null && scaleDownTaskDelay > 0) {
scaleDownTask.cancel(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.dsl.CompoundCondition;
import org.apache.ignite.internal.metastorage.dsl.SimpleCondition;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.DistributionZoneNotFoundException;
Expand Down Expand Up @@ -532,4 +536,22 @@ public static Set<String> filterDataNodes(
.map(Node::nodeName)
.collect(toSet());
}

/**
* Create an executor for the zone manager.
* Used a single thread executor to avoid concurrent executing several tasks for the same zone.
* ScheduledThreadPoolExecutor guarantee that tasks scheduled for exactly the same
* execution time are enabled in first-in-first-out (FIFO) order of submission.
* // TODO: IGNITE-19783 Need to use a striped executor.
*
* @param namedThreadFactory Named thread factory.
* @return Executor.
*/
static ScheduledExecutorService createZoneManagerExecutor(NamedThreadFactory namedThreadFactory) {
return new ScheduledThreadPoolExecutor(
1,
namedThreadFactory,
new ThreadPoolExecutor.DiscardPolicy()
);
}
}
Loading