diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 4700383ff..0e5784976 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -50,6 +50,7 @@ import 'package:pub_dev/task/models.dart' PackageStateInfo, PackageVersionStateInfo, PackageVersionStatus, + derivePendingAt, initialTimestamp, maxTaskExecutionTime; import 'package:pub_dev/task/scheduler.dart'; @@ -394,21 +395,25 @@ class TaskBackend { if (state == null) { // Create [PackageState] entity to track the package _log.info('Started state tracking for $packageName'); + final versionsMap = { + for (final version in versions) + version: PackageVersionStateInfo( + scheduled: initialTimestamp, + attempts: 0, + ), + }; await tx.tasks.insert( PackageState() ..setId(runtimeVersion, packageName) ..runtimeVersion = runtimeVersion - ..versions = { - for (final version in versions) - version: PackageVersionStateInfo( - scheduled: initialTimestamp, - attempts: 0, - ), - } + ..versions = versionsMap ..dependencies = [] ..lastDependencyChanged = initialTimestamp ..finished = initialTimestamp - ..derivePendingAt(), + ..pendingAt = derivePendingAt( + versions: versionsMap, + lastDependencyChanged: initialTimestamp, + ), ); return true; // no more work for this package, state is synced } @@ -464,7 +469,10 @@ class TaskBackend { attempts: 0, ), }); - state.derivePendingAt(); + state.pendingAt = derivePendingAt( + versions: state.versions!, + lastDependencyChanged: state.lastDependencyChanged!, + ); _log.info('Update state tracking for $packageName'); await tx.tasks.update(state); @@ -726,7 +734,10 @@ class TaskBackend { // Ensure that we update [state.pendingAt], otherwise it might be // re-scheduled way too soon. - state.derivePendingAt(); + state.pendingAt = derivePendingAt( + versions: state.versions!, + lastDependencyChanged: state.lastDependencyChanged!, + ); state.finished = clock.now().toUtc(); await tx.tasks.update(state); @@ -1407,7 +1418,10 @@ final class _TaskDataAccess { tx.insert( s ..lastDependencyChanged = publishedAt - ..derivePendingAt(), + ..pendingAt = derivePendingAt( + versions: s.versions!, + lastDependencyChanged: publishedAt, + ), ); return true; } @@ -1442,7 +1456,10 @@ final class _TaskDataAccess { .where((e) => e.value.instance == instanceName) .map((e) => MapEntry(e.key, previousVersionsMap[e.key]!)), ); - s.derivePendingAt(); + s.pendingAt = derivePendingAt( + versions: s.versions!, + lastDependencyChanged: s.lastDependencyChanged!, + ); await tx.tasks.update(s); }); } diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index 23f32d9e7..1ccd3bba4 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -133,95 +133,6 @@ class PackageState extends db.ExpandoModel { @db.DateTimeProperty(required: true, indexed: true) DateTime finished = initialTimestamp; - /// Derive [pendingAt] using [versions] and [lastDependencyChanged]. - /// - /// When updating PackageState the pendingAt property is set to the minimum of: - /// * `scheduled + 31 days` for any version, - /// * `scheduled + 24 hours` for any version where `dependencyChanged > scheduled` - /// * `scheduled + 3 hours * attempts^2` for any version where `attempts > 0 && attempts < 3`. - void derivePendingAt() { - final versionStates = versions!.values; - pendingAt = [ - // scheduled + 31 days - ...versionStates.map((v) => v.scheduled.add(taskRetriggerInterval)), - // scheduled + 24 hours, where scheduled < lastDependencyChanged - ...versionStates - .where((v) => v.scheduled.isBefore(lastDependencyChanged!)) - .map((v) => v.scheduled.add(taskDependencyRetriggerCoolOff)), - // scheduled + 3 hours * attempts^2, where attempts > 0 && attempts < 3 - ...versionStates - .where((v) => v.attempts > 0 && v.attempts < taskRetryLimit) - .map((v) => v.scheduled.add(taskRetryDelay(v.attempts))), - // Pick the minimum of the candidates, default scheduling in year 3k - // if there is no date before that. - ].fold(DateTime(3000), (a, b) => a!.isBefore(b) ? a : b); - } - - /// Return a list of pending versions for this package. - /// - /// When scheduling analysis of a package we piggyback along versions that - /// are going to be pending soon too. Hence, we return a version if: - /// * `now - scheduled > 21 days`, - /// * `lastDependencyChanged > scheduled`, or, - /// * `attempts > 0 && attempts < 3 && now - scheduled > 3 hours * attempts^2` - List pendingVersions({DateTime? at}) { - final at_ = at ?? clock.now(); - Duration timeSince(DateTime past) => at_.difference(past); - - final list = versions!.entries - .where( - // NOTE: Any changes here must be reflected in [derivePendingAt] - (e) => - // If scheduled more than 21 days ago - timeSince(e.value.scheduled) > minTaskRetriggerInterval || - // If a dependency has changed since it was last scheduled - lastDependencyChanged!.isAfter(e.value.scheduled) || - // If: - // - attempts > 0 (analysis is not done, and has been started) - // - no more than 3 attempts have been done, - // - now - scheduled > 3 hours * attempts^2 - (e.value.attempts > 0 && - e.value.attempts < taskRetryLimit && - timeSince(e.value.scheduled) > - taskRetryDelay(e.value.attempts)), - ) - .map((e) => e.key) - .map(Version.parse) - .toList(); - - // Prioritize stable versions first, prereleases after them (in decreasing order), e.g. - // - 2.5.0 - // - 2.4.0 - // - 2.0.0 - // - 1.2.0 - // - 3.0.0-dev2 - // - 3.0.0-dev1 - // - 2.7.0-beta - // - 1.0.0-dev - list.sort((a, b) => compareSemanticVersionsDesc(a, b, true, true)); - - // Promote the first prerelease version to the second position, e.g. - // - 2.5.0 - // - 3.0.0-dev2 - // - 2.4.0 - // - 2.0.0 - // - 1.2.0 - // - 3.0.0-dev1 - // - 2.7.0-beta - // - 1.0.0-dev - // - // (applicable only when the second position is a stable version) - if (list.length > 2 && !list[1].isPreRelease) { - final firstPrereleaseIndex = list.indexWhere((v) => v.isPreRelease); - if (firstPrereleaseIndex > 1) { - final v = list.removeAt(firstPrereleaseIndex); - list.insert(1, v); - } - } - - return list.map((s) => s.toString()).toList(); - } - /// Returns true if the current [PackageState] instance is new, no version analysis /// has not completed yet (with neither success nor failure). bool get hasNeverFinished => finished == initialTimestamp; @@ -242,6 +153,101 @@ class PackageState extends db.ExpandoModel { '\n)'; } +/// Derive the `pendingAt` field using [versions] and [lastDependencyChanged]. +/// +/// When updating PackageState the pendingAt property is set to the minimum of: +/// * `scheduled + 31 days` for any version, +/// * `scheduled + 24 hours` for any version where `dependencyChanged > scheduled` +/// * `scheduled + 3 hours * attempts^2` for any version where `attempts > 0 && attempts < 3`. +DateTime derivePendingAt({ + required Map versions, + required DateTime lastDependencyChanged, +}) { + return [ + // scheduled + 31 days + ...versions.values.map((v) => v.scheduled.add(taskRetriggerInterval)), + // scheduled + 24 hours, where scheduled < lastDependencyChanged + ...versions.values + .where((v) => v.scheduled.isBefore(lastDependencyChanged)) + .map((v) => v.scheduled.add(taskDependencyRetriggerCoolOff)), + // scheduled + 3 hours * attempts^2, where attempts > 0 && attempts < 3 + ...versions.values + .where((v) => v.attempts > 0 && v.attempts < taskRetryLimit) + .map((v) => v.scheduled.add(taskRetryDelay(v.attempts))), + // Pick the minimum of the candidates, default scheduling in year 3k + // if there is no date before that. + ].fold(DateTime(3000), (a, b) => a.isBefore(b) ? a : b); +} + +/// Return a list of pending versions for this package. +/// +/// When scheduling analysis of a package we piggyback along versions that +/// are going to be pending soon too. Hence, we return a version if: +/// * `now - scheduled > 21 days`, +/// * `lastDependencyChanged > scheduled`, or, +/// * `attempts > 0 && attempts < 3 && now - scheduled > 3 hours * attempts^2` +List derivePendingVersions({ + required Map versions, + required DateTime lastDependencyChanged, + required DateTime? at, +}) { + final at_ = at ?? clock.now(); + Duration timeSince(DateTime past) => at_.difference(past); + + final list = versions.entries + .where( + // NOTE: Any changes here must be reflected in [derivePendingAt] + (e) => + // If scheduled more than 21 days ago + timeSince(e.value.scheduled) > minTaskRetriggerInterval || + // If a dependency has changed since it was last scheduled + lastDependencyChanged.isAfter(e.value.scheduled) || + // If: + // - attempts > 0 (analysis is not done, and has been started) + // - no more than 3 attempts have been done, + // - now - scheduled > 3 hours * attempts^2 + (e.value.attempts > 0 && + e.value.attempts < taskRetryLimit && + timeSince(e.value.scheduled) > + taskRetryDelay(e.value.attempts)), + ) + .map((e) => e.key) + .map(Version.parse) + .toList(); + + // Prioritize stable versions first, prereleases after them (in decreasing order), e.g. + // - 2.5.0 + // - 2.4.0 + // - 2.0.0 + // - 1.2.0 + // - 3.0.0-dev2 + // - 3.0.0-dev1 + // - 2.7.0-beta + // - 1.0.0-dev + list.sort((a, b) => compareSemanticVersionsDesc(a, b, true, true)); + + // Promote the first prerelease version to the second position, e.g. + // - 2.5.0 + // - 3.0.0-dev2 + // - 2.4.0 + // - 2.0.0 + // - 1.2.0 + // - 3.0.0-dev1 + // - 2.7.0-beta + // - 1.0.0-dev + // + // (applicable only when the second position is a stable version) + if (list.length > 2 && !list[1].isPreRelease) { + final firstPrereleaseIndex = list.indexWhere((v) => v.isPreRelease); + if (firstPrereleaseIndex > 1) { + final v = list.removeAt(firstPrereleaseIndex); + list.insert(1, v); + } + } + + return list.map((s) => s.toString()).toList(); +} + /// State of a given `version` within a [PackageState]. @JsonSerializable() class PackageVersionStateInfo { diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index 1102ac222..7c8888095 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -286,7 +286,11 @@ updatePackageStateWithPendingVersions( final oldVersionsMap = {...?s.versions}; final now = clock.now(); - final pendingVersions = s.pendingVersions(at: now).toList(); + final pendingVersions = derivePendingVersions( + versions: s.versions!, + lastDependencyChanged: s.lastDependencyChanged!, + at: now, + ).toList(); if (pendingVersions.isEmpty) { // do not schedule anything return null; @@ -304,7 +308,10 @@ updatePackageStateWithPendingVersions( finished: s.versions![v]!.finished, ), }); - s.derivePendingAt(); + s.pendingAt = derivePendingAt( + versions: s.versions!, + lastDependencyChanged: s.lastDependencyChanged!, + ); await tx.tasks.update(s); // Create payload