diff --git a/packages/core-rxjs/index.ts b/packages/core-rxjs/index.ts index aa2f7427b..80aad59b5 100644 --- a/packages/core-rxjs/index.ts +++ b/packages/core-rxjs/index.ts @@ -10,3 +10,4 @@ export * from './src/utils.js'; export * from './src/timer.js'; +export * from './src/progress.js'; diff --git a/packages/core-rxjs/src/progress.ts b/packages/core-rxjs/src/progress.ts new file mode 100644 index 000000000..3ea3695af --- /dev/null +++ b/packages/core-rxjs/src/progress.ts @@ -0,0 +1,235 @@ +import { BehaviorSubject } from 'rxjs'; +import { throttleTime } from '@deepkit/core'; + +export interface ProgressTrackerState { + total: number; + done: number; + message: string; + speed: number; + stopped: boolean; +} + +export class ProgressTrackerGroup { + protected lastUpdate = Date.now(); + + stopCallbacks: (() => void)[] = []; + + constructor(public state: ProgressTrackerState) { + } + + changed() { + } + + /** + * Registers a callback that is called when the progress is stopped. + */ + onStop(callback: () => void) { + this.stopCallbacks.push(callback); + } + + stop() { + this.state.stopped = true; + this.stopCallbacks.forEach(v => v()); + this.changed(); + } + + set total(total: number) { + this.state.total = total; + this.changed(); + } + + set message(message: string) { + this.state.message = message; + this.changed(); + } + + /** + * Sets the number of items that are done. + * This triggers a change event. + */ + set done(done: number) { + done = Math.min(done, this.state.total); + //calculate speed + const now = Date.now(); + const timeDiff = now - this.lastUpdate; + this.lastUpdate = now; + const diff = done - this.state.done; + this.state.speed = diff / timeDiff * 1000; + + this.state.done = done; + this.changed(); + } + + /** + * Number between 0 and 1. + */ + get progress(): number { + return this.state.done / this.state.total; + } + + /** + * Number between 0 and 1. + */ + set progress(progress: number) { + this.done = Math.round(this.state.total * progress); + this.changed(); + } + + /** + * Total number of items to process. + */ + get total(): number { + return this.state.total; + } + + /** + * True if the progress is finished (done === total). + * Same as progress === 1. + */ + get finished() { + return this.state.total === this.state.done; + } + + /** + * True if the progress is running (finished === false && stopped === false). + */ + get running() { + return !this.finished && !this.stopped; + } + + /** + * True if the progress is ended (finished === true || stopped === true). + */ + get ended() { + return !this.running; + } + + /** + * True if the progress is stopped (stopped === true). + */ + get stopped() { + return this.state.stopped; + } + + get message(): string { + return this.state.message; + } + + get done(): number { + return this.state.done; + } +} + +/** + * This class allows to track multiple progress states. + * + * Natively supported as return type in @deepkit/rpc methods. + * The client can stop either a single progress state or all of them, to which the server + * can react by stopping the current operation. + * + * @deepkit/desktop-ui has a component to display the progress. + */ +export class ProgressTracker extends BehaviorSubject { + groups: ProgressTrackerGroup[] = []; + + changed = throttleTime(() => { + this.next(this.value); + + //check if all groups are done + if (this.groups.every(v => v.done === v.total)) { + this.complete(); + } + }, 100); + + constructor(states: ProgressTrackerState[] = []) { + super(states); + this.groups = states.map(v => new ProgressTrackerGroup(v)); + } + + next(states: ProgressTrackerState[]) { + //don't create new groups, but update existing ones, and remove old ones, and add new ones if needed + for (let i = 0; i < states.length; i++) { + if (i < this.groups.length) { + const old = this.groups[i].state; + const next = states[i]; + this.groups[i].state = next; + if (old.stopped !== next.stopped) { + if (next.stopped) { + this.groups[i].stopCallbacks.forEach(v => v()); + } + } + } else { + this.groups.push(new ProgressTrackerGroup(states[i])); + } + } + + //remove old groups + this.groups.splice(states.length, this.groups.length - states.length); + + super.next(states); + } + + get running(): boolean { + return this.groups.some(v => v.running); + } + + get ended(): boolean { + return !this.running; + } + + stop() { + this.groups.forEach(v => v.stop()); + this.changed(); + } + + get stopped() { + return this.groups.every(v => v.stopped); + } + + track(message: string = '', total: number, current: number = 0): ProgressTrackerGroup { + const group = new ProgressTrackerGroup({ total, done: current, message, speed: 0, stopped: false }); + group.changed = () => { + this.changed(); + }; + this.groups.push(group); + this.value.push(group.state); + this.changed(); + return group; + } + + get progress(): number { + if (this.groups.length === 0) return 0; + return this.groups.reduce((v, group) => v + group.progress, 0) / this.groups.length; + } + + get finished(): boolean { + return this.groups.every(v => v.done === v.total); + } + + get done(): number { + return this.groups.reduce((v, group) => v + group.done, 0); + } + + get total(): number { + return this.groups.reduce((v, group) => v + group.total, 0); + } + + get current(): ProgressTrackerGroup { + return this.groups[0]; + } +} + +export class ProgressTrackerWatcher extends BehaviorSubject { +} + +/** + * Turns a ProgressTracker into a BehaviorSubject aka ProgressTrackerWatcher. + */ +export function watchProgressTracker(tracker: T): ProgressTrackerWatcher { + const subject = new BehaviorSubject(tracker); + const sub = tracker.subscribe(() => { + subject.next(tracker); + }, (error) => subject.error(error), () => subject.complete()); + subject.subscribe().add(() => sub.unsubscribe()); + return subject; +} diff --git a/packages/core-rxjs/src/utils.ts b/packages/core-rxjs/src/utils.ts index 00b272b5c..89da05291 100644 --- a/packages/core-rxjs/src/utils.ts +++ b/packages/core-rxjs/src/utils.ts @@ -9,7 +9,7 @@ */ import { BehaviorSubject, isObservable, Observable, Observer, Subject, Subscriber, Subscription, TeardownLogic } from 'rxjs'; -import { arrayRemoveItem, createStack, isFunction, mergePromiseStack, mergeStack } from '@deepkit/core'; +import { arrayRemoveItem, asyncOperation, createStack, isFunction, mergePromiseStack, mergeStack } from '@deepkit/core'; import { first, skip } from 'rxjs/operators'; export class AsyncSubscription { @@ -151,3 +151,65 @@ export async function tearDown(teardown: TeardownLogic) { await teardown.unsubscribe(); } } + +/** + * Handles incoming messages in batches. The handler is called when the observable completes or when a certain time passed since the last message. + * + * This makes sure the handler is awaited before the next batch is processed. + * + * `maxWait` in milliseconds, this makes sure every `maxWait` ms the handler is called with the current messages if there are any. + * `batchSize` this is the maximum amount of messages that are passed to the handler. + */ +export async function throttleMessages(observable: Observable, handler: (messages: T[]) => Promise, options: Partial<{ + maxWait: number, + batchSize: number +}> = {}): Promise { + return asyncOperation(async (resolve, reject) => { + const maxWait = options.maxWait || 100; + const batchSize = options.batchSize || 100; + + const results: R[] = []; + let messages: T[] = []; + let lastFlush = Date.now(); + let handlerDone = true; + let finished = false; + + function flush(andFinish = false) { + finished = andFinish; + if (!handlerDone) return; + if (!messages.length) { + if (andFinish) resolve(results); + return; + } + lastFlush = Date.now(); + + handlerDone = false; + const messagesToSend = messages.slice(0); + messages = []; + handler(messagesToSend).then((result) => { + results.push(result); + handlerDone = true; + if (andFinish) { + resolve(results); + } else if (finished) { + flush(true); + } + }, (error) => { + sub.unsubscribe(); + reject(error); + }); + } + + const sub = observable.subscribe((message) => { + messages.push(message); + const diffTime = Date.now() - lastFlush; + if (diffTime > maxWait || messages.length >= batchSize) { + flush(); + } + }, (error) => { + reject(error); + }, () => { + flush(true); + }); + }); +} diff --git a/packages/core-rxjs/tests/index.spec.ts b/packages/core-rxjs/tests/index.spec.ts index 961b5cfe3..6c8ac184f 100644 --- a/packages/core-rxjs/tests/index.spec.ts +++ b/packages/core-rxjs/tests/index.spec.ts @@ -1,6 +1,7 @@ import { expect, test } from '@jest/globals'; -import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs'; -import { isBehaviorSubject, isSubject, nextValue, Subscriptions } from '../src/utils.js'; +import { BehaviorSubject, Observable, ReplaySubject, Subject, Subscription } from 'rxjs'; +import { isBehaviorSubject, isSubject, nextValue, Subscriptions, throttleMessages } from '../src/utils.js'; +import { ProgressTracker } from '../src/progress.js'; test('nextValue subject', async () => { const subject = new Subject(); @@ -95,3 +96,52 @@ test('Subscriptions auto remove', async () => { expect(sub1.closed).toBe(true); expect(sub2.closed).toBe(true); }); + +test('throttleMessages', async () => { + const behaviorSubject = new ReplaySubject(); + for (let i = 0; i < 100; i++) { + behaviorSubject.next(i); + } + + setTimeout(() => { + behaviorSubject.complete(); + }, 100); + + let i = 0; + await throttleMessages(behaviorSubject, async (numbers) => { + i++; + if (i === 1) { + expect(numbers.length).toBe(10); + } else { + //once the first batch is handled, the observable is filled completely up + expect(numbers.length).toBe(90); + } + }, { batchSize: 10 }); +}); + +test('progress', async () => { + const progressTracker = new ProgressTracker(); + const test1 = progressTracker.track('test1', 5); + test1.done++; + test1.done++; + + expect(progressTracker.progress).toBe(2 / 5); + expect(progressTracker.done).toBe(2); + expect(progressTracker.total).toBe(5); + expect(progressTracker.stopped).toBe(false); + expect(progressTracker.finished).toBe(false); + expect(progressTracker.ended).toBe(false); + + test1.done++; + test1.done++; + test1.done++; + expect(progressTracker.progress).toBe(1); + expect(progressTracker.done).toBe(5); + expect(progressTracker.stopped).toBe(false); + expect(progressTracker.finished).toBe(true); + expect(progressTracker.ended).toBe(true); + + test1.done++; + expect(progressTracker.progress).toBe(1); + expect(progressTracker.done).toBe(5); +});