Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[scheduler] Add batch scheduling logic #1685

Merged
merged 6 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions app_dart/lib/src/foundation/utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,10 @@ List<String> validateOwnership(String ciYamlContent, String testOwnersContent) {
}

/// Utility class to wrap related objects in.
class Pair<S, T> {
const Pair(this.first, this.second);
class Tuple<S, T, U> {
const Tuple(this.first, this.second, this.third);

final S first;
final T second;
final U third;
}
20 changes: 20 additions & 0 deletions app_dart/lib/src/model/ci_yaml/target.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import 'dart:convert';

import 'package:cocoon_service/src/service/scheduler/policy.dart';
import 'package:github/github.dart';

import '../luci/buildbucket.dart';
Expand Down Expand Up @@ -52,6 +53,19 @@ class Target {
return dimensions;
}

/// The [SchedulerPolicy] that decides whether this target gets triggered.
///
/// Targets whose scheduler is not [pb.SchedulerSystem.cocoon] get an
/// [OmitPolicy]. Cocoon-scheduled targets get a [BatchPolicy] when
/// [isDevicelab] is true, and a [GuranteedPolicy] otherwise.
SchedulerPolicy get schedulerPolicy {
  final bool scheduledByCocoon = value.scheduler == pb.SchedulerSystem.cocoon;
  if (!scheduledByCocoon) {
    return OmitPolicy();
  }

  if (isDevicelab) {
    return BatchPolicy();
  }
  return GuranteedPolicy();
}

/// Gets the assembled properties for this [pb.Target].
///
/// Target properties are prioritized in:
Expand Down Expand Up @@ -163,6 +177,12 @@ class Target {
return value.name.split(' ').first.toLowerCase();
}

/// Whether this target is run in the DeviceLab.
///
/// True when the platform string returned by [getPlatform] contains
/// `android` or `ios`. DeviceLab targets are special as they run on a host
/// plus a physical device, and there is limited capacity in the labs to run
/// them.
bool get isDevicelab {
  if (getPlatform().contains('android')) {
    return true;
  }
  return getPlatform().contains('ios');
}

/// Get the associated LUCI bucket to run this [Target] in.
String getBucket() {
return value.bringup ? 'staging' : 'prod';
Expand Down
16 changes: 16 additions & 0 deletions app_dart/lib/src/service/datastore.dart
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,22 @@ class DatastoreService {
return query.run();
}

/// Streams the most recent [Task] entities whose `name` equals [name].
///
/// At most [limit] tasks are requested (100 by default). The query is
/// ordered by `createTimestamp` descending, so results go from most recent
/// to oldest.
Stream<Task> queryRecentTasksByName({
  int limit = 100,
  required String name,
}) {
  final Query<Task> recentByName = db.query<Task>();
  recentByName.limit(limit);
  recentByName.filter('name =', name);
  recentByName.order('-createTimestamp');
  return recentByName.run();
}

/// Queries for recent tasks that meet the specified criteria.
///
/// Since each task belongs to a commit, this query implicitly includes a
Expand Down
12 changes: 8 additions & 4 deletions app_dart/lib/src/service/luci_build_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,20 @@ class LuciBuildService {
/// Schedules list of post-submit builds deferring work to [schedulePostsubmitBuild].
Future<void> schedulePostsubmitBuilds({
required Commit commit,
required List<Pair<Target, Task>> toBeScheduled,
required List<Tuple<Target, Task, int>> toBeScheduled,
}) async {
if (toBeScheduled.isEmpty) {
log.fine('Skipping schedulePostsubmitBuilds as there are no targets to be scheduled by Cocoon');
return;
}
final List<Request> buildRequests = <Request>[];
for (Pair<Target, Task> pair in toBeScheduled) {
final ScheduleBuildRequest scheduleBuildRequest =
_createPostsubmitScheduleBuild(commit: commit, target: pair.first, task: pair.second);
for (Tuple<Target, Task, int> tuple in toBeScheduled) {
final ScheduleBuildRequest scheduleBuildRequest = _createPostsubmitScheduleBuild(
commit: commit,
target: tuple.first,
task: tuple.second,
priority: tuple.third,
);
buildRequests.add(Request(scheduleBuild: scheduleBuildRequest));
}
final BatchRequest batchRequest = BatchRequest(requests: buildRequests);
Expand Down
22 changes: 14 additions & 8 deletions app_dart/lib/src/service/scheduler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import 'dart:typed_data';

import 'package:cocoon_service/src/service/build_status_provider.dart';
import 'package:cocoon_service/src/service/scheduler/policy.dart';
import 'package:gcloud/db.dart';
import 'package:github/github.dart' as github;
import 'package:github/github.dart';
Expand Down Expand Up @@ -129,6 +130,19 @@ class Scheduler {
final List<Target> initialTargets = ciYaml.getInitialTargets(ciYaml.postsubmitTargets);
final List<Task> tasks = targetsToTask(commit, initialTargets).toList();

final List<Tuple<Target, Task, int>> toBeScheduled = <Tuple<Target, Task, int>>[];
for (Target target in initialTargets) {
final Task task = tasks.singleWhere((Task task) => task.name == target.value.name);
final SchedulerPolicy policy = target.schedulerPolicy;
final int? priority = await policy.triggerPriority(task: task, datastore: datastore);
if (priority != null) {
// Mark task as in progress to ensure it isn't scheduled over
task.status = Task.statusInProgress;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race condition here. The refresh-chromebot-status API may update the task back to New if the task is not scheduled yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ciYaml.getPostSubmitBuilders is used in RefreshChromebotStatus, and it only returns targets based on the luci scheduler. This scheduling logic is only for targets with the cocoon scheduler.

toBeScheduled.add(Tuple<Target, Task, int>(target, task, priority));
}
}
await luciBuildService.schedulePostsubmitBuilds(commit: commit, toBeScheduled: toBeScheduled);

try {
await datastore.withTransaction<void>((Transaction transaction) async {
transaction.queueMutations(inserts: <Commit>[commit]);
Expand All @@ -140,14 +154,6 @@ class Scheduler {
log.severe('Failed to add commit ${commit.sha!}: $error');
}

final List<Pair<Target, Task>> toBeScheduled = <Pair<Target, Task>>[];
for (Target target in initialTargets) {
if (target.value.scheduler == pb.SchedulerSystem.cocoon) {
toBeScheduled.add(Pair<Target, Task>(target, tasks.singleWhere((Task task) => task.name == target.value.name)));
}
}
await luciBuildService.schedulePostsubmitBuilds(commit: commit, toBeScheduled: toBeScheduled);

await _uploadToBigQuery(commit);
}

Expand Down
72 changes: 72 additions & 0 deletions app_dart/lib/src/service/scheduler/policy.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2021 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'package:cocoon_service/src/service/datastore.dart';

import '../../model/appengine/task.dart';
import '../logging.dart';
import '../luci_build_service.dart';

/// Interface for implementing various scheduling policies in the Cocoon scheduler.
///
/// A policy decides, per [Task], whether the task should be triggered and at
/// what priority.
abstract class SchedulerPolicy {
/// Returns the priority at which [task] should be triggered.
///
/// Implementations may consult [datastore] when making the decision.
///
/// If null is returned, the task should not be scheduled.
Future<int?> triggerPriority({
required Task task,
required DatastoreService datastore,
});
}

/// Policy under which every [Task] is triggered to run.
///
/// Always answers with [LuciBuildService.kDefaultPriority]; neither the task
/// nor the datastore is consulted.
class GuranteedPolicy implements SchedulerPolicy {
  @override
  Future<int?> triggerPriority({
    required Task task,
    required DatastoreService datastore,
  }) async {
    return LuciBuildService.kDefaultPriority;
  }
}

class BatchPolicy implements SchedulerPolicy {
static const int kBatchSize = 3;
@override
Future<int?> triggerPriority({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add a doc explaining the policy for different cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added!

required Task task,
required DatastoreService datastore,
}) async {
final List<Task> recentTasks = await datastore.queryRecentTasksByName(name: task.name!).toList();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a task with recent statuses:
new
pass
pass
pass
pass
, it will not be scheduled based on current logic. What if there is no new commit for 10 hours? We should run TOT tests regularly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep the PRs small, I deferred the backfilling logic to the next PR (overall work tracked in flutter/flutter#100793).

The plan is for the backfill cron job to detect new-pass columns and trigger a build (but skip on running-pass columns).

// Ensure task isn't considered in recentTasks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the case where we split the scheduling logic from the tasks. In the current PR state, it shouldn't happen. However, we need to move the post-submit triggering logic to be based on GoB events (flutter/flutter#100911).

The returned column may or may not include the task we're scheduling for, so as a preemptive measure we remove it if it exists.

recentTasks.removeWhere((Task t) => t.commitKey == task.commitKey);
if (recentTasks.length < kBatchSize - 1) {
log.warning('${task.name} has less than $kBatchSize, triggerring all builds regardless of policy');
return LuciBuildService.kDefaultPriority;
}

// Prioritize tasks that recently failed.
if (_isFailed(recentTasks[0]) || _isFailed(recentTasks[1])) {
return LuciBuildService.kRerunPriority;
}

if (recentTasks[0].status == Task.statusNew && recentTasks[1].status == Task.statusNew) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rethink this part: the current logic seems to schedule tasks in batches of size >= kBatchSize. For example, if we have new, new, new, new, then we will batch schedule the latest in a size of 4. Or will our logic guarantee this case will not happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test for this to show the batch size of kBatchSize is respected.

Yes, if there is new, new, new, new it would schedule, but that shouldn't happen in prod (it's more of a theoretical case).

return LuciBuildService.kDefaultPriority;
}

return null;
}

/// Whether [task] has a failure status, i.e. its status equals
/// [Task.statusFailed] or [Task.statusInfraFailure].
bool _isFailed(Task task) {
  if (task.status == Task.statusFailed) {
    return true;
  }
  return task.status == Task.statusInfraFailure;
}
}

/// Policy for [Task]s run outside of Cocoon, which the Cocoon scheduler does
/// not trigger.
///
/// Always answers with null, meaning "do not schedule".
class OmitPolicy implements SchedulerPolicy {
  @override
  Future<int?> triggerPriority({
    required Task task,
    required DatastoreService datastore,
  }) async {
    return null;
  }
}
17 changes: 17 additions & 0 deletions app_dart/test/model/ci_yaml/target_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import 'package:cocoon_service/src/model/ci_yaml/target.dart';
import 'package:cocoon_service/src/model/luci/buildbucket.dart';
import 'package:cocoon_service/src/model/proto/protos.dart' as pb;
import 'package:cocoon_service/src/service/scheduler/policy.dart';
import 'package:test/test.dart';

import '../../src/utilities/entity_generators.dart';
Expand Down Expand Up @@ -189,5 +191,20 @@ void main() {
expect(target.getDimensions().length, 1);
});
});

// Checks which SchedulerPolicy subtype `schedulerPolicy` returns for generated targets.
group('scheduler policy', () {
// A platform name containing "android" is expected to yield the batch policy.
test('devicelab targets use batch policy', () {
expect(generateTarget(1, platform: 'Linux_android').schedulerPolicy, isA<BatchPolicy>());
});

// An explicit luci scheduler is expected to yield the omit policy, even for an android platform.
test('non-cocoon scheduler targets return omit policy', () {
expect(generateTarget(1, platform: 'Linux_android', schedulerSystem: pb.SchedulerSystem.luci).schedulerPolicy,
isA<OmitPolicy>());
});

// A platform name without "android" or "ios" is expected to yield the guaranteed policy.
test('vm cocoon targets return guranteed policy', () {
expect(generateTarget(1, platform: 'Linux').schedulerPolicy, isA<GuranteedPolicy>());
});
});
});
}
5 changes: 3 additions & 2 deletions app_dart/test/service/luci_build_service_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,14 @@ void main() {

test('schedule postsubmit builds successfully', () async {
final Commit commit = generateCommit(0);
final Pair<Target, Task> toBeScheduled = Pair<Target, Task>(
final Tuple<Target, Task, int> toBeScheduled = Tuple<Target, Task, int>(
generateTarget(1),
generateTask(1),
LuciBuildService.kDefaultPriority,
);
await service.schedulePostsubmitBuilds(
commit: commit,
toBeScheduled: <Pair<Target, Task>>[
toBeScheduled: <Tuple<Target, Task, int>>[
toBeScheduled,
],
);
Expand Down
61 changes: 61 additions & 0 deletions app_dart/test/service/scheduler/policy_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2021 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

import 'package:cocoon_service/src/model/appengine/task.dart';
import 'package:cocoon_service/src/service/datastore.dart';
import 'package:cocoon_service/src/service/luci_build_service.dart';
import 'package:cocoon_service/src/service/scheduler/policy.dart';
import 'package:test/test.dart';

import '../../src/datastore/fake_datastore.dart';
import '../../src/utilities/entity_generators.dart';

void main() {
group('BatchPolicy', () {
late FakeDatastoreDB db;
late DatastoreService datastore;

final BatchPolicy policy = BatchPolicy();

setUp(() {
db = FakeDatastoreDB();
datastore = DatastoreService(db, 5);
});

final List<Task> allPending = <Task>[
generateTask(3),
generateTask(2),
generateTask(1),
];

final List<Task> latestFinishedButRestPending = <Task>[
generateTask(3, status: Task.statusSucceeded),
generateTask(2),
generateTask(1),
];

final List<Task> latestFailed = <Task>[
generateTask(3, status: Task.statusFailed),
generateTask(2),
generateTask(1),
];

test('triggers after batch size', () async {
db.addOnQuery<Task>((Iterable<Task> results) => allPending);
expect(
await policy.triggerPriority(task: generateTask(4), datastore: datastore), LuciBuildService.kDefaultPriority);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a batch with size 4. shall we test a batch size 3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I made the test 3, and I added another test for when there are 2 pending tasks.

});

test('triggers with higher priority on recent failures', () async {
db.addOnQuery<Task>((Iterable<Task> results) => latestFailed);
expect(
await policy.triggerPriority(task: generateTask(4), datastore: datastore), LuciBuildService.kRerunPriority);
});

test('does not trigger when a test was recently scheduled', () async {
db.addOnQuery<Task>((Iterable<Task> results) => latestFinishedButRestPending);
expect(await policy.triggerPriority(task: generateTask(4), datastore: datastore), isNull);
});
});
}
13 changes: 9 additions & 4 deletions app_dart/test/service/scheduler_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ targets:
- name: Linux A
properties:
custom: abc
scheduler: luci
- name: Linux B
enabled_branches:
- stable
Expand Down Expand Up @@ -246,9 +245,15 @@ void main() {
commit: anyNamed('commit'), toBeScheduled: captureAnyNamed('toBeScheduled')))
.captured;
final List<dynamic> toBeScheduled = captured.first as List<dynamic>;
expect(toBeScheduled.length, 1);
final Pair<Target, Task> targetToBeScheduled = toBeScheduled.first as Pair<Target, Task>;
expect(targetToBeScheduled.first.value.name, 'Linux runIf');
expect(toBeScheduled.length, 2);
final Iterable<Tuple<Target, Task, int>> tuples =
toBeScheduled.map((dynamic tuple) => tuple as Tuple<Target, Task, int>);
final Iterable<String> scheduledTargetNames =
tuples.map((Tuple<Target, Task, int> tuple) => tuple.second.name!);
expect(scheduledTargetNames, ['Linux A', 'Linux runIf']);
// Tasks triggered by cocoon are marked as in progress
final Iterable<Task> tasks = db.values.values.whereType<Task>();
expect(tasks.singleWhere((Task task) => task.name == 'Linux A').status, Task.statusInProgress);
});
});

Expand Down
2 changes: 2 additions & 0 deletions app_dart/test/src/utilities/entity_generators.dart
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Target generateTarget(
Map<String, String>? properties,
List<String>? runIf,
github.RepositorySlug? slug,
pb.SchedulerSystem? schedulerSystem,
}) {
final pb.SchedulerConfig config = schedulerConfig ?? exampleConfig.config;
if (platformProperties != null) {
Expand All @@ -75,6 +76,7 @@ Target generateTarget(
name: '$platform $i',
properties: properties,
runIf: runIf ?? <String>[],
scheduler: schedulerSystem ?? pb.SchedulerSystem.cocoon,
),
);
}
Expand Down
Loading