Skip to content

Commit

Permalink
Merge pull request #1 from RobinMalfait/feature/async
Browse files Browse the repository at this point in the history
feature/async
  • Loading branch information
RobinMalfait committed Feb 23, 2020
2 parents 7b613a9 + e2d4acf commit 057b2bd
Show file tree
Hide file tree
Showing 59 changed files with 1,450 additions and 152 deletions.
34 changes: 31 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@

---

Working with methods like `.map()`, `.filter()` and `.reduce()` are nice,
Working with methods like `.map()`, `.filter()` and `.reduce()` is nice,
however they create new arrays and everything is eagerly done before going to
the next step.

This is where lazy collections come in, under the hood we use generators so that
your data flows like a stream to have the optimal speed.
This is where lazy collections come in, under the hood we use [iterators][1] and
async iterators so that your data flows like a stream to have the optimal speed.

All functions should work with both `iterator` and `asyncIterator`, if one of
the functions uses an `asyncIterator` (for example when you introduce
`delay(100)`), don't forget to `await` the result!

[1]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#The_iterator_protocol

```js
const program = pipe(
Expand Down Expand Up @@ -58,6 +64,7 @@ program(range(0, 1000000));
- [Utilities](#utilities)
- [`chunk`](#chunk)
- [`compact`](#compact)
- [`delay`](#delay)
- [`flatten`](#flatten)
- [`generate`](#generate)
- [`groupBy`](#groupby)
Expand Down Expand Up @@ -426,6 +433,27 @@ program([0, 1, true, false, null, undefined, '', 'test', NaN]);
// [ 1, true, 'test' ];
```

#### `delay`

