Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import 'package:gcloud/storage.dart' show Bucket;
import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError;
import 'package:indexed_blob/indexed_blob.dart' show BlobIndex, FileRange;
import 'package:logging/logging.dart' show Logger;
import 'package:meta/meta.dart';
import 'package:pana/models.dart' show Summary;
import 'package:pool/pool.dart' show Pool;
import 'package:pub_dev/package/api_export/api_exporter.dart';
Expand Down Expand Up @@ -84,6 +85,20 @@ void registerTaskBackend(TaskBackend backend) =>
/// The active task backend service.
///
/// Looks up the `#_taskBackend` key in `ss` and casts the result to
/// [TaskBackend].
TaskBackend get taskBackend {
  final registered = ss.lookup(#_taskBackend);
  return registered as TaskBackend;
}

/// A record of something that happened inside the task backend
/// (e.g. scheduling).
class TaskEvent {
  /// The kind of the event.
  final String kind;

  /// The details attached to the event.
  final Map<String, dynamic> parameters;

  /// The time this event object was created, taken from `clock.now()`.
  final DateTime timestamp;

  TaskEvent(this.kind, this.parameters) : timestamp = clock.now();

  /// Formats the event as the timestamp, kind and parameters separated by
  /// single spaces.
  @override
  String toString() => '$timestamp $kind $parameters';
}

class TaskBackend {
final DatastoreDB _db;
final Bucket _bucket;
Expand All @@ -99,8 +114,14 @@ class TaskBackend {
/// `null` when not started yet.
Completer<void>? _stopped;

/// Event log for test verification.
final _events = StreamController<TaskEvent>.broadcast();

TaskBackend(this._db, this._bucket);

@visibleForTesting
Stream<TaskEvent> get events => _events.stream;

/// Start continuous background processes for scheduling of tasks.
///
/// Calling [start] without first calling [stop] is an error.
Expand Down Expand Up @@ -175,6 +196,7 @@ class TaskBackend {
taskWorkerCloudCompute,
_db,
abort: aborted,
eventSink: _events.sink,
);
}, abort: aborted);
} catch (e, st) {
Expand Down Expand Up @@ -218,6 +240,7 @@ class TaskBackend {
aborted.complete();
}
await _stopped!.future;
await _events.close();
_aborted = null;
_stopped = null;
}
Expand Down
133 changes: 133 additions & 0 deletions app/lib/task/loops/delete_instances.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:basics/basics.dart';
import 'package:clock/clock.dart';
import 'package:logging/logging.dart';
import 'package:pub_dev/task/clock_control.dart';
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';

// Library-private logger used by the instance scan-and-delete logic below.
final _log = Logger('pub.task.scan_instances');

/// The internal state for scanning and deleting instances.
class DeleteInstancesState {
  /// Maps the `CloudInstance.instanceName` to the deletion progress recorded
  /// after we started to delete the instance.
  final Map<String, DeletionInProgress> deletions;

  DeleteInstancesState({required this.deletions});

  /// Creates the initial state, with no deletion in progress.
  factory DeleteInstancesState.init() => DeleteInstancesState(deletions: {});

  /// Waits up to [seconds] for the not-yet-completed [deletions] to finish.
  ///
  /// An arbitrary wait to attempt an async cleanup for our tests, since
  /// otherwise we keep the progress entry for 5 minutes, and ignore the
  /// completer status.
  ///
  /// The wait is best-effort: when a deletion does not finish within the
  /// time limit, this method still completes normally instead of throwing.
  Future<void> waitSomeSeconds(int seconds) async {
    final timeLimit = Duration(seconds: seconds);
    final pending = [
      for (final deletion in deletions.values)
        if (!deletion.completer.isCompleted)
          _ignoreTimeout(
            deletion.completer.future.timeoutWithClock(timeLimit),
          ),
    ];
    if (pending.isEmpty) {
      return;
    }
    await Future.wait(pending);
  }

