Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions .github/workflows/bazel_worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,24 @@ jobs:
strategy:
fail-fast: false
matrix:
sdk: [3.4, dev]
sdk: [stable, dev]
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- uses: dart-lang/setup-dart@e51d8e571e22473a2ddebf0ef8a2123f0ab2c02c
with:
sdk: ${{ matrix.sdk }}
- run: dart pub get
- run: "dart format --output=none --set-exit-if-changed ."

- run: dart format --output=none --set-exit-if-changed .
if: ${{ matrix.sdk == 'dev' }}
- name: Test
run: ./tool/travis.sh

- run: dart analyze --fatal-infos

- run: dart run benchmark/benchmark.dart

- run: dart test

- name: dart test e2e_test
run: |
cd e2e_test
dart test
5 changes: 5 additions & 0 deletions pkgs/bazel_worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.1.4

* Require Dart SDK `^3.9.0`.
* Widen `package:protobuf` constraint to allow 5.0.0 and requiring minimum 4.0.0.

## 1.1.3

* Require Dart SDK `^3.4.0`.
Expand Down
2 changes: 1 addition & 1 deletion pkgs/bazel_worker/e2e_test/lib/async_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import 'package:bazel_worker/bazel_worker.dart';
class ExampleAsyncWorker extends AsyncWorkerLoop {
/// Set [sendPort] to run in an isolate.
ExampleAsyncWorker([SendPort? sendPort])
: super(connection: AsyncWorkerConnection(sendPort: sendPort));
: super(connection: AsyncWorkerConnection(sendPort: sendPort));

@override
Future<WorkResponse> performRequest(WorkRequest request) async {
Expand Down
7 changes: 4 additions & 3 deletions pkgs/bazel_worker/e2e_test/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
name: e2e_test
publish_to: none

resolution: workspace

environment:
sdk: ^3.4.0
sdk: ^3.9.0

dependencies:
bazel_worker:
path: ../
bazel_worker: any

dev_dependencies:
cli_util: ^0.4.2
Expand Down
13 changes: 7 additions & 6 deletions pkgs/bazel_worker/example/client.dart
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:io';

import 'package:bazel_worker/driver.dart';

void main() async {
var scratchSpace = await Directory.systemTemp.createTemp();
var driver = BazelWorkerDriver(
() => Process.start(
Platform.resolvedExecutable,
[
Platform.script.resolve('worker.dart').toFilePath(),
],
workingDirectory: scratchSpace.path),
() => Process.start(Platform.resolvedExecutable, [
Platform.script.resolve('worker.dart').toFilePath(),
], workingDirectory: scratchSpace.path),
maxWorkers: 4,
);
var response = await driver.doWork(WorkRequest(arguments: ['foo']));
Expand Down
4 changes: 4 additions & 0 deletions pkgs/bazel_worker/example/worker.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:io';
import 'package:bazel_worker/bazel_worker.dart';

Expand Down
6 changes: 3 additions & 3 deletions pkgs/bazel_worker/lib/src/async_message_grouper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ class AsyncMessageGrouper implements MessageGrouper {
int _messagePos = 0;

AsyncMessageGrouper(Stream<List<int>> inputStream)
: _inputQueue = StreamQueue(inputStream);
: _inputQueue = StreamQueue(inputStream);

/// Returns the next full message that is received, or null if none are left.
@override
Future<List<int>?> get next async {
try {
// Loop while there is data in the input buffer or the input stream.
while (
_inputBufferPos != _inputBuffer.length || await _inputQueue.hasNext) {
while (_inputBufferPos != _inputBuffer.length ||
await _inputQueue.hasNext) {
// If the input buffer is empty fill it from the input stream.
if (_inputBufferPos == _inputBuffer.length) {
_inputBuffer = await _inputQueue.next;
Expand Down
54 changes: 28 additions & 26 deletions pkgs/bazel_worker/lib/src/driver/driver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ class BazelWorkerDriver {
int? maxIdleWorkers,
int? maxWorkers,
int? maxRetries,
}) : _maxIdleWorkers = maxIdleWorkers ?? 4,
_maxWorkers = maxWorkers ?? 4,
_maxRetries = maxRetries ?? 4;
}) : _maxIdleWorkers = maxIdleWorkers ?? 4,
_maxWorkers = maxWorkers ?? 4,
_maxRetries = maxRetries ?? 4;

/// Waits for an available worker, and then sends [WorkRequest] to it.
///
Expand Down Expand Up @@ -111,29 +111,31 @@ class BazelWorkerDriver {
// work queue.
var futureWorker = _spawnWorker();
_spawningWorkers.add(futureWorker);
futureWorker.then((worker) {
_spawningWorkers.remove(futureWorker);
_readyWorkers.add(worker);
var connection = StdDriverConnection.forWorker(worker);
_workerConnections[worker] = connection;
_runWorker(worker, attempt);

// When the worker exits we should retry running the work queue in case
// there is more work to be done. This is primarily just a defensive
// thing but is cheap to do.
//
// We don't use `exitCode` because it is null for detached processes (
// which is common for workers).
connection.done.then((_) {
_idleWorkers.remove(worker);
_readyWorkers.remove(worker);
_runWorkQueue();
});
}).onError<Object>((e, s) {
_spawningWorkers.remove(futureWorker);
if (attempt.responseCompleter.isCompleted) return;
attempt.responseCompleter.completeError(e, s);
});
futureWorker
.then((worker) {
_spawningWorkers.remove(futureWorker);
_readyWorkers.add(worker);
var connection = StdDriverConnection.forWorker(worker);
_workerConnections[worker] = connection;
_runWorker(worker, attempt);

// When the worker exits we should retry running the work queue in case
// there is more work to be done. This is primarily just a defensive
// thing but is cheap to do.
//
// We don't use `exitCode` because it is null for detached processes (
// which is common for workers).
connection.done.then((_) {
_idleWorkers.remove(worker);
_readyWorkers.remove(worker);
_runWorkQueue();
});
})
.onError<Object>((e, s) {
_spawningWorkers.remove(futureWorker);
if (attempt.responseCompleter.isCompleted) return;
attempt.responseCompleter.completeError(e, s);
});
}
// Recursively calls itself until one of the bail out conditions are met.
_runWorkQueue();
Expand Down
10 changes: 5 additions & 5 deletions pkgs/bazel_worker/lib/src/driver/driver_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ class StdDriverConnection implements DriverConnection {
StdDriverConnection({
Stream<List<int>>? inputStream,
StreamSink<List<int>>? outputStream,
}) : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;
}) : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;

factory StdDriverConnection.forWorker(Process worker) => StdDriverConnection(
inputStream: worker.stdout,
outputStream: worker.stdin,
);
inputStream: worker.stdout,
outputStream: worker.stdin,
);

/// Note: This will attempts to recover from invalid proto messages by parsing
/// them as strings. This is a common error case for workers (they print a
Expand Down
6 changes: 3 additions & 3 deletions pkgs/bazel_worker/lib/src/message_grouper_state.dart
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ class _MessageReader {
int _numMessageBytesReceived = 0;

_MessageReader(int length)
: _message = Uint8List(length),
_length = length,
_done = length == 0;
: _message = Uint8List(length),
_length = length,
_done = length == 0;

/// Reads [byte] into [_message].
void readByte(int byte) {
Expand Down
2 changes: 1 addition & 1 deletion pkgs/bazel_worker/lib/src/worker/async_worker_loop.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ abstract class AsyncWorkerLoop implements WorkerLoop {
final AsyncWorkerConnection connection;

AsyncWorkerLoop({AsyncWorkerConnection? connection})
: connection = connection ?? StdAsyncWorkerConnection();
: connection = connection ?? StdAsyncWorkerConnection();

/// Perform a single [WorkRequest], and return a [WorkResponse].
@override
Expand Down
2 changes: 1 addition & 1 deletion pkgs/bazel_worker/lib/src/worker/sync_worker_loop.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ abstract class SyncWorkerLoop implements WorkerLoop {
final SyncWorkerConnection connection;

SyncWorkerLoop({SyncWorkerConnection? connection})
: connection = connection ?? StdSyncWorkerConnection();
: connection = connection ?? StdSyncWorkerConnection();

/// Perform a single [WorkRequest], and return a [WorkResponse].
@override
Expand Down
23 changes: 11 additions & 12 deletions pkgs/bazel_worker/lib/src/worker/worker_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@ abstract class AsyncWorkerConnection implements WorkerConnection {
Stream<List<int>>? inputStream,
StreamSink<List<int>>? outputStream,
SendPort? sendPort,
}) =>
sendPort == null
? StdAsyncWorkerConnection(
inputStream: inputStream,
outputStream: outputStream,
)
: SendPortAsyncWorkerConnection(sendPort);
}) => sendPort == null
? StdAsyncWorkerConnection(
inputStream: inputStream,
outputStream: outputStream,
)
: SendPortAsyncWorkerConnection(sendPort);

@override
Future<WorkRequest?> readRequest();
Expand All @@ -59,8 +58,8 @@ class StdAsyncWorkerConnection implements AsyncWorkerConnection {
StdAsyncWorkerConnection({
Stream<List<int>>? inputStream,
StreamSink<List<int>>? outputStream,
}) : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;
}) : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;

@override
Future<WorkRequest?> readRequest() async {
Expand Down Expand Up @@ -89,7 +88,7 @@ class SendPortAsyncWorkerConnection implements AsyncWorkerConnection {
}

SendPortAsyncWorkerConnection._(this.receivePort, this.sendPort)
: receivePortIterator = StreamIterator(receivePort.cast());
: receivePortIterator = StreamIterator(receivePort.cast());

@override
Future<WorkRequest?> readRequest() async {
Expand All @@ -110,8 +109,8 @@ class StdSyncWorkerConnection implements SyncWorkerConnection {
final Stdout _stdoutStream;

StdSyncWorkerConnection({Stdin? stdinStream, Stdout? stdoutStream})
: _messageGrouper = SyncMessageGrouper(stdinStream ?? stdin),
_stdoutStream = stdoutStream ?? stdout;
: _messageGrouper = SyncMessageGrouper(stdinStream ?? stdin),
_stdoutStream = stdoutStream ?? stdout;

@override
WorkRequest? readRequest() {
Expand Down
6 changes: 3 additions & 3 deletions pkgs/bazel_worker/lib/testing.dart
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class TestSyncWorkerConnection extends StdSyncWorkerConnection
final List<WorkResponse> responses = <WorkResponse>[];

TestSyncWorkerConnection(Stdin stdinStream, Stdout stdoutStream)
: super(stdinStream: stdinStream, stdoutStream: stdoutStream);
: super(stdinStream: stdinStream, stdoutStream: stdoutStream);

@override
void writeResponse(WorkResponse response) {
Expand All @@ -148,7 +148,7 @@ class TestSyncWorkerLoop extends SyncWorkerLoop implements TestWorkerLoop {
final String? printMessage;

TestSyncWorkerLoop(SyncWorkerConnection connection, {this.printMessage})
: super(connection: connection);
: super(connection: connection);

@override
WorkResponse performRequest(WorkRequest request) {
Expand Down Expand Up @@ -193,7 +193,7 @@ class TestAsyncWorkerLoop extends AsyncWorkerLoop implements TestWorkerLoop {
final String? printMessage;

TestAsyncWorkerLoop(AsyncWorkerConnection connection, {this.printMessage})
: super(connection: connection);
: super(connection: connection);

@override
Future<WorkResponse> performRequest(WorkRequest request) async {
Expand Down
10 changes: 7 additions & 3 deletions pkgs/bazel_worker/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
name: bazel_worker
version: 1.1.3
version: 1.1.4
description: >-
Protocol and utilities to implement or invoke persistent bazel workers.
repository: https://github.com/dart-lang/tools/tree/main/pkgs/bazel_worker
issue_tracker: https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Abazel_worker

# Using a workspace so the integration tests resolve to the local version of this package.
workspace:
- e2e_test

environment:
sdk: ^3.4.0
sdk: ^3.9.0

dependencies:
async: ^2.5.0
protobuf: ">=3.0.0 <5.0.0"
protobuf: ">=4.0.0 <6.0.0"

dev_dependencies:
dart_flutter_team_lints: ^3.0.0
Expand Down
3 changes: 2 additions & 1 deletion pkgs/bazel_worker/test/worker_loop_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ void runTests<T extends TestWorkerConnection>(
expect(
printMessages,
isEmpty,
reason: 'The worker loop should hide all print calls from the parent '
reason:
'The worker loop should hide all print calls from the parent '
'zone.',
);

Expand Down
26 changes: 0 additions & 26 deletions pkgs/bazel_worker/tool/travis.sh

This file was deleted.