Skip to content
Permalink
Browse files
[ASTERIXDB-2813] Limit the number of flush/merge threads
- user model changes: no
- storage format changes: no
- interface changes: yes.

Details:
- Limit the number of flush/merge threads by introducing
the following parameters.
- storage.max.running.flushes.per.partition: the maximum
number of running flushes for each partition.
- storage.max.scheduled.merges.per.partition: the maximum
number of scheduled merges for each partition. This is
mainly used by the greedy scheduler.
- storage.max.running.merges.per.partition: the maximum
number of running merges per partition.
- Basically, we limit the number of flush/merge threads
and put newly created flush/merge operations into a wait
queue if the limit is reached.
- For the greedy scheduler, the scheduled merges
(i.e., merge threads) are more than the running merges
so that the scheduler can pick the smallest merge
for each LSM-tree.

Change-Id: I85a55423a1438b1d534c2e6a5968e675a99884c8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9183
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
  • Loading branch information
luochen01 authored and mhubail committed Mar 25, 2021
1 parent 30d8fcb commit 8f1cd017b710c6de4bf5f1b480f8daaeb06de388
Show file tree
Hide file tree
Showing 16 changed files with 522 additions and 225 deletions.
@@ -582,20 +582,26 @@ public IConfigValidator getConfigValidator() {

/**
 * Creates the LSM I/O operation scheduler for this node.
 *
 * The scheduler is picked by the name returned from the storage properties, compared
 * case-insensitively against the async and greedy factory names. An unrecognized name
 * is logged at WARN level and the greedy scheduler is used instead.
 *
 * The flush/merge limits handed to the factory are obtained from the storage properties,
 * using the number of I/O devices reported by the I/O manager as the partition count.
 *
 * NOTE(review): the {@code properties} parameter is not read in this method; all values
 * come from the {@code storageProperties} field. Confirm this is intended.
 *
 * @param properties storage properties (currently unused, see note above)
 * @return the created scheduler
 */
private ILSMIOOperationScheduler createIoScheduler(StorageProperties properties) {
String schedulerName = storageProperties.getIoScheduler();
// one partition per I/O device
int numPartitions = ioManager.getIODevices().size();

int maxRunningFlushes = storageProperties.getMaxRunningFlushes(numPartitions);
int maxScheduledMerges = storageProperties.getMaxScheduledMerges(numPartitions);
int maxRunningMerges = storageProperties.getMaxRunningMerges(numPartitions);

ILSMIOOperationScheduler ioScheduler = null;
if (AsynchronousScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
ioScheduler = AsynchronousScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
HaltCallback.INSTANCE);
HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
} else if (GreedyScheduler.FACTORY.getName().equalsIgnoreCase(schedulerName)) {
ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
HaltCallback.INSTANCE);
HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
} else {
// unknown scheduler name: warn and fall back to the greedy scheduler
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN,
"Unknown storage I/O scheduler: " + schedulerName + "; defaulting to greedy I/O scheduler.");
}
ioScheduler = GreedyScheduler.FACTORY.createIoScheduler(getServiceContext().getThreadFactory(),
HaltCallback.INSTANCE);
HaltCallback.INSTANCE, maxRunningFlushes, maxScheduledMerges, maxRunningMerges);
}
return ioScheduler;
}
@@ -173,7 +173,7 @@ public void schedulerFailed(ILSMIOOperationScheduler scheduler, Throwable failur
public void operationFailed(ILSMIOOperation operation, Throwable t) {
LOGGER.warn("IO Operation failed", t);
}
}));
}, Integer.MAX_VALUE, Integer.MAX_VALUE));
dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
}

@@ -55,7 +55,10 @@ public enum Option implements IOption {
STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE)),
STORAGE_IO_SCHEDULER(STRING, "greedy"),
STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l);
STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l),
STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION(NONNEGATIVE_INTEGER, 2),
STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 8),
STORAGE_MAX_RUNNING_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 2);

