diff --git a/packages/hulypulse-client/src/client.ts b/packages/hulypulse-client/src/client.ts index 78787f9088e..a57a2b6b736 100644 --- a/packages/hulypulse-client/src/client.ts +++ b/packages/hulypulse-client/src/client.ts @@ -19,78 +19,12 @@ export type UnsubscribeCallback = () => Promise export type Callback = (key: string, data: T | undefined) => void -// hulypulse API: incoming messages variants - -// interface Ping_Message { -// data: 'ping' | 'pong' -// } - -// interface Answer_Message { -// data: { answer: string } -// } - -// interface Error_Message { -// data: { error: string; reason?: string } -// } - -// interface Subscribe_Message { -// data: { key: string; result: { data: JSONValue; etag: string; expiresAt: number } } -// } - -// put: -// {action: "put", correlation, result:"OK" } -// {action: "put", correlation, error: "...error" } - -// get: -// {action: "get", correlation, "result":{ -// "data":"hello 1", -// "etag":"df0649bc4f1be901c85b6183091c1d83", -// "expires_at":3, -// "key":"00000000-0000-0000-0000-000000000001/foo/bar1" -// }} -// {action: "get", correlation, error: "...error" } - -// delete: -// {action: "delete", correlation, result:"OK" } -// {action: "delete", correlation, error: "...error" } - -// list: -// {action: "list", correlation, result:[ -// {"data":"hello 1","etag":"df0649bc4f1be901c85b6183091c1d83","expires_at":41,"key":"00000000-0000-0000-0000-000000000001/foo/bar1"}, -// {"data":"hello 2","etag":"bb21ec8394b75795622f61613a777a8b","expires_at":85,"key":"00000000-0000-0000-0000-000000000001/foo/bar2"} -// ] } -// {action: "list", correlation, error: "...error" } - -// sub: -// {action: "sub", correlation, result:"OK" } -// {action: "sub", correlation, error: "...error" } - -// unsub: -// {action: "unsub", correlation, result:"OK" } -// {action: "unsub", correlation, error: "...error" } - -// sublist: -// {action: "sublist", correlation, result:[keys] } -// {action: "sublist", correlation, error: "...error" } - interface GetFullResult { data: T etag: string expiresAt: number } -// interface GetFullResultKey { -// data: T -// etag: string -// expiresAt: number -// key: string -// } - -// hulypulse API: subscription messages variants - -// {"message":"Expired","key":"00000000-0000-0000-0000-000000000001/foo/bar1"} -// {"message":"Set","key":"00000000-0000-0000-0000-000000000001/foo/bar1","value":"hello 1"} - type Command = 'sub' | 'unsub' | 'put' | 'get' | 'delete' | 'list' | 'sublist' | 'info' interface SubscribedMessage { @@ -113,50 +47,9 @@ interface ErrorCommandMessage { type PulseIncomingMessage = SubscribedMessage | CommandMessage | ErrorCommandMessage -// hulypulse API: answer messages variants - -// interface OkResponse { -// result: "OK" -// action: "put" | "delete" | "sub" | "unsub" -// correlation?: string -// } - -// interface ErrorResponse { -// error: string -// action: "put" | "get" | "delete" | "list" | "sub" | "unsub" | "sublist" | "info" -// correlation?: string -// } - -// interface InfoResponse { -// action: "info" -// correlation?: string -// result: string -// } - -// interface GetResponse { -// action: "get" -// correlation?: string -// result: GetFullResultKey -// } - -// interface ListResponse { -// action: "list" -// correlation?: string -// result: GetFullResultKey[] -// } - -// interface SublistResponse { -// action: "sublist" -// correlation?: string -// result: string[] -// } - -// hulypulse API: outcoming messages variants - interface GetMessage { type: 'get' key: string - // correlation: string } interface PutMessage { @@ -167,36 +60,30 @@ interface PutMessage { expiresAt?: number ifMatch?: string ifNoneMatch?: string - // correlation: string } interface DeleteMessage { type: 'delete' key: string ifMatch?: string - // correlation: string } interface SubscribeMessages { type: 'sub' key: string - // correlation: string } interface UnsubscribeMessages { type: 'unsub' key: string - // correlation: string } interface SubscribesList { type: 'list' - // correlation: string } interface InfoMessage { type: 'info' - // correlation: string } type ProtocolMessage = @@ -260,7 +147,6 @@ export class HulypulseClient implements Disposable { this.ws?.send('pong') return } - if (event.data === 'pong') { return } @@ -306,7 +192,7 @@ export class HulypulseClient implements Disposable { } private resubscribe (): void { - for (const key in this.subscribes) { + for (const [key] of this.subscribes) { this.send({ type: 'sub', key }).catch((error) => { throw new Error(`Resubscription failed for key=${key}: ${error.message ?? error}`) }) @@ -314,28 +200,33 @@ export class HulypulseClient implements Disposable { } private startPing (): void { - clearInterval(this.pingInterval) + this.stopPing() this.pingInterval = setInterval(() => { - if (this.ws !== null && this.ws.readyState === WebSocket.OPEN) { + if (this.ws?.readyState === WebSocket.OPEN) { this.ws.send('ping') } - clearTimeout(this.pingTimeout) + if (this.pingTimeout !== undefined) { + clearTimeout(this.pingTimeout) + } this.pingTimeout = setTimeout(() => { - if (this.ws !== null) { - console.log('no response from server') + if (this.ws?.readyState !== WebSocket.OPEN) { + console.warn('WS-server not responding to ping, closing connection') clearInterval(this.pingInterval) - this.ws.close(WS_CLOSE_NORMAL) + this.ws?.close(WS_CLOSE_NORMAL) } }, this.PING_TIMEOUT_MS) }, this.PING_INTERVAL_MS) } private stopPing (): void { - clearInterval(this.pingInterval) - this.pingInterval = undefined - - clearTimeout(this.pingTimeout) - this.pingTimeout = undefined + if (this.pingInterval !== undefined) { + clearInterval(this.pingInterval) + this.pingInterval = undefined + } + if (this.pingTimeout !== undefined) { + clearTimeout(this.pingTimeout) + this.pingTimeout = undefined + } } [Symbol.dispose] (): void { @@ -345,8 +236,8 @@ export class HulypulseClient implements Disposable { private reconnect (): void { if (this.reconnectTimeout !== undefined) { clearTimeout(this.reconnectTimeout) + this.reconnectTimeout = undefined } - this.reconnectTimeout = undefined this.stopPing() if (!this.closed) { @@ -375,7 +266,7 @@ export class HulypulseClient implements Disposable { public async info (): Promise { const reply = await this.send({ type: 'info' }) - if (reply.error != null) { + if (reply.error !== undefined) { throw new Error(reply.error) } return reply.result ?? '' @@ -383,7 +274,7 @@ export class HulypulseClient implements Disposable { public async list (): Promise { const reply = await this.send({ type: 'list' }) - if (reply.error != null) { + if (reply.error !== undefined) { throw new Error(reply.error) } return reply.result ?? '' @@ -391,7 +282,7 @@ export class HulypulseClient implements Disposable { public async subscribe (key: string, callback: Callback): Promise { let list = this.subscribes.get(key) - if (list == null) { + if (list === undefined) { list = [] this.subscribes.set(key, list) } @@ -401,8 +292,26 @@ export class HulypulseClient implements Disposable { list.push(callback) if (list.length === 1) { const reply = await this.send({ type: 'sub', key }) - if (reply.error != null) { - this.reconnect() + if (reply.error !== undefined) { + throw new Error(reply.error) + } + } + } + + // callback for every old item (expires_at > 1 sec for atomicity) + const prevlist = await this.send({ type: 'list', key }) + if (prevlist.error !== undefined) { + throw new Error(prevlist.error) + } else if (Array.isArray(prevlist.result)) { + for (const item of prevlist.result) { + try { + const value = item.data !== undefined ? JSON.parse(item.data) : undefined + if (item.expires_at <= 1 || value === undefined) { + continue + } + callback(item.key, value) + } catch (err) { + console.error('Error in initial callback', err) } } } @@ -414,16 +323,15 @@ export class HulypulseClient implements Disposable { public async unsubscribe (key: string, callback: Callback): Promise { const list = this.subscribes.get(key) - if (list?.includes(callback) == null) { + if (list === undefined || !list.includes(callback)) { return false } const newList = list.filter((cb) => cb !== callback) if (newList.length === 0) { this.subscribes.delete(key) const reply = await this.send({ type: 'unsub', key }) - if (reply.error != null) { - this.reconnect() - return true + if (reply?.error !== undefined) { + throw new Error(reply.error) } } else { this.subscribes.set(key, newList) @@ -449,14 +357,14 @@ export class HulypulseClient implements Disposable { ...(typeof third === 'number' ? { TTL: third } : third) } const reply = await this.send(message) - if (reply.error != null) { + if (reply.error !== undefined) { throw new Error(reply.error) } } public async get(key: string): Promise { const reply = await this.send({ type: 'get', key }) - if (reply.error != null) { + if (reply.error !== undefined) { if (reply.error === 'not found') { return undefined } @@ -467,7 +375,7 @@ export class HulypulseClient implements Disposable { public async get_full(key: string): Promise | undefined> { const reply = await this.send({ type: 'get', key }) - if (reply.error != null) { + if (reply.error !== undefined) { if (reply.error === 'not found') { return undefined } @@ -483,7 +391,7 @@ export class HulypulseClient implements Disposable { public async delete (key: string, options?: Pick): Promise { const message: Omit = { type: 'delete', key, ...options } const reply = await this.send(message) - if (reply.error != null) { + if (reply.error !== undefined) { if (reply.error === 'not found') { return false } @@ -497,24 +405,25 @@ export class HulypulseClient implements Disposable { const message = { ...msg, correlation: id.toString() } satisfies M return await new Promise((resolve, reject) => { - if (this.ws == null || this.ws.readyState !== WebSocket.OPEN) { - reject(new Error('WebSocket is not open.')) + if (this.ws?.readyState !== WebSocket.OPEN) { + resolve({ error: 'WebSocket is not open.' }) return } const sendTimeout = setTimeout(() => { const pending = this.pending.get(id) if (pending !== undefined) { - pending.reject(new Error('Timeout waiting for response')) + pending.resolve({ error: 'Timeout waiting for response' }) this.pending.delete(id) } }, this.SEND_TIMEOUT_MS) this.pending.set(id, { resolve, reject, send_timeout: sendTimeout }) this.ws.send(JSON.stringify(message)) + this.startPing() // reset ping timer on any send }) } } export function escapeString (str: string): string { - // Escape special characters to '*' | '?' | '[' | ']' | '\\' | '\0'..='\x1F' | '\x7F' | '"' | '\'' - return str.replace(/[\\'"]/g, '\\$&') + // eslint-disable-next-line no-control-regex, no-useless-escape + return str.replace(/[*?\[\]\\\x00-\x1F\x7F"']/g, '_') } diff --git a/pods/external/services.d/hulypulse.service b/pods/external/services.d/hulypulse.service index 694dc8e160d..95dbeb9205c 100644 --- a/pods/external/services.d/hulypulse.service +++ b/pods/external/services.d/hulypulse.service @@ -1 +1 @@ -hulypulse hardcoreeng/service_hulypulse:0.1.14 \ No newline at end of file +hulypulse hardcoreeng/service_hulypulse:0.1.29 \ No newline at end of file