Skip to content
5 changes: 5 additions & 0 deletions app/lib/shared/exceptions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ class NotAcceptableException extends ResponseException {
: super._(406, 'NotAcceptable', message);
}

/// Thrown when part of the underlying analysis task has been aborted.
class TaskAbortedException extends ResponseException {
TaskAbortedException(String message) : super._(400, 'TaskAborted', message);
}

/// Thrown when request input is invalid, bad payload, wrong querystring, etc.
class InvalidInputException extends ResponseException {
InvalidInputException._(String message)
Expand Down
84 changes: 64 additions & 20 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import 'package:pub_dev/task/global_lock.dart';
import 'package:pub_dev/task/handlers.dart';
import 'package:pub_dev/task/models.dart'
show
AbortedTokenInfo,
PackageState,
PackageStateInfo,
PackageVersionStateInfo,
Expand Down Expand Up @@ -454,6 +455,20 @@ class TaskBackend {
return false;
}

state.abortedTokens = [
...state.versions!.entries
.where((e) => deselectedVersions.contains(e.key))
.map((e) => e.value)
.where((vs) => vs.secretToken != null)
.map(
(vs) => AbortedTokenInfo(
token: vs.secretToken!,
expires: vs.scheduled.add(maxTaskExecutionTime),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have stored scheduled instead of expires, not that it matters more, but it would have been 10% more canonical.

Anyways, I don't think it matters :D

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could replace it, but I'm not sure it is worth it...

),
),
...?state.abortedTokens,
].where((t) => t.isNotExpired).take(50).toList();

// Make changes!
state.versions!
// Remove versions that have been deselected
Expand All @@ -467,6 +482,7 @@ class TaskBackend {
),
});
state.derivePendingAt();
state.abortedTokens?.removeWhere((t) => t.expires.isAfter(clock.now()));

