Skip to content

Commit

Permalink
Pages, drafts, journal in separate directories.
Browse files Browse the repository at this point in the history
Closes #462.
  • Loading branch information
flatheadmill committed Feb 8, 2015
1 parent 9390fda commit 72a1e0b
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 134 deletions.
13 changes: 7 additions & 6 deletions balancer.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ Balancer.prototype.shouldSplitBranch = function (sheaf, branch, key, callback) {

Balancer.prototype.splitLeafAndUnlock = cadence(function (async, sheaf, address, key, ghosts) {
var locker = sheaf.createLocker(),
script = new Script(sheaf),
script = sheaf.logger.createScript(),
descents = [], replacements = [], encached = [],
completed = 0,
penultimate, leaf, split, pages, page,
Expand Down Expand Up @@ -319,7 +319,7 @@ Balancer.prototype.splitLeaf = cadence(function (async, sheaf, address, key, gho

Balancer.prototype.splitBranchAndUnlock = cadence(function (async, sheaf, address, key) {
var locker = sheaf.createLocker(),
script = new Script(sheaf),
script = sheaf.logger.createScript(),
descents = [],
children = [],
encached = [],
Expand Down Expand Up @@ -392,7 +392,7 @@ Balancer.prototype.splitBranch = cadence(function (async, sheaf, address, key) {

Balancer.prototype.drainRootAndUnlock = cadence(function (async, sheaf) {
var locker = sheaf.createLocker(),
script = new Script(sheaf),
script = sheaf.logger.createScript(),
children = [], locks = [],
root, pages, records, remainder

Expand Down Expand Up @@ -471,7 +471,7 @@ Balancer.prototype.exorcise = function (sheaf, pivot, page, corporal) {

Balancer.prototype.deleteGhost = cadence(function (async, sheaf, key) {
var locker = sheaf.createLocker(),
script = new Script(sheaf),
script = sheaf.logger.createScript(),
descents = [],
pivot, leaf, reference
async([function () {
Expand Down Expand Up @@ -528,7 +528,7 @@ Balancer.prototype.mergePagesAndUnlock = cadence(function (
async, sheaf, key, leftKey, stopper, merger, ghostly
) {
var locker = sheaf.createLocker(),
script = new Script(sheaf),
script = sheaf.logger.createScript(),
descents = [],
singles = { left: [], right: [] }, parents = {}, pages = {},
ancestor, pivot, empties, ghosted, designation
Expand Down Expand Up @@ -814,7 +814,8 @@ Balancer.prototype.mergeBranches = function (sheaf, key, heft, address, callback
}

Balancer.prototype.fillRoot = cadence(function (async, sheaf) {
var locker = sheaf.createLocker(), script = new Script(sheaf), descents = [], root, child
var locker = sheaf.createLocker(), script = sheaf.logger.createScript(),
descents = [], root, child

async([function () {
descents.forEach(function (descent) { locker.unlock(descent.page) })
Expand Down
41 changes: 15 additions & 26 deletions cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,14 @@ Cursor.prototype._unlock = cadence(function (async) {
this._locker.unlock(this._page)
this._locker.dispose()
}], function () {
this.queue.finish()
this._write()
this.scribe.close(async())
this._appender.close(async())
})
})

// todo: pass an integer as the first argument to force the arity of the
// return.
Cursor.prototype.unlock = function (callback) {
if (this.scribe) {
if (this._appender) {
this._unlock(callback)
} else {
this._locker.unlock(this._page)
Expand All @@ -81,16 +79,6 @@ Cursor.prototype.unlock = function (callback) {
}
}

Cursor.prototype._write = function () {
if (this.queue.buffers.length) {
this.queue.buffers.forEach(function (buffer) {
this.scribe.write(buffer, 0, buffer.length, this._page.position)
this._page.position += buffer.length
}, this)
this.queue.clear()
}
}

// note: exclusive, index, offset and length are public

Cursor.prototype.__defineGetter__('address', function () {
Expand All @@ -113,46 +101,47 @@ Cursor.prototype.__defineGetter__('length', function () {
return this._page.items.length
})

Cursor.prototype._filename = function (page) {
return path.join(this._sheaf.directory, 'pages', pages.address + '.' + page.rotation)
}

Cursor.prototype.insert = function (record, key, index) {
ok(this.exclusive, 'cursor is not exclusive')
ok(index > 0 || this._page.address == 1)

this._sheaf.unbalanced(this._page)

if (!this.queue) {
this.queue = new Queue
this.scribe = new Scribe(this._sheaf.filename2(this._page), 'a')
this.scribe.open()
if (this._appender == null) {
this._appender = this._sheaf.logger.createAppender(this._page)
}

var length = this._sheaf.logger.writeInsert(this.queue, this._page, index, record)
var length = this._appender.writeInsert(index, record)

this._sheaf.splice(this._page, index, 0, {
key: key,
record: record,
heft: length
})

this.length = this._page.items.length
this._write()
}

Cursor.prototype.remove = function (index) {
var ghost = this._page.address != 1 && index == 0, entry
this._sheaf.unbalanced(this._page)

if (!this.queue) {
this.queue = new Queue
this.scribe = new Scribe(this._sheaf.filename2(this._page), 'a')
this.scribe.open()
if (this._appender == null) {
this._appender = this._sheaf.logger.createAppender(this._page)
}

this._sheaf.logger.writeDelete(this.queue, this._page, index)
this._appender.writeDelete(index)

if (ghost) {
this._page.ghosts++
this.offset || this.offset++
} else {
this._sheaf.splice(this._page, index, 1)
}
this._write()
}

module.exports = Cursor
84 changes: 69 additions & 15 deletions logger.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
var ok = require('assert').ok

var fs = require('fs')
var path = require('path')

var cadence = require('cadence/redux')

var Queue = require('./queue')
var Script = require('./script')
var Scribe = require('./scribe')

function Logger (sheaf) {
function Logger (directory, sheaf) {
this._directory = directory
// todo: remove when page can slice
this._sheaf = sheaf
}

Logger.prototype.filename = function (page, draft) {
return path.join(this._directory, draft ? 'drafts' : 'pages', page.address + '.' + page.rotation)
}

Logger.prototype.writeEntry = function (options) {
var entry, buffer, json, line, length

Expand Down Expand Up @@ -74,8 +83,8 @@ Logger.prototype.writeLeafEntry = function (queue, page, item) {
this.writeInsert(queue, page, page.entries - 1, item.record)
}

Logger.prototype.rewriteLeaf = function (page, suffix, callback) {
this.writePage(page, this._sheaf._filename(page.address, 0, suffix), 'writeLeafEntry', callback)
Logger.prototype.rewriteLeaf = function (page, file, callback) {
this.writePage(page, file, 'writeLeafEntry', callback)
}

Logger.prototype.writeBranchEntry = function (queue, page, item) {
Expand Down Expand Up @@ -106,7 +115,6 @@ Logger.prototype.writePage = function (page, file, writer, callback) {

scribe.open()

page.rotation = 0
page.entries = 0
page.position = 0

Expand Down Expand Up @@ -136,14 +144,10 @@ Logger.prototype.writePage = function (page, file, writer, callback) {
scribe.close(callback)
}

Logger.prototype.rotate = cadence(function (async, page) {
Logger.prototype.rotate = function (page, file, callback) {
page.position = 0
page.rotation++

var from = this._sheaf.filename2(page, '.replace')
var to = this._sheaf.filename2(page)

var scribe = new Scribe(from, 'a')
var scribe = new Scribe(file, 'a')
scribe.open()

var queue = new Queue
Expand All @@ -154,11 +158,61 @@ Logger.prototype.rotate = cadence(function (async, page) {
page.position += buffer.length
})

async(function () {
scribe.close(async())
}, function () {
return [ from, to ]
})
scribe.close(callback)
}

Logger.prototype.mkdir = cadence(function (async) {
async([function () {
fs.mkdir(path.join(this._directory, 'pages'), 0755, async())
fs.mkdir(path.join(this._directory, 'drafts'), 0755, async())
}, function (error) {
throw error
}])
})

Logger.prototype.createScript = function () {
return new Script(this)
}

Logger.prototype.createAppender = function (page) {
var scribe = new Scribe(this.filename(page), 'a')
scribe.open()
return new Appender(this, scribe, page)
}

function Appender (logger, scribe, page) {
this._logger = logger
this._scribe = scribe
this._page = page
this._queue = new Queue
}

Appender.prototype.writeInsert = function (index, record) {
var length = this._logger.writeInsert(this._queue, this._page, index, record)
this._write()
return length
}

Appender.prototype.writeDelete = function (index) {
var length = this._logger.writeDelete(this._queue, this._page, index)
this._write()
return length
}

Appender.prototype._write = function () {
if (this._queue.buffers.length) {
this._queue.buffers.forEach(function (buffer) {
this._scribe.write(buffer, 0, buffer.length, this._page.position)
this._page.position += buffer.length
}, this)
this._queue.clear()
}
}

Appender.prototype.close = function (callback) {
this._queue.finish()
this._write()
this._scribe.close(callback)
}

module.exports = Logger
7 changes: 5 additions & 2 deletions player.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
var ok = require('assert').ok
var fs = require('fs')
var path = require('path')
var cadence = require('cadence/redux')

function Player () {
function Player (directory) {
this.directory = directory
}

// todo: outgoing
Expand Down Expand Up @@ -36,7 +38,8 @@ Player.prototype.io = cadence(function (async, direction, filename) {
Player.prototype.read = cadence(function (async, sheaf, page) {
page.entries = page.ghosts = page.position = 0
var rotation = 0, loop = async([function () {
this.io('read', sheaf._filename(page.address, rotation), async())
var filename = path.join(this.directory, 'pages', page.address + '.' + rotation)
this.io('read', filename, async())
}, function (error) {
if (rotation === 0 || error.code !== 'ENOENT') {
throw error
Expand Down
41 changes: 21 additions & 20 deletions script.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ var path = require('path')
var fs = require('fs')
var Queue = require('./queue')

function Script (sheaf) {
this._sheaf = sheaf
function Script (logger) {
this._logger = logger
this._journal = []
this._operations = []
}
Expand All @@ -16,7 +16,7 @@ Script.prototype.rotate = function (page) {
Script.prototype.unlink = function (page) {
var rotations = []
for (var i = 0; i <= page.rotation; i++) {
rotations.push(this._sheaf._filename(page.address, i))
rotations.push(path.join(this._logger._directory, 'pages', page.address + '.' + i))
}
this._journal.push({
name: '_purge', rotations: rotations
Expand All @@ -35,34 +35,35 @@ Script.prototype.writeBranch = function (page) {
})
}

Script.prototype._rotate = cadence(function (async, operation) {
Script.prototype._rotate = function (operation, callback) {
var page = operation.page, queue = operation.queue, entry
async(function () {
this._sheaf.logger.rotate(page, async())
}, function (from, to) {
this._journal.push({
name: '_replace', from: from, to: to
})
page.rotation++
var from = this._logger.filename(page, true)
var to = this._logger.filename(page)
this._journal.push({
name: '_replace', from: from, to: to
})
})
this._logger.rotate(page, from, callback)
}

Script.prototype._writeBranch = cadence(function (async, operation) {
var page = operation.page
var file = this._sheaf.filename2(page, '.replace')
var file = this._logger.filename(page, true)
this._journal.push({
name: '_replace', from: file, to: this._sheaf.filename2(page)
name: '_replace', from: file, to: this._logger.filename(page)
})
this._sheaf.logger.writeBranch(page, file, async())
this._logger.writeBranch(page, file, async())
})

Script.prototype._rewriteLeaf = cadence(function (async, operation) {
var page = operation.page
this.unlink(page)
var file = this._sheaf._filename(page.address, 0, '.replace')
page.rotation = 0
var file = this._logger.filename(page, true)
this._journal.push({
name: '_replace', from: file, to: this._sheaf._filename(page.address, 0)
name: '_replace', from: file, to: this._logger.filename(page)
})
this._sheaf.logger.rewriteLeaf(page, '.replace', async())
this._logger.rewriteLeaf(page, file, async())
})

Script.prototype.commit = cadence(function (async) {
Expand All @@ -72,8 +73,8 @@ Script.prototype.commit = cadence(function (async) {
})(this._operations)
}, function () {
this._journal.push({ name: '_complete' })
var pending = path.join(this._sheaf.directory, 'journal.pending')
var comitted = path.join(this._sheaf.directory, 'journal')
var pending = path.join(this._logger._directory, 'journal.pending')
var comitted = path.join(this._logger._directory, 'journal')
async(function () {
var script = this._journal.map(function (operation) {
return JSON.stringify(operation)
Expand Down Expand Up @@ -119,7 +120,7 @@ Script.prototype._replace = cadence(function (async, operation) {
})

Script.prototype._complete = cadence(function (async, operation) {
var journal = path.join(this._sheaf.directory, 'journal')
var journal = path.join(this._logger._directory, 'journal')
async([function () {
fs.unlink(journal, async())
}, function (error) {
Expand Down
Loading

0 comments on commit 72a1e0b

Please sign in to comment.