  /// Awaits [future] and swallows a [TimeoutException], if one is thrown.
  ///
  /// NOTE(review): presumably `timeoutWithClock` throws [TimeoutException]
  /// when the limit passes (like `Future.timeout` without `onTimeout`) -
  /// confirm against `clock_control.dart`. If it never throws, this wrapper
  /// has no effect.
  static Future<void> _ignoreTimeout(Future<void> future) async {
    try {
      await future;
    } on TimeoutException {
      // The deletion is still running; the caller only wanted a bounded
      // wait, so running out of time is not an error here.
    }
  }
}

/// The outcome of one scan and delete instances operation.
class DeleteInstancesNextState {
  /// The number of instances counted during the scan.
  final int instances;

  /// The state to use for the next scan.
  final DeleteInstancesState state;

  DeleteInstancesNextState({required this.state, required this.instances});
}

/// Bookkeeping for the latest deletion of a single instance: the time the
/// deletion [started] and the [completer] tracking its progress.
class DeletionInProgress {
  /// Tracks the progress of the deletion.
  final Completer completer;

  /// The timestamp recorded when the deletion was started.
  final DateTime started;

  DeletionInProgress({required this.started, required this.completer});

  /// Whether the deletion was started less than 5 minutes ago.
  ///
  /// We keep the progress tracking for at-least 5 minutes from the start of
  /// the deletion, to give the Cloud API some time to reconcile the state.
  bool isRecent() {
    final age = clock.now().difference(started);
    return age < const Duration(minutes: 5);
  }
}

/// Processes the instances of the input [stream] and returns the next state
/// of the delete instances loop.
///
/// Every instance read from [stream] is counted. An instance is deleted via
/// [deleteInstanceFn] when it was created more than [maxTaskRunHours] hours
/// ago, or when its state is `InstanceState.terminated`, unless a recent
/// deletion of the same instance name is already tracked in [state].
///
/// The deletions are started in microtasks and are not awaited here; their
/// failures are logged. Reading the [stream] stops as soon as [isAbortedFn]
/// returns `true`.
Future<DeleteInstancesNextState> scanAndDeleteInstances(
  DeleteInstancesState state,
  Stream<CloudInstance> stream,
  Future<void> Function(String zone, String instanceName) deleteInstanceFn,
  bool Function() isAbortedFn, {
  required int maxTaskRunHours,
}) async {
  final maxAge = Duration(hours: maxTaskRunHours);

  // Carry over only the entries that are still recent, older ones are
  // dropped and their instances become eligible for deletion again.
  final tracked = {
    ...state.deletions.whereValue((deletion) => deletion.isRecent()),
  };
  var count = 0;

  await for (final instance in stream) {
    if (isAbortedFn()) {
      break;
    }
    count += 1;

    // Prevent multiple calls to delete the same instance.
    final name = instance.instanceName;
    if (tracked.containsKey(name)) {
      continue;
    }

    final hasExpired = instance.created.add(maxAge).isBefore(clock.now());
    if (hasExpired) {
      // This indicates that something is wrong with the instance, ideally
      // it should have detected its own deadline being violated and
      // terminated on its own. Of course, this can fail for arbitrary
      // reasons in a distributed system.
      _log.warning('terminating $instance for being too old!');
    } else if (instance.state == InstanceState.terminated) {
      _log.info('deleting $instance as it has terminated.');
    } else {
      // Neither too old nor terminated: keep this instance.
      continue;
    }

    final done = Completer();
    tracked[name] = DeletionInProgress(started: clock.now(), completer: done);

    // Run the deletion in the background, the completer is always completed
    // (without error), even when the deletion fails.
    scheduleMicrotask(() async {
      try {
        await deleteInstanceFn(instance.zone, name);
      } catch (e, st) {
        _log.severe('Failed to delete $instance', e, st);
      } finally {
        done.complete();
      }
    });
  }

  final nextState = DeleteInstancesState(deletions: tracked);
  return DeleteInstancesNextState(state: nextState, instances: count);
}
74 changes: 29 additions & 45 deletions app/lib/task/scheduler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import 'package:pub_dev/task/backend.dart';
import 'package:pub_dev/task/clock_control.dart';
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
import 'package:pub_dev/task/global_lock.dart';
import 'package:pub_dev/task/loops/delete_instances.dart';
import 'package:pub_dev/task/models.dart';