[Table of contents](#table-of-contents)

Will make he whole program async. It will add a delay of x milliseconds when an
item goes through the stream.

```js
import { pipe, range, delay, map, toArray } from 'lazy-collections';

const program = pipe(
range(0, 4),
delay(5000), // 5 seconds
map(() => new Date().toLocaleTimeString()),
toArray()
);

await program();
// [ '10:00:00', '10:00:05', '10:00:10', '10:00:15', '10:00:20' ];
```

#### `flatten`

[Table of contents](#table-of-contents)
Expand Down
1 change: 1 addition & 0 deletions src/__snapshots__/index.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Object {
"compact": [Function],
"compose": [Function],
"concat": [Function],
"delay": [Function],
"every": [Function],
"filter": [Function],
"find": [Function],
Expand Down
17 changes: 17 additions & 0 deletions src/average.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
import { pipe } from './pipe';
import { average } from './average';
import { delay } from './delay';

it('should be possible to get an average of all the values', () => {
const program = pipe(average());

expect(program([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])).toEqual(5);
expect(program([10, 10, 10])).toEqual(10);
});

it('should be possible to get an average of all the values (async)', async () => {
const program = pipe(delay(0), average());

expect(await program([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])).toEqual(5);
expect(await program([10, 10, 10])).toEqual(10);
});

it('should be possible to get an average of all the values (Promise async)', async () => {
const program = pipe(average());

expect(
await program(Promise.resolve([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
).toEqual(5);
expect(await program(Promise.resolve([10, 10, 10]))).toEqual(10);
});
31 changes: 22 additions & 9 deletions src/average.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
export function average() {
return function averageFn(data: Iterable<number>) {
let sum = 0;
let count = 0;
import { reduce } from './reduce';
import { chunk } from './chunk';
import { map } from './map';
import { head } from './head';
import { pipe } from './pipe';
import { LazyIterable } from './shared-types';

for (let datum of data) {
sum += datum;
count++;
}
export function average() {
return function averageFn(data: LazyIterable<number>) {
const program = pipe(
reduce<[number, number], number>(
(acc, current) => {
acc[0] += current;
acc[1] += 1;
return acc;
},
[0, 0]
),
chunk(2),
map(([sum, count]: [number, number]) => sum / count),
head()
);

return sum / count;
return program(data);
};
}

Expand Down
45 changes: 43 additions & 2 deletions src/chunk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,52 @@ import { pipe } from './pipe';
import { range } from './range';
import { chunk } from './chunk';
import { toArray } from './toArray';
import { delay } from './delay';

it('should create chunked items', () => {
const program = pipe(range(0, 10), chunk(3), toArray());
const program = pipe(chunk(3), toArray());

expect(program()).toEqual([
expect(program(range(0, 10))).toEqual([
[0, 1, 2],
[3, 4, 5],
[6, 7, 8],
[9, 10],
]);
expect(program(range(0, 10))).toEqual([
[0, 1, 2],
[3, 4, 5],
[6, 7, 8],
[9, 10],
]);
});

it('should create chunked items (async)', async () => {
const program = pipe(delay(0), chunk(3), toArray());

expect(await program(range(0, 10))).toEqual([
[0, 1, 2],
[3, 4, 5],
[6, 7, 8],
[9, 10],
]);
expect(await program(range(0, 10))).toEqual([
[0, 1, 2],
[3, 4, 5],
[6, 7, 8],
[9, 10],
]);
});

it('should create chunked items (Promise async)', async () => {
const program = pipe(chunk(3), toArray());

expect(await program(Promise.resolve(range(0, 10)))).toEqual([
[0, 1, 2],
[3, 4, 5],
[6, 7, 8],
[9, 10],
]);
expect(await program(Promise.resolve(range(0, 10)))).toEqual([
[0, 1, 2],
[3, 4, 5],
[6, 7, 8],
Expand Down
80 changes: 59 additions & 21 deletions src/chunk.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,64 @@
import { isAsyncIterable } from './utils/iterator';
import { LazyIterable } from './shared-types';

export function chunk<T>(size: number) {
return function* chunkFn(data: Iterable<T>) {
// Let's have a placeholder for our current chunk
let chunk = [];

// Loop over our data
for (let datum of data) {
// Add item to our current chunk
chunk.push(datum);

if (chunk.length === size) {
// Our current chunk is full, let's yield it
yield chunk;

// Let's also clear our chunk for the next chunk
chunk = [];
}
}
return function chunkFn(data: LazyIterable<T>) {
if (isAsyncIterable(data) || data instanceof Promise) {
return {
async *[Symbol.asyncIterator]() {
const stream = data instanceof Promise ? await data : data;

// Let's have a placeholder for our current chunk
let chunk = [];

// Loop over our data
for await (let datum of stream) {
// Add item to our current chunk
chunk.push(datum);

if (chunk.length === size) {
// Our current chunk is full, let's yield it
yield chunk;

// Let's also clear our chunk for the next chunk
chunk = [];
}
}

// When the chunk is not full yet, but when we are at the end of the data we
// have to ensure that this one is also yielded
if (chunk.length > 0) {
yield chunk;
// When the chunk is not full yet, but when we are at the end of the data we
// have to ensure that this one is also yielded
if (chunk.length > 0) {
yield chunk;
}
},
};
}

return {
*[Symbol.iterator]() {
// Let's have a placeholder for our current chunk
let chunk = [];

// Loop over our data
for (let datum of data) {
// Add item to our current chunk
chunk.push(datum);

if (chunk.length === size) {
// Our current chunk is full, let's yield it
yield chunk;

// Let's also clear our chunk for the next chunk
chunk = [];
}
}

// When the chunk is not full yet, but when we are at the end of the data we
// have to ensure that this one is also yielded
if (chunk.length > 0) {
yield chunk;
}
},
};
};
}
30 changes: 30 additions & 0 deletions src/compact.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,41 @@
import { pipe } from './pipe';
import { compact } from './compact';
import { toArray } from './toArray';
import { delay } from './delay';

it('should remove all falsey values', () => {
const program = pipe(compact(), toArray());

expect(
program([0, 1, true, false, null, undefined, '', 'test', NaN])
).toEqual([1, true, 'test']);
expect(
program([0, 1, true, false, null, undefined, '', 'test', NaN])
).toEqual([1, true, 'test']);
});

it('should remove all falsey values (async)', async () => {
const program = pipe(delay(0), compact(), toArray());

expect(
await program([0, 1, true, false, null, undefined, '', 'test', NaN])
).toEqual([1, true, 'test']);
expect(
await program([0, 1, true, false, null, undefined, '', 'test', NaN])
).toEqual([1, true, 'test']);
});

it('should remove all falsey values (Promise async)', async () => {
const program = pipe(compact(), toArray());

expect(
await program(
Promise.resolve([0, 1, true, false, null, undefined, '', 'test', NaN])
)
).toEqual([1, true, 'test']);
expect(
await program(
Promise.resolve([0, 1, true, false, null, undefined, '', 'test', NaN])
)
).toEqual([1, true, 'test']);
});
5 changes: 3 additions & 2 deletions src/compose.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { ensureFunction } from './utils/ensureFunction';
import { LazyIterable } from './shared-types';

type Fn = (...args: any) => any;

export function compose<T>(
fn: Fn | Iterable<T>,
...fns: (Fn | Iterable<T>)[]
fn: Fn | LazyIterable<T>,
...fns: (Fn | LazyIterable<T>)[]
): Fn {
return fns.reduce((f: Fn, g) => {
const g_ = ensureFunction(g);
Expand Down
19 changes: 19 additions & 0 deletions src/concat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { pipe } from './pipe';
import { concat } from './concat';
import { range } from './range';
import { toArray } from './toArray';
import { delay } from './delay';

it('should concat arrays', () => {
const program = pipe(
Expand All @@ -20,3 +21,21 @@ it('should concat iterators', () => {

expect(program()).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});

it('should concat iterators (async)', async () => {
const program = pipe(
concat(range(0, 3), delay(0)(range(4, 7)), range(8, 10)),
toArray()
);

expect(await program()).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});

it('should concat iterators (Promise async)', async () => {
const program = pipe(
concat(range(0, 3), range(4, 7), Promise.resolve(range(8, 10))),
toArray()
);

expect(await program()).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});
26 changes: 23 additions & 3 deletions src/concat.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
export function* concat<T>(...data: Iterable<T>[]) {
for (let datum of data) {
yield* datum;
import { isAsyncIterable } from './utils/iterator';
import { LazyIterable } from './shared-types';

export function concat<T>(...data: LazyIterable<T>[]) {
if (
data.some(isAsyncIterable) ||
data.some(datum => datum instanceof Promise)
) {
return {
async *[Symbol.asyncIterator]() {
for await (let datum of await Promise.all(data)) {
yield* datum;
}
},
};
}

return {
*[Symbol.iterator]() {
for (let datum of data as Iterable<T>[]) {
yield* datum;
}
},
};
}
Loading

0 comments on commit 057b2bd

Please sign in to comment.