private final IOptionType interpreter;
private final Object defaultValue;
@@ -111,6 +114,12 @@ public String description() {
return "The number of bytes before each disk force (fsync)";
case STORAGE_IO_SCHEDULER:
return "The I/O scheduler for LSM flush and merge operations";
case STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION:
return "The maximum number of running flushes per partition (0 means unlimited)";
case STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION:
return "The maximum number of scheduled merges per partition (0 means unlimited)";
case STORAGE_MAX_RUNNING_MERGES_PER_PARTITION:
return "The maximum number of running merges per partition (0 means unlimited)";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -204,6 +213,21 @@ public String getIoScheduler() {
return accessor.getString(Option.STORAGE_IO_SCHEDULER);
}

/**
 * Returns the node-wide limit on concurrently running flushes.
 *
 * The configured per-partition value is multiplied by {@code numPartitions}. A configured
 * value of 0 means "unlimited" and is reported as {@link Integer#MAX_VALUE}.
 *
 * @param numPartitions the number of partitions on this node
 * @return the total limit, saturated at {@link Integer#MAX_VALUE}
 */
public int getMaxRunningFlushes(int numPartitions) {
    int perPartition = accessor.getInt(Option.STORAGE_MAX_RUNNING_FLUSHES_PER_PARTITION);
    if (perPartition == 0) {
        return Integer.MAX_VALUE;
    }
    // Multiply in long and saturate: a plain int product can wrap to a negative limit
    // when a large per-partition value is configured.
    return (int) Math.min((long) perPartition * numPartitions, Integer.MAX_VALUE);
}

/**
 * Returns the node-wide limit on scheduled merges.
 *
 * The configured per-partition value is multiplied by {@code numPartitions}. A configured
 * value of 0 means "unlimited" and is reported as {@link Integer#MAX_VALUE}.
 *
 * @param numPartitions the number of partitions on this node
 * @return the total limit, saturated at {@link Integer#MAX_VALUE}
 */
public int getMaxScheduledMerges(int numPartitions) {
    int perPartition = accessor.getInt(Option.STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION);
    if (perPartition == 0) {
        return Integer.MAX_VALUE;
    }
    // Multiply in long and saturate: a plain int product can wrap to a negative limit
    // when a large per-partition value is configured.
    return (int) Math.min((long) perPartition * numPartitions, Integer.MAX_VALUE);
}

/**
 * Returns the node-wide limit on concurrently running merges.
 *
 * The configured per-partition value is multiplied by {@code numPartitions}. A configured
 * value of 0 means "unlimited" and is reported as {@link Integer#MAX_VALUE}.
 *
 * @param numPartitions the number of partitions on this node
 * @return the total limit, saturated at {@link Integer#MAX_VALUE}
 */
public int getMaxRunningMerges(int numPartitions) {
    int perPartition = accessor.getInt(Option.STORAGE_MAX_RUNNING_MERGES_PER_PARTITION);
    if (perPartition == 0) {
        return Integer.MAX_VALUE;
    }
    // Multiply in long and saturate: a plain int product can wrap to a negative limit
    // when a large per-partition value is configured.
    return (int) Math.min((long) perPartition * numPartitions, Integer.MAX_VALUE);
}

protected int getMetadataDatasets() {
return MetadataIndexImmutableProperties.METADATA_DATASETS_COUNT;
}
@@ -177,4 +177,8 @@ enum LSMIOOperationStatus {
*/
boolean isActive();

/**
* @return whether this IO operation is completed
*/
boolean isCompleted();
}
@@ -21,7 +21,8 @@
import java.util.concurrent.ThreadFactory;

public interface ILSMIOOperationSchedulerFactory {
ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback);
ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback,
int maxNumRunningFlushes, int maxNumScheduledMerges, int maxNumRunningMerges);

String getName();
}
@@ -27,20 +27,26 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;

public abstract class AbstractAsynchronousScheduler implements ILSMIOOperationScheduler, Closeable {
protected final ExecutorService executor;

private final int maxNumFlushes;
protected final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>();
protected final Map<String, Deque<ILSMIOOperation>> waitingFlushOperations = new HashMap<>();
protected final Deque<ILSMIOOperation> waitingFlushOperations = new ArrayDeque<>();
protected final Deque<ILSMIOOperation> waitingMergeOperations = new ArrayDeque<>();

protected final Map<String, Throwable> failedGroups = new HashMap<>();

public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback) {
executor = new IoOperationExecutor(threadFactory, this, callback, runningFlushOperations,
waitingFlushOperations, failedGroups);
public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback,
int maxNumFlushes) {
executor = new IoOperationExecutor(threadFactory, this, callback, runningFlushOperations, failedGroups);
this.maxNumFlushes = maxNumFlushes;
}

