Skip to content

Commit

Permalink
Migrate http2 to null safety (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
iinozemtsev authored Jan 20, 2021
1 parent 6084f9d commit 103e1f3
Show file tree
Hide file tree
Showing 36 changed files with 638 additions and 349 deletions.
28 changes: 16 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
language: dart

dart:
- 2.8.4
- dev

dart_task:
- test: --exclude-tags flaky
- test: --tags flaky
- dartanalyzer: --fatal-infos --fatal-warnings .

matrix:
jobs:
include:
# Only validate formatting using the dev release
- dart: dev
dart_task: dartfmt
- name: "Analyzer"
os: linux
script: dart analyze --fatal-warnings --fatal-infos .
- name: "Format"
os: linux
script: dartfmt -n --set-exit-if-changed .
- name: "Tests"
os: linux
script: dart test --exclude-tags flaky
- name: "Flaky Tests"
os: linux
script: dart test --tags flaky
allow_failures:
- dart_task:
test: --tags flaky
- name: "Flaky Tests"
os: linux
script: dart test --tags flaky

# Only building master means that we don't run two builds for each pull request.
branches:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.0.0-nullsafety.0

* Migrate to null safety.

## 1.0.2-dev

* Update minimum Dart SDK to `2.8.4`.
Expand Down
2 changes: 1 addition & 1 deletion example/display_headers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import 'dart:io';
import 'package:http2/transport.dart';

void main(List<String> args) async {
if (args == null || args.length != 1) {
if (args.length != 1) {
print('Usage: dart display_headers.dart <HTTPS_URI>');
exit(1);
}
Expand Down
15 changes: 7 additions & 8 deletions experimental/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,17 @@ void handleClient(SecureSocket socket) {
connection.incomingStreams.listen((ServerTransportStream stream) async {
dumpInfo('main', 'Got new HTTP/2 stream with id: ${stream.id}');

String path;
stream.incomingMessages.listen((StreamMessage msg) async {
var pathSeen = false;
unawaited(stream.incomingMessages.forEach((StreamMessage msg) async {
dumpInfo('${stream.id}', 'Got new incoming message');
if (msg is HeadersStreamMessage) {
dumpHeaders('${stream.id}', msg.headers);
if (path == null) {
path = pathFromHeaders(msg.headers);
if (path == null) throw Exception('no path given');

if (!pathSeen) {
var path = pathFromHeaders(msg.headers);
pathSeen = true;
if (path == '/') {
unawaited(sendHtml(stream));
} else if (['/iframe', '/iframe2'].contains(path)) {
} else if (path == '/iframe' || path == '/iframe2') {
unawaited(sendIFrameHtml(stream, path));
} else {
unawaited(send404(stream, path));
Expand All @@ -67,7 +66,7 @@ void handleClient(SecureSocket socket) {
} else if (msg is DataStreamMessage) {
dumpData('${stream.id}', msg.bytes);
}
});
}));
});
}

Expand Down
20 changes: 10 additions & 10 deletions lib/multiprotocol_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ import 'transport.dart' as http2;
/// * one handles HTTP/2 clients (called with a [http2.ServerTransportStream])
class MultiProtocolHttpServer {
final SecureServerSocket _serverSocket;
final http2.ServerSettings _settings;
final http2.ServerSettings? _settings;

_ServerSocketController _http11Controller;
HttpServer _http11Server;
late _ServerSocketController _http11Controller;
late HttpServer _http11Server;

final StreamController<http2.ServerTransportStream> _http2Controller =
StreamController();
Stream<http2.ServerTransportStream> get _http2Server =>
_http2Controller.stream;

StreamController<http2.ServerTransportStream> _http2Controller;
Stream<http2.ServerTransportStream> _http2Server;
final _http2Connections = <http2.ServerTransportConnection>{};

MultiProtocolHttpServer._(this._serverSocket, this._settings) {
_http11Controller =
_ServerSocketController(_serverSocket.address, _serverSocket.port);
_http11Server = HttpServer.listenOn(_http11Controller.stream);

_http2Controller = StreamController();
_http2Server = _http2Controller.stream;
}

/// Binds a new [SecureServerSocket] with a security [context] at [port] and
Expand All @@ -46,7 +46,7 @@ class MultiProtocolHttpServer {
/// See also [startServing].
static Future<MultiProtocolHttpServer> bind(
address, int port, SecurityContext context,
{http2.ServerSettings settings}) async {
{http2.ServerSettings? settings}) async {
context.setAlpnProtocols(['h2', 'h2-14', 'http/1.1'], true);
var secureServer = await SecureServerSocket.bind(address, port, context);
return MultiProtocolHttpServer._(secureServer, settings);
Expand All @@ -65,7 +65,7 @@ class MultiProtocolHttpServer {
/// an exception (i.e. these must take care of error handling themselves).
void startServing(void Function(HttpRequest) callbackHttp11,
void Function(http2.ServerTransportStream) callbackHttp2,
{void Function(dynamic error, StackTrace) onError}) {
{void Function(dynamic error, StackTrace)? onError}) {
// 1. Start listening on the real [SecureServerSocket].
_serverSocket.listen((SecureSocket socket) {
var protocol = socket.selectedProtocol;
Expand Down
37 changes: 18 additions & 19 deletions lib/src/async_utils/async_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,33 @@ class BufferIndicator {
/// whether the underlying stream cannot handle more data and would buffer.
class BufferedSink {
/// The indicator whether the underlying sink is buffering at the moment.
final BufferIndicator bufferIndicator = BufferIndicator();
final bufferIndicator = BufferIndicator();

/// A intermediate [StreamController] used to catch pause signals and to
/// propagate the change via [bufferIndicator].
StreamController<List<int>> _controller;
final _controller = StreamController<List<int>>(sync: true);

/// A future which completes once the sink has been closed.
Future _doneFuture;
late final Future _doneFuture;

BufferedSink(StreamSink<List<int>> dataSink) {
bufferIndicator.markBuffered();

_controller = StreamController<List<int>>(
onListen: () {
bufferIndicator.markUnBuffered();
},
onPause: () {
bufferIndicator.markBuffered();
},
onResume: () {
bufferIndicator.markUnBuffered();
},
onCancel: () {
// TODO: We may want to propagate cancel events as errors.
// Currently `_doneFuture` will just complete normally if the sink
// cancelled.
},
sync: true);
_controller
..onListen = () {
bufferIndicator.markUnBuffered();
}
..onPause = () {
bufferIndicator.markBuffered();
}
..onResume = () {
bufferIndicator.markUnBuffered();
}
..onCancel = () {
// TODO: We may want to propagate cancel events as errors.
// Currently `_doneFuture` will just complete normally if the sink
// cancelled.
};
_doneFuture =
Future.wait([_controller.stream.pipe(dataSink), dataSink.done]);
}
Expand Down
60 changes: 27 additions & 33 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ abstract class Connection {
final bool isClientConnection;

/// Active state handler for this connection.
ActiveStateHandler onActiveStateChanged;
ActiveStateHandler? onActiveStateChanged;

final Completer<void> _onInitialPeerSettingsReceived = Completer<void>();

Expand All @@ -117,32 +117,32 @@ abstract class Connection {
final FrameDefragmenter _defragmenter = FrameDefragmenter();

/// The outgoing frames of this connection;
FrameWriter _frameWriter;
late FrameWriter _frameWriter;

/// A subscription of incoming [Frame]s.
StreamSubscription<Frame> _frameReaderSubscription;
late StreamSubscription<Frame> _frameReaderSubscription;

/// The incoming connection-level message queue.
ConnectionMessageQueueIn _incomingQueue;
late ConnectionMessageQueueIn _incomingQueue;

/// The outgoing connection-level message queue.
ConnectionMessageQueueOut _outgoingQueue;
late ConnectionMessageQueueOut _outgoingQueue;

/// The ping handler used for making pings & handling remote pings.
PingHandler _pingHandler;
late PingHandler _pingHandler;

/// The settings handler used for changing settings & for handling remote
/// setting changes.
SettingsHandler _settingsHandler;
late SettingsHandler _settingsHandler;

/// The set of active streams this connection has.
StreamHandler _streams;
late StreamHandler _streams;

/// The connection-level flow control window handler for outgoing messages.
OutgoingConnectionWindowHandler _connectionWindowHandler;
late OutgoingConnectionWindowHandler _connectionWindowHandler;

/// The state of this connection.
ConnectionState _state;
late ConnectionState _state;

Connection(Stream<List<int>> incoming, StreamSink<List<int>> outgoing,
Settings settings,
Expand Down Expand Up @@ -238,22 +238,23 @@ abstract class Connection {
List<Setting> _decodeSettings(Settings settings) {
var settingsList = <Setting>[];

// By default a endpoitn can make an unlimited number of concurrent streams.
if (settings.concurrentStreamLimit != null) {
settingsList.add(Setting(Setting.SETTINGS_MAX_CONCURRENT_STREAMS,
settings.concurrentStreamLimit));
// By default a endpoint can make an unlimited number of concurrent streams.
var concurrentStreamLimit = settings.concurrentStreamLimit;
if (concurrentStreamLimit != null) {
settingsList.add(Setting(
Setting.SETTINGS_MAX_CONCURRENT_STREAMS, concurrentStreamLimit));
}

// By default the stream level flow control window is 64 KiB.
if (settings.streamWindowSize != null) {
settingsList.add(Setting(
Setting.SETTINGS_INITIAL_WINDOW_SIZE, settings.streamWindowSize));
var streamWindowSize = settings.streamWindowSize;
if (streamWindowSize != null) {
settingsList
.add(Setting(Setting.SETTINGS_INITIAL_WINDOW_SIZE, streamWindowSize));
}

if (settings is ClientSettings) {
// By default the server is allowed to do server pushes.
if (settings.allowServerPushes == null ||
settings.allowServerPushes == false) {
if (!settings.allowServerPushes) {
settingsList.add(Setting(Setting.SETTINGS_ENABLE_PUSH, 0));
}
} else if (settings is ServerSettings) {
Expand All @@ -278,24 +279,17 @@ abstract class Connection {
_finishing(active: true);

// TODO: There is probably more we need to wait for.
return _streams.done.whenComplete(() {
var futures = [_frameWriter.close()];
var f = _frameReaderSubscription.cancel();
if (f != null) futures.add(f);
return Future.wait(futures);
});
return _streams.done.whenComplete(() =>
Future.wait([_frameWriter.close(), _frameReaderSubscription.cancel()]));
}

/// Terminates this connection forcefully.
Future terminate() {
return _terminate(ErrorCode.NO_ERROR);
}

void _activeStateHandler(bool isActive) {
if (onActiveStateChanged != null) {
onActiveStateChanged(isActive);
}
}
void _activeStateHandler(bool isActive) =>
onActiveStateChanged?.call(isActive);

/// Invokes the passed in closure and catches any exceptions.
void _catchProtocolErrors(void Function() fn) {
Expand All @@ -319,7 +313,7 @@ abstract class Connection {
}
}

void _handleFrameImpl(Frame frame) {
void _handleFrameImpl(Frame? frame) {
// The first frame from the other side must be a [SettingsFrame], otherwise
// we terminate the connection.
if (_state.isInitialized) {
Expand Down Expand Up @@ -369,7 +363,7 @@ abstract class Connection {
}
}

void _finishing({bool active = true, String message}) {
void _finishing({bool active = true, String? message}) {
// If this connection is already dead, we return.
if (_state.isTerminated) return;

Expand Down Expand Up @@ -407,7 +401,7 @@ abstract class Connection {
///
/// The returned future will never complete with an error.
Future _terminate(int errorCode,
{bool causedByTransportError = false, String message}) {
{bool causedByTransportError = false, String? message}) {
// TODO: When do we complete here?
if (_state.state != ConnectionState.Terminated) {
_state.state = ConnectionState.Terminated;
Expand Down
37 changes: 18 additions & 19 deletions lib/src/connection_preface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ const List<int> CONNECTION_PREFACE = [
/// connection preface. If an error occurs while reading the connection
/// preface, the returned stream will have only an error.
Stream<List<int>> readConnectionPreface(Stream<List<int>> incoming) {
StreamController<List<int>> result;
StreamSubscription subscription;
final result = StreamController<List<int>>();
late StreamSubscription subscription;
var connectionPrefaceRead = false;
var prefaceBuffer = <int>[];
var terminated = false;

void terminate(error) {
void terminate(Object error) {
if (!terminated) {
result.addError(error);
result.close();
Expand All @@ -64,7 +64,6 @@ Stream<List<int>> readConnectionPreface(Stream<List<int>> incoming) {
return false;
}
}
prefaceBuffer = null;
connectionPrefaceRead = true;
return true;
}
Expand Down Expand Up @@ -96,21 +95,21 @@ Stream<List<int>> readConnectionPreface(Stream<List<int>> incoming) {
}
}

result = StreamController(
onListen: () {
subscription = incoming.listen(onData,
onError: (e, StackTrace s) => result.addError(e, s),
onDone: () {
if (prefaceBuffer != null) {
terminate('EOS before connection preface could be read.');
} else {
result.close();
}
});
},
onPause: () => subscription.pause(),
onResume: () => subscription.resume(),
onCancel: () => subscription.cancel());
result.onListen = () {
subscription = incoming.listen(onData,
onError: (Object e, StackTrace s) => result.addError(e, s),
onDone: () {
if (!connectionPrefaceRead) {
terminate('EOS before connection preface could be read.');
} else {
result.close();
}
});
result
..onPause = subscription.pause
..onResume = subscription.resume
..onCancel = subscription.cancel;
};

return result.stream;
}
Loading

0 comments on commit 103e1f3

Please sign in to comment.