Skip to content

Commit dd74587

Browse files
authored
Fix subscribe initial state, fix switch workspace error, upgrade ping (#10026)
Signed-off-by: Leonid Kaganov <lleo@lleo.me>
1 parent b7c5e2b commit dd74587

File tree

2 files changed

+55
-146
lines changed

2 files changed

+55
-146
lines changed

packages/hulypulse-client/src/client.ts

Lines changed: 54 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -19,78 +19,12 @@ export type UnsubscribeCallback = () => Promise<boolean>
1919

2020
export type Callback<T> = (key: string, data: T | undefined) => void
2121

22-
// hulypulse API: incoming messages variants
23-
24-
// interface Ping_Message {
25-
// data: 'ping' | 'pong'
26-
// }
27-
28-
// interface Answer_Message {
29-
// data: { answer: string }
30-
// }
31-
32-
// interface Error_Message {
33-
// data: { error: string; reason?: string }
34-
// }
35-
36-
// interface Subscribe_Message {
37-
// data: { key: string; result: { data: JSONValue; etag: string; expiresAt: number } }
38-
// }
39-
40-
// put:
41-
// {action: "put", correlation, result:"OK" }
42-
// {action: "put", correlation, error: "...error" }
43-
44-
// get:
45-
// {action: "get", correlation, "result":{
46-
// "data":"hello 1",
47-
// "etag":"df0649bc4f1be901c85b6183091c1d83",
48-
// "expires_at":3,
49-
// "key":"00000000-0000-0000-0000-000000000001/foo/bar1"
50-
// }}
51-
// {action: "get", correlation, error: "...error" }
52-
53-
// delete:
54-
// {action: "delete", correlation, result:"OK" }
55-
// {action: "delete", correlation, error: "...error" }
56-
57-
// list:
58-
// {action: "list", correlation, result:[
59-
// {"data":"hello 1","etag":"df0649bc4f1be901c85b6183091c1d83","expires_at":41,"key":"00000000-0000-0000-0000-000000000001/foo/bar1"},
60-
// {"data":"hello 2","etag":"bb21ec8394b75795622f61613a777a8b","expires_at":85,"key":"00000000-0000-0000-0000-000000000001/foo/bar2"}
61-
// ] }
62-
// {action: "list", correlation, error: "...error" }
63-
64-
// sub:
65-
// {action: "sub", correlation, result:"OK" }
66-
// {action: "sub", correlation, error: "...error" }
67-
68-
// unsub:
69-
// {action: "unsub", correlation, result:"OK" }
70-
// {action: "unsub", correlation, error: "...error" }
71-
72-
// sublist:
73-
// {action: "sublist", correlation, result:[keys] }
74-
// {action: "sublist", correlation, error: "...error" }
75-
7622
interface GetFullResult<T> {
7723
data: T
7824
etag: string
7925
expiresAt: number
8026
}
8127

82-
// interface GetFullResultKey<T> {
83-
// data: T
84-
// etag: string
85-
// expiresAt: number
86-
// key: string
87-
// }
88-
89-
// hulypulse API: subscription messages variants
90-
91-
// {"message":"Expired","key":"00000000-0000-0000-0000-000000000001/foo/bar1"}
92-
// {"message":"Set","key":"00000000-0000-0000-0000-000000000001/foo/bar1","value":"hello 1"}
93-
9428
type Command = 'sub' | 'unsub' | 'put' | 'get' | 'delete' | 'list' | 'sublist' | 'info'
9529

9630
interface SubscribedMessage {
@@ -113,50 +47,9 @@ interface ErrorCommandMessage {
11347

11448
type PulseIncomingMessage = SubscribedMessage | CommandMessage | ErrorCommandMessage
11549

116-
// hulypulse API: answer messages variants
117-
118-
// interface OkResponse {
119-
// result: "OK"
120-
// action: "put" | "delete" | "sub" | "unsub"
121-
// correlation?: string
122-
// }
123-
124-
// interface ErrorResponse {
125-
// error: string
126-
// action: "put" | "get" | "delete" | "list" | "sub" | "unsub" | "sublist" | "info"
127-
// correlation?: string
128-
// }
129-
130-
// interface InfoResponse {
131-
// action: "info"
132-
// correlation?: string
133-
// result: string
134-
// }
135-
136-
// interface GetResponse {
137-
// action: "get"
138-
// correlation?: string
139-
// result: GetFullResultKey<JSONValue>
140-
// }
141-
142-
// interface ListResponse {
143-
// action: "list"
144-
// correlation?: string
145-
// result: GetFullResultKey<JSONValue>[]
146-
// }
147-
148-
// interface SublistResponse {
149-
// action: "sublist"
150-
// correlation?: string
151-
// result: string[]
152-
// }
153-
154-
// hulypulse API: outcoming messages variants
155-
15650
interface GetMessage {
15751
type: 'get'
15852
key: string
159-
// correlation: string
16053
}
16154

16255
interface PutMessage {
@@ -167,36 +60,30 @@ interface PutMessage {
16760
expiresAt?: number
16861
ifMatch?: string
16962
ifNoneMatch?: string
170-
// correlation: string
17163
}
17264

17365
interface DeleteMessage {
17466
type: 'delete'
17567
key: string
17668
ifMatch?: string
177-
// correlation: string
17869
}
17970

18071
interface SubscribeMessages {
18172
type: 'sub'
18273
key: string
183-
// correlation: string
18474
}
18575

18676
interface UnsubscribeMessages {
18777
type: 'unsub'
18878
key: string
189-
// correlation: string
19079
}
19180

19281
interface SubscribesList {
19382
type: 'list'
194-
// correlation: string
19583
}
19684

19785
interface InfoMessage {
19886
type: 'info'
199-
// correlation: string
20087
}
20188

20289
type ProtocolMessage =
@@ -260,7 +147,6 @@ export class HulypulseClient implements Disposable {
260147
this.ws?.send('pong')
261148
return
262149
}
263-
264150
if (event.data === 'pong') {
265151
return
266152
}
@@ -306,36 +192,41 @@ export class HulypulseClient implements Disposable {
306192
}
307193

308194
private resubscribe (): void {
309-
for (const key in this.subscribes) {
195+
for (const [key] of this.subscribes) {
310196
this.send({ type: 'sub', key }).catch((error) => {
311197
throw new Error(`Resubscription failed for key=${key}: ${error.message ?? error}`)
312198
})
313199
}
314200
}
315201

316202
private startPing (): void {
317-
clearInterval(this.pingInterval)
203+
this.stopPing()
318204
this.pingInterval = setInterval(() => {
319-
if (this.ws !== null && this.ws.readyState === WebSocket.OPEN) {
205+
if (this.ws?.readyState === WebSocket.OPEN) {
320206
this.ws.send('ping')
321207
}
322-
clearTimeout(this.pingTimeout)
208+
if (this.pingTimeout !== undefined) {
209+
clearTimeout(this.pingTimeout)
210+
}
323211
this.pingTimeout = setTimeout(() => {
324-
if (this.ws !== null) {
325-
console.log('no response from server')
212+
if (this.ws?.readyState !== WebSocket.OPEN) {
213+
console.warn('WS-server not responding to ping, closing connection')
326214
clearInterval(this.pingInterval)
327-
this.ws.close(WS_CLOSE_NORMAL)
215+
this.ws?.close(WS_CLOSE_NORMAL)
328216
}
329217
}, this.PING_TIMEOUT_MS)
330218
}, this.PING_INTERVAL_MS)
331219
}
332220

333221
private stopPing (): void {
334-
clearInterval(this.pingInterval)
335-
this.pingInterval = undefined
336-
337-
clearTimeout(this.pingTimeout)
338-
this.pingTimeout = undefined
222+
if (this.pingInterval !== undefined) {
223+
clearInterval(this.pingInterval)
224+
this.pingInterval = undefined
225+
}
226+
if (this.pingTimeout !== undefined) {
227+
clearTimeout(this.pingTimeout)
228+
this.pingTimeout = undefined
229+
}
339230
}
340231

341232
[Symbol.dispose] (): void {
@@ -345,8 +236,8 @@ export class HulypulseClient implements Disposable {
345236
private reconnect (): void {
346237
if (this.reconnectTimeout !== undefined) {
347238
clearTimeout(this.reconnectTimeout)
239+
this.reconnectTimeout = undefined
348240
}
349-
this.reconnectTimeout = undefined
350241
this.stopPing()
351242

352243
if (!this.closed) {
@@ -375,23 +266,23 @@ export class HulypulseClient implements Disposable {
375266

376267
public async info (): Promise<string> {
377268
const reply = await this.send({ type: 'info' })
378-
if (reply.error != null) {
269+
if (reply.error !== undefined) {
379270
throw new Error(reply.error)
380271
}
381272
return reply.result ?? ''
382273
}
383274

384275
public async list (): Promise<string> {
385276
const reply = await this.send({ type: 'list' })
386-
if (reply.error != null) {
277+
if (reply.error !== undefined) {
387278
throw new Error(reply.error)
388279
}
389280
return reply.result ?? ''
390281
}
391282

392283
public async subscribe (key: string, callback: Callback<any>): Promise<UnsubscribeCallback> {
393284
let list = this.subscribes.get(key)
394-
if (list == null) {
285+
if (list === undefined) {
395286
list = []
396287
this.subscribes.set(key, list)
397288
}
@@ -401,8 +292,26 @@ export class HulypulseClient implements Disposable {
401292
list.push(callback)
402293
if (list.length === 1) {
403294
const reply = await this.send({ type: 'sub', key })
404-
if (reply.error != null) {
405-
this.reconnect()
295+
if (reply.error !== undefined) {
296+
throw new Error(reply.error)
297+
}
298+
}
299+
}
300+
301+
// callback for every old item (expires_at > 1 sec for atomicity)
302+
const prevlist = await this.send({ type: 'list', key })
303+
if (prevlist.error !== undefined) {
304+
throw new Error(prevlist.error)
305+
} else if (Array.isArray(prevlist.result)) {
306+
for (const item of prevlist.result) {
307+
try {
308+
const value = item.data !== undefined ? JSON.parse(item.data) : undefined
309+
if (item.expires_at <= 1 || value === undefined) {
310+
continue
311+
}
312+
callback(item.key, value)
313+
} catch (err) {
314+
console.error('Error in initial callback', err)
406315
}
407316
}
408317
}
@@ -414,16 +323,15 @@ export class HulypulseClient implements Disposable {
414323

415324
public async unsubscribe (key: string, callback: Callback<any>): Promise<boolean> {
416325
const list = this.subscribes.get(key)
417-
if (list?.includes(callback) == null) {
326+
if (list === undefined || !list.includes(callback)) {
418327
return false
419328
}
420329
const newList = list.filter((cb) => cb !== callback)
421330
if (newList.length === 0) {
422331
this.subscribes.delete(key)
423332
const reply = await this.send({ type: 'unsub', key })
424-
if (reply.error != null) {
425-
this.reconnect()
426-
return true
333+
if (reply?.error !== undefined) {
334+
throw new Error(reply.error)
427335
}
428336
} else {
429337
this.subscribes.set(key, newList)
@@ -449,14 +357,14 @@ export class HulypulseClient implements Disposable {
449357
...(typeof third === 'number' ? { TTL: third } : third)
450358
}
451359
const reply = await this.send(message)
452-
if (reply.error != null) {
360+
if (reply.error !== undefined) {
453361
throw new Error(reply.error)
454362
}
455363
}
456364

457365
public async get<T>(key: string): Promise<T | undefined> {
458366
const reply = await this.send({ type: 'get', key })
459-
if (reply.error != null) {
367+
if (reply.error !== undefined) {
460368
if (reply.error === 'not found') {
461369
return undefined
462370
}
@@ -467,7 +375,7 @@ export class HulypulseClient implements Disposable {
467375

468376
public async get_full<T>(key: string): Promise<GetFullResult<T> | undefined> {
469377
const reply = await this.send({ type: 'get', key })
470-
if (reply.error != null) {
378+
if (reply.error !== undefined) {
471379
if (reply.error === 'not found') {
472380
return undefined
473381
}
@@ -483,7 +391,7 @@ export class HulypulseClient implements Disposable {
483391
public async delete (key: string, options?: Pick<DeleteMessage, 'ifMatch'>): Promise<boolean> {
484392
const message: Omit<DeleteMessage, 'correlation'> = { type: 'delete', key, ...options }
485393
const reply = await this.send(message)
486-
if (reply.error != null) {
394+
if (reply.error !== undefined) {
487395
if (reply.error === 'not found') {
488396
return false
489397
}
@@ -497,24 +405,25 @@ export class HulypulseClient implements Disposable {
497405
const message = { ...msg, correlation: id.toString() } satisfies M
498406

499407
return await new Promise((resolve, reject) => {
500-
if (this.ws == null || this.ws.readyState !== WebSocket.OPEN) {
501-
reject(new Error('WebSocket is not open.'))
408+
if (this.ws?.readyState !== WebSocket.OPEN) {
409+
resolve({ error: 'WebSocket is not open.' })
502410
return
503411
}
504412
const sendTimeout = setTimeout(() => {
505413
const pending = this.pending.get(id)
506414
if (pending !== undefined) {
507-
pending.reject(new Error('Timeout waiting for response'))
415+
pending.resolve({ error: 'Timeout waiting for response' })
508416
this.pending.delete(id)
509417
}
510418
}, this.SEND_TIMEOUT_MS)
511419
this.pending.set(id, { resolve, reject, send_timeout: sendTimeout })
512420
this.ws.send(JSON.stringify(message))
421+
this.startPing() // reset ping timer on any send
513422
})
514423
}
515424
}
516425

517426
export function escapeString (str: string): string {
518-
// Escape special characters to '*' | '?' | '[' | ']' | '\\' | '\0'..='\x1F' | '\x7F' | '"' | '\''
519-
return str.replace(/[\\'"]/g, '\\$&')
427+
// eslint-disable-next-line no-control-regex, no-useless-escape
428+
return str.replace(/[*?\[\]\\\x00-\x1F\x7F"']/g, '_')
520429
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
hulypulse hardcoreeng/service_hulypulse:0.1.14
1+
hulypulse hardcoreeng/service_hulypulse:0.1.29

0 commit comments

Comments
 (0)