diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 0e5784976..49cad66f5 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -43,6 +43,7 @@ import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/global_lock.dart'; import 'package:pub_dev/task/handlers.dart'; +import 'package:pub_dev/task/loops/scan_packages_updated.dart'; import 'package:pub_dev/task/models.dart' show AbortedTokenInfo, @@ -308,52 +309,30 @@ class TaskBackend { }) async { abort ??= Completer(); - // Map from package to updated that has been seen. - final seen = {}; - - // We will schedule longer overlaps every 6 hours. - var nextLongScan = clock.fromNow(hours: 6); - - // In theory 30 minutes overlap should be enough. In practice we should - // allow an ample room for missed windows, and 3 days seems to be large enough. - var since = clock.ago(days: 3); - while (claim.valid && !abort.isCompleted) { - final sinceParamNow = since; - - if (clock.now().isAfter(nextLongScan)) { - // Next time we'll do a longer scan - since = clock.ago(days: 1); - nextLongScan = clock.fromNow(hours: 6); - } else { - // Next time we'll only consider changes since now - 30 minutes - since = clock.ago(minutes: 30); - } + var state = ScanPackagesUpdatedState.init(); + bool isAbortedFn() => !claim.valid || abort!.isCompleted; + while (!isAbortedFn()) { + final sinceParamNow = state.since; - // Look at all packages that has changed - await for (final p in _db.packages.listUpdatedSince(sinceParamNow)) { - // Abort, if claim is invalid or abort has been resolved! - if (!claim.valid || abort.isCompleted) { - return; - } + final next = await calculateScanPackagesUpdatedLoop( + state, + _db.packages.listUpdatedSince(sinceParamNow), + isAbortedFn, + ); - // Check if the [updated] timestamp has been seen before. - // If so, we skip checking it! - final lastSeen = seen[p.name]; - if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) { - continue; - } - // Remember the updated time for this package, so we don't check it - // again... - seen[p.name] = p.updated; + state = next.state; + for (final p in next.packages) { + if (isAbortedFn()) { + return; + } // Check the package - await trackPackage(p.name, updateDependents: true); + await trackPackage(p, updateDependents: true); } - // Cleanup the [seen] map for anything older than [since], as this won't - // be relevant to the next iteration. - seen.removeWhere((_, updated) => updated.isBefore(since)); - + if (isAbortedFn()) { + return; + } // Wait until aborted or 10 minutes before scanning again! await abort.future.timeoutWithClock( Duration(minutes: 10), diff --git a/app/lib/task/loops/scan_packages_updated.dart b/app/lib/task/loops/scan_packages_updated.dart new file mode 100644 index 000000000..b14b50f5a --- /dev/null +++ b/app/lib/task/loops/scan_packages_updated.dart @@ -0,0 +1,99 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:clock/clock.dart'; + +/// The internal state for deciding which package needs to be updated. +class ScanPackagesUpdatedState { + /// The last time the algorithm checked on a package. + final Map seen; + + /// The cycle's reference timestamp. + final DateTime since; + + /// Most scan cycle will process changes only from a short time period, + /// however, periodically we want to process a longer overlap window. + /// This timestamp indicates the future time when such longer scan should happen. + final DateTime nextLongerOverlap; + + ScanPackagesUpdatedState({ + required this.seen, + required this.since, + required this.nextLongerOverlap, + }); + + factory ScanPackagesUpdatedState.init({ + Map? seen, + }) => ScanPackagesUpdatedState( + seen: seen ?? {}, + // In theory 30 minutes overlap should be enough. In practice we should + // allow an ample room for missed windows, and 3 days seems to be large enough. + since: clock.ago(days: 3), + // We will schedule longer overlaps every 6 hours. + nextLongerOverlap: clock.fromNow(hours: 6), + ); +} + +/// The result of the scan package operation. +class ScanPackagesUpdatedNextState { + /// The next state of the data. + final ScanPackagesUpdatedState state; + + /// The package to update. + final List packages; + + ScanPackagesUpdatedNextState({required this.state, required this.packages}); +} + +/// Calculates the next state of scan packages updated loop by +/// processing the input [stream]. +Future calculateScanPackagesUpdatedLoop( + ScanPackagesUpdatedState state, + Stream<({String name, DateTime updated})> stream, + bool Function() isAbortedFn, +) async { + var since = state.since; + var nextLongScan = state.nextLongerOverlap; + if (clock.now().isAfter(state.nextLongerOverlap)) { + // Next time we'll do a longer scan + since = clock.ago(days: 1); + nextLongScan = clock.fromNow(hours: 6); + } else { + // Next time we'll only consider changes since now - 30 minutes + since = clock.ago(minutes: 30); + } + + final seen = {...state.seen}; + final packages = []; + + await for (final p in stream) { + if (isAbortedFn()) { + break; + } + // Check if the [updated] timestamp has been seen before. + // If so, we skip checking it! + final lastSeen = seen[p.name]; + if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) { + continue; + } + // Remember the updated time for this package, so we don't check it + // again... + seen[p.name] = p.updated; + // Needs to be updated. + packages.add(p.name); + } + + // Cleanup the [seen] map for anything older than [since], as this won't + // be relevant to the next iteration. + seen.removeWhere((_, updated) => updated.isBefore(since)); + + return ScanPackagesUpdatedNextState( + state: ScanPackagesUpdatedState( + seen: seen, + since: since, + nextLongerOverlap: nextLongScan, + ), + packages: packages, + ); +} diff --git a/app/test/task/loops/scan_packages_updated_test.dart b/app/test/task/loops/scan_packages_updated_test.dart new file mode 100644 index 000000000..13099366f --- /dev/null +++ b/app/test/task/loops/scan_packages_updated_test.dart @@ -0,0 +1,136 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:clock/clock.dart'; +import 'package:pub_dev/task/loops/scan_packages_updated.dart'; +import 'package:test/test.dart'; + +void main() { + group('task scan: packages updated', () { + final referenceNow = clock.now(); + + test('every new package gets updated', () async { + await withClock(Clock.fixed(referenceNow), () async { + final state = ScanPackagesUpdatedState.init(); + final next = await calculateScanPackagesUpdatedLoop( + state, + Stream.fromIterable([ + (name: 'a', updated: clock.ago(minutes: 3)), + (name: 'b', updated: clock.ago(minutes: 2)), + ]), + () => false, + ); + expect(next.packages, ['a', 'b']); + expect(next.state.seen, { + 'a': clock.ago(minutes: 3), + 'b': clock.ago(minutes: 2), + }); + expect(next.state.since, clock.ago(minutes: 30)); + expect(next.state.nextLongerOverlap, state.nextLongerOverlap); + }); + }); + + test('some packages will be updated', () async { + await withClock(Clock.fixed(referenceNow), () async { + final state = ScanPackagesUpdatedState.init( + seen: {'a': clock.ago(minutes: 5), 'b': clock.ago(minutes: 4)}, + ); + final next = await calculateScanPackagesUpdatedLoop( + state, + Stream.fromIterable([ + (name: 'a', updated: clock.ago(minutes: 5)), // same + (name: 'b', updated: clock.ago(minutes: 2)), // updated + (name: 'c', updated: clock.ago(minutes: 1)), // new + ]), + () => false, + ); + expect(next.packages, ['b', 'c']); + expect(next.state.seen, { + 'a': clock.ago(minutes: 5), + 'b': clock.ago(minutes: 2), + 'c': clock.ago(minutes: 1), + }); + }); + }); + + test('some packages are removed from seen', () async { + await withClock(Clock.fixed(referenceNow), () async { + final state = ScanPackagesUpdatedState.init( + seen: {'a': clock.ago(minutes: 5), 'b': clock.ago(minutes: 40)}, + ); + final next = await calculateScanPackagesUpdatedLoop( + state, + Stream.empty(), + () => false, + ); + expect(next.packages, isEmpty); + expect(next.state.seen, {'a': clock.ago(minutes: 5)}); + }); + }); + + test('next long scan triggered', () async { + await withClock(Clock.fixed(referenceNow), () async { + final state = ScanPackagesUpdatedState( + seen: {}, + since: clock.ago(minutes: 30), + nextLongerOverlap: clock.ago(minutes: 1), + ); + final next = await calculateScanPackagesUpdatedLoop( + state, + Stream.empty(), + () => false, + ); + expect(next.packages, isEmpty); + expect(next.state.since, clock.ago(days: 1)); + expect(next.state.nextLongerOverlap, clock.fromNow(hours: 6)); + }); + }); + + test('seen an older timestamp does trigger an update', () async { + await withClock(Clock.fixed(referenceNow), () async { + final state = ScanPackagesUpdatedState.init( + seen: {'a': clock.ago(minutes: 5)}, + ); + final next = await calculateScanPackagesUpdatedLoop( + state, + Stream.fromIterable([(name: 'a', updated: clock.ago(minutes: 7))]), + () => false, + ); + expect(next.packages, ['a']); + expect(next.state.seen, {'a': clock.ago(minutes: 7)}); + }); + }); + + test('abort signal stops processing', () async { + await withClock(Clock.fixed(referenceNow), () async { + final state = ScanPackagesUpdatedState.init(); + var stopped = false; + + final controller = + StreamController<({String name, DateTime updated})>(); + final nextFuture = calculateScanPackagesUpdatedLoop( + state, + controller.stream, + () => stopped, + ); + + controller.add((name: 'a', updated: clock.ago(minutes: 3))); + controller.add((name: 'b', updated: clock.ago(minutes: 2))); + await Future.delayed(Duration(milliseconds: 200)); + stopped = true; + controller.add((name: 'c', updated: clock.ago(minutes: 1))); + await controller.close(); + + final next = await nextFuture; + expect(next.packages, ['a', 'b']); + expect(next.state.seen, { + 'a': clock.ago(minutes: 3), + 'b': clock.ago(minutes: 2), + }); + }); + }); + }); +}