Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/gold-worlds-switch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@bluecadet/launchpad-controller": patch
---

Fix invalid named pipe on windows
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/controller/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"homepage": "https://github.com/bluecadet/launchpad/packages/controller",
"dependencies": {
"@bluecadet/launchpad-utils": "~2.1.0",
"chalk": "^5.0.0",
"devalue": "^5.4.2",
"immer": "^10.2.0",
"neverthrow": "^8.1.1",
Expand Down
6 changes: 5 additions & 1 deletion packages/controller/src/ipc/__tests__/ipc-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { afterEach, describe, expect, it, vi } from "vitest";
import type { IPCEvent, IPCResponse } from "../../transports/ipc-transport.js";
import { IPCClient } from "../ipc-client.js";
import { IPCSerializer } from "../ipc-serializer.js";
import { getOSSocketPath } from "../ipc-utils.js";

type Cb = (...args: any[]) => void;

Expand Down Expand Up @@ -84,7 +85,10 @@ describe("IPCClient", () => {
const result = await client.connect("/test/socket");

expect(result.isOk()).toBe(true);
expect(net.createConnection).toHaveBeenCalledWith("/test/socket", expect.any(Function));
expect(net.createConnection).toHaveBeenCalledWith(
getOSSocketPath("/test/socket"),
expect.any(Function),
);
});

it("should return error if connection fails", async () => {
Expand Down
27 changes: 27 additions & 0 deletions packages/controller/src/ipc/__tests__/ipc-utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { describe, expect, it } from "vitest";
import { getOSSocketPath } from "../ipc-utils.js";

describe("getOSSocketPath", () => {
describe.runIf(process.platform !== "win32")("on posix", () => {
it("returns the same path", () => {
const inputPath = "/tmp/socket.sock";
const result = getOSSocketPath(inputPath);
expect(result).toBe(inputPath);
});
});

describe.runIf(process.platform === "win32")("on Windows", () => {
it("updates the path to use named pipe format if not already in that format", () => {
const inputPath = "C:\\temp\\socket.sock";
const expectedPath = "\\\\?\\pipe\\C:\\temp\\socket.sock";
const result = getOSSocketPath(inputPath);
expect(result).toBe(expectedPath);
});

it("returns the same path if already in named pipe format with \\\\?\\pipe prefix", () => {
const inputPath = "\\\\?\\pipe\\my_named_pipe";
const result = getOSSocketPath(inputPath);
expect(result).toBe(inputPath);
});
});
});
7 changes: 5 additions & 2 deletions packages/controller/src/ipc/ipc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { LaunchpadState } from "../core/state-store.js";
import { IPCConnectionError, IPCMessageError, IPCTimeoutError } from "../errors.js";
import type { IPCBroadcastMessage, IPCMessage, IPCResponse } from "../transports/ipc-transport.js";
import { IPCSerializer } from "./ipc-serializer.js";
import { getOSSocketPath } from "./ipc-utils.js";

enablePatches();

Expand All @@ -37,7 +38,9 @@ export class IPCClient {
/**
* Connect to the IPC socket
*/
connect(socketPath: string): ResultAsync<void, IPCConnectionError> {
connect(originalSocketPath: string): ResultAsync<void, IPCConnectionError> {
const socketPath = getOSSocketPath(originalSocketPath);

return ResultAsync.fromPromise(
new Promise<void>((resolve, reject) => {
this._socket = net.createConnection(socketPath, () => {
Expand Down Expand Up @@ -229,7 +232,7 @@ export class IPCClient {
try {
this._lastState = applyPatches(this._lastState, patches);
this._lastStateVersion = version;
} catch (e) {
} catch (_e) {
return errAsync(new IPCMessageError("Failed to apply patches"));
}

Expand Down
18 changes: 18 additions & 0 deletions packages/controller/src/ipc/ipc-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import path from "node:path";

/**
* Ensure path conforms with OS requirements (only relevant for windows ATM)
* https://nodejs.org/api/net.html#identifying-paths-for-ipc-connections
*/
export function getOSSocketPath(socketPath: string) {
if (process.platform === "win32") {
// On Windows, we need to use a named pipe

if (socketPath.includes("\\\\?\\pipe") || socketPath.includes("\\\\.\\pipe")) {
// Already in correct format
return socketPath;
}
return path.join("\\\\?\\pipe", socketPath);
}
return socketPath;
}
45 changes: 31 additions & 14 deletions packages/controller/src/transports/ipc-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import fs from "node:fs";
import net from "node:net";
import path from "node:path";
import type { LaunchpadEvents } from "@bluecadet/launchpad-utils";
import chalk from "chalk";
import type { Patch } from "immer";
import { ResultAsync } from "neverthrow";
import type { VersionedLaunchpadState } from "../core/state-store.js";
Expand All @@ -18,6 +19,7 @@ import {
TransportError,
} from "../errors.js";
import { IPCSerializer } from "../ipc/ipc-serializer.js";
import { getOSSocketPath } from "../ipc/ipc-utils.js";

export type IPCTransportOptions = {
/** Path to the Unix socket file */
Expand Down Expand Up @@ -54,12 +56,25 @@ export function createIPCTransport(options: IPCTransportOptions): Transport {
let server: net.Server | null = null;
const clients = new Set<net.Socket>();

const socketPath = getOSSocketPath(options.socketPath);

return {
id: "ipc",

start(ctx: TransportContext): ResultAsync<void, TransportError> {
const { logger, abortSignal } = ctx;
const { socketPath } = options;

if (process.platform === "win32" && options.socketPath !== socketPath) {
// notify user that the socket path has been updated to conform with windows named pipe reqs,
// as it might not be where they expect it

logger.warn(
`Windows named pipes must be located in ${chalk.grey("\\\\?\\pipe\\")} or ${chalk.grey("\\\\.\\pipe\\")}. `,
);
logger.warn(
`The configured socketPath has been moved to the ${chalk.grey("\\\\?\\pipe\\")} directory to conform with this requirement.`,
);
}

const promise = new Promise<void>((resolve, reject) => {
// Clean up existing socket
Expand All @@ -75,18 +90,21 @@ export function createIPCTransport(options: IPCTransportOptions): Transport {
}
}

// Ensure directory exists
try {
const dir = path.dirname(socketPath);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
// This step isn't relevant for windows named pipes
if (process.platform !== "win32") {
// Ensure directory exists
try {
const dir = path.dirname(socketPath);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
} catch (e) {
return reject(
new TransportError("Failed to create socket directory", {
cause: e instanceof Error ? e : new Error(String(e)),
}),
);
}
} catch (e) {
return reject(
new TransportError("Failed to create socket directory", {
cause: e instanceof Error ? e : new Error(String(e)),
}),
);
}

// Create server
Expand Down Expand Up @@ -139,7 +157,7 @@ export function createIPCTransport(options: IPCTransportOptions): Transport {
});

server.listen(socketPath, () => {
logger.info(`IPC transport listening at ${socketPath}`);
logger.info(`IPC transport listening at ${chalk.grey(socketPath)}`);

// Subscribe to EventBus for event streaming with type-safe handler
const handleEvent = <K extends keyof LaunchpadEvents>(
Expand Down Expand Up @@ -208,7 +226,6 @@ export function createIPCTransport(options: IPCTransportOptions): Transport {

stop(ctx: TransportContext): ResultAsync<void, TransportError> {
const { logger } = ctx;
const { socketPath } = options;

const promise = new Promise<void>((resolve, reject) => {
if (!server) {
Expand Down