Skip to content

Commit

Permalink
Bring in the latest version of the SDK's WebSocket impl.
Browse files Browse the repository at this point in the history
R=alanknight@google.com

Review URL: https://codereview.chromium.org//1947683006 .
  • Loading branch information
nex3 committed May 5, 2016
1 parent c42271a commit 213b7e7
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 209 deletions.
9 changes: 5 additions & 4 deletions lib/src/copy/bytes_builder.dart
Expand Up @@ -9,7 +9,8 @@
// Because it's copied directly, there are no modifications from the original.
//
// This is up-to-date as of sdk revision
// 86227840d75d974feb238f8b3c59c038b99c05cf.
// e41fb4cafd6052157dbc1490d437045240f4773f.

import 'dart:math';
import 'dart:typed_data';

Expand Down Expand Up @@ -122,7 +123,7 @@ class _CopyingBytesBuilder implements BytesBuilder {
_length = required;
}

void addByte(int byte) => add([byte]);
void addByte(int byte) { add([byte]); }

List<int> takeBytes() {
if (_buffer == null) return new Uint8List(0);
Expand Down Expand Up @@ -162,7 +163,7 @@ class _CopyingBytesBuilder implements BytesBuilder {

class _BytesBuilder implements BytesBuilder {
int _length = 0;
final List _chunks = [];
final _chunks = <List<int>>[];

void add(List<int> bytes) {
if (bytes is! Uint8List) {
Expand All @@ -172,7 +173,7 @@ class _BytesBuilder implements BytesBuilder {
_length += bytes.length;
}

void addByte(int byte) => add([byte]);
void addByte(int byte) { add([byte]); }

List<int> takeBytes() {
if (_chunks.length == 0) return new Uint8List(0);
Expand Down
80 changes: 37 additions & 43 deletions lib/src/copy/io_sink.dart
Expand Up @@ -9,22 +9,20 @@
// desired public API and to remove "dart:io" dependencies have been made.
//
// This is up-to-date as of sdk revision
// 86227840d75d974feb238f8b3c59c038b99c05cf.
// e41fb4cafd6052157dbc1490d437045240f4773f.

import 'dart:async';

class StreamSinkImpl<T> implements StreamSink<T> {
final StreamConsumer<T> _target;
Completer _doneCompleter = new Completer();
Future _doneFuture;
final Completer _doneCompleter = new Completer();
StreamController<T> _controllerInstance;
Completer _controllerCompleter;
bool _isClosed = false;
bool _isBound = false;
bool _hasError = false;

StreamSinkImpl(this._target) {
_doneFuture = _doneCompleter.future;
}
StreamSinkImpl(this._target);

void add(T data) {
if (_isClosed) return;
Expand Down Expand Up @@ -65,8 +63,8 @@ class StreamSinkImpl<T> implements StreamSink<T> {
var future = _controllerCompleter.future;
_controllerInstance.close();
return future.whenComplete(() {
_isBound = false;
});
_isBound = false;
});
}

Future close() {
Expand All @@ -88,19 +86,19 @@ class StreamSinkImpl<T> implements StreamSink<T> {
_target.close().then(_completeDoneValue, onError: _completeDoneError);
}

Future get done => _doneFuture;
Future get done => _doneCompleter.future;

void _completeDoneValue(value) {
if (_doneCompleter == null) return;
_doneCompleter.complete(value);
_doneCompleter = null;
if (!_doneCompleter.isCompleted) {
_doneCompleter.complete(value);
}
}

void _completeDoneError(error, StackTrace stackTrace) {
if (_doneCompleter == null) return;
_hasError = true;
_doneCompleter.completeError(error, stackTrace);
_doneCompleter = null;
if (!_doneCompleter.isCompleted) {
_hasError = true;
_doneCompleter.completeError(error, stackTrace);
}
}

StreamController<T> get _controller {
Expand All @@ -113,33 +111,29 @@ class StreamSinkImpl<T> implements StreamSink<T> {
if (_controllerInstance == null) {
_controllerInstance = new StreamController<T>(sync: true);
_controllerCompleter = new Completer();
_target.addStream(_controller.stream)
.then(
(_) {
if (_isBound) {
// A new stream takes over - forward values to that stream.
_controllerCompleter.complete(this);
_controllerCompleter = null;
_controllerInstance = null;
} else {
// No new stream, .close was called. Close _target.
_closeTarget();
}
},
onError: (error, stackTrace) {
if (_isBound) {
// A new stream takes over - forward errors to that stream.
_controllerCompleter.completeError(error, stackTrace);
_controllerCompleter = null;
_controllerInstance = null;
} else {
// No new stream. No need to close target, as it have already
// failed.
_completeDoneError(error, stackTrace);
}
});
}
_target.addStream(_controller.stream).then((_) {
if (_isBound) {
// A new stream takes over - forward values to that stream.
_controllerCompleter.complete(this);
_controllerCompleter = null;
_controllerInstance = null;
} else {
// No new stream, .close was called. Close _target.
_closeTarget();
}
}, onError: (error, stackTrace) {
if (_isBound) {
// A new stream takes over - forward errors to that stream.
_controllerCompleter.completeError(error, stackTrace);
_controllerCompleter = null;
_controllerInstance = null;
} else {
// No new stream. No need to close target, as it has already
// failed.
_completeDoneError(error, stackTrace);
}
});
}
return _controllerInstance;
}
}

5 changes: 3 additions & 2 deletions lib/src/copy/web_socket.dart
Expand Up @@ -9,7 +9,8 @@
// desired public API and to remove "dart:io" dependencies have been made.
//
// This is up-to-date as of sdk revision
// 86227840d75d974feb238f8b3c59c038b99c05cf.
// e41fb4cafd6052157dbc1490d437045240f4773f.

/**
* Web socket status codes used when closing a web socket connection.
*/
Expand All @@ -18,7 +19,7 @@ abstract class WebSocketStatus {
static const int GOING_AWAY = 1001;
static const int PROTOCOL_ERROR = 1002;
static const int UNSUPPORTED_DATA = 1003;
static const int RESERVED_1004 = 1004;
static const int RESERVED_1004 = 1004;
static const int NO_STATUS_RECEIVED = 1005;
static const int ABNORMAL_CLOSURE = 1006;
static const int INVALID_FRAME_PAYLOAD_DATA = 1007;
Expand Down

0 comments on commit 213b7e7

Please sign in to comment.