Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 54 additions & 145 deletions packages/hulypulse-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,78 +19,12 @@ export type UnsubscribeCallback = () => Promise<boolean>

export type Callback<T> = (key: string, data: T | undefined) => void

// hulypulse API: incoming messages variants

// interface Ping_Message {
// data: 'ping' | 'pong'
// }

// interface Answer_Message {
// data: { answer: string }
// }

// interface Error_Message {
// data: { error: string; reason?: string }
// }

// interface Subscribe_Message {
// data: { key: string; result: { data: JSONValue; etag: string; expiresAt: number } }
// }

// put:
// {action: "put", correlation, result:"OK" }
// {action: "put", correlation, error: "...error" }

// get:
// {action: "get", correlation, "result":{
// "data":"hello 1",
// "etag":"df0649bc4f1be901c85b6183091c1d83",
// "expires_at":3,
// "key":"00000000-0000-0000-0000-000000000001/foo/bar1"
// }}
// {action: "get", correlation, error: "...error" }

// delete:
// {action: "delete", correlation, result:"OK" }
// {action: "delete", correlation, error: "...error" }

// list:
// {action: "list", correlation, result:[
// {"data":"hello 1","etag":"df0649bc4f1be901c85b6183091c1d83","expires_at":41,"key":"00000000-0000-0000-0000-000000000001/foo/bar1"},
// {"data":"hello 2","etag":"bb21ec8394b75795622f61613a777a8b","expires_at":85,"key":"00000000-0000-0000-0000-000000000001/foo/bar2"}
// ] }
// {action: "list", correlation, error: "...error" }

// sub:
// {action: "sub", correlation, result:"OK" }
// {action: "sub", correlation, error: "...error" }

// unsub:
// {action: "unsub", correlation, result:"OK" }
// {action: "unsub", correlation, error: "...error" }

// sublist:
// {action: "sublist", correlation, result:[keys] }
// {action: "sublist", correlation, error: "...error" }

interface GetFullResult<T> {
data: T
etag: string
expiresAt: number
}

// interface GetFullResultKey<T> {
// data: T
// etag: string
// expiresAt: number
// key: string
// }

// hulypulse API: subscription messages variants

// {"message":"Expired","key":"00000000-0000-0000-0000-000000000001/foo/bar1"}
// {"message":"Set","key":"00000000-0000-0000-0000-000000000001/foo/bar1","value":"hello 1"}

type Command = 'sub' | 'unsub' | 'put' | 'get' | 'delete' | 'list' | 'sublist' | 'info'

