Skip to content
This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Commit

Permalink
Merge a384d2c into c937206
Browse files Browse the repository at this point in the history
  • Loading branch information
jperata committed Dec 11, 2017
2 parents c937206 + a384d2c commit 6105da2
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 54 deletions.
7 changes: 4 additions & 3 deletions lib/client/bespoke-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ export class BespokeClient {
if (httpBuffer.complete()) {
LoggingHelper.info(Logger, "ResponseReceived ID: " + request.id());
let payload: string = null;
const bodyToString = httpBuffer.body().toString();
if (httpBuffer.isJSON()) {
payload = StringUtil.prettyPrintJSON(httpBuffer.body().toString());
payload = StringUtil.prettyPrintJSON(bodyToString);
} else {
payload = httpBuffer.body().toString();
payload = bodyToString;
}

// Errors managed by us
Expand All @@ -140,7 +141,7 @@ export class BespokeClient {
} else {
LoggingHelper.verbose(Logger, "Payload:\n" + chalk.cyan(payload));
}
self.socketHandler.send(httpBuffer.raw().toString(), request.id());
self.socketHandler.send(httpBuffer.raw(), request.id());
}
} else if (error !== null && error !== undefined) {
if (error === NetworkErrorType.CONNECTION_REFUSED) {
Expand Down
8 changes: 4 additions & 4 deletions lib/client/tcp-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class TCPClient {
public onCloseCallback: () => void;
public constructor (public id: string) {}

public transmit(host: string, port: number, data: string, callback: TCPClientCallback) {
public transmit(host: string, port: number, requestData: string, callback: TCPClientCallback) {
let self = this;
let client = new net.Socket();
LoggingHelper.info(Logger, "Forwarding " + host + ":" + port);
Expand All @@ -28,14 +28,14 @@ export class TCPClient {

client.connect(port, host, function () {
// Write a message to the socket as soon as the client is connected, the server will receive it as message from the client
client.write(data);
client.write(requestData);
});


// Add a 'data' event handler for the client socket
// data is what the server sent to this socket
client.on("data", function(data: Buffer) {
callback(data, null, null);
client.on("data", function(incommingData: Buffer) {
callback(incommingData, null, null);
});

// Add a 'close' event handler for the client socket
Expand Down
24 changes: 12 additions & 12 deletions lib/core/http-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ export class HTTPBuffer {

if (this._headers === undefined) {
// Scan for \r\n\r\n - indicates the end of the headers
let endIndex = BufferUtil.scan(this._rawContent, [13, 10, 13, 10]);
const endIndex = BufferUtil.scan(this._rawContent, [13, 10, 13, 10]);
if (endIndex !== -1) {
let headerBuffer = this._rawContent.slice(0, endIndex);
const headerBuffer = this._rawContent.slice(0, endIndex);
this.parseHeaders(headerBuffer.toString());

if (endIndex + 4 < this._rawContent.length) {
let bodyPart = this._rawContent.slice((endIndex + 4));
const bodyPart = this._rawContent.slice((endIndex + 4));
this.appendBody(bodyPart);
}
}
Expand All @@ -59,9 +59,9 @@ export class HTTPBuffer {
if (!this._complete) {
// If we have the headers, then check the body
if (this._headers !== undefined) {
let chunked = this.hasHeader("Transfer-Encoding") && this.header("Transfer-Encoding").toLowerCase() === "chunked";
const chunked = this.hasHeader("Transfer-Encoding") && this.header("Transfer-Encoding").toLowerCase() === "chunked";
if (chunked && this._rawBody !== undefined) {
let chunks = this.parseChunks();
const chunks = this.parseChunks();
// Only store the chunks if they are finalized
if (chunks !== null && chunks.length > 0 && chunks[chunks.length - 1].lastChunk()) {
this._chunks = chunks;
Expand Down Expand Up @@ -164,7 +164,7 @@ export class HTTPBuffer {
// Keep looping until we either hit the final chunk (zero-length)
// Or we get an incomplete chunk
while (true) {
let chunk = HTTPChunk.parse(body);
const chunk = HTTPChunk.parse(body);
if (chunk !== null) {
chunks.push(chunk);
} else {
Expand Down Expand Up @@ -192,24 +192,24 @@ export class HTTPBuffer {

private parseHeaders(headersString: string): void {
this._headers = {};
let lines: Array<string> = headersString.split("\n");
const lines: Array<string> = headersString.split("\n");
// This is a response if it starts with HTTP
if (lines[0].startsWith("HTTP")) {
this._statusLine = lines[0];
let statusLineParts: Array<string> = this._statusLine.split(" ");
const statusLineParts: Array<string> = this._statusLine.split(" ");
this._statusCode = parseInt(statusLineParts[1]);
} else {
this._requestLine = lines[0];
let requestLineParts: Array<string> = this._requestLine.split(" ");
const requestLineParts: Array<string> = this._requestLine.split(" ");
this._method = requestLineParts[0];
this._uri = requestLineParts[1];
}

// Handle the headers
for (let i = 1; i < lines.length; i++) {
let headerLine: string = lines[i];
let headerParts: Array<string> = headerLine.split(":");
let key = headerParts[0];
const headerLine: string = lines[i];
const headerParts: Array<string> = headerLine.split(":");
const key = headerParts[0];
this._headers[key] = headerParts[1].trim();
}
}
Expand Down
48 changes: 30 additions & 18 deletions lib/core/socket-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ let Logger = "SOCKET";
* Manages the low-level socket communications
*/
export class SocketHandler {
public buffer: string = "";
public buffer: Buffer = Buffer.from("");
public onDataCallback: (data: Buffer) => void;
public onCloseCallback: () => void = null;
private connected: boolean = true;

public static connect(host: string, port: number, onConnect: (error?: any) => void, onMessage: (message: string, messageID?: number) => void): SocketHandler {
public static connect(host: string, port: number, onConnect: (error?: any) => void, onMessage: (message: string | Buffer, messageID?: number) => void): SocketHandler {
let socket = new net.Socket();
let handler = new SocketHandler(socket, onMessage);
handler.connected = false;
Expand All @@ -34,12 +34,12 @@ export class SocketHandler {
return handler;
}

public constructor (public socket: Socket, private onMessage: (message: string, sequenceNumber?: number) => void) {
public constructor (public socket: Socket, private onMessage: (message: string | Buffer, sequenceNumber?: number) => void) {
let self = this;

// Set this as instance variable to make it easier to test
this.onDataCallback = function(data: Buffer) {
self.handleData(data.toString());
self.handleData(data);
};

// Add a 'data' event handler to this instance of socket
Expand Down Expand Up @@ -68,60 +68,72 @@ export class SocketHandler {
/**
* Handles incoming data
* Finds the delimiter and sends callbacks, potentially multiple times as multiple messages can be received at once
* @param dataString
* @param data
*/
private handleData(dataString: string): void {
if (dataString !== null) {
this.buffer += dataString;
private handleData(data: Buffer): void {
if (data !== null) {
// this.buffer += dataString;
this.buffer = Buffer.concat([this.buffer, data]);
}

let delimiterIndex = this.buffer.indexOf(Global.MessageDelimiter);
const dataString = this.buffer.toString();

const delimiterIndex = dataString.indexOf(Global.MessageDelimiter);

if (delimiterIndex > -1) {
let messageIDIndex = delimiterIndex - Global.MessageIDLength;
const messageIDIndex = delimiterIndex - Global.MessageIDLength;
let badMessage = false;
if (messageIDIndex < 0) {
badMessage = true;
}

let message = this.buffer.substring(0, messageIDIndex);
const message = this.buffer.slice(0, messageIDIndex);
// Grab the message ID - it precedes the delimiter
let messageIDString = this.buffer.substring(delimiterIndex - Global.MessageIDLength, delimiterIndex);
let messageID: number = parseInt(messageIDString);
const messageIDString = dataString.slice(delimiterIndex - Global.MessageIDLength, delimiterIndex);
const messageID: number = parseInt(messageIDString);
if (isNaN(messageID) || (messageID + "").length < 13) {
badMessage = true;
}

if (badMessage) {
LoggingHelper.error(Logger, "Bad message received: " + dataString);
} else {
LoggingHelper.debug(Logger, "DATA READ " + this.remoteEndPoint() + " ID: " + messageID + " MSG: " + StringUtil.prettyPrint(message));
if (typeof message === "string") {
LoggingHelper.debug(Logger, "DATA READ " + this.remoteEndPoint() + " ID: " + messageID + " MSG: " + StringUtil.prettyPrint(message));
} else {
LoggingHelper.debug(Logger, "DATA READ " + this.remoteEndPoint() + " ID: " + messageID + " MSG: < Binary Data >");
}
this.onMessage(message, messageID);
}

this.buffer = this.buffer.slice(delimiterIndex + Global.MessageDelimiter.length);

// If we have received more than one packet at a time, handle it recursively
if (this.buffer.indexOf(Global.MessageDelimiter) !== -1) {
if (this.buffer.toString().indexOf(Global.MessageDelimiter) !== -1) {
this.handleData(null);
}
}
}

public send(message: string, messageID?: number) {
public send(message: string | Buffer, messageID?: number) {
// If the socket was already closed, do not write anything
if (this.socket === null) {
LoggingHelper.warn(Logger, "Writing message to closed socket: " + messageID);
return;
}

LoggingHelper.debug(Logger, "DATA SENT " + this.remoteEndPoint() + " SEQUENCE: " + messageID + " " + StringUtil.prettyPrint(message));
LoggingHelper.debug(Logger, "DATA SENT " + this.remoteEndPoint() + " SEQUENCE: " + messageID + " " + StringUtil.prettyPrint(message.toString()));

// If no message ID is specified, just grab a timestamp
if (messageID === undefined || messageID === null) {
messageID = new Date().getTime();
}
// Use TOKEN as message delimiter
message = message + messageID + Global.MessageDelimiter;
if (typeof message === "string") {
message = message + messageID + Global.MessageDelimiter;
} else {
message = Buffer.concat([message, Buffer.from(messageID.toString()), Buffer.from(Global.MessageDelimiter)]);
}
this.socket.write(message, null);
}

Expand Down
6 changes: 5 additions & 1 deletion lib/core/webhook-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,13 @@ export class WebhookRequest {
return nodeValue;
}

private extractNodeIdFromRequest(tcpString: string) {
return tcpString.replace("/?node-id=" + this.nodeID(), "/");
}

// Turns the webhook HTTP request into straight TCP payload
public toTCP (): string {
return this.rawContents.toString();
return this.extractNodeIdFromRequest(this.rawContents.toString());
}

public toString(): string {
Expand Down
22 changes: 14 additions & 8 deletions lib/server/node-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {Global} from "../core/global";
import {LoggingHelper} from "../core/logging-helper";
import {Statistics, AccessType} from "./statistics";

let Logger = "NODEMGR";
const Logger = "NODEMGR";

export interface OnConnectCallback {
(node: Node): void;
Expand All @@ -28,19 +28,26 @@ export class NodeManager {
}

public start (callback?: () => void) {
let self = this;
const self = this;
this.server = net.createServer(function(socket: Socket) {
let initialConnection = true;
let node: Node = null;
let socketHandler = new SocketHandler(socket, function(message: string, messageID?: number) {
const socketHandler = new SocketHandler(socket, function(message: string | Buffer, messageID?: number) {
// We do special handling when we first connect
let strMessage: string = "";
if (typeof message !== "string") {
strMessage = message.toString();
} else {
strMessage = message;
}

if (initialConnection) {
let connectData: any = null;
try {
connectData = JSON.parse(message);
connectData = JSON.parse(strMessage);
} catch (e) {
// We just drop it the payload is not correct
LoggingHelper.error(Logger, "Error on parsing initial message: " + message);
LoggingHelper.error(Logger, "Error on parsing initial message: " + strMessage);
socketHandler.disconnect();
return;
}
Expand All @@ -57,7 +64,7 @@ export class NodeManager {

// Capture the connection
Statistics.instance().record(node.id, AccessType.CONNECT);
} else if (message === Global.KeepAliveMessage) {
} else if (strMessage === Global.KeepAliveMessage) {
NodeManager.onKeepAliveReceived(node);

} else if (node.handlingRequest()) {
Expand Down Expand Up @@ -105,7 +112,7 @@ export class NodeManager {
*/
public stop (callback: () => void): void {
for (let key of Object.keys(this.nodes)) {
let node: Node = this.node(key);
const node: Node = this.node(key);
node.socketHandler.disconnect();
LoggingHelper.info(Logger, "NODE CLOSING: " + node.id);
}
Expand All @@ -117,7 +124,6 @@ export class NodeManager {
LoggingHelper.info(Logger, "STOPPED");
callback();
}

});
}
}
6 changes: 3 additions & 3 deletions lib/server/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ export class Node {
return (Object.keys(this.requests).length > 0);
}

public onReply(message: string, messageID: number): void {
let self = this;
public onReply(message: string | Buffer, messageID: number): void {
const self = this;
console.log("NODE " + this.id + " MSG-ID: " + messageID + " ReplyReceived");

let request = this.requests[messageID];
const request = this.requests[messageID];
if (request === null) {
LoggingHelper.info(Logger, "No matching messageID for reply: " + messageID);
} else {
Expand Down
15 changes: 10 additions & 5 deletions test/client/bespoke-client-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,23 @@ describe("BespokeClient", function() {

describe("#connect()", function() {
it("Fails to connect", function() {
return new Promise(resolve => {
this.timeout(8000);
this.timeout(13000);

return new Promise((resolve, reject) => {
const client = new BespokeClient("JPKa", "localhost", 9000, "localhost", 9000 );
let reconnectAttempts = 0;
client.onReconnect = function (error: any) {
reconnectAttempts++;
};

client.onConnect = function (error: any) {
assert.equal(reconnectAttempts, BespokeClient.RECONNECT_MAX_RETRIES, "Not enough reconnects");
assert(error);
resolve();
try {
assert.equal(reconnectAttempts, BespokeClient.RECONNECT_MAX_RETRIES, "Not enough reconnects");
assert(error);
resolve();
} catch (assertErr) {
reject(assertErr);
}
};
client.connect();
});
Expand Down

0 comments on commit 6105da2

Please sign in to comment.