Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/bin/tools/search_benchmark.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Future<void> main(List<String> args) async {
'max memory: ${ProcessInfo.maxRss ~/ 1024} KiB',
);
// Assumes that the first argument is a search snapshot file.
final index = await loadInMemoryPackageIndexFromFile(args.first);
final index = await loadInMemoryPackageIndexFromUrl(args.first);
print(
'Loaded. Current memory: ${ProcessInfo.currentRss ~/ 1024} KiB, '
'max memory: ${ProcessInfo.maxRss ~/ 1024} KiB',
Expand Down
17 changes: 17 additions & 0 deletions app/lib/fake/server/fake_search_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import 'package:fake_gcloud/mem_datastore.dart';
import 'package:fake_gcloud/mem_storage.dart';
import 'package:gcloud/service_scope.dart' as ss;
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:pub_dev/fake/backend/fake_download_counts.dart';
import 'package:pub_dev/search/handlers.dart';
import 'package:pub_dev/search/models.dart';
import 'package:pub_dev/search/sdk_mem_index.dart';
import 'package:pub_dev/search/updater.dart';
import 'package:pub_dev/service/services.dart';
import 'package:pub_dev/shared/configuration.dart';
import 'package:pub_dev/shared/handler_helpers.dart';
import 'package:pub_dev/shared/handlers.dart';
import 'package:pub_dev/task/cloudcompute/fakecloudcompute.dart';
import 'package:shelf/shelf.dart' as shelf;
import 'package:shelf/shelf_io.dart';
Expand Down Expand Up @@ -71,3 +74,17 @@ class FakeSearchService {
_logger.info('closed');
}
}

