Skip to content

Commit

Permalink
fix(rpc): ensure data is not chunked twice in server->client controllers
Browse files Browse the repository at this point in the history
Prior to this change, the back connection (server->client) had chunking enabled in its clientKernel, which caused messages to be chunked twice.
As a result, chunks were not acknowledged and the protocol came to a standstill.
  • Loading branch information
marcj committed May 9, 2024
1 parent 4d24c8b commit 6c59f9b
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 10 deletions.
22 changes: 20 additions & 2 deletions packages/rpc/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,23 @@
import { asyncOperation, ClassType, formatError, sleep } from '@deepkit/core';
import { ReceiveType, resolveReceiveType } from '@deepkit/type';
import { BehaviorSubject, Observable, Subject } from 'rxjs';
import { ControllerDefinition, rpcAuthenticate, rpcClientId, rpcPeerDeregister, rpcPeerRegister, rpcResponseAuthenticate, RpcTypes } from '../model.js';
import { createRpcMessage, createRpcMessagePeer, ErroredRpcMessage, RpcMessage, RpcMessageReader, RpcMessageRouteType } from '../protocol.js';
import {
ControllerDefinition,
rpcAuthenticate,
rpcClientId,
rpcPeerDeregister,
rpcPeerRegister,
rpcResponseAuthenticate,
RpcTypes,
} from '../model.js';
import {
createRpcMessage,
createRpcMessagePeer,
ErroredRpcMessage,
RpcMessage,
RpcMessageReader,
RpcMessageRouteType,
} from '../protocol.js';
import { RpcKernel, RpcKernelConnection } from '../server/kernel.js';
import { ClientProgress, RpcMessageWriter, RpcMessageWriterOptions, SingleProgress } from '../writer.js';
import { RpcActionClient, RpcControllerState } from './action.js';
Expand Down Expand Up @@ -601,6 +616,9 @@ export class RpcClient extends RpcBaseClient {
return this.transporter.bufferedAmount();
}
});
// Import to disable since transporter.send chunks already,
// otherwise data is chunked twice and protocol breaks.
c.writerOptions.chunkSize = 0;
if (!(c instanceof RpcKernelConnection)) throw new Error('Expected RpcKernelConnection from clientKernel.createConnection');
this.clientKernelConnection = c;
}
Expand Down
13 changes: 10 additions & 3 deletions packages/rpc/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
* You should have received a copy of the MIT License along with this program.
*/

import { deserializeBSONWithoutOptimiser, getBSONDeserializer, getBSONSerializer, getBSONSizer, Writer } from '@deepkit/bson';
import {
deserializeBSONWithoutOptimiser,
getBSONDeserializer,
getBSONSerializer,
getBSONSizer,
Writer,
} from '@deepkit/bson';
import { ClassType } from '@deepkit/core';
import { rpcChunk, rpcError, RpcTypes } from './model.js';
import type { SingleProgress } from './writer.js';
Expand Down Expand Up @@ -468,7 +474,7 @@ export class RpcMessageReader {

protected gotMessage(buffer: Uint8Array) {
const message = readRpcMessage(buffer);
// console.log('reader got', message.id, RpcTypes[message.type], message.bodySize, buffer.byteLength);
// console.log('reader got', message.id, RpcTypes[message.type], {routeType: message.routeType, bodySize: message.bodySize, byteLength: buffer.byteLength});

if (message.type === RpcTypes.ChunkAck) {
const ack = this.chunkAcks.get(message.id);
Expand Down Expand Up @@ -498,7 +504,8 @@ export class RpcMessageReader {
newBuffer.set(buffer, offset);
offset += buffer.byteLength;
}
this.onMessage(readRpcMessage(newBuffer));
const packedMessage = readRpcMessage(newBuffer);
this.onMessage(packedMessage);
}
} else {
const progress = this.progress.get(message.id);
Expand Down
5 changes: 4 additions & 1 deletion packages/rpc/src/writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,15 @@ export class RpcMessageWriter implements RpcConnectionWriter {
this.writer.close();
}

/**
* Writes a message buffer to the connection and chunks if necessary.
*/
write(buffer: Uint8Array, progress?: SingleProgress): void {
this.writeFull(buffer, progress).catch(error => console.log('RpcMessageWriter writeAsync error', error));
}

async writeFull(buffer: Uint8Array, progress?: SingleProgress): Promise<void> {
if (buffer.byteLength >= this.options.chunkSize) {
if (this.options.chunkSize && buffer.byteLength >= this.options.chunkSize) {
//split up
const chunkId = this.chunkId++;
const message = readRpcMessage(buffer); //we need the original message-id, so the chunks are correctly assigned in Progress tracker
Expand Down
31 changes: 31 additions & 0 deletions packages/rpc/tests/chunks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { rpc } from '../src/decorators.js';
import { RpcKernel } from '../src/server/kernel.js';
import { ClientProgress } from '../src/writer.js';
import { RpcBufferReader } from '../src/protocol.js';
import { asyncOperation } from '@deepkit/core';

test('buffer read does not do copy', async () => {
const data = Buffer.from('hello world');
Expand Down Expand Up @@ -107,3 +108,33 @@ test('chunks', async () => {
expect(progress.upload.progress).toBe(1);
}
});

test('back controller', async () => {
@rpc.controller('test')
class TestController {
@rpc.action()
uploadBig(file: Uint8Array): number {
return file.length;
}

@rpc.action()
downloadBig(size: number): Uint8Array {
return Buffer.alloc(size);
}
}

const kernel = new RpcKernel();

const res = await asyncOperation<Uint8Array>(async (resolve) => {
kernel.onConnection(async (connection) => {
const ctrl = connection.controller<TestController>('test');
const res = await ctrl.downloadBig(105_000);
resolve(res);
});

const client = new DirectClient(kernel);
client.registerController(TestController, 'test');
await client.connect();
});
expect(res.byteLength).toBe(105_000);
});
10 changes: 6 additions & 4 deletions packages/type/tests/receive-type.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,20 @@ test('function with ReceiveType return expression', () => {

test('ReceiveType forward to type passing', () => {
function typeOf2<T>(type?: ReceiveType<T>) {
console.log('typeOf2', type);
return resolveReceiveType(type);
}

function mySerialize<T>(type?: ReceiveType<T>) {
console.log('mySerialize', type);
return typeOf2<T>();
}

console.log('typeOf2', typeOf2.toString());
console.log('mySerialize', mySerialize.toString());
function mySerialize2<T>(type?: ReceiveType<T>) {
return typeOf2<T[]>();
}

const type = mySerialize<string>();
expect(type).toMatchObject({ kind: ReflectionKind.string });

const type2 = mySerialize2<string>();
expect(type2).toMatchObject({ kind: ReflectionKind.array, type: { kind: ReflectionKind.string } });
});

0 comments on commit 6c59f9b

Please sign in to comment.