Skip to content

Commit 3b1981d

Browse files
committed
feat: hash key
1 parent a51d27e commit 3b1981d

File tree

5 files changed

+83
-26
lines changed

5 files changed

+83
-26
lines changed

src/message/connection.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ Connection.prototype._sendAuthParams = function () {
170170
this._setState(C.CONNECTION_STATE.AUTHENTICATING)
171171
const authMessage = messageBuilder.getMsg(C.TOPIC.AUTH, C.ACTIONS.REQUEST, [
172172
this._authParams,
173-
'26.0.0',
173+
'27.0.0',
174174
utils.isNode
175175
? `Node/${process.version}`
176176
: globalThis.navigator && globalThis.navigator.userAgent,

src/record/record.js

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,16 @@ export default class Record {
1111
static STATE = C.RECORD_STATE
1212

1313
constructor(name, handler) {
14+
if (!name?.length || typeof name !== 'string') {
15+
throw new Error('invalid argument: name')
16+
}
17+
1418
const connection = handler._connection
1519

1620
this._handler = handler
1721

1822
this._name = name
23+
this._key = utils.hashNameBigInt(name)
1924
this._version = ''
2025
this._data = jsonPath.EMPTY
2126
this._state = C.RECORD_STATE.VOID
@@ -24,14 +29,22 @@ export default class Record {
2429
this._emitting = false
2530
/** @type Map? */ this._updating = null
2631
/** @type Array? */ this._patching = null
27-
this._subscribed = connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [this._name])
32+
this._subscribed = connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [
33+
this._name,
34+
this._key,
35+
])
2836
}
2937

3038
/** @type {string} */
3139
get name() {
3240
return this._name
3341
}
3442

43+
/** @type {bigint} */
44+
get key() {
45+
return this._key
46+
}
47+
3548
/** @type {string} */
3649
get version() {
3750
return this._version
@@ -62,7 +75,8 @@ export default class Record {
6275
if (this._refs === 1) {
6376
this._handler._onPruning(this, false)
6477
this._subscribed =
65-
this._subscribed || connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [this._name])
78+
this._subscribed ||
79+
connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [this._name, this._key])
6680
}
6781
return this
6882
}
@@ -324,7 +338,8 @@ export default class Record {
324338

325339
if (connected) {
326340
this._subscribed =
327-
this._refs > 0 && connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [this._name])
341+
this._refs > 0 &&
342+
connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [this._key, this._name])
328343

329344
if (this._updating) {
330345
for (const update of this._updating.values()) {
@@ -349,7 +364,7 @@ export default class Record {
349364
invariant(!this._updating, 'must not have updates')
350365

351366
if (this._subscribed) {
352-
connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.UNSUBSCRIBE, [this._name])
367+
connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.UNSUBSCRIBE, [this._key])
353368
this._subscribed = false
354369
}
355370

@@ -371,7 +386,7 @@ export default class Record {
371386
const prevVersion = this._version
372387
const nextVersion = this._makeVersion(parseInt(prevVersion) + 1)
373388

374-
const update = [this._name, nextVersion, jsonPath.stringify(nextData), prevVersion]
389+
const update = [this._key, nextVersion, jsonPath.stringify(nextData), prevVersion]
375390

376391
if (!this._updating) {
377392
this._onUpdating(true)

src/utils/multicast-listener.js

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import * as C from '../constants/constants.js'
22
import * as rxjs from 'rxjs'
3+
import * as utils from '../utils/utils.js'
34

45
class Listener {
56
constructor(topic, pattern, callback, handler, { recursive = false, stringify = null } = {}) {
@@ -47,15 +48,23 @@ class Listener {
4748

4849
const name = message.data[1]
4950

51+
if (!name?.length) {
52+
this._error(name, 'invalid message')
53+
return
54+
}
55+
56+
const key = utils.hashNameBigInt(name)
57+
5058
if (message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_FOUND) {
51-
if (this._subscriptions.has(name)) {
59+
if (this._subscriptions.has(key)) {
5260
this._error(name, 'invalid add: listener exists')
5361
return
5462
}
5563

5664
// TODO (refactor): Move to class
5765
const provider = {
5866
name,
67+
key,
5968
value$: null,
6069
sending: false,
6170
accepted: false,
@@ -68,7 +77,7 @@ class Listener {
6877
if (this.connected && provider.accepted) {
6978
this._connection.sendMsg(this._topic, C.ACTIONS.LISTEN_REJECT, [
7079
this._pattern,
71-
provider.name,
80+
provider.key,
7281
])
7382
}
7483

@@ -101,7 +110,7 @@ class Listener {
101110
this._connection.sendMsg(
102111
this._topic,
103112
accepted ? C.ACTIONS.LISTEN_ACCEPT : C.ACTIONS.LISTEN_REJECT,
104-
[this._pattern, provider.name],
113+
[this._pattern, provider.key],
105114
)
106115

107116
provider.version = null
@@ -151,13 +160,13 @@ class Listener {
151160
}
152161

153162
const body = typeof value !== 'string' ? this._stringify(value) : value
154-
const hash = this._connection.hasher.h64ToString(body)
163+
const hash = utils.h64ToString(body)
155164
const version = `INF-${hash}`
156165

157166
if (provider.version !== version) {
158167
provider.version = version
159168
this._connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.UPDATE, [
160-
provider.name,
169+
provider.key,
161170
version,
162171
body,
163172
])
@@ -181,9 +190,9 @@ class Listener {
181190

182191
provider.start()
183192

184-
this._subscriptions.set(provider.name, provider)
193+
this._subscriptions.set(provider.key, provider)
185194
} else if (message.action === C.ACTIONS.LISTEN_ACCEPT) {
186-
const provider = this._subscriptions.get(name)
195+
const provider = this._subscriptions.get(key)
187196
if (!provider?.value$) {
188197
return
189198
}
@@ -195,13 +204,13 @@ class Listener {
195204
provider.valueSubscription = provider.value$.subscribe(provider.observer)
196205
}
197206
} else if (message.action === C.ACTIONS.SUBSCRIPTION_FOR_PATTERN_REMOVED) {
198-
const provider = this._subscriptions.get(name)
207+
const provider = this._subscriptions.get(key)
199208

200209
if (!provider) {
201210
this._error(name, 'invalid remove: listener missing')
202211
} else {
203212
provider.stop()
204-
this._subscriptions.delete(provider.name)
213+
this._subscriptions.delete(provider.key)
205214
}
206215
} else {
207216
return false

src/utils/unicast-listener.js

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import * as C from '../constants/constants.js'
22
import * as rxjs from 'rxjs'
3+
import * as utils from '../utils/utils.js'
34

45
const PIPE = rxjs.pipe(
56
rxjs.map((value) => {
@@ -54,8 +55,15 @@ class Listener {
5455
_$onMessage(message) {
5556
const name = message.data[1]
5657

58+
if (!name?.length) {
59+
this._error(name, 'invalid message')
60+
return
61+
}
62+
63+
const key = utils.hashNameBigInt(name)
64+
5765
if (message.action === C.ACTIONS.LISTEN_ACCEPT) {
58-
if (this._subscriptions.has(name)) {
66+
if (this._subscriptions.has(key)) {
5967
this._error(name, 'invalid accept: listener exists')
6068
return
6169
}
@@ -71,29 +79,29 @@ class Listener {
7179
const subscription = value$.pipe(PIPE).subscribe({
7280
next: (data) => {
7381
if (data == null) {
74-
this._connection.sendMsg(this._topic, C.ACTIONS.LISTEN_REJECT, [this._pattern, name])
75-
this._subscriptions.delete(name)
82+
this._connection.sendMsg(this._topic, C.ACTIONS.LISTEN_REJECT, [this._pattern, key])
83+
this._subscriptions.delete(key)
7684
subscription.unsubscribe()
7785
} else {
78-
const version = `INF-${this._connection.hasher.h64ToString(data)}`
79-
this._connection.sendMsg(this._topic, C.ACTIONS.UPDATE, [name, version, data])
86+
const version = `INF-${utils.h64ToString(data)}`
87+
this._connection.sendMsg(this._topic, C.ACTIONS.UPDATE, [key, version, data])
8088
}
8189
},
8290
error: (err) => {
8391
this._error(name, err)
84-
this._connection.sendMsg(this._topic, C.ACTIONS.LISTEN_REJECT, [this._pattern, name])
85-
this._subscriptions.delete(name)
92+
this._connection.sendMsg(this._topic, C.ACTIONS.LISTEN_REJECT, [this._pattern, key])
93+
this._subscriptions.delete(key)
8694
},
8795
})
88-
this._subscriptions.set(name, subscription)
96+
this._subscriptions.set(key, subscription)
8997
} else {
90-
this._connection.sendMsg(this._topic, C.ACTIONS.LISTEN_REJECT, [this._pattern, name])
98+
this._connection.sendMsg(this._topic, C.ACTIONS.LISTEN_REJECT, [this._pattern, key])
9199
}
92100
} else if (message.action === C.ACTIONS.LISTEN_REJECT) {
93-
const subscription = this._subscriptions.get(name)
101+
const subscription = this._subscriptions.get(key)
94102

95103
if (subscription) {
96-
this._subscriptions.delete(name)
104+
this._subscriptions.delete(key)
97105
subscription.unsubscribe()
98106
} else {
99107
this._error(name, 'invalid remove: listener missing')

src/utils/utils.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import xxhash from 'xxhash-wasm'
2+
3+
const HASHER = await xxhash()
4+
15
const NODE_ENV = typeof process !== 'undefined' && process.env && process.env.NODE_ENV
26
export const isNode = typeof process !== 'undefined' && process.toString() === '[object process]'
37
export const isProduction = NODE_ENV === 'production'
@@ -174,3 +178,24 @@ export function removeAbortListener(signal, handler) {
174178
}
175179
}
176180
}
181+
182+
export function h64(name) {
183+
return HASHER.h64(name)
184+
}
185+
186+
export function h64ToString(name) {
187+
return HASHER.h64ToString(name)
188+
}
189+
190+
const encoder = new globalThis.TextEncoder()
191+
const buffer = new Uint8Array(8 * 3)
192+
const view = new DataView(buffer.buffer)
193+
194+
export function hashNameBigInt(name) {
195+
if (name.length >= 8) {
196+
return HASHER.h64(name)
197+
}
198+
199+
const { written } = encoder.encodeInto(name, buffer)
200+
return written === 8 ? view.getBigUint64(0, false) : HASHER.h64Raw(buffer.subarray(0, written))
201+
}

0 commit comments

Comments
 (0)