Skip to content

Commit

Permalink
Add typed wrapper functions to delegate classes.
Browse files Browse the repository at this point in the history
These mirror the methods in the the collection package, and serve a
similar purpose of safely casting generic objects when the user is
confident that the actual object's values are more specific than the
static type.

R=floitsch@google.com, lrn@google.com

Review URL: https://codereview.chromium.org//1870543004 .
  • Loading branch information
nex3 committed Apr 12, 2016
1 parent 783a5f6 commit 641e195
Show file tree
Hide file tree
Showing 20 changed files with 1,223 additions and 22 deletions.
25 changes: 17 additions & 8 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
## 1.9.1
## 1.10.0

* Fix all strong mode warnings and add generic method annotations.
* Add `DelegatingFuture.typed()`, `DelegatingStreamSubscription.typed()`,
`DelegatingStreamConsumer.typed()`, `DelegatingSink.typed()`,
`DelegatingEventSink.typed()`, and `DelegatingStreamSink.typed()` static
methods. These wrap untyped instances of these classes with the correct type
parameter, and assert the types of values as they're accessed.

* Add a `DelegatingStream` class. This is behaviorally identical to `StreamView`
from `dart:async`, but it follows this package's naming conventions and
provides a `DelegatingStream.typed()` static method.

* `new StreamQueue()` now takes a `Stream<T>` rather than a `Stream<dynamic>`.
Passing a type that wasn't `is`-compatible with `Stream<T>` would already
throw an error under some circumstances, so this is not considered a breaking
change.
* Fix all strong mode warnings and add generic method annotations.

* `new SubscriptionStream()` now takes a `Stream<T>` rather than a
`Stream<dynamic>`. Passing a type that wasn't `is`-compatible with `Stream<T>`
* `new StreamQueue()`, `new SubscriptionStream()`, `new
DelegatingStreamSubscription()`, `new DelegatingStreamConsumer()`, `new
DelegatingSink()`, `new DelegatingEventSink()`, and `new
DelegatingStreamSink()` now take arguments with generic type arguments (for
example `Stream<T>`) rather than without (for example `Stream<dynamic>`).
Passing a type that wasn't `is`-compatible with the fully-specified generic
would already throw an error under some circumstances, so this is not
considered a breaking change.

