Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 3.1.0-dev

* Expose a stream for connection state changes on ClientChannel to address
[#428](https://github.com/grpc/grpc-dart/issues/428).
This allows users to react to state changes in the connection.

## 3.0.2

* Fix compilation on the Web with DDC.
Expand Down
20 changes: 20 additions & 0 deletions lib/src/client/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ abstract class ClientChannel {
/// Initiates a new RPC on this connection.
ClientCall<Q, R> createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options);

/// Stream of connection state changes
///
/// This returns a broadcast stream that can be listened to for connection changes.
/// Note: on web channels, this will not yield any values.
Stream<ConnectionState> get onConnectionStateChanged;
}

/// Auxiliary base class implementing much of ClientChannel.
Expand All @@ -47,6 +53,8 @@ abstract class ClientChannelBase implements ClientChannel {
late ClientConnection _connection;
var _connected = false;
bool _isShutdown = false;
final StreamController<ConnectionState> _connectionStateStreamController =
StreamController.broadcast();

ClientChannelBase();

Expand All @@ -56,6 +64,7 @@ abstract class ClientChannelBase implements ClientChannel {
_isShutdown = true;
if (_connected) {
await _connection.shutdown();
await _connectionStateStreamController.close();
}
}

Expand All @@ -64,6 +73,7 @@ abstract class ClientChannelBase implements ClientChannel {
_isShutdown = true;
if (_connected) {
await _connection.terminate();
await _connectionStateStreamController.close();
}
}

Expand All @@ -76,6 +86,12 @@ abstract class ClientChannelBase implements ClientChannel {
if (_isShutdown) throw GrpcError.unavailable('Channel shutting down.');
if (!_connected) {
_connection = createConnection();
_connection.onStateChanged = (state) {
if (_connectionStateStreamController.isClosed) {
return;
}
_connectionStateStreamController.add(state);
};
_connected = true;
}
return _connection;
Expand All @@ -97,4 +113,8 @@ abstract class ClientChannelBase implements ClientChannel {
}, onError: call.onConnectionError);
return call;
}

@override
Stream<ConnectionState> get onConnectionStateChanged =>
_connectionStateStreamController.stream;
}
6 changes: 6 additions & 0 deletions lib/src/client/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,10 @@ abstract class ClientConnection {
/// All open calls are terminated immediately, and no further calls may be
/// made on this connection.
Future<void> terminate();

/// Set state change listener for this connection. The given callback will be
/// invoked when the state of this connection changes.
// no need for this to be public,
// but needed in the actual implementations
set onStateChanged(void Function(ConnectionState) cb);
}
12 changes: 5 additions & 7 deletions lib/src/client/http2_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ class ClientChannel extends ClientChannelBase {
: super();

@override
ClientConnection createConnection() {
return Http2ClientConnection(host, port, options);
}
ClientConnection createConnection() =>
Http2ClientConnection(host, port, options);
}

class ClientTransportConnectorChannel extends ClientChannelBase {
Expand All @@ -50,8 +49,7 @@ class ClientTransportConnectorChannel extends ClientChannelBase {
{this.options = const ChannelOptions()});

@override
ClientConnection createConnection() {
return Http2ClientConnection.fromClientTransportConnector(
transportConnector, options);
}
ClientConnection createConnection() =>
Http2ClientConnection.fromClientTransportConnector(
transportConnector, options);
}
7 changes: 3 additions & 4 deletions lib/src/client/http2_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import 'dart:convert';
import 'dart:io';

import 'package:http2/transport.dart';
import 'package:meta/meta.dart';

