Skip to content

Commit

Permalink
Merge 7c8d829 into 11677db
Browse files Browse the repository at this point in the history
  • Loading branch information
albe committed Feb 21, 2021
2 parents 11677db + 7c8d829 commit 5cbaf25
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 10 deletions.
84 changes: 79 additions & 5 deletions src/Partition/ReadablePartition.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ const { assert } = require('../util');
const DEFAULT_READ_BUFFER_SIZE = 64 * 1024;
const DOCUMENT_HEADER_SIZE = 16;
const DOCUMENT_ALIGNMENT = 4;
const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n";

// node-event-store partition V02
const HEADER_MAGIC = "nesprt02";
// node-event-store partition V03
const HEADER_MAGIC = "nesprt03";

class CorruptFileError extends Error {}
class InvalidDataSizeError extends Error {}
Expand Down Expand Up @@ -160,8 +161,8 @@ class ReadablePartition extends events.EventEmitter {
* @returns {number} The size of the data including header, padded to 16 bytes alignment and ended with a line break.
*/
documentWriteSize(dataSize) {
const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 1) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT;
return dataSize + 1 + padSize + DOCUMENT_HEADER_SIZE;
const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 4 + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT;
return dataSize + DOCUMENT_SEPARATOR.length + 4 + padSize + DOCUMENT_HEADER_SIZE;
}

/**
Expand Down Expand Up @@ -221,7 +222,8 @@ class ReadablePartition extends events.EventEmitter {
throw new InvalidDataSizeError(`Invalid document size ${dataSize} at position ${position}, expected ${size}.`);
}

if (position + dataSize + DOCUMENT_HEADER_SIZE > this.size) {
const writeSize = this.documentWriteSize(dataSize);
if (position + writeSize > this.size) {
throw new CorruptFileError(`Invalid document at position ${position}. This may be caused by an unfinished write.`);
}

Expand Down Expand Up @@ -249,6 +251,25 @@ class ReadablePartition extends events.EventEmitter {
return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength });
}

/**
* Prepare the read buffer for reading *before* the specified position. Don't try to reader *after* the returned cursor.
*
* @protected
* @param {number} position The position in the file to prepare the read buffer for reading before.
* @returns {{ buffer: Buffer|null, cursor: number, length: number }} A reader object with properties `buffer`, `cursor` and `length`.
*/
prepareReadBufferBackwards(position) {
if (position < 0) {
return ({ buffer: null, cursor: 0, length: 0 });
}
let bufferCursor = position - this.readBufferPos;
if (this.readBufferPos < 0 || (this.readBufferPos > 0 && bufferCursor < DOCUMENT_SEPARATOR.length + 4)) {
this.fillBuffer(Math.max(position - this.readBuffer.byteLength, 0));
bufferCursor = this.readBufferLength;
}
return ({ buffer: this.readBuffer, cursor: bufferCursor, length: this.readBufferLength });
}

/**
* Read the data from the given position.
*
Expand Down Expand Up @@ -290,6 +311,45 @@ class ReadablePartition extends events.EventEmitter {
return reader.buffer.toString('utf8', dataPosition, dataPosition + dataSize);
}

/**
* Find the start position of the document that precedes the given position.
*
* @protected
* @param {number} position The file position to read backwards from.
* @returns {number|boolean} The start position of the first document before the given position or false if no header could be found.
*/
findDocumentPositionBefore(position) {
assert(this.fd, 'Partition is not opened.');
if (position <= 0) {
return false;
}

assert((position % DOCUMENT_ALIGNMENT) === 0, `Invalid read position. Needs to be a multiple of ${DOCUMENT_ALIGNMENT}.`);

const separatorSize = DOCUMENT_SEPARATOR.length;
// Optimization if we are at an exact document boundary, where we can just read the document size
let reader = this.prepareReadBufferBackwards(position);
const block = reader.buffer.toString('ascii', reader.cursor - separatorSize, reader.cursor);
if (block === DOCUMENT_SEPARATOR) {
const dataSize = reader.buffer.readUInt32BE(reader.cursor - separatorSize - 4);
return position - this.documentWriteSize(dataSize);
}

do {
reader = this.prepareReadBufferBackwards(position - separatorSize);

const bufferSeparatorPosition = reader.buffer.lastIndexOf(DOCUMENT_SEPARATOR, reader.cursor - separatorSize, 'ascii');
if (bufferSeparatorPosition >= 0) {
position = this.readBufferPos + bufferSeparatorPosition + separatorSize;
break;
}
position -= this.readBufferLength;
} while (position > 0);
return position;
/*const header = this.readDocumentHeader(reader.buffer, reader.cursor, position);
return ({ position, ...header });*/
}

