Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions packages/hulypulse-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type ProtocolMessage =

export class HulypulseClient implements Disposable {
private ws: WebSocket | null = null
private closed = false
private closed_manually = false
private reconnectTimeout: any | undefined
private readonly RECONNECT_INTERVAL_MS = 1000

Expand Down Expand Up @@ -194,7 +194,8 @@ export class HulypulseClient implements Disposable {
private resubscribe (): void {
for (const [key] of this.subscribes) {
this.send({ type: 'sub', key }).catch((error) => {
throw new Error(`Resubscription failed for key=${key}: ${error.message ?? error}`)
console.error(`Resubscription failed for key=${key}:`, error)
// throw new Error(`Resubscription failed for key=${key}: ${error.message ?? error}`)
})
}
}
Expand Down Expand Up @@ -240,21 +241,20 @@ export class HulypulseClient implements Disposable {
}
this.stopPing()

if (!this.closed) {
if (!this.closed_manually) {
this.reconnectTimeout = setTimeout(() => {
void this.connect()
}, this.RECONNECT_INTERVAL_MS)
}
}

public close (): void {
this.closed = true
this.closed_manually = true
if (this.reconnectTimeout !== undefined) {
clearTimeout(this.reconnectTimeout)
}
this.reconnectTimeout = undefined
this.stopPing()

this.ws?.close()
}

Expand Down Expand Up @@ -291,26 +291,25 @@ export class HulypulseClient implements Disposable {
// Already subscribed?
list.push(callback)
if (list.length === 1) {
const reply = await this.send({ type: 'sub', key })
if (reply.error !== undefined) {
throw new Error(reply.error)
}
void (await this.send({ type: 'sub', key }))
}
}

// callback for every old item (expires_at > 1 sec for atomicity)
// callback for every old item (ttl > 1 sec for atomicity)
const prevlist = await this.send({ type: 'list', key })
if (prevlist.error !== undefined) {
throw new Error(prevlist.error)
} else if (Array.isArray(prevlist.result)) {
if (prevlist?.error === undefined && Array.isArray(prevlist?.result)) {
for (const item of prevlist.result) {
try {
const value = item.data !== undefined ? JSON.parse(item.data) : undefined
if (item.expires_at <= 1 || value === undefined) {
if (
(item.ttl ?? item.expires_at ?? 0) <= 1 || // TODO: remove expires_at after upgrade server
value === undefined
) {
continue
}
callback(item.key, value)
} catch (err) {
// throw new Error(err)
console.error('Error in initial callback', err)
}
}
Expand All @@ -329,10 +328,7 @@ export class HulypulseClient implements Disposable {
const newList = list.filter((cb) => cb !== callback)
if (newList.length === 0) {
this.subscribes.delete(key)
const reply = await this.send({ type: 'unsub', key })
if (reply?.error !== undefined) {
throw new Error(reply.error)
}
void (await this.send({ type: 'unsub', key }))
} else {
this.subscribes.set(key, newList)
}
Expand All @@ -349,6 +345,26 @@ export class HulypulseClient implements Disposable {
key: string,
data: any,
third?: number | Pick<PutMessage, 'ifMatch' | 'ifNoneMatch' | 'TTL' | 'expiresAt'>
): Promise<void> {
const message: Omit<PutMessage, 'correlation'> = {
type: 'put',
key,
data: JSON.stringify(data),
...(typeof third === 'number' ? { TTL: third } : third)
}
void (await this.send(message))
}

public async put_full (key: string, data: any, ttl: number): Promise<any>
public async put_full (
key: string,
data: any,
options?: Pick<PutMessage, 'ifMatch' | 'ifNoneMatch' | 'TTL' | 'expiresAt'>
): Promise<void>
public async put_full (
key: string,
data: any,
third?: number | Pick<PutMessage, 'ifMatch' | 'ifNoneMatch' | 'TTL' | 'expiresAt'>
): Promise<void> {
const message: Omit<PutMessage, 'correlation'> = {
type: 'put',
Expand Down Expand Up @@ -404,8 +420,21 @@ export class HulypulseClient implements Disposable {
const id = String(this.correlationId++)
const message = { ...msg, correlation: id.toString() } satisfies M

// Reconnect if need before
if (this.closed_manually) {
if (msg.type === 'unsub') {
// don't need to do anything
return
}
this.closed_manually = false
if (this.ws?.readyState === WebSocket.CONNECTING || this.ws?.readyState === WebSocket.OPEN) {
return
}
await this.connect()
}

return await new Promise((resolve, reject) => {
if (this.ws?.readyState !== WebSocket.OPEN) {
if (this.closed_manually || this.ws?.readyState !== WebSocket.OPEN) {
resolve({ error: 'WebSocket is not open.' })
return
}
Expand Down
Loading