import '../shared/codec.dart';
import '../shared/timeout.dart';
Expand All @@ -43,8 +42,8 @@ class Http2ClientConnection implements connection.ClientConnection {

connection.ConnectionState _state = ConnectionState.idle;

@visibleForTesting
void Function(Http2ClientConnection connection)? onStateChanged;
void Function(connection.ConnectionState)? onStateChanged;

final _pendingCalls = <ClientCall>[];

final ClientTransportConnector _transportConnector;
Expand Down Expand Up @@ -214,7 +213,7 @@ class Http2ClientConnection implements connection.ClientConnection {

void _setState(ConnectionState state) {
_state = state;
onStateChanged?.call(this);
onStateChanged?.call(state);
}

void _handleIdleTimeout() {
Expand Down
7 changes: 6 additions & 1 deletion lib/src/client/transport/xhr_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class XhrTransportStream implements GrpcTransportStream {
}
}

class XhrClientConnection extends ClientConnection {
class XhrClientConnection implements ClientConnection {
final Uri uri;

final _requests = <XhrTransportStream>{};
Expand Down Expand Up @@ -217,6 +217,11 @@ class XhrClientConnection extends ClientConnection {

@override
Future<void> shutdown() async {}

@override
set onStateChanged(void Function(ConnectionState) cb) {
// Do nothing.
}
}

MapEntry<String, String>? _getContentTypeHeader(Map<String, String> metadata) {
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: grpc
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.

version: 3.0.2
version: 3.1.0-dev

repository: https://github.com/grpc/grpc-dart

Expand Down
9 changes: 6 additions & 3 deletions test/client_handles_bad_connections_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ class FixedConnectionClientChannel extends ClientChannelBase {
final Http2ClientConnection clientConnection;
List<grpc.ConnectionState> states = <grpc.ConnectionState>[];
FixedConnectionClientChannel(this.clientConnection) {
clientConnection.onStateChanged = (c) => states.add(c.state);
onConnectionStateChanged.listen((state) {
states.add(state);
});
}
@override
ClientConnection createConnection() => clientConnection;
Expand Down Expand Up @@ -89,8 +91,9 @@ Future<void> main() async {
server.port!,
grpc.ChannelOptions(credentials: grpc.ChannelCredentials.insecure())));
final states = <grpc.ConnectionState>[];
channel.clientConnection.onStateChanged =
(Http2ClientConnection connection) => states.add(connection.state);
channel.onConnectionStateChanged.listen((state) {
states.add(state);
});
final testClient = TestClient(channel);

await Future.wait(<Future>[
Expand Down
48 changes: 31 additions & 17 deletions test/client_tests/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -472,45 +472,59 @@ void main() {
);
}

test('Connection states are reported', () async {
final connectionStates = <ConnectionState>[];
harness.channel.onConnectionStateChanged.listen((state) {
connectionStates.add(state);
}, onDone: () {
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.shutdown
]);
});

await makeUnaryCall();
});

test('Connection errors are reported', () async {
final connectionStates = <ConnectionState>[];
harness.connection!.connectionError = 'Connection error';
harness.connection!.onStateChanged = (connection) {
final state = connection.state;
harness.channel.onConnectionStateChanged.listen((state) {
connectionStates.add(state);
};
}, onDone: () {
expect(
connectionStates, [ConnectionState.connecting, ConnectionState.idle]);
});

final expectedException =
GrpcError.unavailable('Error connecting: Connection error');

await harness.expectThrows(
harness.client.unary(dummyValue), expectedException);

expect(
connectionStates, [ConnectionState.connecting, ConnectionState.idle]);
});

test('Connections time out if idle', () async {
final done = Completer();
final connectionStates = <ConnectionState>[];
harness.connection!.onStateChanged = (connection) {
final state = connection.state;
harness.channel.onConnectionStateChanged.listen((state) {
connectionStates.add(state);
if (state == ConnectionState.idle) done.complete();
};
}, onDone: () async {
expect(connectionStates,
[ConnectionState.connecting, ConnectionState.ready]);
await done.future;
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.idle
]);
});

harness.channelOptions.idleTimeout = const Duration(microseconds: 10);

await makeUnaryCall();
harness.signalIdle();
expect(
connectionStates, [ConnectionState.connecting, ConnectionState.ready]);
await done.future;
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.idle
]);
});

test('Default reconnect backoff backs off', () {
Expand Down
42 changes: 20 additions & 22 deletions test/client_tests/client_transport_connector_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -351,43 +351,41 @@ void main() {

test('Connection errors are reported', () async {
final connectionStates = <ConnectionState>[];
harness.connection!.connectionError = 'Connection error';
harness.connection!.onStateChanged = (connection) {
final state = connection.state;
connectionStates.add(state);
};

final expectedException =
GrpcError.unavailable('Error connecting: Connection error');
harness.connection!.connectionError = 'Connection error';
harness.channel.onConnectionStateChanged.listen((state) {
connectionStates.add(state);
}, onDone: () async {
await harness.expectThrows(
harness.client.unary(dummyValue), expectedException);

await harness.expectThrows(
harness.client.unary(dummyValue), expectedException);

expect(
connectionStates, [ConnectionState.connecting, ConnectionState.idle]);
expect(
connectionStates, [ConnectionState.connecting, ConnectionState.idle]);
});
});

test('Connections time out if idle', () async {
final done = Completer();
final connectionStates = <ConnectionState>[];
harness.connection!.onStateChanged = (connection) {
final state = connection.state;
harness.channel.onConnectionStateChanged.listen((state) {
connectionStates.add(state);
if (state == ConnectionState.idle) done.complete();
};
}, onDone: () async {
expect(connectionStates,
[ConnectionState.connecting, ConnectionState.ready]);
await done.future;
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.idle
]);
});

harness.channelOptions.idleTimeout = const Duration(microseconds: 10);

await makeUnaryCall();
harness.signalIdle();
expect(
connectionStates, [ConnectionState.connecting, ConnectionState.ready]);
await done.future;
expect(connectionStates, [
ConnectionState.connecting,
ConnectionState.ready,
ConnectionState.idle
]);
});

test('Default reconnect backoff backs off', () {
Expand Down
2 changes: 1 addition & 1 deletion test/round_trip_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class FixedConnectionClientChannel extends ClientChannelBase {
final Http2ClientConnection clientConnection;
List<ConnectionState> states = <ConnectionState>[];
FixedConnectionClientChannel(this.clientConnection) {
clientConnection.onStateChanged = (c) => states.add(c.state);
onConnectionStateChanged.listen((state) => states.add(state));
}
@override
ClientConnection createConnection() => clientConnection;
Expand Down
2 changes: 1 addition & 1 deletion test/timeline_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class FixedConnectionClientChannel extends ClientChannelBase {
final Http2ClientConnection clientConnection;
List<ConnectionState> states = <ConnectionState>[];
FixedConnectionClientChannel(this.clientConnection) {
clientConnection.onStateChanged = (c) => states.add(c.state);
onConnectionStateChanged.listen((state) => states.add(state));
}
@override
ClientConnection createConnection() => clientConnection;
Expand Down