From 18ce6e45487216e91c67c6d69cb2058176f4398b Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Tue, 29 Oct 2024 22:26:37 +0100 Subject: [PATCH 1/8] Draft of what a full sync could look like --- app/lib/package/api_export/api_exporter.dart | 116 +++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/app/lib/package/api_export/api_exporter.dart b/app/lib/package/api_export/api_exporter.dart index a0d3be3328..b489f15c3f 100644 --- a/app/lib/package/api_export/api_exporter.dart +++ b/app/lib/package/api_export/api_exporter.dart @@ -9,6 +9,8 @@ import 'package:gcloud/storage.dart'; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:pool/pool.dart'; +import 'package:pub_dev/service/security_advisories/backend.dart'; +import 'package:pub_dev/shared/parallel_foreach.dart'; import 'package:retry/retry.dart'; import '../../search/backend.dart'; @@ -78,6 +80,120 @@ class ApiExporter { .write(await searchBackend.getPackageNameCompletionData()); } + Future fullSync() async { + final invisiblePackageNames = await dbService + .query() + .run() + .map((mp) => mp.name!) + .toSet(); + + final allPackageNames = {}; + final packageQuery = dbService.query(); + await packageQuery.run().parallelForEach(_concurrency, (pkg) async { + final name = pkg.name!; + if (pkg.isNotVisible) { + invisiblePackageNames.add(name); + return; + } + allPackageNames.add(name); + + // TODO: Consider retries around all this logic + await syncPackage(name); + }); + + final visibilityConflicts = + allPackageNames.intersection(invisiblePackageNames); + if (visibilityConflicts.isNotEmpty) { + // TODO: Shout into logs + } + + await _api.garbageCollect(allPackageNames); + } + + /// Sync package and into [ExportedApi], this will GC, etc. + /// + /// This is intended when: + /// * Running a full background synchronization. + /// * When a change in [Package.updated] is detected (maybe???) + /// * A package is moderated, or other admin action is applied. + Future syncPackage(String package) async { + final versionListing = await packageBackend.listVersions(package); + // TODO: Consider skipping the cache when fetching security advisories + final advisories = await securityAdvisoryBackend.listAdvisoriesResponse( + package, + ); + + await Future.wait(versionListing.versions.map((v) async { + final version = v.version; + + // TODO: Will v.version work here, is the canonicalized version number? + final absoluteObjectName = + packageBackend.tarballStorage.getCanonicalBucketAbsoluteObjectName( + package, + version, + ); + final info = + await packageBackend.tarballStorage.getCanonicalBucketArchiveInfo( + package, + version, + ); + if (info == null) { + throw AssertionError( + 'Expected an archive for "$package" and "$version" at ' + '"$absoluteObjectName"', + ); + } + + await _api.package(package).tarball(version).copyFrom( + absoluteObjectName, + info, + ); + })); + + await _api.package(package).advisories.write(advisories); + await _api.package(package).versions.write(versionListing); + + // TODO: Is this the canonoical version? (probably) + final allVersions = versionListing.versions.map((v) => v.version).toSet(); + await _api.package(package).garbageCollect(allVersions); + } + + /// Upload a single version of a new package. + /// + /// This is intended to be used when a new version of a package has been + /// published. + Future uploadSingleVersion( + String package, + String version, + ) async { + final versionListing = await packageBackend.listVersions(package); + + // TODO: Will v.version work here, is the canonicalized version number? + final absoluteObjectName = + packageBackend.tarballStorage.getCanonicalBucketAbsoluteObjectName( + package, + version, + ); + final info = + await packageBackend.tarballStorage.getCanonicalBucketArchiveInfo( + package, + version, + ); + if (info == null) { + throw AssertionError( + 'Expected an archive for "$package" and "$version" at ' + '"$absoluteObjectName"', + ); + } + + await _api.package(package).tarball(version).copyFrom( + absoluteObjectName, + info, + ); + + await _api.package(package).versions.write(versionListing); + } + /// Note: there is no global locking here, the full scan should be called /// only once every day, and it may be racing against the incremental /// updates. From 9715c843dbd4276cc2054b58681deeb101f9d3d7 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Wed, 6 Nov 2024 15:18:15 +0100 Subject: [PATCH 2/8] Updated ApiExported and added tests --- app/lib/package/api_export/api_exporter.dart | 425 ++++++--------- app/lib/package/backend.dart | 3 +- app/lib/service/entrypoint/analyzer.dart | 2 +- app/lib/tool/neat_task/pub_dev_tasks.dart | 11 +- .../package/api_export/api_exporter_test.dart | 502 +++++++++++++++--- app/test/shared/test_services.dart | 3 +- 6 files changed, 591 insertions(+), 355 deletions(-) diff --git a/app/lib/package/api_export/api_exporter.dart b/app/lib/package/api_export/api_exporter.dart index b489f15c3f..81e8e5a52c 100644 --- a/app/lib/package/api_export/api_exporter.dart +++ b/app/lib/package/api_export/api_exporter.dart @@ -2,30 +2,24 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -import 'package:basics/basics.dart'; +import 'dart:async'; + import 'package:clock/clock.dart'; import 'package:gcloud/service_scope.dart' as ss; import 'package:gcloud/storage.dart'; import 'package:logging/logging.dart'; -import 'package:meta/meta.dart'; -import 'package:pool/pool.dart'; import 'package:pub_dev/service/security_advisories/backend.dart'; import 'package:pub_dev/shared/parallel_foreach.dart'; -import 'package:retry/retry.dart'; import '../../search/backend.dart'; import '../../shared/datastore.dart'; -import '../../shared/storage.dart'; import '../../shared/versions.dart'; import '../../task/global_lock.dart'; import '../backend.dart'; import '../models.dart'; import 'exported_api.dart'; -final Logger _logger = Logger('export_api_to_bucket'); - -/// The default concurrency to upload API JSON files to the bucket. -const _defaultBucketUpdateConcurrency = 8; +final Logger _log = Logger('api_exporter'); /// Sets the API Exporter service. void registerApiExporter(ApiExporter value) => @@ -34,321 +28,220 @@ void registerApiExporter(ApiExporter value) => /// The active API Exporter service or null if it hasn't been initialized. ApiExporter? get apiExporter => ss.lookup(#_apiExporter) as ApiExporter?; +const _concurrency = 30; + class ApiExporter { final ExportedApi _api; - final Bucket _bucket; - final int _concurrency; - final _pkgLastUpdated = {}; + + /// If [stop] has been called to stop background processes. + /// + /// `null` when not started yet, or we have been fully stopped. + Completer? _aborted; + + /// If background processes created by [start] have stopped. + /// + /// This won't be resolved if [start] has not been called! + /// `null` when not started yet. + Completer? _stopped; ApiExporter({ required Bucket bucket, - int concurrency = _defaultBucketUpdateConcurrency, - }) : _api = ExportedApi(storageService, bucket), - _bucket = bucket, - _concurrency = concurrency; + }) : _api = ExportedApi(storageService, bucket); - /// Runs a forever loop and tries to get a global lock. - /// - /// Once it has the claim, it scans the package entities and uploads - /// the package API JSONs to the bucket. - /// Tracks the package updates for the next up-to 24 hours and writes - /// the API JSONs after every few minutes. + /// Start continuous background processes for scheduling of tasks. /// - /// When other process has the claim, the loop waits a minute before - /// attempting to get the claim. - Future uploadInForeverLoop() async { - final lock = GlobalLock.create( - '$runtimeVersion/package/update-api-bucket', - expiration: Duration(minutes: 20), - ); - while (true) { + /// Calling [start] without first calling [stop] is an error. + Future start() async { + if (_aborted != null) { + throw StateError('ApiExporter.start() has already been called!'); + } + // Note: During testing we call [start] and [stop] in a [FakeAsync.run], + // this only works because the completers are created here. + // If we create the completers in the constructor which gets called + // outside [FakeAsync.run], then this won't work. + // In the future we hopefully support running the entire service using + // FakeAsync, but this point we rely on completers being created when + // [start] is called -- and not in the [ApiExporter] constructor. + final aborted = _aborted = Completer(); + final stopped = _stopped = Completer(); + + // Start scanning for packages to be tracked + scheduleMicrotask(() async { try { - await lock.withClaim((claim) async { - await incrementalPkgScanAndUpload(claim); - }); + // Create a lock for task scheduling, so tasks + final lock = GlobalLock.create( + '$runtimeVersion/package/scan-sync-export-api', + expiration: Duration(minutes: 25), + ); + + while (!aborted.isCompleted) { + // Acquire the global lock and scan for package changes while lock is + // valid. + try { + await lock.withClaim((claim) async { + await _scanForPackageUpdates(claim, abort: aborted); + }, abort: aborted); + } catch (e, st) { + // Log this as very bad, and then move on. Nothing good can come + // from straight up stopping. + _log.shout( + 'scanning failed (will retry when lock becomes free)', + e, + st, + ); + // Sleep 5 minutes to reduce risk of degenerate behavior + await Future.delayed(Duration(minutes: 5)); + } + } } catch (e, st) { - _logger.warning('Package API bucket update failed.', e, st); + _log.severe('scanning loop crashed', e, st); + } finally { + _log.info('scanning loop stopped'); + // Report background processes as stopped + stopped.complete(); } - // Wait for 1 minutes for sanity, before trying again. - await Future.delayed(Duration(minutes: 1)); + }); + } + + /// Stop any background process that may be running. + /// + /// Calling this method is always safe. + Future stop() async { + final aborted = _aborted; + if (aborted == null) { + return; + } + if (!aborted.isCompleted) { + aborted.complete(); } + await _stopped!.future; + _aborted = null; + _stopped = null; } /// Gets and uploads the package name completion data. - Future uploadPkgNameCompletionData() async { - await _api.packageNameCompletionData - .write(await searchBackend.getPackageNameCompletionData()); + Future synchronizePackageNameCompletionData() async { + await _api.packageNameCompletionData.write( + await searchBackend.getPackageNameCompletionData(), + ); } - Future fullSync() async { - final invisiblePackageNames = await dbService - .query() - .run() - .map((mp) => mp.name!) - .toSet(); - + /// Synchronize all exported API. + /// + /// This is intended to be scheduled from a daily background task. + Future synchronizeExportedApi() async { final allPackageNames = {}; final packageQuery = dbService.query(); await packageQuery.run().parallelForEach(_concurrency, (pkg) async { final name = pkg.name!; if (pkg.isNotVisible) { - invisiblePackageNames.add(name); return; } allPackageNames.add(name); // TODO: Consider retries around all this logic - await syncPackage(name); + await synchronizePackage(name); }); - final visibilityConflicts = - allPackageNames.intersection(invisiblePackageNames); - if (visibilityConflicts.isNotEmpty) { - // TODO: Shout into logs - } + await synchronizePackageNameCompletionData(); await _api.garbageCollect(allPackageNames); } - /// Sync package and into [ExportedApi], this will GC, etc. + /// Sync package and into [ExportedApi], this will synchronize package into + /// [ExportedApi]. + /// + /// This method will update [ExportedApi] ensuring: + /// * Version listing for [package] is up-to-date, + /// * Advisories for [package] is up-to-date, + /// * Tarballs for each version of [package] is up-to-date, + /// * Delete tarballs from old versions that no-longer exist. /// /// This is intended when: /// * Running a full background synchronization. - /// * When a change in [Package.updated] is detected (maybe???) + /// * When a change in [Package.updated] is detected. /// * A package is moderated, or other admin action is applied. - Future syncPackage(String package) async { + Future synchronizePackage(String package) async { + // TODO: Handle the case where [package] is deleted or invisible! + // TODO: We may need to delete the package, but only if it's not too recent! final versionListing = await packageBackend.listVersions(package); // TODO: Consider skipping the cache when fetching security advisories final advisories = await securityAdvisoryBackend.listAdvisoriesResponse( package, ); - await Future.wait(versionListing.versions.map((v) async { - final version = v.version; - - // TODO: Will v.version work here, is the canonicalized version number? - final absoluteObjectName = - packageBackend.tarballStorage.getCanonicalBucketAbsoluteObjectName( - package, - version, - ); - final info = - await packageBackend.tarballStorage.getCanonicalBucketArchiveInfo( - package, - version, - ); - if (info == null) { - throw AssertionError( - 'Expected an archive for "$package" and "$version" at ' - '"$absoluteObjectName"', - ); - } - - await _api.package(package).tarball(version).copyFrom( - absoluteObjectName, - info, - ); - })); - - await _api.package(package).advisories.write(advisories); - await _api.package(package).versions.write(versionListing); + final versions = await packageBackend.tarballStorage + .listVersionsInCanonicalBucket(package); - // TODO: Is this the canonoical version? (probably) - final allVersions = versionListing.versions.map((v) => v.version).toSet(); - await _api.package(package).garbageCollect(allVersions); - } - - /// Upload a single version of a new package. - /// - /// This is intended to be used when a new version of a package has been - /// published. - Future uploadSingleVersion( - String package, - String version, - ) async { - final versionListing = await packageBackend.listVersions(package); - - // TODO: Will v.version work here, is the canonicalized version number? - final absoluteObjectName = - packageBackend.tarballStorage.getCanonicalBucketAbsoluteObjectName( - package, - version, - ); - final info = - await packageBackend.tarballStorage.getCanonicalBucketArchiveInfo( - package, - version, + // Remove versions that are not exposed in the public API. + versions.removeWhere( + (version, _) => !versionListing.versions.any((v) => v.version == version), ); - if (info == null) { - throw AssertionError( - 'Expected an archive for "$package" and "$version" at ' - '"$absoluteObjectName"', - ); - } - - await _api.package(package).tarball(version).copyFrom( - absoluteObjectName, - info, - ); + await _api.package(package).synchronizeTarballs(versions); + await _api.package(package).advisories.write(advisories); await _api.package(package).versions.write(versionListing); } - /// Note: there is no global locking here, the full scan should be called - /// only once every day, and it may be racing against the incremental - /// updates. - @visibleForTesting - Future fullPkgScanAndUpload() async { - final pool = Pool(_concurrency); - final futures = []; - await for (final mp in dbService.query().run()) { - final f = pool.withResource(() => _deletePackageFromBucket(mp.name!)); - futures.add(f); - } - await Future.wait(futures); - futures.clear(); - - await for (final package in dbService.query().run()) { - final f = pool.withResource(() async { - if (package.isVisible) { - await _uploadPackageToBucket(package.name!); - } else { - await _deletePackageFromBucket(package.name!); - } - }); - futures.add(f); - } - await Future.wait(futures); - await pool.close(); - } - - @visibleForTesting - Future incrementalPkgScanAndUpload( + /// Scan for updates from packages until [abort] is resolved, or [claim] + /// is lost. + Future _scanForPackageUpdates( GlobalLockClaim claim, { - Duration sleepDuration = const Duration(minutes: 2), + Completer? abort, }) async { - final pool = Pool(_concurrency); - // The claim will be released after a day, another process may - // start to upload the API JSONs from scratch again. - final workUntil = clock.now().add(Duration(days: 1)); - - // start monitoring with a window of 7 days lookback - var lastQueryStarted = clock.now().subtract(Duration(days: 7)); - while (claim.valid) { - final now = clock.now().toUtc(); - if (now.isAfter(workUntil)) { - break; + abort ??= Completer(); + + // Map from package to updated that has been seen. + final seen = {}; + + // We will schedule longer overlaps every 6 hours. + var nextLongScan = clock.fromNow(hours: 6); + + // In theory 30 minutes overlap should be enough. In practice we should + // allow an ample room for missed windows, and 3 days seems to be large enough. + var since = clock.ago(days: 3); + while (claim.valid && !abort.isCompleted) { + // Look at all packages changed in [since] + final q = dbService.query() + ..filter('updated >', since) + ..order('-updated'); + + if (clock.now().isAfter(nextLongScan)) { + // Next time we'll do a longer scan + since = clock.ago(days: 1); + nextLongScan = clock.fromNow(hours: 6); + } else { + // Next time we'll only consider changes since now - 30 minutes + since = clock.ago(minutes: 30); } - // clear old entries from last seen cache - _pkgLastUpdated.removeWhere((key, event) => - now.difference(event.updated) > const Duration(hours: 1)); - - lastQueryStarted = now; - final futures = []; - final eventsSince = lastQueryStarted.subtract(Duration(minutes: 5)); - await for (final event in _queryRecentPkgUpdatedEvents(eventsSince)) { - if (!claim.valid) { - break; + // Look at all packages that has changed + await for (final p in q.run()) { + // Abort, if claim is invalid or abort has been resolved! + if (!claim.valid || abort.isCompleted) { + return; } - final f = pool.withResource(() async { - if (!claim.valid) { - return; - } - final last = _pkgLastUpdated[event.package]; - if (last != null && last.updated.isAtOrAfter(event.updated)) { - return; - } - _pkgLastUpdated[event.package] = event; - if (event.isVisible) { - await _uploadPackageToBucket(event.package); - } else { - await _deletePackageFromBucket(event.package); - } - }); - futures.add(f); - } - await Future.wait(futures); - futures.clear(); - await Future.delayed(sleepDuration); - } - await pool.close(); - } - - /// Updates the API files after a version has changed (e.g. new version was uploaded). - Future updatePackageVersion(String package, String version) async { - await _uploadPackageToBucket(package); - } - /// Uploads the package version API response bytes to the bucket, mirroring - /// the endpoint name in the file location. - Future _uploadPackageToBucket(String package) async { - final data = await retry(() => packageBackend.listVersions(package)); - await _api.package(package).versions.write(data); - } - - Future _deletePackageFromBucket(String package) async { - await _api.package(package).delete(); - } - - Stream<_PkgUpdatedEvent> _queryRecentPkgUpdatedEvents(DateTime since) async* { - final q1 = dbService.query() - ..filter('moderated >=', since) - ..order('-moderated'); - yield* q1.run().map((mp) => mp.asPkgUpdatedEvent()); - - final q2 = dbService.query() - ..filter('updated >=', since) - ..order('-updated'); - yield* q2.run().map((p) => p.asPkgUpdatedEvent()); - } - - /// Deletes obsolete runtime-versions from the bucket. - Future deleteObsoleteRuntimeContent() async { - final versions = {}; - - // Objects in the bucket are stored under the following pattern: - // `current/api/` - // `/api/` - // Thus, we list with `/` as delimiter and get a list of runtimeVersions - await for (final d in _bucket.list(prefix: '', delimiter: '/')) { - if (!d.isDirectory) { - _logger.warning( - 'Bucket `${_bucket.bucketName}` should not contain any top-level object: `${d.name}`'); - continue; - } + // Check if the [updated] timestamp has been seen before. + // If so, we skip checking it! + final lastSeen = seen[p.name!]; + if (lastSeen != null && lastSeen.toUtc() == p.updated!.toUtc()) { + continue; + } + // Remember the updated time for this package, so we don't check it + // again... + seen[p.name!] = p.updated!; - // Remove trailing slash from object prefix, to get a runtimeVersion - if (!d.name.endsWith('/')) { - _logger.warning( - 'Unexpected top-level directory name in bucket `${_bucket.bucketName}`: `${d.name}`'); - return; - } - final rtVersion = d.name.substring(0, d.name.length - 1); - if (runtimeVersionPattern.matchAsPrefix(rtVersion) == null) { - continue; + // Check the package + await synchronizePackage(p.name!); } - // Check if the runtimeVersion should be GC'ed - if (shouldGCVersion(rtVersion)) { - versions.add(rtVersion); - } - } + // Cleanup the [seen] map for anything older than [since], as this won't + // be relevant to the next iteration. + seen.removeWhere((_, updated) => updated.isBefore(since)); - for (final v in versions) { - await deleteBucketFolderRecursively(_bucket, '$v/', concurrency: 4); + // Wait until aborted or 10 minutes before scanning again! + await abort.future.timeout(Duration(minutes: 10), onTimeout: () => null); } } } - -typedef _PkgUpdatedEvent = ({String package, DateTime updated, bool isVisible}); - -extension on ModeratedPackage { - _PkgUpdatedEvent asPkgUpdatedEvent() => - (package: name!, updated: moderated, isVisible: false); -} - -extension on Package { - _PkgUpdatedEvent asPkgUpdatedEvent() => - (package: name!, updated: updated!, isVisible: isVisible); -} diff --git a/app/lib/package/backend.dart b/app/lib/package/backend.dart index 3af745fda4..5aabe52ec3 100644 --- a/app/lib/package/backend.dart +++ b/app/lib/package/backend.dart @@ -1255,8 +1255,7 @@ class PackageBackend { emailBackend.trySendOutgoingEmail(outgoingEmail), taskBackend.trackPackage(newVersion.package, updateDependents: true), if (apiExporter != null) - apiExporter! - .updatePackageVersion(newVersion.package, newVersion.version!), + apiExporter!.synchronizePackage(newVersion.package), ]); await tarballStorage.updateContentDispositionOnPublicBucket( newVersion.package, newVersion.version!); diff --git a/app/lib/service/entrypoint/analyzer.dart b/app/lib/service/entrypoint/analyzer.dart index 065d709b2a..fe5c004123 100644 --- a/app/lib/service/entrypoint/analyzer.dart +++ b/app/lib/service/entrypoint/analyzer.dart @@ -85,5 +85,5 @@ Future _apiExporterMain(EntryMessage message) async { message.protocolSendPort.send(ReadyMessage()); await popularityStorage.start(); await downloadCountsBackend.start(); - await apiExporter!.uploadInForeverLoop(); + await apiExporter!.start(); } diff --git a/app/lib/tool/neat_task/pub_dev_tasks.dart b/app/lib/tool/neat_task/pub_dev_tasks.dart index 10026cbd66..2ee78bac5d 100644 --- a/app/lib/tool/neat_task/pub_dev_tasks.dart +++ b/app/lib/tool/neat_task/pub_dev_tasks.dart @@ -119,9 +119,9 @@ void _setupGenericPeriodicTasks() { // Exports the package name completion data to a bucket. _daily( - name: 'export-package-name-completion-data-to-bucket', + name: 'synchronize-exported-api', isRuntimeVersioned: true, - task: () async => await apiExporter?.uploadPkgNameCompletionData(), + task: () async => await apiExporter?.synchronizeExportedApi(), ); // Deletes moderated packages, versions, publishers and users. @@ -167,13 +167,6 @@ void _setupGenericPeriodicTasks() { task: taskBackend.garbageCollect, ); - // Deletes exported API data for old runtime versions - _weekly( - name: 'garbage-collect-api-exports', - isRuntimeVersioned: true, - task: () async => apiExporter?.deleteObsoleteRuntimeContent(), - ); - // Delete very old instances that have been abandoned _daily( name: 'garbage-collect-old-instances', diff --git a/app/test/package/api_export/api_exporter_test.dart b/app/test/package/api_export/api_exporter_test.dart index a142e9e0be..d30b21b1e3 100644 --- a/app/test/package/api_export/api_exporter_test.dart +++ b/app/test/package/api_export/api_exporter_test.dart @@ -1,98 +1,448 @@ -// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -import 'dart:convert'; import 'dart:io'; +import 'dart:typed_data'; -import 'package:clock/clock.dart'; +import 'package:_pub_shared/data/admin_api.dart'; +import 'package:_pub_shared/data/package_api.dart'; import 'package:gcloud/storage.dart'; +import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError; +import 'package:pub_dev/fake/backend/fake_auth_provider.dart'; import 'package:pub_dev/package/api_export/api_exporter.dart'; -import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/storage.dart'; +import 'package:pub_dev/shared/utils.dart'; import 'package:pub_dev/shared/versions.dart'; +import 'package:pub_dev/tool/test_profile/import_source.dart'; +import 'package:pub_dev/tool/test_profile/importer.dart'; +import 'package:pub_dev/tool/test_profile/models.dart'; import 'package:test/test.dart'; import '../../shared/test_models.dart'; import '../../shared/test_services.dart'; -import '../backend_test_utils.dart'; +import '../../task/fake_time.dart'; + +final _testProfile = TestProfile( + defaultUser: userAtPubDevEmail, + packages: [ + TestPackage( + name: 'foo', + versions: [ + TestVersion(version: '1.0.0'), + ], + ), + ], + users: [ + TestUser(email: userAtPubDevEmail, likes: []), + ], +); void main() { - group('export API to bucket', () { - testWithProfile( - 'export and cleanup', - fn: () async { - await storageService.createBucket('bucket'); - final bucket = storageService.bucket('bucket'); - final exporter = ApiExporter( - bucket: bucket, - concurrency: 2, - ); - await exporter.uploadPkgNameCompletionData(); - await exporter.fullPkgScanAndUpload(); - - final claim = - FakeGlobalLockClaim(clock.now().add(Duration(seconds: 3))); - await exporter.incrementalPkgScanAndUpload( - claim, - sleepDuration: Duration(milliseconds: 300), - ); - await exporter.deleteObsoleteRuntimeContent(); - - final files = await bucket - .list(delimiter: 'bogus-delimiter-for-full-file-list') - .map((e) => e.name) - .toList(); - expect(files.toSet(), { - '$runtimeVersion/api/package-name-completion-data', - 'latest/api/package-name-completion-data', - '$runtimeVersion/api/packages/flutter_titanium', - '$runtimeVersion/api/packages/neon', - '$runtimeVersion/api/packages/oxygen', - 'latest/api/packages/flutter_titanium', - 'latest/api/packages/neon', - 'latest/api/packages/oxygen', - }); - - Future readAndDecodeJson(String path) async => json - .decode(utf8.decode(gzip.decode(await bucket.readAsBytes(path)))); - - expect( - await readAndDecodeJson('latest/api/packages/neon'), - { - 'name': 'neon', - 'latest': isNotEmpty, - 'versions': hasLength(1), - }, - ); - - expect( - await readAndDecodeJson('latest/api/package-name-completion-data'), - { - 'packages': hasLength(3), - }, - ); + testWithFakeTime('synchronizeExportedApi()', testProfile: _testProfile, + (fakeTime) async { + await storageService.createBucket('bucket'); + final bucket = storageService.bucket('bucket'); + final apiExporter = ApiExporter(bucket: bucket); + + // TODO: Testing DEBUG=* this will shout "[pub-notice:stray-file]" into logs + // we should investigate this! + + await _testExportedApiSynchronization( + fakeTime, + bucket, + apiExporter.synchronizeExportedApi, + ); + }); + + testWithFakeTime( + 'apiExporter.start()', + testProfile: _testProfile, + skip: 'Fix this test!', + (fakeTime) async { + await storageService.createBucket('bucket'); + final bucket = storageService.bucket('bucket'); + final apiExporter = ApiExporter(bucket: bucket); + + await apiExporter.synchronizeExportedApi(); + + await apiExporter.start(); + + await _testExportedApiSynchronization( + fakeTime, + bucket, + () async => await fakeTime.elapse(hours: 3), + ); + + await apiExporter.stop(); + }, + ); +} + +Future _testExportedApiSynchronization( + FakeTime fakeTime, + Bucket bucket, + Future Function() synchronize, +) async { + // ## Existing package + { + await synchronize(); + + // Check that exsting package was synchronized + expect( + await bucket.readGzippedJson('latest/api/packages/foo'), + { + 'name': 'foo', + 'latest': isNotEmpty, + 'versions': hasLength(1), + }, + ); + expect( + await bucket.readGzippedJson('latest/api/package-name-completion-data'), + {'packages': hasLength(1)}, + ); + expect( + await bucket.readBytes('$runtimeVersion/api/archives/foo-1.0.0.tar.gz'), + isNotNull, + ); + expect( + await bucket.readGzippedJson('$runtimeVersion/api/packages/foo'), + { + 'name': 'foo', + 'latest': isNotEmpty, + 'versions': hasLength(1), }, ); + expect( + await bucket + .readGzippedJson('$runtimeVersion/api/package-name-completion-data'), + {'packages': hasLength(1)}, + ); + expect( + await bucket.readBytes('$runtimeVersion/api/archives/foo-1.0.0.tar.gz'), + isNotNull, + ); + } - testWithProfile('new upload', fn: () async { - await apiExporter!.fullPkgScanAndUpload(); + // ## New package + { + await importProfile( + source: ImportSource.autoGenerated(), + profile: TestProfile( + defaultUser: userAtPubDevEmail, + packages: [ + TestPackage( + name: 'bar', + versions: [TestVersion(version: '2.0.0')], + publisher: 'example.com', + ), + ], + ), + ); - final bucket = - storageService.bucket(activeConfiguration.exportedApiBucketName!); - final originalBytes = - await bucket.readAsBytes('latest/api/packages/oxygen'); + // Synchronize again + await synchronize(); - final pubspecContent = generatePubspecYaml('oxygen', '9.0.0'); - final message = await createPubApiClient(authToken: adminClientToken) - .uploadPackageBytes( - await packageArchiveBytes(pubspecContent: pubspecContent)); - expect(message.success.message, contains('Successfully uploaded')); + // Check that exsting package is still there + expect( + await bucket.readGzippedJson('latest/api/packages/foo'), + isNotNull, + ); + expect( + await bucket.readBytes('latest/api/archives/foo-1.0.0.tar.gz'), + isNotNull, + ); + // Note. that name completion data won't be updated until search caches + // are purged, so we won't test that it is updated. - await Future.delayed(Duration(seconds: 1)); - final updatedBytes = - await bucket.readAsBytes('latest/api/packages/oxygen'); - expect(originalBytes.length, lessThan(updatedBytes.length)); - }); - }); + // Check that new package was synchronized + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(1), + }, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNotNull, + ); + } + + // ## New package version + { + await importProfile( + source: ImportSource.autoGenerated(), + profile: TestProfile( + defaultUser: userAtPubDevEmail, + packages: [ + TestPackage( + name: 'bar', + versions: [TestVersion(version: '3.0.0')], + publisher: 'example.com', + ), + ], + ), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(2), + }, + ); + // Check that versions are there + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNotNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-3.0.0.tar.gz'), + isNotNull, + ); + } + + // ## Discontinued flipped on + { + final api = await createFakeAuthPubApiClient(email: userAtPubDevEmail); + await api.setPackageOptions( + 'bar', + PkgOptions(isDiscontinued: true), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(2), + 'isDiscontinued': true, + }, + ); + } + + // ## Discontinued flipped off + { + final api = await createFakeAuthPubApiClient(email: userAtPubDevEmail); + await api.setPackageOptions( + 'bar', + PkgOptions(isDiscontinued: false), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(2), + }, + ); + } + + // ## Version retracted + { + final api = await createFakeAuthPubApiClient(email: userAtPubDevEmail); + await api.setVersionOptions( + 'bar', + '2.0.0', + VersionOptions(isRetracted: true), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': contains(containsPair('retracted', true)) + }, + ); + } + + // ## Version moderated + { + // Elapse time before moderating package, because exported-api won't delete + // recently created files as a guard against race conditions. + fakeTime.elapseSync(days: 1); + + final adminApi = createPubApiClient( + authToken: createFakeServiceAccountToken(email: 'admin@pub.dev'), + ); + await adminApi.adminInvokeAction( + 'moderate-package-version', + AdminInvokeActionArguments(arguments: { + 'case': 'none', + 'package': 'bar', + 'version': '2.0.0', + 'state': 'true', + }), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(1), + }, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-3.0.0.tar.gz'), + isNotNull, + ); + } + + // ## Version reinstated + { + final adminApi = createPubApiClient( + authToken: createFakeServiceAccountToken(email: 'admin@pub.dev'), + ); + await adminApi.adminInvokeAction( + 'moderate-package-version', + AdminInvokeActionArguments(arguments: { + 'case': 'none', + 'package': 'bar', + 'version': '2.0.0', + 'state': 'false', + }), + ); + + // Synchronize again + await synchronize(); + + // Check that version listing was updated + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(2), + }, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNotNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-3.0.0.tar.gz'), + isNotNull, + ); + } + + // ## Package moderated + { + // Elapse time before moderating package, because exported-api won't delete + // recently created files as a guard against race conditions. + fakeTime.elapseSync(days: 1); + + final adminApi = createPubApiClient( + authToken: createFakeServiceAccountToken(email: 'admin@pub.dev'), + ); + await adminApi.adminInvokeAction( + 'moderate-package', + AdminInvokeActionArguments(arguments: { + 'case': 'none', + 'package': 'bar', + 'state': 'true', + }), + ); + + // Synchronize again + await synchronize(); + + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + isNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-3.0.0.tar.gz'), + isNull, + ); + } + + // ## Package reinstated + { + final adminApi = createPubApiClient( + authToken: createFakeServiceAccountToken(email: 'admin@pub.dev'), + ); + await adminApi.adminInvokeAction( + 'moderate-package', + AdminInvokeActionArguments(arguments: { + 'case': 'none', + 'package': 'bar', + 'state': 'false', + }), + ); + + // Synchronize again + await synchronize(); + + expect( + await bucket.readGzippedJson('latest/api/packages/bar'), + { + 'name': 'bar', + 'latest': isNotEmpty, + 'versions': hasLength(2), + }, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-2.0.0.tar.gz'), + isNotNull, + ); + expect( + await bucket.readBytes('latest/api/archives/bar-3.0.0.tar.gz'), + isNotNull, + ); + } +} + +extension on Bucket { + /// Read bytes from bucket, retur null if missing + Future readBytes(String path) async { + try { + return await readAsBytes(path); + } on DetailedApiRequestError catch (e) { + if (e.status == 404) return null; + rethrow; + } + } + + /// Read gzipped JSON from bucket + Future readGzippedJson(String path) async { + final bytes = await readBytes(path); + if (bytes == null) { + return null; + } + return utf8JsonDecoder.convert(gzip.decode(bytes)); + } } diff --git a/app/test/shared/test_services.dart b/app/test/shared/test_services.dart index 1ec9168fac..f517c92471 100644 --- a/app/test/shared/test_services.dart +++ b/app/test/shared/test_services.dart @@ -176,8 +176,9 @@ void testWithFakeTime( TestProfile? testProfile, ImportSource? importSource, Pattern? integrityProblem, + dynamic skip, }) { - scopedTest(name, () async { + scopedTest(name, skip: skip, () async { await FakeTime.run((fakeTime) async { setupDebugEnvBasedLogging(); await withFakeServices( From e4bb5204c201e8c54b4e019fabf864d47bccc2f7 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Thu, 7 Nov 2024 13:43:43 +0100 Subject: [PATCH 3/8] More logging --- app/lib/package/api_export/api_exporter.dart | 4 +++- app/lib/package/api_export/exported_api.dart | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/app/lib/package/api_export/api_exporter.dart b/app/lib/package/api_export/api_exporter.dart index 81e8e5a52c..20993dcb09 100644 --- a/app/lib/package/api_export/api_exporter.dart +++ b/app/lib/package/api_export/api_exporter.dart @@ -19,7 +19,7 @@ import '../backend.dart'; import '../models.dart'; import 'exported_api.dart'; -final Logger _log = Logger('api_exporter'); +final Logger _log = Logger('api_export.api_exporter'); /// Sets the API Exporter service. void registerApiExporter(ApiExporter value) => @@ -162,6 +162,8 @@ class ApiExporter { /// * When a change in [Package.updated] is detected. /// * A package is moderated, or other admin action is applied. Future synchronizePackage(String package) async { + _log.info('synchronizePackage("$package")'); + // TODO: Handle the case where [package] is deleted or invisible! // TODO: We may need to delete the package, but only if it's not too recent! final versionListing = await packageBackend.listVersions(package); diff --git a/app/lib/package/api_export/exported_api.dart b/app/lib/package/api_export/exported_api.dart index 59460f22ec..40e192f208 100644 --- a/app/lib/package/api_export/exported_api.dart +++ b/app/lib/package/api_export/exported_api.dart @@ -19,7 +19,7 @@ import '../../shared/storage.dart'; import '../../shared/versions.dart' show runtimeVersion, runtimeVersionPattern, shouldGCVersion; -final _log = Logger('api_export:exported_bucket'); +final _log = Logger('api_export.exported_bucket'); /// Minimum age before an item can be consider garbage. /// From e422d12e27beb472fd27aa15a871e9377d25fa41 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Thu, 7 Nov 2024 14:00:25 +0100 Subject: [PATCH 4/8] stray file warning gone from tests --- app/test/package/api_export/api_exporter_test.dart | 3 --- 1 file changed, 3 deletions(-) diff --git a/app/test/package/api_export/api_exporter_test.dart b/app/test/package/api_export/api_exporter_test.dart index d30b21b1e3..c1810a7788 100644 --- a/app/test/package/api_export/api_exporter_test.dart +++ b/app/test/package/api_export/api_exporter_test.dart @@ -45,9 +45,6 @@ void main() { final bucket = storageService.bucket('bucket'); final apiExporter = ApiExporter(bucket: bucket); - // TODO: Testing DEBUG=* this will shout "[pub-notice:stray-file]" into logs - // we should investigate this! - await _testExportedApiSynchronization( fakeTime, bucket, From e722e19ff0e0fa67c2a065426ae51a47e0385292 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Thu, 7 Nov 2024 14:29:52 +0100 Subject: [PATCH 5/8] Fix tests --- app/lib/package/api_export/api_exporter.dart | 19 +++++++++++--- .../package/api_export/api_exporter_test.dart | 26 ++++++++++--------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/app/lib/package/api_export/api_exporter.dart b/app/lib/package/api_export/api_exporter.dart index 20993dcb09..e29c83b0a6 100644 --- a/app/lib/package/api_export/api_exporter.dart +++ b/app/lib/package/api_export/api_exporter.dart @@ -4,11 +4,13 @@ import 'dart:async'; +import 'package:_pub_shared/data/package_api.dart'; import 'package:clock/clock.dart'; import 'package:gcloud/service_scope.dart' as ss; import 'package:gcloud/storage.dart'; import 'package:logging/logging.dart'; import 'package:pub_dev/service/security_advisories/backend.dart'; +import 'package:pub_dev/shared/exceptions.dart'; import 'package:pub_dev/shared/parallel_foreach.dart'; import '../../search/backend.dart'; @@ -164,9 +166,20 @@ class ApiExporter { Future synchronizePackage(String package) async { _log.info('synchronizePackage("$package")'); - // TODO: Handle the case where [package] is deleted or invisible! - // TODO: We may need to delete the package, but only if it's not too recent! - final versionListing = await packageBackend.listVersions(package); + final PackageData versionListing; + try { + versionListing = await packageBackend.listVersions(package); + } on NotFoundException { + // Handle the case where package is moderated. + final pkg = await packageBackend.lookupPackage(package); + if (pkg != null && pkg.isNotVisible) { + // We only delete the package if it is explicitly not visible. + // If we can't find it, then it's safer to assume that it's a bug. + await _api.package(package).delete(); + } + return; + } + // TODO: Consider skipping the cache when fetching security advisories final advisories = await securityAdvisoryBackend.listAdvisoriesResponse( package, diff --git a/app/test/package/api_export/api_exporter_test.dart b/app/test/package/api_export/api_exporter_test.dart index c1810a7788..121103b9de 100644 --- a/app/test/package/api_export/api_exporter_test.dart +++ b/app/test/package/api_export/api_exporter_test.dart @@ -9,6 +9,7 @@ import 'package:_pub_shared/data/admin_api.dart'; import 'package:_pub_shared/data/package_api.dart'; import 'package:gcloud/storage.dart'; import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError; +import 'package:logging/logging.dart'; import 'package:pub_dev/fake/backend/fake_auth_provider.dart'; import 'package:pub_dev/package/api_export/api_exporter.dart'; import 'package:pub_dev/shared/storage.dart'; @@ -23,6 +24,8 @@ import '../../shared/test_models.dart'; import '../../shared/test_services.dart'; import '../../task/fake_time.dart'; +final _log = Logger('api_export.test'); + final _testProfile = TestProfile( defaultUser: userAtPubDevEmail, packages: [ @@ -55,7 +58,6 @@ void main() { testWithFakeTime( 'apiExporter.start()', testProfile: _testProfile, - skip: 'Fix this test!', (fakeTime) async { await storageService.createBucket('bucket'); final bucket = storageService.bucket('bucket'); @@ -68,7 +70,7 @@ void main() { await _testExportedApiSynchronization( fakeTime, bucket, - () async => await fakeTime.elapse(hours: 3), + () async => await fakeTime.elapse(minutes: 15), ); await apiExporter.stop(); @@ -81,7 +83,7 @@ Future _testExportedApiSynchronization( Bucket bucket, Future Function() synchronize, ) async { - // ## Existing package + _log.info('## Existing package'); { await synchronize(); @@ -121,7 +123,7 @@ Future _testExportedApiSynchronization( ); } - // ## New package + _log.info('## New package'); { await importProfile( source: ImportSource.autoGenerated(), @@ -167,7 +169,7 @@ Future _testExportedApiSynchronization( ); } - // ## New package version + _log.info('## New package version'); { await importProfile( source: ImportSource.autoGenerated(), @@ -206,7 +208,7 @@ Future _testExportedApiSynchronization( ); } - // ## Discontinued flipped on + _log.info('## Discontinued flipped on'); { final api = await createFakeAuthPubApiClient(email: userAtPubDevEmail); await api.setPackageOptions( @@ -229,7 +231,7 @@ Future _testExportedApiSynchronization( ); } - // ## Discontinued flipped off + _log.info('## Discontinued flipped off'); { final api = await createFakeAuthPubApiClient(email: userAtPubDevEmail); await api.setPackageOptions( @@ -251,7 +253,7 @@ Future _testExportedApiSynchronization( ); } - // ## Version retracted + _log.info('## Version retracted'); { final api = await createFakeAuthPubApiClient(email: userAtPubDevEmail); await api.setVersionOptions( @@ -274,7 +276,7 @@ Future _testExportedApiSynchronization( ); } - // ## Version moderated + _log.info('## Version moderated'); { // Elapse time before moderating package, because exported-api won't delete // recently created files as a guard against race conditions. @@ -315,7 +317,7 @@ Future _testExportedApiSynchronization( ); } - // ## Version reinstated + _log.info('## Version reinstated'); { final adminApi = createPubApiClient( authToken: createFakeServiceAccountToken(email: 'admin@pub.dev'), @@ -352,7 +354,7 @@ Future _testExportedApiSynchronization( ); } - // ## Package moderated + _log.info('## Package moderated'); { // Elapse time before moderating package, because exported-api won't delete // recently created files as a guard against race conditions. @@ -387,7 +389,7 @@ Future _testExportedApiSynchronization( ); } - // ## Package reinstated + _log.info('## Package reinstated'); { final adminApi = createPubApiClient( authToken: createFakeServiceAccountToken(email: 'admin@pub.dev'), From 14266fc1aa7f8ae4189ffc1cf646309d0bda4bde Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Thu, 7 Nov 2024 14:53:09 +0100 Subject: [PATCH 6/8] Fix logger name in exported_api.dart --- app/lib/package/api_export/exported_api.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/lib/package/api_export/exported_api.dart b/app/lib/package/api_export/exported_api.dart index 40e192f208..5efe290958 100644 --- a/app/lib/package/api_export/exported_api.dart +++ b/app/lib/package/api_export/exported_api.dart @@ -19,7 +19,7 @@ import '../../shared/storage.dart'; import '../../shared/versions.dart' show runtimeVersion, runtimeVersionPattern, shouldGCVersion; -final _log = Logger('api_export.exported_bucket'); +final _log = Logger('api_export.exported_api'); /// Minimum age before an item can be consider garbage. /// From 98fcf4fb46e03bc237f8c8fc44d9f7c57f5d0867 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Thu, 7 Nov 2024 15:58:05 +0100 Subject: [PATCH 7/8] Remove skip from test --- app/test/shared/test_services.dart | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/app/test/shared/test_services.dart b/app/test/shared/test_services.dart index f517c92471..1ec9168fac 100644 --- a/app/test/shared/test_services.dart +++ b/app/test/shared/test_services.dart @@ -176,9 +176,8 @@ void testWithFakeTime( TestProfile? testProfile, ImportSource? importSource, Pattern? integrityProblem, - dynamic skip, }) { - scopedTest(name, skip: skip, () async { + scopedTest(name, () async { await FakeTime.run((fakeTime) async { setupDebugEnvBasedLogging(); await withFakeServices( From 51bb50db67e88b30272fb5e1058564a402262936 Mon Sep 17 00:00:00 2001 From: Jonas Finnemann Jensen Date: Thu, 7 Nov 2024 16:27:43 +0100 Subject: [PATCH 8/8] Avoid caching security advisories --- app/lib/package/api_export/api_exporter.dart | 2 +- .../service/security_advisories/backend.dart | 24 +++++++++++++----- .../security_advisory_test.dart | 25 +++++++++++++++---- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/app/lib/package/api_export/api_exporter.dart b/app/lib/package/api_export/api_exporter.dart index e29c83b0a6..becb38f366 100644 --- a/app/lib/package/api_export/api_exporter.dart +++ b/app/lib/package/api_export/api_exporter.dart @@ -180,9 +180,9 @@ class ApiExporter { return; } - // TODO: Consider skipping the cache when fetching security advisories final advisories = await securityAdvisoryBackend.listAdvisoriesResponse( package, + skipCache: true, // Skipping the cache when fetching security advisories ); final versions = await packageBackend.tarballStorage diff --git a/app/lib/service/security_advisories/backend.dart b/app/lib/service/security_advisories/backend.dart index 2450a24935..714b2cb541 100644 --- a/app/lib/service/security_advisories/backend.dart +++ b/app/lib/service/security_advisories/backend.dart @@ -39,22 +39,34 @@ class SecurityAdvisoryBackend { } Future> lookupSecurityAdvisories( - String package, - ) async { - return (await cache.securityAdvisories(package).get(() async { + String package, { + bool skipCache = false, + }) async { + final loadAdvisories = () async { final query = _db.query() ..filter('affectedPackages =', package); return query .run() .map((SecurityAdvisory adv) => SecurityAdvisoryData.fromModel(adv)) .toList(); - }))!; + }; + if (skipCache) { + return await loadAdvisories(); + } + + return (await cache.securityAdvisories(package).get(loadAdvisories))!; } /// Create a [ListAdvisoriesResponse] for [package] using advisories from /// cache. - Future listAdvisoriesResponse(String package) async { - final advisories = await lookupSecurityAdvisories(package); + Future listAdvisoriesResponse( + String package, { + bool skipCache = false, + }) async { + final advisories = await lookupSecurityAdvisories( + package, + skipCache: skipCache, + ); return ListAdvisoriesResponse( advisories: advisories.map((a) => a.advisory).toList(), advisoriesUpdated: advisories.map((a) => a.syncTime).maxOrNull, diff --git a/app/test/service/security_advisory/security_advisory_test.dart b/app/test/service/security_advisory/security_advisory_test.dart index 04f9c63200..49ce5dd19f 100644 --- a/app/test/service/security_advisory/security_advisory_test.dart +++ b/app/test/service/security_advisory/security_advisory_test.dart @@ -200,7 +200,10 @@ void main() { expect(advisory.syncTime!, ingestTime); expect(advisory.syncTime!.isBefore(afterIngestTime), isTrue); - final list = await securityAdvisoryBackend.lookupSecurityAdvisories('a'); + final list = await securityAdvisoryBackend.lookupSecurityAdvisories( + 'a', + skipCache: true, + ); expect(list, isNotNull); expect(list.length, 1); expect(list.first.advisory.id, id); @@ -229,10 +232,16 @@ void main() { expect(updatedAdvisory.affectedPackages!.first, affectedA.package.name); expect(updatedAdvisory.affectedPackages!.last, affectedC.package.name); - final list2 = await securityAdvisoryBackend.lookupSecurityAdvisories('b'); + final list2 = await securityAdvisoryBackend.lookupSecurityAdvisories( + 'b', + skipCache: true, + ); expect(list2, isEmpty); - final list3 = await securityAdvisoryBackend.lookupSecurityAdvisories('c'); + final list3 = await securityAdvisoryBackend.lookupSecurityAdvisories( + 'c', + skipCache: true, + ); expect(list3, isNotNull); expect(list3.length, 1); expect(list3.first.advisory.id, id); @@ -269,12 +278,18 @@ void main() { expect(advisory.affectedPackages!.length, 1); expect(advisory.affectedPackages!.first, affectedA.package.name); - final list = await securityAdvisoryBackend.lookupSecurityAdvisories('a'); + final list = await securityAdvisoryBackend.lookupSecurityAdvisories( + 'a', + skipCache: true, + ); expect(list, isNotNull); expect(list.length, 1); expect(list.first.advisory.id, id); - final list2 = await securityAdvisoryBackend.lookupSecurityAdvisories('b'); + final list2 = await securityAdvisoryBackend.lookupSecurityAdvisories( + 'b', + skipCache: true, + ); expect(list2, isNotNull); expect(list2, isEmpty); });