Skip to content

Commit

Permalink
Implement pluggable frame serializer.
Browse files Browse the repository at this point in the history
Deciding against using Murmur3 since my benchmarks show that a
non-cryptographic checksum is slower than `crypto`'s SHA1.

insert: 107, gather: 61

Closes #447.
Closes #445.
Closes #474.
Closes #472.
Closes #461.
Closes #404.
  • Loading branch information
flatheadmill committed Feb 12, 2015
1 parent 4f4bc9b commit 84afeea
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 171 deletions.
18 changes: 0 additions & 18 deletions checksum.js

This file was deleted.

10 changes: 5 additions & 5 deletions frame/binary.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ Binary.prototype.serialize = function (serializer, queue, header, body) {
return length
}

// Read the length prefix of a binary frame: a 32-bit big-endian unsigned
// integer at offset `i`. Returns `null` when fewer than four bytes remain
// in the window `[i, I)`, signaling the caller that the prefix has not
// fully arrived yet.
Binary.prototype.length = function (buffer, i, I) {
    var remaining = I - i
    if (remaining < 4) {
        return null
    }
    // Second argument is the legacy Node `noAssert` flag: skip bounds checks.
    return buffer.readUInt32BE(i, true)
}

Binary.prototype.deserialize = function (deserialize, buffer, offset) {
Binary.prototype.deserialize = function (deserialize, buffer, offset, length) {
var start = offset
var remaining = buffer.length - offset
var remaining = length - offset
if (remaining < 4) {
return null
}
Expand Down
30 changes: 16 additions & 14 deletions frame/utf8.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,49 +57,51 @@ UTF8.prototype.serialize = function (serializer, queue, header, body) {
return length
}

// Read the length prefix of a UTF-8 frame: decimal digits terminated by a
// space, scanned within the window `[i, I)`. Returns `null` if no space is
// found before `I`, meaning the prefix is not yet complete.
UTF8.prototype.length = function (buffer, i, I) {
    var start = i
    for (; i < I; i++) {
        if (buffer[i] == 0x20) break
    }
    // If the loop ran off the end, `buffer[i]` is `undefined`, not a space.
    if (buffer[i] != 0x20) {
        return null
    }
    // Radix 10 made explicit: the prefix is always written as decimal digits.
    return parseInt(buffer.toString('utf8', start, i), 10)
}

UTF8.prototype.deserialize = function (deserialize, buffer, offset) {
for (var i = offset, I = buffer.length; i < I; i++) {
UTF8.prototype.deserialize = function (deserialize, buffer, i, I) {
var start = i
for (; i < I; i++) {
if (buffer[i] == 0x20) break
}
if (buffer[i] != 0x20) {
return null
}
var size = parseInt(buffer.toString('utf8', offset, i))
if (buffer.length - offset < size) {
var size = parseInt(buffer.toString('utf8', start, i))
if (I - start < size) {
return null
}
for (var count = 2, i = offset, I = buffer.length; i < I && count; i++) {
for (var count = 2, i = start; i < I && count; i++) {
if (buffer[i] == 0x20) count--
}
if (count) {
throw new Error('invalid record')
throw new Error('corrupt line: could not find end of line header')
}
var checksumStart = i
for (count = 1; i < I && count; i++) {
if (buffer[i] == 0x20 || buffer[i] == 0x0a) count--
}
if (count) {
throw new Error('invalid record')
throw new Error('could not find end of line header record')
}
var fields = buffer.toString('utf8', 0, i - 1).split(' ')
var fields = buffer.toString('utf8', start, i - 1).split(' ')
var checksum = this.checksum
if (checksum) {
var digest = checksum(buffer, checksumStart, offset + size - 1)
var digest = checksum(buffer, checksumStart, start + size - 1)
ok(fields[1] == '-' || digest == fields[1], 'corrupt line: invalid checksum')
}
var body, length
if (buffer[i - 1] == 0x20) {
body = buffer.slice(i, offset + size - 1)
body = buffer.slice(i, start + size - 1)
length = body.length
}
if (buffer[i - 1] == 0x20) {
Expand All @@ -108,7 +110,7 @@ UTF8.prototype.deserialize = function (deserialize, buffer, offset) {
}
var entry = {
heft: length || null,
length: i - offset,
length: i - start,
header: JSON.parse(fields[2]),
body: body || null
}
Expand Down
50 changes: 6 additions & 44 deletions logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,70 +8,32 @@ var cadence = require('cadence/redux')
var Queue = require('./queue')
var Script = require('./script')
var Scribe = require('./scribe')
var checksum = require('./checksum')

// Writes b-tree page modifications to log files. Entries are framed by the
// pluggable `framer` and bodies are encoded by the `key` or `record`
// serializer supplied in `options`.
function Logger (options, sheaf) {
    this._directory = options.directory
    // todo: remove when page can slice
    this._sheaf = sheaf
    this.framer = options.framer
    this.serializers = options.serializers
}

// Build the on-disk path of a page's log file. Draft files live under
// 'drafts', committed pages under 'pages'; the file name joins the page
// address to its rotation count.
Logger.prototype.filename = function (page, draft) {
    var subdirectory = draft ? 'drafts' : 'pages'
    var name = page.address + '.' + page.rotation
    return path.join(this._directory, subdirectory, name)
}

// Frame and enqueue a single log entry. Asserts that the page has been
// positioned and that every header value is a number, then delegates all
// framing and checksumming to the pluggable framer, choosing the key or
// record serializer based on `options.isKey`. Returns the framed length.
Logger.prototype.writeEntry = function (options) {
    ok(options.page.position != null, 'page has not been positioned: ' + options.page.position)
    ok(options.header.every(function (n) { return typeof n == 'number' }), 'header values must be numbers')
    var serializer = options.isKey ? this.serializers.key : this.serializers.record
    return this.framer.serialize(serializer, options.queue, options.header, options.body)
}

// Append an insert entry: the header is [ entry number, one-based index ]
// and the body is the record itself. Returns the framed length in bytes.
Logger.prototype.writeInsert = function (queue, page, index, record) {
    var header = [ ++page.entries, index + 1 ]
    return this.writeEntry({ queue: queue, page: page, header: header, body: record })
}

// Append a delete entry: the header encodes the deletion as a negative
// one-based index and there is no body. Returns the framed length for
// consistency with `writeInsert`.
// NOTE(review): `callback` is accepted but never invoked here — confirm
// whether any caller relies on it before removing the parameter.
Logger.prototype.writeDelete = function (queue, page, index, callback) {
    var header = [ ++page.entries, -(index + 1) ]
    return this.writeEntry({ queue: queue, page: page, header: header })
}

Logger.prototype.writeHeader = function (queue, page) {
Expand Down
120 changes: 44 additions & 76 deletions player.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ var ok = require('assert').ok
var fs = require('fs')
var path = require('path')
var cadence = require('cadence/redux')
var checksum = require('./checksum')

// Replays log files to reconstruct in-memory pages. Frames are decoded by
// the pluggable `framer` and entry bodies by the `key` or `record`
// deserializer supplied in `options`.
function Player (options) {
    this.directory = options.directory
    this.framer = options.framer
    this.deserializers = options.deserializers
}

// todo: outgoing
Expand Down Expand Up @@ -53,88 +53,56 @@ Player.prototype.read = cadence(function (async, sheaf, page) {
})()
})

// PRE-COMMIT CODE: deleted by this commit, replaced by the pluggable
// framer's `deserialize` (see frame/utf8.js and frame/binary.js).
// Parses one log line of the form "<length> <checksum> <json-header>[ <body>]\n",
// verifies the checksum, and returns { length, header, body }.
Player.prototype.readEntry = function (sheaf, buffer, isKey) {
// Scan past the two leading space-delimited fields (length and checksum).
for (var count = 2, i = 0, I = buffer.length; i < I && count; i++) {
if (buffer[i] == 0x20) count--
}
// Scan to the end of the JSON header, terminated by a space or newline.
for (count = 1; i < I && count; i++) {
if (buffer[i] == 0x20 || buffer[i] == 0x0a) count--
}
ok(!count, 'corrupt line: could not find end of line header')
// fields[0] = length, fields[1] = checksum, fields[2] = JSON header.
var fields = buffer.toString('utf8', 0, i - 1).split(' ')
// The checksum covers the JSON header and, when present, the body.
var hash = this.checksum(), body, length
hash.update(fields[2])
if (buffer[i - 1] == 0x20) {
// Header ended in a space, so a body follows (minus the trailing newline).
body = buffer.slice(i, buffer.length - 1)
length = body.length
hash.update(body)
}
var digest = hash.digest('hex')
// '-' in the checksum field means the line was written without a checksum.
ok(fields[1] == '-' || digest == fields[1], 'corrupt line: invalid checksum')
if (buffer[i - 1] == 0x20) {
body = sheaf.deserialize(body, isKey)
}
var entry = { length: length, header: JSON.parse(fields[2]), body: body }
ok(entry.header.every(function (n) { return typeof n == 'number' }), 'header values must be numbers')
return entry
}

// Replay a slice of framed log entries into `page`, mutating it in place.
// Returns the byte offset of the first incomplete frame so the caller can
// resume reading from there. Leaf pages (odd addresses) carry records and
// branch pages carry keys, so the deserializer is chosen per page kind.
Player.prototype._play = function (sheaf, slice, start, page) {
    var leaf = page.address % 2 === 1,
        deserialize = leaf ? this.deserializers.record : this.deserializers.key,
        framer = this.framer
    // `entry` in the loop increment relies on `var` hoisting; it is always
    // assigned before the increment first runs.
    for (var i = 0, I = slice.length; i < I; i += entry.length) {
        var entry = framer.deserialize(deserialize, slice, i, I)
        if (entry == null) {
            // Partial frame at the end of the slice; report progress so far.
            return i
        }
        page.position += entry.length
        var header = entry.header
        if (header[1] === 0) {
            // Header record: links this page to its right sibling.
            page.right = {
                address: header[2],
                key: entry.body || null
            }
            if (header[3] === 0 && page.ghosts) {
                // The ghost record was vacuumed; drop it.
                page.splice(0, 1)
                page.ghosts = 0
            }
            page.entries++
        } else {
            ok(header[0] === ++page.entries, 'entry count is off')
            var index = header[1]
            if (leaf) {
                if (index > 0) {
                    // Insert at the one-based position within the leaf.
                    page.splice(index - 1, 0, {
                        key: sheaf.extractor(entry.body),
                        record: entry.body,
                        heft: entry.heft
                    })
                } else if (~index === 0 && page.address !== 1) {
                    // index === -1: the first record is marked as a ghost
                    // rather than removed.
                    ok(!page.ghosts, 'double ghosts')
                    page.ghosts++
                } else if (index < 0) {
                    // Delete at the negative one-based position.
                    page.splice(-(index + 1), 1)
                }
            } else {
                var address = header[2], key = null, heft = 0
                if (index - 1) {
                    // Every branch entry except the first carries a key.
                    key = entry.body
                    heft = entry.heft
                }
                page.splice(index - 1, 0, {
                    key: key, address: address, heft: heft
                })
            }
        }
    }
    return i
}

Player.prototype.play = cadence(function (async, sheaf, fd, stat, read, page) {
Expand Down
15 changes: 15 additions & 0 deletions strata.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ function extend(to, from) {
}

function Strata (options) {
if (!options.serializers) {
var json = require('./json')
options.serializers = {
key: json.serializer,
record: json.serializer
}
options.deserializers = {
key: json.deserialize,
record: json.deserialize
}
}
if (!options.framer) {
var UTF8 = require('./frame/utf8')
options.framer = new UTF8(options.checksum || 'sha1')
}
options.player = new Player(options)
options.logger = new Logger(options, this.sheaf)
options.logger._sheaf = this.sheaf = new Sheaf(options)
Expand Down
4 changes: 2 additions & 2 deletions t/basics/get.t.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ function prove (async, assert) {
var item = cursor.get(cursor.offset)
assert(item.record, 'a', 'get record')
assert(item.key, 'a', 'get key')
assert(strata.size, 54, 'json size after read')
assert(item.heft, 54, 'record size')
assert(strata.size, 3, 'json size after read')
assert(item.heft, 3, 'record size')

cursor.unlock(async())
}, function () {
Expand Down
6 changes: 5 additions & 1 deletion t/coverage/checksum-constructor.t.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ function prove (async, assert) {
directory: tmp,
leafSize: 3,
branchSize: 3,
checksum: function () { return crypto.createHash('sha1') }
checksum: function (buffer, start, end) {
var hash = crypto.createHash('sha1')
hash.update(buffer.slice(start, end))
return hash.digest('hex')
}
})
strata.open(async())
}, function () {
Expand Down
Loading

0 comments on commit 84afeea

Please sign in to comment.