Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Commit

Permalink
Adding MsgPack hub protocol to TS client
Browse files Browse the repository at this point in the history
  • Loading branch information
Pawel Kadluczka committed Jul 3, 2017
1 parent 4fc578f commit bbe5f52
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 11 deletions.
@@ -1,4 +1,4 @@
import { TextMessageFormat } from "../Microsoft.AspNetCore.SignalR.Client.TS/Formatters"
import { TextMessageFormat, BinaryMessageFormat } from "../Microsoft.AspNetCore.SignalR.Client.TS/Formatters"

describe("Text Message Formatter", () => {
it("should return empty array on empty input", () => {
Expand Down Expand Up @@ -30,3 +30,43 @@ describe("Text Message Formatter", () => {
});
});
});

describe("Binary Message Formatter", () => {
([
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], <Uint8Array[]>[ new Uint8Array([])]],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff], <Uint8Array[]>[ new Uint8Array([0xff])]],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x7f], <Uint8Array[]>[ new Uint8Array([0xff]), new Uint8Array([0x7f])]],
] as [[number[], Uint8Array[]]]).forEach(([payload, expected_messages]) => {
it(`should parse '${payload}' correctly`, () => {
let messages = BinaryMessageFormat.parse(new Uint8Array(payload).buffer);
expect(messages).toEqual(expected_messages);
})
});

([
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Cannot read message size")],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x01, 0x80, 0x00], new Error("Cannot read message size")],
[[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00], new Error("Messages bigger than 2147483647 bytes are not supported")],
[[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00], new Error("Incomplete message")],
] as [[number[], Error]]).forEach(([payload, expected_error]) => {
it(`should fail to parse '${payload}'`, () => {
expect(() => BinaryMessageFormat.parse(new Uint8Array(payload).buffer)).toThrow(expected_error);
})
});

([
[[], [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]],
[[0x20], [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20]],
] as [[number[], number[]]]).forEach(([input, expected_payload]) => {
it(`should write '${input}'`, () => {
let actual = new Uint8Array(BinaryMessageFormat.write(new Uint8Array(input)));
let expected = new Uint8Array(expected_payload);
expect(actual).toEqual(expected);
})
});
});
@@ -0,0 +1,103 @@
import { MessagePackHubProtocol } from "../Microsoft.AspNetCore.SignalR.Client.TS/MessagePackHubProtocol"
import { MessageType, InvocationMessage, CompletionMessage, ResultMessage } from "../Microsoft.AspnetCore.SignalR.Client.TS/IHubProtocol"

describe("MessageHubProtocol", () => {
it("can write/read Invocation message", () => {
let invocation = <InvocationMessage>{
type: MessageType.Invocation,
invocationId: "123",
target: "myMethod",
nonblocking: true,
arguments: [42, true, "test", ["x1", "y2"], null]
};

let protocol = new MessagePackHubProtocol();
var parsedMessages = protocol.parseMessages(protocol.writeMessage(invocation));
expect(parsedMessages).toEqual([invocation]);
});

([
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0b,
0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x01, 0xa3, 0x45, 0x72, 0x72],
{
type: MessageType.Completion,
invocationId: "abc",
error: "Err",
result: null
} as CompletionMessage ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a,
0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x03, 0xa2, 0x4f, 0x4b ],
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: "OK"
} as CompletionMessage ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07,
0x93, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x02 ],
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: null
} as CompletionMessage ]
] as [[number[], CompletionMessage]]).forEach(([payload, expected_message]) =>
it("can read Completion message", () => {
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([expected_message]);
}));

([
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07,
0x93, 0x02, 0xa3, 0x61, 0x62, 0x63, 0x08 ],
{
type: MessageType.Result,
invocationId: "abc",
item: 8
} as ResultMessage ]
] as [[number[], CompletionMessage]]).forEach(([payload, expected_message]) =>
it("can read Result message", () => {
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([expected_message]);
}));

