diff --git a/CHANGELOG.md b/CHANGELOG.md index f9a41871..9d642cde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 3.1.0-dev + +* Expose a stream for connection state changes on ClientChannel to address + [#428](https://github.com/grpc/grpc-dart/issues/428). + This allows users to react to state changes in the connection. + ## 3.0.2 * Fix compilation on the Web with DDC. diff --git a/lib/src/client/channel.dart b/lib/src/client/channel.dart index 9ca3c5a0..f7de79e1 100644 --- a/lib/src/client/channel.dart +++ b/lib/src/client/channel.dart @@ -39,6 +39,12 @@ abstract class ClientChannel { /// Initiates a new RPC on this connection. ClientCall createCall( ClientMethod method, Stream requests, CallOptions options); + + /// Stream of connection state changes + /// + /// This returns a broadcast stream that can be listened to for connection changes. + /// Note: on web channels, this will not yield any values. + Stream get onConnectionStateChanged; } /// Auxiliary base class implementing much of ClientChannel. @@ -47,6 +53,8 @@ abstract class ClientChannelBase implements ClientChannel { late ClientConnection _connection; var _connected = false; bool _isShutdown = false; + final StreamController _connectionStateStreamController = + StreamController.broadcast(); ClientChannelBase(); @@ -56,6 +64,7 @@ abstract class ClientChannelBase implements ClientChannel { _isShutdown = true; if (_connected) { await _connection.shutdown(); + await _connectionStateStreamController.close(); } } @@ -64,6 +73,7 @@ abstract class ClientChannelBase implements ClientChannel { _isShutdown = true; if (_connected) { await _connection.terminate(); + await _connectionStateStreamController.close(); } } @@ -76,6 +86,12 @@ abstract class ClientChannelBase implements ClientChannel { if (_isShutdown) throw GrpcError.unavailable('Channel shutting down.'); if (!_connected) { _connection = createConnection(); + _connection.onStateChanged = (state) { + if (_connectionStateStreamController.isClosed) { + return; + } + _connectionStateStreamController.add(state); + }; _connected = true; } return _connection; @@ -97,4 +113,8 @@ abstract class ClientChannelBase implements ClientChannel { }, onError: call.onConnectionError); return call; } + + @override + Stream get onConnectionStateChanged => + _connectionStateStreamController.stream; } diff --git a/lib/src/client/connection.dart b/lib/src/client/connection.dart index b6f9d8e5..460438af 100644 --- a/lib/src/client/connection.dart +++ b/lib/src/client/connection.dart @@ -57,4 +57,10 @@ abstract class ClientConnection { /// All open calls are terminated immediately, and no further calls may be /// made on this connection. Future terminate(); + + /// Set state change listener for this connection. The given callback will be + /// invoked when the state of this connection changes. + // no need for this to be public, + // but needed in the actual implementations + set onStateChanged(void Function(ConnectionState) cb); } diff --git a/lib/src/client/http2_channel.dart b/lib/src/client/http2_channel.dart index eee91fc8..9326766c 100644 --- a/lib/src/client/http2_channel.dart +++ b/lib/src/client/http2_channel.dart @@ -37,9 +37,8 @@ class ClientChannel extends ClientChannelBase { : super(); @override - ClientConnection createConnection() { - return Http2ClientConnection(host, port, options); - } + ClientConnection createConnection() => + Http2ClientConnection(host, port, options); } class ClientTransportConnectorChannel extends ClientChannelBase { @@ -50,8 +49,7 @@ class ClientTransportConnectorChannel extends ClientChannelBase { {this.options = const ChannelOptions()}); @override - ClientConnection createConnection() { - return Http2ClientConnection.fromClientTransportConnector( - transportConnector, options); - } + ClientConnection createConnection() => + Http2ClientConnection.fromClientTransportConnector( + transportConnector, options); } diff --git a/lib/src/client/http2_connection.dart b/lib/src/client/http2_connection.dart index 4f7522f0..2bc70268 100644 --- a/lib/src/client/http2_connection.dart +++ b/lib/src/client/http2_connection.dart @@ -18,7 +18,6 @@ import 'dart:convert'; import 'dart:io'; import 'package:http2/transport.dart'; -import 'package:meta/meta.dart'; import '../shared/codec.dart'; import '../shared/timeout.dart'; @@ -43,8 +42,8 @@ class Http2ClientConnection implements connection.ClientConnection { connection.ConnectionState _state = ConnectionState.idle; - @visibleForTesting - void Function(Http2ClientConnection connection)? onStateChanged; + void Function(connection.ConnectionState)? onStateChanged; + final _pendingCalls = []; final ClientTransportConnector _transportConnector; @@ -214,7 +213,7 @@ class Http2ClientConnection implements connection.ClientConnection { void _setState(ConnectionState state) { _state = state; - onStateChanged?.call(this); + onStateChanged?.call(state); } void _handleIdleTimeout() { diff --git a/lib/src/client/transport/xhr_transport.dart b/lib/src/client/transport/xhr_transport.dart index 41a5845d..16b0dca5 100644 --- a/lib/src/client/transport/xhr_transport.dart +++ b/lib/src/client/transport/xhr_transport.dart @@ -144,7 +144,7 @@ class XhrTransportStream implements GrpcTransportStream { } } -class XhrClientConnection extends ClientConnection { +class XhrClientConnection implements ClientConnection { final Uri uri; final _requests = {}; @@ -217,6 +217,11 @@ class XhrClientConnection extends ClientConnection { @override Future shutdown() async {} + + @override + set onStateChanged(void Function(ConnectionState) cb) { + // Do nothing. + } } MapEntry? _getContentTypeHeader(Map metadata) { diff --git a/pubspec.yaml b/pubspec.yaml index b4b9a6c9..58b60178 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ name: grpc description: Dart implementation of gRPC, a high performance, open-source universal RPC framework. -version: 3.0.2 +version: 3.1.0-dev repository: https://github.com/grpc/grpc-dart diff --git a/test/client_handles_bad_connections_test.dart b/test/client_handles_bad_connections_test.dart index 7aa08213..b576188b 100644 --- a/test/client_handles_bad_connections_test.dart +++ b/test/client_handles_bad_connections_test.dart @@ -43,7 +43,9 @@ class FixedConnectionClientChannel extends ClientChannelBase { final Http2ClientConnection clientConnection; List states = []; FixedConnectionClientChannel(this.clientConnection) { - clientConnection.onStateChanged = (c) => states.add(c.state); + onConnectionStateChanged.listen((state) { + states.add(state); + }); } @override ClientConnection createConnection() => clientConnection; @@ -89,8 +91,9 @@ Future main() async { server.port!, grpc.ChannelOptions(credentials: grpc.ChannelCredentials.insecure()))); final states = []; - channel.clientConnection.onStateChanged = - (Http2ClientConnection connection) => states.add(connection.state); + channel.onConnectionStateChanged.listen((state) { + states.add(state); + }); final testClient = TestClient(channel); await Future.wait([ diff --git a/test/client_tests/client_test.dart b/test/client_tests/client_test.dart index 7a12991d..4a434834 100644 --- a/test/client_tests/client_test.dart +++ b/test/client_tests/client_test.dart @@ -472,45 +472,59 @@ void main() { ); } + test('Connection states are reported', () async { + final connectionStates = []; + harness.channel.onConnectionStateChanged.listen((state) { + connectionStates.add(state); + }, onDone: () { + expect(connectionStates, [ + ConnectionState.connecting, + ConnectionState.ready, + ConnectionState.shutdown + ]); + }); + + await makeUnaryCall(); + }); + test('Connection errors are reported', () async { final connectionStates = []; harness.connection!.connectionError = 'Connection error'; - harness.connection!.onStateChanged = (connection) { - final state = connection.state; + harness.channel.onConnectionStateChanged.listen((state) { connectionStates.add(state); - }; + }, onDone: () { + expect( + connectionStates, [ConnectionState.connecting, ConnectionState.idle]); + }); final expectedException = GrpcError.unavailable('Error connecting: Connection error'); await harness.expectThrows( harness.client.unary(dummyValue), expectedException); - - expect( - connectionStates, [ConnectionState.connecting, ConnectionState.idle]); }); test('Connections time out if idle', () async { final done = Completer(); final connectionStates = []; - harness.connection!.onStateChanged = (connection) { - final state = connection.state; + harness.channel.onConnectionStateChanged.listen((state) { connectionStates.add(state); if (state == ConnectionState.idle) done.complete(); - }; + }, onDone: () async { + expect(connectionStates, + [ConnectionState.connecting, ConnectionState.ready]); + await done.future; + expect(connectionStates, [ + ConnectionState.connecting, + ConnectionState.ready, + ConnectionState.idle + ]); + }); harness.channelOptions.idleTimeout = const Duration(microseconds: 10); await makeUnaryCall(); harness.signalIdle(); - expect( - connectionStates, [ConnectionState.connecting, ConnectionState.ready]); - await done.future; - expect(connectionStates, [ - ConnectionState.connecting, - ConnectionState.ready, - ConnectionState.idle - ]); }); test('Default reconnect backoff backs off', () { diff --git a/test/client_tests/client_transport_connector_test.dart b/test/client_tests/client_transport_connector_test.dart index 9189f438..d2fa8872 100644 --- a/test/client_tests/client_transport_connector_test.dart +++ b/test/client_tests/client_transport_connector_test.dart @@ -351,43 +351,41 @@ void main() { test('Connection errors are reported', () async { final connectionStates = []; - harness.connection!.connectionError = 'Connection error'; - harness.connection!.onStateChanged = (connection) { - final state = connection.state; - connectionStates.add(state); - }; - final expectedException = GrpcError.unavailable('Error connecting: Connection error'); + harness.connection!.connectionError = 'Connection error'; + harness.channel.onConnectionStateChanged.listen((state) { + connectionStates.add(state); + }, onDone: () async { + await harness.expectThrows( + harness.client.unary(dummyValue), expectedException); - await harness.expectThrows( - harness.client.unary(dummyValue), expectedException); - - expect( - connectionStates, [ConnectionState.connecting, ConnectionState.idle]); + expect( + connectionStates, [ConnectionState.connecting, ConnectionState.idle]); + }); }); test('Connections time out if idle', () async { final done = Completer(); final connectionStates = []; - harness.connection!.onStateChanged = (connection) { - final state = connection.state; + harness.channel.onConnectionStateChanged.listen((state) { connectionStates.add(state); if (state == ConnectionState.idle) done.complete(); - }; + }, onDone: () async { + expect(connectionStates, + [ConnectionState.connecting, ConnectionState.ready]); + await done.future; + expect(connectionStates, [ + ConnectionState.connecting, + ConnectionState.ready, + ConnectionState.idle + ]); + }); harness.channelOptions.idleTimeout = const Duration(microseconds: 10); await makeUnaryCall(); harness.signalIdle(); - expect( - connectionStates, [ConnectionState.connecting, ConnectionState.ready]); - await done.future; - expect(connectionStates, [ - ConnectionState.connecting, - ConnectionState.ready, - ConnectionState.idle - ]); }); test('Default reconnect backoff backs off', () { diff --git a/test/round_trip_test.dart b/test/round_trip_test.dart index 798e67ae..9557d2dc 100644 --- a/test/round_trip_test.dart +++ b/test/round_trip_test.dart @@ -54,7 +54,7 @@ class FixedConnectionClientChannel extends ClientChannelBase { final Http2ClientConnection clientConnection; List states = []; FixedConnectionClientChannel(this.clientConnection) { - clientConnection.onStateChanged = (c) => states.add(c.state); + onConnectionStateChanged.listen((state) => states.add(state)); } @override ClientConnection createConnection() => clientConnection; diff --git a/test/timeline_test.dart b/test/timeline_test.dart index 33f91ed8..94172a8a 100644 --- a/test/timeline_test.dart +++ b/test/timeline_test.dart @@ -59,7 +59,7 @@ class FixedConnectionClientChannel extends ClientChannelBase { final Http2ClientConnection clientConnection; List states = []; FixedConnectionClientChannel(this.clientConnection) { - clientConnection.onStateChanged = (c) => states.add(c.state); + onConnectionStateChanged.listen((state) => states.add(state)); } @override ClientConnection createConnection() => clientConnection;