Skip to content
This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Commit

Permalink
Merge 64ca2d9 into c937206
Browse files Browse the repository at this point in the history
  • Loading branch information
jperata committed Dec 15, 2017
2 parents c937206 + 64ca2d9 commit 073c2b1
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 140 deletions.
35 changes: 21 additions & 14 deletions lib/client/bespoke-client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Global} from "../core/global";
import {SocketHandler} from "../core/socket-handler";
import {SocketHandler, SocketMessage} from "../core/socket-handler";
import {WebhookRequest} from "../core/webhook-request";
import {TCPClient} from "./tcp-client";
import {NetworkErrorType} from "../core/global";
Expand Down Expand Up @@ -49,8 +49,8 @@ export class BespokeClient {
self.connected(error);
},

function(data: string, messageID?: number) {
self.messageReceived(data, messageID);
function(socketMessage: SocketMessage) {
self.messageReceived(socketMessage);
}
);

Expand Down Expand Up @@ -107,14 +107,19 @@ export class BespokeClient {

if (!secretKeyValidated) {
const errorMessage = "Unauthorized request";
this.socketHandler.send(HTTPBuffer.errorResponse(errorMessage).raw().toString(), request.id());
this.socketHandler.send(new SocketMessage(HTTPBuffer.errorResponse(errorMessage).raw(), request.id()));
return;
}
}

// Print out the contents of the request body to the console
LoggingHelper.info(Logger, "RequestReceived: " + request.toString() + " ID: " + request.id());
LoggingHelper.verbose(Logger, "Payload:\n" + chalk.hex(LoggingHelper.REQUEST_COLOR)(StringUtil.prettyPrintJSON(request.body)));
try {
JSON.parse(request.body);
LoggingHelper.verbose(Logger, "Payload:\n" + chalk.hex(LoggingHelper.REQUEST_COLOR)(StringUtil.prettyPrintJSON(request.body)));
} catch (error) {
LoggingHelper.verbose(Logger, "Payload:\n" + chalk.hex(LoggingHelper.REQUEST_COLOR)("< Binary data >"));
}

