Skip to content

Commit

Permalink
Add example of streaming (asynchronous) decompression.
Browse files Browse the repository at this point in the history
  • Loading branch information
cscott committed Sep 26, 2013
1 parent c3ba328 commit 4b9476b
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 2 deletions.
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -98,6 +98,11 @@ value will be a `Uint8Array`.
If you provide the second argument, it must be a "stream", implementing
the `writeByte` method.

## Asynchronous streaming

See the `test/stream.js` for sample code using the `fibers` package
to implement an asynchronous de/compression interface.

## Related projects

* http://code.google.com/p/js-lzma Decompression code by Juan Mellado
Expand Down
46 changes: 46 additions & 0 deletions lib/Stream.js
@@ -0,0 +1,46 @@
if (typeof define !== 'function') { var define = require('amdefine')(module); }
define(['./freeze'],function(freeze){
'use strict';
/* very simple input/output stream interface */
var Stream = function() {
};

// input streams //////////////
/** Returns the next byte, or -1 for EOF. */
Stream.prototype.readByte = function() {
throw new Error("abstract method readByte() not implemented");
};
/** Attempts to fill the buffer; returns number of bytes read, or
* -1 for EOF. */
Stream.prototype.read = function(buffer, bufOffset, length) {
var bytesRead = 0;
while (bytesRead < length) {
var c = this.readByte();
if (c < 0) { // EOF
return (bytesRead===0) ? -1 : bytesRead;
}
buffer[bufOffset++] = c;
bytesRead++;
}
return bytesRead;
};
Stream.prototype.seek = function(new_pos) {
throw new Error("abstract method seek() not implemented");
};

// output streams ///////////
Stream.prototype.writeByte = function(_byte) {
throw new Error("abstract method readByte() not implemented");
};
Stream.prototype.write = function(buffer, bufOffset, length) {
var i;
for (i=0; i<length; i++) {
this.writeByte(buffer[bufOffset++]);
}
return length;
};
Stream.prototype.flush = function() {
};

return freeze(Stream);
});
3 changes: 2 additions & 1 deletion main.js
@@ -1,12 +1,13 @@
if (typeof define !== 'function') { var define = require('amdefine')(module); }
define(['./lib/freeze', './lib/LZ', './lib/LZMA', './lib/RangeCoder', './lib/Util'], function(freeze, LZ, LZMA, RangeCoder, Util) {
define(['./lib/freeze', './lib/LZ', './lib/LZMA', './lib/RangeCoder', './lib/Stream', './lib/Util'], function(freeze, LZ, LZMA, RangeCoder, Stream, Util) {
'use strict';

return freeze({
version: "0.9.0",
LZ: LZ,
LZMA: LZMA,
RangeCoder: RangeCoder,
Stream: Stream,
Util: Util,
// utility methods
compress: Util.compress,
Expand Down
3 changes: 2 additions & 1 deletion package.json
Expand Up @@ -21,7 +21,8 @@
"commander": "~1.1.1"
},
"devDependencies": {
"mocha": "~1.9.0"
"mocha": "~1.9.0",
"fibers": "~1.0.1"
},
"scripts": {
"test": "mocha"
Expand Down
64 changes: 64 additions & 0 deletions test/stream.js
@@ -0,0 +1,64 @@
/* Example of Node 0.10 streaming interface. */

// only run these tests in node v0.10 and above.
if (/^v0\.[0-9]\./.test(process.version)) { return; }

var assert = require("assert");
var lzmajs = require('../');
var fs = require('fs');
var stream = require('stream');
var Fiber = require('fibers');

/** Use node-fibers to convert our synchronous Stream interface to the
* standard node asynchronous interface. */
var LzmaStream = function() {
var trans = this;
stream.Transform.call(trans); // initialize superclass.
this._fiber = new Fiber(function() {
var buffer = [], pos = 0;
var inputStream = new lzmajs.Stream();
inputStream.readByte = function() {
if (pos >= buffer.length) {
buffer = Fiber.yield(); pos = 0;
}
return buffer[pos++];
};
var outputStream = new lzmajs.Stream();
outputStream.writeByte = function(_byte) {
this.write(new Buffer([_byte]),0,1);
};
outputStream.write = function(buffer, bufOffset, length) {
if (bufOffset !== 0 || length !== buffer.length) {
buffer = buffer.slice(bufOffset, bufOffset + length);
}
trans.push(buffer);
};
lzmajs.decompressFile(inputStream, outputStream);
});
this._fiber.run();
};
LzmaStream.prototype = Object.create(stream.Transform.prototype);
LzmaStream.prototype._transform = function(chunk, encoding, callback) {
this._fiber.run(chunk);
callback();
};

describe('lzma streaming decode', function(){
['sample0', 'sample1', 'sample2', 'sample3', 'sample4'].forEach(function(f) {
it('should correctly decode '+f, function(callback) {
this.timeout(0); // no timeout!
var referenceData = fs.readFileSync('test/'+f+'.ref');
var inStream = fs.createReadStream('test/'+f+'.lzma');
var outStream = inStream.pipe(new LzmaStream());
var data = new Buffer(referenceData.length), pos = 0;
outStream.on('readable', function() {
var b = outStream.read(), i;
for (i=0; i<b.length; i++) { data[pos++] = b[i]; }
}).on('end', function() {
assert.equal(pos, data.length);
assert.equal(data.toString('hex'), referenceData.toString('hex'));
callback();
});
});
});
});

0 comments on commit 4b9476b

Please sign in to comment.