Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Message pack protocol in TS client #631

Merged
merged 3 commits into from Aug 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,4 +1,4 @@
import { TextMessageFormat } from "../Microsoft.AspNetCore.SignalR.Client.TS/Formatters"
import { TextMessageFormat, BinaryMessageFormat } from "../Microsoft.AspNetCore.SignalR.Client.TS/Formatters"

describe("Text Message Formatter", () => {
it("should return empty array on empty input", () => {
Expand Down Expand Up @@ -30,3 +30,43 @@ describe("Text Message Formatter", () => {
});
});
});

describe("Binary Message Formatter", () => {
([
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], <Uint8Array[]>[ new Uint8Array([])]],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff], <Uint8Array[]>[ new Uint8Array([0xff])]],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x7f], <Uint8Array[]>[ new Uint8Array([0xff]), new Uint8Array([0x7f])]],
] as [[number[], Uint8Array[]]]).forEach(([payload, expected_messages]) => {
it(`should parse '${payload}' correctly`, () => {
let messages = BinaryMessageFormat.parse(new Uint8Array(payload).buffer);
expect(messages).toEqual(expected_messages);
})
});

([
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Cannot read message size")],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x80, 0x00], new Error("Cannot read message size")],
[[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00], new Error("Incomplete message")],
] as [[number[], Error]]).forEach(([payload, expected_error]) => {
it(`should fail to parse '${payload}'`, () => {
expect(() => BinaryMessageFormat.parse(new Uint8Array(payload).buffer)).toThrow(expected_error);
})
});

([
[[], [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]],
[[0x20], [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20]],
] as [[number[], number[]]]).forEach(([input, expected_payload]) => {
it(`should write '${input}'`, () => {
let actual = new Uint8Array(BinaryMessageFormat.write(new Uint8Array(input)));
let expected = new Uint8Array(expected_payload);
expect(actual).toEqual(expected);
})
});
});
@@ -0,0 +1,103 @@
import { MessagePackHubProtocol } from "../Microsoft.AspNetCore.SignalR.Client.TS/MessagePackHubProtocol"
import { MessageType, InvocationMessage, CompletionMessage, ResultMessage } from "../Microsoft.AspNetCore.SignalR.Client.TS/IHubProtocol"

describe("MessageHubProtocol", () => {
it("can write/read Invocation message", () => {
let invocation = <InvocationMessage>{
type: MessageType.Invocation,
invocationId: "123",
target: "myMethod",
nonblocking: true,
arguments: [42, true, "test", ["x1", "y2"], null]
};

let protocol = new MessagePackHubProtocol();
var parsedMessages = protocol.parseMessages(protocol.writeMessage(invocation));
expect(parsedMessages).toEqual([invocation]);
});

([
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0b,
0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x01, 0xa3, 0x45, 0x72, 0x72],
{
type: MessageType.Completion,
invocationId: "abc",
error: "Err",
result: null
} as CompletionMessage ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a,
0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x03, 0xa2, 0x4f, 0x4b ],
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: "OK"
} as CompletionMessage ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07,
0x93, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x02 ],
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: null
} as CompletionMessage ]
] as [[number[], CompletionMessage]]).forEach(([payload, expected_message]) =>
it("can read Completion message", () => {
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([expected_message]);
}));

([
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07,
0x93, 0x02, 0xa3, 0x61, 0x62, 0x63, 0x08 ],
{
type: MessageType.Result,
invocationId: "abc",
item: 8
} as ResultMessage ]
] as [[number[], CompletionMessage]]).forEach(([payload, expected_message]) =>
it("can read Result message", () => {
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([expected_message]);
}));