([
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 ], new Error("Invalid payload.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x90 ], new Error("Invalid payload.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xc2 ], new Error("Invalid payload.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x91, 0x05 ], new Error("Invalid message type.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x91, 0xa1, 0x78 ], new Error("Invalid message type.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x91, 0x01 ], new Error("Invalid payload for Invocation message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x91, 0x02 ], new Error("Invalid payload for stream Result message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x92, 0x03, 0xa0 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x94, 0x03, 0xa0, 0x02, 0x00 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x93, 0x03, 0xa0, 0x01 ], new Error("Invalid payload for Completion message.") ],
[ [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x93, 0x03, 0xa0, 0x03 ], new Error("Invalid payload for Completion message.") ]
] as [[number[], Error]]).forEach(([payload, expected_error]) =>
it("throws for invalid messages", () => {
expect(() => new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer))
.toThrow(expected_error);
}));

it("can read multiple messages", () => {
let payload = [
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07,
0x93, 0x02, 0xa3, 0x61, 0x62, 0x63, 0x08,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a,
0x94, 0x03, 0xa3, 0x61, 0x62, 0x63, 0x03, 0xa2, 0x4f, 0x4b ];
let messages = new MessagePackHubProtocol().parseMessages(new Uint8Array(payload).buffer);
expect(messages).toEqual([
{
type: MessageType.Result,
invocationId: "abc",
item: 8
} as ResultMessage,
{
type: MessageType.Completion,
invocationId: "abc",
error: null,
result: "OK"
} as CompletionMessage
]);
});
});
54 changes: 53 additions & 1 deletion client-ts/Microsoft.AspNetCore.SignalR.Client.TS/Formatters.ts
Expand Up @@ -34,7 +34,7 @@ export namespace TextMessageFormat {
if (!hasSpace(input, offset, 1 + length)) {
throw new Error("Message is incomplete");
}

// Read the payload
var payload = input.substr(offset, length);
offset += length;
Expand Down Expand Up @@ -66,4 +66,56 @@ export namespace TextMessageFormat {
}
return messages;
}
}

