Skip to content

Commit

Permalink
fix: cleanup signal listener
Browse files Browse the repository at this point in the history
  • Loading branch information
felixfbecker committed Aug 31, 2018
1 parent be96dac commit e94dfdc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 19 deletions.
11 changes: 11 additions & 0 deletions src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ describe('Observable consumers', () => {
const value = await toPromise(obs, abortController.signal)
assert.strictEqual(value, 3)
})
it('should reject if the Observable errors', async () => {
const obs = throwError(123)
const abortController = new AbortController()
const promise = toPromise(obs, abortController.signal)
try {
await promise
throw new AssertionError({ message: 'Expected Promise to be rejected' })
} catch (err) {
assert.strictEqual(err, 123)
}
})
})
describe('forEach()', () => {
it('should unsubscribe from the given Observable when the AbortSignal is aborted', async () => {
Expand Down
52 changes: 33 additions & 19 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,31 @@ export const toPromise = <T>(observable: Observable<T>, signal?: AbortSignal): P
reject(createAbortError())
return
}
const listener = () => {
subscription.unsubscribe()
reject(createAbortError())
}
const cleanup = () => {
if (signal) {
signal.removeEventListener('abort', listener)
}
}
let value: T
const subscription = observable.subscribe(
val => {
value = val
},
reject,
err => {
cleanup()
reject(err)
},
() => {
cleanup()
resolve(value)
}
)
if (signal) {
signal.addEventListener(
'abort',
() => {
subscription.unsubscribe()
reject(createAbortError())
},
{ once: true }
)
signal.addEventListener('abort', listener, { once: true })
}
})

Expand All @@ -71,6 +77,15 @@ export const forEach = <T>(source: Observable<T>, next: (value: T) => void, sign
reject(createAbortError())
return
}
const listener = () => {
subscription.unsubscribe()
reject(createAbortError())
}
const cleanup = () => {
if (signal) {
signal.removeEventListener('abort', listener)
}
}
// Must be declared in a separate statement to avoid a RefernceError when
// accessing subscription below in the closure due to Temporal Dead Zone.
let subscription: Subscription
Expand All @@ -85,18 +100,17 @@ export const forEach = <T>(source: Observable<T>, next: (value: T) => void, sign
}
}
},
reject,
resolve
err => {
cleanup()
reject(err)
},
() => {
cleanup()
resolve()
}
)
if (signal) {
signal.addEventListener(
'abort',
() => {
subscription.unsubscribe()
reject(createAbortError())
},
{ once: true }
)
signal.addEventListener('abort', listener, { once: true })
}
})

Expand Down

0 comments on commit e94dfdc

Please sign in to comment.