Skip to content

Commit

Permalink
limit rate of rollup scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydeanlakey committed May 9, 2019
1 parent 97852cc commit faa9173
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 4 deletions.
Expand Up @@ -8,9 +8,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.bluedb.api.BlueCollection;
import org.bluedb.api.BlueQuery;
Expand Down Expand Up @@ -43,7 +44,7 @@

public class BlueCollectionOnDisk<T extends Serializable> implements BlueCollection<T>, Rollupable {

ExecutorService executor = Executors.newFixedThreadPool(1);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

private final Class<T> valueType;
private final Class<? extends BlueKey> keyType;
Expand Down Expand Up @@ -73,6 +74,10 @@ public BlueCollectionOnDisk(BlueDbOnDisk db, String name, Class<? extends BlueKe
recoveryManager.recover(); // everything else has to be in place before running this
}

public int getQueuedTaskCount() {
return executor.getQueue().size();
}

@Override
public BlueQuery<T> query() {
return new BlueQueryOnDisk<T>(this);
Expand Down
Expand Up @@ -35,7 +35,7 @@ public void start() {
@Override
public void run() {
while (!isStopped) {
scheduleReadyRollups();
scheduleLimitedReadyRollups();
isStopped |= !Blutils.trySleep(waitBetweenReviews);
}
}
Expand Down Expand Up @@ -82,6 +82,17 @@ public boolean isRunning() {
return !isStopped;
}

protected void scheduleLimitedReadyRollups() {
int rollupsToSchedule = 30 - collection.getQueuedTaskCount();
scheduleReadyRollups(rollupsToSchedule);
}

protected void scheduleReadyRollups(int maxRollupsToSchedule) {
for (RollupTarget target: rollupTargetsReadyForRollup(maxRollupsToSchedule)) {
scheduleRollup(target);
}
}

protected void scheduleReadyRollups() {
for (RollupTarget target: rollupTargetsReadyForRollup()) {
scheduleRollup(target);
Expand All @@ -99,6 +110,12 @@ public void setWaitBetweenReviews(long newWaitTimeMillis) {
waitBetweenReviews = newWaitTimeMillis;
}

protected List<RollupTarget> rollupTargetsReadyForRollup(int maxCount) {
List<RollupTarget> readyRollups = rollupTargetsReadyForRollup();
int toIndex = Math.min(maxCount, readyRollups.size());
return readyRollups.subList(0, toIndex); // TODO sort in some reasonable way
}

protected List<RollupTarget> rollupTargetsReadyForRollup() {
long now = System.currentTimeMillis();
List<RollupTarget> results = new ArrayList<>();
Expand Down
Expand Up @@ -4,6 +4,7 @@
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.mockito.Mockito;
import org.bluedb.api.index.BlueIndex;
import org.bluedb.api.keys.BlueKey;
import org.bluedb.api.keys.IntegerKey;
Expand Down Expand Up @@ -109,6 +110,27 @@ public void test_scheduleReadyRollups() throws Exception {
assertTrue(rollupsRequested.contains(rollupTarget));
}

@Test
public void test_reportRead_limit() {
BlueCollectionOnDisk<?> busyCollection = Mockito.mock(BlueCollectionOnDisk.class);
RollupScheduler rollupScheduler = new RollupScheduler(busyCollection);
Mockito.doReturn(30).when(busyCollection).getQueuedTaskCount();
for (int i=0; i<30; i++) {
rollupScheduler.reportRead(new RollupTarget(i, new Range(i, i)), 0);
}
rollupScheduler.scheduleLimitedReadyRollups();

BlueCollectionOnDisk<?> nonBusyCollection = Mockito.mock(BlueCollectionOnDisk.class);
Mockito.doReturn(0).when(nonBusyCollection).getQueuedTaskCount();
RollupScheduler nonBusyRollupScheduler = new RollupScheduler(nonBusyCollection);
for (int i=0; i<30; i++) {
nonBusyRollupScheduler.reportRead(new RollupTarget(i, new Range(i, i)), 0);
}
nonBusyRollupScheduler.scheduleLimitedReadyRollups();
Mockito.verify(busyCollection, Mockito.atMost(0)).submitTask(Mockito.any());
Mockito.verify(nonBusyCollection, Mockito.atLeast(20)).submitTask(Mockito.any());
}

@Test
public void test_scheduleRollup_collection() throws Exception {
BlueKey key1At1 = createKey(1, 1);
Expand Down

0 comments on commit faa9173

Please sign in to comment.