Skip to content

Commit

Permalink
performance: use hand-rolled msgpack encoder
Browse files Browse the repository at this point in the history
Most of the new code is adapted from commit 3aa4d9b
and its parent commits, with a few optimizations applied throughout.
  • Loading branch information
bengl committed Feb 19, 2020
1 parent 90896da commit e8f72f0
Show file tree
Hide file tree
Showing 9 changed files with 432 additions and 53 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -61,6 +61,7 @@
"lodash.sortby": "^4.7.0",
"lodash.uniq": "^4.5.0",
"methods": "^1.1.2",
"mnemonist": "^0.32.0",
"module-details-from-path": "^1.0.3",
"msgpack-lite": "^0.1.26",
"nan": "^2.12.1",
Expand Down
22 changes: 0 additions & 22 deletions packages/dd-trace/src/encode.js

This file was deleted.

41 changes: 41 additions & 0 deletions packages/dd-trace/src/encode/cache.js
@@ -0,0 +1,41 @@
'use strict'

const LRUCache = require('mnemonist/lru-cache')
const tokens = require('./tokens')
const util = require('./util')

// Build a memoizing string encoder: returns a function that maps a string to
// its msgpack encoding (length prefix + UTF-8 bytes), keeping up to `max`
// entries in an LRU cache so hot strings are encoded only once.
function cache (max) {
  const store = new LRUCache(max)

  return value => {
    const hit = store.get(value)

    if (hit) return hit

    const utf8 = Buffer.from(value, 'utf-8')
    const encoded = Buffer.concat([prefix(utf8.length), utf8])

    store.set(value, encoded)

    return encoded
  }
}

// msgpack string header for a byte length. Lengths up to 0xffff come from the
// precomputed token table; longer strings use the str32 form.
function prefix (length) {
  if (length > 0xffff) {
    // str32: marker byte 0xdb followed by a big-endian 32-bit length.
    const header = Buffer.allocUnsafe(5)

    util.writeUInt8(header, 0xdb, 0)
    util.writeUInt32(header, length, 1)

    return header
  }

  return tokens.str[length]
}

module.exports = cache
172 changes: 172 additions & 0 deletions packages/dd-trace/src/encode/index.js
@@ -0,0 +1,172 @@
'use strict'

const Uint64BE = require('int64-buffer').Uint64BE
const util = require('./util')
const tokens = require('./tokens')
const cachedString = require('./cache')(1024)

const fields = getFields()

// Preassemble a msgpack template for the five fields every span carries
// (trace_id, span_id, start, duration, error), and record the byte offset of
// each value so copyHeader() can patch per-span values in place and copy the
// whole template at once.
const {
  headerBuffer,
  traceIdOffset,
  spanIdOffset,
  startOffset,
  durationOffset,
  errorOffset
} = (() => {
  const buffer = Buffer.alloc(1024)
  let offset = 0

  // trace_id: key + uint64 marker + 8-byte big-endian placeholder
  offset += copy(buffer, offset, fields.trace_id)
  offset += copy(buffer, offset, tokens.uint64)
  const traceIdOffset = offset
  new Uint64BE(buffer, offset, 0) // eslint-disable-line no-new
  offset += 8

  // span_id: key + uint64 marker + 8-byte big-endian placeholder
  offset += copy(buffer, offset, fields.span_id)
  offset += copy(buffer, offset, tokens.uint64)
  const spanIdOffset = offset
  new Uint64BE(buffer, offset, 0) // eslint-disable-line no-new
  offset += 8

  // start: key + uint64 marker + 8-byte big-endian placeholder
  offset += copy(buffer, offset, fields.start)
  offset += copy(buffer, offset, tokens.uint64)
  const startOffset = offset
  new Uint64BE(buffer, offset, 0) // eslint-disable-line no-new
  offset += 8

  // duration: key + uint64 marker + 8-byte big-endian placeholder
  offset += copy(buffer, offset, fields.duration)
  offset += copy(buffer, offset, tokens.uint64)
  const durationOffset = offset
  new Uint64BE(buffer, offset, 0) // eslint-disable-line no-new
  offset += 8

  // error: key + single-byte fixint placeholder (tokens.int[0])
  offset += copy(buffer, offset, fields.error)
  const errorOffset = offset
  offset += copy(buffer, offset, tokens.int[0])

  return {
    headerBuffer: buffer.slice(0, offset), // template trimmed to actual size
    traceIdOffset,
    spanIdOffset,
    startOffset,
    durationOffset,
    errorOffset
  }
})()

