diff --git a/package.json b/package.json index 0c842d4..cc26a59 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@appwrite.io/synapse", - "version": "0.3.1", + "version": "0.4.0", "description": "Operating system gateway for remote serverless environments", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/src/synapse.ts b/src/synapse.ts index 5540c3d..9fa1729 100644 --- a/src/synapse.ts +++ b/src/synapse.ts @@ -10,13 +10,25 @@ export type MessagePayload = { [key: string]: string | Record; }; -export type MessageHandler = (message: MessagePayload) => void; -export type ConnectionCallback = () => void; -export type ErrorCallback = (error: Error) => void; +export type Connection = { + ws: WebSocket; + id: string; + path: string; + params: Record | null; + reconnectAttempts: number; +}; + +export type MessageHandler = ( + message: MessagePayload, + connectionId: string, +) => void; +export type ConnectionCallback = (connectionId: string) => void; +export type ErrorCallback = (error: Error, connectionId: string) => void; +export type ServerConnectionCallback = (connectionId: string) => void; export type Logger = (message: string) => void; class Synapse { - private ws: WebSocket | null = null; + private connections: Map = new Map(); private wss: WebSocketServer; private messageHandlers: Record = {}; private connectionListeners = { @@ -29,16 +41,16 @@ class Synapse { private isReconnecting = false; private maxReconnectAttempts = 5; - private reconnectAttempts = 0; private reconnectInterval = 3000; - private lastPath: string | null = null; - private reconnectTimeout: NodeJS.Timeout | null = null; + private reconnectTimeouts: Map = new Map(); private host: string; private port: number; public workDir: string; + private serverConnectionListener: ServerConnectionCallback = () => {}; + constructor( host: string = "localhost", port: number = 3000, @@ -53,79 +65,134 @@ class Synapse { this.workDir = workDir; this.wss = new WebSocketServer({ noServer: true }); - this.wss.on("connection", (ws: WebSocket) => { - this.setupWebSocket(ws); + this.wss.on("connection", (ws: WebSocket, req: IncomingMessage) => { + const connectionId = this.generateConnectionId(); + this.setupWebSocket(ws, req, connectionId); + this.serverConnectionListener(connectionId); }); } + private generateConnectionId(): string { + return `conn_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`; + } + private log(message: string): void { const timestamp = new Date().toISOString(); console.log(`[Synapse][${timestamp}] ${message}`); } - private setupWebSocket(ws: WebSocket): void { - this.ws = ws; - this.reconnectAttempts = 0; + private setupWebSocket( + ws: WebSocket, + req: IncomingMessage, + connectionId: string, + ): void { + const path = req.url?.split("?")[0] ?? "/"; + let params: Record | null = null; + const query = req.url?.split("?")[1]; + if (query) { + try { + params = JSON.parse(query); + } catch { + params = Object.fromEntries( + query.split("&").map((kv) => { + const [k, v] = kv.split("="); + return [decodeURIComponent(k), decodeURIComponent(v ?? "")]; + }), + ); + } + } - ws.onmessage = (event) => this.handleMessage(event); + this.connections.set(connectionId, { + ws, + id: connectionId, + path, + params, + reconnectAttempts: 0, + }); + + ws.onmessage = (event) => this.handleMessage(event, connectionId); ws.onclose = () => { - this.ws = null; - this.connectionListeners.onClose(); - this.attemptReconnect(); + this.connectionListeners.onClose(connectionId); + this.attemptReconnect(connectionId); }; ws.onerror = (error) => { const errorMessage = `WebSocket error: ${error.message || "Unknown error"}. Connection to ${this.host}:${this.port} failed.`; - this.connectionListeners.onError(new Error(errorMessage)); + this.connectionListeners.onError(new Error(errorMessage), connectionId); }; - this.connectionListeners.onOpen(); + this.connectionListeners.onOpen(connectionId); } - private attemptReconnect() { + private attemptReconnect(connectionId: string) { + const connection = this.connections.get(connectionId); + if (!connection) return; + if ( this.isReconnecting || - this.reconnectAttempts >= this.maxReconnectAttempts + connection.reconnectAttempts >= this.maxReconnectAttempts ) { + this.connections.delete(connectionId); return; } this.isReconnecting = true; - this.reconnectAttempts++; + connection.reconnectAttempts++; - this.reconnectTimeout = setTimeout(() => { + const timeout = setTimeout(() => { this.log( - `Attempting to reconnect (${this.reconnectAttempts}/${this.maxReconnectAttempts})...`, + `Attempting to reconnect connection ${connectionId} (${connection.reconnectAttempts}/${this.maxReconnectAttempts})...`, ); - this.connect(this.lastPath || "/") - .then(() => { + this.connect(connection.path || "/") + .then((newConnectionId) => { this.isReconnecting = false; - this.log("Reconnection successful"); + this.log( + `Reconnection successful with new connection ID: ${newConnectionId}`, + ); + // Copy any necessary state from old connection to new connection + const newConnection = this.connections.get(newConnectionId); + if (newConnection) { + newConnection.params = connection.params; + } + // Delete the old connection + this.connections.delete(connectionId); }) .catch((error) => { this.isReconnecting = false; - this.log(`Reconnection failed: ${error.message}`); - if (this.reconnectAttempts < this.maxReconnectAttempts) { - this.attemptReconnect(); + this.log( + `Reconnection failed for connection ${connectionId}: ${error.message}`, + ); + if (connection.reconnectAttempts < this.maxReconnectAttempts) { + this.attemptReconnect(connectionId); + } else { + this.connections.delete(connectionId); } }); }, this.reconnectInterval); + + this.reconnectTimeouts.set(connectionId, timeout); } - private handleMessage(event: WebSocket.MessageEvent): void { + private handleMessage( + event: WebSocket.MessageEvent, + connectionId: string, + ): void { try { const data = event.data as string; + const connection = this.connections.get(connectionId); + + if (!connection) return; - if (data === "ping" && this.ws) { - this.ws.send("pong"); + if (data === "ping" && connection.ws) { + connection.ws.send("pong"); return; } const message: MessagePayload = JSON.parse(data); if (this.messageHandlers[message.type]) { - this.messageHandlers[message.type](message); + this.messageHandlers[message.type](message, connectionId); } } catch (error) { const errorMessage = @@ -191,55 +258,57 @@ class Synapse { } /** - * Cancels the reconnection process + * Cancels the reconnection process for a specific connection + * @param connectionId - The ID of the connection to cancel reconnection for, or all if not specified * @returns void */ - cancelReconnect(): void { - if (this.reconnectTimeout) { - clearTimeout(this.reconnectTimeout); - this.reconnectTimeout = null; + cancelReconnect(connectionId?: string): void { + if (connectionId) { + const timeout = this.reconnectTimeouts.get(connectionId); + if (timeout) { + clearTimeout(timeout); + this.reconnectTimeouts.delete(connectionId); + } + + const connection = this.connections.get(connectionId); + if (connection) { + connection.reconnectAttempts = this.maxReconnectAttempts; + } + } else { + this.reconnectTimeouts.forEach((timeout) => { + clearTimeout(timeout); + }); + this.reconnectTimeouts.clear(); + + this.connections.forEach((connection) => { + connection.reconnectAttempts = this.maxReconnectAttempts; + }); } + this.isReconnecting = false; - this.reconnectAttempts = this.maxReconnectAttempts; } /** - * Establishes a WebSocket connection to the specified URL and initializes the terminal + * Establishes a WebSocket connection to the specified URL * @param path - The WebSocket endpoint path (e.g. '/' or '/terminal') - * @returns Promise that resolves with the Synapse instance when connected + * @returns Promise that resolves with the connection ID when connected * @throws Error if WebSocket connection fails */ - connect(path: string): Promise { - this.lastPath = path; + connect(path: string): Promise { const url = this.buildWebSocketUrl(path); return new Promise((resolve, reject) => { try { - if (this.ws && this.ws.readyState === WebSocket.OPEN) { - resolve(this); - return; - } + const ws = new WebSocket(url); - this.ws = new WebSocket(url); - - this.ws.onopen = () => { - this.reconnectAttempts = 0; - this.connectionListeners.onOpen(); - resolve(this); + ws.onopen = () => { + resolve("Synapse connected successfully"); }; - this.ws.onmessage = (event: WebSocket.MessageEvent) => - this.handleMessage(event); - - this.ws.onerror = (error: WebSocket.ErrorEvent) => { + ws.onerror = (error: WebSocket.ErrorEvent) => { const errorMessage = `WebSocket error: ${error.message || "Unknown error"}. Failed to connect to ${url}`; - this.connectionListeners.onError(new Error(errorMessage)); reject(new Error(errorMessage)); }; - - this.ws.onclose = () => { - this.connectionListeners.onClose(); - }; } catch (error) { reject( new Error( @@ -253,18 +322,67 @@ class Synapse { } /** - * Sends a message to the WebSocket server + * Gets the path associated with a specific connection + * @param connectionId - The ID of the connection + * @returns The connection path or null if connection not found + */ + getPath(connectionId: string): string | null { + const connection = this.connections.get(connectionId); + return connection ? connection.path : null; + } + + /** + * Gets the parameters associated with a specific connection + * @param connectionId - The ID of the connection + * @returns The connection parameters or null if connection not found + */ + getParams(connectionId: string): Record | null { + const connection = this.connections.get(connectionId); + return connection ? connection.params : null; + } + + /** + * Gets all active connection IDs + * @returns Array of connection IDs + */ + getConnections(): string[] { + return Array.from(this.connections.keys()); + } + + /** + * Gets connection information by ID + * @param connectionId - The ID of the connection to get + * @returns The connection object or null if not found + */ + getConnection( + connectionId: string, + ): { path: string; params: Record | null } | null { + const connection = this.connections.get(connectionId); + if (!connection) return null; + + return { + path: connection.path, + params: connection.params, + }; + } + + /** + * Sends a message to a specific WebSocket connection + * @param connectionId - The ID of the connection to send to * @param type - The type of message to send * @param payload - The payload of the message * @returns A promise that resolves with the message payload * @throws Error if WebSocket is not connected */ send( + connectionId: string, type: string, payload: Record = {}, ): Promise { - if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { - throw new Error("WebSocket is not connected"); + const connection = this.connections.get(connectionId); + + if (!connection || connection.ws.readyState !== WebSocket.OPEN) { + throw new Error(`WebSocket connection ${connectionId} is not connected`); } const message: MessagePayload = { @@ -274,25 +392,44 @@ class Synapse { }; return new Promise((resolve) => { - this.ws!.send(JSON.stringify(message)); + connection.ws.send(JSON.stringify(message)); resolve(message); }); } /** - * Sends a command to the terminal for execution - * @param command - The command string to execute - * @returns A promise that resolves with the message payload + * Broadcasts a message to all connected WebSocket clients + * @param type - The type of message to send + * @param payload - The payload of the message + * @returns An array of promises that resolve with the message payloads */ - sendCommand(command: string): Promise { - return this.send("terminal", { - operation: "createCommand", - params: { command }, + broadcast( + type: string, + payload: Record = {}, + ): Promise[] { + const promises: Promise[] = []; + + this.connections.forEach((connection, connectionId) => { + if (connection.ws.readyState === WebSocket.OPEN) { + promises.push(this.send(connectionId, type, payload)); + } }); + + return promises; } /** - * Registers a callback for when the WebSocket connection is established + * Registers a callback for when a new WebSocket connection is established on the server side + * @param callback - Function to be called when a new connection is established + * @returns The Synapse instance for method chaining + */ + onConnection(callback: ServerConnectionCallback): Synapse { + this.serverConnectionListener = callback; + return this; + } + + /** + * Registers a callback for when a WebSocket connection is established * @param callback - Function to be called when connection opens * @returns The Synapse instance for method chaining */ @@ -302,7 +439,7 @@ class Synapse { } /** - * Registers a callback for when the WebSocket connection is closed + * Registers a callback for when a WebSocket connection is closed * @param callback - Function to be called when connection closes * @returns The Synapse instance for method chaining */ @@ -339,29 +476,73 @@ class Synapse { * @param head - The first packet of the upgraded stream */ handleUpgrade(req: IncomingMessage, socket: Socket, head: Buffer): void { + let params: Record | null = null; + let path = "/"; + + if (req.url) { + path = req.url.split("?")[0]; + const query = req.url.split("?")[1]; + if (query) { + try { + params = JSON.parse(query); + } catch { + params = Object.fromEntries( + query.split("&").map((kv) => { + const [k, v] = kv.split("="); + return [decodeURIComponent(k), decodeURIComponent(v ?? "")]; + }), + ); + } + } + } + this.wss.handleUpgrade(req, socket, head, (ws: WebSocket) => { this.wss.emit("connection", ws, req); }); } /** - * Checks if the WebSocket connection is currently open and ready + * Checks if a specific WebSocket connection is currently open and ready + * @param connectionId - The ID of the connection to check * @returns {boolean} True if the connection is open and ready */ - isConnected() { - return this.ws && this.ws.readyState === WebSocket.OPEN; + isConnected(connectionId?: string): boolean { + if (connectionId) { + const connection = this.connections.get(connectionId); + return !!connection && connection.ws.readyState === WebSocket.OPEN; + } + + // Check if any connection is open + for (const connection of this.connections.values()) { + if (connection.ws.readyState === WebSocket.OPEN) { + return true; + } + } + + return false; } /** - * Closes the WebSocket connection + * Closes a specific WebSocket connection + * @param connectionId - The ID of the connection to close, or all if not specified */ - disconnect(): void { - this.cancelReconnect(); - if (this.ws) { - this.ws.close(); - this.ws = null; + disconnect(connectionId?: string): void { + if (connectionId) { + this.cancelReconnect(connectionId); + const connection = this.connections.get(connectionId); + if (connection) { + connection.ws.close(); + this.connections.delete(connectionId); + } + } else { + // Close all connections + this.cancelReconnect(); + this.connections.forEach((connection) => { + connection.ws.close(); + }); + this.connections.clear(); + this.wss.close(); } - this.wss.close(); } } diff --git a/tests/synapse.test.ts b/tests/synapse.test.ts index cf35e14..cb6f0ca 100644 --- a/tests/synapse.test.ts +++ b/tests/synapse.test.ts @@ -38,10 +38,11 @@ describe("Synapse", () => { (WebSocket as unknown as jest.Mock).mockImplementation(() => mockWs); const connectPromise = synapse.connect("/terminal"); - setTimeout(() => mockWs.onopen!({} as WebSocket.Event), 0); + setTimeout(() => mockWs.onopen && mockWs.onopen({} as any), 0); - const result = await connectPromise; - expect(result).toBe(synapse); + await expect(connectPromise).resolves.toBe( + "Synapse connected successfully", + ); }); it("should reject on connection error", async () => { @@ -51,9 +52,11 @@ describe("Synapse", () => { const connectPromise = synapse.connect("/"); setTimeout( () => - mockWs.onerror!({ + mockWs.onerror && + mockWs.onerror({ error: new Error("Connection failed"), - } as WebSocket.ErrorEvent), + message: "Connection failed", + } as any), 0, ); @@ -63,16 +66,22 @@ describe("Synapse", () => { describe("message handling", () => { let mockWs: ReturnType; + let connectionId: string; - beforeEach(async () => { + beforeEach(() => { mockWs = createMockWebSocket(); - (WebSocket as unknown as jest.Mock).mockImplementation(() => mockWs); - const connectPromise = synapse.connect("/"); - setTimeout(() => mockWs.onopen!({} as WebSocket.Event), 0); - await connectPromise; + connectionId = "test-conn"; + // Manually add connection for testing + (synapse as any).connections.set(connectionId, { + ws: mockWs, + id: connectionId, + path: "/", + params: null, + reconnectAttempts: 0, + }); }); - it("should handle messages correctly", async () => { + it("should handle messages correctly", () => { const handler = jest.fn(); synapse.onMessageType("test", handler); @@ -82,47 +91,55 @@ describe("Synapse", () => { data: { foo: "bar" }, }; - mockWs.onmessage!({ - data: JSON.stringify(testMessage), - } as WebSocket.MessageEvent); - expect(handler).toHaveBeenCalledWith(testMessage); + // Simulate message event + (synapse as any).handleMessage( + { data: JSON.stringify(testMessage) }, + connectionId, + ); + expect(handler).toHaveBeenCalledWith(testMessage, connectionId); }); - it("should ignore malformed messages", async () => { + it("should ignore malformed messages", () => { const handler = jest.fn(); synapse.onMessageType("test", handler); - mockWs.onmessage!({ data: "invalid json{" } as WebSocket.MessageEvent); + (synapse as any).handleMessage({ data: "invalid json{" }, connectionId); expect(handler).not.toHaveBeenCalled(); }); }); describe("send", () => { let mockWs: ReturnType; + let connectionId: string; - beforeEach(async () => { + beforeEach(() => { mockWs = createMockWebSocket(); - (WebSocket as unknown as jest.Mock).mockImplementation(() => mockWs); - const connectPromise = synapse.connect("/"); - setTimeout(() => mockWs.onopen!({} as WebSocket.Event), 0); - await connectPromise; + connectionId = "test-conn"; + (synapse as any).connections.set(connectionId, { + ws: mockWs, + id: connectionId, + path: "/", + params: null, + reconnectAttempts: 0, + }); }); it("should send messages and return payload", async () => { const payload = { data: "test-data" }; - const result = await synapse.send("test-type", payload); + const result = await synapse.send(connectionId, "test-type", payload); expect(result).toEqual({ type: "test-type", requestId: expect.any(String), data: "test-data", }); + expect(mockWs.send).toHaveBeenCalledWith(JSON.stringify(result)); }); it("should throw when not connected", () => { - synapse.disconnect(); - expect(() => synapse.send("test-type")).toThrow( - "WebSocket is not connected", + mockWs.readyState = WebSocket.CLOSED; + expect(() => synapse.send(connectionId, "test-type")).toThrow( + `WebSocket connection ${connectionId} is not connected`, ); }); }); @@ -137,7 +154,7 @@ describe("Synapse", () => { ([eventName]: [string]) => eventName === "connection", )?.[1]; if (connectionHandler) { - connectionHandler(mockWs); + connectionHandler(mockWs, req); } }), emit: jest.fn(), @@ -151,12 +168,112 @@ describe("Synapse", () => { synapse = new Synapse(); synapse.onOpen(onOpenMock); - synapse.handleUpgrade( - {} as IncomingMessage, - {} as Socket, - Buffer.alloc(0), - ); + // Provide a mock IncomingMessage with a url property + const mockReq = { url: "/test?foo=bar" } as IncomingMessage; + synapse.handleUpgrade(mockReq, {} as Socket, Buffer.alloc(0)); expect(onOpenMock).toHaveBeenCalled(); }); + + it("should call onConnection callback when new connection is established", () => { + const mockWs = createMockWebSocket(); + const mockWss = { + handleUpgrade: jest.fn((req, socket, head, cb) => { + cb(mockWs); + const connectionHandler = (mockWss.on as jest.Mock).mock.calls.find( + ([eventName]: [string]) => eventName === "connection", + )?.[1]; + if (connectionHandler) { + connectionHandler(mockWs, req); + } + }), + emit: jest.fn(), + on: jest.fn() as jest.Mock, + close: jest.fn(), + } as unknown as WebSocketServer; + + jest.mocked(WebSocketServer).mockImplementation(() => mockWss); + + const onConnectionMock = jest.fn(); + synapse = new Synapse(); + synapse.onConnection(onConnectionMock); + + // Provide a mock IncomingMessage with a url property + const mockReq = { url: "/test?foo=bar" } as IncomingMessage; + synapse.handleUpgrade(mockReq, {} as Socket, Buffer.alloc(0)); + + expect(onConnectionMock).toHaveBeenCalledWith(expect.any(String)); // connectionId + }); + }); + + describe("multiple connections", () => { + it("should allow multiple connections and track them independently", () => { + const mockWs1 = createMockWebSocket(); + const mockWs2 = createMockWebSocket(); + const mockWss = { + handleUpgrade: jest.fn((req, socket, head, cb) => { + if ((req as any).clientId === 1) cb(mockWs1); + else cb(mockWs2); + const connectionHandler = (mockWss.on as jest.Mock).mock.calls.find( + ([eventName]: [string]) => eventName === "connection", + )?.[1]; + if (connectionHandler) { + connectionHandler( + (req as any).clientId === 1 ? mockWs1 : mockWs2, + req, + ); + } + }), + emit: jest.fn(), + on: jest.fn() as jest.Mock, + close: jest.fn(), + } as unknown as WebSocketServer; + + jest.mocked(WebSocketServer).mockImplementation(() => mockWss); + + synapse = new Synapse(); + + const connectionIds: string[] = []; + synapse.onConnection((id) => connectionIds.push(id)); + + // Provide mock IncomingMessages with url property + const mockReq1 = { clientId: 1, url: "/test1?foo=bar" } as any; + const mockReq2 = { clientId: 2, url: "/test2?foo=baz" } as any; + synapse.handleUpgrade(mockReq1, {} as Socket, Buffer.alloc(0)); + synapse.handleUpgrade(mockReq2, {} as Socket, Buffer.alloc(0)); + + expect(connectionIds.length).toBe(2); + expect(connectionIds[0]).not.toBe(connectionIds[1]); + expect(synapse.getConnections()).toEqual( + expect.arrayContaining([connectionIds[0], connectionIds[1]]), + ); + + synapse.disconnect(connectionIds[0]); + expect(synapse.getConnections()).toContain(connectionIds[1]); + expect(synapse.getConnections()).not.toContain(connectionIds[0]); + }); + }); + + describe("connection cleanup", () => { + it("should clear all connections when disconnect is called", () => { + const mockWs1 = createMockWebSocket(); + const mockWs2 = createMockWebSocket(); + (synapse as any).connections.set("conn1", { + ws: mockWs1, + id: "conn1", + path: "/a", + params: null, + reconnectAttempts: 0, + }); + (synapse as any).connections.set("conn2", { + ws: mockWs2, + id: "conn2", + path: "/b", + params: null, + reconnectAttempts: 0, + }); + expect(synapse.getConnections().length).toBe(2); + synapse.disconnect(); + expect(synapse.getConnections().length).toBe(0); + }); }); });