Skip to content
This repository has been archived by the owner on Apr 2, 2023. It is now read-only.

Improve performance for Probabilistic priority queueing #344

Merged
merged 41 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
d8f6f1f
performance & monitoring improvement
lenhattan86 May 14, 2021
c4e650b
use the same CloseableHttpClient for multiple requests
lenhattan86 May 14, 2021
d3d77a2
fix compiling error
lenhattan86 May 15, 2021
42bf2f7
fix CI error
lenhattan86 May 18, 2021
2a35900
fix CI error
lenhattan86 May 18, 2021
1196da5
resolve review comments
lenhattan86 May 19, 2021
81b887c
resolve PR comments
lenhattan86 May 19, 2021
6d836ea
reflect comments
lenhattan86 May 20, 2021
113a0bc
polish code
lenhattan86 May 20, 2021
dc594d5
polish code
lenhattan86 May 20, 2021
9d6a7f6
remove this.
lenhattan86 May 20, 2021
7bd29c3
update aurora version 0.24.0
lenhattan86 Jun 18, 2021
bd36a72
fix release-candidate error
lenhattan86 Jun 18, 2021
ff11201
Updating CHANGELOG for 0.24.2 release.
lenhattan86 Jun 18, 2021
e600867
Incrementing snapshot version to 0.25.0-SNAPSHOT.
lenhattan86 Jun 18, 2021
c4a6fc3
fixed release candidate
lenhattan86 Jun 18, 2021
48e75b3
fixed release candidate script
lenhattan86 Jun 18, 2021
22bc79e
aurora version 0.24.2
lenhattan86 Jun 18, 2021
6aff891
Updating CHANGELOG.md for 0.24.2 release.
lenhattan86 Jun 18, 2021
daffeee
Incrementing snapshot version to 0.25.0-SNAPSHOT.
lenhattan86 Jun 18, 2021
9f94253
Merge pull request #1 from lenhattan86/origin/http_offer_set_perf_imp…
lenhattan86 Jun 18, 2021
b9c9ce6
Merge branch 'master' into master
lenhattan86 Sep 7, 2021
259eb54
Merge pull request #2 from aurora-scheduler/master
lenhattan86 Sep 7, 2021
cc35f9a
Merge branch 'aurora-scheduler:master' into master
lenhattan86 Oct 5, 2021
e544e86
add probabilistic priority queueing feature using TaskAssigner module
lenhattan86 Oct 5, 2021
b5f5bb8
update os for github CI workflow
lenhattan86 Oct 5, 2021
9ff939c
Merge branch 'probabilistic_priority_queueing' of https://github.com/…
lenhattan86 Oct 5, 2021
4f6c2a6
Merge branch 'aurora-scheduler:master' into probabilistic_priority_qu…
lenhattan86 Oct 5, 2021
3ec721a
add design doc for probabilistic priority queueing
lenhattan86 Oct 8, 2021
5ba02dc
add design doc for probabilistic priority queueing
lenhattan86 Oct 8, 2021
c7ad72a
Merge branch 'aurora-scheduler:master' into probabilistic_priority_qu…
lenhattan86 Oct 11, 2021
0ff2ad8
more details for ProbabilisticPriorityAssignerModule
lenhattan86 Oct 11, 2021
9308725
address PR comments
lenhattan86 Oct 12, 2021
d6c8fa1
Update src/main/java/io/github/aurora/scheduler/scheduling/Probabilis…
lenhattan86 Oct 16, 2021
cbf0429
Update src/main/java/io/github/aurora/scheduler/scheduling/Probabilis…
lenhattan86 Oct 16, 2021
1c263b6
Update src/main/java/io/github/aurora/scheduler/scheduling/Probabilis…
lenhattan86 Oct 16, 2021
9827478
Update src/main/java/io/github/aurora/scheduler/scheduling/Probabilis…
lenhattan86 Oct 16, 2021
fe901b7
address comments
lenhattan86 Oct 16, 2021
f8b1c0e
asynchronously fetch the pendind tasks
lenhattan86 Oct 22, 2021
6fa7beb
Merge branch 'master' into probabilistic_priority_queueing
lenhattan86 Oct 22, 2021
9359535
addressed PR comments
lenhattan86 Oct 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -54,7 +55,8 @@ public class ProbabilisticPriorityAssigner extends TaskAssignerImpl {
private static final Logger LOG = LoggerFactory.
getLogger(ProbabilisticPriorityAssigner.class);

private final Storage storage;
private static Iterable<IScheduledTask> pendindTasks = new LinkedList<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pendingTasks

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use an ArrayList here for better performance, this is on a fast path and the size of the list can get significant putting pressure on memory pages. while iterating

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also what makes you use a static variable here?


private Double exponent;

@VisibleForTesting
Expand All @@ -69,10 +71,8 @@ public ProbabilisticPriorityAssigner(
OfferManager offerManager,
UpdateAgentReserver updateAgentReserver,
StatsProvider statsProvider,
Storage storage,
@Exponent Double exponent) {
super(stateManager, taskFactory, offerManager, updateAgentReserver, statsProvider);
this.storage = requireNonNull(storage);
this.exponent = requireNonNull(exponent);
}

Expand All @@ -87,12 +87,13 @@ public Set<String> maybeAssign(

// probabilistic priority queueing: may not schedule these tasks if
// there are pending tasks with higher priority.
Iterable<IScheduledTask> pendindTasks = getPendingTasks();
Set<Integer> prioritySet = new HashSet<>();
for (IScheduledTask t: pendindTasks) {
prioritySet.add(t.getAssignedTask().getTask().getPriority());
synchronized (pendindTasks) {
for (IScheduledTask t: pendindTasks) {
prioritySet.add(t.getAssignedTask().getTask().getPriority());
}
}
//TODO(lenhattan86): Is the group is always included in the pending task set?
//this group is not always included in the pending task set
prioritySet.add(groupKey.getTask().getPriority());

if (!isScheduled(prioritySet, groupKey.getTask().getPriority())) {
Expand Down Expand Up @@ -131,8 +132,9 @@ boolean isScheduled(Set<Integer> prioritySet, int priority) {
}

@VisibleForTesting
Iterable<IScheduledTask> getPendingTasks() {
return Storage.Util.fetchTasks(storage, Query.unscoped().byStatus(ScheduleStatus.PENDING));
public static synchronized void fetchPendingTasks(Storage storage) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@VisibleForTesting but I don't see any tests calling this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the need for this method at all in this class. Your PendingTasksFetcher should call Storage.Util.fetchTasks(storage, Query.unscoped().byStatus(ScheduleStatus.PENDING)); directly from its run() method, and the responsibility of fetchPendingTasks should be with that class, not this. The objects here should only use methods from that class. This class is responsible for assigning priorities, not for fetching tasks as a public method.

pendindTasks =
Storage.Util.fetchTasks(storage, Query.unscoped().byStatus(ScheduleStatus.PENDING));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,53 @@
*/
package io.github.aurora.scheduler.scheduling;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.inject.Qualifier;
import javax.inject.Singleton;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.AbstractModule;

import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.base.AsyncUtil;
import org.apache.aurora.scheduler.config.CliOptions;
import org.apache.aurora.scheduler.config.CommandLine;
import org.apache.aurora.scheduler.config.types.TimeAmount;
import org.apache.aurora.scheduler.scheduling.TaskAssigner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.Objects.requireNonNull;

/**
* The default TaskAssigner implementation.
*/
public class ProbabilisticPriorityAssignerModule extends AbstractModule {
private static final Logger LOG =
LoggerFactory.getLogger(ProbabilisticPriorityAssignerModule.class);

private final Options options;

@Parameters(separators = "=")
public static class Options {
@Parameter(names = "-probabilistic_priority_assigner_exponent")
Double probabilisticPriorityAssignerExponent = 0.0;

@Parameter(names = "-probabilistic_priority_assigner_task_fetch_interval")
TimeAmount probabilisticPriorityAssignerTaskFetchInterval = new TimeAmount(1, Time.SECONDS);
}

public ProbabilisticPriorityAssignerModule(CliOptions mOptions) {
Expand All @@ -50,5 +77,53 @@ protected void configure() {
.annotatedWith(ProbabilisticPriorityAssigner.Exponent.class)
.toInstance(options.probabilisticPriorityAssignerExponent);
bind(TaskAssigner.class).to(ProbabilisticPriorityAssigner.class).in(Singleton.class);
bind(TaskFetcher.class).in(com.google.inject.Singleton.class);
bind(ScheduledExecutorService.class)
.annotatedWith(Executor.class)
.toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor("HttpOfferSet-%d", LOG));
bind(Integer.class)
.annotatedWith(TaskFetchInvervalMs.class)
.toInstance(options.probabilisticPriorityAssignerTaskFetchInterval.
as(Time.MILLISECONDS).intValue());
bind(StatUpdater.class).in(com.google.inject.Singleton.class);
SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(StatUpdater.class);
}

// to bind an executor object to StatUpdater using @Executor
@VisibleForTesting
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface Executor { }
lenhattan86 marked this conversation as resolved.
Show resolved Hide resolved

// to bind probabilisticPriorityAssignerTaskFetchInterval value to StatUpdater
@VisibleForTesting
@Qualifier
@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
@interface TaskFetchInvervalMs { }

static class StatUpdater extends AbstractIdleService {
private final ScheduledExecutorService executor;
private final TaskFetcher taskFetcher;
private final Integer taskFetchIntervalMs;

@Inject
StatUpdater(
@Executor ScheduledExecutorService mExecutor,
TaskFetcher mTaskFetcher,
@TaskFetchInvervalMs Integer mTaskFetchIntervalMs) {
executor = requireNonNull(mExecutor);
taskFetcher = requireNonNull(mTaskFetcher);
taskFetchIntervalMs = mTaskFetchIntervalMs;
}

@Override
protected void startUp() {
executor.scheduleAtFixedRate(taskFetcher, 0, taskFetchIntervalMs, TimeUnit.MILLISECONDS);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the taskFetchIntervalMs here?

}

@Override
protected void shutDown() {
// Ignored.
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.aurora.scheduler.scheduling;

import javax.inject.Inject;

import org.apache.aurora.scheduler.storage.Storage;

public class TaskFetcher implements Runnable {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to fetch only pending tasks, why is it named TaskFetcher?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You seem to be having multiple classes doing similar things. You should use a single executor to fetch these tasks with whatever filters are needed and use them from different classes. Having multiple executors for the same thing is not a good idea.

private final Storage storage;

@Inject
TaskFetcher(Storage mStorage) {
storage = mStorage;
}

@Override
public void run() {
ProbabilisticPriorityAssigner.fetchPendingTasks(storage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,33 @@
*/
package io.github.aurora.scheduler.scheduling;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import com.google.common.collect.ImmutableList;

import org.apache.aurora.common.testing.easymock.EasyMockTest;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.ScheduledTask;
import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
import org.apache.aurora.scheduler.offers.OfferManager;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
import org.junit.Before;
import org.junit.Test;

import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class ProbabilisticPriorityAssignerTest extends EasyMockTest {

private ProbabilisticPriorityAssigner assigner;
private Storage storage;

@Before
public void setUp() {
storage = MemStorageModule.newEmptyStorage();
assigner = new ProbabilisticPriorityAssigner(
createMock(StateManager.class),
createMock(MesosTaskFactory.class),
createMock(OfferManager.class),
createMock(UpdateAgentReserver.class),
new FakeStatsProvider(),
storage,
0.0);
}

Expand Down Expand Up @@ -117,26 +103,4 @@ public void testIsScheduled() {
}
assertTrue(res);
}

@Test
public void testGetPendingTasks() {
control.replay();

storage.write((Storage.MutateWork.NoResult.Quiet) sp -> {
ScheduledTask t0 = makeTask("t0", JOB)
.newBuilder()
.setStatus(ScheduleStatus.PENDING);
ScheduledTask t1 = makeTask("t1", JOB)
.newBuilder()
.setStatus(ScheduleStatus.PENDING);
ScheduledTask t2 = makeTask("t2", JOB)
.newBuilder()
.setStatus(ScheduleStatus.RUNNING);
sp.getUnsafeTaskStore().saveTasks(
IScheduledTask.setFromBuilders(ImmutableList.of(t0, t1, t2)));
});

Iterable<IScheduledTask> tasks = assigner.getPendingTasks();
assertEquals(((Collection<?>) tasks).size(), 2);
}
}