Skip to content

Commit

Permalink
Fixes retention issue on IOBalancer.
Browse files Browse the repository at this point in the history
If connections get created very rapidly, tasks in the IOBalancer
can accumulate and this can lead to retention of the NioChannel
and its resources.

This PR fixes that by processing tasks immediately in the IOBalancer
when they arrive instead of waiting up to 20s.
  • Loading branch information
pveentjer committed Sep 14, 2018
1 parent 8400dd0 commit 81df7cb
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import com.hazelcast.logging.LoggingService;
import com.hazelcast.spi.properties.GroupProperty;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static com.hazelcast.internal.util.counters.MwCounter.newMwCounter;
import static com.hazelcast.internal.util.counters.SwCounter.newSwCounter;
import static com.hazelcast.spi.properties.GroupProperty.IO_BALANCER_INTERVAL_SECONDS;
Expand Down Expand Up @@ -66,6 +69,7 @@ public class IOBalancer {
private final LoadTracker inLoadTracker;
private final LoadTracker outLoadTracker;
private final String hzName;
private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
private volatile boolean enabled;
private IOBalancerThread ioBalancerThread;

Expand Down Expand Up @@ -103,31 +107,30 @@ LoadTracker getOutLoadTracker() {
return outLoadTracker;
}

// just for testing
BlockingQueue<Runnable> getWorkQueue() {
return workQueue;
}

public void channelAdded(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) {
// if not enabled, then don't schedule tasks that will not get processed.
// See https://github.com/hazelcast/hazelcast/issues/11501
if (!enabled) {
return;
if (enabled) {
workQueue.add(new AddPipelineTask(inboundPipeline, outboundPipeline));
}

inLoadTracker.notifyPipelineAdded(inboundPipeline);
outLoadTracker.notifyPipelineAdded(outboundPipeline);
}

public void channelRemoved(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) {
// if not enabled, then don't schedule tasks that will not get processed.
// See https://github.com/hazelcast/hazelcast/issues/11501
if (!enabled) {
return;
if (enabled) {
workQueue.add(new RemovePipelineTask(inboundPipeline, outboundPipeline));
}

inLoadTracker.notifyPipelineRemoved(inboundPipeline);
outLoadTracker.notifyPipelineRemoved(outboundPipeline);
}

public void start() {
if (enabled) {
ioBalancerThread = new IOBalancerThread(this, balancerIntervalSeconds, hzName, logger);
ioBalancerThread = new IOBalancerThread(this, balancerIntervalSeconds, hzName, logger, workQueue);
ioBalancerThread.start();
}
}
Expand All @@ -138,12 +141,9 @@ public void stop() {
}
}

void checkOutboundPipelines() {
scheduleMigrationIfNeeded(outLoadTracker);
}

void checkInboundPipelines() {
void rebalance() {
scheduleMigrationIfNeeded(inLoadTracker);
scheduleMigrationIfNeeded(outLoadTracker);
}

private void scheduleMigrationIfNeeded(LoadTracker loadTracker) {
Expand Down Expand Up @@ -215,4 +215,46 @@ private void tryMigrate(LoadImbalance loadImbalance) {
public void signalMigrationComplete() {
migrationCompletedCount.inc();
}

private final class RemovePipelineTask implements Runnable {

private final MigratablePipeline inboundPipeline;
private final MigratablePipeline outboundPipeline;

private RemovePipelineTask(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) {
this.inboundPipeline = inboundPipeline;
this.outboundPipeline = outboundPipeline;
}

@Override
public void run() {
if (logger.isFinestEnabled()) {
logger.finest("Removing pipelines: " + inboundPipeline + ", " + outboundPipeline);
}

inLoadTracker.removePipeline(inboundPipeline);
outLoadTracker.removePipeline(outboundPipeline);
}
}

private final class AddPipelineTask implements Runnable {

private final MigratablePipeline inboundPipeline;
private final MigratablePipeline outboundPipeline;

private AddPipelineTask(MigratablePipeline inboundPipeline, MigratablePipeline outboundPipeline) {
this.inboundPipeline = inboundPipeline;
this.outboundPipeline = outboundPipeline;
}

@Override
public void run() {
if (logger.isFinestEnabled()) {
logger.finest("Adding pipelines: " + inboundPipeline + ", " + outboundPipeline);
}

inLoadTracker.addPipeline(inboundPipeline);
outLoadTracker.addPipeline(outboundPipeline);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,33 @@

import com.hazelcast.logging.ILogger;

import java.util.concurrent.BlockingQueue;

import static com.hazelcast.util.EmptyStatement.ignore;
import static com.hazelcast.util.ThreadUtil.createThreadName;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

class IOBalancerThread extends Thread {
private static final String THREAD_NAME_PREFIX = "IOBalancerThread";

private final IOBalancer ioBalancer;
private final ILogger log;
private final int balancerIntervalSeconds;
private final long balancerIntervalMs;
private final BlockingQueue<Runnable> workQueue;
private volatile boolean shutdown;

IOBalancerThread(IOBalancer ioBalancer, int balancerIntervalSeconds, String hzName, ILogger log) {
IOBalancerThread(IOBalancer ioBalancer,
int balancerIntervalSeconds,
String hzName,
ILogger log,
BlockingQueue<Runnable> workQueue) {
super(createThreadName(hzName, THREAD_NAME_PREFIX));
this.ioBalancer = ioBalancer;
this.log = log;
this.balancerIntervalSeconds = balancerIntervalSeconds;
this.balancerIntervalMs = SECONDS.toMillis(balancerIntervalSeconds);
this.workQueue = workQueue;
}

void shutdown() {
Expand All @@ -46,10 +56,21 @@ void shutdown() {
public void run() {
try {
log.finest("Starting IOBalancer thread");
long nextRebalanceMs = currentTimeMillis() + balancerIntervalMs;
while (!shutdown) {
ioBalancer.checkInboundPipelines();
ioBalancer.checkOutboundPipelines();
SECONDS.sleep(balancerIntervalSeconds);
for (; ; ) {
long maxPollDurationMs = nextRebalanceMs - currentTimeMillis();
Runnable task = maxPollDurationMs <= 0 ? workQueue.poll() : workQueue.poll(maxPollDurationMs, MILLISECONDS);
if (task == null) {
// we are finished with taking task from the queue, lets
// do a bit of rebalancing.
break;
}
task.run();
}

ioBalancer.rebalance();
nextRebalanceMs = currentTimeMillis() + balancerIntervalMs;
}
} catch (InterruptedException e) {
log.finest("IOBalancer thread stopped");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@
import com.hazelcast.util.ItemCounter;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

import static com.hazelcast.util.MapUtil.createHashMap;
import static com.hazelcast.util.StringUtil.LINE_SEPARATOR;
Expand All @@ -39,8 +36,6 @@
* {@link #removePipeline(MigratablePipeline)}
*/
class LoadTracker {
final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();

private final ILogger logger;

//all known IO ioThreads. we assume no. of ioThreads is constant during a lifespan of a member
Expand Down Expand Up @@ -80,23 +75,13 @@ class LoadTracker {
* @return recalculated imbalance
*/
LoadImbalance updateImbalance() {
handleAddedOrRemovedConnections();
clearWorkingImbalance();
updateNewWorkingImbalance();
updateNewFinalImbalance();
printDebugTable();
return imbalance;
}

private void handleAddedOrRemovedConnections() {
Iterator<Runnable> iterator = tasks.iterator();
while (iterator.hasNext()) {
Runnable task = iterator.next();
task.run();
iterator.remove();
}
}

// just for testing
Set<MigratablePipeline> getPipelines() {
return pipelines;
Expand Down Expand Up @@ -136,13 +121,6 @@ private void updateNewFinalImbalance() {
}
}

void notifyPipelineAdded(MigratablePipeline pipeline) {
tasks.offer(new AddPipelineTask(pipeline));
}

void notifyPipelineRemoved(MigratablePipeline pipeline) {
tasks.offer(new RemovePipelineTask(pipeline));
}

private void updateNewWorkingImbalance() {
for (MigratablePipeline pipeline : pipelines) {
Expand Down Expand Up @@ -176,7 +154,7 @@ void addPipeline(MigratablePipeline pipeline) {
pipelines.add(pipeline);
}

private void removePipeline(MigratablePipeline pipeline) {
public void removePipeline(MigratablePipeline pipeline) {
pipelines.remove(pipeline);
pipelineLoadCount.remove(pipeline);
lastLoadCounter.remove(pipeline);
Expand Down Expand Up @@ -251,39 +229,5 @@ private void appendSelectorInfo(
sb.append(LINE_SEPARATOR);
}

class RemovePipelineTask implements Runnable {

private final MigratablePipeline pipeline;

RemovePipelineTask(MigratablePipeline pipeline) {
this.pipeline = pipeline;
}

@Override
public void run() {
if (logger.isFinestEnabled()) {
logger.finest("Removing pipeline: " + pipeline);
}

removePipeline(pipeline);
}
}

class AddPipelineTask implements Runnable {

private final MigratablePipeline pipeline;

AddPipelineTask(MigratablePipeline pipeline) {
this.pipeline = pipeline;
}

@Override
public void run() {
if (logger.isFinestEnabled()) {
logger.finest("Adding pipeline: " + pipeline);
}

addPipeline(pipeline);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public void whenChannelAdded_andDisabled_thenSkipTaskCreation() {

ioBalancer.channelAdded(inboundPipeline, outboundPipeline);

assertTrue(ioBalancer.getInLoadTracker().tasks.isEmpty());
assertTrue(ioBalancer.getOutLoadTracker().tasks.isEmpty());
assertTrue(ioBalancer.getWorkQueue().isEmpty());
}

// https://github.com/hazelcast/hazelcast/issues/11501
Expand All @@ -58,7 +57,6 @@ public void whenChannelRemoved_andDisabled_thenSkipTaskCreation() {

ioBalancer.channelRemoved(inboundPipeline, outboundPipelines);

assertTrue(ioBalancer.getInLoadTracker().tasks.isEmpty());
assertTrue(ioBalancer.getOutLoadTracker().tasks.isEmpty());
assertTrue(ioBalancer.getWorkQueue().isEmpty());
}
}

0 comments on commit 81df7cb

Please sign in to comment.