diff --git a/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancer.java b/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancer.java index 13c598c8e841..6319f43ed51c 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancer.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancer.java @@ -27,6 +27,9 @@ import com.hazelcast.logging.LoggingService; import com.hazelcast.spi.properties.GroupProperty; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + import static com.hazelcast.internal.util.counters.MwCounter.newMwCounter; import static com.hazelcast.internal.util.counters.SwCounter.newSwCounter; import static com.hazelcast.spi.properties.GroupProperty.IO_BALANCER_INTERVAL_SECONDS; @@ -66,6 +69,7 @@ public class IOBalancer { private final LoadTracker inLoadTracker; private final LoadTracker outLoadTracker; private final String hzName; + private final BlockingQueue workQueue = new LinkedBlockingQueue(); private volatile boolean enabled; private IOBalancerThread ioBalancerThread; @@ -103,31 +107,30 @@ LoadTracker getOutLoadTracker() { return outLoadTracker; } + // just for testing + BlockingQueue getWorkQueue() { + return workQueue; + } + public void channelAdded(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) { // if not enabled, then don't schedule tasks that will not get processed. // See https://github.com/hazelcast/hazelcast/issues/11501 - if (!enabled) { - return; + if (enabled) { + workQueue.add(new AddPipelineTask(inboundPipeline, outboundPipeline)); } - - inLoadTracker.notifyPipelineAdded(inboundPipeline); - outLoadTracker.notifyPipelineAdded(outboundPipeline); } public void channelRemoved(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) { // if not enabled, then don't schedule tasks that will not get processed. // See https://github.com/hazelcast/hazelcast/issues/11501 - if (!enabled) { - return; + if (enabled) { + workQueue.add(new RemovePipelineTask(inboundPipeline, outboundPipeline)); } - - inLoadTracker.notifyPipelineRemoved(inboundPipeline); - outLoadTracker.notifyPipelineRemoved(outboundPipeline); } public void start() { if (enabled) { - ioBalancerThread = new IOBalancerThread(this, balancerIntervalSeconds, hzName, logger); + ioBalancerThread = new IOBalancerThread(this, balancerIntervalSeconds, hzName, logger, workQueue); ioBalancerThread.start(); } } @@ -138,12 +141,9 @@ public void stop() { } } - void checkOutboundPipelines() { - scheduleMigrationIfNeeded(outLoadTracker); - } - - void checkInboundPipelines() { + void rebalance() { scheduleMigrationIfNeeded(inLoadTracker); + scheduleMigrationIfNeeded(outLoadTracker); } private void scheduleMigrationIfNeeded(LoadTracker loadTracker) { @@ -215,4 +215,46 @@ private void tryMigrate(LoadImbalance loadImbalance) { public void signalMigrationComplete() { migrationCompletedCount.inc(); } + + private final class RemovePipelineTask implements Runnable { + + private final MigratablePipeline inboundPipeline; + private final MigratablePipeline outboundPipeline; + + private RemovePipelineTask(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) { + this.inboundPipeline = inboundPipeline; + this.outboundPipeline = outboundPipeline; + } + + @Override + public void run() { + if (logger.isFinestEnabled()) { + logger.finest("Removing pipelines: " + inboundPipeline + ", " + outboundPipeline); + } + + inLoadTracker.removePipeline(inboundPipeline); + outLoadTracker.removePipeline(outboundPipeline); + } + } + + private final class AddPipelineTask implements Runnable { + + private final MigratablePipeline inboundPipeline; + private final MigratablePipeline outboundPipeline; + + private AddPipelineTask(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) { + this.inboundPipeline = inboundPipeline; + this.outboundPipeline = outboundPipeline; + } + + @Override + public void run() { + if (logger.isFinestEnabled()) { + logger.finest("Adding pipelines: " + inboundPipeline + ", " + outboundPipeline); + } + + inLoadTracker.addPipeline(inboundPipeline); + outLoadTracker.addPipeline(outboundPipeline); + } + } } diff --git a/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancerThread.java b/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancerThread.java index fdff16f79da4..7dd5d54d9b58 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancerThread.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancerThread.java @@ -18,8 +18,12 @@ import com.hazelcast.logging.ILogger; +import java.util.concurrent.BlockingQueue; + import static com.hazelcast.util.EmptyStatement.ignore; import static com.hazelcast.util.ThreadUtil.createThreadName; +import static java.lang.System.currentTimeMillis; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; class IOBalancerThread extends Thread { @@ -27,14 +31,20 @@ class IOBalancerThread extends Thread { private final IOBalancer ioBalancer; private final ILogger log; - private final int balancerIntervalSeconds; + private final long balancerIntervalMs; + private final BlockingQueue workQueue; private volatile boolean shutdown; - IOBalancerThread(IOBalancer ioBalancer, int balancerIntervalSeconds, String hzName, ILogger log) { + IOBalancerThread(IOBalancer ioBalancer, + int balancerIntervalSeconds, + String hzName, + ILogger log, + BlockingQueue workQueue) { super(createThreadName(hzName, THREAD_NAME_PREFIX)); this.ioBalancer = ioBalancer; this.log = log; - this.balancerIntervalSeconds = balancerIntervalSeconds; + this.balancerIntervalMs = SECONDS.toMillis(balancerIntervalSeconds); + this.workQueue = workQueue; } void shutdown() { @@ -46,10 +56,21 @@ void shutdown() { public void run() { try { log.finest("Starting IOBalancer thread"); + long nextRebalanceMs = currentTimeMillis() + balancerIntervalMs; while (!shutdown) { - ioBalancer.checkInboundPipelines(); - ioBalancer.checkOutboundPipelines(); - SECONDS.sleep(balancerIntervalSeconds); + for (; ; ) { + long maxPollDurationMs = nextRebalanceMs - currentTimeMillis(); + Runnable task = maxPollDurationMs <= 0 ? workQueue.poll() : workQueue.poll(maxPollDurationMs, MILLISECONDS); + if (task == null) { + // we are finished with taking task from the queue, lets + // do a bit of rebalancing. + break; + } + task.run(); + } + + ioBalancer.rebalance(); + nextRebalanceMs = currentTimeMillis() + balancerIntervalMs; } } catch (InterruptedException e) { log.finest("IOBalancer thread stopped"); diff --git a/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/LoadTracker.java b/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/LoadTracker.java index 4db3d51bf5d5..59a8aef50fc4 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/LoadTracker.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/networking/nio/iobalancer/LoadTracker.java @@ -22,11 +22,8 @@ import com.hazelcast.util.ItemCounter; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; import static com.hazelcast.util.MapUtil.createHashMap; import static com.hazelcast.util.StringUtil.LINE_SEPARATOR; @@ -39,8 +36,6 @@ * {@link #removePipeline(MigratablePipeline)} */ class LoadTracker { - final Queue tasks = new LinkedBlockingQueue(); - private final ILogger logger; //all known IO ioThreads. we assume no. of ioThreads is constant during a lifespan of a member @@ -80,7 +75,6 @@ class LoadTracker { * @return recalculated imbalance */ LoadImbalance updateImbalance() { - handleAddedOrRemovedConnections(); clearWorkingImbalance(); updateNewWorkingImbalance(); updateNewFinalImbalance(); @@ -88,15 +82,6 @@ LoadImbalance updateImbalance() { return imbalance; } - private void handleAddedOrRemovedConnections() { - Iterator iterator = tasks.iterator(); - while (iterator.hasNext()) { - Runnable task = iterator.next(); - task.run(); - iterator.remove(); - } - } - // just for testing Set getPipelines() { return pipelines; @@ -136,13 +121,6 @@ private void updateNewFinalImbalance() { } } - void notifyPipelineAdded(MigratablePipeline pipeline) { - tasks.offer(new AddPipelineTask(pipeline)); - } - - void notifyPipelineRemoved(MigratablePipeline pipeline) { - tasks.offer(new RemovePipelineTask(pipeline)); - } private void updateNewWorkingImbalance() { for (MigratablePipeline pipeline : pipelines) { @@ -176,7 +154,7 @@ void addPipeline(MigratablePipeline pipeline) { pipelines.add(pipeline); } - private void removePipeline(MigratablePipeline pipeline) { + public void removePipeline(MigratablePipeline pipeline) { pipelines.remove(pipeline); pipelineLoadCount.remove(pipeline); lastLoadCounter.remove(pipeline); @@ -251,39 +229,5 @@ private void appendSelectorInfo( sb.append(LINE_SEPARATOR); } - class RemovePipelineTask implements Runnable { - - private final MigratablePipeline pipeline; - - RemovePipelineTask(MigratablePipeline pipeline) { - this.pipeline = pipeline; - } - - @Override - public void run() { - if (logger.isFinestEnabled()) { - logger.finest("Removing pipeline: " + pipeline); - } - removePipeline(pipeline); - } - } - - class AddPipelineTask implements Runnable { - - private final MigratablePipeline pipeline; - - AddPipelineTask(MigratablePipeline pipeline) { - this.pipeline = pipeline; - } - - @Override - public void run() { - if (logger.isFinestEnabled()) { - logger.finest("Adding pipeline: " + pipeline); - } - - addPipeline(pipeline); - } - } } diff --git a/hazelcast/src/test/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancerTest.java b/hazelcast/src/test/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancerTest.java index 6bbf81df22ca..6e6eb4926b8c 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancerTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/networking/nio/iobalancer/IOBalancerTest.java @@ -45,8 +45,7 @@ public void whenChannelAdded_andDisabled_thenSkipTaskCreation() { ioBalancer.channelAdded(inboundPipeline, outboundPipeline); - assertTrue(ioBalancer.getInLoadTracker().tasks.isEmpty()); - assertTrue(ioBalancer.getOutLoadTracker().tasks.isEmpty()); + assertTrue(ioBalancer.getWorkQueue().isEmpty()); } // https://github.com/hazelcast/hazelcast/issues/11501 @@ -58,7 +57,6 @@ public void whenChannelRemoved_andDisabled_thenSkipTaskCreation() { ioBalancer.channelRemoved(inboundPipeline, outboundPipelines); - assertTrue(ioBalancer.getInLoadTracker().tasks.isEmpty()); - assertTrue(ioBalancer.getOutLoadTracker().tasks.isEmpty()); + assertTrue(ioBalancer.getWorkQueue().isEmpty()); } }