// Encode a trace (an array of span objects) as msgpack into `buffer`
// starting at `offset`. Returns the offset just past the encoded trace.
function encode (buffer, offset, trace) {
  offset = writeArrayPrefix(buffer, offset, trace)

  for (const span of trace) {
    const hasParent = !!span.parent_id
    const hasType = !!span.type
    // 9 mandatory fields, plus parent_id and type when present.
    const fieldCount = 9 + (hasParent ? 1 : 0) + (hasType ? 1 : 0)

    offset += copy(buffer, offset, tokens.map[fieldCount])
    offset += copyHeader(buffer, offset, span)

    if (hasParent) {
      offset += copy(buffer, offset, fields.parent_id)
      offset += copy(buffer, offset, tokens.uint64)
      offset += copy(buffer, offset, span.parent_id.toBuffer())
    }

    offset += copy(buffer, offset, fields.name)
    offset += write(buffer, offset, span.name)

    offset += copy(buffer, offset, fields.resource)
    offset += write(buffer, offset, span.resource)

    offset += copy(buffer, offset, fields.service)
    offset += write(buffer, offset, span.service)

    if (hasType) {
      offset += copy(buffer, offset, fields.type)
      offset += write(buffer, offset, span.type)
    }

    offset += copy(buffer, offset, fields.meta)
    offset = writeMap(buffer, offset, span.meta)
  }

  // Writing an empty string at the final offset throws if we ran past the
  // end of the buffer, turning silent truncation into an error.
  buffer.write('', offset)

  return offset
}

// Patch this span's values (trace_id, span_id, start, duration, error) into
// the preassembled header template, then copy the whole template into
// `buffer` at `offset` in a single pass. Returns the number of bytes copied.
function copyHeader (buffer, offset, span) {
  copy(headerBuffer, traceIdOffset, span.trace_id.toBuffer())
  copy(headerBuffer, spanIdOffset, span.span_id.toBuffer())
  new Uint64BE(headerBuffer, startOffset, span.start) // eslint-disable-line no-new
  new Uint64BE(headerBuffer, durationOffset, span.duration) // eslint-disable-line no-new
  copy(headerBuffer, errorOffset, tokens.int[span.error])
  return copy(buffer, offset, headerBuffer)
}

// Encode `str` (with its msgpack length prefix, via the LRU string cache)
// into `buffer` at `offset`. Returns the number of bytes written.
function write (buffer, offset, str) {
  const encoded = cachedString(str)

  return copy(buffer, offset, encoded)
}

// Copy `source` — optionally restricted to [sourceStart, sourceEnd) — into
// `buffer` at `offset`. Returns the number of bytes copied so callers can
// advance their write offset.
function copy (buffer, offset, source, sourceStart, sourceEnd) {
  const length = source.length

  sourceStart = sourceStart || 0
  sourceEnd = sourceEnd || length

  if (sourceStart !== 0 || sourceEnd !== length) {
    // BUG FIX: was `source.slice(sourceEnd, sourceEnd)`, which always
    // produced an empty slice, copying nothing and returning 0.
    source = source.slice(sourceStart, sourceEnd)
  }
  buffer.set(source, offset)

  return source.length
}

// Encode a plain object as a msgpack map of string keys to string values.
// Returns the offset just past the encoded map.
function writeMap (buffer, offset, map) {
  const keys = Object.keys(map)

  offset += copy(buffer, offset, tokens.map[keys.length])

  for (const key of keys) {
    offset += write(buffer, offset, key)
    offset += write(buffer, offset, map[key])
  }

  return offset
}

