Skip to content

Commit

Permalink
feature(core-rxjs): add ProgressTracker
Browse files Browse the repository at this point in the history
ProgressTracker allows to track arbitrary async operations (multiple groups), display their state and handles a stop signal. @deepkit/rpc supports it natively as return type and @deepkit/desktop-ui has a component to display it nicely.
  • Loading branch information
marcj committed May 1, 2023
1 parent 3dcb39c commit c2e6be8
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 3 deletions.
1 change: 1 addition & 0 deletions packages/core-rxjs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@

export * from './src/utils.js';
export * from './src/timer.js';
export * from './src/progress.js';
235 changes: 235 additions & 0 deletions packages/core-rxjs/src/progress.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
import { BehaviorSubject } from 'rxjs';
import { throttleTime } from '@deepkit/core';

export interface ProgressTrackerState {
total: number;
done: number;
message: string;
speed: number;
stopped: boolean;
}

export class ProgressTrackerGroup {
protected lastUpdate = Date.now();

stopCallbacks: (() => void)[] = [];

constructor(public state: ProgressTrackerState) {
}

changed() {
}

/**
* Registers a callback that is called when the progress is stopped.
*/
onStop(callback: () => void) {
this.stopCallbacks.push(callback);
}

stop() {
this.state.stopped = true;
this.stopCallbacks.forEach(v => v());
this.changed();
}

set total(total: number) {
this.state.total = total;
this.changed();
}

set message(message: string) {
this.state.message = message;
this.changed();
}

/**
* Sets the number of items that are done.
* This triggers a change event.
*/
set done(done: number) {
done = Math.min(done, this.state.total);
//calculate speed
const now = Date.now();
const timeDiff = now - this.lastUpdate;
this.lastUpdate = now;
const diff = done - this.state.done;
this.state.speed = diff / timeDiff * 1000;

this.state.done = done;
this.changed();
}

/**
* Number between 0 and 1.
*/
get progress(): number {
return this.state.done / this.state.total;
}

/**
* Number between 0 and 1.
*/
set progress(progress: number) {
this.done = Math.round(this.state.total * progress);
this.changed();
}

/**
* Total number of items to process.
*/
get total(): number {
return this.state.total;
}

/**
* True if the progress is finished (done === total).
* Same as progress === 1.
*/
get finished() {
return this.state.total === this.state.done;
}

/**
* True if the progress is running (finished === false && stopped === false).
*/
get running() {
return !this.finished && !this.stopped;
}

/**
* True if the progress is ended (finished === true || stopped === true).
*/
get ended() {
return !this.running;
}

/**
* True if the progress is stopped (stopped === true).
*/
get stopped() {
return this.state.stopped;
}

get message(): string {
return this.state.message;
}

get done(): number {
return this.state.done;
}
}

/**
* This class allows to track multiple progress states.
*
* Natively supported as return type in @deepkit/rpc methods.
* The client can stop either a single progress state or all of them, to which the server
* can react by stopping the current operation.
*
* @deepkit/desktop-ui has a component to display the progress.
*/
export class ProgressTracker extends BehaviorSubject<ProgressTrackerState[]> {
groups: ProgressTrackerGroup[] = [];

changed = throttleTime(() => {
this.next(this.value);

//check if all groups are done
if (this.groups.every(v => v.done === v.total)) {
this.complete();
}
}, 100);

constructor(states: ProgressTrackerState[] = []) {
super(states);
this.groups = states.map(v => new ProgressTrackerGroup(v));
}

next(states: ProgressTrackerState[]) {
//don't create new groups, but update existing ones, and remove old ones, and add new ones if needed
for (let i = 0; i < states.length; i++) {
if (i < this.groups.length) {
const old = this.groups[i].state;
const next = states[i];
this.groups[i].state = next;
if (old.stopped !== next.stopped) {
if (next.stopped) {
this.groups[i].stopCallbacks.forEach(v => v());
}
}
} else {
this.groups.push(new ProgressTrackerGroup(states[i]));
}
}

//remove old groups
this.groups.splice(states.length, this.groups.length - states.length);

super.next(states);
}

get running(): boolean {
return this.groups.some(v => v.running);
}

get ended(): boolean {
return !this.running;
}

stop() {
this.groups.forEach(v => v.stop());
this.changed();
}

get stopped() {
return this.groups.every(v => v.stopped);
}

track(message: string = '', total: number, current: number = 0): ProgressTrackerGroup {
const group = new ProgressTrackerGroup({ total, done: current, message, speed: 0, stopped: false });
group.changed = () => {
this.changed();
};
this.groups.push(group);
this.value.push(group.state);
this.changed();
return group;
}

get progress(): number {
if (this.groups.length === 0) return 0;
return this.groups.reduce((v, group) => v + group.progress, 0) / this.groups.length;
}

get finished(): boolean {
return this.groups.every(v => v.done === v.total);
}

get done(): number {
return this.groups.reduce((v, group) => v + group.done, 0);
}

get total(): number {
return this.groups.reduce((v, group) => v + group.total, 0);
}

get current(): ProgressTrackerGroup {
return this.groups[0];
}
}