export namespace BinaryMessageFormat {
export function write(output: Uint8Array): ArrayBuffer {
let size = output.byteLength;
let buffer = new Uint8Array(size + 8);

// javascript bitwise operators only support 32-bit integers
for (let i = 7; i >= 4; i--) {
buffer[i] = size & 0xff;
size = size >> 8;
}

buffer.set(output, 8);

return buffer.buffer;
}

export function parse(input: ArrayBuffer): Uint8Array[] {
let result: Uint8Array[] = [];
let uint8Array = new Uint8Array(input);
// 8 - the length prefix size
for (let offset = 0; offset < input.byteLength;) {

if (input.byteLength < offset + 8) {
throw new Error("Cannot read message size")
}

// Note javascript bitwise operators only support 32-bit integers - for now cutting bigger messages.
// Tracking bug https://github.com/aspnet/SignalR/issues/613
if (!(uint8Array[offset] == 0 && uint8Array[offset + 1] == 0 && uint8Array[offset + 2] == 0
&& uint8Array[offset + 3] == 0 && (uint8Array[offset + 4] & 0x80) == 0)) {
throw new Error("Messages bigger than 2147483647 bytes are not supported");
}

let size = 0;
for (let i = 4; i < 8; i++) {
size = (size << 8) | uint8Array[offset + i];
}

if (uint8Array.byteLength >= (offset + 8 + size)) {
result.push(uint8Array.slice(offset + 8, offset + 8 + size))
}
else {
throw new Error("Incomplete message");
}

offset = offset + 8 + size;
}

return result;
}
}
Expand Up @@ -16,8 +16,9 @@ export class HubConnection {
private connectionClosedCallback: ConnectionClosed;
private protocol: IHubProtocol;

constructor(connection: IConnection) {
constructor(connection: IConnection, protocol: IHubProtocol = new JsonHubProtocol()) {
this.connection = connection;
this.protocol = protocol || new JsonHubProtocol();
this.connection.onDataReceived = data => {
this.onDataReceived(data);
};
Expand All @@ -28,7 +29,6 @@ export class HubConnection {
this.callbacks = new Map<string, (invocationEvent: CompletionMessage | ResultMessage) => void>();
this.methods = new Map<string, (...args: any[]) => void>();
this.id = 0;
this.protocol = new JsonHubProtocol();
}

private onDataReceived(data: any) {
Expand Down
Expand Up @@ -30,6 +30,6 @@ export interface NegotiationMessage {

export interface IHubProtocol {
name(): string;
parseMessages(input: string): HubMessage[];
writeMessage(message: HubMessage): string;
parseMessages(input: any): HubMessage[];
writeMessage(message: HubMessage): any;
}
@@ -0,0 +1,117 @@
import { IHubProtocol, MessageType, HubMessage, InvocationMessage, ResultMessage, CompletionMessage } from "./IHubProtocol";
import { BinaryMessageFormat } from "./Formatters"

var msgpack = require("msgpack-lite");

export class MessagePackHubProtocol implements IHubProtocol {
name(): string {
return "messagepack";
}

parseMessages(input: ArrayBuffer): HubMessage[] {
return BinaryMessageFormat.parse(input).map(m => this.parseMessage(m));
}

private parseMessage(input: Uint8Array): HubMessage {
if (input.length == 0) {
throw new Error("Invalid payload.");
}

let properties = msgpack.decode(input);
if (properties.length == 0 || !(properties instanceof Array)) {
throw new Error("Invalid payload.");
}

let messageType = properties[0] as MessageType;
switch (messageType) {
case MessageType.Invocation:
return this.createInvocationMessage(properties);
case MessageType.Result:
return this.createStreamItemMessage(properties);
case MessageType.Completion:
return this.createCompletionMessage(properties);
default:
throw new Error("Invalid message type.");
}
}

private createInvocationMessage(properties: any[]): InvocationMessage {
if (properties.length != 5) {
throw new Error("Invalid payload for Invocation message.");
}

return {
type: MessageType.Invocation,
invocationId: properties[1],
nonblocking: properties[2],
target: properties[3],
arguments: properties[4]
} as InvocationMessage;
}

private createStreamItemMessage(properties: any[]): ResultMessage {
if (properties.length != 3) {
throw new Error("Invalid payload for stream Result message.");
}

return {
type: MessageType.Result,
invocationId: properties[1],
item: properties[2]
} as ResultMessage;
}

private createCompletionMessage(properties: any[]): CompletionMessage {
if (properties.length < 3) {
throw new Error("Invalid payload for Completion message.");
}

const errorResult = 1;
const voidResult = 2;
const nonVoidResult = 3;

let resultKind = properties[2];

if ((resultKind === voidResult && properties.length != 3) ||
(resultKind !== voidResult && properties.length != 4)) {
throw new Error("Invalid payload for Completion message.");
}

let completionMessage = {
type: MessageType.Completion,
invocationId: properties[1],
error: null as string,
result: null as any
};

switch (resultKind) {
case errorResult:
completionMessage.error = properties[3];
break;
case nonVoidResult:
completionMessage.result = properties[3];
break;
}

return completionMessage as ResultMessage;
}

writeMessage(message: HubMessage): ArrayBuffer {
switch (message.type) {
case MessageType.Invocation:
return this.writeInvocation(message as InvocationMessage);
case MessageType.Result:
case MessageType.Completion:
throw new Error(`Writing messages of type '${message.type}' is not supported.`);
default:
throw new Error("Invalid message type.");
}
}

private writeInvocation(invocationMessage: InvocationMessage): ArrayBuffer {
let payload = msgpack.encode([ MessageType.Invocation, invocationMessage.invocationId,
invocationMessage.nonblocking, invocationMessage.target, invocationMessage.arguments]);

return BinaryMessageFormat.write(payload);
}
}
Expand Up @@ -24,6 +24,7 @@ gulp.task('browserify-client', ['compile-ts-client'], () => {
.pipe(gulp.dest(clientOutDir + '/../browser'));
});


gulp.task('build-ts-client', ['clean', 'compile-ts-client', 'browserify-client']);

gulp.task('default', ['build-ts-client']);
4 changes: 4 additions & 0 deletions client-ts/package.json
Expand Up @@ -28,7 +28,11 @@
"gulp": "^3.9.1",
"gulp-typescript": "^3.1.3",
"jasmine": "^2.5.2",
"msgpack-lite": "^0.1.26",
"typescript": "^2.0.10",
"vinyl-source-stream": "^1.1.0"
},
"dependencies": {
"@types/node": "^8.0.5"
}
}

0 comments on commit bbe5f52

Please sign in to comment.