const tcpClient = new TCPClient(request.id() + "");
const httpBuffer = new HTTPBuffer();
Expand All @@ -128,10 +133,12 @@ export class BespokeClient {
if (httpBuffer.complete()) {
LoggingHelper.info(Logger, "ResponseReceived ID: " + request.id());
let payload: string = null;
const bodyToString = httpBuffer.body().toString();
if (httpBuffer.isJSON()) {
payload = StringUtil.prettyPrintJSON(httpBuffer.body().toString());
payload = StringUtil.prettyPrintJSON(bodyToString);
} else {
payload = httpBuffer.body().toString();
// Check for non printable characters
payload = /[\x00-\x1F]/.test(bodyToString) ? "< Binary data >" : bodyToString;
}

// Errors managed by us
Expand All @@ -140,15 +147,15 @@ export class BespokeClient {
} else {
LoggingHelper.verbose(Logger, "Payload:\n" + chalk.cyan(payload));
}
self.socketHandler.send(httpBuffer.raw().toString(), request.id());
self.socketHandler.send(new SocketMessage(httpBuffer.raw(), request.id()));
}
} else if (error !== null && error !== undefined) {
if (error === NetworkErrorType.CONNECTION_REFUSED) {
LoggingHelper.error(Logger, chalk.red("CLIENT Connection Refused, Port " + self.targetPort + ". Is your server running?"));
}

const errorMessage = "BST Proxy - Local Forwarding Error\n" + message;
self.socketHandler.send(HTTPBuffer.errorResponse(errorMessage).raw().toString(), request.id());
self.socketHandler.send(new SocketMessage(HTTPBuffer.errorResponse(errorMessage).raw(), request.id()));

if (self.onError != null) {
self.onError(error, message);
Expand Down Expand Up @@ -184,7 +191,7 @@ export class BespokeClient {
const messageJSON = {"id": this.nodeID};
const message = JSON.stringify(messageJSON);

this.socketHandler.send(message);
this.socketHandler.send(new SocketMessage(message));
if (this.onConnect !== undefined && this.onConnect !== null) {
this.onConnect();
}
Expand All @@ -199,14 +206,14 @@ export class BespokeClient {
}
}

private messageReceived (message: string, messageID?: number) {
private messageReceived (socketMessage: SocketMessage) {
// First message we get back is an ack
if (message.indexOf("ACK") !== -1) {
if (socketMessage.contains("ACK")) {

} else if (message.indexOf(Global.KeepAliveMessage) !== -1) {
} else if (socketMessage.contains(Global.KeepAliveMessage)) {
this.keepAlive.received();
} else {
this.onWebhookReceived(WebhookRequest.fromString(this.socketHandler.socket, message, messageID));
this.onWebhookReceived(WebhookRequest.fromString(this.socketHandler.socket, socketMessage.asString(), socketMessage.getMessageID()));
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/client/keep-alive.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {SocketHandler} from "../core/socket-handler";
import {SocketHandler, SocketMessage} from "../core/socket-handler";
import {Global} from "../core/global";

const KeepAlivePeriod = 30000; // Ping every 30 seconds
Expand Down Expand Up @@ -49,7 +49,7 @@ export class KeepAlive {

this.timeout = setTimeout(function () {
if (!self.stopped) {
self.socket.send(Global.KeepAliveMessage);
self.socket.send(new SocketMessage(Global.KeepAliveMessage));
self.keepAlive();
}
}, this.pingPeriod);
Expand Down
8 changes: 4 additions & 4 deletions lib/client/tcp-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class TCPClient {
public onCloseCallback: () => void;
public constructor (public id: string) {}

public transmit(host: string, port: number, data: string, callback: TCPClientCallback) {
public transmit(host: string, port: number, requestData: string, callback: TCPClientCallback) {
let self = this;
let client = new net.Socket();
LoggingHelper.info(Logger, "Forwarding " + host + ":" + port);
Expand All @@ -28,14 +28,14 @@ export class TCPClient {

client.connect(port, host, function () {
// Write a message to the socket as soon as the client is connected, the server will receive it as message from the client
client.write(data);
client.write(requestData);
});


// Add a 'data' event handler for the client socket
// data is what the server sent to this socket
client.on("data", function(data: Buffer) {
callback(data, null, null);
client.on("data", function(incomingData: Buffer) {
callback(incomingData, null, null);
});

// Add a 'close' event handler for the client socket
Expand Down
24 changes: 12 additions & 12 deletions lib/core/http-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ export class HTTPBuffer {

if (this._headers === undefined) {
// Scan for \r\n\r\n - indicates the end of the headers
let endIndex = BufferUtil.scan(this._rawContent, [13, 10, 13, 10]);
const endIndex = BufferUtil.scan(this._rawContent, [13, 10, 13, 10]);
if (endIndex !== -1) {
let headerBuffer = this._rawContent.slice(0, endIndex);
const headerBuffer = this._rawContent.slice(0, endIndex);
this.parseHeaders(headerBuffer.toString());

if (endIndex + 4 < this._rawContent.length) {
let bodyPart = this._rawContent.slice((endIndex + 4));
const bodyPart = this._rawContent.slice((endIndex + 4));
this.appendBody(bodyPart);
}
}
Expand All @@ -59,9 +59,9 @@ export class HTTPBuffer {
if (!this._complete) {
// If we have the headers, then check the body
if (this._headers !== undefined) {
let chunked = this.hasHeader("Transfer-Encoding") && this.header("Transfer-Encoding").toLowerCase() === "chunked";
const chunked = this.hasHeader("Transfer-Encoding") && this.header("Transfer-Encoding").toLowerCase() === "chunked";
if (chunked && this._rawBody !== undefined) {
let chunks = this.parseChunks();
const chunks = this.parseChunks();
// Only store the chunks if they are finalized
if (chunks !== null && chunks.length > 0 && chunks[chunks.length - 1].lastChunk()) {
this._chunks = chunks;
Expand Down Expand Up @@ -164,7 +164,7 @@ export class HTTPBuffer {
// Keep looping until we either hit the final chunk (zero-length)
// Or we get an incomplete chunk
while (true) {
let chunk = HTTPChunk.parse(body);
const chunk = HTTPChunk.parse(body);
if (chunk !== null) {
chunks.push(chunk);
} else {
Expand Down Expand Up @@ -192,24 +192,24 @@ export class HTTPBuffer {

private parseHeaders(headersString: string): void {
this._headers = {};
let lines: Array<string> = headersString.split("\n");
const lines: Array<string> = headersString.split("\n");
// This is a response if it starts with HTTP
if (lines[0].startsWith("HTTP")) {
this._statusLine = lines[0];
let statusLineParts: Array<string> = this._statusLine.split(" ");
const statusLineParts: Array<string> = this._statusLine.split(" ");
this._statusCode = parseInt(statusLineParts[1]);
} else {
this._requestLine = lines[0];
let requestLineParts: Array<string> = this._requestLine.split(" ");
const requestLineParts: Array<string> = this._requestLine.split(" ");
this._method = requestLineParts[0];
this._uri = requestLineParts[1];
}

// Handle the headers
for (let i = 1; i < lines.length; i++) {
let headerLine: string = lines[i];
let headerParts: Array<string> = headerLine.split(":");
let key = headerParts[0];
const headerLine: string = lines[i];
const headerParts: Array<string> = headerLine.split(":");
const key = headerParts[0];
this._headers[key] = headerParts[1].trim();
}
}
Expand Down
99 changes: 69 additions & 30 deletions lib/core/socket-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ let Logger = "SOCKET";
* Manages the low-level socket communications
*/
export class SocketHandler {
public buffer: string = "";
public buffer: Buffer = Buffer.from("");
public onDataCallback: (data: Buffer) => void;
public onCloseCallback: () => void = null;
private connected: boolean = true;

public static connect(host: string, port: number, onConnect: (error?: any) => void, onMessage: (message: string, messageID?: number) => void): SocketHandler {
public static connect(host: string, port: number, onConnect: (error?: any) => void, onMessage: (socketMessage: SocketMessage) => void): SocketHandler {
let socket = new net.Socket();
let handler = new SocketHandler(socket, onMessage);
handler.connected = false;
Expand All @@ -34,12 +34,12 @@ export class SocketHandler {
return handler;
}

public constructor (public socket: Socket, private onMessage: (message: string, sequenceNumber?: number) => void) {
public constructor (public socket: Socket, private onMessage: (socketMessage: SocketMessage) => void) {
let self = this;

// Set this as instance variable to make it easier to test
this.onDataCallback = function(data: Buffer) {
self.handleData(data.toString());
self.handleData(data);
};

// Add a 'data' event handler to this instance of socket
Expand Down Expand Up @@ -68,61 +68,61 @@ export class SocketHandler {
/**
* Handles incoming data
* Finds the delimiter and sends callbacks, potentially multiple times as multiple messages can be received at once
* @param dataString
* @param data
*/
private handleData(dataString: string): void {
if (dataString !== null) {
this.buffer += dataString;
private handleData(data: Buffer): void {
if (data !== null) {
this.buffer = Buffer.concat([this.buffer, data]);
}

let delimiterIndex = this.buffer.indexOf(Global.MessageDelimiter);
const dataString = this.buffer.toString();

const delimiterIndex = dataString.indexOf(Global.MessageDelimiter);

if (delimiterIndex > -1) {
let messageIDIndex = delimiterIndex - Global.MessageIDLength;
let badMessage = false;
if (messageIDIndex < 0) {
badMessage = true;
}
const messageIDIndex = delimiterIndex - Global.MessageIDLength;

let badMessage: boolean = messageIDIndex < 0;

let message = this.buffer.substring(0, messageIDIndex);
const message = this.buffer.slice(0, messageIDIndex);
// Grab the message ID - it precedes the delimiter
let messageIDString = this.buffer.substring(delimiterIndex - Global.MessageIDLength, delimiterIndex);
let messageID: number = parseInt(messageIDString);
const messageIDString = dataString.slice(delimiterIndex - Global.MessageIDLength, delimiterIndex);
const messageID: number = parseInt(messageIDString);
if (isNaN(messageID) || (messageID + "").length < 13) {
badMessage = true;
}

if (badMessage) {
LoggingHelper.error(Logger, "Bad message received: " + dataString);
} else {
LoggingHelper.debug(Logger, "DATA READ " + this.remoteEndPoint() + " ID: " + messageID + " MSG: " + StringUtil.prettyPrint(message));
this.onMessage(message, messageID);
const socketMessage = new SocketMessage(message, messageID);

LoggingHelper.debug(Logger, "DATA READ " + this.remoteEndPoint() + " ID: " + messageID + " MSG: "
+ StringUtil.prettyPrint(socketMessage.messageForLogging()));

this.onMessage(socketMessage);
}

this.buffer = this.buffer.slice(delimiterIndex + Global.MessageDelimiter.length);

// If we have received more than one packet at a time, handle it recursively
if (this.buffer.indexOf(Global.MessageDelimiter) !== -1) {
if (this.buffer.toString().indexOf(Global.MessageDelimiter) !== -1) {
this.handleData(null);
}
}
}

public send(message: string, messageID?: number) {
public send(socketMessage: SocketMessage) {
// If the socket was already closed, do not write anything
if (this.socket === null) {
LoggingHelper.warn(Logger, "Writing message to closed socket: " + messageID);
LoggingHelper.warn(Logger, "Writing message to closed socket: " + socketMessage.getMessageID());
return;
}

LoggingHelper.debug(Logger, "DATA SENT " + this.remoteEndPoint() + " SEQUENCE: " + messageID + " " + StringUtil.prettyPrint(message));
const dataSent = socketMessage.isString() ? socketMessage.asString() : "< Binary Data >";
LoggingHelper.debug(Logger, "DATA SENT " + this.remoteEndPoint() + " SEQUENCE: " + socketMessage.getMessageID() + " " + StringUtil.prettyPrint(dataSent));

// If no message ID is specified, just grab a timestamp
if (messageID === undefined || messageID === null) {
messageID = new Date().getTime();
}
// Use TOKEN as message delimiter
message = message + messageID + Global.MessageDelimiter;
this.socket.write(message, null);
this.socket.write(socketMessage.getFullMessage(), null);
}

public remoteAddress (): string {
Expand All @@ -148,3 +148,42 @@ export class SocketHandler {
}
}

export class SocketMessage {
private message: Buffer = Buffer.from("");
public constructor(message: string | Buffer, private sequenceNumber?: number) {
if (typeof message === "string") {
this.message = Buffer.concat([this.message, Buffer.from(message)]);
} else {
this.message = Buffer.concat([this.message, message]);
}
}

public getMessageID(): number {
return this.sequenceNumber ? this.sequenceNumber : new Date().getTime();
}

public asString(): string {
return this.message.toString();
}

public isString(): boolean {
return !/[\x00-\x1F]/.test(this.asString());
}

public messageForLogging(): string {
return this.isString() ? this.asString() : "< Binary Data>";
}

public getMessage() {
return this.message;
}

public getFullMessage() {
const messageID = this.getMessageID();
return Buffer.concat([this.message, Buffer.from(messageID.toString()), Buffer.from(Global.MessageDelimiter)]);
}

public contains(stringToFind: string) {
return this.asString().indexOf(stringToFind) > -1;
}
}
6 changes: 5 additions & 1 deletion lib/core/webhook-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,13 @@ export class WebhookRequest {
return nodeValue;
}

private extractNodeIdFromRequest(tcpString: string) {
return tcpString.replace("/?node-id=" + this.nodeID(), "/");
}

// Turns the webhook HTTP request into straight TCP payload
public toTCP (): string {
return this.rawContents.toString();
return this.extractNodeIdFromRequest(this.rawContents.toString());
}

public toString(): string {
Expand Down

0 comments on commit 073c2b1

Please sign in to comment.