@Override
@@ -61,34 +67,88 @@ public void scheduleOperation(ILSMIOOperation operation) {
}
}

/**
 * Notifies the scheduler that an I/O operation has finished, dispatching on its type:
 * flushes go to {@code completeFlush}, merges to {@code completeMerge}, and NOOP
 * operations need no bookkeeping.
 *
 * @param operation the operation that has completed
 * @throws HyracksDataException declared by the scheduler interface
 * @throws IllegalArgumentException if the operation type is not FLUSH, MERGE or NOOP
 */
@Override
public void completeOperation(ILSMIOOperation operation) throws HyracksDataException {
    switch (operation.getIOOpertionType()) {
        case FLUSH:
            completeFlush(operation);
            break;
        case MERGE:
            completeMerge(operation);
            // explicit break: do not rely on falling through into the NOOP case
            break;
        case NOOP:
            break;
        default:
            // this should never happen
            // just guard here to avoid silent failures in case of future extensions
            throw new IllegalArgumentException("Unknown operation type " + operation.getIOOpertionType());
    }
}

protected abstract void scheduleMerge(ILSMIOOperation operation);

protected abstract void completeMerge(ILSMIOOperation operation);

protected void scheduleFlush(ILSMIOOperation operation) {
String id = operation.getIndexIdentifier();
synchronized (executor) {
if (failedGroups.containsKey(id)) {
// Group failure. Fail the operation right away
operation.setStatus(LSMIOOperationStatus.FAILURE);
operation.setFailure(new RuntimeException("Operation group " + id + " has permanently failed",
failedGroups.get(id)));
operation.complete();
if (checkFailedFlush(operation)) {
return;
}
if (runningFlushOperations.containsKey(id)) {
if (waitingFlushOperations.containsKey(id)) {
waitingFlushOperations.get(id).offer(operation);
} else {
Deque<ILSMIOOperation> q = new ArrayDeque<>();
q.offer(operation);
waitingFlushOperations.put(id, q);
}
if (runningFlushOperations.size() >= maxNumFlushes || runningFlushOperations.containsKey(id)) {
waitingFlushOperations.add(operation);
} else {
runningFlushOperations.put(id, operation);
executor.submit(operation);
}
}
}

/**
 * Fails the given flush immediately when its index belongs to a group recorded in
 * {@code failedGroups}.
 *
 * @param operation the flush operation to check
 * @return {@code true} if the operation was marked failed and completed here,
 *         {@code false} if its group has not failed and the operation was left untouched
 */
private boolean checkFailedFlush(ILSMIOOperation operation) {
    final String groupId = operation.getIndexIdentifier();
    if (!failedGroups.containsKey(groupId)) {
        return false;
    }
    // Group failure. Fail the operation right away
    operation.setStatus(LSMIOOperationStatus.FAILURE);
    final Throwable groupFailure = failedGroups.get(groupId);
    operation.setFailure(
            new RuntimeException("Operation group " + groupId + " has permanently failed", groupFailure));
    operation.complete();
    return true;
}

/**
 * Bookkeeping for a finished flush: removes the operation's index from the running set,
 * then starts as many waiting flushes as the limit allows and trims the head of the
 * waiting queue. All of this happens while holding the monitor of {@code executor}.
 *
 * @param operation the flush operation that has completed
 */
private void completeFlush(ILSMIOOperation operation) {
String id = operation.getIndexIdentifier();
synchronized (executor) {
runningFlushOperations.remove(id);

// Schedule flushes in FIFO order. Must make sure that there is at most one scheduled flush for each index.
// Note: operations started here are NOT removed from the queue inside this loop; the
// cleanup loop below only pops them once they reach the head of the queue.
for (ILSMIOOperation flushOp : waitingFlushOperations) {
String flushOpId = flushOp.getIndexIdentifier();
if (runningFlushOperations.size() < maxNumFlushes) {
// Skip entries whose index already has a running flush (this also covers entries
// started earlier that are still sitting in the queue) and entries already completed.
// checkFailedFlush is evaluated last; when the group has failed it fails and
// completes the operation in place instead of submitting it.
if (!runningFlushOperations.containsKey(flushOpId) && !flushOp.isCompleted()
&& !checkFailedFlush(flushOp)) {
runningFlushOperations.put(flushOpId, flushOp);
executor.submit(flushOp);
}
} else {
// limit reached: nothing further can be started
break;
}
}

// cleanup scheduled flushes
// Pop entries from the head only, as long as the head is either completed or is the very
// instance now registered as running for its index; stop at the first still-waiting entry.
while (!waitingFlushOperations.isEmpty()) {
ILSMIOOperation top = waitingFlushOperations.peek();
if (top.isCompleted() || runningFlushOperations.get(top.getIndexIdentifier()) == top) {
waitingFlushOperations.poll();
} else {
break;
}
}

}
}

@Override
public void close() throws IOException {
executor.shutdown();
@@ -202,6 +202,11 @@ public boolean isActive() {
return isActive.get();
}

/**
 * Returns the current value of the {@code completed} flag.
 * The read is performed while holding this object's monitor.
 *
 * @return whether this IO operation is completed
 */
@Override
public synchronized boolean isCompleted() {
return completed;
}

public void waitIfPaused() throws HyracksDataException {
synchronized (this) {
while (!isActive.get()) {
@@ -35,26 +35,49 @@ public class AsynchronousScheduler extends AbstractAsynchronousScheduler {
public static final ILSMIOOperationSchedulerFactory FACTORY = new ILSMIOOperationSchedulerFactory() {
@Override
public ILSMIOOperationScheduler createIoScheduler(ThreadFactory threadFactory,
IIoOperationFailedCallback callback) {
return new AsynchronousScheduler(threadFactory, callback);
IIoOperationFailedCallback callback, int maxNumRunningFlushes, int maxNumScheduledMerges,
int maxNumRunningMerges) {
return new AsynchronousScheduler(threadFactory, callback, maxNumRunningFlushes, maxNumRunningMerges);
}

@Override
public String getName() {
return "async";
}
};

public AsynchronousScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback) {
super(threadFactory, callback);
private final int maxNumRunningMerges;
private int numRunningMerges = 0;

public AsynchronousScheduler(ThreadFactory threadFactory, IIoOperationFailedCallback callback,
int maxNumRunningFlushes, int maxNumRunningMerges) {
super(threadFactory, callback, maxNumRunningFlushes);
this.maxNumRunningMerges = maxNumRunningMerges;
}

@Override
protected void scheduleMerge(ILSMIOOperation operation) {
executor.submit(operation);
synchronized (executor) {
if (numRunningMerges >= maxNumRunningMerges) {
waitingMergeOperations.add(operation);
} else {
doScheduleMerge(operation);
}
}
}

@Override
public void completeOperation(ILSMIOOperation operation) {
// no op
/**
 * Records that a merge has finished and, if a merge is waiting and the running count is
 * below the limit, starts the next waiting merge. Runs under the {@code executor} monitor.
 *
 * @param operation the merge operation that has completed
 */
protected void completeMerge(ILSMIOOperation operation) {
    synchronized (executor) {
        numRunningMerges--;
        final boolean nothingWaiting = waitingMergeOperations.isEmpty();
        if (nothingWaiting || numRunningMerges >= maxNumRunningMerges) {
            return;
        }
        // a slot is free: hand the oldest waiting merge to the executor
        doScheduleMerge(waitingMergeOperations.poll());
    }
}

/**
 * Counts the merge as running and submits it to the executor.
 * Both visible call sites invoke this while holding the {@code executor} monitor.
 *
 * @param operation the merge operation to start
 */
private void doScheduleMerge(ILSMIOOperation operation) {
    numRunningMerges += 1;
    executor.submit(operation);
}
}

0 comments on commit 8f1cd01

Please sign in to comment.