From c1bd68721bb96d982aaa7fcfc17a24b11bb4f7eb Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Tue, 18 Nov 2025 14:49:06 +0100 Subject: [PATCH 1/4] Refactor task loop: scanning for package updated timestamps and updating tracking state. --- app/lib/task/backend.dart | 59 ++++-------- app/lib/task/loops/scan_packages_updated.dart | 93 +++++++++++++++++++ 2 files changed, 112 insertions(+), 40 deletions(-) create mode 100644 app/lib/task/loops/scan_packages_updated.dart diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 0e5784976..49cad66f5 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -43,6 +43,7 @@ import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/global_lock.dart'; import 'package:pub_dev/task/handlers.dart'; +import 'package:pub_dev/task/loops/scan_packages_updated.dart'; import 'package:pub_dev/task/models.dart' show AbortedTokenInfo, @@ -308,52 +309,30 @@ class TaskBackend { }) async { abort ??= Completer(); - // Map from package to updated that has been seen. - final seen = {}; - - // We will schedule longer overlaps every 6 hours. - var nextLongScan = clock.fromNow(hours: 6); - - // In theory 30 minutes overlap should be enough. In practice we should - // allow an ample room for missed windows, and 3 days seems to be large enough. - var since = clock.ago(days: 3); - while (claim.valid && !abort.isCompleted) { - final sinceParamNow = since; - - if (clock.now().isAfter(nextLongScan)) { - // Next time we'll do a longer scan - since = clock.ago(days: 1); - nextLongScan = clock.fromNow(hours: 6); - } else { - // Next time we'll only consider changes since now - 30 minutes - since = clock.ago(minutes: 30); - } + var state = ScanPackagesUpdatedState.init(); + bool isAbortedFn() => !claim.valid || abort!.isCompleted; + while (!isAbortedFn()) { + final sinceParamNow = state.since; - // Look at all packages that has changed - await for (final p in _db.packages.listUpdatedSince(sinceParamNow)) { - // Abort, if claim is invalid or abort has been resolved! - if (!claim.valid || abort.isCompleted) { - return; - } + final next = await calculateScanPackagesUpdatedLoop( + state, + _db.packages.listUpdatedSince(sinceParamNow), + isAbortedFn, + ); - // Check if the [updated] timestamp has been seen before. - // If so, we skip checking it! - final lastSeen = seen[p.name]; - if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) { - continue; - } - // Remember the updated time for this package, so we don't check it - // again... - seen[p.name] = p.updated; + state = next.state; + for (final p in next.packages) { + if (isAbortedFn()) { + return; + } // Check the package - await trackPackage(p.name, updateDependents: true); + await trackPackage(p, updateDependents: true); } - // Cleanup the [seen] map for anything older than [since], as this won't - // be relevant to the next iteration. - seen.removeWhere((_, updated) => updated.isBefore(since)); - + if (isAbortedFn()) { + return; + } // Wait until aborted or 10 minutes before scanning again! await abort.future.timeoutWithClock( Duration(minutes: 10), diff --git a/app/lib/task/loops/scan_packages_updated.dart b/app/lib/task/loops/scan_packages_updated.dart new file mode 100644 index 000000000..6ad486be3 --- /dev/null +++ b/app/lib/task/loops/scan_packages_updated.dart @@ -0,0 +1,93 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:clock/clock.dart'; + +/// The internal state for deciding which package needs to be updated. +class ScanPackagesUpdatedState { + /// The last time the algorithm checked on a package. + final Map seen; + + final DateTime since; + + final DateTime nextLongScan; + + ScanPackagesUpdatedState({ + required this.seen, + required this.since, + required this.nextLongScan, + }); + + factory ScanPackagesUpdatedState.init() => ScanPackagesUpdatedState( + seen: {}, + // In theory 30 minutes overlap should be enough. In practice we should + // allow an ample room for missed windows, and 3 days seems to be large enough. + since: clock.ago(days: 3), + // We will schedule longer overlaps every 6 hours. + nextLongScan: clock.fromNow(hours: 6), + ); +} + +/// The result of the scan package operation. +class ScanPackagesUpdatedNextState { + /// The next state of the data. + final ScanPackagesUpdatedState state; + + /// The package to update. + final List packages; + + ScanPackagesUpdatedNextState({required this.state, required this.packages}); +} + +/// Calculates the next state of scan packages updated loop by +/// processing the input [stream]. +Future calculateScanPackagesUpdatedLoop( + ScanPackagesUpdatedState state, + Stream<({String name, DateTime updated})> stream, + bool Function() isAbortedFn, +) async { + var since = state.since; + var nextLongScan = state.nextLongScan; + if (clock.now().isAfter(state.nextLongScan)) { + // Next time we'll do a longer scan + since = clock.ago(days: 1); + nextLongScan = clock.fromNow(hours: 6); + } else { + // Next time we'll only consider changes since now - 30 minutes + since = clock.ago(minutes: 30); + } + + final seen = {...state.seen}; + final packages = []; + + await for (final p in stream) { + if (isAbortedFn()) { + break; + } + // Check if the [updated] timestamp has been seen before. + // If so, we skip checking it! + final lastSeen = seen[p.name]; + if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) { + continue; + } + // Remember the updated time for this package, so we don't check it + // again... + seen[p.name] = p.updated; + // Needs to be updated. + packages.add(p.name); + } + + // Cleanup the [seen] map for anything older than [since], as this won't + // be relevant to the next iteration. + seen.removeWhere((_, updated) => updated.isBefore(since)); + + return ScanPackagesUpdatedNextState( + state: ScanPackagesUpdatedState( + seen: seen, + since: since, + nextLongScan: nextLongScan, + ), + packages: packages, + ); +} From a71549b7e5e1c0b15e8327a2d43cf982b61d57ec Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Tue, 18 Nov 2025 17:38:29 +0100 Subject: [PATCH 2/4] nextLongScan -> nextLongOverlap --- app/lib/task/loops/scan_packages_updated.dart | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/app/lib/task/loops/scan_packages_updated.dart b/app/lib/task/loops/scan_packages_updated.dart index 6ad486be3..b14b50f5a 100644 --- a/app/lib/task/loops/scan_packages_updated.dart +++ b/app/lib/task/loops/scan_packages_updated.dart @@ -9,23 +9,29 @@ class ScanPackagesUpdatedState { /// The last time the algorithm checked on a package. final Map seen; + /// The cycle's reference timestamp. final DateTime since; - final DateTime nextLongScan; + /// Most scan cycle will process changes only from a short time period, + /// however, periodically we want to process a longer overlap window. + /// This timestamp indicates the future time when such longer scan should happen. + final DateTime nextLongerOverlap; ScanPackagesUpdatedState({ required this.seen, required this.since, - required this.nextLongScan, + required this.nextLongerOverlap, }); - factory ScanPackagesUpdatedState.init() => ScanPackagesUpdatedState( - seen: {}, + factory ScanPackagesUpdatedState.init({ + Map? seen, + }) => ScanPackagesUpdatedState( + seen: seen ?? {}, // In theory 30 minutes overlap should be enough. In practice we should // allow an ample room for missed windows, and 3 days seems to be large enough. since: clock.ago(days: 3), // We will schedule longer overlaps every 6 hours. - nextLongScan: clock.fromNow(hours: 6), + nextLongerOverlap: clock.fromNow(hours: 6), ); } @@ -48,8 +54,8 @@ Future calculateScanPackagesUpdatedLoop( bool Function() isAbortedFn, ) async { var since = state.since; - var nextLongScan = state.nextLongScan; - if (clock.now().isAfter(state.nextLongScan)) { + var nextLongScan = state.nextLongerOverlap; + if (clock.now().isAfter(state.nextLongerOverlap)) { // Next time we'll do a longer scan since = clock.ago(days: 1); nextLongScan = clock.fromNow(hours: 6); @@ -86,7 +92,7 @@ Future calculateScanPackagesUpdatedLoop( state: ScanPackagesUpdatedState( seen: seen, since: since, - nextLongScan: nextLongScan, + nextLongerOverlap: nextLongScan, ), packages: packages, ); From e67cd3fe349fefbe18b751b0c3bc77f02b4d1836 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Tue, 18 Nov 2025 17:38:50 +0100 Subject: [PATCH 3/4] Adding tests. --- .../loops/scan_packages_updated_test.dart | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 app/test/task/loops/scan_packages_updated_test.dart diff --git a/app/test/task/loops/scan_packages_updated_test.dart b/app/test/task/loops/scan_packages_updated_test.dart new file mode 100644 index 000000000..8de49f1df --- /dev/null +++ b/app/test/task/loops/scan_packages_updated_test.dart @@ -0,0 +1,134 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:clock/clock.dart'; +import 'package:pub_dev/task/loops/scan_packages_updated.dart'; +import 'package:test/test.dart'; + +void main() { + group('task scan: packages updated', () { + test('every new package gets updated', () async { + await withClock(Clock.fixed(DateTime.now()), () async { + final state = ScanPackagesUpdatedState.init(); + final next = await calculateScanPackagesUpdatedLoop( + state, + Stream.fromIterable([ + (name: 'a', updated: clock.ago(minutes: 3)), + (name: 'b', updated: clock.ago(minutes: 2)), + ]), + () => false, + ); + expect(next.packages, ['a', 'b']); + expect(next.state.seen, { + 'a': clock.ago(minutes: 3), + 'b': clock.ago(minutes: 2), + }); + expect(next.state.since, clock.ago(minutes: 30)); + expect(next.state.nextLongerOverlap, state.nextLongerOverlap); + }); + }); + + test('some packages will be updated', () async { + await withClock(Clock.fixed(DateTime.now()), () async { + final state = ScanPackagesUpdatedState.init( + seen: {'a': clock.ago(minutes: 5), 'b': clock.ago(minutes: 4)}, + ); + final next = await calculateScanPackagesUpdatedLoop( + state, + Stream.fromIterable([ + (name: 'a', updated: clock.ago(minutes: 5)), // same + (name: 'b', updated: clock.ago(minutes: 2)), // updated + (name: 'c', updated: clock.ago(minutes: 1)), // new + ]), + () => false, + ); + expect(next.packages, ['b', 'c']); + expect(next.state.seen, { + 'a': clock.ago(minutes: 5), + 'b': clock.ago(minutes: 2), + 'c': clock.ago(minutes: 1), + }); + }); + }); + + test('some packages are removed from seen', () async { + await withClock(Clock.fixed(DateTime.now()), () async { + final state = ScanPackagesUpdatedState.init( + seen: {'a': clock.ago(minutes: 5), 'b': clock.ago(minutes: 40)}, + ); + final next = await calculateScanPackagesUpdatedLoop( + state, + Stream.empty(), + () => false, + ); + expect(next.packages, isEmpty); + expect(next.state.seen, {'a': clock.ago(minutes: 5)}); + }); + }); + + test('next long scan triggered', () async { + await withClock(Clock.fixed(DateTime.now()), () async { + final state = ScanPackagesUpdatedState( + seen: {}, + since: clock.ago(minutes: 30), + nextLongerOverlap: clock.ago(minutes: 1), + ); + final next = await calculateScanPackagesUpdatedLoop( + state, + Stream.empty(), + () => false, + ); + expect(next.packages, isEmpty); + expect(next.state.since, clock.ago(days: 1)); + expect(next.state.nextLongerOverlap, clock.fromNow(hours: 6)); + }); + }); + + test('seen an older timestamp does trigger an update', () async { + await withClock(Clock.fixed(DateTime.now()), () async { + final state = ScanPackagesUpdatedState.init( + seen: {'a': clock.ago(minutes: 5)}, + ); + final next = await calculateScanPackagesUpdatedLoop( + state, + Stream.fromIterable([(name: 'a', updated: clock.ago(minutes: 7))]), + () => false, + ); + expect(next.packages, ['a']); + expect(next.state.seen, {'a': clock.ago(minutes: 7)}); + }); + }); + + test('abort signal stops processing', () async { + await withClock(Clock.fixed(DateTime.now()), () async { + final state = ScanPackagesUpdatedState.init(); + var stopped = false; + + final controller = + StreamController<({String name, DateTime updated})>(); + final nextFuture = calculateScanPackagesUpdatedLoop( + state, + controller.stream, + () => stopped, + ); + + controller.add((name: 'a', updated: clock.ago(minutes: 3))); + controller.add((name: 'b', updated: clock.ago(minutes: 2))); + await Future.delayed(Duration(milliseconds: 200)); + stopped = true; + controller.add((name: 'c', updated: clock.ago(minutes: 1))); + await controller.close(); + + final next = await nextFuture; + expect(next.packages, ['a', 'b']); + expect(next.state.seen, { + 'a': clock.ago(minutes: 3), + 'b': clock.ago(minutes: 2), + }); + }); + }); + }); +} From 2bb7f14b2d2a54e585f1d10459938ccc78f5b839 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Tue, 18 Nov 2025 17:56:35 +0100 Subject: [PATCH 4/4] Making sure test doesn't complain about using DateTime.now --- .../task/loops/scan_packages_updated_test.dart | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/app/test/task/loops/scan_packages_updated_test.dart b/app/test/task/loops/scan_packages_updated_test.dart index 8de49f1df..13099366f 100644 --- a/app/test/task/loops/scan_packages_updated_test.dart +++ b/app/test/task/loops/scan_packages_updated_test.dart @@ -10,8 +10,10 @@ import 'package:test/test.dart'; void main() { group('task scan: packages updated', () { + final referenceNow = clock.now(); + test('every new package gets updated', () async { - await withClock(Clock.fixed(DateTime.now()), () async { + await withClock(Clock.fixed(referenceNow), () async { final state = ScanPackagesUpdatedState.init(); final next = await calculateScanPackagesUpdatedLoop( state, @@ -32,7 +34,7 @@ void main() { }); test('some packages will be updated', () async { - await withClock(Clock.fixed(DateTime.now()), () async { + await withClock(Clock.fixed(referenceNow), () async { final state = ScanPackagesUpdatedState.init( seen: {'a': clock.ago(minutes: 5), 'b': clock.ago(minutes: 4)}, ); @@ -55,7 +57,7 @@ void main() { }); test('some packages are removed from seen', () async { - await withClock(Clock.fixed(DateTime.now()), () async { + await withClock(Clock.fixed(referenceNow), () async { final state = ScanPackagesUpdatedState.init( seen: {'a': clock.ago(minutes: 5), 'b': clock.ago(minutes: 40)}, ); @@ -70,7 +72,7 @@ void main() { }); test('next long scan triggered', () async { - await withClock(Clock.fixed(DateTime.now()), () async { + await withClock(Clock.fixed(referenceNow), () async { final state = ScanPackagesUpdatedState( seen: {}, since: clock.ago(minutes: 30), @@ -88,7 +90,7 @@ void main() { }); test('seen an older timestamp does trigger an update', () async { - await withClock(Clock.fixed(DateTime.now()), () async { + await withClock(Clock.fixed(referenceNow), () async { final state = ScanPackagesUpdatedState.init( seen: {'a': clock.ago(minutes: 5)}, ); @@ -103,7 +105,7 @@ void main() { }); test('abort signal stops processing', () async { - await withClock(Clock.fixed(DateTime.now()), () async { + await withClock(Clock.fixed(referenceNow), () async { final state = ScanPackagesUpdatedState.init(); var stopped = false;