From 7a19a9bcb716bd90e215e859b812ad727037c739 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Mon, 3 Nov 2025 13:25:54 +0100 Subject: [PATCH] Refactor Datastore-related task queries. --- app/lib/search/backend.dart | 15 +-- app/lib/task/backend.dart | 33 ++++-- app/lib/task/scheduler.dart | 230 +++++++++++++++++++----------------- 3 files changed, 150 insertions(+), 128 deletions(-) diff --git a/app/lib/search/backend.dart b/app/lib/search/backend.dart index 09745d4dce..4a1d0606cd 100644 --- a/app/lib/search/backend.dart +++ b/app/lib/search/backend.dart @@ -40,7 +40,6 @@ import '../shared/storage.dart'; import '../shared/versions.dart'; import '../task/backend.dart'; import '../task/global_lock.dart'; -import '../task/models.dart'; import 'models.dart'; import 'result_combiner.dart'; @@ -254,18 +253,12 @@ class SearchBackend { } } - final q1 = _db.query() - ..filter('updated >=', updatedThreshold) - ..order('-updated'); - await for (final p in q1.run()) { - addResult(p.name!, p.updated!); + await for (final e in _db.packages.listUpdatedSince(updatedThreshold)) { + addResult(e.name, e.updated); } - final q3 = _db.query() - ..filter('finished >=', updatedThreshold) - ..order('-finished'); - await for (final s in q3.run()) { - addResult(s.package, s.finished); + await for (final e in _db.tasks.listFinishedSince(updatedThreshold)) { + addResult(e.package, e.finished); } return results; diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index c71a5bb2ad..164bad859d 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -1021,15 +1021,16 @@ class TaskBackend { Future Function(Payload payload) processPayload, ) async { await backfillTrackingState(); - await for (final state in _db.tasks.listAll()) { + await for (final state in _db.tasks.listAllForCurrentRuntime()) { final zone = taskWorkerCloudCompute.zones.first; // ignore: invalid_use_of_visible_for_testing_member - final payload = await updatePackageStateWithPendingVersions( + final updated = await updatePackageStateWithPendingVersions( _db, - state, + state.package, zone, taskWorkerCloudCompute.generateInstanceName(), ); + final payload = updated?.$1; if (payload == null) continue; await processPayload(payload); } @@ -1327,10 +1328,6 @@ final class _TaskDataAccess { ); } - Stream listAll() { - return _db.query().run(); - } - Stream<({String package})> listAllForCurrentRuntime() async* { final query = _db.query() ..filter('runtimeVersion =', runtimeVersion); @@ -1339,6 +1336,17 @@ final class _TaskDataAccess { } } + Stream<({String package, DateTime finished})> listFinishedSince( + DateTime since, + ) async* { + final query = _db.query() + ..filter('finished >=', since) + ..order('-finished'); + await for (final s in query.run()) { + yield (package: s.package, finished: s.finished); + } + } + Stream<({String package})> listDependenciesOfPackage( String package, DateTime publishedAt, @@ -1351,6 +1359,17 @@ final class _TaskDataAccess { } } + Stream<({String package})> selectSomePending(int limit) async* { + final query = _db.query() + ..filter('runtimeVersion =', runtimeVersion) + ..filter('pendingAt <=', clock.now()) + ..order('pendingAt') + ..limit(limit); + await for (final ps in query.run()) { + yield (package: ps.package); + } + } + /// Returns whether the entry has been updated. Future updateDependencyChanged( String package, diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index 9e3af21ced..b5b195e595 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -11,6 +11,7 @@ import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/datastore.dart'; import 'package:pub_dev/shared/utils.dart'; import 'package:pub_dev/shared/versions.dart' show runtimeVersion; +import 'package:pub_dev/task/backend.dart'; import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/global_lock.dart'; @@ -144,120 +145,122 @@ Future schedule( // Schedule analysis for some packages var pendingPackagesReviewed = 0; + final selectLimit = min( + _maxInstancesPerIteration, + max(0, activeConfiguration.maxTaskInstances - instances), + ); await Future.wait( - await (db.query() - ..filter('runtimeVersion =', runtimeVersion) - ..filter('pendingAt <=', clock.now()) - ..order('pendingAt') - ..limit( - min( - _maxInstancesPerIteration, - max(0, activeConfiguration.maxTaskInstances - instances), - ), - )) - .run() - .map>((state) async { - pendingPackagesReviewed += 1; + await (db.tasks.selectSomePending(selectLimit)).map>(( + selected, + ) async { + pendingPackagesReviewed += 1; + + final instanceName = compute.generateInstanceName(); + final zone = pickZone(); - final instanceName = compute.generateInstanceName(); - final zone = pickZone(); + final updated = await updatePackageStateWithPendingVersions( + db, + selected.package, + zone, + instanceName, + ); + final payload = updated?.$1; + if (payload == null) { + return; + } + // Create human readable description for GCP console. + final description = + 'package:${payload.package} analysis of ${payload.versions.length} ' + 'versions.'; - final payload = await updatePackageStateWithPendingVersions( - db, - state, - zone, - instanceName, + await Future.microtask(() async { + var rollbackPackageState = true; + try { + // Purging cache is important for the edge case, where the new upload happens + // on a different runtime version, and the current one's cache is still stale + // and does not have the version yet. + // TODO(https://github.com/dart-lang/pub-dev/issues/7268) remove after it gets fixed. + await purgePackageCache(payload.package); + _log.info( + 'creating instance $instanceName in $zone for ' + 'package:${selected.package}', + ); + await compute.createInstance( + zone: zone, + instanceName: instanceName, + dockerImage: activeConfiguration.taskWorkerImage!, + arguments: [json.encode(payload)], + description: description, + ); + rollbackPackageState = false; + } on ZoneExhaustedException catch (e, st) { + // A zone being exhausted is normal operations, we just use another + // zone for 15 minutes. + _log.info( + 'zone resources exhausted, banning ${e.zone} for 30 minutes', + e, + st, + ); + // Ban usage of zone for 30 minutes + banZone(e.zone, minutes: 30); + } on QuotaExhaustedException catch (e, st) { + // Quota exhausted, this can happen, but it shouldn't. We'll just stop + // doing anything for 10 minutes. Hopefully that'll resolve the issue. + // We log severe, because this is a reason to adjust the quota or + // instance limits. + _log.severe( + 'Quota exhausted trying to create $instanceName, banning all zones ' + 'for 10 minutes', + e, + st, ); - if (payload == null) { - return; - } - // Create human readable description for GCP console. - final description = - 'package:${payload.package} analysis of ${payload.versions.length} ' - 'versions.'; - scheduleMicrotask(() async { - var rollbackPackageState = true; - try { - // Purging cache is important for the edge case, where the new upload happens - // on a different runtime version, and the current one's cache is still stale - // and does not have the version yet. - // TODO(https://github.com/dart-lang/pub-dev/issues/7268) remove after it gets fixed. - await purgePackageCache(payload.package); - _log.info( - 'creating instance $instanceName in $zone for ' - 'package:${state.package}', - ); - await compute.createInstance( - zone: zone, - instanceName: instanceName, - dockerImage: activeConfiguration.taskWorkerImage!, - arguments: [json.encode(payload)], - description: description, - ); - rollbackPackageState = false; - } on ZoneExhaustedException catch (e, st) { - // A zone being exhausted is normal operations, we just use another - // zone for 15 minutes. - _log.info( - 'zone resources exhausted, banning ${e.zone} for 30 minutes', - e, - st, - ); - // Ban usage of zone for 30 minutes - banZone(e.zone, minutes: 30); - } on QuotaExhaustedException catch (e, st) { - // Quota exhausted, this can happen, but it shouldn't. We'll just stop - // doing anything for 10 minutes. Hopefully that'll resolve the issue. - // We log severe, because this is a reason to adjust the quota or - // instance limits. - _log.severe( - 'Quota exhausted trying to create $instanceName, banning all zones ' - 'for 10 minutes', - e, - st, + // Ban all zones for 10 minutes + for (final zone in compute.zones) { + banZone(zone, minutes: 10); + } + } on Exception catch (e, st) { + // No idea what happened, but for robustness we'll stop using the zone + // and shout into the logs + _log.shout( + 'Failed to create instance $instanceName, banning zone "$zone" for ' + '15 minutes', + e, + st, + ); + // Ban usage of zone for 15 minutes + banZone(zone, minutes: 15); + } finally { + if (rollbackPackageState) { + final oldVersionsMap = updated?.$2 ?? const {}; + // Restore the state of the PackageState for versions that were + // suppose to run on the instance we just failed to create. + // If this doesn't work, we'll eventually retry. Hence, correctness + // does not hinge on this transaction being successful. + await withRetryTransaction(db, (tx) async { + final s = await tx.lookupOrNull( + PackageState.createKey( + db.emptyKey, + runtimeVersion, + selected.package, + ), ); - - // Ban all zones for 10 minutes - for (final zone in compute.zones) { - banZone(zone, minutes: 10); + if (s == null) { + return; // Presumably, the package was deleted. } - } on Exception catch (e, st) { - // No idea what happened, but for robustness we'll stop using the zone - // and shout into the logs - _log.shout( - 'Failed to create instance $instanceName, banning zone "$zone" for ' - '15 minutes', - e, - st, - ); - // Ban usage of zone for 15 minutes - banZone(zone, minutes: 15); - } finally { - if (rollbackPackageState) { - // Restore the state of the PackageState for versions that were - // suppose to run on the instance we just failed to create. - // If this doesn't work, we'll eventually retry. Hence, correctness - // does not hinge on this transaction being successful. - await withRetryTransaction(db, (tx) async { - final s = await tx.lookupOrNull(state.key); - if (s == null) { - return; // Presumably, the package was deleted. - } - s.versions!.addEntries( - s.versions!.entries - .where((e) => e.value.instance == instanceName) - .map((e) => MapEntry(e.key, state.versions![e.key]!)), - ); - s.derivePendingAt(); - tx.insert(s); - }); - } - } - }); - }) - .toList(), + s.versions!.addEntries( + s.versions!.entries + .where((e) => e.value.instance == instanceName) + .map((e) => MapEntry(e.key, oldVersionsMap[e.key]!)), + ); + s.derivePendingAt(); + tx.insert(s); + }); + } + } + }); + }).toList(), ); // If there was no pending packages reviewed, and no instances currently @@ -281,19 +284,25 @@ Future schedule( /// Updates the package state with versions that are already pending or /// will be pending soon. +/// +/// Returns the payload and the old status of the state info version map @visibleForTesting -Future updatePackageStateWithPendingVersions( +Future<(Payload, Map)?> +updatePackageStateWithPendingVersions( DatastoreDB db, - PackageState state, + String package, String zone, String instanceName, ) async { return await withRetryTransaction(db, (tx) async { - final s = await tx.lookupOrNull(state.key); + final s = await tx.lookupOrNull( + PackageState.createKey(db.emptyKey, runtimeVersion, package), + ); if (s == null) { // presumably the package was deleted. return null; } + final oldVersionsMap = {...?s.versions}; final now = clock.now(); final pendingVersions = s @@ -354,7 +363,7 @@ Future updatePackageStateWithPendingVersions( tx.insert(s); // Create payload - return Payload( + final payload = Payload( package: s.package, pubHostedUrl: activeConfiguration.defaultServiceBaseUrl, versions: pendingVersions.map( @@ -364,5 +373,6 @@ Future updatePackageStateWithPendingVersions( ), ), ); + return (payload, oldVersionsMap); }); }