Skip to content

Commit 87cb79c

Browse files
committed
fixes
1 parent f93624d commit 87cb79c

File tree

11 files changed

+100
-145
lines changed

11 files changed

+100
-145
lines changed

src/client.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,10 @@ Client.prototype._getOptions = function (options) {
127127
return mergedOptions
128128
}
129129

130-
function createDeepstream(url, options) {
130+
export default function createDeepstream(url, options) {
131131
return new Client(url, options)
132132
}
133133

134134
Client.prototype.isSameOrNewer = utils.isSameOrNewer
135135
Client.prototype.CONSTANTS = C
136136
createDeepstream.CONSTANTS = C
137-
138-
export default createDeepstream

src/event/event-handler.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import * as messageBuilder from '../message/message-builder.js'
2-
import messageParser from '../message/message-parser.js'
31
import * as C from '../constants/constants.js'
2+
import * as messageBuilder from '../message/message-builder.js'
3+
import * as messageParser from '../message/message-parser.js'
44
import MulticastListener from '../utils/multicast-listener.js'
55
import UnicastListener from '../utils/unicast-listener.js'
66
import EventEmitter from 'component-emitter2'
7-
import * as rxjs from 'rxjs'
7+
import rxjs from 'rxjs'
88

99
const EventHandler = function (options, connection, client) {
1010
this._options = options

src/message/connection.js

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
import * as utils from '../utils/utils.js'
2-
import messageParser from './message-parser.js'
2+
import * as messageParser from './message-parser.js'
33
import * as messageBuilder from './message-builder.js'
44
import * as C from '../constants/constants.js'
55
import xxhash from 'xxhash-wasm'
66
import FixedQueue from '../utils/fixed-queue.js'
77
import Emitter from 'component-emitter2'
88

9-
const HASHER = await xxhash()
109
const NodeWebSocket = utils.isNode ? await import('ws').then((x) => x.default) : null
1110
const BrowserWebSocket = globalThis.WebSocket || globalThis.MozWebSocket
1211

13-
const Connection = function (client, url, options) {
12+
export default function Connection(client, url, options) {
1413
this._client = client
1514
this._options = options
1615
this._logger = options.logger
@@ -40,9 +39,11 @@ const Connection = function (client, url, options) {
4039

4140
this._state = C.CONNECTION_STATE.CLOSED
4241

43-
this.hasher = HASHER
44-
45-
this._createEndpoint()
42+
this.hasher = null
43+
xxhash().then((hasher) => {
44+
this.hasher = hasher
45+
this._createEndpoint()
46+
})
4647
}
4748

4849
Emitter(Connection.prototype)
@@ -86,30 +87,25 @@ Connection.prototype.close = function () {
8687
this._endpoint?.close()
8788

8889
if (this._reconnectTimeout) {
89-
globalThis.clearTimeout(this._reconnectTimeout)
90+
clearTimeout(this._reconnectTimeout)
9091
this._reconnectTimeout = null
9192
}
9293
}
9394

9495
Connection.prototype._createEndpoint = function () {
95-
if (utils.isNode) {
96-
this._endpoint = new NodeWebSocket(this._url, {
97-
generateMask() {},
98-
})
99-
} else {
100-
this._endpoint = new BrowserWebSocket(this._url)
101-
this._endpoint.binaryType = 'arraybuffer'
102-
}
96+
this._endpoint = NodeWebSocket
97+
? new NodeWebSocket(this._url, {
98+
generateMask() {},
99+
})
100+
: new BrowserWebSocket(this._url)
103101
this._corked = false
104102

105103
this._endpoint.onopen = this._onOpen.bind(this)
106104
this._endpoint.onerror = this._onError.bind(this)
107105
this._endpoint.onclose = this._onClose.bind(this)
108-
109-
const decoder = new TextDecoder()
110-
this._endpoint.onmessage = ({ data }) => {
111-
this._onMessage(typeof data === 'string' ? data : decoder.decode(data))
112-
}
106+
this._endpoint.onmessage = BrowserWebSocket
107+
? ({ data }) => this._onMessage(typeof data === 'string' ? data : Buffer.from(data).toString())
108+
: ({ data }) => this._onMessage(typeof data === 'string' ? data : data.toString())
113109
}
114110

115111
Connection.prototype.send = function (message) {
@@ -136,7 +132,7 @@ Connection.prototype.send = function (message) {
136132
if (this._endpoint._socket && !this._corked) {
137133
this._endpoint._socket.cork()
138134
this._corked = true
139-
globalThis.setTimeout(() => {
135+
setTimeout(() => {
140136
this._endpoint._socket.uncork()
141137
this._corked = false
142138
}, 1)
@@ -170,7 +166,7 @@ Connection.prototype._sendAuthParams = function () {
170166
this._setState(C.CONNECTION_STATE.AUTHENTICATING)
171167
const authMessage = messageBuilder.getMsg(C.TOPIC.AUTH, C.ACTIONS.REQUEST, [
172168
this._authParams,
173-
'27.0.0',
169+
'26.0.0',
174170
utils.isNode
175171
? `Node/${process.version}`
176172
: globalThis.navigator && globalThis.navigator.userAgent,
@@ -245,6 +241,10 @@ Connection.prototype._recvMessages = function (deadline) {
245241
continue
246242
}
247243

244+
if (this._logger) {
245+
this._logger.trace(message, 'receive')
246+
}
247+
248248
messageParser.parseMessage(message, this._client, this._message)
249249

250250
this.emit('recv', this._message)
@@ -365,5 +365,3 @@ Connection.prototype._clearReconnect = function () {
365365
}
366366
this._reconnectionAttempt = 0
367367
}
368-
369-
export default Connection

src/message/message-builder.js

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
import * as C from '../constants/constants.js'
2-
import varint from 'varint'
32
import * as utils from '../utils/utils.js'
3+
import varint from 'varint'
44

55
const poolEncoder = new globalThis.TextEncoder()
66

7-
let poolSize
7+
// TODO (fix): Don't assume maxMesageSize is 1MB
8+
const maxMessageSize = 1024 * 1024
9+
const poolSize = maxMessageSize * 4
10+
811
let poolBuffer
912
let poolView
1013
let poolOffset
1114

12-
function allocPool(size) {
13-
poolSize = size || poolSize || 1024 * 1024
15+
function reallocPool() {
1416
poolBuffer = utils.isNode
1517
? globalThis.Buffer.allocUnsafe(poolSize)
1618
: new Uint8Array(new ArrayBuffer(poolSize))
@@ -40,8 +42,8 @@ export function getMsg(topic, action, data) {
4042
throw new Error('data must be an array')
4143
}
4244

43-
if (!poolSize || poolOffset + poolSize / 16 >= poolSize) {
44-
allocPool()
45+
if (!poolBuffer || poolOffset + maxMessageSize > poolSize) {
46+
reallocPool()
4547
} else {
4648
alignPool()
4749
}
@@ -84,12 +86,11 @@ export function getMsg(topic, action, data) {
8486
varint.encode(len + 1, poolBuffer, headerPos)
8587
headerPos += varint.encode.bytes
8688
if (headerPos - start >= headerSize) {
87-
throw new Error(`header too large: ${headerPos - start} ${headerSize}`)
89+
throw new Error('header too large')
8890
}
8991

90-
if (poolOffset >= poolBuffer.length) {
91-
allocPool(start === 0 ? poolSize * 2 : poolSize)
92-
return getMsg(topic, action, data)
92+
if (poolOffset >= poolSize) {
93+
throw new Error('message too large')
9394
}
9495
}
9596
}

src/message/message-parser.js

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import * as C from '../constants/constants.js'
22

3-
const MessageParser = function () {
4-
this._actions = this._getActions()
3+
const actions = {}
4+
5+
for (const key in C.ACTIONS) {
6+
actions[C.ACTIONS[key]] = key
57
}
68

7-
MessageParser.prototype.convertTyped = function (value, client) {
9+
export function convertTyped(value, client) {
810
const type = value.charAt(0)
911

1012
if (type === C.TYPES.STRING) {
@@ -45,17 +47,7 @@ MessageParser.prototype.convertTyped = function (value, client) {
4547
return undefined
4648
}
4749

48-
MessageParser.prototype._getActions = function () {
49-
const actions = {}
50-
51-
for (const key in C.ACTIONS) {
52-
actions[C.ACTIONS[key]] = key
53-
}
54-
55-
return actions
56-
}
57-
58-
MessageParser.prototype.parseMessage = function (message, client, result) {
50+
export function parseMessage(message, client, result) {
5951
const parts = message.split(C.MESSAGE_PART_SEPERATOR)
6052

6153
if (parts.length < 2) {
@@ -72,7 +64,7 @@ MessageParser.prototype.parseMessage = function (message, client, result) {
7264
return null
7365
}
7466

75-
if (this._actions[parts[1]] === undefined) {
67+
if (actions[parts[1]] === undefined) {
7668
client._$onError(
7769
C.TOPIC.ERROR,
7870
C.EVENT.MESSAGE_PARSE_ERROR,
@@ -87,5 +79,3 @@ MessageParser.prototype.parseMessage = function (message, client, result) {
8779
result.action = parts[1]
8880
result.data = parts.splice(2)
8981
}
90-
91-
export default new MessageParser()

src/record/record-handler.js

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import Record from './record.js'
22
import MulticastListener from '../utils/multicast-listener.js'
33
import UnicastListener from '../utils/unicast-listener.js'
44
import * as C from '../constants/constants.js'
5-
import * as rxjs from 'rxjs'
5+
import rxjs from 'rxjs'
66
import invariant from 'invariant'
77
import EventEmitter from 'component-emitter2'
88
import jsonPath from '@nxtedition/json-path'
@@ -97,11 +97,19 @@ class RecordHandler {
9797
this._connection = connection
9898
this._client = client
9999
this._records = new Map()
100+
this._cache = new Map()
100101
this._listeners = new Map()
101102
this._pruning = new Set()
102103
this._patching = new Map()
103104
this._updating = new Map()
104105

106+
this._registry = new FinalizationRegistry((name) => {
107+
const entry = this._cache.get(name)
108+
if (entry && entry.deref && entry.deref() === undefined) {
109+
this._cache.delete(name)
110+
}
111+
})
112+
105113
this._connected = 0
106114
this._stats = {
107115
updating: 0,
@@ -134,6 +142,11 @@ class RecordHandler {
134142
for (const rec of pruning) {
135143
rec._$dispose()
136144
this._records.delete(rec.name)
145+
146+
if (!this._cache.has(rec.name)) {
147+
this._cache.set(rec.name, new WeakRef(rec))
148+
this._registry.register(rec, rec.name)
149+
}
137150
}
138151

139152
this._stats.pruning -= pruning.size
@@ -219,7 +232,7 @@ class RecordHandler {
219232
let record = this._records.get(name)
220233

221234
if (!record) {
222-
record = new Record(name, this)
235+
record = this._cache.get(name)?.deref() ?? new Record(name, this)
223236
this._stats.records += 1
224237
this._stats.created += 1
225238
this._records.set(name, record)

src/record/record.js

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,37 @@
11
import jsonPath from '@nxtedition/json-path'
22
import * as utils from '../utils/utils.js'
33
import * as C from '../constants/constants.js'
4-
import messageParser from '../message/message-parser.js'
4+
import * as messageParser from '../message/message-parser.js'
55
import xuid from 'xuid'
66
import invariant from 'invariant'
77
import cloneDeep from 'lodash.clonedeep'
88
import * as timers from '../utils/timers.js'
99

10-
export default class Record {
10+
class Record {
1111
static STATE = C.RECORD_STATE
1212

1313
constructor(name, handler) {
14-
if (!name?.length || typeof name !== 'string') {
15-
throw new Error('invalid argument: name')
16-
}
17-
1814
const connection = handler._connection
1915

2016
this._handler = handler
21-
2217
this._name = name
23-
this._key = utils.h64(name)
2418
this._version = ''
2519
this._data = jsonPath.EMPTY
2620
this._state = C.RECORD_STATE.VOID
2721
this._refs = 0
2822
this._subscriptions = []
2923
this._emitting = false
24+
3025
/** @type Map? */ this._updating = null
3126
/** @type Array? */ this._patching = null
32-
this._subscribed = connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [
33-
this._name,
34-
this._key,
35-
])
27+
this._subscribed = connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [this._name])
3628
}
3729

3830
/** @type {string} */
3931
get name() {
4032
return this._name
4133
}
4234

43-
/** @type {bigint} */
44-
get key() {
45-
return this._key
46-
}
47-
4835
/** @type {string} */
4936
get version() {
5037
return this._version
@@ -75,8 +62,7 @@ export default class Record {
7562
if (this._refs === 1) {
7663
this._handler._onPruning(this, false)
7764
this._subscribed =
78-
this._subscribed ||
79-
connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [this._name, this._key])
65+
this._subscribed || connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [this._name])
8066
}
8167
return this
8268
}
@@ -338,8 +324,7 @@ export default class Record {
338324

339325
if (connected) {
340326
this._subscribed =
341-
this._refs > 0 &&
342-
connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [this._key, this._name])
327+
this._refs > 0 && connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.SUBSCRIBE, [this._name])
343328

344329
if (this._updating) {
345330
for (const update of this._updating.values()) {
@@ -364,7 +349,7 @@ export default class Record {
364349
invariant(!this._updating, 'must not have updates')
365350

366351
if (this._subscribed) {
367-
connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.UNSUBSCRIBE, [this._key])
352+
connection.sendMsg(C.TOPIC.RECORD, C.ACTIONS.UNSUBSCRIBE, [this._name])
368353
this._subscribed = false
369354
}
370355

@@ -386,7 +371,7 @@ export default class Record {
386371
const prevVersion = this._version
387372
const nextVersion = this._makeVersion(parseInt(prevVersion) + 1)
388373

389-
const update = [this._key, nextVersion, jsonPath.stringify(nextData), prevVersion]
374+
const update = [this._name, nextVersion, jsonPath.stringify(nextData), prevVersion]
390375

391376
if (!this._updating) {
392377
this._onUpdating(true)
@@ -437,7 +422,7 @@ export default class Record {
437422
this._onPatching(false)
438423
}
439424

440-
if (this._state < C.RECORD_STATE.PROVIDER && hasProvider === 'T') {
425+
if (hasProvider === 'T') {
441426
this._state = C.RECORD_STATE.PROVIDER
442427
} else if (this._state < C.RECORD_STATE.SERVER) {
443428
this._state = this._version.charAt(0) === 'I' ? C.RECORD_STATE.STALE : C.RECORD_STATE.SERVER
@@ -588,3 +573,5 @@ Object.defineProperty(Record.prototype, 'hasProvider', {
588573
return this.state >= C.RECORD_STATE.PROVIDER
589574
},
590575
})
576+
577+
export default Record

0 commit comments

Comments
 (0)