@visibleForTesting
Future<IOServer> setupLocalSnapshotServer() async {
final snapshotServer = await IOServer.bind('localhost', 0);
serveRequests(snapshotServer.server, (request) async {
await generateFakeDownloadCountsInDatastore();
final snapshot = SearchSnapshot.fromDocuments(
// ignore: invalid_use_of_visible_for_testing_member
await indexUpdater.loadAllPackageDocuments(),
);
return jsonResponse(snapshot.toJson());
});
return snapshotServer;
}
8 changes: 8 additions & 0 deletions app/lib/search/models.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ class SearchSnapshot {

factory SearchSnapshot() => SearchSnapshot._(clock.now().toUtc(), {});

factory SearchSnapshot.fromDocuments(Iterable<PackageDocument> documents) {
final snapshot = SearchSnapshot();
for (final doc in documents) {
snapshot.add(doc);
}
return snapshot;
}

factory SearchSnapshot.fromJson(Map<String, dynamic> json) =>
_$SearchSnapshotFromJson(json);

Expand Down
32 changes: 23 additions & 9 deletions app/lib/search/updater.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:_pub_shared/utils/http.dart';
import 'package:gcloud/service_scope.dart' as ss;
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
Expand All @@ -28,14 +29,20 @@ void registerIndexUpdater(IndexUpdater updater) =>
/// The active index updater.
IndexUpdater get indexUpdater => ss.lookup(#_indexUpdater) as IndexUpdater;

/// Loads a local search snapshot file and builds an in-memory package index from it.
Future<InMemoryPackageIndex> loadInMemoryPackageIndexFromFile(
String path,
) async {
final file = File(path);
final content =
json.decode(utf8.decode(gzip.decode(await file.readAsBytes())))
as Map<String, Object?>;
/// Loads a search snapshot from an URL or from a local file and builds
/// an in-memory package index from it.
Future<InMemoryPackageIndex> loadInMemoryPackageIndexFromUrl(String url) async {
late String textContent;
if (url.startsWith('http://') || url.startsWith('https://')) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do you call this with an https url? Or is it preparing for something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More like completeness. Though, we could make the bucket content available through such URL, which would be https... and that would remove the special case here. While the intention was not to prepare for something, it can be :)

textContent = await httpGetWithRetry(
Uri.parse(url),
responseFn: (rs) => rs.body,
);
} else {
final file = File(url);
textContent = utf8.decode(gzip.decode(await file.readAsBytes()));
}
final content = json.decode(textContent) as Map<String, Object?>;
final snapshot = SearchSnapshot.fromJson(content);
return InMemoryPackageIndex(
documents: snapshot.documents!.values.where(
Expand Down Expand Up @@ -83,12 +90,19 @@ class IndexUpdater {
/// complete document for the index.
@visibleForTesting
Future<void> updateAllPackages() async {
final documents = await loadAllPackageDocuments();
updatePackageIndex(InMemoryPackageIndex(documents: documents));
}

/// Loads all packages as PackageDocuments.
@visibleForTesting
Future<List<PackageDocument>> loadAllPackageDocuments() async {
final documents = <PackageDocument>[];
await for (final p in _db.query<Package>().run()) {
final doc = await searchBackend.loadDocument(p.name!);
documents.add(doc);
}
updatePackageIndex(InMemoryPackageIndex(documents: documents));
return documents;
}

/// Returns whether the snapshot was initialized and loaded properly.
Expand Down
111 changes: 71 additions & 40 deletions app/lib/service/entrypoint/search.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import 'package:logging/logging.dart';
import 'package:pub_dev/search/result_combiner.dart';
import 'package:pub_dev/service/entrypoint/sdk_isolate_index.dart';
import 'package:pub_dev/service/entrypoint/search_index.dart';
import 'package:shelf/shelf.dart';

import '../../search/backend.dart';
import '../../search/handlers.dart';
Expand Down Expand Up @@ -38,47 +39,77 @@ class SearchCommand extends Command {

envConfig.checkServiceEnvironment(name);
await withServices(() async {
final packageIsolate = await startSearchIsolate(logger: _logger);
registerScopeExitCallback(packageIsolate.close);

final sdkIsolate = await startQueryIsolate(
logger: _logger,
kind: 'sdk',
spawnUri: Uri.parse(
'package:pub_dev/service/entrypoint/sdk_isolate_index.dart',
),
);
registerScopeExitCallback(sdkIsolate.close);

registerSearchIndex(
SearchResultCombiner(
primaryIndex: LatencyAwareSearchIndex(
IsolateSearchIndex(packageIsolate),
),
sdkIndex: SdkIsolateIndex(sdkIsolate),
),
await runSearchInstanceController(
port: 8080,
renewPackageIndex: _createRenewStream(delayDrift: delayDrift),
);

void scheduleRenew() {
scheduleMicrotask(() async {
// 12 - 17 minutes delay
final delay = Duration(
minutes: 12,
seconds: delayDrift + _random.nextInt(240),
);
await Future.delayed(delay);

// create a new index and handover with a 2-minute maximum wait
await packageIsolate.renew(count: 1, wait: Duration(minutes: 2));

// schedule the renewal again
scheduleRenew();
});
}

scheduleRenew();

await runHandler(_logger, searchServiceHandler);
});
}
}

/// Creates a stream with events separated by 12 - 17 minutes
Stream<Completer> _createRenewStream({required int delayDrift}) {
return Stream.periodic(Duration(minutes: 12), (_) => Completer()).asyncMap(
(c) => Future.delayed(
Duration(seconds: delayDrift + _random.nextInt(240)),
() => c,
),
);
}

/// Runs the search instance main controller, which creates separate isolates
/// for the package and the SDK indexes.
///
/// When the [renewPackageIndex] has a new event, it will trigger the renewal of the
/// package index isolate, updating the search index.
Future<void> runSearchInstanceController({
required int port,
required Stream<Completer> renewPackageIndex,
Duration renewWait = const Duration(minutes: 2),
String? snapshot,
Handler? handler,
Future<void> Function()? processTerminationSignal,
}) async {
final packageIsolate = await startSearchIsolate(
logger: _logger,
snapshot: snapshot,
);
registerScopeExitCallback(packageIsolate.close);

final sdkIsolate = await startQueryIsolate(
logger: _logger,
kind: 'sdk',
spawnUri: Uri.parse(
'package:pub_dev/service/entrypoint/sdk_isolate_index.dart',
),
);
registerScopeExitCallback(sdkIsolate.close);

registerSearchIndex(
SearchResultCombiner(
primaryIndex: LatencyAwareSearchIndex(IsolateSearchIndex(packageIsolate)),
sdkIndex: SdkIsolateIndex(sdkIsolate),
),
);

final updateStream = renewPackageIndex.asyncMap((c) async {
try {
// create a new index and handover with a 2-minute maximum wait
await packageIsolate.renew(count: 1, wait: renewWait);
c.complete(null);
} catch (e, st) {
c.completeError(e, st);
}
});
final updateListener = updateStream.listen((_) {
_logger.info('Package SDK isolate renewed.');
});

await runHandler(
_logger,
handler ?? searchServiceHandler,
port: port,
processTerminationSignal: processTerminationSignal,
);
await updateListener.cancel();
}
2 changes: 1 addition & 1 deletion app/lib/service/entrypoint/search_index.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Future<void> main(List<String> args, var message) async {
if (snapshot == null) {
await indexUpdater.init();
} else {
updatePackageIndex(await loadInMemoryPackageIndexFromFile(snapshot));
updatePackageIndex(await loadInMemoryPackageIndexFromUrl(snapshot));
}

await runIsolateFunctions(
Expand Down
4 changes: 3 additions & 1 deletion app/lib/shared/handler_helpers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Future<void> runHandler(
shelf.Handler handler, {
bool sanitize = false,
int port = 8080,
Future<void> Function()? processTerminationSignal,
}) async {
handler = wrapHandler(logger, handler, sanitize: sanitize);
if (envConfig.isRunningInAppengine) {
Expand All @@ -60,7 +61,8 @@ Future<void> runHandler(
port,
shared: true,
);
await waitForProcessSignalTermination();
processTerminationSignal ??= waitForProcessSignalTermination;
await processTerminationSignal();
await server.close();
}
}
Expand Down
59 changes: 59 additions & 0 deletions app/test/service/entrypoint/search_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:pub_dev/fake/server/fake_search_service.dart';
import 'package:pub_dev/service/entrypoint/search.dart';
import 'package:shelf/shelf_io.dart';
import 'package:test/test.dart';

import '../../shared/test_services.dart';

void main() {
group('Main search isolate controller', () {
testWithProfile(
'update the package index isolate',
fn: () async {
final snapshotServer = await setupLocalSnapshotServer();
final renewController = StreamController<Completer>.broadcast();
final processTerminationCompleter = Completer();
final handlerStartedCompleter = Completer();
try {
final port = await _detectFreePort();
final doneFuture = runSearchInstanceController(
port: port,
renewPackageIndex: renewController.stream,
processTerminationSignal: () async {
handlerStartedCompleter.complete();
return await processTerminationCompleter.future;
},
renewWait: Duration.zero,
snapshot: 'http://localhost:${snapshotServer.server.port}/',
);
await handlerStartedCompleter.future;

// force renew
final c = Completer();
renewController.add(c);
await c.future;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to expect something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point the fact that it runs without issues and the future completes is already a good test. I will add search queries and actually changing index at a follow-up pr, when the fake-service is also fixed.

processTerminationCompleter.complete();
await doneFuture;
} finally {
await snapshotServer.close();
await renewController.close();
}
},
timeout: Timeout.factor(8),
);
});
}

Future<int> _detectFreePort() async {
final server = await IOServer.bind('localhost', 0);
final port = server.server.port;
await server.close();
return port;
}
4 changes: 2 additions & 2 deletions pkg/pub_integration/lib/src/fake_pub_server_process.dart
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,13 @@ class FakePubServerProcess {
Future<void> get started => _startedCompleter.future;

Future<void> kill() async {
// First try SIGINT, and after 10 seconds do SIGTERM.
// First try SIGINT, and after 10 seconds do SIGKILL.
print('Sending INT signal to ${_process.pid}...');
_process.kill(ProcessSignal.sigint);
await _coverageConfig?.waitForCollect();
final timer = Timer(Duration(seconds: 10), () {
print('Sending TERM signal to ${_process.pid}...');
_process.kill();
_process.kill(ProcessSignal.sigkill);
});
final exitCode = await _process.exitCode;
print('Exit code: $exitCode');
Expand Down