Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions app/lib/package/api_export/exported_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ final class ExportedApi {
required String prefix,
required String delimiter,
}) async {
var p = await _pool.withResource(() async => await _bucket.page(
var p = await _pool.withResource(() async => await _bucket.pageWithRetry(
prefix: prefix,
delimiter: delimiter,
pageSize: 1000,
Expand All @@ -240,7 +240,8 @@ final class ExportedApi {
}));

if (p.isLast) break;
p = await _pool.withResource(() async => await p.next(pageSize: 1000));
p = await _pool
.withResource(() async => await p.nextWithRetry(pageSize: 1000));
}
}
}
Expand Down Expand Up @@ -632,7 +633,7 @@ final class ExportedBlob extends ExportedObject {
final retouchDeadline = clock.agoBy(_updateValidatedAfter);
if (destinationInfo.metadata.validated.isBefore(retouchDeadline)) {
try {
await _owner._bucket.updateMetadata(dst, _metadata());
await _owner._bucket.updateMetadataWithRetry(dst, _metadata());
} catch (e, st) {
// This shouldn't happen, but if a metadata update does fail, it's
// hardly the end of the world.
Expand All @@ -646,7 +647,7 @@ final class ExportedBlob extends ExportedObject {

// If dst or source doesn't exist, then we shall attempt to make a copy.
// (if source doesn't exist we'll consistently get an error from here!)
await _owner._storage.copyObject(
await _owner._storage.copyObjectWithRetry(
source.absoluteObjectName,
_owner._bucket.absoluteObjectName(dst),
metadata: _metadata(),
Expand All @@ -667,7 +668,7 @@ extension on Bucket {
if (info.metadata.validated
.isBefore(clock.agoBy(_updateValidatedAfter))) {
try {
await updateMetadata(name, metadata);
await updateMetadataWithRetry(name, metadata);
} catch (e, st) {
// This shouldn't happen, but if a metadata update does fail, it's
// hardly the end of the world.
Expand Down
2 changes: 1 addition & 1 deletion app/lib/package/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ class PackageBackend {
_logger.info('Removing temporary object $guid.');

sw.reset();
await _incomingBucket.delete(tmpObjectName(guid));
await _incomingBucket.deleteWithRetry(tmpObjectName(guid));
_logger.info('Temporary object removed in ${sw.elapsed}.');
return version;
});
Expand Down
3 changes: 2 additions & 1 deletion app/lib/service/download_counts/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import 'package:pub_dev/shared/cached_value.dart';
import 'package:pub_dev/shared/configuration.dart';
import 'package:pub_dev/shared/datastore.dart';
import 'package:pub_dev/shared/redis_cache.dart';
import 'package:pub_dev/shared/storage.dart';

/// Sets the download counts backend service.
void registerDownloadCountsBackend(DownloadCountsBackend backend) =>
Expand Down Expand Up @@ -42,7 +43,7 @@ class DownloadCountsBackend {
try {
final info = await storageService
.bucket(activeConfiguration.reportsBucketName!)
.info(downloadCounts30DaysTotalsFileName);
.infoWithRetry(downloadCounts30DaysTotalsFileName);

if (_lastData.etag == info.etag) {
return _lastData.data;
Expand Down
3 changes: 2 additions & 1 deletion app/lib/service/services.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'package:appengine/appengine.dart';
import 'package:clock/clock.dart';
import 'package:fake_gcloud/mem_datastore.dart';
import 'package:fake_gcloud/mem_storage.dart';
import 'package:fake_gcloud/retry_enforcer_storage.dart';
import 'package:gcloud/service_scope.dart';
import 'package:gcloud/storage.dart';
import 'package:googleapis_auth/auth_io.dart' as auth;
Expand Down Expand Up @@ -148,7 +149,7 @@ Future<R> withFakeServices<R>({
return await fork(() async {
register(#appengine.context, FakeClientContext());
registerDbService(DatastoreDB(datastore!));
registerStorageService(storage!);
registerStorageService(RetryEnforcerStorage(storage!));
IOServer? frontendServer;
IOServer? searchServer;
if (configuration == null) {
Expand Down
57 changes: 55 additions & 2 deletions app/lib/shared/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ extension StorageExt on Storage {
/// local environment.
Future<void> verifyBucketExistenceAndAccess(String bucketName) async {
// check bucket existence
if (!await bucketExists(bucketName)) {
if (!await bucketExistsWithRetry(bucketName)) {
final message = 'Bucket "$bucketName" does not exists!';
_logger.shout(message);
if (envConfig.isRunningLocally) {
Expand All @@ -69,10 +69,29 @@ extension StorageExt on Storage {
return;
}
}

/// Check whether a cloud storage bucket exists with the default retry.
Future<bool> bucketExistsWithRetry(String bucketName) async {
return await _retry(() => bucketExists(bucketName));
}

/// Copy an object with the default retry.
Future<void> copyObjectWithRetry(
String src,
String dest, {
ObjectMetadata? metadata,
}) async {
return await _retry(() async => await copyObject(src, dest));
}
}

/// Additional methods on buckets.
extension BucketExt on Bucket {
/// Lookup object metadata with default retry.
Future<ObjectInfo> infoWithRetry(String name) async {
return await _retry(() => info(name));
}

/// Returns an [ObjectInfo] if [name] exists, `null` otherwise.
Future<ObjectInfo?> tryInfo(String name) async {
return await retry(
Expand All @@ -89,6 +108,11 @@ extension BucketExt on Bucket {
);
}

/// Delete an object with default retry.
Future<void> deleteWithRetry(String name) async {
return await _retry(() => delete(name));
}

/// Deletes [name] if it exists, ignores 404 otherwise.
Future<void> tryDelete(String name) async {
return await retry(
Expand Down Expand Up @@ -159,6 +183,35 @@ extension BucketExt on Bucket {
String objectUrl(String objectName) {
return '${activeConfiguration.storageBaseUrl}/$bucketName/$objectName';
}

/// Update object metadata with default retry rules.
Future<void> updateMetadataWithRetry(
String objectName, ObjectMetadata metadata) async {
return await _retry(() async => await updateMetadata(objectName, metadata));
}

/// Start paging through objects in the bucket with the default retry.
Future<Page<BucketEntry>> pageWithRetry(
{String? prefix, String? delimiter, int pageSize = 50}) async {
return await _retry(
() async => await page(
prefix: prefix,
delimiter: delimiter,
pageSize: pageSize,
),
);
}
}

extension PageExt<T> on Page<T> {
/// Move to the next page with default retry.
Future<Page<T>> nextWithRetry({int pageSize = 50}) async {
return await _retry(() => next(pageSize: pageSize));
}
}

Future<R> _retry<R>(Future<R> Function() fn) async {
return await retry(fn, maxAttempts: 3, retryIf: _retryIf);
}

bool _retryIf(Exception e) {
Expand Down Expand Up @@ -212,7 +265,7 @@ Future<void> updateContentDispositionToAttachment(
ObjectInfo info, Bucket bucket) async {
if (info.metadata.contentDisposition != 'attachment') {
try {
await bucket.updateMetadata(
await bucket.updateMetadataWithRetry(
info.name, info.metadata.replace(contentDisposition: 'attachment'));
} on Exception catch (e, st) {
_logger.warning(
Expand Down
2 changes: 1 addition & 1 deletion app/test/admin/exported_api_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void main() {
final oldData = oldRoot[path] as Map;
final bucket =
storageService.bucket(activeConfiguration.exportedApiBucketName!);
await bucket.updateMetadata(
await bucket.updateMetadataWithRetry(
path,
ObjectMetadata(
contentType: 'text/plain',
Expand Down
Loading
Loading