Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions app/lib/dartdoc/dartdoc_runner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import 'package:path/path.dart' as p;

import 'package:pub_dartdoc_data/pub_dartdoc_data.dart';

import '../frontend/static_files.dart';
import '../job/backend.dart';
import '../job/job.dart';
import '../scorecard/backend.dart';
Expand All @@ -37,22 +38,32 @@ const _pubDataFileName = 'pub-data.json';
const _sdkTimeout = Duration(minutes: 20);
final Duration _twoYears = const Duration(days: 2 * 365);

final _pkgPubDartdocDir =
Platform.script.resolve('../../pkg/pub_dartdoc').toFilePath();

class DartdocJobProcessor extends JobProcessor {
bool _initialized = false;

DartdocJobProcessor({
@required AliveCallback aliveCallback,
}) : super(
service: JobService.dartdoc,
aliveCallback: aliveCallback,
);

Future<void> _initializeIfNeeded() async {
if (_initialized) return;
await runProc(
'pub',
['get'],
workingDirectory: resolvePubDartdocDirPath(),
);
_initialized = true;
}

/// Uses the tool environment's SDK (the one that is used for analysis too) to
/// generate dartdoc documentation and extracted data file for SDK API indexing.
/// Only the extracted data file will be used and uploaded.
Future<void> generateDocsForSdk() async {
if (await dartdocBackend.hasValidDartSdkDartdocData()) return;
await _initializeIfNeeded();
final tempDir =
await Directory.systemTemp.createTemp('pub-dartlang-dartdoc');
try {
Expand All @@ -70,7 +81,7 @@ class DartdocJobProcessor extends JobProcessor {
final pr = await runProc(
'dart',
['bin/pub_dartdoc.dart', ...args],
workingDirectory: _pkgPubDartdocDir,
workingDirectory: resolvePubDartdocDirPath(),
timeout: _sdkTimeout,
);

Expand Down Expand Up @@ -164,6 +175,7 @@ class DartdocJobProcessor extends JobProcessor {
job.packageName,
job.packageVersion,
destination: pkgPath,
pubHostedUrl: activeConfiguration.primarySiteUri.toString(),
);
final usesFlutter = await toolEnvRef.toolEnv.detectFlutterUse(pkgPath);

Expand Down Expand Up @@ -346,6 +358,7 @@ class DartdocJobProcessor extends JobProcessor {
/// When [isReduced] is set, we are running dartdoc with reduced features,
/// hopefully to complete within the time limit and fewer issues.
Future<DartdocResult> runDartdoc({bool isReduced = false}) async {
await _initializeIfNeeded();
final args = [
'--input',
pkgPath,
Expand All @@ -368,7 +381,7 @@ class DartdocJobProcessor extends JobProcessor {
'dart',
['bin/pub_dartdoc.dart', ...args],
environment: environment,
workingDirectory: _pkgPubDartdocDir,
workingDirectory: resolvePubDartdocDirPath(),
timeout: _packageTimeout,
);
final hasIndexHtml = await File(p.join(outputDir, 'index.html')).exists();
Expand Down
6 changes: 6 additions & 0 deletions app/lib/frontend/static_files.dart
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ String resolveWebCssDirPath() {
.resolveSymbolicLinksSync();
}

/// Returns the path of pkg/pub_dartdoc on the local filesystem.
String resolvePubDartdocDirPath() {
return Directory(path.join(resolveAppDir(), '../pkg/pub_dartdoc'))
.resolveSymbolicLinksSync();
}

/// Returns the path of /doc on the local filesystem.
String resolveDocDirPath() {
return path.join(resolveAppDir(), '../doc');
Expand Down
102 changes: 64 additions & 38 deletions app/lib/job/job.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export 'model.dart';
final _logger = Logger('pub.job');
final _random = math.Random.secure();

typedef AliveCallback = Function();
typedef AliveCallback = FutureOr Function();

abstract class JobProcessor {
final JobService service;
Expand All @@ -50,46 +50,50 @@ abstract class JobProcessor {
Future<void> run() async {
int sleepSeconds = 0;
for (;;) {
sleepSeconds = math.min(sleepSeconds + 1, 60);
String jobDescription = '[pull]';
final sw = Stopwatch()..start();
var statEvent = 'failed';
try {
final job = await jobBackend.lockAvailable(service);
if (job != null) {
jobDescription = '${job.packageName} ${job.packageVersion}';
_logger.info('$_serviceAsString job started: $jobDescription');
sleepSeconds = 0;

JobStatus status = JobStatus.failed;
try {
status = await process(job);
status = JobStatus.success;
statEvent = 'success';
_logger.info('$_serviceAsString job completed: $jobDescription');
} on TransactionAbortedError catch (e, st) {
_logger.info('$_serviceAsString job error $jobDescription', e, st);
} on TimeoutError catch (e, st) {
_logger.info('$_serviceAsString job error $jobDescription', e, st);
} catch (e, st) {
_logger.severe(
'$_serviceAsString job error $jobDescription', e, st);
}
await jobBackend.complete(job, status);
final status = await _runOneJob();
if (_aliveCallback != null) await _aliveCallback();
sleepSeconds = (status == JobStatus.failed || status == JobStatus.aborted)
? math.min(sleepSeconds + 1, 60)
: 0;
await Future.delayed(Duration(seconds: sleepSeconds));
}
}

Future<JobStatus> _runOneJob() async {
JobStatus status = JobStatus.none;
String jobDescription = '[pull]';
final sw = Stopwatch()..start();
var statEvent = 'failed';
try {
final job = await jobBackend.lockAvailable(service);
if (job != null) {
jobDescription = '${job.packageName} ${job.packageVersion}';
_logger.info('$_serviceAsString job started: $jobDescription');
try {
status = await process(job);
status = JobStatus.success;
statEvent = 'success';
_logger.info('$_serviceAsString job completed: $jobDescription');
} on TransactionAbortedError catch (e, st) {
_logger.info('$_serviceAsString job error $jobDescription', e, st);
} on TimeoutError catch (e, st) {
_logger.info('$_serviceAsString job error $jobDescription', e, st);
} catch (e, st) {
_logger.severe('$_serviceAsString job error $jobDescription', e, st);
}
} on TransactionAbortedError catch (e, st) {
statEvent = 'transaction';
_logger.info('$_serviceAsString job error $jobDescription', e, st);
} on TimeoutError catch (e, st) {
statEvent = 'timeout';
_logger.info('$_serviceAsString job error $jobDescription', e, st);
} catch (e, st) {
_logger.severe('$_serviceAsString job error $jobDescription', e, st);
await jobBackend.complete(job, status);
}
_trackers.putIfAbsent(statEvent, () => DurationTracker()).add(sw.elapsed);
if (_aliveCallback != null) _aliveCallback();
await Future.delayed(Duration(seconds: sleepSeconds));
} on TransactionAbortedError catch (e, st) {
statEvent = 'transaction';
_logger.info('$_serviceAsString job error $jobDescription', e, st);
} on TimeoutError catch (e, st) {
statEvent = 'timeout';
_logger.info('$_serviceAsString job error $jobDescription', e, st);
} catch (e, st) {
_logger.severe('$_serviceAsString job error $jobDescription', e, st);
}
_trackers.putIfAbsent(statEvent, () => DurationTracker()).add(sw.elapsed);
return status;
}

void reportIssueWithLatest(Job job, String message) {
Expand Down Expand Up @@ -118,6 +122,28 @@ class JobMaintenance {
return Future.wait(futures);
}

/// Scans datastore for job updates, unlocks pending jobs and then runs the
/// available jobs.
@visibleForTesting
Future<void> scanUpdateAndRunOnce() async {
await for (final pv in _db.query<PackageVersion>().run()) {
await jobBackend.trigger(
_processor.service,
pv.package,
version: pv.version,
updated: pv.created,
);
}
await jobBackend.unlockStaleProcessing(_processor.service);
await jobBackend.checkIdle(_processor.service, _processor.shouldProcess);
for (;;) {
final status = await _processor._runOneJob();
if (status == JobStatus.none) {
break;
}
}
}

/// Never completes.
Future<void> syncDatastoreHead() async {
final source = DatastoreHeadTaskSource(_db, TaskSourceModel.version,
Expand Down
51 changes: 43 additions & 8 deletions pkg/fake_pub_server/bin/fake_pub_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import 'package:shelf/shelf.dart' as shelf;

import 'package:fake_gcloud/mem_datastore.dart';
import 'package:fake_gcloud/mem_storage.dart';
import 'package:fake_pub_server/fake_analyzer_service.dart';
import 'package:fake_pub_server/fake_dartdoc_service.dart';
import 'package:fake_pub_server/fake_pub_server.dart';
import 'package:fake_pub_server/fake_search_service.dart';
import 'package:fake_pub_server/fake_storage_server.dart';
Expand All @@ -21,13 +23,19 @@ final _argParser = ArgParser()
..addOption('storage-port',
defaultsTo: '8081', help: 'The HTTP port for the fake storage server.')
..addOption('search-port',
defaultsTo: '8082', help: 'The HTTP port for the fake search service.');
defaultsTo: '8082', help: 'The HTTP port for the fake search service.')
..addOption('analyzer-port',
defaultsTo: '8083', help: 'The HTTP port for the fake analyzer service.')
..addOption('dartdoc-port',
defaultsTo: '8084', help: 'The HTTP port for the fake dartdoc service.');

Future main(List<String> args) async {
final argv = _argParser.parse(args);
final port = int.parse(argv['port'] as String);
final storagePort = int.parse(argv['storage-port'] as String);
final searchPort = int.parse(argv['search-port'] as String);
final analyzerPort = int.parse(argv['analyzer-port'] as String);
final dartdocPort = int.parse(argv['dartdoc-port'] as String);

Logger.root.onRecord.listen((r) {
print([
Expand All @@ -44,22 +52,41 @@ Future main(List<String> args) async {
final storageServer = FakeStorageServer(storage);
final pubServer = FakePubServer(datastore, storage);
final searchService = FakeSearchService(datastore, storage);
final analyzerService = FakeAnalyzerService(datastore, storage);
final dartdocService = FakeDartdocService(datastore, storage);

final configuration = Configuration.fakePubServer(
frontendPort: port,
storageBaseUrl: 'http://localhost:$storagePort',
searchPort: searchPort,
);

Future<shelf.Response> _updateUpstream(int port) async {
final rs = await post('http://localhost:$port/fake-update-all');
if (rs.statusCode == 200) {
return shelf.Response.ok('OK');
} else {
return shelf.Response(503,
body: 'Upstream service ($port) returned ${rs.statusCode}.');
}
}

Future<shelf.Response> forwardUpdatesHandler(shelf.Request rq) async {
if (rq.requestedUri.path == '/fake-update-all') {
final analyzerRs = await _updateUpstream(analyzerPort);
if (analyzerRs.statusCode != 200) return analyzerRs;
final dartdocRs = await _updateUpstream(dartdocPort);
if (dartdocRs.statusCode != 200) return dartdocRs;
return await _updateUpstream(searchPort);
}
if (rq.requestedUri.path == '/fake-update-analyzer') {
return await _updateUpstream(analyzerPort);
}
if (rq.requestedUri.path == '/fake-update-dartdoc') {
return await _updateUpstream(dartdocPort);
}
if (rq.requestedUri.path == '/fake-update-search') {
final rs = await post('http://localhost:$searchPort/fake-update-all');
if (rs.statusCode == 200) {
return shelf.Response.ok('OK');
} else {
return shelf.Response(503,
body: 'Upstream service returned ${rs.statusCode}.');
}
return await _updateUpstream(searchPort);
}
return null;
}
Expand All @@ -77,6 +104,14 @@ Future main(List<String> args) async {
port: searchPort,
configuration: configuration,
),
analyzerService.run(
port: analyzerPort,
configuration: configuration,
),
dartdocService.run(
port: dartdocPort,
configuration: configuration,
),
],
eagerError: true,
);
Expand Down
72 changes: 72 additions & 0 deletions pkg/fake_pub_server/lib/fake_analyzer_service.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';

import 'package:fake_gcloud/mem_datastore.dart';
import 'package:fake_gcloud/mem_storage.dart';
import 'package:gcloud/db.dart';
import 'package:gcloud/service_scope.dart' as ss;
import 'package:gcloud/storage.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:shelf/shelf.dart' as shelf;
import 'package:shelf/shelf_io.dart';

import 'package:pub_dev/analyzer/handlers.dart';
import 'package:pub_dev/analyzer/pana_runner.dart';
import 'package:pub_dev/job/job.dart';
import 'package:pub_dev/shared/configuration.dart';
import 'package:pub_dev/shared/handler_helpers.dart';
import 'package:pub_dev/service/services.dart';

final _logger = Logger('fake_analyzer_service');

class FakeAnalyzerService {
final MemDatastore _datastore;
final MemStorage _storage;

FakeAnalyzerService(this._datastore, this._storage);

Future<void> run({
int port = 8083,
@required Configuration configuration,
}) async {
await ss.fork(() async {
final db = DatastoreDB(_datastore);
registerDbService(db);
registerStorageService(_storage);
registerActiveConfiguration(configuration);

await withPubServices(() async {
await ss.fork(() async {
final jobProcessor = AnalyzerJobProcessor(aliveCallback: null);
final jobMaintenance = JobMaintenance(dbService, jobProcessor);

final handler = wrapHandler(_logger, analyzerServiceHandler);
final server = await IOServer.bind('localhost', port);
serveRequests(server.server, (request) async {
return await ss.fork(() async {
if (request.requestedUri.path == '/fake-update-all') {
// ignore: invalid_use_of_visible_for_testing_member
await jobMaintenance.scanUpdateAndRunOnce();
return shelf.Response.ok('');
}
return await handler(request);
}) as shelf.Response;
});
_logger.info('running on port $port');

await ProcessSignal.sigterm.watch().first;

_logger.info('shutting down');
await server.close();
_logger.info('closing');
});
});
});
_logger.info('closed');
}
}
Loading