diff --git a/packages/hulypulse-client/src/client.ts b/packages/hulypulse-client/src/client.ts index a57a2b6b736..2d9c83e19f9 100644 --- a/packages/hulypulse-client/src/client.ts +++ b/packages/hulypulse-client/src/client.ts @@ -97,7 +97,7 @@ type ProtocolMessage = export class HulypulseClient implements Disposable { private ws: WebSocket | null = null - private closed = false + private closed_manually = false private reconnectTimeout: any | undefined private readonly RECONNECT_INTERVAL_MS = 1000 @@ -194,7 +194,8 @@ export class HulypulseClient implements Disposable { private resubscribe (): void { for (const [key] of this.subscribes) { this.send({ type: 'sub', key }).catch((error) => { - throw new Error(`Resubscription failed for key=${key}: ${error.message ?? error}`) + console.error(`Resubscription failed for key=${key}:`, error) + // throw new Error(`Resubscription failed for key=${key}: ${error.message ?? error}`) }) } } @@ -240,7 +241,7 @@ export class HulypulseClient implements Disposable { } this.stopPing() - if (!this.closed) { + if (!this.closed_manually) { this.reconnectTimeout = setTimeout(() => { void this.connect() }, this.RECONNECT_INTERVAL_MS) @@ -248,13 +249,12 @@ export class HulypulseClient implements Disposable { } public close (): void { - this.closed = true + this.closed_manually = true if (this.reconnectTimeout !== undefined) { clearTimeout(this.reconnectTimeout) } this.reconnectTimeout = undefined this.stopPing() - this.ws?.close() } @@ -291,26 +291,25 @@ export class HulypulseClient implements Disposable { // Already subscribed? list.push(callback) if (list.length === 1) { - const reply = await this.send({ type: 'sub', key }) - if (reply.error !== undefined) { - throw new Error(reply.error) - } + void (await this.send({ type: 'sub', key })) } } - // callback for every old item (expires_at > 1 sec for atomicity) + // callback for every old item (ttl > 1 sec for atomicity) const prevlist = await this.send({ type: 'list', key }) - if (prevlist.error !== undefined) { - throw new Error(prevlist.error) - } else if (Array.isArray(prevlist.result)) { + if (prevlist?.error === undefined && Array.isArray(prevlist?.result)) { for (const item of prevlist.result) { try { const value = item.data !== undefined ? JSON.parse(item.data) : undefined - if (item.expires_at <= 1 || value === undefined) { + if ( + (item.ttl ?? item.expires_at ?? 0) <= 1 || // TODO: remove expires_at after upgrade server + value === undefined + ) { continue } callback(item.key, value) } catch (err) { + // throw new Error(err) console.error('Error in initial callback', err) } } @@ -329,10 +328,7 @@ export class HulypulseClient implements Disposable { const newList = list.filter((cb) => cb !== callback) if (newList.length === 0) { this.subscribes.delete(key) - const reply = await this.send({ type: 'unsub', key }) - if (reply?.error !== undefined) { - throw new Error(reply.error) - } + void (await this.send({ type: 'unsub', key })) } else { this.subscribes.set(key, newList) } @@ -349,6 +345,26 @@ export class HulypulseClient implements Disposable { key: string, data: any, third?: number | Pick + ): Promise { + const message: Omit = { + type: 'put', + key, + data: JSON.stringify(data), + ...(typeof third === 'number' ? { TTL: third } : third) + } + void (await this.send(message)) + } + + public async put_full (key: string, data: any, ttl: number): Promise + public async put_full ( + key: string, + data: any, + options?: Pick + ): Promise + public async put_full ( + key: string, + data: any, + third?: number | Pick ): Promise { const message: Omit = { type: 'put', @@ -404,8 +420,21 @@ export class HulypulseClient implements Disposable { const id = String(this.correlationId++) const message = { ...msg, correlation: id.toString() } satisfies M + // Reconnect if need before + if (this.closed_manually) { + if (msg.type === 'unsub') { + // don't need to do anything + return + } + this.closed_manually = false + if (this.ws?.readyState === WebSocket.CONNECTING || this.ws?.readyState === WebSocket.OPEN) { + return + } + await this.connect() + } + return await new Promise((resolve, reject) => { - if (this.ws?.readyState !== WebSocket.OPEN) { + if (this.closed_manually || this.ws?.readyState !== WebSocket.OPEN) { resolve({ error: 'WebSocket is not open.' }) return }