Skip to content

Commit

Permalink
🎉 no memory leaks on cancel reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
richytong committed Jun 13, 2020
1 parent 52f35d5 commit 068a1c1
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ typings/
# package-lock
package-lock.json

benchmarks/data
data
42 changes: 42 additions & 0 deletions benchmarks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
const assert = require('assert')
const { reduce } = require('.')

describe('benchmarks', () => {
describe('reduce(...)(AsyncIterable)', () => {
it('no memory leaks on cancel', async () => {
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

const add = (a, b) => a + b

const timeoutIDs = new Map()

const asyncGenerator = async function*(i) {
yield 1; yield 2
await new Promise(resolve => {
const id = setTimeout(resolve, 1e9)
timeoutIDs.set(i, id)
})
yield 3
}

let i = 0, maxHeapUsed = 0
while (i < 1e5) {
const p = reduce(add, 0)(asyncGenerator(i))
await sleep(1)
p.cancel()
try {
await p
} catch (err) {
if (err.message !== 'cancelled') throw err
clearTimeout(timeoutIDs.get(i))
timeoutIDs.delete(i)
}
const { heapUsed } = process.memoryUsage()
console.log(`${i},${(heapUsed / 1024 / 1024).toFixed(2)}`)
i += 1
maxHeapUsed = Math.max(maxHeapUsed, heapUsed)
assert.ok((maxHeapUsed / 1024 / 1024) < 30)
}
}).timeout(5 * 60 * 1000)
})
})
65 changes: 0 additions & 65 deletions benchmarks/reduceAsyncIterable.js

This file was deleted.

12 changes: 7 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -677,10 +677,9 @@ const reduceIterable = (fn, possiblyX0, x) => {
return y
}

// https://stackoverflow.com/questions/30233302/promise-is-it-possible-to-force-cancel-a-promise/30235261#30235261
const reduceAsyncIterable = async (fn, possiblyX0, x) => {
const reduceAsyncIterable = async (fn, possiblyY0, x) => {
const iter = x[Symbol.asyncIterator]()
const y0 = isUndefined(possiblyX0) ? (await iter.next()).value : possiblyX0
const y0 = isUndefined(possiblyY0) ? (await iter.next()).value : possiblyY0
if (isUndefined(y0)) {
throw new TypeError('reduce(...)(x); x cannot be empty')
}
Expand All @@ -697,6 +696,8 @@ const reduceObject = (fn, x0, x) => reduceIterable(
(function* () { for (const k in x) yield x[k] })(),
)

// https://stackoverflow.com/questions/30233302/promise-is-it-possible-to-force-cancel-a-promise/30235261#30235261
// https://stackoverflow.com/questions/62336381/is-this-promise-cancellation-implementation-for-reducing-an-async-iterable-on-th
const reduce = (fn, x0) => {
if (!isFunction(fn)) {
throw new TypeError('reduce(x, y); x is not a function')
Expand All @@ -705,11 +706,12 @@ const reduce = (fn, x0) => {
if (isIterable(x)) return reduceIterable(fn, x0, x)
if (isAsyncIterable(x)) {
const state = { cancel: () => {} }
const cancelToken = new Promise((_, reject) => { state.cancel = reject })
const p = Promise.race([
reduceAsyncIterable(fn, x0, x),
new Promise(resolve => { state.cancel = resolve }),
cancelToken,
])
p.cancel = () => { state.cancel() }
p.cancel = () => { state.cancel(new Error('cancelled')) }
return p
}
if (is(Object)(x)) return reduceObject(fn, x0, x)
Expand Down
2 changes: 1 addition & 1 deletion test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ describe('rubico', () => {
const p = r.reduce(add, 0)(infiniteAsyncIterable())
aok(p instanceof Promise)
p.cancel()
ase(undefined, await p)
assert.rejects(p, new Error('cancelled'))
})
it('throws a TypeError on reduce(nonFunction)', async () => {
assert.throws(
Expand Down

0 comments on commit 068a1c1

Please sign in to comment.