/**
* @api
* @returns {Generator<string>} A generator that returns all documents in this partition.
Expand All @@ -303,6 +363,20 @@ class ReadablePartition extends events.EventEmitter {
}
}

/**
* @api
* @returns {Generator<string>} A generator that returns all documents in this partition in reverse order.
*/
*readAllBackwards() {
let position = this.size;
while ((position = this.findDocumentPositionBefore(position)) !== false) {
const data = this.readFrom(position);
if (data === false) {
break;
}
yield data;
}
}
}

module.exports = ReadablePartition;
Expand Down
19 changes: 14 additions & 5 deletions src/Partition/WritablePartition.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ const Clock = require('../Clock');
const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
const DOCUMENT_HEADER_SIZE = 16;
const DOCUMENT_ALIGNMENT = 4;
const DOCUMENT_PAD = ' '.repeat(15) + "\n";
const DOCUMENT_SEPARATOR = "\x00\x00\x1E\n";
const DOCUMENT_PAD = ' '.repeat(16 - 4 - DOCUMENT_SEPARATOR.length)/* + DOCUMENT_SEPARATOR*/;

const NES_EPOCH = new Date('2020-01-01T00:00:00');

/**
* @param {number} dataSize
* @returns {string} The data padded to 16 bytes alignment and ended with a line break.
* @returns {string} The data padded to 16 bytes alignment and ended with \0x1E (record separator) and a line break.
*/
function padData(dataSize) {
const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 1) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT;
return DOCUMENT_PAD.substr(-padSize - 1);
const padSize = (DOCUMENT_ALIGNMENT - ((dataSize + 4 + DOCUMENT_SEPARATOR.length) % DOCUMENT_ALIGNMENT)) % DOCUMENT_ALIGNMENT;
return DOCUMENT_PAD.substr(0, padSize);
}

/**
Expand Down Expand Up @@ -218,6 +219,11 @@ class WritablePartition extends ReadablePartition {
let bytesWritten = 0;
bytesWritten += fs.writeSync(this.fd, dataHeader);
bytesWritten += fs.writeSync(this.fd, data);
bytesWritten += fs.writeSync(this.fd, padData(dataSize));
const dataSizeBuffer = Buffer.alloc(4);
dataSizeBuffer.writeUInt32BE(dataSize, 0);
bytesWritten += fs.writeSync(this.fd, dataSizeBuffer);
bytesWritten += fs.writeSync(this.fd, DOCUMENT_SEPARATOR);
if (typeof callback === 'function') {
process.nextTick(callback);
}
Expand All @@ -240,6 +246,10 @@ class WritablePartition extends ReadablePartition {
let bytesWritten = 0;
bytesWritten += this.writeDocumentHeader(this.writeBuffer, this.writeBufferCursor, dataSize, sequenceNumber);
bytesWritten += this.writeBuffer.write(data, this.writeBufferCursor + bytesWritten, 'utf8');
bytesWritten += this.writeBuffer.write(padData(dataSize), this.writeBufferCursor + bytesWritten, 'utf8');
this.writeBuffer.writeUInt32BE(dataSize, this.writeBufferCursor + bytesWritten);
bytesWritten += 4;
bytesWritten += this.writeBuffer.write(DOCUMENT_SEPARATOR, this.writeBufferCursor + bytesWritten, 'utf8');
this.writeBufferCursor += bytesWritten;
this.writeBufferDocuments++;
if (typeof callback === 'function') {
Expand Down Expand Up @@ -267,7 +277,6 @@ class WritablePartition extends ReadablePartition {
const dataSize = Buffer.byteLength(data, 'utf8');
assert(dataSize <= 64 * 1024 * 1024, 'Document is too large! Maximum is 64 MB');

data += padData(dataSize);
const dataPosition = this.size;
if (dataSize + DOCUMENT_HEADER_SIZE >= this.writeBuffer.byteLength * 4 / 5) {
this.size += this.writeUnbuffered(data, dataSize, sequenceNumber, callback);
Expand Down
30 changes: 30 additions & 0 deletions test/Partition.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,36 @@ describe('Partition', function() {

});

describe('readAll', function() {

it('reads all documents in write order', function() {
partition.open();
fillPartition(50, i => 'foo-' + i.toString());
partition.close();
partition.open();
let i = 1;
for (let data of partition.readAll()) {
expect(data).to.be('foo-' + i.toString());
i++;
}
expect(i).to.be(51);
});

it('reads all documents in backwards write order', function() {
partition.open();
fillPartition(50, i => 'foo-' + i.toString());
partition.close();
partition.open();
let i = 50;
for (let data of partition.readAllBackwards()) {
expect(data).to.be('foo-' + i.toString());
i--;
}
expect(i).to.be(0);
});

});

describe('readFrom', function() {

it('returns false when partition is not open', function() {
Expand Down

0 comments on commit 5cbaf25

Please sign in to comment.