Skip to content

Commit

Permalink
Write a delete using streams.
Browse files Browse the repository at this point in the history
See #355.
See #107.
See #4.
  • Loading branch information
flatheadmill committed May 10, 2014
1 parent 1e3572d commit e8197be
Showing 1 changed file with 83 additions and 10 deletions.
93 changes: 83 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ function Strata (options) {
length = body.length
hash.update(body)
}
ok(fields[1] == '-' || hash.digest('hex') == fields[1], 'corrupt line: invalid checksum')
var digest = hash.digest('hex')
ok(fields[1] == '-' || digest == fields[1], 'corrupt line: invalid checksum')
if (buffer[i - 1] == 0x20) {
body = deserialize(body, isKey)
}
Expand Down Expand Up @@ -220,6 +221,42 @@ function Strata (options) {
return entry
}

var cadence = require('cadence')

function Abender (error) {
this._error = error
}

Abender.prototype.write = Abender.prototype.once = Abender.prototype.end = function () {
throw this._error
}

function Transcript (file, position) {
this._file = file
this._position = position
this._stream = fs.createWriteStream(this._file, {
flags: 'r+',
mode: 0644,
start: this._position
}).once('error', function (error) {
this._stream = new Abender(error)
}.bind(this))
}

Transcript.prototype.write = cadence(function (step, buffer) {
if (!this._stream.write(buffer)) {
step(function () {
this._stream.once('drain', step(-1))
}, function () {
this.write(buffer, step())
})
}
})

Transcript.prototype.close = cadence(function (step) {
this._stream.end(step())
})

function cookEntry (options, author) {
var entry, buffer, json, line, length

Expand Down Expand Up @@ -271,8 +308,26 @@ function Strata (options) {
author(buffer, body, options.page.position, length)
}

function writeEntry (options, callback) {
function writeEntry2 (options, callback) {
cookEntry(options, function (buffer, body, position, length) {
var check = validator(callback), transcript = options.transcript

transcript.write(buffer, check(sent))

function sent () {
options.page.position += length
if (!(options.page.address % 2) || options.type == 'footer') {
callback(null, position, length)
} else {
writeFooter2(options.transcript, options.page, function () {
callback(null, position, length, body && body.length)
})
}
}
})
}

function writeEntry (options, callback) {
cookEntry(options, function (buffer, body, position,length) {
var check = validator(callback),
offset = 0
Expand Down Expand Up @@ -306,9 +361,9 @@ function Strata (options) {
writeEntry({ fd: fd, page: page, header: header, body: record }, callback)
}

function writeDelete (fd, page, index, callback) {
function writeDelete (transcript, page, index, callback) {
var header = [ ++page.entries, -(index + 1) ]
writeEntry({ fd: fd, page: page, header: header }, callback)
writeEntry2({ transcript: transcript, page: page, header: header }, callback)
}

function io (direction, filename, callback) {
Expand Down Expand Up @@ -359,6 +414,26 @@ function Strata (options) {
}))
}

function writeFooter2 (transcript, page, callback) {
ok(page.address % 2 && page.bookmark != null)
var header = [
0, page.bookmark.position, page.bookmark.length, page.bookmark.entry,
page.right || 0, page.position, page.entries, page.ghosts, page.positions.length - page.ghosts
]
cookEntry({
page: page,
header: header,
type: 'footer'
}, function (buffer, body, position, length) {
var check = validator(callback)
transcript.write(buffer, check(sent))
function sent () {
// page.position = header[5]
callback(null, position, length)
}
})
}

function readHeader (entry) {
var header = entry.header
return {
Expand Down Expand Up @@ -1290,15 +1365,13 @@ function Strata (options) {
function remove (index, callback) {
var ghost = page.address != 1 && index == 0,
check = validator(callback),
fd
transcript

balancer.unbalanced(page)

fs.open(filename(page.address), 'r+', 0644, check(opened))
transcript = new Transcript(filename(page.address), page.position)

function opened (opened) {
writeDelete(fd = opened, page, index, check(written, close))
}
writeDelete(transcript, page, index, check(written, close))

function written () {
if (ghost) {
Expand All @@ -1313,7 +1386,7 @@ function Strata (options) {
}

function close (writeError) {
fs.close(fd, check(complete, complete))
transcript.close(check(complete, complete))

function complete (closeError) {
toUserLand(callback, writeError || closeError)
Expand Down

0 comments on commit e8197be

Please sign in to comment.