([
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 ], new Error("Invalid payload.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x90 ], new Error("Invalid payload.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xc2 ], new Error("Invalid payload.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x91, 0x05 ], new Error("Invalid message type.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x91, 0xa1, 0x78 ], new Error("Invalid message type.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x91, 0x01 ], new Error("Invalid payload for Invocation message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x91, 0x02 ], new Error("Invalid payload for stream Result message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x92, 0x03, 0xa0 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x94, 0x03, 0xa0, 0x02, 0x00 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x93, 0x03, 0xa0, 0x01 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x93, 0x03, 0xa0, 0x03 ], new Error("Invalid payload for Completion message.") ]
] as [[number[], Error]]).forEach(([payload, expected_error]) =>
it("throws for invalid messages", () => {
expect(() => new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer))
.toThrow(expected_error);
}));

it("can read multiple messages", () => {
let payload = [
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07,
0x93, 0x02, 0xa3, 0x61, 0x62, 0x63, 0x08,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a,
0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x03, 0xa2, 0x4f, 0x4b ];
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([
{
type: MessageType.Result,
invocationId: "abc",
item: 8
} as ResultMessage,
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: "OK"
} as CompletionMessage
]);
});
});
54 changes: 53 additions & 1 deletion client-ts/Microsoft.AspNetCore.SignalR.Client.TS/Formatters.ts
Expand Up @@ -34,7 +34,7 @@ export namespace TextMessageFormat {
if (!hasSpace(input, offset, 1 + length)) {
throw new Error("Message is incomplete");
}

// Read the payload
var payload = input.substr(offset, length);
offset += length;
Expand Down Expand Up @@ -66,4 +66,56 @@ export namespace TextMessageFormat {
}
return messages;
}
}

