Skip to content

Commit

Permalink
Add byteCollector stream transformer and collectBytes function.
Browse files Browse the repository at this point in the history
  • Loading branch information
lrhn committed Jan 27, 2017
1 parent 0712b61 commit 4a55a88
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 1 deletion.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
## 1.13.0

* Add a `collectBytes` function which collects list-of-byte events into
a single byte list.

## 1.12.0

* Add an `AsyncCache` class that caches asynchronous operations for a period of
Expand Down
1 change: 1 addition & 0 deletions lib/async.dart
Expand Up @@ -4,6 +4,7 @@

export "src/async_cache.dart";
export "src/async_memoizer.dart";
export "src/byte_collector.dart";
export "src/cancelable_operation.dart";
export "src/delegate/event_sink.dart";
export "src/delegate/future.dart";
Expand Down
42 changes: 42 additions & 0 deletions lib/src/byte_collector.dart
@@ -0,0 +1,42 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import "dart:async";
import "dart:typed_data";

/// Collects an asynchronous sequence of byte lists into a single list of bytes.
///
/// If the [source] stream emits an error event,
/// the collection fails and the returned future completes with the same error.
///
/// If any of the input data are not valid bytes, they will be truncated to
/// an eight-bit unsigned value in the resulting list.
Future<Uint8List> collectBytes(Stream<List<int>> source) {
var byteLists = List<List<int>>[];
var length = 0;
var completer = new Completer<Uint8List>.sync();
source.listen(
(bytes) {
byteLists.add(bytes);
length += bytes.length;
},
onError: completer.completeError,
onDone: () {
completer.complete(_collect(length, byteLists));
},
cancelOnError: true);
return completer.future;
}

// Join a lists of bytes with a known total length into a single [Uint8List].
Uint8List _collect(int length, List<List<int>> byteLists) {
var result = new Uint8List(length);
int i = 0;
for (var byteList in byteLists) {
var end = i + byteList.length;
result.setRange(i, end, byteList);
i = end;
}
return result;
}
2 changes: 1 addition & 1 deletion pubspec.yaml
@@ -1,5 +1,5 @@
name: async
version: 1.12.0
version: 1.13.0
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
Expand Down
39 changes: 39 additions & 0 deletions test/byte_collection_test.dart
@@ -0,0 +1,39 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import "dart:async";
import "dart:typed_data";

import "package:test/test.dart";
import "package:async/async.dart" show byteCollector, collectBytes, Result;

void main() {
group("collectBytes", () {
test("simple list and overflow", () {
var result = collectBytes(new Stream.fromIterable([
[0],
[1],
[2],
[256]
]));
expect(result, completion([0, 1, 2, 0]));
});

test("no events", () {
var result = collectBytes(new Stream.fromIterable([]));
expect(result, completion([]));
});

test("empty events", () {
var result = collectBytes(new Stream.fromIterable([[], []]));
expect(result, completion([]));
});

test("error event", () {
var result = collectBytes(new Stream.fromIterable(
new Iterable.generate(3, (n) => n == 2 ? throw "badness" : [n])));
expect(result, throwsA("badness"));
});
});
}

0 comments on commit 4a55a88

Please sign in to comment.