Skip to content

Commit

Permalink
Merge pull request #13761 from pveentjer/v3.10.6/fix/iobalancer-task-…
Browse files Browse the repository at this point in the history
…retention

[BACKPORT] Fixes retention issue on IOBalancer.
  • Loading branch information
pveentjer committed Sep 17, 2018
2 parents 8400dd0 + 81df7cb commit 823ee87
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 83 deletions.
Expand Up @@ -27,6 +27,9 @@
import com.hazelcast.logging.LoggingService;
import com.hazelcast.spi.properties.GroupProperty;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static com.hazelcast.internal.util.counters.MwCounter.newMwCounter;
import static com.hazelcast.internal.util.counters.SwCounter.newSwCounter;
import static com.hazelcast.spi.properties.GroupProperty.IO_BALANCER_INTERVAL_SECONDS;
Expand Down Expand Up @@ -66,6 +69,7 @@ public class IOBalancer {
private final LoadTracker inLoadTracker;
private final LoadTracker outLoadTracker;
private final String hzName;
private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
private volatile boolean enabled;
private IOBalancerThread ioBalancerThread;

Expand Down Expand Up @@ -103,31 +107,30 @@ LoadTracker getOutLoadTracker() {
return outLoadTracker;
}

// just for testing
BlockingQueue<Runnable> getWorkQueue() {
return workQueue;
}

public void channelAdded(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) {
// if not enabled, then don't schedule tasks that will not get processed.
// See https://github.com/hazelcast/hazelcast/issues/11501
if (!enabled) {
return;
if (enabled) {
workQueue.add(new AddPipelineTask(inboundPipeline, outboundPipeline));
}

inLoadTracker.notifyPipelineAdded(inboundPipeline);
outLoadTracker.notifyPipelineAdded(outboundPipeline);
}

public void channelRemoved(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) {
// if not enabled, then don't schedule tasks that will not get processed.
// See https://github.com/hazelcast/hazelcast/issues/11501
if (!enabled) {
return;
if (enabled) {
workQueue.add(new RemovePipelineTask(inboundPipeline, outboundPipeline));
}

inLoadTracker.notifyPipelineRemoved(inboundPipeline);
outLoadTracker.notifyPipelineRemoved(outboundPipeline);
}

public void start() {
if (enabled) {
ioBalancerThread = new IOBalancerThread(this, balancerIntervalSeconds, hzName, logger);
ioBalancerThread = new IOBalancerThread(this, balancerIntervalSeconds, hzName, logger, workQueue);
ioBalancerThread.start();
}
}
Expand All @@ -138,12 +141,9 @@ public void stop() {
}
}

void checkOutboundPipelines() {
scheduleMigrationIfNeeded(outLoadTracker);
}

void checkInboundPipelines() {
void rebalance() {
scheduleMigrationIfNeeded(inLoadTracker);
scheduleMigrationIfNeeded(outLoadTracker);
}

private void scheduleMigrationIfNeeded(LoadTracker loadTracker) {
Expand Down Expand Up @@ -215,4 +215,46 @@ private void tryMigrate(LoadImbalance loadImbalance) {
public void signalMigrationComplete() {
migrationCompletedCount.inc();
}

private final class RemovePipelineTask implements Runnable {

private final MigratablePipeline inboundPipeline;
private final MigratablePipeline outboundPipeline;

private RemovePipelineTask(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) {
this.inboundPipeline = inboundPipeline;
this.outboundPipeline = outboundPipeline;
}

@Override
public void run() {
if (logger.isFinestEnabled()) {
logger.finest("Removing pipelines: " + inboundPipeline + ", " + outboundPipeline);
}

inLoadTracker.removePipeline(inboundPipeline);
outLoadTracker.removePipeline(outboundPipeline);
}
}

private final class AddPipelineTask implements Runnable {

private final MigratablePipeline inboundPipeline;
private final MigratablePipeline outboundPipeline;

private AddPipelineTask(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) {
this.inboundPipeline = inboundPipeline;
this.outboundPipeline = outboundPipeline;
}

@Override
public void run() {
if (logger.isFinestEnabled()) {
logger.finest("Adding pipelines: " + inboundPipeline + ", " + outboundPipeline);
}

inLoadTracker.addPipeline(inboundPipeline);
outLoadTracker.addPipeline(outboundPipeline);
}
}
}
Expand Up @@ -18,23 +18,33 @@

import com.hazelcast.logging.ILogger;

import java.util.concurrent.BlockingQueue;

import static com.hazelcast.util.EmptyStatement.ignore;
import static com.hazelcast.util.ThreadUtil.createThreadName;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

class IOBalancerThread extends Thread {
private static final String THREAD_NAME_PREFIX = "IOBalancerThread";

private final IOBalancer ioBalancer;
private final ILogger log;
private final int balancerIntervalSeconds;
private final long balancerIntervalMs;
private final BlockingQueue<Runnable> workQueue;
private volatile boolean shutdown;

IOBalancerThread(IOBalancer ioBalancer, int balancerIntervalSeconds, String hzName, ILogger log) {
IOBalancerThread(IOBalancer ioBalancer,
int balancerIntervalSeconds,
String hzName,
ILogger log,
BlockingQueue<Runnable> workQueue) {
super(createThreadName(hzName, THREAD_NAME_PREFIX));
this.ioBalancer = ioBalancer;
this.log = log;
this.balancerIntervalSeconds = balancerIntervalSeconds;
this.balancerIntervalMs = SECONDS.toMillis(balancerIntervalSeconds);
this.workQueue = workQueue;
}

void shutdown() {
Expand All @@ -46,10 +56,21 @@ void shutdown() {
public void run() {
try {
log.finest("Starting IOBalancer thread");
long nextRebalanceMs = currentTimeMillis() + balancerIntervalMs;
while (!shutdown) {
ioBalancer.checkInboundPipelines();
ioBalancer.checkOutboundPipelines();
SECONDS.sleep(balancerIntervalSeconds);
for (; ; ) {
long maxPollDurationMs = nextRebalanceMs - currentTimeMillis();
Runnable task = maxPollDurationMs <= 0 ? workQueue.poll() : workQueue.poll(maxPollDurationMs, MILLISECONDS);
if (task == null) {
// we are finished with taking task from the queue, lets
// do a bit of rebalancing.
break;
}
task.run();
}

ioBalancer.rebalance();
nextRebalanceMs = currentTimeMillis() + balancerIntervalMs;
}
} catch (InterruptedException e) {
log.finest("IOBalancer thread stopped");
Expand Down
Expand Up @@ -22,11 +22,8 @@
import com.hazelcast.util.ItemCounter;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

import static com.hazelcast.util.MapUtil.createHashMap;
import static com.hazelcast.util.StringUtil.LINE_SEPARATOR;
Expand All @@ -39,8 +36,6 @@
* {@link #removePipeline(MigratablePipeline)}
*/
class LoadTracker {
final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();

private final ILogger logger;

//all known IO ioThreads. we assume no. of ioThreads is constant during a lifespan of a member
Expand Down Expand Up @@ -80,23 +75,13 @@ class LoadTracker {
* @return recalculated imbalance
*/
LoadImbalance updateImbalance() {
handleAddedOrRemovedConnections();
clearWorkingImbalance();
updateNewWorkingImbalance();
updateNewFinalImbalance();
printDebugTable();
return imbalance;
}

private void handleAddedOrRemovedConnections() {
Iterator<Runnable> iterator = tasks.iterator();
while (iterator.hasNext()) {
Runnable task = iterator.next();
task.run();
iterator.remove();
}
}

// just for testing
Set<MigratablePipeline> getPipelines() {
return pipelines;
Expand Down Expand Up @@ -136,13 +121,6 @@ private void updateNewFinalImbalance() {
}
}

void notifyPipelineAdded(MigratablePipeline pipeline) {
tasks.offer(new AddPipelineTask(pipeline));
}

void notifyPipelineRemoved(MigratablePipeline pipeline) {
tasks.offer(new RemovePipelineTask(pipeline));
}

private void updateNewWorkingImbalance() {
for (MigratablePipeline pipeline : pipelines) {
Expand Down Expand Up @@ -176,7 +154,7 @@ void addPipeline(MigratablePipeline pipeline) {
pipelines.add(pipeline);
}

private void removePipeline(MigratablePipeline pipeline) {
public void removePipeline(MigratablePipeline pipeline) {
pipelines.remove(pipeline);
pipelineLoadCount.remove(pipeline);
lastLoadCounter.remove(pipeline);
Expand Down Expand Up @@ -251,39 +229,5 @@ private void appendSelectorInfo(
sb.append(LINE_SEPARATOR);
}

class RemovePipelineTask implements Runnable {

private final MigratablePipeline pipeline;

RemovePipelineTask(MigratablePipeline pipeline) {
this.pipeline = pipeline;
}

@Override
public void run() {
if (logger.isFinestEnabled()) {
logger.finest("Removing pipeline: " + pipeline);
}

removePipeline(pipeline);
}
}

class AddPipelineTask implements Runnable {

private final MigratablePipeline pipeline;

AddPipelineTask(MigratablePipeline pipeline) {
this.pipeline = pipeline;
}

@Override
public void run() {
if (logger.isFinestEnabled()) {
logger.finest("Adding pipeline: " + pipeline);
}

addPipeline(pipeline);
}
}
}
Expand Up @@ -45,8 +45,7 @@ public void whenChannelAdded_andDisabled_thenSkipTaskCreation() {

ioBalancer.channelAdded(inboundPipeline, outboundPipeline);

assertTrue(ioBalancer.getInLoadTracker().tasks.isEmpty());
assertTrue(ioBalancer.getOutLoadTracker().tasks.isEmpty());
assertTrue(ioBalancer.getWorkQueue().isEmpty());
}

// https://github.com/hazelcast/hazelcast/issues/11501
Expand All @@ -58,7 +57,6 @@ public void whenChannelRemoved_andDisabled_thenSkipTaskCreation() {

ioBalancer.channelRemoved(inboundPipeline, outboundPipelines);

assertTrue(ioBalancer.getInLoadTracker().tasks.isEmpty());
assertTrue(ioBalancer.getOutLoadTracker().tasks.isEmpty());
assertTrue(ioBalancer.getWorkQueue().isEmpty());
}
}

0 comments on commit 823ee87

Please sign in to comment.