// Write a msgpack container/length prefix. Lengths up to 0xffff come from the
// precomputed token table; larger lengths use the 32-bit form, whose marker
// byte is `startByte + 1`, followed by a big-endian uint32 length.
// Returns the number of bytes written.
function writePrefix (buffer, offset, length, tokens, startByte) {
  if (length > 0xffff) {
    const markerBytes = util.writeUInt8(buffer, startByte + 1, offset)
    const lengthBytes = util.writeUInt32(buffer, length, offset + 1)

    return markerBytes + lengthBytes
  }

  return copy(buffer, offset, tokens[length])
}

// Write the msgpack array header for `array` (0xdc family; 0xdd for array32).
// Returns the offset just past the header.
function writeArrayPrefix (buffer, offset, array) {
  const written = writePrefix(buffer, offset, array.length, tokens.array, 0xdc)

  return offset + written
}

// Precompute the msgpack-encoded key (length prefix + UTF-8 bytes) for every
// span field name, keyed by the field name itself.
function getFields () {
  const names = [
    'trace_id',
    'span_id',
    'parent_id',
    'service',
    'resource',
    'name',
    'type',
    'error',
    'meta',
    'start',
    'duration'
  ]
  const fields = {}

  for (const name of names) {
    fields[name] = Buffer.concat([tokens.str[name.length], Buffer.from(name)])
  }

  return fields
}

module.exports = encode
82 changes: 82 additions & 0 deletions packages/dd-trace/src/encode/tokens.js
@@ -0,0 +1,82 @@
'use strict'

const util = require('./util')

// Precompute msgpack string headers for every length 0-0xffff.
function getStrPrefixes () {
  const values = []

  // fixstr: single byte 0xa0-0xbf encodes lengths 0-31 directly.
  for (let length = 0; length < 32; length++) {
    const token = Buffer.allocUnsafe(1)
    util.writeUInt8(token, 0xa0 + length, 0)
    values[length] = token
  }

  // str8: 0xd9 marker followed by a one-byte length (32-255).
  for (let length = 32; length <= 0xff; length++) {
    const token = Buffer.allocUnsafe(2)
    util.writeUInt8(token, 0xd9, 0)
    util.writeUInt8(token, length, 1)
    values[length] = token
  }

  // str16: 0xda marker followed by a big-endian two-byte length.
  for (let length = 0x100; length <= 0xffff; length++) {
    const token = Buffer.allocUnsafe(3)
    util.writeUInt8(token, 0xda, 0)
    util.writeUInt16(token, length, 1)
    values[length] = token
  }

  return values
}

// Precompute single-byte encodings for 0-127 (msgpack positive fixint, where
// the value is its own encoding).
function getIntPrefixes () {
  const values = []

  for (let value = 0; value < 128; value++) {
    const token = Buffer.allocUnsafe(1)
    util.writeUInt8(token, value, 0)
    values[value] = token
  }

  return values
}

// Precompute msgpack array headers for every length 0-0xffff.
function getArrayPrefixes () {
  const values = []

  // fixarray: single byte 0x90-0x9f encodes lengths 0-15 directly.
  for (let length = 0; length <= 0xf; length++) {
    const token = Buffer.allocUnsafe(1)
    util.writeUInt8(token, 0x90 + length, 0)
    values[length] = token
  }

  // array16: 0xdc marker followed by a big-endian two-byte length.
  for (let length = 0x10; length <= 0xffff; length++) {
    const token = Buffer.allocUnsafe(3)
    util.writeUInt8(token, 0xdc, 0)
    util.writeUInt16(token, length, 1)
    values[length] = token
  }

  return values
}

