Skip to content
This repository has been archived by the owner on Jul 14, 2020. It is now read-only.

Proxy Server #10

Merged
merged 14 commits into from
Nov 14, 2019
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
},
"dependencies": {
"@types/express": "^4.17.1",
"@types/http-proxy": "^1.17.0",
"@types/node": "^12.7.11",
"effection": "~0.3.2",
"module-alias": "^2.2.2",
"http-proxy": "^1.18.0",
"trumpet": "^1.7.2",
"ts-node": "^8.4.1",
"typescript": "^3.6.3",
"websocket": "^1.0.30"
Expand Down
16 changes: 11 additions & 5 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,28 @@ import { on } from '@effection/events';
import { createServer, IncomingMessage, Response } from './http';
import { createSocketServer, Connection, Message, send } from './ws';
import { AddressInfo } from 'net';
import { createProxyServer } from './proxy';

// entry point for bigtestd
export function* main(): Sequence {
console.log('BigTest Server');

// proxies requests to application server and injects our harness
fork(createProxyServer({
port: 4001,
targetPort: 4002,
inject: "<script>console.log('Hello world');</script>"
}, (server) => {
let address = server.address() as AddressInfo;
console.log(`-> proxy server listening on port ${address.port}`);
}));

// accept commands from the outside world (CLI, UI, etc...)
fork(createServer(4000, commandServer, server => {
let address = server.address() as AddressInfo;
console.log(`-> listening for commands on port ${address.port}`);
}));


// TODO: serves the application with our special controls injected.
// fork(proxyServer);


// TODO: realtime socket communication with browsers
fork(createSocketServer(5001, connectionServer, server => {
let address = server.address() as AddressInfo;
Expand Down
124 changes: 124 additions & 0 deletions src/proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { fork, Sequence, Controller, Execution } from 'effection';
import { on } from '@effection/events';
import * as proxy from 'http-proxy';
import * as http from 'http';
import { listen, ReadyCallback } from './http';
import { forkOnEvent, } from './util';
import * as trumpet from 'trumpet';
import * as zlib from 'zlib';
import { Readable, Writable } from 'stream';

interface ProxyOptions {
port: number;
targetPort: number;
inject?: string;
};

export function* createProxyServer(options: ProxyOptions, ready: ReadyCallback = x => x): Sequence {
let proxyServer = proxy.createProxyServer({
target: `http://localhost:${options.targetPort}`,
selfHandleResponse: true
});

forkOnEvent(proxyServer, 'proxyRes', function*(proxyRes, req, res) {
console.debug("[proxy]", "start", req.method, req.url);
for(let [key, value] of Object.entries(proxyRes.headers)) {
res.setHeader(key, value);
}

let contentType = proxyRes.headers['content-type'] as string;
let contentEncoding = proxyRes.headers['content-encoding'] as string;

let proxyResMonitor = forkOnEvent(proxyRes, 'error', function*(error) { throw error; });

if(contentType && contentType.split(';')[0] === 'text/html') {
res.removeHeader('content-length');
res.removeHeader('content-encoding');

res.writeHead(proxyRes.statusCode, proxyRes.statusMessage);

let tr = trumpet();
let trMonitor = forkOnEvent(tr, 'error', function*(error) { throw error; });

let unzip = zlib.createGunzip();
let unzipMonitor = forkOnEvent(unzip, 'error', function*(error) { throw error; });

let nodeMonitor = fork(function* () {
tr.select('head', (node) => nodeMonitor.resume(node));
while(true) {
let node = yield;
let rs = node.createReadStream();
let ws = node.createWriteStream();
ws.write(options.inject || '');
rs.pipe(ws);
}
});

if(contentEncoding && contentEncoding.toLowerCase() == 'gzip') {
proxyRes.pipe(unzip);
unzip.pipe(tr);
} else {
proxyRes.pipe(tr);
}

tr.pipe(res);

try {
yield on(tr, "end");
} finally {
// tr.close(); there is no close method on Trumpet, how do we not leak it in case of errors?
unzip.close();

proxyResMonitor.halt();
trMonitor.halt();
unzipMonitor.halt();
nodeMonitor.halt();
}
} else {
res.writeHead(proxyRes.statusCode, proxyRes.statusMessage);

proxyRes.pipe(res);

try {
yield on(proxyRes, "end");
} finally {
proxyResMonitor.halt();
}
}
console.debug("[proxy]", "finish", req.method, req.url);
});

forkOnEvent(proxyServer, 'error', function*(err, req, res) {
res.writeHead(502, { 'Content-Type': 'text/plain' });
res.end(`Proxy error: ${err}`);
});

forkOnEvent(proxyServer, 'open', function*() {
console.debug('socket connection opened');
});

forkOnEvent(proxyServer, 'close', function*() {
console.debug('socket connection closed');
});

let server = http.createServer();

yield listen(server, options.port);

ready(server);

forkOnEvent(server, 'request', function*(req, res) {
proxyServer.web(req, res);
});

forkOnEvent(server, 'upgrade', function*(req, socket, head) {
proxyServer.ws(req, socket, head);
});

try {
yield;
} finally {
server.close();
proxyServer.close();
}
jnicklas marked this conversation as resolved.
Show resolved Hide resolved
}
12 changes: 12 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { EventEmitter } from 'events';
import { fork, Execution, Operation } from 'effection';
import { on } from '@effection/events';

//eslint-disable-next-line @typescript-eslint/no-empty-function
const Fork = fork(function*() {}).constructor;
Expand Down Expand Up @@ -35,3 +37,13 @@ export function resumeOnCb(fn: (cb: (error?: Error) => void) => void): Operation
return () => iCare = false;
}
}

export function forkOnEvent(emitter: EventEmitter, eventName: string | symbol, operation: (...any) => Operation): Execution {
return fork(function*() {
while(true) {
let args = yield on(emitter, eventName);

fork(operation(...args));
}
});
}
Loading