export namespace BinaryMessageFormat {
export function write(output: Uint8Array): ArrayBuffer {
let size = output.byteLength;
let buffer = new Uint8Array(size + 8);

// javascript bitwise operators only support 32-bit integers
for (let i = 7; i >= 4; i--) {
buffer[i] = size & 0xff;
size = size >> 8;
}

buffer.set(output, 8);

return buffer.buffer;
}

export function parse(input: ArrayBuffer): Uint8Array[] {
let result: Uint8Array[] = [];
let uint8Array = new Uint8Array(input);
// 8 - the length prefix size
for (let offset = 0; offset < input.byteLength;) {

if (input.byteLength < offset + 8) {
throw new Error("Cannot read message size")
}

// Note javascript bitwise operators only support 32-bit integers - for now cutting bigger messages.
// Tracking bug https://github.com/aspnet/SignalR/issues/613
if (!(uint8Array[offset] == 0 && uint8Array[offset + 1] == 0 && uint8Array[offset + 2] == 0
&& uint8Array[offset + 3] == 0 && (uint8Array[offset + 4] & 0x80) == 0)) {
throw new Error("Messages bigger than 2147483647 bytes are not supported");
}

let size = 0;
for (let i = 4; i < 8; i++) {
size = (size << 8) | uint8Array[offset + i];
}

if (uint8Array.byteLength >= (offset + 8 + size)) {
result.push(uint8Array.slice(offset + 8, offset + 8 + size))
}
else {
throw new Error("Incomplete message");
}

offset = offset + 8 + size;
}

return result;
}
}
Expand Up @@ -16,8 +16,9 @@ export class HubConnection {
private connectionClosedCallback: ConnectionClosed;
private protocol: IHubProtocol;

constructor(connection: IConnection) {
constructor(connection: IConnection, protocol: IHubProtocol = new JsonHubProtocol()) {
this.connection = connection;
this.protocol = protocol || new JsonHubProtocol();
this.connection.onDataReceived = data => {
this.onDataReceived(data);
};
Expand All @@ -28,7 +29,6 @@ export class HubConnection {
this.callbacks = new Map<string, (invocationEvent: CompletionMessage | ResultMessage) => void>();
this.methods = new Map<string, (...args: any[]) => void>();
this.id = 0;
this.protocol = new JsonHubProtocol();
}

private onDataReceived(data: any) {
Expand Down
Expand Up @@ -30,6 +30,6 @@ export interface NegotiationMessage {

export interface IHubProtocol {
name(): string;
parseMessages(input: string): HubMessage[];
writeMessage(message: HubMessage): string;
parseMessages(input: any): HubMessage[];
writeMessage(message: HubMessage): any;
}
@@ -0,0 +1,118 @@
import { IHubProtocol, MessageType, HubMessage, InvocationMessage, ResultMessage, CompletionMessage } from "./IHubProtocol";
import { BinaryMessageFormat } from "./Formatters"
import * as msgpack5 from "msgpack5"

export class MessagePackHubProtocol implements IHubProtocol {
name(): string {
return "messagepack";
}

parseMessages(input: ArrayBuffer): HubMessage[] {
return BinaryMessageFormat.parse(input).map(m => this.parseMessage(m));
}

private parseMessage(input: Uint8Array): HubMessage {
if (input.length == 0) {
throw new Error("Invalid payload.");
}

let msgpack = msgpack5();
let properties = msgpack.decode(new Buffer(input));
if (properties.length == 0 || !(properties instanceof Array)) {
throw new Error("Invalid payload.");
}

let messageType = properties[0] as MessageType;
switch (messageType) {
case MessageType.Invocation:
return this.createInvocationMessage(properties);
case MessageType.Result:
return this.createStreamItemMessage(properties);
case MessageType.Completion:
return this.createCompletionMessage(properties);
default:
throw new Error("Invalid message type.");
}
}

private createInvocationMessage(properties: any[]): InvocationMessage {
if (properties.length != 5) {
throw new Error("Invalid payload for Invocation message.");
}

return {
type: MessageType.Invocation,
invocationId: properties[1],
nonblocking: properties[2],
target: properties[3],
arguments: properties[4]
} as InvocationMessage;
}

private createStreamItemMessage(properties: any[]): ResultMessage {
if (properties.length != 3) {
throw new Error("Invalid payload for stream Result message.");
}

return {
type: MessageType.Result,
invocationId: properties[1],
item: properties[2]
} as ResultMessage;
}

private createCompletionMessage(properties: any[]): CompletionMessage {
if (properties.length < 3) {
throw new Error("Invalid payload for Completion message.");
}

const errorResult = 1;
const voidResult = 2;
const nonVoidResult = 3;

let resultKind = properties[2];

if ((resultKind === voidResult && properties.length != 3) ||
(resultKind !== voidResult && properties.length != 4)) {
throw new Error("Invalid payload for Completion message.");
}

let completionMessage = {
type: MessageType.Completion,
invocationId: properties[1],
error: null as string,
result: null as any
};

switch (resultKind) {
case errorResult:
completionMessage.error = properties[3];
break;
case nonVoidResult:
completionMessage.result = properties[3];
break;
}

return completionMessage as ResultMessage;
}

writeMessage(message: HubMessage): ArrayBuffer {
switch (message.type) {
case MessageType.Invocation:
return this.writeInvocation(message as InvocationMessage);
case MessageType.Result:
case MessageType.Completion:
throw new Error(`Writing messages of type '${message.type}' is not supported.`);
default:
throw new Error("Invalid message type.");
}
}

private writeInvocation(invocationMessage: InvocationMessage): ArrayBuffer {
let msgpack = msgpack5();
let payload = msgpack.encode([ MessageType.Invocation, invocationMessage.invocationId,
invocationMessage.nonblocking, invocationMessage.target, invocationMessage.arguments]);

return BinaryMessageFormat.write(payload.slice());
}
}
Expand Up @@ -24,6 +24,7 @@ gulp.task('browserify-client', ['compile-ts-client'], () => {
.pipe(gulp.dest(clientOutDir + '/../browser'));
});


gulp.task('build-ts-client', ['clean', 'compile-ts-client', 'browserify-client']);

gulp.task('default', ['build-ts-client']);