// Precompute msgpack map headers for every entry count 0-0xffff.
function getMapPrefixes () {
  const values = []

  // fixmap: single byte 0x80-0x8f encodes sizes 0-15 directly.
  for (let size = 0; size <= 0xf; size++) {
    const token = Buffer.allocUnsafe(1)
    util.writeUInt8(token, 0x80 + size, 0)
    values[size] = token
  }

  // map16: 0xde marker followed by a big-endian two-byte size.
  for (let size = 16; size <= 0xffff; size++) {
    const token = Buffer.allocUnsafe(3)
    util.writeUInt8(token, 0xde, 0)
    util.writeUInt16(token, size, 1)
    values[size] = token
  }

  return values
}

// Precomputed msgpack marker/prefix buffers, indexed by length or value.
module.exports = {
  str: getStrPrefixes(), // string headers for lengths 0-0xffff
  int: getIntPrefixes(), // positive fixint encodings for 0-127
  array: getArrayPrefixes(), // array headers for lengths 0-0xffff
  map: getMapPrefixes(), // map headers for sizes 0-0xffff
  null: Buffer.alloc(1, 0xc0), // nil marker
  uint8: Buffer.alloc(1, 0xcc), // uint8 marker
  uint32: Buffer.alloc(1, 0xce), // uint32 marker
  uint64: Buffer.alloc(1, 0xcf) // uint64 marker
}
65 changes: 65 additions & 0 deletions packages/dd-trace/src/encode/util.js
@@ -0,0 +1,65 @@
'use strict'

// Store a single byte at `offset`; returns the number of bytes written (1)
// so callers can advance their offset.
function writeUInt8 (buffer, value, offset) {
  buffer[offset] = value
  return 1
}

// Store a 16-bit value big-endian at `offset`; returns the number of bytes
// written (2) so callers can advance their offset.
function writeUInt16 (buffer, value, offset) {
  buffer[offset] = (value >>> 8) & 0xff
  buffer[offset + 1] = value & 0xff

  return 2
}

// Store a 32-bit value big-endian at `offset`; returns the number of bytes
// written (4) so callers can advance their offset.
function writeUInt32 (buffer, value, offset) {
  buffer[offset] = (value >>> 24) & 0xff
  buffer[offset + 1] = (value >>> 16) & 0xff
  buffer[offset + 2] = (value >>> 8) & 0xff
  buffer[offset + 3] = value & 0xff

  return 4
}

// Encode `string` as UTF-8 into `buffer` starting at `offset` (default 0).
// Returns the number of bytes written. Handles 1-4 byte sequences including
// surrogate pairs; lone surrogates produce the same bytes as the original
// hand-rolled encoder (no replacement-character substitution).
function write (buffer, string, offset) {
  offset = offset | 0

  const length = string.length
  let pos = offset

  for (let i = 0; i < length; i++) {
    let code = string.charCodeAt(i)

    if (code < 0x80) {
      // 1 byte: ASCII
      buffer[pos++] = code
    } else if (code < 0x800) {
      // 2 bytes
      buffer[pos++] = 0xC0 | (code >>> 6)
      buffer[pos++] = 0x80 | (code & 0x3F)
    } else if (code < 0xD800 || code > 0xDFFF) {
      // 3 bytes: BMP outside the surrogate range
      buffer[pos++] = 0xE0 | (code >>> 12)
      buffer[pos++] = 0x80 | ((code >>> 6) & 0x3F)
      buffer[pos++] = 0x80 | (code & 0x3F)
    } else {
      // 4 bytes: combine the surrogate pair into one code point
      i++
      code = (((code - 0xD800) << 10) | (string.charCodeAt(i) - 0xDC00)) + 0x10000
      buffer[pos++] = 0xF0 | (code >>> 18)
      buffer[pos++] = 0x80 | ((code >>> 12) & 0x3F)
      buffer[pos++] = 0x80 | ((code >>> 6) & 0x3F)
      buffer[pos++] = 0x80 | (code & 0x3F)
    }
  }

  return pos - offset
}

// Low-level big-endian byte writers plus a hand-rolled UTF-8 string encoder;
// each writer returns the number of bytes written.
module.exports = {
  writeUInt8,
  writeUInt16,
  writeUInt32,
  write
}

0 comments on commit e8f72f0

Please sign in to comment.