diff --git a/lib/snappystreams.js b/lib/snappystreams.js index 9ac852e..d5058b0 100644 --- a/lib/snappystreams.js +++ b/lib/snappystreams.js @@ -44,21 +44,13 @@ class SnappyStream extends stream.Transform { // Split data if need be into chunks no larger than the maximum size for // a frame. const out = Buffer.from(data) - const dataChunks = (() => { - const result = [] - for ( - let offset = 0, - end1 = out.length / MAX_FRAME_DATA_SIZE, - asc = end1 >= 0; - asc ? offset <= end1 : offset >= end1; - asc ? offset++ : offset-- - ) { - const start = offset * MAX_FRAME_DATA_SIZE - const end = start + MAX_FRAME_DATA_SIZE - result.push(out.slice(start, end)) - } - return result - })() + + const dataChunks = [] + for (let offset = 0; offset < out.length / MAX_FRAME_DATA_SIZE; offset++) { + const start = offset * MAX_FRAME_DATA_SIZE + const end = start + MAX_FRAME_DATA_SIZE + dataChunks.push(out.slice(start, end)) + } return async.map( dataChunks, @@ -69,14 +61,29 @@ class SnappyStream extends stream.Transform { } const frameChunks = [] - for (let frameData of Array.from(compressedDataChunks)) { + + for (let i = 0; i < dataChunks.length; i++) { + const chunkData = dataChunks[i] + const frameData = compressedDataChunks[i] + const frameStart = Buffer.alloc(8) - frameStart.writeUInt8(CHUNKS.compressedData, 0) - int24.writeUInt24LE(frameStart, 1, frameData.length + 4) - frameStart.writeUInt32LE(checksumMask(frameData), 4, true) + + let headerType = CHUNKS.compressedData + let payload = frameData + + // If the improvement isn't more than 12.5% then use uncompressed + // data. 
+ if (frameData.length >= (chunkData.length - chunkData.length / 8)) { + headerType = CHUNKS.uncompressedData + payload = chunkData + } + + frameStart.writeUInt8(headerType, 0) + int24.writeUInt24LE(frameStart, 1, payload.length + 4) + frameStart.writeUInt32LE(checksumMask(chunkData), 4, true) frameChunks.push(frameStart) - frameChunks.push(frameData) + frameChunks.push(payload) } this.push(Buffer.concat(frameChunks)) @@ -96,16 +103,13 @@ class UnsnappyStream extends stream.Transform { // Returns snappy compressed payload. Throws an error if the checksum fails // provided stream is checking checksums. - framePayload (data) { - const frameLength = int24.readUInt24LE(data, 1) - const mask = data.readUInt32LE(4) - const payload = data.slice(8, frameLength + 4) - - if (this.verifyChecksums && checksumMask(payload) !== mask) { - throw new Error('Frame failed checksum') - } + framePayload (frame) { + const frameLength = int24.readUInt24LE(frame, 1) + return frame.slice(8, frameLength + 4) + } - return payload + frameMask (frame) { + return frame.readUInt32LE(4) } // Data contains at least one full frame. 
@@ -120,15 +124,34 @@ class UnsnappyStream extends stream.Transform { return data.slice(4 + frameLength) } + verify (mask, data, callback) { + if (this.verifyChecksums && checksumMask(data) !== mask) { + return callback(new Error('Frame failed checksum')) + } + + callback(null, data) + } + processChunks (chunks, done) { - const uncompressChunk = function (chunk, cb) { - if (chunk[0] === CHUNKS.uncompressedData) { - return cb(null, chunk[1]) - } - return snappy.uncompress(chunk[1], cb) + const uncompressVerify = ([frameType, mask, payload], callback) => { + async.waterfall([ + // Uncompress data if need be + callback => { + if (frameType === CHUNKS.uncompressedData) { + return callback(null, payload) + } + + snappy.uncompress(payload, callback) + }, + // Verify + (uncompressedPayload, callback) => { + this.verify(mask, uncompressedPayload, callback) + } + ], + callback) } - return async.map(chunks, uncompressChunk, (err, data) => { + return async.map(chunks, uncompressVerify, (err, data) => { if (err) { return this.emit('error', err) } @@ -169,10 +192,18 @@ class UnsnappyStream extends stream.Transform { this.identifierFound = true break case CHUNKS.compressedData: - chunks.push([CHUNKS.compressedData, this.framePayload(data)]) + chunks.push([ + CHUNKS.compressedData, + this.frameMask(data), + this.framePayload(data) + ]) break case CHUNKS.uncompressedData: - chunks.push([CHUNKS.uncompressedData, this.framePayload(data)]) + chunks.push([ + CHUNKS.uncompressedData, + this.frameMask(data), + this.framePayload(data) + ]) break case CHUNKS.unskippable(frameId): throw new Error('Encountered unskippable frame') @@ -187,6 +218,7 @@ class UnsnappyStream extends stream.Transform { if (data.length) { this.frameBuffer = data } + if (chunks.length) { return this.processChunks(chunks, done) } else { diff --git a/test/snappystream_test.js b/test/snappystream_test.js index f6280bd..90ff4a7 100644 --- a/test/snappystream_test.js +++ b/test/snappystream_test.js @@ -3,6 +3,9 @@ 
const int24 = require('int24') const snappy = require('snappy') const { SnappyStream } = require('../lib/snappystreams') +const sentence = 'the quick brown fox jumped over the lazy dog.' +const txt = [sentence, sentence, sentence].join('\n') + // Generate a snappy stream from data. Return the snappy stream as a string. function compress (data, callback) { let compressedFrames = Buffer.alloc(0) @@ -26,12 +29,12 @@ describe('SnappyStream', () => { describe('stream identifer', () => { let compressedFrames = null - before(done => - compress('test', (err, data) => { + before(done => { + compress(txt, (err, data) => { compressedFrames = data return done(err) }) - ) + }) it('should have the stream identifier chunk ID', () => compressedFrames.readUInt8(0).should.eql(0xff)) @@ -47,16 +50,15 @@ describe('SnappyStream', () => { }) describe('single compressed frame', () => { - const data = 'test' let compressedFrames = null let compressedData = null before(done => - snappy.compress(data, (err, snappyData) => { + snappy.compress(txt, (err, snappyData) => { if (err) { return done(err) } compressedData = snappyData - return compress(data, (err, out) => { + return compress(txt, (err, out) => { compressedFrames = out.slice(10) return done(err) }) @@ -74,13 +76,13 @@ describe('SnappyStream', () => { }) it('should have a valid checksum mask', () => - compressedFrames.readUInt32LE(4).should.eql(0x3239074d)) + compressedFrames.readUInt32LE(4).should.eql(0xa2825e6b)) return it('should have match decompressed data', done => { const payload = compressedFrames.slice(8) return snappy.uncompress(payload, { asBuffer: false }, (err, uncompressedPayload) => { - uncompressedPayload.should.eql(data) + uncompressedPayload.should.eql(txt) return done(err) }) }) diff --git a/test/unsnappystream_test.js b/test/unsnappystream_test.js index 654dff2..7d047f1 100644 --- a/test/unsnappystream_test.js +++ b/test/unsnappystream_test.js @@ -106,7 +106,7 @@ describe('UnsnappyStream', () => { 
describe('processChunks', () => { it('should return decompressed data for compressed chunks', done => { - const chunks = [[0x00, compressedData], [0x00, compressedData]] + const chunks = [[0x00, null, compressedData], [0x00, null, compressedData]] return stream.processChunks(chunks, () => { stream.read().should.eql(Buffer.from(data + data)) return done() @@ -114,7 +114,7 @@ describe('UnsnappyStream', () => { }) return it('should return decompressed data for multiple of chunks types', done => { - const chunks = [[0x00, compressedData], [0x01, Buffer.from('hello world')]] + const chunks = [[0x00, null, compressedData], [0x01, null, Buffer.from('hello world')]] return stream.processChunks(chunks, () => { stream.read().should.eql(Buffer.from(data + 'hello world')) return done()