Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions app/lib/search/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import '../shared/storage.dart';
import '../shared/versions.dart';
import '../task/backend.dart';
import '../task/global_lock.dart';
import '../task/models.dart';

import 'models.dart';
import 'result_combiner.dart';
Expand Down Expand Up @@ -254,18 +253,12 @@ class SearchBackend {
}
}

final q1 = _db.query<Package>()
..filter('updated >=', updatedThreshold)
..order('-updated');
await for (final p in q1.run()) {
addResult(p.name!, p.updated!);
await for (final e in _db.packages.listUpdatedSince(updatedThreshold)) {
addResult(e.name, e.updated);
}

final q3 = _db.query<PackageState>()
..filter('finished >=', updatedThreshold)
..order('-finished');
await for (final s in q3.run()) {
addResult(s.package, s.finished);
await for (final e in _db.tasks.listFinishedSince(updatedThreshold)) {
addResult(e.package, e.finished);
}

return results;
Expand Down
33 changes: 26 additions & 7 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1021,15 +1021,16 @@ class TaskBackend {
Future<void> Function(Payload payload) processPayload,
) async {
await backfillTrackingState();
await for (final state in _db.tasks.listAll()) {
await for (final state in _db.tasks.listAllForCurrentRuntime()) {
final zone = taskWorkerCloudCompute.zones.first;
// ignore: invalid_use_of_visible_for_testing_member
final payload = await updatePackageStateWithPendingVersions(
final updated = await updatePackageStateWithPendingVersions(
_db,
state,
state.package,
zone,
taskWorkerCloudCompute.generateInstanceName(),
);
final payload = updated?.$1;
if (payload == null) continue;
await processPayload(payload);
}
Expand Down Expand Up @@ -1327,10 +1328,6 @@ final class _TaskDataAccess {
);
}

Stream<PackageState> listAll() {
return _db.query<PackageState>().run();
}

Stream<({String package})> listAllForCurrentRuntime() async* {
final query = _db.query<PackageState>()
..filter('runtimeVersion =', runtimeVersion);
Expand All @@ -1339,6 +1336,17 @@ final class _TaskDataAccess {
}
}

Stream<({String package, DateTime finished})> listFinishedSince(
DateTime since,
) async* {
final query = _db.query<PackageState>()
..filter('finished >=', since)
..order('-finished');
await for (final s in query.run()) {
yield (package: s.package, finished: s.finished);
}
}

Stream<({String package})> listDependenciesOfPackage(
String package,
DateTime publishedAt,
Expand All @@ -1351,6 +1359,17 @@ final class _TaskDataAccess {
}
}

Stream<({String package})> selectSomePending(int limit) async* {
final query = _db.query<PackageState>()
..filter('runtimeVersion =', runtimeVersion)
..filter('pendingAt <=', clock.now())
..order('pendingAt')
..limit(limit);
await for (final ps in query.run()) {
yield (package: ps.package);
}
}

/// Returns whether the entry has been updated.
Future<bool> updateDependencyChanged(
String package,
Expand Down
230 changes: 120 additions & 110 deletions app/lib/task/scheduler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import 'package:pub_dev/shared/configuration.dart';
import 'package:pub_dev/shared/datastore.dart';
import 'package:pub_dev/shared/utils.dart';
import 'package:pub_dev/shared/versions.dart' show runtimeVersion;
import 'package:pub_dev/task/backend.dart';
import 'package:pub_dev/task/clock_control.dart';
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
import 'package:pub_dev/task/global_lock.dart';
Expand Down Expand Up @@ -144,120 +145,122 @@ Future<void> schedule(

// Schedule analysis for some packages
var pendingPackagesReviewed = 0;
final selectLimit = min(
_maxInstancesPerIteration,
max(0, activeConfiguration.maxTaskInstances - instances),
);
await Future.wait(
await (db.query<PackageState>()
..filter('runtimeVersion =', runtimeVersion)
..filter('pendingAt <=', clock.now())
..order('pendingAt')
..limit(
min(
_maxInstancesPerIteration,
max(0, activeConfiguration.maxTaskInstances - instances),
),
))
.run()
.map<Future<void>>((state) async {
pendingPackagesReviewed += 1;
await (db.tasks.selectSomePending(selectLimit)).map<Future<void>>((
selected,
) async {
pendingPackagesReviewed += 1;

final instanceName = compute.generateInstanceName();
final zone = pickZone();

final instanceName = compute.generateInstanceName();
final zone = pickZone();
final updated = await updatePackageStateWithPendingVersions(
db,
selected.package,
zone,
instanceName,
);
final payload = updated?.$1;
if (payload == null) {
return;
}
// Create human readable description for GCP console.
final description =
'package:${payload.package} analysis of ${payload.versions.length} '
'versions.';

final payload = await updatePackageStateWithPendingVersions(
db,
state,
zone,
instanceName,
await Future.microtask(() async {
var rollbackPackageState = true;
try {
// Purging cache is important for the edge case, where the new upload happens
// on a different runtime version, and the current one's cache is still stale
// and does not have the version yet.
// TODO(https://github.com/dart-lang/pub-dev/issues/7268) remove after it gets fixed.
await purgePackageCache(payload.package);
_log.info(
'creating instance $instanceName in $zone for '
'package:${selected.package}',
);
await compute.createInstance(
zone: zone,
instanceName: instanceName,
dockerImage: activeConfiguration.taskWorkerImage!,
arguments: [json.encode(payload)],
description: description,
);
rollbackPackageState = false;
} on ZoneExhaustedException catch (e, st) {
// A zone being exhausted is normal operations, we just use another
// zone for 15 minutes.
_log.info(
'zone resources exhausted, banning ${e.zone} for 30 minutes',
e,
st,
);
// Ban usage of zone for 30 minutes
banZone(e.zone, minutes: 30);
} on QuotaExhaustedException catch (e, st) {
// Quota exhausted, this can happen, but it shouldn't. We'll just stop
// doing anything for 10 minutes. Hopefully that'll resolve the issue.
// We log severe, because this is a reason to adjust the quota or
// instance limits.
_log.severe(
'Quota exhausted trying to create $instanceName, banning all zones '
'for 10 minutes',
e,
st,
);
if (payload == null) {
return;
}
// Create human readable description for GCP console.
final description =
'package:${payload.package} analysis of ${payload.versions.length} '
'versions.';

scheduleMicrotask(() async {
var rollbackPackageState = true;
try {
// Purging cache is important for the edge case, where the new upload happens
// on a different runtime version, and the current one's cache is still stale
// and does not have the version yet.
// TODO(https://github.com/dart-lang/pub-dev/issues/7268) remove after it gets fixed.
await purgePackageCache(payload.package);
_log.info(
'creating instance $instanceName in $zone for '
'package:${state.package}',
);
await compute.createInstance(
zone: zone,
instanceName: instanceName,
dockerImage: activeConfiguration.taskWorkerImage!,
arguments: [json.encode(payload)],
description: description,
);
rollbackPackageState = false;
} on ZoneExhaustedException catch (e, st) {
// A zone being exhausted is normal operations, we just use another
// zone for 15 minutes.
_log.info(
'zone resources exhausted, banning ${e.zone} for 30 minutes',
e,
st,
);
// Ban usage of zone for 30 minutes
banZone(e.zone, minutes: 30);
} on QuotaExhaustedException catch (e, st) {
// Quota exhausted, this can happen, but it shouldn't. We'll just stop
// doing anything for 10 minutes. Hopefully that'll resolve the issue.
// We log severe, because this is a reason to adjust the quota or
// instance limits.
_log.severe(
'Quota exhausted trying to create $instanceName, banning all zones '
'for 10 minutes',
e,
st,
// Ban all zones for 10 minutes
for (final zone in compute.zones) {
banZone(zone, minutes: 10);
}
} on Exception catch (e, st) {
// No idea what happened, but for robustness we'll stop using the zone
// and shout into the logs
_log.shout(
'Failed to create instance $instanceName, banning zone "$zone" for '
'15 minutes',
e,
st,
);
// Ban usage of zone for 15 minutes
banZone(zone, minutes: 15);
} finally {
if (rollbackPackageState) {
final oldVersionsMap = updated?.$2 ?? const {};
// Restore the state of the PackageState for versions that were
// suppose to run on the instance we just failed to create.
// If this doesn't work, we'll eventually retry. Hence, correctness
// does not hinge on this transaction being successful.
await withRetryTransaction(db, (tx) async {
final s = await tx.lookupOrNull<PackageState>(
PackageState.createKey(
db.emptyKey,
runtimeVersion,
selected.package,
),
);

// Ban all zones for 10 minutes
for (final zone in compute.zones) {
banZone(zone, minutes: 10);
if (s == null) {
return; // Presumably, the package was deleted.
}
} on Exception catch (e, st) {
// No idea what happened, but for robustness we'll stop using the zone
// and shout into the logs
_log.shout(
'Failed to create instance $instanceName, banning zone "$zone" for '
'15 minutes',
e,
st,
);
// Ban usage of zone for 15 minutes
banZone(zone, minutes: 15);
} finally {
if (rollbackPackageState) {
// Restore the state of the PackageState for versions that were
// suppose to run on the instance we just failed to create.
// If this doesn't work, we'll eventually retry. Hence, correctness
// does not hinge on this transaction being successful.
await withRetryTransaction(db, (tx) async {
final s = await tx.lookupOrNull<PackageState>(state.key);
if (s == null) {
return; // Presumably, the package was deleted.
}

s.versions!.addEntries(
s.versions!.entries
.where((e) => e.value.instance == instanceName)
.map((e) => MapEntry(e.key, state.versions![e.key]!)),
);
s.derivePendingAt();
tx.insert(s);
});
}
}
});
})
.toList(),
s.versions!.addEntries(
s.versions!.entries
.where((e) => e.value.instance == instanceName)
.map((e) => MapEntry(e.key, oldVersionsMap[e.key]!)),
);
s.derivePendingAt();
tx.insert(s);
});
}
}
});
}).toList(),
);

// If there was no pending packages reviewed, and no instances currently
Expand All @@ -281,19 +284,25 @@ Future<void> schedule(

/// Updates the package state with versions that are already pending or
/// will be pending soon.
///
/// Returns the payload and the old status of the state info version map
@visibleForTesting
Future<Payload?> updatePackageStateWithPendingVersions(
Future<(Payload, Map<String, PackageVersionStateInfo>)?>
updatePackageStateWithPendingVersions(
DatastoreDB db,
PackageState state,
String package,
String zone,
String instanceName,
) async {
return await withRetryTransaction(db, (tx) async {
final s = await tx.lookupOrNull<PackageState>(state.key);
final s = await tx.lookupOrNull<PackageState>(
PackageState.createKey(db.emptyKey, runtimeVersion, package),
);
if (s == null) {
// presumably the package was deleted.
return null;
}
final oldVersionsMap = {...?s.versions};

final now = clock.now();
final pendingVersions = s
Expand Down Expand Up @@ -354,7 +363,7 @@ Future<Payload?> updatePackageStateWithPendingVersions(
tx.insert(s);

// Create payload
return Payload(
final payload = Payload(
package: s.package,
pubHostedUrl: activeConfiguration.defaultServiceBaseUrl,
versions: pendingVersions.map(
Expand All @@ -364,5 +373,6 @@ Future<Payload?> updatePackageStateWithPendingVersions(
),
),
);
return (payload, oldVersionsMap);
});
}