interface SubscribedMessage {
Expand All @@ -113,50 +47,9 @@ interface ErrorCommandMessage {

type PulseIncomingMessage = SubscribedMessage | CommandMessage | ErrorCommandMessage

// hulypulse API: answer messages variants

// interface OkResponse {
// result: "OK"
// action: "put" | "delete" | "sub" | "unsub"
// correlation?: string
// }

// interface ErrorResponse {
// error: string
// action: "put" | "get" | "delete" | "list" | "sub" | "unsub" | "sublist" | "info"
// correlation?: string
// }

// interface InfoResponse {
// action: "info"
// correlation?: string
// result: string
// }

// interface GetResponse {
// action: "get"
// correlation?: string
// result: GetFullResultKey<JSONValue>
// }

// interface ListResponse {
// action: "list"
// correlation?: string
// result: GetFullResultKey<JSONValue>[]
// }

// interface SublistResponse {
// action: "sublist"
// correlation?: string
// result: string[]
// }

// hulypulse API: outcoming messages variants

interface GetMessage {
type: 'get'
key: string
// correlation: string
}

interface PutMessage {
Expand All @@ -167,36 +60,30 @@ interface PutMessage {
expiresAt?: number
ifMatch?: string
ifNoneMatch?: string
// correlation: string
}

interface DeleteMessage {
type: 'delete'
key: string
ifMatch?: string
// correlation: string
}

interface SubscribeMessages {
type: 'sub'
key: string
// correlation: string
}

interface UnsubscribeMessages {
type: 'unsub'
key: string
// correlation: string
}

interface SubscribesList {
type: 'list'
// correlation: string
}

interface InfoMessage {
type: 'info'
// correlation: string
}

type ProtocolMessage =
Expand Down Expand Up @@ -260,7 +147,6 @@ export class HulypulseClient implements Disposable {
this.ws?.send('pong')
return
}

if (event.data === 'pong') {
return
}
Expand Down Expand Up @@ -306,36 +192,41 @@ export class HulypulseClient implements Disposable {
}

private resubscribe (): void {
for (const key in this.subscribes) {
for (const [key] of this.subscribes) {
this.send({ type: 'sub', key }).catch((error) => {
throw new Error(`Resubscription failed for key=${key}: ${error.message ?? error}`)
})
}
}

private startPing (): void {
clearInterval(this.pingInterval)
this.stopPing()
this.pingInterval = setInterval(() => {
if (this.ws !== null && this.ws.readyState === WebSocket.OPEN) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send('ping')
}
clearTimeout(this.pingTimeout)
if (this.pingTimeout !== undefined) {
clearTimeout(this.pingTimeout)
}
this.pingTimeout = setTimeout(() => {
if (this.ws !== null) {
console.log('no response from server')
if (this.ws?.readyState !== WebSocket.OPEN) {
console.warn('WS-server not responding to ping, closing connection')
clearInterval(this.pingInterval)
this.ws.close(WS_CLOSE_NORMAL)
this.ws?.close(WS_CLOSE_NORMAL)
}
}, this.PING_TIMEOUT_MS)
}, this.PING_INTERVAL_MS)
}

private stopPing (): void {
clearInterval(this.pingInterval)
this.pingInterval = undefined

clearTimeout(this.pingTimeout)
this.pingTimeout = undefined
if (this.pingInterval !== undefined) {
clearInterval(this.pingInterval)
this.pingInterval = undefined
}
if (this.pingTimeout !== undefined) {
clearTimeout(this.pingTimeout)
this.pingTimeout = undefined
}
}

[Symbol.dispose] (): void {
Expand All @@ -345,8 +236,8 @@ export class HulypulseClient implements Disposable {
private reconnect (): void {
if (this.reconnectTimeout !== undefined) {
clearTimeout(this.reconnectTimeout)
this.reconnectTimeout = undefined
}
this.reconnectTimeout = undefined
this.stopPing()

if (!this.closed) {
Expand Down Expand Up @@ -375,23 +266,23 @@ export class HulypulseClient implements Disposable {

public async info (): Promise<string> {
const reply = await this.send({ type: 'info' })
if (reply.error != null) {
if (reply.error !== undefined) {
throw new Error(reply.error)
}
return reply.result ?? ''
}

public async list (): Promise<string> {
const reply = await this.send({ type: 'list' })
if (reply.error != null) {
if (reply.error !== undefined) {
throw new Error(reply.error)
}
return reply.result ?? ''
}

public async subscribe (key: string, callback: Callback<any>): Promise<UnsubscribeCallback> {
let list = this.subscribes.get(key)
if (list == null) {
if (list === undefined) {
list = []
this.subscribes.set(key, list)
}
Expand All @@ -401,8 +292,26 @@ export class HulypulseClient implements Disposable {
list.push(callback)
if (list.length === 1) {
const reply = await this.send({ type: 'sub', key })
if (reply.error != null) {
this.reconnect()
if (reply.error !== undefined) {
throw new Error(reply.error)
}
}
}

// callback for every old item (expires_at > 1 sec for atomicity)
const prevlist = await this.send({ type: 'list', key })
if (prevlist.error !== undefined) {
throw new Error(prevlist.error)
} else if (Array.isArray(prevlist.result)) {
for (const item of prevlist.result) {
try {
const value = item.data !== undefined ? JSON.parse(item.data) : undefined
if (item.expires_at <= 1 || value === undefined) {
continue
}
callback(item.key, value)
} catch (err) {
console.error('Error in initial callback', err)
}
}
}
Expand All @@ -414,16 +323,15 @@ export class HulypulseClient implements Disposable {

public async unsubscribe (key: string, callback: Callback<any>): Promise<boolean> {
const list = this.subscribes.get(key)
if (list?.includes(callback) == null) {
if (list === undefined || !list.includes(callback)) {
return false
}
const newList = list.filter((cb) => cb !== callback)
if (newList.length === 0) {
this.subscribes.delete(key)
const reply = await this.send({ type: 'unsub', key })
if (reply.error != null) {
this.reconnect()
return true
if (reply?.error !== undefined) {
throw new Error(reply.error)
}
} else {
this.subscribes.set(key, newList)
Expand All @@ -449,14 +357,14 @@ export class HulypulseClient implements Disposable {
...(typeof third === 'number' ? { TTL: third } : third)
}
const reply = await this.send(message)
if (reply.error != null) {
if (reply.error !== undefined) {
throw new Error(reply.error)
}
}

public async get<T>(key: string): Promise<T | undefined> {
const reply = await this.send({ type: 'get', key })
if (reply.error != null) {
if (reply.error !== undefined) {
if (reply.error === 'not found') {
return undefined
}
Expand All @@ -467,7 +375,7 @@ export class HulypulseClient implements Disposable {

public async get_full<T>(key: string): Promise<GetFullResult<T> | undefined> {
const reply = await this.send({ type: 'get', key })
if (reply.error != null) {
if (reply.error !== undefined) {
if (reply.error === 'not found') {
return undefined
}
Expand All @@ -483,7 +391,7 @@ export class HulypulseClient implements Disposable {
public async delete (key: string, options?: Pick<DeleteMessage, 'ifMatch'>): Promise<boolean> {
const message: Omit<DeleteMessage, 'correlation'> = { type: 'delete', key, ...options }
const reply = await this.send(message)
if (reply.error != null) {
if (reply.error !== undefined) {
if (reply.error === 'not found') {
return false
}
Expand All @@ -497,24 +405,25 @@ export class HulypulseClient implements Disposable {
const message = { ...msg, correlation: id.toString() } satisfies M

return await new Promise((resolve, reject) => {
if (this.ws == null || this.ws.readyState !== WebSocket.OPEN) {
reject(new Error('WebSocket is not open.'))
if (this.ws?.readyState !== WebSocket.OPEN) {
resolve({ error: 'WebSocket is not open.' })
return
}
const sendTimeout = setTimeout(() => {
const pending = this.pending.get(id)
if (pending !== undefined) {
pending.reject(new Error('Timeout waiting for response'))
pending.resolve({ error: 'Timeout waiting for response' })
this.pending.delete(id)
}
}, this.SEND_TIMEOUT_MS)
this.pending.set(id, { resolve, reject, send_timeout: sendTimeout })
this.ws.send(JSON.stringify(message))
this.startPing() // reset ping timer on any send
})
}
}

export function escapeString (str: string): string {
// Escape special characters to '*' | '?' | '[' | ']' | '\\' | '\0'..='\x1F' | '\x7F' | '"' | '\''
return str.replace(/[\\'"]/g, '\\$&')
// eslint-disable-next-line no-control-regex, no-useless-escape
return str.replace(/[*?\[\]\\\x00-\x1F\x7F"']/g, '_')
}
2 changes: 1 addition & 1 deletion pods/external/services.d/hulypulse.service
Original file line number Diff line number Diff line change
@@ -1 +1 @@
hulypulse hardcoreeng/service_hulypulse:0.1.14
hulypulse hardcoreeng/service_hulypulse:0.1.29
Loading