diff --git a/app/lib/shared/exceptions.dart b/app/lib/shared/exceptions.dart index 08f661e72a..0f66846e97 100644 --- a/app/lib/shared/exceptions.dart +++ b/app/lib/shared/exceptions.dart @@ -51,6 +51,11 @@ class NotAcceptableException extends ResponseException { : super._(406, 'NotAcceptable', message); } +/// Thrown when part of the underlying analysis task has been aborted. +class TaskAbortedException extends ResponseException { + TaskAbortedException(String message) : super._(400, 'TaskAborted', message); +} + /// Thrown when request input is invalid, bad payload, wrong querystring, etc. class InvalidInputException extends ResponseException { InvalidInputException._(String message) diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index a366672eca..87e91001b3 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -38,6 +38,7 @@ import 'package:pub_dev/task/global_lock.dart'; import 'package:pub_dev/task/handlers.dart'; import 'package:pub_dev/task/models.dart' show + AbortedTokenInfo, PackageState, PackageStateInfo, PackageVersionStateInfo, @@ -454,6 +455,20 @@ class TaskBackend { return false; } + state.abortedTokens = [ + ...state.versions!.entries + .where((e) => deselectedVersions.contains(e.key)) + .map((e) => e.value) + .where((vs) => vs.secretToken != null) + .map( + (vs) => AbortedTokenInfo( + token: vs.secretToken!, + expires: vs.scheduled.add(maxTaskExecutionTime), + ), + ), + ...?state.abortedTokens, + ].where((t) => t.isNotExpired).take(50).toList(); + // Make changes! state.versions! // Remove versions that have been deselected @@ -467,6 +482,7 @@ class TaskBackend { ), }); state.derivePendingAt(); + state.abortedTokens?.removeWhere((t) => t.expires.isAfter(clock.now())); _log.info('Update state tracking for $packageName'); tx.insert(state); @@ -609,20 +625,18 @@ class TaskBackend { InvalidInputException.checkPackageName(package); version = InvalidInputException.checkSemanticVersion(version); + final token = _extractBearerToken(request); + if (token == null) { + throw AuthenticationException.authenticationRequired(); + } + final key = PackageState.createKey(_db, runtimeVersion, package); final state = await _db.lookupOrNull(key); - if (state == null || state.versions![version] == null) { + if (state == null) { throw NotFoundException.resource('$package/$version'); } - final versionState = state.versions![version]!; - - // Check the secret token - if (!versionState.isAuthorized(_extractBearerToken(request))) { - throw AuthenticationException.authenticationRequired(); - } - assert(versionState.scheduled != initialTimestamp); - assert(versionState.instance != null); - assert(versionState.zone != null); + final versionState = + _authorizeWorkerCallback(package, version, state, token); // Set expiration of signed URLs to remaining execution time + 5 min to // allow for clock skew. @@ -669,6 +683,11 @@ class TaskBackend { InvalidInputException.checkPackageName(package); version = InvalidInputException.checkSemanticVersion(version); + final token = _extractBearerToken(request); + if (token == null) { + throw AuthenticationException.authenticationRequired(); + } + String? zone, instance; bool isInstanceDone = false; final index = await _loadTaskResultIndex( @@ -685,18 +704,11 @@ class TaskBackend { await withRetryTransaction(_db, (tx) async { final key = PackageState.createKey(_db, runtimeVersion, package); final state = await tx.lookupOrNull(key); - if (state == null || state.versions![version] == null) { + if (state == null) { throw NotFoundException.resource('$package/$version'); } - final versionState = state.versions![version]!; - - // Check the secret token - if (!versionState.isAuthorized(_extractBearerToken(request))) { - throw AuthenticationException.authenticationRequired(); - } - assert(versionState.scheduled != initialTimestamp); - assert(versionState.instance != null); - assert(versionState.zone != null); + final versionState = + _authorizeWorkerCallback(package, version, state, token); // Update dependencies, if pana summary has dependencies if (summary != null && summary.allDependencies != null) { @@ -1169,6 +1181,38 @@ String? _extractBearerToken(shelf.Request request) { return parts.last.trim(); } +/// Authorize a worker callback for [package] / [version]. +/// +/// Returns the [PackageVersionStateInfo] that the worker is authenticated for. +/// Or throw [ResponseException] if authorization is not possible. +PackageVersionStateInfo _authorizeWorkerCallback( + String package, + String version, + PackageState state, + String token, +) { + // fixed-time verification of aborted tokens + final isKnownAbortedToken = state.abortedTokens + ?.map((t) => t.isAuthorized(token)) + .fold(false, (a, b) => a || b); + if (isKnownAbortedToken ?? false) { + throw TaskAbortedException('$package/$version has been aborted.'); + } + + final versionState = state.versions![version]; + if (versionState == null) { + throw NotFoundException.resource('$package/$version'); + } + // Check the secret token + if (!versionState.isAuthorized(token)) { + throw AuthenticationException.authenticationRequired(); + } + assert(versionState.scheduled != initialTimestamp); + assert(versionState.instance != null); + assert(versionState.zone != null); + return versionState; +} + /// Given a list of versions return the list of versions that should be /// tracked for analysis. /// diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index 7a9ac2fe35..3fc61dbaf1 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -6,6 +6,8 @@ import 'dart:convert' show json; import 'package:clock/clock.dart'; import 'package:json_annotation/json_annotation.dart'; +import 'package:pub_dev/admin/actions/actions.dart'; +import 'package:pub_dev/shared/utils.dart'; import '../shared/datastore.dart' as db; import '../shared/versions.dart' as shared_versions; @@ -107,6 +109,12 @@ class PackageState extends db.ExpandoModel { @PackageVersionStateMapProperty(required: true) Map? versions; + /// The list of tokens that were removed from this [PackageState]. + /// When a worker reports back using one of these tokens, they will + /// recieve a [TaskAbortedException]. + @AbortedTokenListProperty() + List? abortedTokens; + /// Next [DateTime] at which point some package version becomes pending. @db.DateTimeProperty(required: true, indexed: true) DateTime? pendingAt; @@ -407,3 +415,58 @@ enum PackageVersionStatus { /// Analysis failed to report a result. failed, } + +/// Tracks a token that was removed from the [PackageState], but a worker +/// may still use it to report a completed task. Such workers may recieve +/// an error code that says they shouldn't really panic on the rejection. +@JsonSerializable() +class AbortedTokenInfo { + final String token; + final DateTime expires; + + AbortedTokenInfo({ + required this.token, + required this.expires, + }); + + factory AbortedTokenInfo.fromJson(Map m) => + _$AbortedTokenInfoFromJson(m); + Map toJson() => _$AbortedTokenInfoToJson(this); + + bool get isNotExpired => clock.now().isBefore(expires); + + bool isAuthorized(String token) { + return fixedTimeEquals(this.token, token) && isNotExpired; + } +} + +/// A [db.Property] encoding a List os [AbortedTokenInfo] as JSON. +class AbortedTokenListProperty extends db.Property { + const AbortedTokenListProperty({String? propertyName, bool required = false}) + : super(propertyName: propertyName, required: required, indexed: false); + + @override + Object? encodeValue( + db.ModelDB mdb, + Object? value, { + bool forComparison = false, + }) => + json.encode( + (value as List?)?.map((e) => e.toJson()).toList()); + + @override + Object? decodePrimitiveValue( + db.ModelDB mdb, + Object? value, + ) => + value == null + ? null + : (json.decode(value as String) as List?) + ?.map((e) => AbortedTokenInfo.fromJson(e as Map)) + .toList(); + + @override + bool validate(db.ModelDB mdb, Object? value) => + super.validate(mdb, value) && + (value == null || value is List); +} diff --git a/app/lib/task/models.g.dart b/app/lib/task/models.g.dart index 3cd7b34c1c..fed44ac4a1 100644 --- a/app/lib/task/models.g.dart +++ b/app/lib/task/models.g.dart @@ -48,3 +48,15 @@ Map _$PackageStateInfoToJson(PackageStateInfo instance) => 'package': instance.package, 'versions': instance.versions, }; + +AbortedTokenInfo _$AbortedTokenInfoFromJson(Map json) => + AbortedTokenInfo( + token: json['token'] as String, + expires: DateTime.parse(json['expires'] as String), + ); + +Map _$AbortedTokenInfoToJson(AbortedTokenInfo instance) => + { + 'token': instance.token, + 'expires': instance.expires.toIso8601String(), + }; diff --git a/pkg/pub_worker/lib/src/analyze.dart b/pkg/pub_worker/lib/src/analyze.dart index e1a0a43951..93aa163a27 100644 --- a/pkg/pub_worker/lib/src/analyze.dart +++ b/pkg/pub_worker/lib/src/analyze.dart @@ -92,12 +92,14 @@ Future analyze(Payload payload) async { 'analyze.', ); } + } on TaskAbortedException catch (e, st) { + _log.warning( + 'Task was aborted when uploading ${payload.package} / ${p.version}', + e, + st); } catch (e, st) { _log.shout( - 'failed to process ${payload.package} / ${p.version}', - e, - st, - ); + 'failed to process ${payload.package} / ${p.version}', e, st); } } } finally { diff --git a/pkg/pub_worker/lib/src/upload.dart b/pkg/pub_worker/lib/src/upload.dart index 8ee5342a86..689d84bcd8 100644 --- a/pkg/pub_worker/lib/src/upload.dart +++ b/pkg/pub_worker/lib/src/upload.dart @@ -3,6 +3,7 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; +import 'dart:convert'; import 'dart:io'; import 'package:_pub_shared/data/package_api.dart' show UploadInfo; @@ -40,6 +41,15 @@ Future upload( )); final res = await Response.fromStream(await client.send(req)); + // Special case `TaskAborted` response code, it means that the analysis + // is no longer selected or the secret token timed out / was replaced + // (it may need a different analysis round). + if (res.statusCode == 400 && + _extractExceptionCode(res) == 'TaskAborted') { + _log.warning( + 'Task aborted, failed to upload: $filename, status = ${res.statusCode}'); + throw TaskAbortedException(res.body); + } if (400 <= res.statusCode && res.statusCode < 500) { _log.shout('Failed to upload: $filename, status = ${res.statusCode}'); throw UploadException( @@ -80,3 +90,26 @@ final class UploadException implements Exception { final class IntermittentUploadException extends UploadException { IntermittentUploadException(String message) : super(message); } + +final class TaskAbortedException extends UploadException { + TaskAbortedException(String message) : super(message); +} + +/// Extract `error.code` from JSON body in [res]. +String? _extractExceptionCode(Response res) { + try { + final map = json.decode(res.body); + if (map is! Map) { + return null; + } + final error = map['error']; + if (error is! Map) { + return null; + } + final code = error['code']; + return code?.toString(); + } on FormatException catch (_) { + // ignore + } + return null; +}