Skip to content
This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Commit

Permalink
Merge 65f0026 into f18a540
Browse files Browse the repository at this point in the history
  • Loading branch information
jkelvie committed Aug 31, 2016
2 parents f18a540 + 65f0026 commit 8aa04cf
Show file tree
Hide file tree
Showing 19 changed files with 238 additions and 106 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ lib/**/*.map
test/**/*.js
test/**/*.map

**/.DS_Store

typings/globals/
typings/modules/

Expand Down
21 changes: 9 additions & 12 deletions lib/client/bespoke-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ export class BespokeClient {
function(error: any) {
self.connected(error);
},
function(data: string) {
self.messageReceived(data);

function(data: string, messageID?: number) {
self.messageReceived(data, messageID);
}
);

Expand All @@ -53,14 +54,14 @@ export class BespokeClient {
}
};

this.onWebhookReceived = function(socket: Socket, request: WebhookRequest) {
this.onWebhookReceived = function(request: WebhookRequest) {
let self = this;
LoggingHelper.info(Logger, "OnWebhook: " + request.toString());

let tcpClient = new TCPClient(request.id());
let tcpClient = new TCPClient(request.id() + "");
tcpClient.transmit("localhost", self.targetPort, request.toTCP(), function(data: string, error: NetworkErrorType, message: string) {
if (data != null) {
self.socketHandler.send(data);
self.socketHandler.send(data, request.id());
} else if (error !== null && error !== undefined) {
if (error === NetworkErrorType.CONNECTION_REFUSED) {
LoggingHelper.error(Logger, "CLIENT Connection Refused, Port " + self.targetPort + ". Is your server running?");
Expand All @@ -87,10 +88,6 @@ export class BespokeClient {
return new KeepAlive(handler);
}

public send(message: string) {
this.socketHandler.send(message);
}

private connected(error?: any): void {
if (error !== undefined && error !== null) {
LoggingHelper.error(Logger, "Unable to connect to: " + this.host + ":" + this.port);
Expand All @@ -104,21 +101,21 @@ export class BespokeClient {
let messageJSON = {"id": this.nodeID};
let message = JSON.stringify(messageJSON);

this.send(message);
this.socketHandler.send(message);
if (this.onConnect !== undefined && this.onConnect !== null) {
this.onConnect();
}
}
}

private messageReceived (message: string) {
private messageReceived (message: string, messageID?: number) {
// First message we get back is an ack
if (message.indexOf("ACK") !== -1) {
// console.log("Client: ACK RECEIVED");
} else if (message.indexOf(Global.KeepAliveMessage) !== -1) {
this.keepAlive.received();
} else {
this.onWebhookReceived(this.socketHandler.socket, WebhookRequest.fromString(message));
this.onWebhookReceived(WebhookRequest.fromString(this.socketHandler.socket, message, messageID));
}
}

Expand Down
35 changes: 33 additions & 2 deletions lib/client/lambda-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,23 @@ export class LambdaRunner {
private dirty: boolean = false;
private lambda: any = null;
private watcher: FSWatcher = null;
private requests: Array<IncomingMessage> = [];
public onDirty: () => void = null; // Callback for test-ability

public constructor(public file: string, public port: number) {}
/**
* The file to run
* @param file
* @param port
* @param debug
*/
public constructor(public file: string, public port: number, public debug?: boolean) {
// We need the debug flag for testing this class
// It causes us to store off the requests so that we can close sockets and shutdown quickly
// Otherwise we have to wait for timeouts
if (this.debug === undefined) {
this.debug = false;
}
}

public start (callback?: () => void): void {
let self = this;
Expand All @@ -36,6 +50,10 @@ export class LambdaRunner {
this.server = http.createServer();
this.server.listen(this.port);
this.server.on("request", function(request: IncomingMessage, response: ServerResponse) {
if (self.debug) {
self.requests.push(request);
}

let requestBody: string = "";
request.on("data", function(chunk: Buffer) {
requestBody += chunk.toString();
Expand Down Expand Up @@ -78,6 +96,17 @@ export class LambdaRunner {

public stop (onStop?: () => void): void {
this.watcher.close();

if (this.debug) {
let request: IncomingMessage = null;
for (request of this.requests) {
try {
request.socket.end();
} catch (e) {

}
}
}
this.server.close(function () {
if (onStop !== undefined && onStop !== null) {
onStop();
Expand Down Expand Up @@ -118,7 +147,9 @@ export class LambdaContext {
});

if (body) {
this.response.end(new Buffer(bodyString));
this.response.end(new Buffer(bodyString), function () {
console.log("Done Written");
});
} else {
this.response.end();
}
Expand Down
1 change: 1 addition & 0 deletions lib/core/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {BSTConfig} from "../client/bst-config";
import {BSTProcess} from "../client/bst-config";
export class Global {
public static MessageDelimiter = "4772616365";
public static MessageIDLength = 13;
public static KeepAliveMessage = "KEEPALIVE";
public static BespokeServerHost = "proxy.bespoken.tools";
private static configuration: BSTConfig = null;
Expand Down
24 changes: 15 additions & 9 deletions lib/core/socket-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class SocketHandler {
public onCloseCallback: () => void = null;
private connected: boolean = true;

public static connect(host: string, port: number, onConnect: (error?: any) => void, onMessage: (message: string) => void): SocketHandler {
public static connect(host: string, port: number, onConnect: (error?: any) => void, onMessage: (message: string, messageID?: number) => void): SocketHandler {
let socket = new net.Socket();
let handler = new SocketHandler(socket, onMessage);
handler.connected = false;
Expand All @@ -35,7 +35,7 @@ export class SocketHandler {
return handler;
}

public constructor (public socket: Socket, private onMessage: (message: string) => void) {
public constructor (public socket: Socket, private onMessage: (message: string, sequenceNumber?: number) => void) {
let self = this;

// Set this as instance variable to make it easier to test
Expand Down Expand Up @@ -73,16 +73,18 @@ export class SocketHandler {
*/
private handleData(dataString: string): void {
if (dataString !== null) {
LoggingHelper.debug(Logger, "DATA RAW " + this.remoteEndPoint() + " " + StringUtil.prettyPrint(dataString));
this.buffer += dataString;
}

let delimiterIndex = this.buffer.indexOf(Global.MessageDelimiter);
if (delimiterIndex > -1) {
let message = this.buffer.substring(0, delimiterIndex);
LoggingHelper.debug(Logger, "DATA READ " + this.remoteEndPoint() + " " + StringUtil.prettyPrint(message));
let message = this.buffer.substring(0, delimiterIndex - Global.MessageIDLength);
// Grab the message ID - it precedes the delimiter
let messageIDString = this.buffer.substring(delimiterIndex - Global.MessageIDLength, delimiterIndex);
let messageID: number = parseInt(messageIDString);
LoggingHelper.debug(Logger, "DATA READ " + this.remoteEndPoint() + " ID: " + messageID + " MSG: " + StringUtil.prettyPrint(message));

this.onMessage(message);
this.onMessage(message, messageID);
this.buffer = this.buffer.slice(delimiterIndex + Global.MessageDelimiter.length);

// If we have received more than one packet at a time, handle it recursively
Expand All @@ -92,11 +94,15 @@ export class SocketHandler {
}
}

public send(message: string) {
LoggingHelper.debug(Logger, "DATA SENT " + this.remoteEndPoint() + " " + StringUtil.prettyPrint(message));
public send(message: string, messageID?: number) {
LoggingHelper.debug(Logger, "DATA SENT " + this.remoteEndPoint() + " SEQUENCE: " + messageID + " " + StringUtil.prettyPrint(message));

// If no message ID is specified, just grab a timestamp
if (messageID === undefined || messageID === null) {
messageID = new Date().getTime();
}
// Use TOKEN as message delimiter
message = message + Global.MessageDelimiter;
message = message + messageID + Global.MessageDelimiter;
this.socket.write(message, null);
}

Expand Down
16 changes: 9 additions & 7 deletions lib/core/webhook-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import * as querystring from "querystring";
import {BufferUtil} from "./buffer-util";
import {Socket} from "net";

export class WebhookRequest {
public rawContents: Buffer;
Expand All @@ -10,17 +11,18 @@ export class WebhookRequest {
public body: string;
public headers: { [id: string]: string };
public queryParameters: {[id: string]: string} = {};
private requestTimestamp: number;
private requestID: number;

public constructor() {
public constructor(public sourceSocket: Socket) {
this.rawContents = new Buffer("");
this.body = "";
this.requestTimestamp = new Date().getTime();
this.requestID = new Date().getTime();
}

public static fromString(payload: string): WebhookRequest {
let webhookRequest = new WebhookRequest();
public static fromString(sourceSocket: Socket, payload: string, id?: number): WebhookRequest {
let webhookRequest = new WebhookRequest(sourceSocket);
webhookRequest.append(BufferUtil.fromString(payload));
webhookRequest.requestID = id;
return webhookRequest;
}

Expand Down Expand Up @@ -107,7 +109,7 @@ export class WebhookRequest {
return this.method + " " + this.uri;
}

public id(): string {
return this.requestTimestamp + "";
public id(): number {
return this.requestID;
}
}
10 changes: 5 additions & 5 deletions lib/server/bespoke-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ export class BespokeServer {

this.webhookManager = new WebhookManager(this.webhookPort);
this.webhookManager.start(callbackCounter);
this.webhookManager.onWebhookReceived = function(socket: Socket, webhookRequest: WebhookRequest) {
this.webhookManager.onWebhookReceived = function(webhookRequest: WebhookRequest) {
// Check if this is a ping
if (webhookRequest.isPing()) {
HTTPHelper.respond(socket, 200, "bst-server-" + Global.version());
HTTPHelper.respond(webhookRequest.sourceSocket, 200, "bst-server-" + Global.version());

} else {
if (webhookRequest.nodeID() === null) {
HTTPHelper.respond(socket, 400, "No node specified. Must be included with the querystring as node-id.");
HTTPHelper.respond(webhookRequest.sourceSocket, 400, "No node specified. Must be included with the querystring as node-id.");
} else {
// Lookup the node
let node = self.nodeManager.node(webhookRequest.nodeID());
if (node == null) {
HTTPHelper.respond(socket, 404, "Node is not active: " + webhookRequest.nodeID());
HTTPHelper.respond(webhookRequest.sourceSocket, 404, "Node is not active: " + webhookRequest.nodeID());
} else {
node.forward(socket, webhookRequest);
node.forward(webhookRequest);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions lib/server/node-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class NodeManager {
this.server = net.createServer(function(socket: Socket) {
let initialConnection = true;
let node: Node = null;
let socketHandler = new SocketHandler(socket, function(message: string) {
let socketHandler = new SocketHandler(socket, function(message: string, messageID?: number) {
// We do special handling when we first connect
if (initialConnection) {
let connectData = JSON.parse(message);
Expand All @@ -49,7 +49,7 @@ export class NodeManager {

} else if (node.handlingRequest()) {
// Handle the case where the data received is a reply from the node to data sent to it
node.onReply(message);
node.onReply(message, messageID);
}
});

Expand Down Expand Up @@ -84,6 +84,7 @@ export class NodeManager {
// Reply with the same message on a Keep Alive
node.socketHandler.send(Global.KeepAliveMessage);
}

/**
* Calling stop tells the server to stop listening
* However, connections are not closed until all sockets disconnect, so loop through sockets and force a disconnect
Expand Down
36 changes: 21 additions & 15 deletions lib/server/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,37 @@ import {NodeManager} from "./node-manager";
import {Global} from "../core/global";
import {SocketHandler} from "../core/socket-handler";
import {WebhookRequest} from "../core/webhook-request";
import {LoggingHelper} from "../core/logging-helper";

const Logger = "NODE";

export class Node {
private activeRequest: WebhookRequest;
private sourceSocket: Socket;
private requests: {[id: number]: WebhookRequest} = {};

constructor(public id: string, public socketHandler: SocketHandler) {}

public forward(sourceSocket: Socket, request: WebhookRequest) {
console.log("NODE " + this.id + " Forwarding");
this.socketHandler.send(request.toTCP());
this.activeRequest = request;
this.sourceSocket = sourceSocket;
public forward(request: WebhookRequest): void {
console.log("NODE " + this.id + " MSG-ID: " + request.id() + " Forwarding");
this.requests[request.id()] = request;
this.socketHandler.send(request.toTCP(), request.id());
}

public handlingRequest(): boolean {
return (this.activeRequest !== null);
return (Object.keys(this.requests).length > 0);
}

public onReply(message: string): void {
public onReply(message: string, messageID: number): void {
let self = this;
console.log("NODE " + this.id + " ReplyReceived");
this.sourceSocket.write(message, function () {
// Reset the state of the request handling after passing along the reply
self.sourceSocket = null;
self.activeRequest = null;
});
console.log("NODE " + this.id + " MSG-ID: " + messageID + " ReplyReceived");

let request = this.requests[messageID];
if (request === null) {
LoggingHelper.info(Logger, "No matching messageID for reply: " + messageID);
} else {
request.sourceSocket.write(message, function () {
delete self.requests[messageID];
});
}

}
}
8 changes: 4 additions & 4 deletions lib/server/webhook-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {LoggingHelper} from "../core/logging-helper";
let Logger = "WEBHOOK";

export interface WebhookReceivedCallback {
(socket: Socket, webhookRequest: WebhookRequest): void;
(webhookRequest: WebhookRequest): void;
}
export class WebhookManager {
private server: Server;
Expand All @@ -27,7 +27,7 @@ export class WebhookManager {

let socketIndex = 0;
this.server = net.createServer(function(socket: Socket) {
let webhookRequest = new WebhookRequest();
let webhookRequest = new WebhookRequest(socket);
socketIndex++;

let socketKey = socketIndex;
Expand All @@ -45,12 +45,12 @@ export class WebhookManager {
// If we do it on the write callback on the socket, the original caller never gets the response
// For now, if we get a second request on the socket, re-initialize the webhookRequest
if (webhookRequest.done()) {
webhookRequest = new WebhookRequest();
webhookRequest = new WebhookRequest(socket);
}

webhookRequest.append(data);
if (webhookRequest.done()) {
self.onWebhookReceived(socket, webhookRequest);
self.onWebhookReceived(webhookRequest);
}
});

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "bespoken-tools",
"license": "SEE LICENSE IN LICENSE",
"private": false,
"version": "0.6.5",
"version": "0.6.10",
"bin": {
"bst": "./bin/bst.js",
"bst-server": "./bin/bst-server.js"
Expand Down

0 comments on commit 8aa04cf

Please sign in to comment.