_log.info('Update state tracking for $packageName');
tx.insert(state);
Expand Down Expand Up @@ -609,20 +625,18 @@ class TaskBackend {
InvalidInputException.checkPackageName(package);
version = InvalidInputException.checkSemanticVersion(version);

final token = _extractBearerToken(request);
if (token == null) {
throw AuthenticationException.authenticationRequired();
}

final key = PackageState.createKey(_db, runtimeVersion, package);
final state = await _db.lookupOrNull<PackageState>(key);
if (state == null || state.versions![version] == null) {
if (state == null) {
throw NotFoundException.resource('$package/$version');
}
final versionState = state.versions![version]!;

// Check the secret token
if (!versionState.isAuthorized(_extractBearerToken(request))) {
throw AuthenticationException.authenticationRequired();
}
assert(versionState.scheduled != initialTimestamp);
assert(versionState.instance != null);
assert(versionState.zone != null);
final versionState =
_authorizeWorkerCallback(package, version, state, token);

// Set expiration of signed URLs to remaining execution time + 5 min to
// allow for clock skew.
Expand Down Expand Up @@ -669,6 +683,11 @@ class TaskBackend {
InvalidInputException.checkPackageName(package);
version = InvalidInputException.checkSemanticVersion(version);

final token = _extractBearerToken(request);
if (token == null) {
throw AuthenticationException.authenticationRequired();
}

String? zone, instance;
bool isInstanceDone = false;
final index = await _loadTaskResultIndex(
Expand All @@ -685,18 +704,11 @@ class TaskBackend {
await withRetryTransaction(_db, (tx) async {
final key = PackageState.createKey(_db, runtimeVersion, package);
final state = await tx.lookupOrNull<PackageState>(key);
if (state == null || state.versions![version] == null) {
if (state == null) {
throw NotFoundException.resource('$package/$version');
}
final versionState = state.versions![version]!;

// Check the secret token
if (!versionState.isAuthorized(_extractBearerToken(request))) {
throw AuthenticationException.authenticationRequired();
}
assert(versionState.scheduled != initialTimestamp);
assert(versionState.instance != null);
assert(versionState.zone != null);
final versionState =
_authorizeWorkerCallback(package, version, state, token);

// Update dependencies, if pana summary has dependencies
if (summary != null && summary.allDependencies != null) {
Expand Down Expand Up @@ -1169,6 +1181,38 @@ String? _extractBearerToken(shelf.Request request) {
return parts.last.trim();
}

/// Authorize a worker callback for [package] / [version].
///
/// Returns the [PackageVersionStateInfo] that the worker is authenticated for.
/// Or throw [ResponseException] if authorization is not possible.
PackageVersionStateInfo _authorizeWorkerCallback(
String package,
String version,
PackageState state,
String token,
) {
// fixed-time verification of aborted tokens
final isKnownAbortedToken = state.abortedTokens
?.map((t) => t.isAuthorized(token))
.fold<bool>(false, (a, b) => a || b);
if (isKnownAbortedToken ?? false) {
throw TaskAbortedException('$package/$version has been aborted.');
}

final versionState = state.versions![version];
if (versionState == null) {
throw NotFoundException.resource('$package/$version');
}
// Check the secret token
if (!versionState.isAuthorized(token)) {
throw AuthenticationException.authenticationRequired();
}
assert(versionState.scheduled != initialTimestamp);
assert(versionState.instance != null);
assert(versionState.zone != null);
return versionState;
}

/// Given a list of versions return the list of versions that should be
/// tracked for analysis.
///
Expand Down
63 changes: 63 additions & 0 deletions app/lib/task/models.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import 'dart:convert' show json;

import 'package:clock/clock.dart';
import 'package:json_annotation/json_annotation.dart';
import 'package:pub_dev/admin/actions/actions.dart';
import 'package:pub_dev/shared/utils.dart';

import '../shared/datastore.dart' as db;
import '../shared/versions.dart' as shared_versions;
Expand Down Expand Up @@ -107,6 +109,12 @@ class PackageState extends db.ExpandoModel<String> {
@PackageVersionStateMapProperty(required: true)
Map<String, PackageVersionStateInfo>? versions;

/// The list of tokens that were removed from this [PackageState].
/// When a worker reports back using one of these tokens, they will
/// recieve a [TaskAbortedException].
@AbortedTokenListProperty()
List<AbortedTokenInfo>? abortedTokens;

/// Next [DateTime] at which point some package version becomes pending.
@db.DateTimeProperty(required: true, indexed: true)
DateTime? pendingAt;
Expand Down Expand Up @@ -407,3 +415,58 @@ enum PackageVersionStatus {
/// Analysis failed to report a result.
failed,
}

/// Tracks a token that was removed from the [PackageState], but a worker
/// may still use it to report a completed task. Such workers may recieve
/// an error code that says they shouldn't really panic on the rejection.
@JsonSerializable()
class AbortedTokenInfo {
final String token;
final DateTime expires;

AbortedTokenInfo({
required this.token,
required this.expires,
});

factory AbortedTokenInfo.fromJson(Map<String, dynamic> m) =>
_$AbortedTokenInfoFromJson(m);
Map<String, dynamic> toJson() => _$AbortedTokenInfoToJson(this);

bool get isNotExpired => clock.now().isBefore(expires);

bool isAuthorized(String token) {
return fixedTimeEquals(this.token, token) && isNotExpired;
}
}

/// A [db.Property] encoding a List os [AbortedTokenInfo] as JSON.
class AbortedTokenListProperty extends db.Property {
const AbortedTokenListProperty({String? propertyName, bool required = false})
: super(propertyName: propertyName, required: required, indexed: false);

@override
Object? encodeValue(
db.ModelDB mdb,
Object? value, {
bool forComparison = false,
}) =>
json.encode(
(value as List<AbortedTokenInfo>?)?.map((e) => e.toJson()).toList());

@override
Object? decodePrimitiveValue(
db.ModelDB mdb,
Object? value,
) =>
value == null
? null
: (json.decode(value as String) as List?)
?.map((e) => AbortedTokenInfo.fromJson(e as Map<String, dynamic>))
.toList();

@override
bool validate(db.ModelDB mdb, Object? value) =>
super.validate(mdb, value) &&
(value == null || value is List<AbortedTokenInfo>);
}
12 changes: 12 additions & 0 deletions app/lib/task/models.g.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/pub_worker/lib/src/analyze.dart
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,14 @@ Future<void> analyze(Payload payload) async {
'analyze.',
);
}
} on TaskAbortedException catch (e, st) {
_log.warning(
'Task was aborted when uploading ${payload.package} / ${p.version}',
e,
st);
} catch (e, st) {
_log.shout(
'failed to process ${payload.package} / ${p.version}',
e,
st,
);
'failed to process ${payload.package} / ${p.version}', e, st);
}
}
} finally {
Expand Down
33 changes: 33 additions & 0 deletions pkg/pub_worker/lib/src/upload.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:_pub_shared/data/package_api.dart' show UploadInfo;
Expand Down Expand Up @@ -40,6 +41,15 @@ Future<void> upload(
));
final res = await Response.fromStream(await client.send(req));

// Special case `TaskAborted` response code, it means that the analysis
// is no longer selected or the secret token timed out / was replaced
// (it may need a different analysis round).
if (res.statusCode == 400 &&
_extractExceptionCode(res) == 'TaskAborted') {
_log.warning(
'Task aborted, failed to upload: $filename, status = ${res.statusCode}');
throw TaskAbortedException(res.body);
}
if (400 <= res.statusCode && res.statusCode < 500) {
_log.shout('Failed to upload: $filename, status = ${res.statusCode}');
throw UploadException(
Expand Down Expand Up @@ -80,3 +90,26 @@ final class UploadException implements Exception {
final class IntermittentUploadException extends UploadException {
IntermittentUploadException(String message) : super(message);
}

final class TaskAbortedException extends UploadException {
TaskAbortedException(String message) : super(message);
}

/// Extract `error.code` from JSON body in [res].
String? _extractExceptionCode(Response res) {
try {
final map = json.decode(res.body);
if (map is! Map) {
return null;
}
final error = map['error'];
if (error is! Map) {
return null;
}
final code = error['code'];
return code?.toString();
} on FormatException catch (_) {
// ignore
}
return null;
}
Loading