Skip to content

Commit

Permalink
feature(rpc): add support for ProgressTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
marcj committed May 1, 2023
1 parent c2e6be8 commit a8d3c95
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 10 deletions.
25 changes: 21 additions & 4 deletions packages/rpc/src/client/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { ClientProgress } from '../writer.js';
import type { WritableClient } from './client.js';
import { EntityState, EntitySubjectStore } from './entity-state.js';
import { assertType, deserializeType, ReflectionKind, Type, TypeObjectLiteral, typeOf } from '@deepkit/type';
import { ProgressTracker, ProgressTrackerState } from '@deepkit/core-rxjs';

interface ResponseActionObservableError extends rpcActionObservableSubscribeId, WrappedV {
}
Expand Down Expand Up @@ -240,11 +241,12 @@ export class RpcActionClient {
firstObservableNext = undefined;
}
resolve(observableSubject);
} else if (body.type === ActionObservableTypes.behaviorSubject) {
observableSubject = new BehaviorSubject<any>(firstObservableNext);
} else if (body.type === ActionObservableTypes.behaviorSubject || body.type === ActionObservableTypes.progressTracker) {
const classType = body.type === ActionObservableTypes.progressTracker ? ProgressTracker : BehaviorSubject;
observableSubject = new classType(firstObservableNext);
firstObservableNext = undefined;

//we have to monkey patch unsubscribe, because they is no other way to hook into that
//we have to monkey patch unsubscribe, because there is no other way to hook into that
// note: subject.subscribe().add(T), T is not called when subject.unsubscribe() is called.
observableSubject.unsubscribe = () => {
Subject.prototype.unsubscribe.call(observableSubject);
Expand All @@ -255,6 +257,17 @@ export class RpcActionClient {
Subject.prototype.complete.call(observableSubject);
subject.send(RpcTypes.ActionObservableSubjectUnsubscribe);
};

if (observableSubject instanceof ProgressTracker) {
//whenever the client changes something, it's synced back to the server.
//this is important to handle the stop signal.
const oldChanged = observableSubject.changed;
observableSubject.changed = function (this: ProgressTracker) {
subject.send(RpcTypes.ActionObservableProgressNext, this.value, typeOf<ProgressTrackerState[]>());
return oldChanged.apply(this);
};
}

resolve(observableSubject);
}

Expand Down Expand Up @@ -404,7 +417,11 @@ export class RpcActionClient {
collection.loaded();
}

public async loadActionTypes(controller: RpcControllerState, method: string, options: { timeout?: number, dontWaitForConnection?: true, typeReuseDisabled?: boolean } = {}): Promise<ControllerStateActionTypes> {
public async loadActionTypes(controller: RpcControllerState, method: string, options: {
timeout?: number,
dontWaitForConnection?: true,
typeReuseDisabled?: boolean
} = {}): Promise<ControllerStateActionTypes> {
const state = controller.getState(method);
if (state.types) return state.types;

Expand Down
22 changes: 22 additions & 0 deletions packages/rpc/src/client/client-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@ export class RpcWebSocketClient extends RpcClient {
export class DeepkitClient extends RpcWebSocketClient {
}

/**
* Returns the WebSocket URL for the given base URL and allows port mapping.
* Default port-mapping maps Angular server :4200 to :8080
*/
export function webSocketFromBaseUrl(baseUrl: string, portMapping: { [name: number]: number } = { 4200: 8080 }): string {
let url = baseUrl.replace('https://', 'wss://').replace('http://', 'ws://');
for (const [from, to] of Object.entries(portMapping)) {
url = url.replace(':' + from, ':' + to);
}
return url;
}

/**
* Creates a provider for RpcWebSocketClient that is compatible with Angular and Deepkit.
*/
export function createRpcWebSocketClientProvider(baseUrl: string = typeof location !== 'undefined' ? location.origin : 'http://localhost', portMapping: { [name: number]: number } = { 4200: 8080 }) {
return {
provide: RpcWebSocketClient,
useFactory: () => new RpcWebSocketClient(webSocketFromBaseUrl(baseUrl, portMapping))
};
}

declare var require: (module: string) => any;

export class RpcWebSocketClientAdapter implements ClientTransportAdapter {
Expand Down
5 changes: 5 additions & 0 deletions packages/rpc/src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ export enum RpcTypes {
Entity, //change feed as composite, containing all Entity*
EntityPatch,
EntityRemove,

//Handles changes in ProgressTracker from client side to server (e.g. stop signal)
//From server to client is handled normally via ObservableNext
ActionObservableProgressNext
}

export interface rpcClientId {
Expand Down Expand Up @@ -324,6 +328,7 @@ export enum ActionObservableTypes {
observable,
subject,
behaviorSubject,
progressTracker,
}

export interface rpcSort {
Expand Down
41 changes: 36 additions & 5 deletions packages/rpc/src/server/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/

import { ClassType, collectForMicrotask, getClassName, isPrototypeOfBase, toFastProperties } from '@deepkit/core';
import { isBehaviorSubject, isSubject } from '@deepkit/core-rxjs';
import { isBehaviorSubject, isSubject, ProgressTracker, ProgressTrackerState } from '@deepkit/core-rxjs';
import {
assertType,
findMember,
Expand Down Expand Up @@ -205,6 +205,24 @@ export class RpcServerAction {
} else if (isPrototypeOfBase(unwrappedReturnType.classType, EntitySubject)) {
mode = 'entitySubject';
type = unwrappedReturnType.typeArguments ? unwrappedReturnType.typeArguments[0] : { kind: ReflectionKind.any };
} else if (isPrototypeOfBase(unwrappedReturnType.classType, ProgressTracker)) {
mode = 'observable';
type = typeOf<ProgressTrackerState[]>();
nextSchema = {
kind: ReflectionKind.objectLiteral,
types: [{
kind: ReflectionKind.propertySignature,
name: 'id',
parent: Object as any,
type: { kind: ReflectionKind.number },
}, {
kind: ReflectionKind.propertySignature,
name: 'v',
parent: Object as any,
optional: true,
type: type,
}]
};
} else if (isPrototypeOfBase(unwrappedReturnType.classType, Observable)) {
mode = 'observable';
type = unwrappedReturnType.typeArguments ? unwrappedReturnType.typeArguments[0] : { kind: ReflectionKind.any };
Expand Down Expand Up @@ -248,6 +266,9 @@ export class RpcServerAction {
collectionSchema,
collectionQueryModel,
};
if (!types.type) {
throw new Error(`No type detected for action ${controller}.${methodName}`);
}
toFastProperties(this.cachedActionsTypes);

return types;
Expand Down Expand Up @@ -343,6 +364,13 @@ export class RpcServerAction {
delete this.observableSubjects[message.id];
break;
}

case RpcTypes.ActionObservableProgressNext: { //ProgressTracker changes from client (e.g. stop signal)
const observable = this.observables[message.id];
if (!observable || !(observable.observable instanceof ProgressTracker)) return response.error(new Error('No observable ProgressTracker to sync found'));
observable.observable.next(message.parseBody<ProgressTrackerState[]>());
break;
}
}
}

Expand Down Expand Up @@ -443,6 +471,13 @@ export class RpcServerAction {
if (isSubject(result)) {
type = ActionObservableTypes.subject;

if (isBehaviorSubject(result)) {
type = ActionObservableTypes.behaviorSubject;
if (result instanceof ProgressTracker) {
type = ActionObservableTypes.progressTracker;
}
}

this.observableSubjects[message.id] = {
subject: result,
completedByClient: false,
Expand All @@ -462,10 +497,6 @@ export class RpcServerAction {
});
})
};

if (isBehaviorSubject(result)) {
type = ActionObservableTypes.behaviorSubject;
}
}

response.reply<rpcResponseActionObservable>(RpcTypes.ResponseActionObservable, { type });
Expand Down
98 changes: 97 additions & 1 deletion packages/rpc/tests/controller.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { getActions, rpc } from '../src/decorators.js';
import { RpcKernel, RpcKernelConnection } from '../src/server/kernel.js';
import { Session, SessionState } from '../src/server/security.js';
import { BehaviorSubject } from 'rxjs';
import { getClassName } from '@deepkit/core';
import { getClassName, sleep } from '@deepkit/core';
import { ProgressTracker } from '@deepkit/core-rxjs';

test('decorator', async () => {
@rpc.controller('name')
Expand Down Expand Up @@ -455,3 +456,98 @@ test('disable type reuse', async () => {
expect(res.items[0]).toEqual({ title: '123' });
}
});

test('progress tracker', async () => {
class Controller {
progress = new ProgressTracker();
tracker = this.progress.track('test', 10);

@rpc.action()
async getProgress(): Promise<ProgressTracker> {
return this.progress;
}

@rpc.action()
increase(): void {
this.tracker.done++;
}

@rpc.action()
done(): void {
this.tracker.done = 10;
}
}

const kernel = new RpcKernel();
kernel.registerController(Controller, 'myController');

const client = new DirectClient(kernel);
client.disableTypeReuse();
const controller = client.controller<Controller>('myController');

{
const res = await controller.getProgress();
expect(res).toBeInstanceOf(ProgressTracker);
expect(res.progress).toEqual(0);
expect(res.done).toEqual(0);
expect(res.total).toEqual(10);

await controller.increase();
await sleep(0.01);
expect(res.done).toEqual(1);
expect(res.total).toEqual(10);

await controller.done();
await sleep(0.01);
expect(res.done).toEqual(10);
expect(res.finished).toEqual(true);
expect(res.total).toEqual(10);
}
});

test('progress tracker stop', async () => {
let stopCalled = false;

class Controller {
@rpc.action()
async getProgress(): Promise<ProgressTracker> {
const tracker = new ProgressTracker();
const test1 = tracker.track('test1', 1000);

const int = setInterval(() => {
test1.done++;
}, 10);

test1.onStop(() => {
stopCalled = true;
clearInterval(int);
});

return tracker;
}
}

const kernel = new RpcKernel();
kernel.registerController(Controller, 'myController');

const client = new DirectClient(kernel);
client.disableTypeReuse();
const controller = client.controller<Controller>('myController');

{
const res = await controller.getProgress();
expect(res).toBeInstanceOf(ProgressTracker);
expect(res.done).toEqual(0);
expect(res.total).toEqual(1000);
await sleep(0.1);
expect(res.done).toBeGreaterThan(0);
expect(res.done).toBeLessThan(50);
res.stop();
await sleep(0.1);
expect(stopCalled).toBe(true);
expect(res.done).toBeLessThan(50);
expect(res.finished).toBe(false);
expect(res.stopped).toBe(true);
expect(res.ended).toBe(true);
}
});

0 comments on commit a8d3c95

Please sign in to comment.