From a5a2cea866cf5f2ccf7aaa8fc09475244fa5d3d0 Mon Sep 17 00:00:00 2001 From: Leonid Kaganov Date: Sun, 5 Oct 2025 18:11:24 +0300 Subject: [PATCH 1/6] Fix subscribe initial state, fix switch workspace error, upgrade ping --- packages/hulypulse-client/src/client.ts | 166 ++++++------------------ 1 file changed, 42 insertions(+), 124 deletions(-) diff --git a/packages/hulypulse-client/src/client.ts b/packages/hulypulse-client/src/client.ts index 78787f9088e..3e62bb2b08c 100644 --- a/packages/hulypulse-client/src/client.ts +++ b/packages/hulypulse-client/src/client.ts @@ -19,78 +19,12 @@ export type UnsubscribeCallback = () => Promise export type Callback = (key: string, data: T | undefined) => void -// hulypulse API: incoming messages variants - -// interface Ping_Message { -// data: 'ping' | 'pong' -// } - -// interface Answer_Message { -// data: { answer: string } -// } - -// interface Error_Message { -// data: { error: string; reason?: string } -// } - -// interface Subscribe_Message { -// data: { key: string; result: { data: JSONValue; etag: string; expiresAt: number } } -// } - -// put: -// {action: "put", correlation, result:"OK" } -// {action: "put", correlation, error: "...error" } - -// get: -// {action: "get", correlation, "result":{ -// "data":"hello 1", -// "etag":"df0649bc4f1be901c85b6183091c1d83", -// "expires_at":3, -// "key":"00000000-0000-0000-0000-000000000001/foo/bar1" -// }} -// {action: "get", correlation, error: "...error" } - -// delete: -// {action: "delete", correlation, result:"OK" } -// {action: "delete", correlation, error: "...error" } - -// list: -// {action: "list", correlation, result:[ -// {"data":"hello 1","etag":"df0649bc4f1be901c85b6183091c1d83","expires_at":41,"key":"00000000-0000-0000-0000-000000000001/foo/bar1"}, -// {"data":"hello 2","etag":"bb21ec8394b75795622f61613a777a8b","expires_at":85,"key":"00000000-0000-0000-0000-000000000001/foo/bar2"} -// ] } -// {action: "list", correlation, error: "...error" } - -// sub: -// {action: "sub", correlation, result:"OK" } -// {action: "sub", correlation, error: "...error" } - -// unsub: -// {action: "unsub", correlation, result:"OK" } -// {action: "unsub", correlation, error: "...error" } - -// sublist: -// {action: "sublist", correlation, result:[keys] } -// {action: "sublist", correlation, error: "...error" } - interface GetFullResult { data: T etag: string expiresAt: number } -// interface GetFullResultKey { -// data: T -// etag: string -// expiresAt: number -// key: string -// } - -// hulypulse API: subscription messages variants - -// {"message":"Expired","key":"00000000-0000-0000-0000-000000000001/foo/bar1"} -// {"message":"Set","key":"00000000-0000-0000-0000-000000000001/foo/bar1","value":"hello 1"} - type Command = 'sub' | 'unsub' | 'put' | 'get' | 'delete' | 'list' | 'sublist' | 'info' interface SubscribedMessage { @@ -113,50 +47,9 @@ interface ErrorCommandMessage { type PulseIncomingMessage = SubscribedMessage | CommandMessage | ErrorCommandMessage -// hulypulse API: answer messages variants - -// interface OkResponse { -// result: "OK" -// action: "put" | "delete" | "sub" | "unsub" -// correlation?: string -// } - -// interface ErrorResponse { -// error: string -// action: "put" | "get" | "delete" | "list" | "sub" | "unsub" | "sublist" | "info" -// correlation?: string -// } - -// interface InfoResponse { -// action: "info" -// correlation?: string -// result: string -// } - -// interface GetResponse { -// action: "get" -// correlation?: string -// result: GetFullResultKey -// } - -// interface ListResponse { -// action: "list" -// correlation?: string -// result: GetFullResultKey[] -// } - -// interface SublistResponse { -// action: "sublist" -// correlation?: string -// result: string[] -// } - -// hulypulse API: outcoming messages variants - interface GetMessage { type: 'get' key: string - // correlation: string } interface PutMessage { @@ -167,36 +60,30 @@ interface PutMessage { expiresAt?: number ifMatch?: string ifNoneMatch?: string - // correlation: string } interface DeleteMessage { type: 'delete' key: string ifMatch?: string - // correlation: string } interface SubscribeMessages { type: 'sub' key: string - // correlation: string } interface UnsubscribeMessages { type: 'unsub' key: string - // correlation: string } interface SubscribesList { type: 'list' - // correlation: string } interface InfoMessage { type: 'info' - // correlation: string } type ProtocolMessage = @@ -314,12 +201,14 @@ export class HulypulseClient implements Disposable { } private startPing (): void { - clearInterval(this.pingInterval) + this.stopPing() this.pingInterval = setInterval(() => { if (this.ws !== null && this.ws.readyState === WebSocket.OPEN) { this.ws.send('ping') } - clearTimeout(this.pingTimeout) + if (this.pingTimeout !== undefined) { + clearTimeout(this.pingTimeout) + } this.pingTimeout = setTimeout(() => { if (this.ws !== null) { console.log('no response from server') @@ -331,11 +220,14 @@ export class HulypulseClient implements Disposable { } private stopPing (): void { - clearInterval(this.pingInterval) - this.pingInterval = undefined - - clearTimeout(this.pingTimeout) - this.pingTimeout = undefined + if (this.pingInterval !== undefined) { + clearInterval(this.pingInterval) + this.pingInterval = undefined + } + if (this.pingTimeout !== undefined) { + clearTimeout(this.pingTimeout) + this.pingTimeout = undefined + } } [Symbol.dispose] (): void { @@ -407,6 +299,24 @@ export class HulypulseClient implements Disposable { } } + // callback for every old item (expires_at > 1 sec for atomicity) + const prevlist = await this.send({ type: 'list', key }) + if (prevlist.error != null) { + this.reconnect() + } else if (Array.isArray(prevlist.result)) { + for (const item of prevlist.result) { + try { + const value = item.data !== undefined ? JSON.parse(item.data) : undefined + if (item.expires_at <= 1 || value === undefined) { + continue + } + callback(item.key, value) + } catch (err) { + console.error(`Error in initial callback for key "${key}":`, err) + } + } + } + return async () => { return await this.unsubscribe(key, callback) } @@ -496,25 +406,33 @@ export class HulypulseClient implements Disposable { const id = String(this.correlationId++) const message = { ...msg, correlation: id.toString() } satisfies M + // connect if needed + if (this.ws == null || this.ws.readyState !== WebSocket.OPEN) { + this.reconnect() + await this.connect() + } + return await new Promise((resolve, reject) => { if (this.ws == null || this.ws.readyState !== WebSocket.OPEN) { - reject(new Error('WebSocket is not open.')) + // reject(new Error('WebSocket is not open.')) + resolve({ error: 'WebSocket is not open.' }) return } const sendTimeout = setTimeout(() => { const pending = this.pending.get(id) if (pending !== undefined) { - pending.reject(new Error('Timeout waiting for response')) + // pending.reject(new Error('Timeout waiting for response')) + pending.resolve({ error: 'Timeout waiting for response' }) this.pending.delete(id) } }, this.SEND_TIMEOUT_MS) this.pending.set(id, { resolve, reject, send_timeout: sendTimeout }) this.ws.send(JSON.stringify(message)) + this.startPing() // reset ping timer on any send }) } } export function escapeString (str: string): string { - // Escape special characters to '*' | '?' | '[' | ']' | '\\' | '\0'..='\x1F' | '\x7F' | '"' | '\'' - return str.replace(/[\\'"]/g, '\\$&') + return str.replace(/[\*\?\[\]\\\x00-\x1F\x7F"']/g, '_') } From f6d2d3f2671fec4d69e6f0d9b03a59c064fcd3f3 Mon Sep 17 00:00:00 2001 From: Leonid Kaganov Date: Sun, 5 Oct 2025 18:43:50 +0300 Subject: [PATCH 2/6] Fix rushx format warnings --- packages/hulypulse-client/src/client.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/hulypulse-client/src/client.ts b/packages/hulypulse-client/src/client.ts index 3e62bb2b08c..6574e8dc886 100644 --- a/packages/hulypulse-client/src/client.ts +++ b/packages/hulypulse-client/src/client.ts @@ -312,7 +312,7 @@ export class HulypulseClient implements Disposable { } callback(item.key, value) } catch (err) { - console.error(`Error in initial callback for key "${key}":`, err) + console.error('Error in initial callback', err) } } } @@ -414,14 +414,12 @@ export class HulypulseClient implements Disposable { return await new Promise((resolve, reject) => { if (this.ws == null || this.ws.readyState !== WebSocket.OPEN) { - // reject(new Error('WebSocket is not open.')) resolve({ error: 'WebSocket is not open.' }) return } const sendTimeout = setTimeout(() => { const pending = this.pending.get(id) if (pending !== undefined) { - // pending.reject(new Error('Timeout waiting for response')) pending.resolve({ error: 'Timeout waiting for response' }) this.pending.delete(id) } @@ -434,5 +432,6 @@ export class HulypulseClient implements Disposable { } export function escapeString (str: string): string { - return str.replace(/[\*\?\[\]\\\x00-\x1F\x7F"']/g, '_') + // eslint-disable-next-line no-control-regex, no-useless-escape + return str.replace(/[*?\[\]\\\x00-\x1F\x7F"']/g, '_') } From 02dbe7661706813ad7ca1d67dfbc57e1afab2619 Mon Sep 17 00:00:00 2001 From: Leonid Kaganov Date: Sun, 5 Oct 2025 23:04:09 +0300 Subject: [PATCH 3/6] Fix ping back (waiting for server upgrade before) --- packages/hulypulse-client/src/client.ts | 40 ++++++++++++------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/packages/hulypulse-client/src/client.ts b/packages/hulypulse-client/src/client.ts index 6574e8dc886..6a697c3ee92 100644 --- a/packages/hulypulse-client/src/client.ts +++ b/packages/hulypulse-client/src/client.ts @@ -147,7 +147,6 @@ export class HulypulseClient implements Disposable { this.ws?.send('pong') return } - if (event.data === 'pong') { return } @@ -203,17 +202,17 @@ export class HulypulseClient implements Disposable { private startPing (): void { this.stopPing() this.pingInterval = setInterval(() => { - if (this.ws !== null && this.ws.readyState === WebSocket.OPEN) { + if (this.ws?.readyState === WebSocket.OPEN) { this.ws.send('ping') } if (this.pingTimeout !== undefined) { clearTimeout(this.pingTimeout) } this.pingTimeout = setTimeout(() => { - if (this.ws !== null) { + if (this.ws?.readyState !== WebSocket.OPEN) { console.log('no response from server') clearInterval(this.pingInterval) - this.ws.close(WS_CLOSE_NORMAL) + this.ws?.close(WS_CLOSE_NORMAL) } }, this.PING_TIMEOUT_MS) }, this.PING_INTERVAL_MS) @@ -237,8 +236,8 @@ export class HulypulseClient implements Disposable { private reconnect (): void { if (this.reconnectTimeout !== undefined) { clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = undefined } - this.reconnectTimeout = undefined this.stopPing() if (!this.closed) { @@ -267,7 +266,7 @@ export class HulypulseClient implements Disposable { public async info (): Promise { const reply = await this.send({ type: 'info' }) - if (reply.error != null) { + if (reply.error !== undefined) { throw new Error(reply.error) } return reply.result ?? '' @@ -275,7 +274,7 @@ export class HulypulseClient implements Disposable { public async list (): Promise { const reply = await this.send({ type: 'list' }) - if (reply.error != null) { + if (reply.error !== undefined) { throw new Error(reply.error) } return reply.result ?? '' @@ -283,7 +282,7 @@ export class HulypulseClient implements Disposable { public async subscribe (key: string, callback: Callback): Promise { let list = this.subscribes.get(key) - if (list == null) { + if (list === undefined) { list = [] this.subscribes.set(key, list) } @@ -293,7 +292,7 @@ export class HulypulseClient implements Disposable { list.push(callback) if (list.length === 1) { const reply = await this.send({ type: 'sub', key }) - if (reply.error != null) { + if (reply.error !== undefined) { this.reconnect() } } @@ -301,7 +300,7 @@ export class HulypulseClient implements Disposable { // callback for every old item (expires_at > 1 sec for atomicity) const prevlist = await this.send({ type: 'list', key }) - if (prevlist.error != null) { + if (prevlist.error !== undefined) { this.reconnect() } else if (Array.isArray(prevlist.result)) { for (const item of prevlist.result) { @@ -324,14 +323,14 @@ export class HulypulseClient implements Disposable { public async unsubscribe (key: string, callback: Callback): Promise { const list = this.subscribes.get(key) - if (list?.includes(callback) == null) { + if (list === undefined || !list.includes(callback)) { return false } const newList = list.filter((cb) => cb !== callback) if (newList.length === 0) { this.subscribes.delete(key) const reply = await this.send({ type: 'unsub', key }) - if (reply.error != null) { + if (reply?.error !== undefined) { this.reconnect() return true } @@ -359,14 +358,14 @@ export class HulypulseClient implements Disposable { ...(typeof third === 'number' ? { TTL: third } : third) } const reply = await this.send(message) - if (reply.error != null) { + if (reply.error !== undefined) { throw new Error(reply.error) } } public async get(key: string): Promise { const reply = await this.send({ type: 'get', key }) - if (reply.error != null) { + if (reply.error !== undefined) { if (reply.error === 'not found') { return undefined } @@ -377,7 +376,7 @@ export class HulypulseClient implements Disposable { public async get_full(key: string): Promise | undefined> { const reply = await this.send({ type: 'get', key }) - if (reply.error != null) { + if (reply.error !== undefined) { if (reply.error === 'not found') { return undefined } @@ -393,7 +392,7 @@ export class HulypulseClient implements Disposable { public async delete (key: string, options?: Pick): Promise { const message: Omit = { type: 'delete', key, ...options } const reply = await this.send(message) - if (reply.error != null) { + if (reply.error !== undefined) { if (reply.error === 'not found') { return false } @@ -407,13 +406,13 @@ export class HulypulseClient implements Disposable { const message = { ...msg, correlation: id.toString() } satisfies M // connect if needed - if (this.ws == null || this.ws.readyState !== WebSocket.OPEN) { + if (this.ws?.readyState !== WebSocket.OPEN) { this.reconnect() - await this.connect() + return // await this.connect() } return await new Promise((resolve, reject) => { - if (this.ws == null || this.ws.readyState !== WebSocket.OPEN) { + if (this.ws?.readyState !== WebSocket.OPEN) { resolve({ error: 'WebSocket is not open.' }) return } @@ -426,7 +425,8 @@ export class HulypulseClient implements Disposable { }, this.SEND_TIMEOUT_MS) this.pending.set(id, { resolve, reject, send_timeout: sendTimeout }) this.ws.send(JSON.stringify(message)) - this.startPing() // reset ping timer on any send + // TODO: RENEW HULYPULSE SERVER BEFORE + // this.startPing() // reset ping timer on any send }) } } From cce7ce97642e41834cbbc4cef4dfe18da4b7f50a Mon Sep 17 00:00:00 2001 From: Leonid Kaganov Date: Mon, 6 Oct 2025 18:13:36 +0300 Subject: [PATCH 4/6] Fix subscribe after reconnect --- packages/hulypulse-client/src/client.ts | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/packages/hulypulse-client/src/client.ts b/packages/hulypulse-client/src/client.ts index 6a697c3ee92..567b1790da5 100644 --- a/packages/hulypulse-client/src/client.ts +++ b/packages/hulypulse-client/src/client.ts @@ -192,7 +192,7 @@ export class HulypulseClient implements Disposable { } private resubscribe (): void { - for (const key in this.subscribes) { + for (const [key, callbacks] of this.subscribes) { this.send({ type: 'sub', key }).catch((error) => { throw new Error(`Resubscription failed for key=${key}: ${error.message ?? error}`) }) @@ -210,7 +210,7 @@ export class HulypulseClient implements Disposable { } this.pingTimeout = setTimeout(() => { if (this.ws?.readyState !== WebSocket.OPEN) { - console.log('no response from server') + console.warn('WS-server not responding to ping, closing connection') clearInterval(this.pingInterval) this.ws?.close(WS_CLOSE_NORMAL) } @@ -293,7 +293,7 @@ export class HulypulseClient implements Disposable { if (list.length === 1) { const reply = await this.send({ type: 'sub', key }) if (reply.error !== undefined) { - this.reconnect() + throw new Error(reply.error) } } } @@ -301,7 +301,7 @@ export class HulypulseClient implements Disposable { // callback for every old item (expires_at > 1 sec for atomicity) const prevlist = await this.send({ type: 'list', key }) if (prevlist.error !== undefined) { - this.reconnect() + throw new Error(prevlist.error) } else if (Array.isArray(prevlist.result)) { for (const item of prevlist.result) { try { @@ -331,8 +331,7 @@ export class HulypulseClient implements Disposable { this.subscribes.delete(key) const reply = await this.send({ type: 'unsub', key }) if (reply?.error !== undefined) { - this.reconnect() - return true + throw new Error(reply.error) } } else { this.subscribes.set(key, newList) @@ -405,12 +404,6 @@ export class HulypulseClient implements Disposable { const id = String(this.correlationId++) const message = { ...msg, correlation: id.toString() } satisfies M - // connect if needed - if (this.ws?.readyState !== WebSocket.OPEN) { - this.reconnect() - return // await this.connect() - } - return await new Promise((resolve, reject) => { if (this.ws?.readyState !== WebSocket.OPEN) { resolve({ error: 'WebSocket is not open.' }) @@ -425,8 +418,7 @@ export class HulypulseClient implements Disposable { }, this.SEND_TIMEOUT_MS) this.pending.set(id, { resolve, reject, send_timeout: sendTimeout }) this.ws.send(JSON.stringify(message)) - // TODO: RENEW HULYPULSE SERVER BEFORE - // this.startPing() // reset ping timer on any send + this.startPing() // reset ping timer on any send }) } } From 462dcc19f2e03bb03505700b968cc6e424113bec Mon Sep 17 00:00:00 2001 From: Leonid Kaganov Date: Mon, 6 Oct 2025 18:14:46 +0300 Subject: [PATCH 5/6] Version hulypulse --- pods/external/services.d/hulypulse.service | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pods/external/services.d/hulypulse.service b/pods/external/services.d/hulypulse.service index 694dc8e160d..95dbeb9205c 100644 --- a/pods/external/services.d/hulypulse.service +++ b/pods/external/services.d/hulypulse.service @@ -1 +1 @@ -hulypulse hardcoreeng/service_hulypulse:0.1.14 \ No newline at end of file +hulypulse hardcoreeng/service_hulypulse:0.1.29 \ No newline at end of file From 45ce0064779204ad6bfa1c916993ab781a8de72c Mon Sep 17 00:00:00 2001 From: Leonid Kaganov Date: Mon, 6 Oct 2025 19:19:33 +0300 Subject: [PATCH 6/6] Fix rush format warnings Signed-off-by: Leonid Kaganov --- packages/hulypulse-client/src/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/hulypulse-client/src/client.ts b/packages/hulypulse-client/src/client.ts index 567b1790da5..a57a2b6b736 100644 --- a/packages/hulypulse-client/src/client.ts +++ b/packages/hulypulse-client/src/client.ts @@ -192,7 +192,7 @@ export class HulypulseClient implements Disposable { } private resubscribe (): void { - for (const [key, callbacks] of this.subscribes) { + for (const [key] of this.subscribes) { this.send({ type: 'sub', key }).catch((error) => { throw new Error(`Resubscription failed for key=${key}: ${error.message ?? error}`) })