From b09e19f42d121798b75afe5881c9e42f7e8368e1 Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Fri, 12 Sep 2025 15:57:47 +0200 Subject: [PATCH] Refactor search isolate control + minimal test. --- app/bin/tools/search_benchmark.dart | 2 +- app/lib/fake/server/fake_search_service.dart | 17 +++ app/lib/search/models.dart | 8 ++ app/lib/search/updater.dart | 32 +++-- app/lib/service/entrypoint/search.dart | 111 +++++++++++------- app/lib/service/entrypoint/search_index.dart | 2 +- app/lib/shared/handler_helpers.dart | 4 +- app/test/service/entrypoint/search_test.dart | 59 ++++++++++ .../lib/src/fake_pub_server_process.dart | 4 +- 9 files changed, 185 insertions(+), 54 deletions(-) create mode 100644 app/test/service/entrypoint/search_test.dart diff --git a/app/bin/tools/search_benchmark.dart b/app/bin/tools/search_benchmark.dart index f10b66362d..4c4174889f 100644 --- a/app/bin/tools/search_benchmark.dart +++ b/app/bin/tools/search_benchmark.dart @@ -16,7 +16,7 @@ Future main(List args) async { 'max memory: ${ProcessInfo.maxRss ~/ 1024} KiB', ); // Assumes that the first argument is a search snapshot file. - final index = await loadInMemoryPackageIndexFromFile(args.first); + final index = await loadInMemoryPackageIndexFromUrl(args.first); print( 'Loaded. Current memory: ${ProcessInfo.currentRss ~/ 1024} KiB, ' 'max memory: ${ProcessInfo.maxRss ~/ 1024} KiB', diff --git a/app/lib/fake/server/fake_search_service.dart b/app/lib/fake/server/fake_search_service.dart index efbac3b8fe..3ebb23fdc0 100644 --- a/app/lib/fake/server/fake_search_service.dart +++ b/app/lib/fake/server/fake_search_service.dart @@ -9,13 +9,16 @@ import 'package:fake_gcloud/mem_datastore.dart'; import 'package:fake_gcloud/mem_storage.dart'; import 'package:gcloud/service_scope.dart' as ss; import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; import 'package:pub_dev/fake/backend/fake_download_counts.dart'; import 'package:pub_dev/search/handlers.dart'; +import 'package:pub_dev/search/models.dart'; import 'package:pub_dev/search/sdk_mem_index.dart'; import 'package:pub_dev/search/updater.dart'; import 'package:pub_dev/service/services.dart'; import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/handler_helpers.dart'; +import 'package:pub_dev/shared/handlers.dart'; import 'package:pub_dev/task/cloudcompute/fakecloudcompute.dart'; import 'package:shelf/shelf.dart' as shelf; import 'package:shelf/shelf_io.dart'; @@ -71,3 +74,17 @@ class FakeSearchService { _logger.info('closed'); } } + +@visibleForTesting +Future setupLocalSnapshotServer() async { + final snapshotServer = await IOServer.bind('localhost', 0); + serveRequests(snapshotServer.server, (request) async { + await generateFakeDownloadCountsInDatastore(); + final snapshot = SearchSnapshot.fromDocuments( + // ignore: invalid_use_of_visible_for_testing_member + await indexUpdater.loadAllPackageDocuments(), + ); + return jsonResponse(snapshot.toJson()); + }); + return snapshotServer; +} diff --git a/app/lib/search/models.dart b/app/lib/search/models.dart index 124c6815a8..47beb3f222 100644 --- a/app/lib/search/models.dart +++ b/app/lib/search/models.dart @@ -19,6 +19,14 @@ class SearchSnapshot { factory SearchSnapshot() => SearchSnapshot._(clock.now().toUtc(), {}); + factory SearchSnapshot.fromDocuments(Iterable documents) { + final snapshot = SearchSnapshot(); + for (final doc in documents) { + snapshot.add(doc); + } + return snapshot; + } + factory SearchSnapshot.fromJson(Map json) => _$SearchSnapshotFromJson(json); diff --git a/app/lib/search/updater.dart b/app/lib/search/updater.dart index b57c2a3c67..1c9fc827e6 100644 --- a/app/lib/search/updater.dart +++ b/app/lib/search/updater.dart @@ -6,6 +6,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'package:_pub_shared/utils/http.dart'; import 'package:gcloud/service_scope.dart' as ss; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; @@ -28,14 +29,20 @@ void registerIndexUpdater(IndexUpdater updater) => /// The active index updater. IndexUpdater get indexUpdater => ss.lookup(#_indexUpdater) as IndexUpdater; -/// Loads a local search snapshot file and builds an in-memory package index from it. -Future loadInMemoryPackageIndexFromFile( - String path, -) async { - final file = File(path); - final content = - json.decode(utf8.decode(gzip.decode(await file.readAsBytes()))) - as Map; +/// Loads a search snapshot from an URL or from a local file and builds +/// an in-memory package index from it. +Future loadInMemoryPackageIndexFromUrl(String url) async { + late String textContent; + if (url.startsWith('http://') || url.startsWith('https://')) { + textContent = await httpGetWithRetry( + Uri.parse(url), + responseFn: (rs) => rs.body, + ); + } else { + final file = File(url); + textContent = utf8.decode(gzip.decode(await file.readAsBytes())); + } + final content = json.decode(textContent) as Map; final snapshot = SearchSnapshot.fromJson(content); return InMemoryPackageIndex( documents: snapshot.documents!.values.where( @@ -83,12 +90,19 @@ class IndexUpdater { /// complete document for the index. @visibleForTesting Future updateAllPackages() async { + final documents = await loadAllPackageDocuments(); + updatePackageIndex(InMemoryPackageIndex(documents: documents)); + } + + /// Loads all packages as PackageDocuments. + @visibleForTesting + Future> loadAllPackageDocuments() async { final documents = []; await for (final p in _db.query().run()) { final doc = await searchBackend.loadDocument(p.name!); documents.add(doc); } - updatePackageIndex(InMemoryPackageIndex(documents: documents)); + return documents; } /// Returns whether the snapshot was initialized and loaded properly. diff --git a/app/lib/service/entrypoint/search.dart b/app/lib/service/entrypoint/search.dart index 93f2b911c6..21636210cc 100644 --- a/app/lib/service/entrypoint/search.dart +++ b/app/lib/service/entrypoint/search.dart @@ -11,6 +11,7 @@ import 'package:logging/logging.dart'; import 'package:pub_dev/search/result_combiner.dart'; import 'package:pub_dev/service/entrypoint/sdk_isolate_index.dart'; import 'package:pub_dev/service/entrypoint/search_index.dart'; +import 'package:shelf/shelf.dart'; import '../../search/backend.dart'; import '../../search/handlers.dart'; @@ -38,47 +39,77 @@ class SearchCommand extends Command { envConfig.checkServiceEnvironment(name); await withServices(() async { - final packageIsolate = await startSearchIsolate(logger: _logger); - registerScopeExitCallback(packageIsolate.close); - - final sdkIsolate = await startQueryIsolate( - logger: _logger, - kind: 'sdk', - spawnUri: Uri.parse( - 'package:pub_dev/service/entrypoint/sdk_isolate_index.dart', - ), - ); - registerScopeExitCallback(sdkIsolate.close); - - registerSearchIndex( - SearchResultCombiner( - primaryIndex: LatencyAwareSearchIndex( - IsolateSearchIndex(packageIsolate), - ), - sdkIndex: SdkIsolateIndex(sdkIsolate), - ), + await runSearchInstanceController( + port: 8080, + renewPackageIndex: _createRenewStream(delayDrift: delayDrift), ); - - void scheduleRenew() { - scheduleMicrotask(() async { - // 12 - 17 minutes delay - final delay = Duration( - minutes: 12, - seconds: delayDrift + _random.nextInt(240), - ); - await Future.delayed(delay); - - // create a new index and handover with a 2-minute maximum wait - await packageIsolate.renew(count: 1, wait: Duration(minutes: 2)); - - // schedule the renewal again - scheduleRenew(); - }); - } - - scheduleRenew(); - - await runHandler(_logger, searchServiceHandler); }); } } + +/// Creates a stream with events separated by 12 - 17 minutes +Stream _createRenewStream({required int delayDrift}) { + return Stream.periodic(Duration(minutes: 12), (_) => Completer()).asyncMap( + (c) => Future.delayed( + Duration(seconds: delayDrift + _random.nextInt(240)), + () => c, + ), + ); +} + +/// Runs the search instance main controller, which creates separate isolates +/// for the package and the SDK indexes. +/// +/// When the [renewPackageIndex] has a new event, it will trigger the renewal of the +/// package index isolate, updating the search index. +Future runSearchInstanceController({ + required int port, + required Stream renewPackageIndex, + Duration renewWait = const Duration(minutes: 2), + String? snapshot, + Handler? handler, + Future Function()? processTerminationSignal, +}) async { + final packageIsolate = await startSearchIsolate( + logger: _logger, + snapshot: snapshot, + ); + registerScopeExitCallback(packageIsolate.close); + + final sdkIsolate = await startQueryIsolate( + logger: _logger, + kind: 'sdk', + spawnUri: Uri.parse( + 'package:pub_dev/service/entrypoint/sdk_isolate_index.dart', + ), + ); + registerScopeExitCallback(sdkIsolate.close); + + registerSearchIndex( + SearchResultCombiner( + primaryIndex: LatencyAwareSearchIndex(IsolateSearchIndex(packageIsolate)), + sdkIndex: SdkIsolateIndex(sdkIsolate), + ), + ); + + final updateStream = renewPackageIndex.asyncMap((c) async { + try { + // create a new index and handover with a 2-minute maximum wait + await packageIsolate.renew(count: 1, wait: renewWait); + c.complete(null); + } catch (e, st) { + c.completeError(e, st); + } + }); + final updateListener = updateStream.listen((_) { + _logger.info('Package SDK isolate renewed.'); + }); + + await runHandler( + _logger, + handler ?? searchServiceHandler, + port: port, + processTerminationSignal: processTerminationSignal, + ); + await updateListener.cancel(); +} diff --git a/app/lib/service/entrypoint/search_index.dart b/app/lib/service/entrypoint/search_index.dart index 0ccf8f0d35..3f394063b6 100644 --- a/app/lib/service/entrypoint/search_index.dart +++ b/app/lib/service/entrypoint/search_index.dart @@ -49,7 +49,7 @@ Future main(List args, var message) async { if (snapshot == null) { await indexUpdater.init(); } else { - updatePackageIndex(await loadInMemoryPackageIndexFromFile(snapshot)); + updatePackageIndex(await loadInMemoryPackageIndexFromUrl(snapshot)); } await runIsolateFunctions( diff --git a/app/lib/shared/handler_helpers.dart b/app/lib/shared/handler_helpers.dart index 4c6667399e..26aa1fd07b 100644 --- a/app/lib/shared/handler_helpers.dart +++ b/app/lib/shared/handler_helpers.dart @@ -38,6 +38,7 @@ Future runHandler( shelf.Handler handler, { bool sanitize = false, int port = 8080, + Future Function()? processTerminationSignal, }) async { handler = wrapHandler(logger, handler, sanitize: sanitize); if (envConfig.isRunningInAppengine) { @@ -60,7 +61,8 @@ Future runHandler( port, shared: true, ); - await waitForProcessSignalTermination(); + processTerminationSignal ??= waitForProcessSignalTermination; + await processTerminationSignal(); await server.close(); } } diff --git a/app/test/service/entrypoint/search_test.dart b/app/test/service/entrypoint/search_test.dart new file mode 100644 index 0000000000..23196b2a77 --- /dev/null +++ b/app/test/service/entrypoint/search_test.dart @@ -0,0 +1,59 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:pub_dev/fake/server/fake_search_service.dart'; +import 'package:pub_dev/service/entrypoint/search.dart'; +import 'package:shelf/shelf_io.dart'; +import 'package:test/test.dart'; + +import '../../shared/test_services.dart'; + +void main() { + group('Main search isolate controller', () { + testWithProfile( + 'update the package index isolate', + fn: () async { + final snapshotServer = await setupLocalSnapshotServer(); + final renewController = StreamController.broadcast(); + final processTerminationCompleter = Completer(); + final handlerStartedCompleter = Completer(); + try { + final port = await _detectFreePort(); + final doneFuture = runSearchInstanceController( + port: port, + renewPackageIndex: renewController.stream, + processTerminationSignal: () async { + handlerStartedCompleter.complete(); + return await processTerminationCompleter.future; + }, + renewWait: Duration.zero, + snapshot: 'http://localhost:${snapshotServer.server.port}/', + ); + await handlerStartedCompleter.future; + + // force renew + final c = Completer(); + renewController.add(c); + await c.future; + + processTerminationCompleter.complete(); + await doneFuture; + } finally { + await snapshotServer.close(); + await renewController.close(); + } + }, + timeout: Timeout.factor(8), + ); + }); +} + +Future _detectFreePort() async { + final server = await IOServer.bind('localhost', 0); + final port = server.server.port; + await server.close(); + return port; +} diff --git a/pkg/pub_integration/lib/src/fake_pub_server_process.dart b/pkg/pub_integration/lib/src/fake_pub_server_process.dart index a98fe31dfb..3f77c46565 100644 --- a/pkg/pub_integration/lib/src/fake_pub_server_process.dart +++ b/pkg/pub_integration/lib/src/fake_pub_server_process.dart @@ -151,13 +151,13 @@ class FakePubServerProcess { Future get started => _startedCompleter.future; Future kill() async { - // First try SIGINT, and after 10 seconds do SIGTERM. + // First try SIGINT, and after 10 seconds do SIGKILL. print('Sending INT signal to ${_process.pid}...'); _process.kill(ProcessSignal.sigint); await _coverageConfig?.waitForCollect(); final timer = Timer(Duration(seconds: 10), () { print('Sending TERM signal to ${_process.pid}...'); - _process.kill(); + _process.kill(ProcessSignal.sigkill); }); final exitCode = await _process.exitCode; print('Exit code: $exitCode');