Skip to content

Commit

Permalink
Add an IO implementation of WebSocketChannel.
Browse files Browse the repository at this point in the history
R=kevmoo@google.com

Review URL: https://codereview.chromium.org//1756613002 .
  • Loading branch information
nex3 committed Mar 2, 2016
1 parent 45aef9e commit cda6f1e
Show file tree
Hide file tree
Showing 7 changed files with 453 additions and 3 deletions.
79 changes: 79 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
The `web_socket_channel` package provides [`StreamChannel`][stream_channel]
wrappers for WebSocket connections. It provides a cross-platform
[`WebSocketChannel`][WebSocketChannel] API, a cross-platform implementation of
that API that communicates over an underlying [`StreamChannel`][stream_channel],
and [an implementation][IOWebSocketChannel] that wraps `dart:io`'s `WebSocket`
class.

[stream_channel]: https://pub.dartlang.org/packages/stream_channel
[WebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel-class.html
[IOWebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/io/IOWebSocketChannel-class.html

## `WebSocketChannel`

The [`WebSocketChannel`][WebSocketChannel] class's most important role is as the
interface for WebSocket stream channels across all implementations and all
platforms. In addition to the base `StreamChannel` interface, it adds a
[`protocol`][protocol] getter that returns the negotiated protocol for the
socket; a [`pingInterval`][pingInterval] property that allows you to control the
socket's keep-alive behavior; and [`closeCode`][closeCode] and
[`closeReason`][closeReason] getters that provide information about why the
socket closed.

[protocol]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/protocol.html
[pingInterval]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/pingInterval.html
[closeCode]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/closeCode.html
[closeReason]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/closeReason.html

The channel's [`sink` property][sink] is also special. It returns a
[`WebSocketSink`][WebSocketSink], which is just like a `StreamSink` except that
its [`close()`][sink.close] method supports optional `closeCode` and
`closeReason` parameters. These parameters allow the caller to signal to the
other socket exactly why they're closing the connection.

[sink]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/sink.html
[WebSocketSink]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketSink-class.html
[sink.close]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketSink/close.html

`WebSocketChannel` also works as a cross-platform implementation of the
WebSocket protocol. Because it can't initiate or handle HTTP requests in a
cross-platform way, the [`new WebSocketChannel()` constructor][new] takes an
underlying [`StreamChannel`][stream_channel] over which it communicates using
the WebSocket protocol. It also provides the static [`signKey()`][signKey]
method to make it easier to implement the [initial WebSocket handshake][]. These
are used in the [`shelf_web_socket`][shelf_web_socket] package to support
WebSockets in a cross-platform way.

[new]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/WebSocketChannel.html
[signKey]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/signKey.html
[initial WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2
[shelf_web_socket]: https://pub.dartlang.org/packages/shelf_web_socket

## `IOWebSocketChannel`

The [`IOWebSocketChannel`][IOWebSocketChannel] class wraps
[`dart:io`'s `WebSocket` class][io.WebSocket]. Because it imports `dart:io`, it
has its own library, `package:web_socket_channel/io.dart`. This allows the main
`WebSocketChannel` class to be available on all platforms.

[io.WebSocket]: https://api.dartlang.org/latest/dart-io/WebSocket-class.html

An `IOWebSocketChannel` can be created by passing a `dart:io` WebSocket to
[its constructor][new IOWebSocketChannel]. It's more common to want to connect
directly to a `ws://` or `wss://` URL, in which case
[`new IOWebSocketChannel.connect()`][IOWebSocketChannel.connect] should be used.

[new IOWebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/io/IOWebSocketChannel/IOWebSocketChannel.html
[IOWebSocketChannel.connect]: https://www.dartdocs.org/documentation/web_socket_channel/latest/io/IOWebSocketChannel/IOWebSocketChannel.connect.html

```dart
import 'package:web_socket_channel/io.dart';
main() async {
var channel = new IOWebSocketChannel.connect("ws://localhost:8181");
channel.sink.add("connected!");
channel.sink.listen((message) {
// ...
});
}
```
113 changes: 113 additions & 0 deletions lib/io.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';

import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';

import 'src/channel.dart';
import 'src/exception.dart';
import 'src/sink_completer.dart';

/// A [WebSocketChannel] that communicates using a `dart:io` [WebSocket].
class IOWebSocketChannel extends StreamChannelMixin
implements WebSocketChannel {
/// The underlying `dart:io` [WebSocket].
///
/// If the channel was constructed with [IOWebSocketChannel.connect], this is
/// `null` until the [WebSocket.connect] future completes.
WebSocket _webSocket;

Duration get pingInterval =>
_webSocket == null ? _pingInterval : _webSocket.pingInterval;

set pingInterval(Duration value) {
if (_webSocket == null) {
_pingInterval = value;
} else {
_webSocket.pingInterval = value;
}
}

/// The ping interval set by the user.
///
/// This is stored independently of [_webSocket] so that the user can set it
/// prior to [_webSocket] getting a value.
Duration _pingInterval;

String get protocol => _webSocket?.protocol;
int get closeCode => _webSocket?.closeCode;
String get closeReason => _webSocket?.closeReason;

final Stream stream;
final WebSocketSink sink;

// TODO(nweiz): Add a compression parameter after the initial release.

/// Creates a new WebSocket connection.
///
/// Connects to [url] using [WebSocket.connect] and returns a channel that can
/// be used to communicate over the resulting socket. The [url] may be either
/// a [String] or a [Uri]; otherwise, the parameters are the same as
/// [WebSocket.connect].
///
/// If there's an error connecting, the channel's stream emits a
/// [WebSocketChannelException] wrapping that error and then closes.
factory IOWebSocketChannel.connect(url, {Iterable<String> protocols,
Map<String, dynamic> headers}) {
var channel;
var sinkCompleter = new WebSocketSinkCompleter();
var stream = StreamCompleter.fromFuture(
WebSocket.connect(url.toString(), headers: headers).then((webSocket) {
channel._setWebSocket(webSocket);
sinkCompleter.setDestinationSink(new _IOWebSocketSink(webSocket));
return webSocket;
}).catchError((error) => throw new WebSocketChannelException.from(error)));

channel = new IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink);
return channel;
}

/// Creates a channel wrapping [socket].
IOWebSocketChannel(WebSocket socket)
: _webSocket = socket,
stream = socket.handleError((error) =>
throw new WebSocketChannelException.from(error)),
sink = new _IOWebSocketSink(socket);

/// Creates a channel without a socket.
///
/// This is used with [connect] to synchronously provide a channel that later
/// has a socket added.
IOWebSocketChannel._withoutSocket(Stream stream, this.sink)
: _webSocket = null,
stream = stream.handleError((error) =>
throw new WebSocketChannelException.from(error));

/// Sets the underlying web socket.
///
/// This is called by [connect] once the [WebSocket.connect] future has
/// completed.
void _setWebSocket(WebSocket webSocket) {
assert(_webSocket == null);

_webSocket = webSocket;
if (_pingInterval != null) _webSocket.pingInterval = pingInterval;
}
}

/// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket].
class _IOWebSocketSink extends DelegatingStreamSink implements WebSocketSink {
/// The underlying socket.
final WebSocket _webSocket;

_IOWebSocketSink(WebSocket webSocket)
: super(webSocket),
_webSocket = webSocket;

Future close([int closeCode, String closeReason]) =>
_webSocket.close(closeCode, closeReason);
}
2 changes: 1 addition & 1 deletion lib/src/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class WebSocketChannel extends StreamChannelMixin {
WebSocketSink get sink => new WebSocketSink._(_webSocket);

/// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of
/// the [initial handshake].
/// the [initial handshake][].
///
/// The return value should be sent back to the client in a
/// `Sec-WebSocket-Accept` header.
Expand Down
6 changes: 5 additions & 1 deletion lib/src/exception.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ class WebSocketChannelException implements Exception {
/// The exception that caused this one, if available.
final inner;

WebSocketChannelException([this.message, this.inner]);
WebSocketChannelException([this.message]) : inner = null;

WebSocketChannelException.from(inner)
: message = inner.toString(),
inner = inner;

String toString() => message == null
? "WebSocketChannelException" :
Expand Down
153 changes: 153 additions & 0 deletions lib/src/sink_completer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'channel.dart';

/// A [WebSocketSink] where the destination is provided later.
///
/// This is like a [StreamSinkCompleter], except that it properly forwards
/// paramters to [WebSocketSink.close].
class WebSocketSinkCompleter {
/// The sink for this completer.
///
/// When a destination sink is provided, events that have been passed to the
/// sink will be forwarded to the destination.
///
/// Events can be added to the sink either before or after a destination sink
/// is set.
final WebSocketSink sink = new _CompleterSink();

/// Returns [sink] typed as a [_CompleterSink].
_CompleterSink get _sink => sink;

/// Sets a sink as the destination for events from the
/// [WebSocketSinkCompleter]'s [sink].
///
/// The completer's [sink] will act exactly as [destinationSink].
///
/// If the destination sink is set before events are added to [sink], further
/// events are forwarded directly to [destinationSink].
///
/// If events are added to [sink] before setting the destination sink, they're
/// buffered until the destination is available.
///
/// A destination sink may be set at most once.
void setDestinationSink(WebSocketSink destinationSink) {
if (_sink._destinationSink != null) {
throw new StateError("Destination sink already set");
}
_sink._setDestinationSink(destinationSink);
}
}

/// [WebSocketSink] completed by [WebSocketSinkCompleter].
class _CompleterSink implements WebSocketSink {
/// Controller for an intermediate sink.
///
/// Created if the user adds events to this sink before the destination sink
/// is set.
StreamController _controller;

/// Completer for [done].
///
/// Created if the user requests the [done] future before the destination sink
/// is set.
Completer _doneCompleter;

/// Destination sink for the events added to this sink.
///
/// Set when [WebSocketSinkCompleter.setDestinationSink] is called.
WebSocketSink _destinationSink;

/// The close code passed to [close].
int _closeCode;

/// The close reason passed to [close].
String _closeReason;

/// Whether events should be sent directly to [_destinationSink], as opposed
/// to going through [_controller].
bool get _canSendDirectly => _controller == null && _destinationSink != null;

Future get done {
if (_doneCompleter != null) return _doneCompleter.future;
if (_destinationSink == null) {
_doneCompleter = new Completer.sync();
return _doneCompleter.future;
}
return _destinationSink.done;
}

void add(event) {
if (_canSendDirectly) {
_destinationSink.add(event);
} else {
_ensureController();
_controller.add(event);
}
}

void addError(error, [StackTrace stackTrace]) {
if (_canSendDirectly) {
_destinationSink.addError(error, stackTrace);
} else {
_ensureController();
_controller.addError(error, stackTrace);
}
}

Future addStream(Stream stream) {
if (_canSendDirectly) return _destinationSink.addStream(stream);

_ensureController();
return _controller.addStream(stream, cancelOnError: false);
}

Future close([int closeCode, String closeReason]) {
if (_canSendDirectly) {
_destinationSink.close(closeCode, closeReason);
} else {
_closeCode = closeCode;
_closeReason = closeReason;
_ensureController();
_controller.close();
}
return done;
}

/// Create [_controller] if it doesn't yet exist.
void _ensureController() {
if (_controller == null) _controller = new StreamController(sync: true);
}

/// Sets the destination sink to which events from this sink will be provided.
///
/// If set before the user adds events, events will be added directly to the
/// destination sink. If the user adds events earlier, an intermediate sink is
/// created using a stream controller, and the destination sink is linked to
/// it later.
void _setDestinationSink(WebSocketSink sink) {
assert(_destinationSink == null);
_destinationSink = sink;

// If the user has already added data, it's buffered in the controller, so
// we add it to the sink.
if (_controller != null) {
// Catch any error that may come from [addStream] or [sink.close]. They'll
// be reported through [done] anyway.
sink
.addStream(_controller.stream)
.whenComplete(() => sink.close(_closeCode, _closeReason))
.catchError((_) {});
}

// If the user has already asked when the sink is done, connect the sink's
// done callback to that completer.
if (_doneCompleter != null) {
_doneCompleter.complete(sink.done);
}
}
}
2 changes: 1 addition & 1 deletion lib/web_socket_channel.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

Expand Down

0 comments on commit cda6f1e

Please sign in to comment.