Skip to content

Commit

Permalink
Merge pull request #16 from dudleycarr/correct-checksums
Browse files Browse the repository at this point in the history
Fix checksum masks
  • Loading branch information
dudleycarr committed Sep 3, 2017
2 parents 941560a + a34ab62 commit 36ba7c6
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 47 deletions.
106 changes: 69 additions & 37 deletions lib/snappystreams.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,13 @@ class SnappyStream extends stream.Transform {
// Split data if need be into chunks no larger than the maximum size for
// a frame.
const out = Buffer.from(data)
const dataChunks = (() => {
const result = []
for (
let offset = 0,
end1 = out.length / MAX_FRAME_DATA_SIZE,
asc = end1 >= 0;
asc ? offset <= end1 : offset >= end1;
asc ? offset++ : offset--
) {
const start = offset * MAX_FRAME_DATA_SIZE
const end = start + MAX_FRAME_DATA_SIZE
result.push(out.slice(start, end))
}
return result
})()

const dataChunks = []
for (let offset = 0; offset < out.length / MAX_FRAME_DATA_SIZE; offset++) {
const start = offset * MAX_FRAME_DATA_SIZE
const end = start + MAX_FRAME_DATA_SIZE
dataChunks.push(out.slice(start, end))
}

return async.map(
dataChunks,
Expand All @@ -69,14 +61,29 @@ class SnappyStream extends stream.Transform {
}

const frameChunks = []
for (let frameData of Array.from(compressedDataChunks)) {

for (let i = 0; i < dataChunks.length; i++) {
const chunkData = dataChunks[i]
const frameData = compressedDataChunks[i]

const frameStart = Buffer.alloc(8)
frameStart.writeUInt8(CHUNKS.compressedData, 0)
int24.writeUInt24LE(frameStart, 1, frameData.length + 4)
frameStart.writeUInt32LE(checksumMask(frameData), 4, true)

let headerType = CHUNKS.compressedData
let payload = frameData

// If the improvement isn't more than 12.5% then use uncompressed
// data.
if (frameData.length >= (chunkData.length - chunkData.length / 8)) {
headerType = CHUNKS.uncompressedData
payload = chunkData
}

frameStart.writeUInt8(headerType, 0)
int24.writeUInt24LE(frameStart, 1, payload.length + 4)
frameStart.writeUInt32LE(checksumMask(chunkData), 4, true)

frameChunks.push(frameStart)
frameChunks.push(frameData)
frameChunks.push(payload)
}

this.push(Buffer.concat(frameChunks))
Expand All @@ -96,16 +103,13 @@ class UnsnappyStream extends stream.Transform {

// Returns snappy compressed payload. Throws an error if the checksum fails
// provided stream is checking checksums.
framePayload (data) {
const frameLength = int24.readUInt24LE(data, 1)
const mask = data.readUInt32LE(4)
const payload = data.slice(8, frameLength + 4)

if (this.verifyChecksums && checksumMask(payload) !== mask) {
throw new Error('Frame failed checksum')
}
framePayload (frame) {
const frameLength = int24.readUInt24LE(frame, 1)
return frame.slice(8, frameLength + 4)
}

return payload
frameMask (frame) {
return frame.readUInt32LE(4)
}

// Data contains at least one full frame.
Expand All @@ -120,15 +124,34 @@ class UnsnappyStream extends stream.Transform {
return data.slice(4 + frameLength)
}

verify (mask, data, callback) {
if (this.verifyChecksums && checksumMask(data) !== mask) {
return callback(new Error('Frame failed checksum'))
}

callback(null, data)
}

processChunks (chunks, done) {
const uncompressChunk = function (chunk, cb) {
if (chunk[0] === CHUNKS.uncompressedData) {
return cb(null, chunk[1])
}
return snappy.uncompress(chunk[1], cb)
const uncompressVerify = ([frameType, mask, payload], callback) => {
async.waterfall([
// Uncompress daeta if need be
callback => {
if (frameType === CHUNKS.uncompressedData) {
return callback(null, payload)
}

snappy.uncompress(payload, callback)
},
// Verify
(uncompressedPayload, callback) => {
this.verify(mask, uncompressedPayload, callback)
}
],
callback)
}

return async.map(chunks, uncompressChunk, (err, data) => {
return async.map(chunks, uncompressVerify, (err, data) => {
if (err) {
return this.emit('error', err)
}
Expand Down Expand Up @@ -169,10 +192,18 @@ class UnsnappyStream extends stream.Transform {
this.identifierFound = true
break
case CHUNKS.compressedData:
chunks.push([CHUNKS.compressedData, this.framePayload(data)])
chunks.push([
CHUNKS.compressedData,
this.frameMask(data),
this.framePayload(data)
])
break
case CHUNKS.uncompressedData:
chunks.push([CHUNKS.uncompressedData, this.framePayload(data)])
chunks.push([
CHUNKS.uncompressedData,
this.frameMask(data),
this.framePayload(data)
])
break
case CHUNKS.unskippable(frameId):
throw new Error('Encountered unskippable frame')
Expand All @@ -187,6 +218,7 @@ class UnsnappyStream extends stream.Transform {
if (data.length) {
this.frameBuffer = data
}

if (chunks.length) {
return this.processChunks(chunks, done)
} else {
Expand Down
18 changes: 10 additions & 8 deletions test/snappystream_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ const int24 = require('int24')
const snappy = require('snappy')
const { SnappyStream } = require('../lib/snappystreams')

const sentence = 'the quick brown fox jumped over the lazy dog.'
const txt = [sentence, sentence, sentence].join('\n')

// Generate a snappy stream from data. Return the snappy stream as a string.
function compress (data, callback) {
let compressedFrames = Buffer.alloc(0)
Expand All @@ -26,12 +29,12 @@ describe('SnappyStream', () => {
describe('stream identifer', () => {
let compressedFrames = null

before(done =>
compress('test', (err, data) => {
before(done => {
compress(txt, (err, data) => {
compressedFrames = data
return done(err)
})
)
})

it('should have the stream identifier chunk ID', () =>
compressedFrames.readUInt8(0).should.eql(0xff))
Expand All @@ -47,16 +50,15 @@ describe('SnappyStream', () => {
})

describe('single compressed frame', () => {
const data = 'test'
let compressedFrames = null
let compressedData = null

before(done =>
snappy.compress(data, (err, snappyData) => {
snappy.compress(txt, (err, snappyData) => {
if (err) { return done(err) }
compressedData = snappyData

return compress(data, (err, out) => {
return compress(txt, (err, out) => {
compressedFrames = out.slice(10)
return done(err)
})
Expand All @@ -74,13 +76,13 @@ describe('SnappyStream', () => {
})

it('should have a valid checksum mask', () =>
compressedFrames.readUInt32LE(4).should.eql(0x3239074d))
compressedFrames.readUInt32LE(4).should.eql(0xa2825e6b))

return it('should have match decompressed data', done => {
const payload = compressedFrames.slice(8)
return snappy.uncompress(payload, { asBuffer: false },
(err, uncompressedPayload) => {
uncompressedPayload.should.eql(data)
uncompressedPayload.should.eql(txt)
return done(err)
})
})
Expand Down
4 changes: 2 additions & 2 deletions test/unsnappystream_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ describe('UnsnappyStream', () => {

describe('processChunks', () => {
it('should return decompressed data for compressed chunks', done => {
const chunks = [[0x00, compressedData], [0x00, compressedData]]
const chunks = [[0x00, null, compressedData], [0x00, null, compressedData]]
return stream.processChunks(chunks, () => {
stream.read().should.eql(Buffer.from(data + data))
return done()
})
})
return it('should return decompressed data for multiple of chunks types',
done => {
const chunks = [[0x00, compressedData], [0x01, Buffer.from('hello world')]]
const chunks = [[0x00, null, compressedData], [0x01, null, Buffer.from('hello world')]]
return stream.processChunks(chunks, () => {
stream.read().should.eql(Buffer.from(data + 'hello world'))
return done()
Expand Down

0 comments on commit 36ba7c6

Please sign in to comment.