Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 19 additions & 40 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import 'package:pub_dev/task/clock_control.dart';
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
import 'package:pub_dev/task/global_lock.dart';
import 'package:pub_dev/task/handlers.dart';
import 'package:pub_dev/task/loops/scan_packages_updated.dart';
import 'package:pub_dev/task/models.dart'
show
AbortedTokenInfo,
Expand Down Expand Up @@ -308,52 +309,30 @@ class TaskBackend {
}) async {
abort ??= Completer<void>();

// Map from package to updated that has been seen.
final seen = <String, DateTime>{};

// We will schedule longer overlaps every 6 hours.
var nextLongScan = clock.fromNow(hours: 6);

// In theory 30 minutes overlap should be enough. In practice we should
// allow an ample room for missed windows, and 3 days seems to be large enough.
var since = clock.ago(days: 3);
while (claim.valid && !abort.isCompleted) {
final sinceParamNow = since;

if (clock.now().isAfter(nextLongScan)) {
// Next time we'll do a longer scan
since = clock.ago(days: 1);
nextLongScan = clock.fromNow(hours: 6);
} else {
// Next time we'll only consider changes since now - 30 minutes
since = clock.ago(minutes: 30);
}
var state = ScanPackagesUpdatedState.init();
bool isAbortedFn() => !claim.valid || abort!.isCompleted;
while (!isAbortedFn()) {
final sinceParamNow = state.since;

// Look at all packages that has changed
await for (final p in _db.packages.listUpdatedSince(sinceParamNow)) {
// Abort, if claim is invalid or abort has been resolved!
if (!claim.valid || abort.isCompleted) {
return;
}
final next = await calculateScanPackagesUpdatedLoop(
state,
_db.packages.listUpdatedSince(sinceParamNow),
isAbortedFn,
);

// Check if the [updated] timestamp has been seen before.
// If so, we skip checking it!
final lastSeen = seen[p.name];
if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) {
continue;
}
// Remember the updated time for this package, so we don't check it
// again...
seen[p.name] = p.updated;
state = next.state;

for (final p in next.packages) {
if (isAbortedFn()) {
return;
}
// Check the package
await trackPackage(p.name, updateDependents: true);
await trackPackage(p, updateDependents: true);
}

// Cleanup the [seen] map for anything older than [since], as this won't
// be relevant to the next iteration.
seen.removeWhere((_, updated) => updated.isBefore(since));

if (isAbortedFn()) {
return;
}
// Wait until aborted or 10 minutes before scanning again!
await abort.future.timeoutWithClock(
Duration(minutes: 10),
Expand Down
99 changes: 99 additions & 0 deletions app/lib/task/loops/scan_packages_updated.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:clock/clock.dart';

/// The internal state for deciding which package needs to be updated.
class ScanPackagesUpdatedState {
/// The last time the algorithm checked on a package.
final Map<String, DateTime> seen;

/// The cycle's reference timestamp.
final DateTime since;

/// Most scan cycle will process changes only from a short time period,
/// however, periodically we want to process a longer overlap window.
/// This timestamp indicates the future time when such longer scan should happen.
final DateTime nextLongerOverlap;

ScanPackagesUpdatedState({
required this.seen,
required this.since,
required this.nextLongerOverlap,
});

factory ScanPackagesUpdatedState.init({
Map<String, DateTime>? seen,
}) => ScanPackagesUpdatedState(
seen: seen ?? {},
// In theory 30 minutes overlap should be enough. In practice we should
// allow an ample room for missed windows, and 3 days seems to be large enough.
since: clock.ago(days: 3),
// We will schedule longer overlaps every 6 hours.
nextLongerOverlap: clock.fromNow(hours: 6),
);
}

/// The result of the scan package operation.
class ScanPackagesUpdatedNextState {
/// The next state of the data.
final ScanPackagesUpdatedState state;

/// The package to update.
final List<String> packages;

ScanPackagesUpdatedNextState({required this.state, required this.packages});
}

/// Calculates the next state of scan packages updated loop by
/// processing the input [stream].
Future<ScanPackagesUpdatedNextState> calculateScanPackagesUpdatedLoop(
ScanPackagesUpdatedState state,
Stream<({String name, DateTime updated})> stream,
bool Function() isAbortedFn,
) async {
var since = state.since;
var nextLongScan = state.nextLongerOverlap;
if (clock.now().isAfter(state.nextLongerOverlap)) {
// Next time we'll do a longer scan
since = clock.ago(days: 1);
nextLongScan = clock.fromNow(hours: 6);
} else {
// Next time we'll only consider changes since now - 30 minutes
since = clock.ago(minutes: 30);
}

final seen = {...state.seen};
final packages = <String>[];

await for (final p in stream) {
if (isAbortedFn()) {
break;
}
// Check if the [updated] timestamp has been seen before.
// If so, we skip checking it!
final lastSeen = seen[p.name];
if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) {
continue;
}
// Remember the updated time for this package, so we don't check it
// again...
seen[p.name] = p.updated;
// Needs to be updated.
packages.add(p.name);
}

// Cleanup the [seen] map for anything older than [since], as this won't
// be relevant to the next iteration.
seen.removeWhere((_, updated) => updated.isBefore(since));

return ScanPackagesUpdatedNextState(
state: ScanPackagesUpdatedState(
seen: seen,
since: since,
nextLongerOverlap: nextLongScan,
),
packages: packages,
);
}
136 changes: 136 additions & 0 deletions app/test/task/loops/scan_packages_updated_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:clock/clock.dart';
import 'package:pub_dev/task/loops/scan_packages_updated.dart';
import 'package:test/test.dart';

void main() {
group('task scan: packages updated', () {
final referenceNow = clock.now();

test('every new package gets updated', () async {
await withClock(Clock.fixed(referenceNow), () async {
final state = ScanPackagesUpdatedState.init();
final next = await calculateScanPackagesUpdatedLoop(
state,
Stream.fromIterable([
(name: 'a', updated: clock.ago(minutes: 3)),
(name: 'b', updated: clock.ago(minutes: 2)),
]),
() => false,
);
expect(next.packages, ['a', 'b']);
expect(next.state.seen, {
'a': clock.ago(minutes: 3),
'b': clock.ago(minutes: 2),
});
expect(next.state.since, clock.ago(minutes: 30));
expect(next.state.nextLongerOverlap, state.nextLongerOverlap);
});
});

test('some packages will be updated', () async {
await withClock(Clock.fixed(referenceNow), () async {
final state = ScanPackagesUpdatedState.init(
seen: {'a': clock.ago(minutes: 5), 'b': clock.ago(minutes: 4)},
);
final next = await calculateScanPackagesUpdatedLoop(
state,
Stream.fromIterable([
(name: 'a', updated: clock.ago(minutes: 5)), // same
(name: 'b', updated: clock.ago(minutes: 2)), // updated
(name: 'c', updated: clock.ago(minutes: 1)), // new
]),
() => false,
);
expect(next.packages, ['b', 'c']);
expect(next.state.seen, {
'a': clock.ago(minutes: 5),
'b': clock.ago(minutes: 2),
'c': clock.ago(minutes: 1),
});
});
});

test('some packages are removed from seen', () async {
await withClock(Clock.fixed(referenceNow), () async {
final state = ScanPackagesUpdatedState.init(
seen: {'a': clock.ago(minutes: 5), 'b': clock.ago(minutes: 40)},
);
final next = await calculateScanPackagesUpdatedLoop(
state,
Stream.empty(),
() => false,
);
expect(next.packages, isEmpty);
expect(next.state.seen, {'a': clock.ago(minutes: 5)});
});
});

test('next long scan triggered', () async {
await withClock(Clock.fixed(referenceNow), () async {
final state = ScanPackagesUpdatedState(
seen: {},
since: clock.ago(minutes: 30),
nextLongerOverlap: clock.ago(minutes: 1),
);
final next = await calculateScanPackagesUpdatedLoop(
state,
Stream.empty(),
() => false,
);
expect(next.packages, isEmpty);
expect(next.state.since, clock.ago(days: 1));
expect(next.state.nextLongerOverlap, clock.fromNow(hours: 6));
});
});

test('seen an older timestamp does trigger an update', () async {
await withClock(Clock.fixed(referenceNow), () async {
final state = ScanPackagesUpdatedState.init(
seen: {'a': clock.ago(minutes: 5)},
);
final next = await calculateScanPackagesUpdatedLoop(
state,
Stream.fromIterable([(name: 'a', updated: clock.ago(minutes: 7))]),
() => false,
);
expect(next.packages, ['a']);
expect(next.state.seen, {'a': clock.ago(minutes: 7)});
});
});

test('abort signal stops processing', () async {
await withClock(Clock.fixed(referenceNow), () async {
final state = ScanPackagesUpdatedState.init();
var stopped = false;

final controller =
StreamController<({String name, DateTime updated})>();
final nextFuture = calculateScanPackagesUpdatedLoop(
state,
controller.stream,
() => stopped,
);

controller.add((name: 'a', updated: clock.ago(minutes: 3)));
controller.add((name: 'b', updated: clock.ago(minutes: 2)));
await Future.delayed(Duration(milliseconds: 200));
stopped = true;
controller.add((name: 'c', updated: clock.ago(minutes: 1)));
await controller.close();

final next = await nextFuture;
expect(next.packages, ['a', 'b']);
expect(next.state.seen, {
'a': clock.ago(minutes: 3),
'b': clock.ago(minutes: 2),
});
});
});
});
}