export class ProgressTrackerWatcher<T extends ProgressTracker = ProgressTracker> extends BehaviorSubject<T> {
}

/**
* Turns a ProgressTracker into a BehaviorSubject<ProgressTracker> aka ProgressTrackerWatcher.
*/
export function watchProgressTracker<T extends ProgressTracker>(tracker: T): ProgressTrackerWatcher<T> {
const subject = new BehaviorSubject<T>(tracker);
const sub = tracker.subscribe(() => {
subject.next(tracker);
}, (error) => subject.error(error), () => subject.complete());
subject.subscribe().add(() => sub.unsubscribe());
return subject;
}
64 changes: 63 additions & 1 deletion packages/core-rxjs/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/

import { BehaviorSubject, isObservable, Observable, Observer, Subject, Subscriber, Subscription, TeardownLogic } from 'rxjs';
import { arrayRemoveItem, createStack, isFunction, mergePromiseStack, mergeStack } from '@deepkit/core';
import { arrayRemoveItem, asyncOperation, createStack, isFunction, mergePromiseStack, mergeStack } from '@deepkit/core';
import { first, skip } from 'rxjs/operators';

export class AsyncSubscription {
Expand Down Expand Up @@ -151,3 +151,65 @@ export async function tearDown(teardown: TeardownLogic) {
await teardown.unsubscribe();
}
}

/**
* Handles incoming messages in batches. The handler is called when the observable completes or when a certain time passed since the last message.
*
* This makes sure the handler is awaited before the next batch is processed.
*
* `maxWait` in milliseconds, this makes sure every `maxWait` ms the handler is called with the current messages if there are any.
* `batchSize` this is the maximum amount of messages that are passed to the handler.
*/
export async function throttleMessages<T, R>(observable: Observable<T>, handler: (messages: T[]) => Promise<R>, options: Partial<{
maxWait: number,
batchSize: number
}> = {}): Promise<R[]> {
return asyncOperation(async (resolve, reject) => {
const maxWait = options.maxWait || 100;
const batchSize = options.batchSize || 100;

const results: R[] = [];
let messages: T[] = [];
let lastFlush = Date.now();
let handlerDone = true;
let finished = false;

function flush(andFinish = false) {
finished = andFinish;
if (!handlerDone) return;
if (!messages.length) {
if (andFinish) resolve(results);
return;
}
lastFlush = Date.now();

handlerDone = false;
const messagesToSend = messages.slice(0);
messages = [];
handler(messagesToSend).then((result) => {
results.push(result);
handlerDone = true;
if (andFinish) {
resolve(results);
} else if (finished) {
flush(true);
}
}, (error) => {
sub.unsubscribe();
reject(error);
});
}

const sub = observable.subscribe((message) => {
messages.push(message);
const diffTime = Date.now() - lastFlush;
if (diffTime > maxWait || messages.length >= batchSize) {
flush();
}
}, (error) => {
reject(error);
}, () => {
flush(true);
});
});
}
54 changes: 52 additions & 2 deletions packages/core-rxjs/tests/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { expect, test } from '@jest/globals';
import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
import { isBehaviorSubject, isSubject, nextValue, Subscriptions } from '../src/utils.js';
import { BehaviorSubject, Observable, ReplaySubject, Subject, Subscription } from 'rxjs';
import { isBehaviorSubject, isSubject, nextValue, Subscriptions, throttleMessages } from '../src/utils.js';
import { ProgressTracker } from '../src/progress.js';

test('nextValue subject', async () => {
const subject = new Subject();
Expand Down Expand Up @@ -95,3 +96,52 @@ test('Subscriptions auto remove', async () => {
expect(sub1.closed).toBe(true);
expect(sub2.closed).toBe(true);
});

test('throttleMessages', async () => {
const behaviorSubject = new ReplaySubject<number>();
for (let i = 0; i < 100; i++) {
behaviorSubject.next(i);
}

setTimeout(() => {
behaviorSubject.complete();
}, 100);

let i = 0;
await throttleMessages(behaviorSubject, async (numbers) => {
i++;
if (i === 1) {
expect(numbers.length).toBe(10);
} else {
//once the first batch is handled, the observable is filled completely up
expect(numbers.length).toBe(90);
}
}, { batchSize: 10 });
});

test('progress', async () => {
const progressTracker = new ProgressTracker();
const test1 = progressTracker.track('test1', 5);
test1.done++;
test1.done++;

expect(progressTracker.progress).toBe(2 / 5);
expect(progressTracker.done).toBe(2);
expect(progressTracker.total).toBe(5);
expect(progressTracker.stopped).toBe(false);
expect(progressTracker.finished).toBe(false);
expect(progressTracker.ended).toBe(false);

test1.done++;
test1.done++;
test1.done++;
expect(progressTracker.progress).toBe(1);
expect(progressTracker.done).toBe(5);
expect(progressTracker.stopped).toBe(false);
expect(progressTracker.finished).toBe(true);
expect(progressTracker.ended).toBe(true);

test1.done++;
expect(progressTracker.progress).toBe(1);
expect(progressTracker.done).toBe(5);
});

0 comments on commit c2e6be8

Please sign in to comment.