final _log = Logger('pub.task.schedule');
Expand All @@ -27,6 +28,7 @@ Future<void> schedule(
CloudCompute compute,
DatastoreDB db, {
required Completer<void> abort,
required Sink<TaskEvent> eventSink,
}) async {
/// Sleep [delay] time [since] timestamp, or now if not given.
Future<void> sleepOrAborted(Duration delay, {DateTime? since}) async {
Expand Down Expand Up @@ -60,60 +62,37 @@ Future<void> schedule(
}
}

// Set of `CloudInstance.instanceName`s currently being deleted.
// This is to avoid deleting instances where the deletion process is still
// running.
final deletionInProgress = <String>{};
var deleteInstancesState = DeleteInstancesState.init();

// Create a fast RNG with random seed for picking zones.
final rng = Random(Random.secure().nextInt(2 << 31));

bool isAbortedFn() => !claim.valid || abort.isCompleted;

// Run scheduling iterations, so long as we have a valid claim
while (claim.valid && !abort.isCompleted) {
while (!isAbortedFn()) {
final iterationStart = clock.now();
_log.info('Starting scheduling cycle');

// Count number of instances, and delete old instances
var instances = 0;
await for (final instance in compute.listInstances()) {
instances += 1; // count the instance

// If terminated or older than maxInstanceAge, delete the instance...
final isTerminated = instance.state == InstanceState.terminated;
final isTooOld = instance.created
.add(Duration(hours: activeConfiguration.maxTaskRunHours))
.isBefore(clock.now());
// Also check deletionInProgress to prevent multiple calls to delete the
// same instance
final isBeingDeleted = deletionInProgress.contains(instance.instanceName);
if ((isTerminated || isTooOld) && !isBeingDeleted) {
if (isTooOld) {
// This indicates that something is wrong with the instance,
// ideally it should have detected its own deadline being violated
// and terminated on its own. Of course, this can fail for arbitrary
// reasons in a distributed system.
_log.warning('terminating $instance for being too old!');
} else if (isTerminated) {
_log.info('deleting $instance as it has terminated.');
}

deletionInProgress.add(instance.instanceName);
scheduleMicrotask(() async {
final deletionStart = clock.now();
try {
await compute.delete(instance.zone, instance.instanceName);
} catch (e, st) {
_log.severe('Failed to delete $instance', e, st);
} finally {
// Wait at-least 5 minutes from start of deletion until we remove
// it from [deletionInProgress] that way we give the API some time
// to reconcile state.
await sleepOrAborted(Duration(minutes: 5), since: deletionStart);
deletionInProgress.remove(instance.instanceName);
}
});
}
// Count the instances, and delete the terminated and the too old ones.
//
// The age limit must be the configured maximum task run time (in hours),
// not the maximum number of task instances.
final nextDeleteInstancesState = await scanAndDeleteInstances(
  deleteInstancesState,
  compute.listInstances(),
  (zone, name) async {
    await compute.delete(zone, name);
    // Report the deletion only after the delete call has returned.
    eventSink.add(
      TaskEvent('delete-instance', {'name': name, 'zone': zone}),
    );
  },
  isAbortedFn,
  maxTaskRunHours: activeConfiguration.maxTaskRunHours,
);
deleteInstancesState = nextDeleteInstancesState.state;
if (isAbortedFn()) {
break;
}
await deleteInstancesState.waitSomeSeconds(5);

final instances = nextDeleteInstancesState.instances;
_log.info('Found $instances instances');

// If we are not allowed to create new instances within the allowed quota,
Expand Down Expand Up @@ -188,6 +167,9 @@ Future<void> schedule(
arguments: [json.encode(payload)],
description: description,
);
eventSink.add(
TaskEvent('create-instance', {'name': instanceName, 'zone': zone}),
);
rollbackPackageState = false;
} on ZoneExhaustedException catch (e, st) {
// A zone being exhausted is normal operations, we just use another
Expand Down Expand Up @@ -267,6 +249,8 @@ Future<void> schedule(
// If we are waiting for quota, then we sleep a minute before checking again
await sleepOrAborted(Duration(minutes: 1), since: iterationStart);
}

await deleteInstancesState.waitSomeSeconds(5);
}

/// Updates the package state with versions that are already pending or
Expand Down
Loading