Skip to content

Commit

Permalink
decoder: refactor as a Transform stream
Browse files Browse the repository at this point in the history
Depends on the node-ogg streams2 refactor
  • Loading branch information
TooTallNate committed Jan 26, 2013
1 parent 4c31baf commit 8bdf508
Showing 1 changed file with 44 additions and 84 deletions.
128 changes: 44 additions & 84 deletions lib/decoder.js
Expand Up @@ -6,10 +6,10 @@
var debug = require('debug')('vorbis:decoder');
var binding = require('./binding');
var inherits = require('util').inherits;
var Readable = require('stream').Readable;
var Transform = require('stream').Transform;

// node v0.8.x compat
if (!Readable) Readable = require('readable-stream');
if (!Transform) Transform = require('readable-stream/transform');

/**
* Module exports.
Expand All @@ -18,20 +18,22 @@ if (!Readable) Readable = require('readable-stream');
module.exports = Decoder;

/**
* The `Decoder` class.
* The Vorbis `Decoder` class.
* Accepts "ogg_page" Buffer instances and outputs raw PCM data.
*
* @param {Object} opts
* @api public
*/

function Decoder (stream, opts) {
if (!(this instanceof Decoder)) return new Decoder(stream, opts);
debug('creating Decoder instance for ogg_stream(%d)', stream.serialno);
Readable.call(this, opts);
function Decoder (opts) {
if (!(this instanceof Decoder)) return new Decoder(opts);
Transform.call(this, opts);

// the "ogg stream" to decode
this._stream = stream;
// XXX: nasty hack since we can't set only the Readable props through the
// Transform constructor.
this._readableState.objectMode = true;
this._readableState.lowWaterMark = 0;
this._readableState.highWaterMark = 0;

// headerin() needs to be called 3 times
this._headerCount = 3;
Expand All @@ -45,35 +47,35 @@ function Decoder (stream, opts) {
// headers have been parsed
this.vd = null;
this.vb = null;
}
inherits(Decoder, Transform);

// set to "true" when a `vorbis_block` has been decoded, and pcm data should be
// read
this._blockin = false;
// set to "true" when the `ogg_packet` with "e_o_s" marked on it is decoded
this._gotEos = false;
/**
* Alias `packetin()` as `write()`, for backwards-compat.
*/

this.packetin = this.packetin.bind(this);
stream.on('packet', this.packetin);
}
inherits(Decoder, Readable);
Decoder.prototype.packetin = Decoder.prototype.write;

/**
* Called for the stream that's being decoded's "packet" event.
* This function passes the "ogg_packet" struct to the libvorbis backend.
*
* @api private
*/

Decoder.prototype.packetin = function (packet, done) {
debug('packetin()');
Decoder.prototype._transform = function (packet, output, fn) {
debug('_transform()');

var r;
if (this._headerCount > 0) {
debug('headerin', this._headerCount);
// still decoding the header...
var vi = this.vi;
var vc = this.vc;
binding.vorbis_synthesis_headerin(vi, vc, packet, function (r) {
debug('headerin return = %d', r);
if (0 !== r) {
//this._error(new Error('headerin() failed: ' + r));
done(new Error('headerin() failed: ' + r));
fn(new Error('headerin() failed: ' + r));
return;
}
this._headerCount--;
Expand All @@ -90,75 +92,43 @@ Decoder.prototype.packetin = function (packet, done) {
}
this.emit('format', format);
var err = this._synthesis_init();
if (err) return done(err);
if (err) return fn(err);
}
done();
fn();
}.bind(this));
} else if (this._blockin) {
debug('need to wait for _read()');
this.once('pcmout', this.packetin.bind(this, packet, done));
} else {
debug('synthesising ogg_packet (packetno %d)', packet.packetno);
var vd = this.vd;
var vb = this.vb;
// TODO: async...
r = binding.vorbis_synthesis(vb, packet);
if (0 !== r) {
this._error(new Error('vorbis_synthesis() failed: ' + r));
process.nextTick(done);
return;
}
if (packet.e_o_s) {
this._gotEos = true;
return fn(new Error('vorbis_synthesis() failed: ' + r));
}
// TODO: async...
r = binding.vorbis_synthesis_blockin(vd, vb);
if (0 !== r) {
this._error(new Error('vorbis_synthesis_blockin() failed: ' + r));
process.nextTick(done);
return;
return fn(new Error('vorbis_synthesis_blockin() failed: ' + r));
}
this._blockin = true;
this.emit('blockin');
process.nextTick(done);
}
};
//fn();
//process.nextTick(done);

/**
* Readable stream base class _read() callback function.
*
* @api private
*/

Decoder.prototype._read = function (bytes, done) {
debug('_read(%d bytes)', bytes);
if (!this._blockin) {
debug('need to wait for "vorbis_block" to be decoded...');
this.once('blockin', this._read.bind(this, bytes, done));
return;
}
var vd = this.vd;
var channels = this.channels;

var b = binding.vorbis_synthesis_pcmout(vd, channels);
if (0 === b) {
debug('need more "vorbis_block" data...');
this._blockin = false;
if (this._gotEos) {
// we're done, send EOF
done(null, null);
var channels = this.channels;
// TODO: async...
var b = binding.vorbis_synthesis_pcmout(vd, channels);
//console.log(b);
if (0 === b) {
debug('need more "vorbis_block" data...');
fn();
} else if (b < 0) {
// some other error...
fn(new Error('vorbis_synthesis_pcmout() failed: ' + b));
} else {
// need to wait for another vorbis_block to be decoded
this.once('blockin', this._read.bind(this, bytes, done));
debug('got PCM data (%d bytes)', b.length);
output(b);
fn();
// need to wait for another write() call..
}
this.emit('pcmout');
} else if (b < 0) {
// some other error...
done(new Error('vorbis_synthesis_pcmout() failed: ' + b));
} else {
debug('got PCM data (%d bytes)', b.length);
done(null, b);
// need to wait for another _read() call..
}
};

Expand All @@ -183,13 +153,3 @@ Decoder.prototype._synthesis_init = function () {
return new Error(r);
}
};

/**
* Emits an "error" event and tears down the decoder.
*/

Decoder.prototype._error = function (err) {
this._stream.removeListener('packet', this.packetin);
this._stream = null;
this.emit('error', err);
};

0 comments on commit 8bdf508

Please sign in to comment.