Skip to content

Commit ae8b778

Browse files
committed
perf(unicast): reduce memory usage
1 parent 630580c commit ae8b778

File tree

2 files changed

+48
-28
lines changed

2 files changed

+48
-28
lines changed

src/utils/multicast-listener.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,13 @@ export default class Listener {
227227
} else if (message.action === C.ACTIONS.LISTEN_REJECT) {
228228
this._subscriptions.get(name)?.reject()
229229
} else if (message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED) {
230-
this._subscriptions.get(name)?.dispose()
231-
this._subscriptions.delete(name)
230+
const provider = this._subscriptions.get(name)
231+
if (provider) {
232+
provider.dispose()
233+
this._subscriptions.delete(name)
234+
} else {
235+
this._error(name, 'invalid remove: listener missing')
236+
}
232237
} else {
233238
return false
234239
}

src/utils/unicast-listener.js

Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
1-
import * as rxjs from 'rxjs'
21
import * as C from '../constants/constants.js'
32
import { h64ToString } from '../utils/utils.js'
43

5-
const PIPE = rxjs.pipe(
6-
rxjs.map((value) => {
4+
class Observer {
5+
#name
6+
#listener
7+
#version = ''
8+
9+
constructor(name, listener) {
10+
this.#name = name
11+
this.#listener = listener
12+
}
13+
14+
next(value) {
715
let data
816
if (value && typeof value === 'string') {
917
if (value.charAt(0) !== '{' && value.charAt(0) !== '[') {
@@ -16,10 +24,34 @@ const PIPE = rxjs.pipe(
1624
throw new Error(`invalid value: ${value}`)
1725
}
1826

19-
return data
20-
}),
21-
rxjs.distinctUntilChanged(),
22-
)
27+
const version = data ? `INF-${h64ToString(data)}` : ''
28+
if (this.#version === version) {
29+
return
30+
}
31+
32+
if (version) {
33+
this.#listener._connection.sendMsg(this.#listener._topic, C.ACTIONS.UPDATE, [
34+
this.#name,
35+
version,
36+
data,
37+
])
38+
} else {
39+
this.#listener._connection.sendMsg(this.#listener._topic, C.ACTIONS.LISTEN_REJECT, [
40+
this.#listener._pattern,
41+
this.#name,
42+
])
43+
}
44+
45+
this.#version = version
46+
}
47+
error(err) {
48+
this.#listener._error(this.#name, err)
49+
this.#listener._connection.sendMsg(this.#listener._topic, C.ACTIONS.LISTEN_REJECT, [
50+
this.#listener._pattern,
51+
this.#name,
52+
])
53+
}
54+
}
2355

2456
export default class Listener {
2557
constructor(topic, pattern, callback, handler, opts) {
@@ -65,28 +97,11 @@ export default class Listener {
6597
try {
6698
value$ = this._callback(name)
6799
} catch (err) {
68-
value$ = rxjs.throwError(() => err)
100+
this._error(name, err)
69101
}
70102

71103
if (value$) {
72-
const subscription = value$.pipe(PIPE).subscribe({
73-
next: (data) => {
74-
if (data == null) {
75-
this._connection.sendMsg(this._topic, C.ACTIONS.LISTEN_REJECT, [this._pattern, name])
76-
this._subscriptions.delete(name)
77-
subscription.unsubscribe()
78-
} else {
79-
const version = `INF-${h64ToString(data)}`
80-
this._connection.sendMsg(this._topic, C.ACTIONS.UPDATE, [name, version, data])
81-
}
82-
},
83-
error: (err) => {
84-
this._error(name, err)
85-
this._connection.sendMsg(this._topic, C.ACTIONS.LISTEN_REJECT, [this._pattern, name])
86-
this._subscriptions.delete(name)
87-
},
88-
})
89-
this._subscriptions.set(name, subscription)
104+
this._subscriptions.set(name, value$.subscribe(new Observer(name, this)))
90105
} else {
91106
this._connection.sendMsg(this._topic, C.ACTIONS.LISTEN_REJECT, [this._pattern, name])
92107
}

0 commit comments

Comments
 (0)