Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate http2 to null safety #78

Merged
merged 3 commits into from
Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
language: dart

dart:
- 2.8.4
- dev

dart_task:
- test: --exclude-tags flaky
- test: --tags flaky
- dartanalyzer: --fatal-infos --fatal-warnings .

matrix:
jobs:
include:
# Only validate formatting using the dev release
- dart: dev
dart_task: dartfmt
- name: "Analyzer"
os: linux
script: dart analyze --fatal-warnings --fatal-infos .
- name: "Format"
os: linux
script: dartfmt -n --set-exit-if-changed .
- name: "Tests"
os: linux
script: dart test --exclude-tags flaky
- name: "Flaky Tests"
os: linux
script: dart test --tags flaky
allow_failures:
- dart_task:
test: --tags flaky
- name: "Flaky Tests"
os: linux
script: dart test --tags flaky

# Only building master means that we don't run two builds for each pull request.
branches:
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.0.0-nullsafety.0

* Migrate to null safety.

## 1.0.2-dev

* Update minimum Dart SDK to `2.8.4`.
Expand Down
2 changes: 1 addition & 1 deletion example/display_headers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import 'dart:io';
import 'package:http2/transport.dart';

void main(List<String> args) async {
if (args == null || args.length != 1) {
if (args.length != 1) {
print('Usage: dart display_headers.dart <HTTPS_URI>');
exit(1);
}
Expand Down
15 changes: 7 additions & 8 deletions experimental/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,17 @@ void handleClient(SecureSocket socket) {
connection.incomingStreams.listen((ServerTransportStream stream) async {
dumpInfo('main', 'Got new HTTP/2 stream with id: ${stream.id}');

String path;
stream.incomingMessages.listen((StreamMessage msg) async {
var pathSeen = false;
unawaited(stream.incomingMessages.forEach((StreamMessage msg) async {
dumpInfo('${stream.id}', 'Got new incoming message');
if (msg is HeadersStreamMessage) {
iinozemtsev marked this conversation as resolved.
Show resolved Hide resolved
dumpHeaders('${stream.id}', msg.headers);
if (path == null) {
path = pathFromHeaders(msg.headers);
if (path == null) throw Exception('no path given');

if (!pathSeen) {
var path = pathFromHeaders(msg.headers);
pathSeen = true;
if (path == '/') {
unawaited(sendHtml(stream));
} else if (['/iframe', '/iframe2'].contains(path)) {
} else if (path == '/iframe' || path == '/iframe2') {
unawaited(sendIFrameHtml(stream, path));
} else {
unawaited(send404(stream, path));
Expand All @@ -67,7 +66,7 @@ void handleClient(SecureSocket socket) {
} else if (msg is DataStreamMessage) {
dumpData('${stream.id}', msg.bytes);
}
});
}));
});
}

Expand Down
20 changes: 10 additions & 10 deletions lib/multiprotocol_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ import 'transport.dart' as http2;
/// * one handles HTTP/2 clients (called with a [http2.ServerTransportStream])
class MultiProtocolHttpServer {
final SecureServerSocket _serverSocket;
final http2.ServerSettings _settings;
final http2.ServerSettings? _settings;

_ServerSocketController _http11Controller;
HttpServer _http11Server;
late _ServerSocketController _http11Controller;
late HttpServer _http11Server;

final StreamController<http2.ServerTransportStream> _http2Controller =
StreamController();
Stream<http2.ServerTransportStream> get _http2Server =>
_http2Controller.stream;

StreamController<http2.ServerTransportStream> _http2Controller;
Stream<http2.ServerTransportStream> _http2Server;
final _http2Connections = <http2.ServerTransportConnection>{};

MultiProtocolHttpServer._(this._serverSocket, this._settings) {
_http11Controller =
_ServerSocketController(_serverSocket.address, _serverSocket.port);
_http11Server = HttpServer.listenOn(_http11Controller.stream);

_http2Controller = StreamController();
_http2Server = _http2Controller.stream;
}

/// Binds a new [SecureServerSocket] with a security [context] at [port] and
Expand All @@ -46,7 +46,7 @@ class MultiProtocolHttpServer {
/// See also [startServing].
static Future<MultiProtocolHttpServer> bind(
address, int port, SecurityContext context,
{http2.ServerSettings settings}) async {
{http2.ServerSettings? settings}) async {
context.setAlpnProtocols(['h2', 'h2-14', 'http/1.1'], true);
var secureServer = await SecureServerSocket.bind(address, port, context);
return MultiProtocolHttpServer._(secureServer, settings);
Expand All @@ -65,7 +65,7 @@ class MultiProtocolHttpServer {
/// an exception (i.e. these must take care of error handling themselves).
void startServing(void Function(HttpRequest) callbackHttp11,
void Function(http2.ServerTransportStream) callbackHttp2,
{void Function(dynamic error, StackTrace) onError}) {
{void Function(dynamic error, StackTrace)? onError}) {
// 1. Start listening on the real [SecureServerSocket].
_serverSocket.listen((SecureSocket socket) {
var protocol = socket.selectedProtocol;
Expand Down
37 changes: 18 additions & 19 deletions lib/src/async_utils/async_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,33 @@ class BufferIndicator {
/// whether the underlying stream cannot handle more data and would buffer.
class BufferedSink {
/// The indicator whether the underlying sink is buffering at the moment.
final BufferIndicator bufferIndicator = BufferIndicator();
final bufferIndicator = BufferIndicator();

/// A intermediate [StreamController] used to catch pause signals and to
/// propagate the change via [bufferIndicator].
StreamController<List<int>> _controller;
iinozemtsev marked this conversation as resolved.
Show resolved Hide resolved
final _controller = StreamController<List<int>>(sync: true);

/// A future which completes once the sink has been closed.
Future _doneFuture;
late final Future _doneFuture;

BufferedSink(StreamSink<List<int>> dataSink) {
bufferIndicator.markBuffered();

_controller = StreamController<List<int>>(
onListen: () {
bufferIndicator.markUnBuffered();
},
onPause: () {
bufferIndicator.markBuffered();
},
onResume: () {
bufferIndicator.markUnBuffered();
},
onCancel: () {
// TODO: We may want to propagate cancel events as errors.
// Currently `_doneFuture` will just complete normally if the sink
// cancelled.
},
sync: true);
_controller
..onListen = () {
bufferIndicator.markUnBuffered();
}
..onPause = () {
bufferIndicator.markBuffered();
}
..onResume = () {
bufferIndicator.markUnBuffered();
}
..onCancel = () {
// TODO: We may want to propagate cancel events as errors.
// Currently `_doneFuture` will just complete normally if the sink
// cancelled.
};
_doneFuture =
Future.wait([_controller.stream.pipe(dataSink), dataSink.done]);
}
Expand Down
60 changes: 27 additions & 33 deletions lib/src/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ abstract class Connection {
final bool isClientConnection;

/// Active state handler for this connection.
ActiveStateHandler onActiveStateChanged;
ActiveStateHandler? onActiveStateChanged;

final Completer<void> _onInitialPeerSettingsReceived = Completer<void>();

Expand All @@ -117,32 +117,32 @@ abstract class Connection {
final FrameDefragmenter _defragmenter = FrameDefragmenter();

/// The outgoing frames of this connection;
FrameWriter _frameWriter;
late FrameWriter _frameWriter;

/// A subscription of incoming [Frame]s.
StreamSubscription<Frame> _frameReaderSubscription;
late StreamSubscription<Frame> _frameReaderSubscription;

/// The incoming connection-level message queue.
ConnectionMessageQueueIn _incomingQueue;
late ConnectionMessageQueueIn _incomingQueue;

/// The outgoing connection-level message queue.
ConnectionMessageQueueOut _outgoingQueue;
late ConnectionMessageQueueOut _outgoingQueue;

/// The ping handler used for making pings & handling remote pings.
PingHandler _pingHandler;
late PingHandler _pingHandler;

/// The settings handler used for changing settings & for handling remote
/// setting changes.
SettingsHandler _settingsHandler;
late SettingsHandler _settingsHandler;

/// The set of active streams this connection has.
StreamHandler _streams;
late StreamHandler _streams;

/// The connection-level flow control window handler for outgoing messages.
OutgoingConnectionWindowHandler _connectionWindowHandler;
late OutgoingConnectionWindowHandler _connectionWindowHandler;

/// The state of this connection.
ConnectionState _state;
late ConnectionState _state;

Connection(Stream<List<int>> incoming, StreamSink<List<int>> outgoing,
Settings settings,
Expand Down Expand Up @@ -238,22 +238,23 @@ abstract class Connection {
List<Setting> _decodeSettings(Settings settings) {
var settingsList = <Setting>[];

// By default a endpoitn can make an unlimited number of concurrent streams.
if (settings.concurrentStreamLimit != null) {
settingsList.add(Setting(Setting.SETTINGS_MAX_CONCURRENT_STREAMS,
settings.concurrentStreamLimit));
// By default a endpoint can make an unlimited number of concurrent streams.
var concurrentStreamLimit = settings.concurrentStreamLimit;
if (concurrentStreamLimit != null) {
settingsList.add(Setting(
Setting.SETTINGS_MAX_CONCURRENT_STREAMS, concurrentStreamLimit));
}

// By default the stream level flow control window is 64 KiB.
if (settings.streamWindowSize != null) {
settingsList.add(Setting(
Setting.SETTINGS_INITIAL_WINDOW_SIZE, settings.streamWindowSize));
var streamWindowSize = settings.streamWindowSize;
if (streamWindowSize != null) {
settingsList
.add(Setting(Setting.SETTINGS_INITIAL_WINDOW_SIZE, streamWindowSize));
}

if (settings is ClientSettings) {
// By default the server is allowed to do server pushes.
if (settings.allowServerPushes == null ||
settings.allowServerPushes == false) {
if (!settings.allowServerPushes) {
settingsList.add(Setting(Setting.SETTINGS_ENABLE_PUSH, 0));
}
} else if (settings is ServerSettings) {
Expand All @@ -278,24 +279,17 @@ abstract class Connection {
_finishing(active: true);

// TODO: There is probably more we need to wait for.
return _streams.done.whenComplete(() {
var futures = [_frameWriter.close()];
var f = _frameReaderSubscription.cancel();
if (f != null) futures.add(f);
return Future.wait(futures);
});
return _streams.done.whenComplete(() =>
Future.wait([_frameWriter.close(), _frameReaderSubscription.cancel()]));
}

/// Terminates this connection forcefully.
Future terminate() {
return _terminate(ErrorCode.NO_ERROR);
}

void _activeStateHandler(bool isActive) {
if (onActiveStateChanged != null) {
onActiveStateChanged(isActive);
}
}
void _activeStateHandler(bool isActive) =>
onActiveStateChanged?.call(isActive);

/// Invokes the passed in closure and catches any exceptions.
void _catchProtocolErrors(void Function() fn) {
Expand All @@ -319,7 +313,7 @@ abstract class Connection {
}
}

void _handleFrameImpl(Frame frame) {
void _handleFrameImpl(Frame? frame) {
// The first frame from the other side must be a [SettingsFrame], otherwise
// we terminate the connection.
if (_state.isInitialized) {
Expand Down Expand Up @@ -369,7 +363,7 @@ abstract class Connection {
}
}

void _finishing({bool active = true, String message}) {
void _finishing({bool active = true, String? message}) {
// If this connection is already dead, we return.
if (_state.isTerminated) return;

Expand Down Expand Up @@ -407,7 +401,7 @@ abstract class Connection {
///
/// The returned future will never complete with an error.
Future _terminate(int errorCode,
{bool causedByTransportError = false, String message}) {
{bool causedByTransportError = false, String? message}) {
// TODO: When do we complete here?
if (_state.state != ConnectionState.Terminated) {
_state.state = ConnectionState.Terminated;
Expand Down
37 changes: 18 additions & 19 deletions lib/src/connection_preface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ const List<int> CONNECTION_PREFACE = [
/// connection preface. If an error occurs while reading the connection
/// preface, the returned stream will have only an error.
Stream<List<int>> readConnectionPreface(Stream<List<int>> incoming) {
StreamController<List<int>> result;
StreamSubscription subscription;
final result = StreamController<List<int>>();
late StreamSubscription subscription;
var connectionPrefaceRead = false;
var prefaceBuffer = <int>[];
var terminated = false;

void terminate(error) {
void terminate(Object error) {
if (!terminated) {
result.addError(error);
result.close();
Expand All @@ -64,7 +64,6 @@ Stream<List<int>> readConnectionPreface(Stream<List<int>> incoming) {
return false;
}
}
prefaceBuffer = null;
connectionPrefaceRead = true;
return true;
}
Expand Down Expand Up @@ -96,21 +95,21 @@ Stream<List<int>> readConnectionPreface(Stream<List<int>> incoming) {
}
}

result = StreamController(
onListen: () {
subscription = incoming.listen(onData,
onError: (e, StackTrace s) => result.addError(e, s),
onDone: () {
if (prefaceBuffer != null) {
terminate('EOS before connection preface could be read.');
} else {
result.close();
}
});
},
onPause: () => subscription.pause(),
onResume: () => subscription.resume(),
onCancel: () => subscription.cancel());
result.onListen = () {
subscription = incoming.listen(onData,
onError: (Object e, StackTrace s) => result.addError(e, s),
onDone: () {
if (!connectionPrefaceRead) {
terminate('EOS before connection preface could be read.');
} else {
result.close();
}
});
result
..onPause = subscription.pause
..onResume = subscription.resume
..onCancel = subscription.cancel;
};

return result.stream;
}
Loading