Expand Down
1 change: 1 addition & 0 deletions lib/async.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export "src/cancelable_operation.dart";
export "src/delegate/event_sink.dart";
export "src/delegate/future.dart";
export "src/delegate/sink.dart";
export "src/delegate/stream.dart";
export "src/delegate/stream_consumer.dart";
export "src/delegate/stream_sink.dart";
export "src/delegate/stream_subscription.dart";
Expand Down
13 changes: 12 additions & 1 deletion lib/src/delegate/event_sink.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,18 @@ class DelegatingEventSink<T> implements EventSink<T> {
final EventSink _sink;

/// Create a delegating sink forwarding calls to [sink].
DelegatingEventSink(EventSink sink) : _sink = sink;
DelegatingEventSink(EventSink<T> sink) : _sink = sink;

DelegatingEventSink._(this._sink);

/// Creates a wrapper that coerces the type of [sink].
///
/// Unlike [new DelegatingEventSink], this only requires its argument to be an
/// instance of `EventSink`, not `EventSink<T>`. This means that calls to
/// [add] may throw a [CastError] if the argument type doesn't match the
/// reified type of [sink].
static EventSink/*<T>*/ typed/*<T>*/(EventSink sink) =>
sink is EventSink/*<T>*/ ? sink : new DelegatingEventSink._(sink);

void add(T data) {
_sink.add(data);
Expand Down
13 changes: 12 additions & 1 deletion lib/src/delegate/future.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@

import 'dart:async';

import '../typed/future.dart';

/// A wrapper that forwards calls to a [Future].
class DelegatingFuture<T> implements Future<T> {
/// The wrapped [Future].
final Future<T> _future;

DelegatingFuture(this._future);

/// Creates a wrapper which throws if [future]'s value isn't an instance of
/// `T`.
///
/// This soundly converts a [Future] to a `Future<T>`, regardless of its
/// original generic type, by asserting that its value is an instance of `T`
/// whenever it's provided. If it's not, the future throws a [CastError].
static Future/*<T>*/ typed/*<T>*/(Future future) =>
future is Future/*<T>*/ ? future : new TypeSafeFuture/*<T>*/(future);

Stream<T> asStream() => _future.asStream();

Future catchError(Function onError, {bool test(Object error)}) =>
Expand All @@ -21,6 +32,6 @@ class DelegatingFuture<T> implements Future<T> {

Future<T> whenComplete(action()) => _future.whenComplete(action);

Future<T> timeout(Duration timeLimit, {void onTimeout()}) =>
Future<T> timeout(Duration timeLimit, {onTimeout()}) =>
_future.timeout(timeLimit, onTimeout: onTimeout);
}
14 changes: 12 additions & 2 deletions lib/src/delegate/sink.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,18 @@ class DelegatingSink<T> implements Sink<T> {
final Sink _sink;

/// Create a delegating sink forwarding calls to [sink].
DelegatingSink(Sink sink)
: _sink = sink;
DelegatingSink(Sink<T> sink) : _sink = sink;

DelegatingSink._(this._sink);

/// Creates a wrapper that coerces the type of [sink].
///
/// Unlike [new DelegatingSink], this only requires its argument to be an
/// instance of `Sink`, not `Sink<T>`. This means that calls to [add] may
/// throw a [CastError] if the argument type doesn't match the reified type of
/// [sink].
static Sink/*<T>*/ typed/*<T>*/(Sink sink) =>
sink is Sink/*<T>*/ ? sink : new DelegatingSink._(sink);

void add(T data) {
_sink.add(data);
Expand Down
28 changes: 28 additions & 0 deletions lib/src/delegate/stream.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import '../typed/stream.dart';

/// Simple delegating wrapper around a [Stream].
///
/// Subclasses can override individual methods, or use this to expose only the
/// [Stream] methods of a subclass.
///
/// Note that this is identical to [StreamView] in `dart:async`. It's provided
/// under this name for consistency with other `Delegating*` classes.
class DelegatingStream<T> extends StreamView<T> {
DelegatingStream(Stream<T> stream) : super(stream);

/// Creates a wrapper which throws if [stream]'s events aren't instances of
/// `T`.
///
/// This soundly converts a [Stream] to a `Stream<T>`, regardless of its
/// original generic type, by asserting that its events are instances of `T`
/// whenever they're provided. If they're not, the stream throws a
/// [CastError].
static Stream/*<T>*/ typed/*<T>*/(Stream stream) =>
stream is Stream/*<T>*/ ? stream : new TypeSafeStream/*<T>*/(stream);
}
16 changes: 14 additions & 2 deletions lib/src/delegate/stream_consumer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,20 @@ class DelegatingStreamConsumer<T> implements StreamConsumer<T> {
final StreamConsumer _consumer;

/// Create a delegating consumer forwarding calls to [consumer].
DelegatingStreamConsumer(StreamConsumer consumer)
: _consumer = consumer;
DelegatingStreamConsumer(StreamConsumer<T> consumer) : _consumer = consumer;

DelegatingStreamConsumer._(this._consumer);

/// Creates a wrapper that coerces the type of [consumer].
///
/// Unlike [new StreamConsumer], this only requires its argument to be an
/// instance of `StreamConsumer`, not `StreamConsumer<T>`. This means that
/// calls to [addStream] may throw a [CastError] if the argument type doesn't
/// match the reified type of [consumer].
static StreamConsumer/*<T>*/ typed/*<T>*/(StreamConsumer consumer) =>
consumer is StreamConsumer/*<T>*/
? consumer
: new DelegatingStreamConsumer._(consumer);

Future addStream(Stream<T> stream) => _consumer.addStream(stream);

Expand Down
14 changes: 12 additions & 2 deletions lib/src/delegate/stream_sink.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@ class DelegatingStreamSink<T> implements StreamSink<T> {
Future get done => _sink.done;

/// Create delegating sink forwarding calls to [sink].
DelegatingStreamSink(StreamSink sink)
: _sink = sink;
DelegatingStreamSink(StreamSink<T> sink) : _sink = sink;

DelegatingStreamSink._(this._sink);

/// Creates a wrapper that coerces the type of [sink].
///
/// Unlike [new StreamSink], this only requires its argument to be an instance
/// of `StreamSink`, not `StreamSink<T>`. This means that calls to [add] may
/// throw a [CastError] if the argument type doesn't match the reified type of
/// [sink].
static StreamSink/*<T>*/ typed/*<T>*/(StreamSink sink) =>
sink is StreamSink/*<T>*/ ? sink : new DelegatingStreamSink._(sink);

void add(T data) {
_sink.add(data);
Expand Down
17 changes: 16 additions & 1 deletion lib/src/delegate/stream_subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,31 @@

import 'dart:async';

import '../typed/stream_subscription.dart';

/// Simple delegating wrapper around a [StreamSubscription].
///
/// Subclasses can override individual methods.
class DelegatingStreamSubscription<T> implements StreamSubscription<T> {
final StreamSubscription _source;

/// Create delegating subscription forwarding calls to [sourceSubscription].
DelegatingStreamSubscription(StreamSubscription sourceSubscription)
DelegatingStreamSubscription(StreamSubscription<T> sourceSubscription)
: _source = sourceSubscription;

/// Creates a wrapper which throws if [subscription]'s events aren't instances
/// of `T`.
///
/// This soundly converts a [StreamSubscription] to a `StreamSubscription<T>`,
/// regardless of its original generic type, by asserting that its events are
/// instances of `T` whenever they're provided. If they're not, the
/// subscription throws a [CastError].
static StreamSubscription/*<T>*/ typed/*<T>*/(
StreamSubscription subscription) =>
subscription is StreamSubscription/*<T>*/
? subscription
: new TypeSafeStreamSubscription/*<T>*/(subscription);

void onData(void handleData(T data)) {
_source.onData(handleData);
}
Expand Down
12 changes: 9 additions & 3 deletions lib/src/lazy_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import "dart:async";

import "stream_completer.dart";
import "delegate/stream.dart";

/// A [Stream] wrapper that forwards to another [Stream] that's initialized
/// lazily.
Expand Down Expand Up @@ -39,9 +40,14 @@ class LazyStream<T> extends Stream<T> {
_callback = null;
var result = callback();

Stream stream = result is Future
? StreamCompleter.fromFuture(result)
: result;
Stream<T> stream;
if (result is Future) {
stream = StreamCompleter.fromFuture(result.then((stream) {
return DelegatingStream.typed/*<T>*/(stream as Stream);
}));
} else {
stream = DelegatingStream.typed/*<T>*/(result as Stream);
}

return stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
Expand Down
2 changes: 1 addition & 1 deletion lib/src/subscription_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class SubscriptionStream<T> extends Stream<T> {
/// source subscription on the first error.
class _CancelOnErrorSubscriptionWrapper<T>
extends DelegatingStreamSubscription<T> {
_CancelOnErrorSubscriptionWrapper(StreamSubscription subscription)
_CancelOnErrorSubscriptionWrapper(StreamSubscription<T> subscription)
: super(subscription);

void onError(Function handleError) {
Expand Down
25 changes: 25 additions & 0 deletions lib/src/typed/future.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

class TypeSafeFuture<T> implements Future<T> {
final Future _future;

TypeSafeFuture(this._future);

Stream<T> asStream() => _future.then((value) => value as T).asStream();

Future catchError(Function onError, {bool test(Object error)}) =>
_future.catchError(onError, test: test);

Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(T value), {Function onError}) =>
_future.then((value) => onValue(value as T), onError: onError);

Future<T> whenComplete(action()) =>
new TypeSafeFuture<T>(_future.whenComplete(action));

Future<T> timeout(Duration timeLimit, {onTimeout()}) =>
new TypeSafeFuture<T>(_future.timeout(timeLimit, onTimeout: onTimeout));
}
Loading

0 comments on commit 641e195

Please sign in to comment.