Skip to content

Commit

Permalink
[WIP] Observable spawn
Browse files Browse the repository at this point in the history
  • Loading branch information
demurgos committed Aug 18, 2018
1 parent 9ef3373 commit 18d477e
Show file tree
Hide file tree
Showing 16 changed files with 540 additions and 31 deletions.
67 changes: 65 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"mkdirp": "^0.5.0",
"os-homedir": "^1.0.1",
"rimraf": "^2.6.2",
"rxjs": "^6.2.2",
"rxjs-stream": "^2.0.3",
"signal-exit": "^3.0.2",
"which": "^1.3.0"
},
Expand Down
40 changes: 15 additions & 25 deletions src/lib/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ export function withWrapContextSync<R = any>(options: SwOptions, handler: (ctx:
* @param path Path of the directory to create.
* @return Real path of the directory.
*/
function realpathMkdirp(path: string): Promise<string> {
const mkdirpPromise = new Promise((resolve, reject) => {
async function realpathMkdirp(path: string): Promise<string> {
await new Promise((resolve, reject) => {
mkdirp(path, (err) => {
if (err !== null) {
reject(err);
Expand All @@ -126,15 +126,13 @@ function realpathMkdirp(path: string): Promise<string> {
}
});
});
return mkdirpPromise.then(() => {
return new Promise<string>((resolve, reject) => {
fs.realpath(path, (err, res) => {
if (err !== null) {
reject(err);
} else {
return res;
}
});
return new Promise<string>((resolve, reject) => {
fs.realpath(path, (err, res) => {
if (err !== null) {
reject(err);
} else {
resolve(res);
}
});
});
}
Expand Down Expand Up @@ -162,20 +160,12 @@ function getShimRoot(): string {
return path.join(osHomedir(), DEFAULT_SHIM_ROOT_NAME);
}

export function createWrapContext(options: SwOptions): Promise<SwContext> {
return new Promise<ResolvedOptions>((resolve) => resolve(resolveOptions(options)))
.then((resolved: ResolvedOptions) => {
return realpathMkdirp(resolved.shimDir)
.then((shimDirRealPath) => {
resolved.shimDir = shimDirRealPath;
return resolved;
});
})
.then(resolvedOptionsToContext)
.then((ctx) => {
return writeWrapContext(ctx)
.then(() => ctx);
});
export async function createWrapContext(options: SwOptions): Promise<SwContext> {
const resolved = resolveOptions(options);
resolved.shimDir = await realpathMkdirp(resolved.shimDir);
const ctx = resolvedOptionsToContext(resolved);
await writeWrapContext(ctx);
return ctx;
}

export function createWrapContextSync(options: SwOptions): SwContext {
Expand Down
21 changes: 19 additions & 2 deletions src/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
import { withSpawnWrap, withSpawnWrapSync } from "./local";
import { spawn as observeSpawn } from "./observable/index";
import { applyContextOnGlobal, legacyWrap, runMain, wrapGlobal } from "./spawn-wrap";

// TODO: Use `export` statements to properly generate `index.d.ts`
// These TS exports are only there to generate the type definitions, they will be overwritten by the CJS exports below
export {
applyContextOnGlobal,
observeSpawn,
runMain,
wrapGlobal,
withSpawnWrap,
withSpawnWrapSync,
};

module.exports = legacyWrap;
Object.assign(module.exports, {applyContextOnGlobal, runMain, wrapGlobal, withSpawnWrap, withSpawnWrapSync});
Object.assign(module.exports, {
applyContextOnGlobal,
observeSpawn,
runMain,
wrapGlobal,
withSpawnWrap,
withSpawnWrapSync,
});
4 changes: 2 additions & 2 deletions src/lib/local.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import cp from "child_process";
import { SwContext, SwOptions, withWrapContextSync } from "./context";
import { SwContext, SwOptions, withWrapContext, withWrapContextSync } from "./context";
import { wrapSpawn } from "./wrap";

export type SyncApi = Pick<typeof cp, "spawnSync">;
export type Api = SyncApi & Pick<typeof cp, "spawn">;

export function withSpawnWrap<R = any>(options: SwOptions, handler: (api: Api) => Promise<R>): Promise<R> {
return withWrapContextSync(options, (ctx: SwContext) => {
return withWrapContext(options, (ctx: SwContext): Promise<R> => {
return handler(wrapApi(ctx));
});
}
Expand Down
67 changes: 67 additions & 0 deletions src/lib/observable/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import net from "net";
import { Observer, Subject, Subscribable } from "rxjs";
import { rxToStream, streamToRx } from "rxjs-stream";
import { parseJsonLines, printJsonLines } from "./json-lines";
import { ClientMessage, ServerMessage } from "./protocol";

export class SpawnClient implements Observer<ClientMessage>, Subscribable<ServerMessage> {
public static async create(host: string, port: number): Promise<SpawnClient> {
return new Promise<SpawnClient>((resolve, reject) => {
const options: net.TcpNetConnectOpts = {host, port};
const socket: net.Socket = net.createConnection(options);
socket.on("error", onError);
socket.on("connect", onConnect);

function onError(err: Error) {
removeListeners();
reject(err);
}

function onConnect() {
removeListeners();
resolve(new SpawnClient(socket));
}

function removeListeners() {
socket.removeListener("error", onError);
socket.removeListener("connect", onConnect);
}
});
}

private readonly input: Subscribable<ServerMessage>;
private readonly output: Observer<ClientMessage>;
private readonly socket: net.Socket;

constructor(socket: net.Socket) {
this.input = streamToRx(socket).pipe(parseJsonLines());
const output: Subject<ClientMessage> = new Subject();
rxToStream(output.pipe(printJsonLines())).pipe(socket);
this.output = output;
this.socket = socket;
}

public close() {
this.socket.end();
}

public get closed() {
return this.output.closed;
}

public complete(): void {
return this.output.complete();
}

public error(err: any): void {
return this.output.error(err);
}

public next(value: ClientMessage): void {
return this.output.next(value);
}

public subscribe(...args: any[]): any {
return this.input.subscribe(...args);
}
}
104 changes: 104 additions & 0 deletions src/lib/observable/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import cp from "child_process";
import { Observable, Observer, Subscribable, Unsubscribable } from "rxjs";
import { withSpawnWrap } from "../local";
import { ClientMessage, InfoMessage } from "./protocol";
import { RemoteSpawnClient, SpawnServer } from "./server";

const OBSERVABLE_WRAPPER = require.resolve("./observable-wrapper.js");

class SpawnEvent {
public readonly args: ReadonlyArray<string>;
private readonly client: RemoteSpawnClient;
private spawnCount: number;

constructor(client: RemoteSpawnClient, info: InfoMessage) {
this.args = Object.freeze([...info.args]);
this.client = client;
this.spawnCount = 0;
}

public proxySpawn(args?: ReadonlyArray<string>): ChildProcessProxy {
if (this.spawnCount > 0) {
throw new Error("Cannot spawn remote process multiple times");
}
if (args === undefined) {
args = this.args;
}

const spawnId: number = this.spawnCount;
this.client.next({
action: "proxy-spawn",
spawnId,
args,
});
this.spawnCount++;
return new ChildProcessProxy(this.client, spawnId);
}

public voidSpawn(args?: ReadonlyArray<string>): void {
if (this.spawnCount > 0) {
throw new Error("Cannot spawn remote process multiple times");
}
if (args === undefined) {
args = this.args;
}

this.client.next({
action: "void-spawn",
args,
});
this.spawnCount++;
}
}

export class ChildProcessProxy {
private readonly file: string;
private readonly client: RemoteSpawnClient;
private readonly spawnId: number;

constructor(client: RemoteSpawnClient, spawnId: number) {
this.file = "TODO";
this.client = client;
this.spawnId = spawnId;
}
}

export function spawn(
file: string,
args?: ReadonlyArray<string>,
options?: cp.SpawnOptions,
): Subscribable<SpawnEvent> {
return new Observable((observer: Observer<SpawnEvent>) => {
(async () => {
const server = await SpawnServer.create();
server.subscribe((client: RemoteSpawnClient) => {
const subscription: Unsubscribable = client.subscribe((msg: ClientMessage) => {
if (msg.action !== "info") {
observer.error(new Error("Expected first message to be `info`"));
} else {
observer.next(new SpawnEvent(client, msg));
subscription.unsubscribe();
}
});
});

const wrapperArgs: string[] = [OBSERVABLE_WRAPPER, server.host, server.port.toString(10)];

withSpawnWrap({args: wrapperArgs}, async (api) => {
return new Promise((resolve, reject) => {
const outChunks: Buffer[] = [];
const errChunks: Buffer[] = [];
const proc = api.spawn(file, args, options);
proc.stdout.on("data", (chunk) => outChunks.push(chunk));
proc.stderr.on("data", (chunk) => errChunks.push(chunk));
proc.on("close", () => {
console.log(Buffer.concat(outChunks).toString("UTF-8"));
console.log(Buffer.concat(errChunks).toString("UTF-8"));
server.close();
resolve();
});
});
});
})();
});
}

0 comments on commit 18d477e

Please sign in to comment.