Skip to content

Commit 849e87c

Browse files
committed
feat(rpc): implement WebSocket RPC communication with client-server architecture
1 parent 749f819 commit 849e87c

11 files changed

Lines changed: 783 additions & 3 deletions

File tree

Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
# WebSocket RPC 通讯机制设计
2+
3+
## 概述
4+
5+
**现状**:Client 端主动上报信息到 Server,单向通讯
6+
**需求**:Server 需要主动控制 Client(如添加/更新隧道)
7+
**方案**:基于 WebSocket 的双向 RPC 通讯
8+
9+
```
10+
Client Server
11+
│ │
12+
├─ WebSocket 连接 ─────────────>│
13+
│<─── tunnel.add 命令 ──────────│
14+
│ │
15+
├─ 执行返回结果 ───────────────>│
16+
│ │
17+
│<─── ping ─────────────────────│
18+
│ │
19+
├─ pong ───────────────────────>│
20+
│ │
21+
```
22+
23+
---
24+
25+
## 通讯协议
26+
27+
```typescript
28+
// RPC 请求(Server → Client)
29+
export interface RpcRequest {
30+
id: string
31+
method: string // tunnel.add, tunnel.update, tunnel.remove
32+
params: Record<string, unknown>
33+
timeout?: number // 默认 30s
34+
}
35+
36+
// RPC 响应(Client → Server)
37+
export interface RpcResponse {
38+
id: string
39+
status: 'success' | 'error'
40+
result?: unknown
41+
error?: { code: string, message: string }
42+
}
43+
44+
// 心跳消息
45+
export interface PingMessage { type: 'ping', timestamp: number }
46+
export interface PongMessage { type: 'pong', timestamp: number }
47+
48+
// 注册消息(Client 连接时发送)
49+
export interface RegisterMessage {
50+
type: 'register'
51+
nodeId: string
52+
payload: NodeInfo
53+
}
54+
```
55+
56+
---
57+
58+
## Server 端实现
59+
60+
```typescript
61+
import { Buffer } from 'node:buffer'
62+
63+
export class RpcServer {
64+
private clients = new Map<string, WebSocket>()
65+
private pendingRequests = new Map<string, any>()
66+
67+
onClientConnect(ws: WebSocket, nodeId: string) {
68+
this.clients.set(nodeId, ws)
69+
ws.on('message', (data: Uint8Array) => {
70+
const msg = JSON.parse(Buffer.from(data).toString())
71+
if (msg.id && msg.status) {
72+
this.handleRpcResponse(msg)
73+
}
74+
})
75+
ws.on('close', () => this.clients.delete(nodeId))
76+
}
77+
78+
async rpcCall(nodeId: string, method: string, params: any, timeout = 30000): Promise<any> {
79+
const ws = this.clients.get(nodeId)
80+
if (!ws)
81+
throw new Error('Client not connected')
82+
83+
const id = crypto.randomUUID()
84+
const request: RpcRequest = { id, method, params, timeout }
85+
86+
return new Promise((resolve, reject) => {
87+
const timer = setTimeout(() => {
88+
this.pendingRequests.delete(id)
89+
reject(new Error(`RPC timeout: ${method}`))
90+
}, timeout)
91+
92+
this.pendingRequests.set(id, { resolve, reject, timer })
93+
ws.send(JSON.stringify(request))
94+
})
95+
}
96+
97+
private handleRpcResponse(msg: RpcResponse) {
98+
const pending = this.pendingRequests.get(msg.id)
99+
if (!pending)
100+
return
101+
clearTimeout(pending.timer)
102+
this.pendingRequests.delete(msg.id)
103+
if (msg.status === 'success') {
104+
pending.resolve(msg.result)
105+
}
106+
else {
107+
pending.reject(new Error(msg.error?.message))
108+
}
109+
}
110+
111+
// 启动心跳检测
112+
startHeartbeat() {
113+
setInterval(() => {
114+
this.clients.forEach((ws, nodeId) => {
115+
if (ws.readyState === WebSocket.OPEN) {
116+
ws.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }))
117+
}
118+
else {
119+
this.clients.delete(nodeId)
120+
}
121+
})
122+
}, 30000)
123+
}
124+
}
125+
```
126+
127+
## Client 端实现
128+
129+
```typescript
130+
import { Buffer } from 'node:buffer'
131+
132+
export class RpcClient {
133+
private ws: WebSocket | null = null
134+
135+
async connect(serverUrl: string, nodeId: string) {
136+
this.ws = new WebSocket(serverUrl)
137+
138+
this.ws.on('open', () => {
139+
this.send({ type: 'register', nodeId, payload: this.collectNodeInfo() })
140+
})
141+
142+
this.ws.on('message', (data: Uint8Array) => {
143+
const msg = JSON.parse(Buffer.from(data).toString())
144+
145+
if (msg.type === 'ping') {
146+
this.send({ type: 'pong', timestamp: Date.now() })
147+
}
148+
else if (msg.method) {
149+
this.handleRpcRequest(msg)
150+
}
151+
})
152+
153+
this.ws.on('close', () => {
154+
setTimeout(() => this.connect(serverUrl, nodeId), 5000) // 自动重连
155+
})
156+
}
157+
158+
private async handleRpcRequest(req: RpcRequest) {
159+
try {
160+
let result: any
161+
if (req.method === 'tunnel.add') {
162+
result = await this.bridge.execute({ name: 'proxy.add', payload: req.params })
163+
}
164+
else if (req.method === 'tunnel.update') {
165+
result = await this.bridge.execute({ name: 'proxy.update', payload: req.params })
166+
}
167+
else if (req.method === 'tunnel.remove') {
168+
result = await this.bridge.execute({ name: 'proxy.remove', payload: req.params })
169+
}
170+
171+
this.send({ id: req.id, status: 'success', result })
172+
}
173+
catch (error) {
174+
this.send({
175+
id: req.id,
176+
status: 'error',
177+
error: { code: 'EXECUTION_ERROR', message: error instanceof Error ? error.message : 'Unknown error' }
178+
})
179+
}
180+
}
181+
182+
private send(msg: any) {
183+
if (this.ws?.readyState === WebSocket.OPEN) {
184+
this.ws.send(JSON.stringify(msg))
185+
}
186+
}
187+
}
188+
```
189+
190+
---
191+
192+
## FrpBridge 集成
193+
194+
```typescript
195+
export class FrpBridge {
196+
private rpcServer?: RpcServer
197+
private rpcClient?: RpcClient
198+
199+
constructor(options: FrpBridgeOptions) {
200+
if (this.isServerMode) {
201+
this.rpcServer = new RpcServer()
202+
this.rpcServer.startHeartbeat()
203+
}
204+
205+
if (this.isClientMode) {
206+
this.rpcClient = new RpcClient()
207+
this.rpcClient.connect(options.serverUrl, options.nodeId)
208+
}
209+
}
210+
211+
// Server 端:远程添加隧道
212+
async addTunnelRemote(nodeId: string, tunnel: ProxyConfig) {
213+
if (!this.rpcServer)
214+
throw new Error('Not in server mode')
215+
return this.rpcServer.rpcCall(nodeId, 'tunnel.add', tunnel)
216+
}
217+
}
218+
```
219+
220+
---
221+
222+
## 可靠性保证
223+
224+
### 1. 自动重连
225+
Client 连接断开时自动重连,间隔 5s
226+
227+
### 2. 请求超时
228+
RPC 请求默认 30s 超时,Server 自动清理过期请求
229+
230+
### 3. 心跳保活
231+
Server 每 30s 发送 ping,Client 立即响应 pong
232+
233+
---
234+
235+
## 安全性
236+
237+
### 1. 身份认证
238+
WebSocket 握手时验证 token:
239+
```typescript
240+
const token = new URL(`ws://localhost${request.url}`).searchParams.get('token')
241+
if (!validateToken(token))
242+
request.reject()
243+
```
244+
245+
### 2. 命令授权
246+
执行前检查权限:
247+
```typescript
248+
if (!isAuthorizedForMethod(nodeId, method)) {
249+
throw new Error('UNAUTHORIZED')
250+
}
251+
```
252+
253+
---
254+
255+
## 支持的方法
256+
257+
| 方法 | 参数 | 用途 |
258+
|------|------|------|
259+
| `tunnel.add` | ProxyConfig | 添加隧道 |
260+
| `tunnel.update` | { name, config } | 更新隧道 |
261+
| `tunnel.remove` | { name } | 删除隧道 |
262+
| `config.update` | ClientConfig | 更新配置 |
263+
264+
---
265+
266+
## 使用示例
267+
268+
```typescript
269+
// Server 侧:添加隧道
270+
await bridge.addTunnelRemote('node-001', {
271+
name: 'web',
272+
type: 'http',
273+
localPort: 8080,
274+
customDomains: ['web.example.com']
275+
})
276+
277+
// Server 侧:更新隧道
278+
await bridge.rpcServer.rpcCall('node-001', 'tunnel.update', {
279+
name: 'web',
280+
config: { localPort: 9090 }
281+
})
282+
283+
// Server 侧:删除隧道
284+
await bridge.rpcServer.rpcCall('node-001', 'tunnel.remove', { name: 'web' })
285+
```
286+
287+
---
288+
289+
## 技术栈
290+
291+
- ****`ws` (Node.js WebSocket)
292+
- **协议**:WebSocket + JSON RPC
293+
- **心跳**:30s 间隔
294+
- **超时**:30s (可配置)
295+
- **重连**:自动,5s 间隔

packages/core/package.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@
3333
"consola": "^3.4.2",
3434
"execa": "^9.6.0",
3535
"fs-extra": "^11.3.2",
36-
"pathe": "^2.0.3"
36+
"pathe": "^2.0.3",
37+
"ws": "^8.18.0"
3738
},
3839
"devDependencies": {
39-
"@types/fs-extra": "^11.0.4"
40+
"@types/fs-extra": "^11.0.4",
41+
"@types/ws": "^8.5.14"
4042
}
4143
}

0 commit comments

Comments
 (0)