Skip to content
This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into readthedocs
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelmyers committed Aug 30, 2016
2 parents 809ab7e + 7b23da0 commit 90cd956
Show file tree
Hide file tree
Showing 19 changed files with 406 additions and 103 deletions.
30 changes: 17 additions & 13 deletions lib/client/bespoke-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,15 @@ export class BespokeClient {
let self = this;
LoggingHelper.info(Logger, "OnWebhook: " + request.toString());

let tcpClient = new TCPClient();
let tcpClient = new TCPClient(request.id());
tcpClient.transmit("localhost", self.targetPort, request.toTCP(), function(data: string, error: NetworkErrorType, message: string) {
if (data != null) {
self.socketHandler.send(data);
} else if (error === NetworkErrorType.CONNECTION_REFUSED) {
LoggingHelper.error(Logger, "CLIENT Connection Refused, Port " + self.targetPort + ". Is your server running?");
} else if (error !== null && error !== undefined) {
if (error === NetworkErrorType.CONNECTION_REFUSED) {
LoggingHelper.error(Logger, "CLIENT Connection Refused, Port " + self.targetPort + ". Is your server running?");
}

if (self.onError != null) {
self.onError(error, message);
}
Expand Down Expand Up @@ -120,19 +123,20 @@ export class BespokeClient {
}

public shutdown(callback?: () => void): void {
let self = this;

LoggingHelper.info(Logger, "Shutting down proxy");
// We track that we are shutting down because a "close" event is sent
// We don't want to print out any errors in this case as it is expected

// We track that we are shutting down because a "close" event is sent to the main socket
// We normally print info on close, but not in this case
this.shuttingDown = true;

this.keepAlive.stop();

// Do not disconnect until keep alive has stopped
// Otherwise it may try to push data through the socket
this.keepAlive.stop(function () {
self.socketHandler.disconnect();
if (callback !== undefined && callback !== null) {
callback();
}
});
this.socketHandler.disconnect();

if (callback !== undefined && callback !== null) {
callback();
}
}
}
28 changes: 11 additions & 17 deletions lib/client/keep-alive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import {LoggingHelper} from "../core/logging-helper";
import {SocketHandler} from "../core/socket-handler";
import {Global} from "../core/global";

const KeepAlivePeriod = 1000;
const KeepAliveWindowPeriod = 60000;
const KeepAliveWarningThreshold = 3;
const KeepAlivePeriod = 30000; // Ping every 30 seconds
const KeepAliveWindowPeriod = 300000; // Check over a 5 Minute period
const KeepAliveWarningThreshold = 5; // Need to get more than five pings in that period

/**
* Handles keeping the client connection alive.
Expand All @@ -18,9 +18,8 @@ export class KeepAlive {

private keepAliveArray: Array<number> = []; // Rolling window of timestamps that correspond to errors
private startedTimestamp: number;
private shuttingDown: boolean = false;
private onFailureCallback: () => void;
private onStopCallback: () => void;
private timeout: any = null;

public constructor (private socket: SocketHandler) {}

Expand Down Expand Up @@ -49,15 +48,9 @@ export class KeepAlive {
}
}

setTimeout(function () {
if (self.shuttingDown) {
if (self.onStopCallback !== undefined && self.onStopCallback !== null) {
self.onStopCallback();
}
} else {
self.socket.send(Global.KeepAliveMessage);
self.keepAlive();
}
this.timeout = setTimeout(function () {
self.socket.send(Global.KeepAliveMessage);
self.keepAlive();
}, this.pingPeriod);
}

Expand All @@ -81,8 +74,9 @@ export class KeepAlive {
this.keepAliveArray.push(new Date().getTime());
}

public stop(stopped?: () => void): void {
this.onStopCallback = stopped;
this.shuttingDown = true;
public stop(): void {
if (this.timeout !== null) {
clearTimeout(this.timeout);
}
}
}
50 changes: 41 additions & 9 deletions lib/client/lambda-runner.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
import * as fs from "fs";
import * as http from "http";
import {IncomingMessage} from "http";
import {ServerResponse} from "http";
import {Server} from "http";
import {LoggingHelper} from "../core/logging-helper";
import {NodeUtil} from "../core/node-util";
import {FSWatcher} from "fs";

let Logger = "BST-LAMBDA";

export class LambdaRunner {
private server: Server = null;
private dirty: boolean = false;
private lambda: any = null;
private watcher: FSWatcher = null;
public onDirty: () => void = null; // Callback for test-ability

public constructor(public file: string, public port: number) {}

public start (): void {
public start (callback?: () => void): void {
let self = this;

// Add a watch to the current directory
let watchOptions = {"persistent": false, "recursive": true};
this.watcher = fs.watch(process.cwd(), watchOptions, function(event: string, filename: string) {
if (filename.indexOf("node_modules") === -1) {
LoggingHelper.info(Logger, "FS.Watch Event: " + event + ". File: " + filename + ". Reloading.");
self.dirty = true;
if (self.onDirty !== undefined && self.onDirty !== null) {
self.onDirty();
}
}
});

this.server = http.createServer();
this.server.listen(this.port);
this.server.on("request", function(request: IncomingMessage, response: ServerResponse) {
Expand All @@ -27,12 +46,11 @@ export class LambdaRunner {
});
});

this.server.on("error", function (message: string) {
LoggingHelper.error(Logger, "LambdaRunner encountered error: " + message);
});

this.server.on("listening", function () {
LoggingHelper.info(Logger, "LambdaRunner started on port: " + self.server.address().port.toString());
if (callback !== undefined && callback !== null) {
callback();
}
});
}

Expand All @@ -44,13 +62,22 @@ export class LambdaRunner {

LoggingHelper.info(Logger, "Invoked Lambda: " + this.file);
let bodyJSON: any = JSON.parse(body);
let lambda: any = NodeUtil.load(path);
if (this.lambda === null || this.dirty) {
this.lambda = NodeUtil.load(path);
this.dirty = false;
}

// let lambda = System.import("./" + file);
let context: LambdaContext = new LambdaContext(response);
lambda.handler(bodyJSON, context);
try {
this.lambda.handler(bodyJSON, context);
} catch (e) {
context.fail("Exception: " + e.message);
}
}

public stop (onStop?: () => void): void {
this.watcher.close();
this.server.close(function () {
if (onStop !== undefined && onStop !== null) {
onStop();
Expand All @@ -60,6 +87,7 @@ export class LambdaRunner {
}

export class LambdaContext {

public constructor(public response: ServerResponse) {}

public fail(body: any) {
Expand All @@ -71,6 +99,8 @@ export class LambdaContext {
}

private done(success: boolean, body: any) {
let self = this;

let statusCode: number = 200;
let contentType: string = "application/json";
let bodyString: string = null;
Expand All @@ -88,8 +118,10 @@ export class LambdaContext {
});

if (body) {
this.response.write(bodyString);
this.response.end(new Buffer(bodyString));
} else {
this.response.end();
}
this.response.end();

}
}
10 changes: 5 additions & 5 deletions lib/client/tcp-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ export interface TCPClientCallback {
}

export class TCPClient {


public constructor () {}
public constructor (public id: string) {}

public transmit(host: string, port: number, data: string, callback: TCPClientCallback) {
let self = this;
let client = new net.Socket();
LoggingHelper.info(Logger, "Forwarding " + host + ":" + port);

client.on("error", function (e: any) {
console.log("TCPClient Error: " + e.message);
if (e.code === "ECONNREFUSED") {
callback(null, NetworkErrorType.CONNECTION_REFUSED, e.message);
} else {
Expand All @@ -41,8 +41,8 @@ export class TCPClient {
});

// Add a 'close' event handler for the client socket
client.on("close", function() {
LoggingHelper.debug(Logger, "Connection closed");
client.on("close", function(had_error: boolean) {
LoggingHelper.debug(Logger, "Connection closed ID: " + self.id + " HadError: " + had_error);
});
}

Expand Down
7 changes: 3 additions & 4 deletions lib/core/http-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {BufferUtil} from "./buffer-util";

export class HTTPClient {

public post(host: string, port: number, path: string, data: string, callback?: (data: Buffer, success: boolean) => void) {
public post(host: string, port: number, path: string, data: string, callback?: (data: Buffer, statusCode: number, success: boolean) => void) {
// An object of options to indicate where to post to
let post_options = {
host: host,
Expand All @@ -33,19 +33,18 @@ export class HTTPClient {

response.on("end", function () {
if (callback !== undefined && callback !== null) {
callback(responseData, true);
callback(responseData, response.statusCode, true);
}
});
});

post_req.on("error", function (error: any) {
if (callback !== undefined && callback !== null) {
callback(BufferUtil.fromString(error.message), false);
callback(BufferUtil.fromString(error.message), 0, false);
}
});
// post the data
post_req.write(data);
post_req.end();

}
}
2 changes: 1 addition & 1 deletion lib/core/node-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ export class NodeUtil {
public static resetCache () {
let directory: string = process.cwd();
for (let file in require.cache) {

if (require.cache.hasOwnProperty(file)
&& file.startsWith(directory)
&& file.indexOf("node_modules") === -1) {
delete require.cache[require.resolve(file)];
let localPart = file.substr(directory.length);
LoggingHelper.debug(Logger, "ReloadCache: " + localPart);
}
}
}
Expand Down
32 changes: 14 additions & 18 deletions lib/core/socket-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ let Logger = "SOCKET";
* Manages the low-level socket communications
*/
export class SocketHandler {
public message: string = null;
public buffer: string = "";
public onDataCallback: (data: Buffer) => void;
public onCloseCallback: () => void = null;
private connected: boolean = true;
Expand All @@ -37,7 +37,6 @@ export class SocketHandler {

public constructor (public socket: Socket, private onMessage: (message: string) => void) {
let self = this;
this.resetBuffer();

// Set this as instance variable to make it easier to test
this.onDataCallback = function(data: Buffer) {
Expand Down Expand Up @@ -73,27 +72,24 @@ export class SocketHandler {
* @param dataString
*/
private handleData(dataString: string): void {
let delimiterIndex = dataString.indexOf(Global.MessageDelimiter);
if (delimiterIndex === -1) {
this.message += dataString;
} else {
this.message += dataString.substr(0, delimiterIndex);
LoggingHelper.debug(Logger, "DATA READ " + this.remoteEndPoint() + " " + StringUtil.prettyPrint(this.message));
if (dataString !== null) {
LoggingHelper.debug(Logger, "DATA RAW " + this.remoteEndPoint() + " " + StringUtil.prettyPrint(dataString));
this.buffer += dataString;
}

let delimiterIndex = this.buffer.indexOf(Global.MessageDelimiter);
if (delimiterIndex > -1) {
let message = this.buffer.substring(0, delimiterIndex);
LoggingHelper.debug(Logger, "DATA READ " + this.remoteEndPoint() + " " + StringUtil.prettyPrint(message));

this.onMessage(this.message);
this.resetBuffer();
this.onMessage(message);
this.buffer = this.buffer.slice(delimiterIndex + Global.MessageDelimiter.length);

// If we have received more than one packet at a time, handle it recursively
if (dataString.length > (dataString.indexOf(Global.MessageDelimiter) + Global.MessageDelimiter.length)) {
dataString = dataString.substr(dataString.indexOf(Global.MessageDelimiter) + Global.MessageDelimiter.length);
this.handleData(dataString);
if (this.buffer.indexOf(Global.MessageDelimiter) !== -1) {
this.handleData(null);
}
}

}

private resetBuffer(): void {
this.message = "";
}

public send(message: string) {
Expand Down
6 changes: 6 additions & 0 deletions lib/core/webhook-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ export class WebhookRequest {
public body: string;
public headers: { [id: string]: string };
public queryParameters: {[id: string]: string} = {};
private requestTimestamp: number;

public constructor() {
this.rawContents = new Buffer("");
this.body = "";
this.requestTimestamp = new Date().getTime();
}

public static fromString(payload: string): WebhookRequest {
Expand Down Expand Up @@ -104,4 +106,8 @@ export class WebhookRequest {
public toString(): string {
return this.method + " " + this.uri;
}

public id(): string {
return this.requestTimestamp + "";
}
}
Loading

0 comments on commit 90cd956

Please sign in to comment.