Skip to content

Commit

Permalink
fix #71
Browse files Browse the repository at this point in the history
  • Loading branch information
huan committed May 29, 2021
1 parent 79f8ebb commit 14bcaba
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 39 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ delay.next(3)
By using throttle, we don't allow to our queue to pass more than once every X milliseconds.

Practical examples of `ThrottleQueue`:

1. User is typing text in a textarea. We want to call auto-save function when user is typing, and want it only run at most once every five minutes.

**Example:**
Expand Down Expand Up @@ -104,6 +105,7 @@ The Debounce technique allow us to deal with multiple sequential items in a time
Debouncing enforces that no more items will be passed again until a certain amount of time has passed without any new items coming.

Practical examples of `DebounceQueue`:

1. User is typing text in a search box. We want to make an auto-complete function call only after the user stop typing for 500 milliseconds.

**Example:**
Expand Down Expand Up @@ -159,6 +161,12 @@ delay.execute(() => console.log(3))

## CHANGELOG

### v0.12 - May 2021

1. Upgrade RxJS to v7.1
1. Upgrade TypeScript to v4.3
1. Fix RxJS breaking changes [#71](https://github.com/huan/rx-queue/issues/71)

### v0.8 - Mar 2019

1. Fix typo: issue [#40](https://github.com/huan/rx-queue/issues/40) - rename `DelayQueueExector` to `DelayQueueExecutor`
Expand Down
28 changes: 14 additions & 14 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "rx-queue",
"version": "0.11.8",
"version": "0.12.0",
"description": "Easy to Use ReactiveX Queue that Supports Delay/DelayExecutor/Throttle/Debounce Features Powered by RxJS.",
"main": "bundles/rx-queue.umd.js",
"typings": "dist/src/index.d.ts",
Expand Down Expand Up @@ -39,22 +39,23 @@
"homepage": "https://github.com/huan/rx-queue#readme",
"peerDependencies": {
"brolog": "^1.8.3",
"rxjs": "^7.0.0"
"rxjs": "^7.1.0"
},
"devDependencies": {
"@chatie/eslint-config": "^0.12.1",
"@chatie/git-scripts": "^0.6.1",
"@chatie/eslint-config": "^0.12.3",
"@chatie/git-scripts": "^0.6.2",
"@chatie/semver": "^0.4.7",
"@chatie/tsconfig": "^0.10.1",
"@types/sinon": "^9.0.0",
"brolog": "^1.2",
"pkg-jq": "^0.2.4",
"rollup": "^2.3.2",
"@chatie/tsconfig": "^0.16.1",
"@types/sinon": "^10.0.1",
"brolog": "^1.12",
"pkg-jq": "^0.2.11",
"rollup": "^2.50.4",
"rollup-plugin-json": "^4.0.0",
"rxjs": "^7.0.0",
"shx": "^0.3.2",
"source-map-support": "^0.5.16",
"tstest": "^0.4.10"
"rxjs": "^7.1.0",
"shx": "^0.3.3",
"source-map-support": "^0.5.19",
"tstest": "^0.4.10",
"typescript": "^4.3.2"
},
"files": [
"LICENSE",
Expand All @@ -68,7 +69,6 @@
"access": "public",
"tag": "next"
},
"dependencies": {},
"git": {
"scripts": {
"pre-push": "npx git-scripts-pre-push"
Expand Down
12 changes: 6 additions & 6 deletions src/debounce-queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ test('DebounceQueue 2 item', async t => {
q.next(EXPECTED_ITEM2)

await new Promise(resolve => setTimeout(resolve, DELAY_PERIOD_TIME + 3))
t.ok(spy.calledOnce, 'should be called only once after DELAY_PERIOD_TIME because its debounced')
t.deepEqual(spy.firstCall.args[0], EXPECTED_ITEM2, 'should get the EXPECTED_ITEM2')
t.equal(spy.callCount, 1, 'should be called only once after DELAY_PERIOD_TIME because its debounced')
t.deepEqual(spy.lastCall.args[0], EXPECTED_ITEM2, 'should get the EXPECTED_ITEM2')
})

test('DebounceQueue 3 items', async t => {
Expand All @@ -52,10 +52,10 @@ test('DebounceQueue 3 items', async t => {
await new Promise(resolve => setTimeout(resolve, DELAY_PERIOD_TIME + 3))

q.next(EXPECTED_ITEM3)
t.ok(spy.calledOnce, 'should called once right after next(EXPECTED_ITEM3)')
t.deepEqual(spy.firstCall.args[0], EXPECTED_ITEM2, 'the first call should receive EXPECTED_ITEM2')
t.equal(spy.callCount, 1, 'should called once right after next(EXPECTED_ITEM3)')
t.deepEqual(spy.lastCall.args[0], EXPECTED_ITEM2, 'the first call should receive EXPECTED_ITEM2')

await new Promise(resolve => setTimeout(resolve, DELAY_PERIOD_TIME + 3))
t.ok(spy.calledTwice, 'should be called twice after the DELAY_PERIOD_TIME')
t.deepEqual(spy.secondCall.args[0], EXPECTED_ITEM3, 'should get EXPECTED_ITEM3')
t.equal(spy.callCount, 2, 'should be called twice after the DELAY_PERIOD_TIME')
t.deepEqual(spy.lastCall.args[0], EXPECTED_ITEM3, 'should get EXPECTED_ITEM3')
})
14 changes: 7 additions & 7 deletions src/delay-queue-executor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ test('DelayQueueExecutor execute thrice', async t => {
delay.execute(() => spy(EXPECTED_VAL2)).catch(() => { /* */ })
delay.execute(() => spy(EXPECTED_VAL3)).catch(() => { /* */ })

t.ok(spy.calledOnce, 'should call once immediately')
t.equal(spy.firstCall.args[0], EXPECTED_VAL1, 'should get EXPECTED_VAL1')
t.equal(spy.callCount, 1, 'should call once immediately')
t.equal(spy.lastCall.args[0], EXPECTED_VAL1, 'should get EXPECTED_VAL1')

await new Promise(resolve => setTimeout(resolve, DELAY_PERIOD_TIME + 3))
t.ok(spy.calledTwice, 'should call twice after DELAY_PERIOD_TIME')
t.equal(spy.secondCall.args[0], EXPECTED_VAL2, 'should get EXPECTED_VAL2')
t.equal(spy.callCount, 2, 'should call twice after DELAY_PERIOD_TIME')
t.equal(spy.lastCall.args[0], EXPECTED_VAL2, 'should get EXPECTED_VAL2')

await new Promise(resolve => setTimeout(resolve, DELAY_PERIOD_TIME + 3))
t.ok(spy.calledThrice, 'should call thrice after 2 x DELAY_PERIOD_TIME')
t.equal(spy.thirdCall.args[0], EXPECTED_VAL3, 'should get EXPECTED_VAL3')
t.equal(spy.callCount, 3, 'should call thrice after 2 x DELAY_PERIOD_TIME')
t.equal(spy.lastCall.args[0], EXPECTED_VAL3, 'should get EXPECTED_VAL3')

await new Promise(resolve => setTimeout(resolve, DELAY_PERIOD_TIME + 3))
t.ok(spy.calledThrice, 'should keep third call...')
t.equal(spy.callCount, 3, 'should keep third call...')
})

test('DelayQueueExecutor return Promise', async t => {
Expand Down
1 change: 1 addition & 0 deletions src/delay-queue-executor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Subscription } from 'rxjs'

import DelayQueue from './delay-queue'

export interface ExecutionUnit<T = unknown> {
Expand Down
21 changes: 13 additions & 8 deletions src/delay-queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ test('DelayQueue 1 item', async t => {

q.next(EXPECTED_ITEM1)

t.ok(spy.calledOnce, 'should called right after first item')
t.deepEqual(spy.firstCall.args[0], EXPECTED_ITEM1, 'should get the first item immediately')
t.equal(spy.callCount, 1, 'should called right after first item')
t.deepEqual(spy.lastCall.args[0], EXPECTED_ITEM1, 'should get the first item immediately')
})

test('DelayQueue 2 item', async t => {
Expand All @@ -33,11 +33,12 @@ test('DelayQueue 2 item', async t => {
q.next(EXPECTED_ITEM1)
q.next(EXPECTED_ITEM2)

t.ok(spy.calledOnce, 'should get one item after next two item')
t.deepEqual(spy.firstCall.args[0], EXPECTED_ITEM1, 'should get the first item only')
t.equal(spy.callCount, 1, 'should get one item after next two item')
t.deepEqual(spy.lastCall.args[0], EXPECTED_ITEM1, 'should get the first item only')

await new Promise(resolve => setTimeout(resolve, DELAY_PERIOD_TIME + 3))
t.ok(spy.calledTwice, 'should get the second item after period delay')
t.equal(spy.callCount, 2, 'should get the second item after period delay')
t.deepEqual(spy.lastCall.args[0], EXPECTED_ITEM2, 'should get the second item for last call')
})

test('DelayQueue 3 items', async t => {
Expand All @@ -50,10 +51,14 @@ test('DelayQueue 3 items', async t => {
q.next(EXPECTED_ITEM2)
q.next(EXPECTED_ITEM3)

t.equal(spy.callCount, 1, 'get first item immediatelly')
t.deepEqual(spy.lastCall.args[0], EXPECTED_ITEM1, 'should received EXPECTED_ITEM1 immediatelly')

await new Promise(resolve => setTimeout(resolve, DELAY_PERIOD_TIME + 3))
t.ok(spy.calledTwice, 'get second item after period')
t.equal(spy.callCount, 2, 'get second item after period')
t.deepEqual(spy.lastCall.args[0], EXPECTED_ITEM2, 'should received EXPECTED_ITEM2 after 1 x period')

await new Promise(resolve => setTimeout(resolve, DELAY_PERIOD_TIME + 3))
t.ok(spy.calledThrice, 'should get the third item after 2 x period')
t.deepEqual(spy.thirdCall.args[0], EXPECTED_ITEM3, 'should received EXPECTED_ITEM3 after 2 x period')
t.equal(spy.callCount, 3, 'should get the third item after 2 x period')
t.deepEqual(spy.lastCall.args[0], EXPECTED_ITEM3, 'should received EXPECTED_ITEM3 after 2 x period')
})
15 changes: 11 additions & 4 deletions src/delay-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import {
of,
Subject,
Subscription,
timer,
} from 'rxjs'
import {
concatMap,
delay,
skipUntil,
} from 'rxjs/operators'

import RxQueue from './rx-queue'
Expand All @@ -32,9 +33,15 @@ export class DelayQueue<T = unknown> extends RxQueue<T> {

this.subject = new Subject<T>()
this.subscription = this.subject.pipe(
concatMap(args => concat(
of(args), // emit first item right away
EMPTY.pipe(delay(this.period)), // delay next item
concatMap(x => concat(
of(x), // emit first item right away
/**
* Issue #71 - DelayQueue failed: behavior breaking change after RxJS from v6 to v7
* https://github.com/huan/rx-queue/issues/71
*/
timer(this.period).pipe(
skipUntil(EMPTY)
),
)),
).subscribe((item: T) => super.next(item))
}
Expand Down

0 comments on commit 14bcaba

Please sign in to comment.