From 951e5decb7c4d653d3d423a9f73031f6e96d1b3e Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 28 Nov 2025 10:22:32 +0100 Subject: [PATCH 1/2] Refactor + separately test the instance deletion logic in task scheduling. --- app/lib/task/backend.dart | 23 ++ app/lib/task/loops/delete_instances.dart | 134 ++++++++++++ app/lib/task/scheduler.dart | 74 +++---- .../task/loops/delete_instances_test.dart | 199 ++++++++++++++++++ app/test/task/task_test.dart | 76 ++++--- 5 files changed, 435 insertions(+), 71 deletions(-) create mode 100644 app/lib/task/loops/delete_instances.dart create mode 100644 app/test/task/loops/delete_instances_test.dart diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 49cad66f5b..a2627e4a74 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -17,6 +17,7 @@ import 'package:gcloud/storage.dart' show Bucket; import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError; import 'package:indexed_blob/indexed_blob.dart' show BlobIndex, FileRange; import 'package:logging/logging.dart' show Logger; +import 'package:meta/meta.dart'; import 'package:pana/models.dart' show Summary; import 'package:pool/pool.dart' show Pool; import 'package:pub_dev/package/api_export/api_exporter.dart'; @@ -84,6 +85,20 @@ void registerTaskBackend(TaskBackend backend) => /// The active task backend service. TaskBackend get taskBackend => ss.lookup(#_taskBackend) as TaskBackend; +/// Describes an event that happened inside the task backend (e.g. scheduling). +class TaskEvent { + final DateTime timestamp; + final String kind; + final Map parameters; + + TaskEvent(this.kind, this.parameters) : timestamp = clock.now(); + + @override + String toString() { + return '$timestamp $kind $parameters'; + } +} + class TaskBackend { final DatastoreDB _db; final Bucket _bucket; @@ -99,8 +114,14 @@ class TaskBackend { /// `null` when not started yet. Completer? _stopped; + /// Event log for test verification. + final _events = StreamController.broadcast(); + TaskBackend(this._db, this._bucket); + @visibleForTesting + Stream get events => _events.stream; + /// Start continuous background processes for scheduling of tasks. /// /// Calling [start] without first calling [stop] is an error. @@ -175,6 +196,7 @@ class TaskBackend { taskWorkerCloudCompute, _db, abort: aborted, + eventSink: _events.sink, ); }, abort: aborted); } catch (e, st) { @@ -218,6 +240,7 @@ class TaskBackend { aborted.complete(); } await _stopped!.future; + await _events.close(); _aborted = null; _stopped = null; } diff --git a/app/lib/task/loops/delete_instances.dart b/app/lib/task/loops/delete_instances.dart new file mode 100644 index 0000000000..392ff7197f --- /dev/null +++ b/app/lib/task/loops/delete_instances.dart @@ -0,0 +1,134 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:basics/basics.dart'; +import 'package:clock/clock.dart'; +import 'package:logging/logging.dart'; +import 'package:pub_dev/shared/configuration.dart'; +import 'package:pub_dev/task/clock_control.dart'; +import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; + +final _log = Logger('pub.task.scan_instances'); + +/// The internal state for scanning and deleting instances. +class DeleteInstancesState { + // Maps the `CloudInstance.instanceName` to the deletion + // progress after we started to delete the instance. + final Map deletions; + + DeleteInstancesState({required this.deletions}); + + factory DeleteInstancesState.init() => DeleteInstancesState(deletions: {}); + + // An arbitrary wait to attempt an async cleanup for our tests, since otherwise we + // keep the progress entry for 5 minutes, and ignore the completer status. + Future waitSomeSeconds(int seconds) async { + final futures = deletions.values + .map((v) => v.completer) + .where((c) => !c.isCompleted) + .map( + (c) => c.future + .timeoutWithClock(Duration(seconds: seconds)) + .whenComplete(() {}), + ) + .toList(); + if (futures.isEmpty) { + return; + } + await Future.wait(futures); + } +} + +/// The result of the scan and delete instances operation. +class DeleteInstancesNextState { + /// The next state of the data. + final DeleteInstancesState state; + + /// The number of currently running instances. + final int instances; + + DeleteInstancesNextState({required this.state, required this.instances}); +} + +/// Tracks the latest instance-specific deletion's start timestamp +/// and progress [completer]. +class DeletionInProgress { + final DateTime started; + final Completer completer; + + DeletionInProgress({required this.started, required this.completer}); + + // Wait at-least 5 minutes from start of deletion until we remove the + // progress tracking, to give the Cloud API some time to reconcile the state. + bool isRecent() => started.isAfter(clock.ago(minutes: 5)); +} + +/// Calculates the next state of delete instances loop by processing +/// the input [stream]. +Future scanAndDeleteInstances( + DeleteInstancesState state, + Stream stream, + Future Function(String zone, String instanceName) deleteInstanceFn, + bool Function() isAbortedFn, { + required int maxTaskRunHours, +}) async { + var instances = 0; + final deletionInProgress = { + ...state.deletions.whereValue((v) => v.isRecent()), + }; + + await for (final instance in stream) { + if (isAbortedFn()) { + break; + } + instances++; + + // Prevent multiple calls to delete the same instance. + if (deletionInProgress.containsKey(instance.instanceName)) { + continue; + } + + // If terminated or older than maxInstanceAge, delete the instance... + final isTerminated = instance.state == InstanceState.terminated; + final isTooOld = instance.created + .add(Duration(hours: maxTaskRunHours)) + .isBefore(clock.now()); + + if (isTooOld) { + // This indicates that something is wrong the with the instance, + // ideally it should have detected its own deadline being violated + // and terminated on its own. Of course, this can fail for arbitrary + // reasons in a distributed system. + _log.warning('terminating $instance for being too old!'); + } else if (isTerminated) { + _log.info('deleting $instance as it has terminated.'); + } else { + // Do not delete this instance + continue; + } + + final completer = Completer(); + deletionInProgress[instance.instanceName] = DeletionInProgress( + started: clock.now(), + completer: completer, + ); + + scheduleMicrotask(() async { + try { + await deleteInstanceFn(instance.zone, instance.instanceName); + } catch (e, st) { + _log.severe('Failed to delete $instance', e, st); + } finally { + completer.complete(); + } + }); + } + + return DeleteInstancesNextState( + state: DeleteInstancesState(deletions: deletionInProgress), + instances: instances, + ); +} diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index e69e87ad7e..9e6c1714d8 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -14,6 +14,7 @@ import 'package:pub_dev/task/backend.dart'; import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/global_lock.dart'; +import 'package:pub_dev/task/loops/delete_instances.dart'; import 'package:pub_dev/task/models.dart'; final _log = Logger('pub.task.schedule'); @@ -27,6 +28,7 @@ Future schedule( CloudCompute compute, DatastoreDB db, { required Completer abort, + required Sink eventSink, }) async { /// Sleep [delay] time [since] timestamp, or now if not given. Future sleepOrAborted(Duration delay, {DateTime? since}) async { @@ -60,60 +62,37 @@ Future schedule( } } - // Set of `CloudInstance.instanceName`s currently being deleted. - // This to avoid deleting instances where the deletion process is still - // running. - final deletionInProgress = {}; + var deleteInstancesState = DeleteInstancesState.init(); // Create a fast RNG with random seed for picking zones. final rng = Random(Random.secure().nextInt(2 << 31)); + bool isAbortedFn() => !claim.valid || abort.isCompleted; + // Run scheduling iterations, so long as we have a valid claim - while (claim.valid && !abort.isCompleted) { + while (!isAbortedFn()) { final iterationStart = clock.now(); _log.info('Starting scheduling cycle'); - // Count number of instances, and delete old instances - var instances = 0; - await for (final instance in compute.listInstances()) { - instances += 1; // count the instance - - // If terminated or older than maxInstanceAge, delete the instance... - final isTerminated = instance.state == InstanceState.terminated; - final isTooOld = instance.created - .add(Duration(hours: activeConfiguration.maxTaskRunHours)) - .isBefore(clock.now()); - // Also check deletionInProgress to prevent multiple calls to delete the - // same instance - final isBeingDeleted = deletionInProgress.contains(instance.instanceName); - if ((isTerminated || isTooOld) && !isBeingDeleted) { - if (isTooOld) { - // This indicates that something is wrong the with the instance, - // ideally it should have detected its own deadline being violated - // and terminated on its own. Of course, this can fail for arbitrary - // reasons in a distributed system. - _log.warning('terminating $instance for being too old!'); - } else if (isTerminated) { - _log.info('deleting $instance as it has terminated.'); - } - - deletionInProgress.add(instance.instanceName); - scheduleMicrotask(() async { - final deletionStart = clock.now(); - try { - await compute.delete(instance.zone, instance.instanceName); - } catch (e, st) { - _log.severe('Failed to delete $instance', e, st); - } finally { - // Wait at-least 5 minutes from start of deletion until we remove - // it from [deletionInProgress] that way we give the API some time - // reconcile state. - await sleepOrAborted(Duration(minutes: 5), since: deletionStart); - deletionInProgress.remove(instance.instanceName); - } - }); - } + final nextDeleteInstancesState = await scanAndDeleteInstances( + deleteInstancesState, + compute.listInstances(), + (zone, name) async { + await compute.delete(zone, name); + eventSink.add( + TaskEvent('delete-instance', {'name': name, 'zone': zone}), + ); + }, + isAbortedFn, + maxTaskRunHours: activeConfiguration.maxTaskInstances, + ); + deleteInstancesState = nextDeleteInstancesState.state; + if (isAbortedFn()) { + break; } + await deleteInstancesState.waitSomeSeconds(5); + + final instances = nextDeleteInstancesState.instances; _log.info('Found $instances instances'); // If we are not allowed to create new instances within the allowed quota, @@ -188,6 +167,9 @@ Future schedule( arguments: [json.encode(payload)], description: description, ); + eventSink.add( + TaskEvent('create-instance', {'name': instanceName, 'zone': zone}), + ); rollbackPackageState = false; } on ZoneExhaustedException catch (e, st) { // A zone being exhausted is normal operations, we just use another @@ -267,6 +249,8 @@ Future schedule( // If we are waiting for quota, then we sleep a minute before checking again await sleepOrAborted(Duration(minutes: 1), since: iterationStart); } + + await deleteInstancesState.waitSomeSeconds(5); } /// Updates the package state with versions that are already pending or diff --git a/app/test/task/loops/delete_instances_test.dart b/app/test/task/loops/delete_instances_test.dart new file mode 100644 index 0000000000..0f873e12f4 --- /dev/null +++ b/app/test/task/loops/delete_instances_test.dart @@ -0,0 +1,199 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:clock/clock.dart'; +import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; +import 'package:pub_dev/task/loops/delete_instances.dart'; +import 'package:test/test.dart'; + +void main() { + group('task scan: delete cloud instances', () { + final referenceNow = clock.now(); + + test('fresh instance is not deleted', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState.init(), + Stream.fromIterable([ + _CloudInstance( + instanceName: 'a', + created: referenceNow.subtract(Duration(minutes: 18)), + ), + ]), + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.instances, 1); + expect(next.state.deletions, isEmpty); + expect(deletions, {}); + }); + }); + + test('old instance is deleted', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState.init(), + Stream.fromIterable([ + _CloudInstance( + instanceName: 'a', + created: referenceNow.subtract(Duration(minutes: 78)), + ), + ]), + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.instances, 1); + expect(next.state.deletions, hasLength(1)); + expect(next.state.deletions.containsKey('a'), isTrue); + + // Wait for the async deletion to complete + await next.state.deletions['a']!.completer.future; + expect(deletions, {'a': 'z1'}); + }); + }); + + test('terminated instance is deleted', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState.init(), + Stream.fromIterable([ + _CloudInstance( + instanceName: 'a', + created: referenceNow.subtract(Duration(minutes: 18)), + state: InstanceState.terminated, + ), + ]), + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.instances, 1); + expect(next.state.deletions, hasLength(1)); + expect(next.state.deletions.containsKey('a'), isTrue); + + // Wait for the async deletion to complete + await next.state.deletions['a']!.completer.future; + expect(deletions, {'a': 'z1'}); + }); + }); + + test('pending delete is kept within 5 minutes', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState( + deletions: { + 'a': DeletionInProgress( + completer: Completer(), + started: clock.ago(minutes: 3), + ), + }, + ), + Stream.fromIterable([ + _CloudInstance( + instanceName: 'a', + created: referenceNow.subtract(Duration(minutes: 78)), + ), + ]), + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.instances, 1); + expect(next.state.deletions, hasLength(1)); + next.state.deletions['a']!.started.isBefore(clock.ago(minutes: 2)); + expect(deletions, {}); + }); + }); + + test('pending delete is removed after 5 minutes', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState( + deletions: { + 'a': DeletionInProgress( + completer: Completer(), + started: clock.ago(minutes: 8), + ), + }, + ), + Stream.fromIterable([ + _CloudInstance(created: clock.now(), instanceName: 'b'), + ]), + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.instances, 1); + expect(next.state.deletions, isEmpty); + expect(deletions, {}); + }); + }); + + test('pending delete is refreshed after 5 minutes', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState( + deletions: { + 'a': DeletionInProgress( + completer: Completer(), + started: clock.ago(minutes: 8), + ), + }, + ), + Stream.fromIterable([ + _CloudInstance(created: clock.ago(minutes: 78), instanceName: 'a'), + ]), + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.instances, 1); + expect(next.state.deletions, hasLength(1)); + next.state.deletions['a']!.started.isAfter(clock.ago(minutes: 2)); + await next.state.deletions['a']!.completer.future; + expect(deletions, {'a': 'z1'}); + }); + }); + }); +} + +class _CloudInstance implements CloudInstance { + @override + final DateTime created; + @override + final String instanceName; + @override + final InstanceState state; + @override + final String zone; + + _CloudInstance({ + required this.created, + required this.instanceName, + this.state = InstanceState.running, + this.zone = 'z1', + }); +} diff --git a/app/test/task/task_test.dart b/app/test/task/task_test.dart index 90e3469ee6..ba5b6b90ab 100644 --- a/app/test/task/task_test.dart +++ b/app/test/task/task_test.dart @@ -186,45 +186,44 @@ void main() { await taskBackend.backfillTrackingState(); await clockControl.elapse(minutes: 1); + final collector = _TaskEventCollector(); await taskBackend.start(); // We are going to let the task timeout, if this happens we should only // try to scheduled it until we hit the [taskRetryLimit]. - for (var i = 0; i < taskRetryLimit; i++) { - // Within 24 hours an instance should be created - await clockControl.elapseUntil( - () => cloud.listInstances().isNotEmpty, - timeout: Duration(days: 1), - ); - - // If nothing happens, then it should be killed within 24 hours. - // Actually, it'll happen much sooner, like ~2 hours, but we'll leave the - // test some wiggle room. - await clockControl.elapseUntil( - () => cloud.listInstances().isEmpty, - timeout: Duration(days: 1), - ); - } + await clockControl.elapse(hours: 36); + expect(collector.createdCount, taskRetryLimit); + expect(collector.deletedCount, taskRetryLimit); + expect( + collector.events.map( + (e) => (e.timestamp.difference(collector.startTime).inHours, e.kind), + ), + [ + (0, 'create-instance'), + (3, 'create-instance'), + (10, 'delete-instance'), + (13, 'delete-instance'), + (15, 'create-instance'), + (25, 'delete-instance'), + ], + ); // Once we've exceeded the [taskRetryLimit], we shouldn't see any instances // created for the next day... assert(taskRetriggerInterval > Duration(days: 1)); - await expectLater( - clockControl.elapseUntil( - () => cloud.listInstances().isNotEmpty, - timeout: Duration(days: 1), - ), - throwsA(isA()), - ); + await clockControl.elapse(days: 1); + expect(collector.createdCount, taskRetryLimit); + expect(collector.deletedCount, taskRetryLimit); // But the task should be retried after [taskRetriggerInterval], this is a // long time, but for sanity we do re-analyze everything occasionally. - await clockControl.elapseUntil( - () => cloud.listInstances().isNotEmpty, - timeout: taskRetriggerInterval + Duration(days: 1), - ); + await clockControl.elapseTime(taskRetriggerInterval); + await clockControl.elapse(days: 1); + expect(collector.createdCount, taskRetryLimit + 1); + expect(collector.deletedCount, taskRetryLimit + 1); await taskBackend.stop(); + await collector.close(); await clockControl.elapse(minutes: 10); }, @@ -842,3 +841,28 @@ Future upload( // Unhandled response code -> retry fail('Unhandled HTTP status = ${res.statusCode}, body: ${res.body}'); } + +class _TaskEventCollector { + final DateTime startTime; + final List events; + late final StreamSubscription _subscription; + + _TaskEventCollector._(this.startTime, this.events, this._subscription); + + factory _TaskEventCollector() { + final events = []; + return _TaskEventCollector._( + clock.now(), + events, + taskBackend.events.listen(events.add), + ); + } + + int get createdCount => + events.where((e) => e.kind == 'create-instance').length; + + int get deletedCount => + events.where((e) => e.kind == 'delete-instance').length; + + Future close() => _subscription.cancel(); +} From c50634376440928103ba56f18d5b519711b6a10b Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 28 Nov 2025 14:15:38 +0100 Subject: [PATCH 2/2] fix lints --- app/lib/task/loops/delete_instances.dart | 1 - app/test/task/loops/delete_instances_test.dart | 3 +-- app/test/task/task_test.dart | 6 ------ 3 files changed, 1 insertion(+), 9 deletions(-) diff --git a/app/lib/task/loops/delete_instances.dart b/app/lib/task/loops/delete_instances.dart index 392ff7197f..b0aa713d4c 100644 --- a/app/lib/task/loops/delete_instances.dart +++ b/app/lib/task/loops/delete_instances.dart @@ -7,7 +7,6 @@ import 'dart:async'; import 'package:basics/basics.dart'; import 'package:clock/clock.dart'; import 'package:logging/logging.dart'; -import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; diff --git a/app/test/task/loops/delete_instances_test.dart b/app/test/task/loops/delete_instances_test.dart index 0f873e12f4..235ba03ebc 100644 --- a/app/test/task/loops/delete_instances_test.dart +++ b/app/test/task/loops/delete_instances_test.dart @@ -188,12 +188,11 @@ class _CloudInstance implements CloudInstance { @override final InstanceState state; @override - final String zone; + final String zone = 'z1'; _CloudInstance({ required this.created, required this.instanceName, this.state = InstanceState.running, - this.zone = 'z1', }); } diff --git a/app/test/task/task_test.dart b/app/test/task/task_test.dart index ba5b6b90ab..3f0aa68bdc 100644 --- a/app/test/task/task_test.dart +++ b/app/test/task/task_test.dart @@ -798,12 +798,6 @@ void main() { ); } -extension on Stream { - Future get isNotEmpty async { - return !await this.isEmpty; - } -} - Future upload( http.Client client, UploadInfo destination,