Skip to content

Commit 706b7f3

Browse files
committed
feat: send w/ fast header
1 parent 26558df commit 706b7f3

File tree

1 file changed

+85
-6
lines changed

1 file changed

+85
-6
lines changed

src/message/message-builder.js

Lines changed: 85 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,104 @@
11
import * as C from '../constants/constants.js'
2+
import * as utils from '../utils/utils.js'
3+
import varint from 'varint'
24

3-
const SEP = C.MESSAGE_PART_SEPERATOR
5+
const poolEncoder = new globalThis.TextEncoder()
6+
7+
// TODO (fix): Don't assume maxMesageSize is 1MB
8+
const maxMessageSize = 1024 * 1024
9+
const poolSize = maxMessageSize * 8
10+
11+
let poolBuffer
12+
let poolView
13+
let poolOffset
14+
15+
function reallocPool() {
16+
poolBuffer = utils.isNode
17+
? globalThis.Buffer.allocUnsafe(poolSize)
18+
: new Uint8Array(new ArrayBuffer(poolSize))
19+
poolView = new DataView(poolBuffer.buffer)
20+
poolOffset = 0
21+
}
22+
23+
function alignPool() {
24+
// Ensure aligned slices
25+
if (poolOffset & 0x7) {
26+
poolOffset |= 0x7
27+
poolOffset++
28+
}
29+
}
30+
31+
function writeString(dst, str, offset) {
32+
if (utils.isNode) {
33+
return dst.write(str, offset)
34+
} else {
35+
const res = poolEncoder.encodeInto(str, new Uint8Array(dst.buffer, offset))
36+
return res.written
37+
}
38+
}
439

540
export function getMsg(topic, action, data) {
641
if (data && !(data instanceof Array)) {
742
throw new Error('data must be an array')
843
}
944

10-
const sendData = [topic, action]
45+
if (!poolBuffer || poolOffset + maxMessageSize > poolSize) {
46+
reallocPool()
47+
} else {
48+
alignPool()
49+
}
50+
51+
const start = poolOffset
52+
53+
const headerSize = 8
54+
for (let n = 0; n < headerSize; n++) {
55+
poolBuffer[poolOffset++] = 0
56+
}
57+
58+
let headerPos = start
59+
poolBuffer[headerPos++] = 128 + headerSize
60+
61+
poolBuffer[poolOffset++] = topic.charCodeAt(0)
62+
poolBuffer[poolOffset++] = 31
63+
for (let n = 0; n < action.length; n++) {
64+
poolBuffer[poolOffset++] = action.charCodeAt(n)
65+
}
1166

1267
if (data) {
1368
for (let i = 0; i < data.length; i++) {
14-
if (typeof data[i] === 'object') {
15-
sendData.push(JSON.stringify(data[i]))
69+
const type = typeof data[i]
70+
let len
71+
if (data[i] == null) {
72+
poolBuffer[poolOffset++] = 31
73+
len = 0
74+
} else if (type === 'object') {
75+
poolBuffer[poolOffset++] = 31
76+
len = writeString(poolBuffer, JSON.stringify(data[i]), poolOffset)
77+
} else if (type === 'bigint') {
78+
poolBuffer[poolOffset++] = 31
79+
poolView.setBigUint64(poolOffset, data[i], false)
80+
len = 8
81+
} else if (type === 'string') {
82+
poolBuffer[poolOffset++] = 31
83+
len = writeString(poolBuffer, data[i], poolOffset)
1684
} else {
17-
sendData.push(data[i])
85+
throw new Error('invalid data')
86+
}
87+
poolOffset += len
88+
89+
varint.encode(len + 1, poolBuffer, headerPos)
90+
headerPos += varint.encode.bytes
91+
if (headerPos - start >= headerSize) {
92+
throw new Error('header too large')
93+
}
94+
95+
if (poolOffset >= poolSize) {
96+
throw new Error('message too large')
1897
}
1998
}
2099
}
21100

22-
return sendData.join(SEP)
101+
return new Uint8Array(poolBuffer.buffer, start, poolOffset - start)
23102
}
24103

25104
export function typed(value) {

0 commit comments

Comments
 (0)