Skip to content

Commit 68487cc

Browse files
amingclawdevclaude
andcommitted
feat(stateService): Phase 1 — foundation (StateService + StateClient + tests)
Add unified state management infrastructure for multi-agent architecture: - StateService singleton with EventBus, StateStore, Persistence, AgentBridge stubs - StateClient agent SDK with Proxy-based state, set/merge/batch, WS sync - 85 unit tests covering all modules Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1df139d commit 68487cc

4 files changed

Lines changed: 1682 additions & 0 deletions

File tree

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/**
2+
* StateClient — Agent-side SDK for unified state management.
3+
*
4+
* Runs inside agent child processes. Communicates with the main-process
5+
* StateService via WebSocket messages (state_sync_*).
6+
*
7+
* Usage in agent code:
8+
* const { StateClient } = require('../shared/stateClient');
9+
* const stateClient = new StateClient(ws, 'job-seek');
10+
* stateClient.state.direction = { jobTitle: 'Engineer' }; // auto-syncs
11+
* stateClient.set('session.config.timeout', 5000); // deep set
12+
* stateClient.merge('session.config', { retries: 3 }); // deep merge
13+
*/
14+
15+
// ─── Helpers ───────────────────────────────────────────────
16+
17+
function getByPath(obj, dotPath) {
18+
if (!dotPath) return obj;
19+
const parts = dotPath.split('.');
20+
let current = obj;
21+
for (const part of parts) {
22+
if (current == null || typeof current !== 'object') return undefined;
23+
current = current[part];
24+
}
25+
return current;
26+
}
27+
28+
function setByPath(obj, dotPath, value) {
29+
const parts = dotPath.split('.');
30+
let current = obj;
31+
for (let i = 0; i < parts.length - 1; i++) {
32+
const part = parts[i];
33+
if (current[part] == null || typeof current[part] !== 'object') {
34+
current[part] = {};
35+
}
36+
current = current[part];
37+
}
38+
current[parts[parts.length - 1]] = value;
39+
}
40+
41+
function deepClone(obj) {
42+
if (obj === undefined) return undefined;
43+
return JSON.parse(JSON.stringify(obj));
44+
}
45+
46+
// ─── StateClient ───────────────────────────────────────────
47+
48+
class StateClient {
49+
/**
50+
* @param {Object} ws - WebSocket connection (must have .send(string) method)
51+
* @param {string} agentName - agent identifier (e.g. 'job-seek')
52+
*/
53+
constructor(ws, agentName) {
54+
this._ws = ws;
55+
this._agentName = agentName;
56+
this._data = {}; // local state cache
57+
this._batching = false; // true during batch()
58+
this._batchOps = []; // collected ops during batch
59+
60+
// Create shallow Proxy for `state` property
61+
this.state = this._createProxy();
62+
}
63+
64+
// ── Proxy ──
65+
66+
/**
67+
* Create a shallow Proxy that intercepts top-level property sets
68+
* and sends state_sync_set messages to the server.
69+
*/
70+
_createProxy() {
71+
const self = this;
72+
return new Proxy(this._data, {
73+
get(target, prop) {
74+
if (typeof prop === 'symbol') return target[prop];
75+
return target[prop];
76+
},
77+
set(target, prop, value) {
78+
if (typeof prop === 'symbol') {
79+
target[prop] = value;
80+
return true;
81+
}
82+
const cloned = deepClone(value);
83+
target[prop] = cloned;
84+
self._sendSet(String(prop), cloned);
85+
return true;
86+
},
87+
deleteProperty(target, prop) {
88+
if (typeof prop === 'symbol') {
89+
delete target[prop];
90+
return true;
91+
}
92+
delete target[prop];
93+
self._sendPatch({ op: 'delete', path: String(prop) });
94+
return true;
95+
}
96+
});
97+
}
98+
99+
// ── Public API ──
100+
101+
/**
102+
* Explicit deep set — for paths deeper than one level.
103+
* @param {string} dotPath - e.g. 'session.config.timeout'
104+
* @param {*} value
105+
*/
106+
set(dotPath, value) {
107+
const cloned = deepClone(value);
108+
setByPath(this._data, dotPath, cloned);
109+
if (this._batching) {
110+
this._batchOps.push({ op: 'set', path: dotPath, value: cloned });
111+
} else {
112+
this._sendPatch({ op: 'set', path: dotPath, value: cloned });
113+
}
114+
}
115+
116+
/**
117+
* Deep merge a partial object at a dot-path.
118+
* @param {string} dotPath
119+
* @param {Object} partial
120+
*/
121+
merge(dotPath, partial) {
122+
const cloned = deepClone(partial);
123+
// Apply locally
124+
let current = getByPath(this._data, dotPath);
125+
if (current == null || typeof current !== 'object') {
126+
setByPath(this._data, dotPath, cloned);
127+
} else {
128+
Object.assign(current, cloned); // shallow merge at target
129+
}
130+
if (this._batching) {
131+
this._batchOps.push({ op: 'merge', path: dotPath, partial: cloned });
132+
} else {
133+
this._sendPatch({ op: 'merge', path: dotPath, partial: cloned });
134+
}
135+
}
136+
137+
/**
138+
* Collect multiple operations and send as a single WS message.
139+
* @param {Function} fn - called synchronously; use this.set/merge inside
140+
*/
141+
batch(fn) {
142+
this._batching = true;
143+
this._batchOps = [];
144+
try {
145+
fn();
146+
} finally {
147+
this._batching = false;
148+
}
149+
if (this._batchOps.length > 0) {
150+
this._send({
151+
type: 'state_sync_patch',
152+
agentName: this._agentName,
153+
ops: this._batchOps
154+
});
155+
this._batchOps = [];
156+
}
157+
}
158+
159+
/**
160+
* Request full state from the server (used on restart/recovery).
161+
* Server will respond with state_sync_response.
162+
*/
163+
syncFromServer() {
164+
this._send({
165+
type: 'state_sync_request',
166+
agentName: this._agentName
167+
});
168+
}
169+
170+
/**
171+
* Process an incoming message from the server.
172+
* @param {Object} msg - parsed message object
173+
*/
174+
handleServerMessage(msg) {
175+
if (!msg || !msg.type) return;
176+
177+
switch (msg.type) {
178+
case 'state_sync_response': {
179+
// Full state restore from server
180+
if (msg.data && typeof msg.data === 'object') {
181+
// Replace local data while keeping Proxy reference intact
182+
const keys = Object.keys(this._data);
183+
for (const k of keys) delete this._data[k];
184+
Object.assign(this._data, deepClone(msg.data));
185+
}
186+
break;
187+
}
188+
case 'agent_state_patch': {
189+
// Incremental update from server (e.g., another source changed state)
190+
if (msg.op === 'set' && msg.path) {
191+
setByPath(this._data, msg.path, deepClone(msg.value));
192+
} else if (msg.op === 'merge' && msg.path && msg.partial) {
193+
let current = getByPath(this._data, msg.path);
194+
if (current == null || typeof current !== 'object') {
195+
setByPath(this._data, msg.path, deepClone(msg.partial));
196+
} else {
197+
Object.assign(current, deepClone(msg.partial));
198+
}
199+
} else if (msg.op === 'delete' && msg.path) {
200+
// Delete by path
201+
const parts = msg.path.split('.');
202+
if (parts.length === 1) {
203+
delete this._data[parts[0]];
204+
} else {
205+
const parent = getByPath(this._data, parts.slice(0, -1).join('.'));
206+
if (parent && typeof parent === 'object') {
207+
delete parent[parts[parts.length - 1]];
208+
}
209+
}
210+
}
211+
break;
212+
}
213+
}
214+
}
215+
216+
// ── Internal ──
217+
218+
/**
219+
* Send a state_sync_set message (triggered by Proxy set).
220+
*/
221+
_sendSet(path, value) {
222+
if (this._batching) {
223+
this._batchOps.push({ op: 'set', path, value });
224+
return;
225+
}
226+
this._send({
227+
type: 'state_sync_set',
228+
agentName: this._agentName,
229+
path,
230+
value
231+
});
232+
}
233+
234+
/**
235+
* Send a state_sync_patch message.
236+
*/
237+
_sendPatch(opObj) {
238+
this._send({
239+
type: 'state_sync_patch',
240+
agentName: this._agentName,
241+
ops: [opObj]
242+
});
243+
}
244+
245+
/**
246+
* Low-level WS send. Serializes to JSON string.
247+
*/
248+
_send(msg) {
249+
if (this._ws && typeof this._ws.send === 'function') {
250+
try {
251+
this._ws.send(JSON.stringify(msg));
252+
} catch (err) {
253+
console.error(`[StateClient] Failed to send WS message:`, err.message);
254+
}
255+
}
256+
}
257+
}
258+
259+
module.exports = { StateClient };

0 commit comments

Comments
 (0)