Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
support reading from streams!
queryInfo not yet supported
  • Loading branch information
benvanik committed Nov 26, 2011
1 parent 0cc3c16 commit 065c601
Show file tree
Hide file tree
Showing 16 changed files with 310 additions and 36 deletions.
5 changes: 5 additions & 0 deletions src/io/filereader.cpp
Expand Up @@ -22,3 +22,8 @@ int FileReader::Open() {
this->context = s;
return 0;
}

void FileReader::Close() {
avio_close(this->context);
this->context = NULL;
}
1 change: 1 addition & 0 deletions src/io/filereader.h
Expand Up @@ -17,6 +17,7 @@ class FileReader : public IOReader {
virtual ~FileReader();

virtual int Open();
virtual void Close();

public:
std::string path;
Expand Down
7 changes: 6 additions & 1 deletion src/io/filewriter.cpp
Expand Up @@ -7,7 +7,7 @@ FileWriter::FileWriter(Handle<Object> source) :
IOWriter(source) {
HandleScope scope;

this->path = *String::AsciiValue(source);
this->path = *String::Utf8Value(source);
}

FileWriter::~FileWriter() {
Expand All @@ -22,3 +22,8 @@ int FileWriter::Open() {
this->context = s;
return 0;
}

void FileWriter::Close() {
avio_close(this->context);
this->context = NULL;
}
1 change: 1 addition & 0 deletions src/io/filewriter.h
Expand Up @@ -17,6 +17,7 @@ class FileWriter : public IOWriter {
virtual ~FileWriter();

virtual int Open();
virtual void Close();

public:
std::string path;
Expand Down
3 changes: 3 additions & 0 deletions src/io/io.cpp
Expand Up @@ -30,6 +30,9 @@ AVFormatContext* transcoding::io::createInputContext(
goto CLEANUP;
}

// Prevent avio_close (which would hose us)
ctx->flags |= AVFMT_FLAG_CUSTOM_IO;

ret = av_find_stream_info(ctx);
if (ret < 0) {
goto CLEANUP;
Expand Down
5 changes: 0 additions & 5 deletions src/io/iohandle.cpp
Expand Up @@ -17,11 +17,6 @@ IOHandle::~IOHandle() {
this->source.Dispose();
}

void IOHandle::Close() {
avio_close(this->context);
this->context = NULL;
}

IOReader::IOReader(Handle<Object> source) :
IOHandle(source) {
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/iohandle.h
Expand Up @@ -16,7 +16,7 @@ class IOHandle {
virtual ~IOHandle();

virtual int Open() = 0;
virtual void Close();
virtual void Close() = 0;

public:
Persistent<Object> source;
Expand Down
209 changes: 199 additions & 10 deletions src/io/streamreader.cpp
Expand Up @@ -5,52 +5,241 @@ using namespace node;
using namespace transcoding;
using namespace transcoding::io;

#define STREAM_HANDLE_BUFFER_SIZE (64 * 1024)
// The stream reader works by listening for data events on the v8 thread
// and queuing them up. The actual ReadPacket calls come across on a worker
// thread and pop them off. If there are no buffers available, the worker thread
// will block until there are. If there are too many buffers queued, the stream
// is paused until buffers are drained.

StreamReader::StreamReader(Handle<Object> source) :
IOReader(source) {
StreamReader::StreamReader(Handle<Object> source, size_t maxBufferedBytes) :
IOReader(source),
paused(false), err(0), eof(false),
maxBufferedBytes(maxBufferedBytes), totalBufferredBytes(0) {
HandleScope scope;

pthread_mutex_init(&this->lock, NULL);
pthread_cond_init(&this->cond, NULL);

// TODO: support seeking?
this->canSeek = false;

// Add events to stream
// TODO: keep self alive somehow?
NODE_ON_EVENT(source, "data", OnData, this);
NODE_ON_EVENT(source, "end", OnEnd, this);
NODE_ON_EVENT(source, "close", OnClose, this);
NODE_ON_EVENT(source, "error", OnError, this);

// Kick off the stream (some sources need this)
Local<Function> resume =
Local<Function>::Cast(source->Get(String::New("resume")));
if (!resume.IsEmpty()) {
resume->Call(source, 0, NULL);
}
}

StreamReader::~StreamReader() {
pthread_cond_destroy(&this->cond);
pthread_mutex_destroy(&this->lock);
}

int StreamReader::Open() {
int bufferSize = STREAM_HANDLE_BUFFER_SIZE;
int bufferSize = STREAMREADER_BUFFER_SIZE;
uint8_t* buffer = (uint8_t*)av_malloc(bufferSize);
AVIOContext* s = avio_alloc_context(
buffer, bufferSize,
0, // 1 = write
this,
ReadPacket, NULL, this->canSeek ? Seek : NULL);
s->seekable = 0; // AVIO_SEEKABLE_NORMAL
this->context = s;
return 0;
}

void StreamReader::Close() {
HandleScope scope;
Local<Object> global = Context::GetCurrent()->Global();
Handle<Object> source = this->source;

Local<Function> end =
Local<Function>::Cast(this->source->Get(String::New("end")));
end->Call(this->source, 0, NULL);
// Unbind all events
// NOTE: this will remove any user ones too, which could be bad...
Local<Function> removeAllListeners =
Local<Function>::Cast(source->Get(String::New("removeAllListeners")));
removeAllListeners->Call(source,
1, (Handle<Value>[]){ String::New("data") });
removeAllListeners->Call(source,
1, (Handle<Value>[]){ String::New("end") });
removeAllListeners->Call(source,
1, (Handle<Value>[]){ String::New("close") });
removeAllListeners->Call(source,
1, (Handle<Value>[]){ String::New("error") });

bool readable = source->Get(String::New("readable"))->IsTrue();
if (readable) {
Local<Function> destroy =
Local<Function>::Cast(source->Get(String::New("destroy")));
if (!destroy.IsEmpty()) {
destroy->Call(source, 0, NULL);
}
}

av_free(this->context->buffer);
av_free(this->context);
this->context = NULL;
}

Handle<Value> StreamReader::OnData(const Arguments& args) {
HandleScope scope;
StreamReader* stream =
static_cast<StreamReader*>(External::Unwrap(args.Data()));

pthread_mutex_lock(&stream->lock);

Local<Object> buffer = Local<Object>::Cast(args[0]);
ReadBuffer* readBuffer = new ReadBuffer(
(uint8_t*)Buffer::Data(buffer), Buffer::Length(buffer));
stream->buffers.push_back(readBuffer);
stream->totalBufferredBytes += readBuffer->length;

// Check for max buffer condition
bool needsPause = false;
if (stream->totalBufferredBytes > stream->maxBufferedBytes) {
//needsPause = true;
stream->paused = true;
}

//pthread_cond_signal(&stream->cond);
pthread_mutex_unlock(&stream->lock);

if (needsPause) {
Local<Function> pause =
Local<Function>::Cast(stream->source->Get(String::New("pause")));
if (!pause.IsEmpty()) {
pause->Call(stream->source, 0, NULL);
}
}

return scope.Close(Undefined());
}

Handle<Value> StreamReader::OnEnd(const Arguments& args) {
HandleScope scope;
StreamReader* stream =
static_cast<StreamReader*>(External::Unwrap(args.Data()));

pthread_mutex_lock(&stream->lock);
stream->eof = true;
pthread_cond_signal(&stream->cond);
pthread_mutex_unlock(&stream->lock);

return scope.Close(Undefined());
}

Handle<Value> StreamReader::OnClose(const Arguments& args) {
HandleScope scope;
StreamReader* stream =
static_cast<StreamReader*>(External::Unwrap(args.Data()));

pthread_mutex_lock(&stream->lock);
stream->eof = true;
pthread_cond_signal(&stream->cond);
pthread_mutex_unlock(&stream->lock);

return scope.Close(Undefined());
}

Handle<Value> StreamReader::OnError(const Arguments& args) {
HandleScope scope;
StreamReader* stream =
static_cast<StreamReader*>(External::Unwrap(args.Data()));

printf("StreamReader::OnError %s\n", *String::Utf8Value(args[0]->ToString()));

pthread_mutex_lock(&stream->lock);
stream->err = AVERROR_IO;
pthread_cond_signal(&stream->cond);
pthread_mutex_unlock(&stream->lock);

return scope.Close(Undefined());
}

int StreamReader::ReadPacket(void* opaque, uint8_t* buffer, int bufferSize) {
StreamReader* stream = static_cast<StreamReader*>(opaque);
// TODO: read
//stream->source->Call()
return 0;

int ret = 0;
bool needsResume = false;

pthread_mutex_lock(&stream->lock);

// Wait until some bytes are available
while (!stream->err && !stream->eof && !stream->totalBufferredBytes) {
pthread_cond_wait(&stream->cond, &stream->lock);
}

if (stream->err) {
// Stream error
ret = stream->err;
} else if (stream->totalBufferredBytes) {
// Read the next buffer
ReadBuffer* nextBuffer = stream->buffers.front();
size_t bytesRead = nextBuffer->Read(buffer, bufferSize);
if (nextBuffer->IsEmpty()) {
stream->totalBufferredBytes -= nextBuffer->length;
stream->buffers.erase(stream->buffers.begin());
delete nextBuffer;
}
ret = (int)bytesRead;
} else if (stream->eof) {
// Stream at EOF
ret = 0; // eof
} else {
// Stream in error (or unknown, so return EOF)
ret = stream->err;
}

if (stream->paused && ret > 0) {
// Stream is paused - restart it
if (stream->totalBufferredBytes < stream->maxBufferedBytes) {
needsResume = true;
}
}

pthread_mutex_unlock(&stream->lock);

if (needsResume) {
// TODO: issue async resume
}

return ret;
}

int64_t StreamReader::Seek(void* opaque, int64_t offset, int whence) {
StreamReader* stream = static_cast<StreamReader*>(opaque);
// TODO: seek
return 0;
}

ReadBuffer::ReadBuffer(uint8_t* source, int64_t length) :
offset(0), data(NULL), length(length) {
this->length = length;
this->data = new uint8_t[this->length];
memcpy(this->data, source, this->length);
}

ReadBuffer::~ReadBuffer() {
delete[] this->data;
this->data = NULL;
}

bool ReadBuffer::IsEmpty() {
return this->offset >= this->length || !this->length;
}

int64_t ReadBuffer::Read(uint8_t* buffer, int64_t bufferSize) {
int64_t toRead = std::min(bufferSize, this->length - this->offset);
if (toRead) {
memcpy(buffer, this->data + this->offset, toRead);
this->offset += toRead;
}
return toRead;
}
36 changes: 35 additions & 1 deletion src/io/streamreader.h
@@ -1,5 +1,7 @@
#include <node.h>
#include <v8.h>
#include <node_buffer.h>
#include <vector>
#include "../utils.h"
#include "iohandle.h"

Expand All @@ -11,20 +13,52 @@ using namespace v8;
namespace transcoding {
namespace io {

#define STREAMREADER_BUFFER_SIZE (64 * 1024)
#define STREAMREADER_MAX_SIZE (64 * 64 * 1024)

class ReadBuffer {
public:
ReadBuffer(uint8_t* source, int64_t length);
~ReadBuffer();

bool IsEmpty();
int64_t Read(uint8_t* buffer, int64_t bufferSize);

public:
int64_t offset;
uint8_t* data;
int64_t length;
};

class StreamReader : public IOReader {
public:
StreamReader(Handle<Object> source);
StreamReader(Handle<Object> source,
size_t maxBufferedBytes = STREAMREADER_MAX_SIZE);
virtual ~StreamReader();

virtual int Open();
virtual void Close();

private:
static Handle<Value> OnData(const Arguments& args);
static Handle<Value> OnEnd(const Arguments& args);
static Handle<Value> OnClose(const Arguments& args);
static Handle<Value> OnError(const Arguments& args);

static int ReadPacket(void* opaque, uint8_t* buffer, int bufferSize);
static int64_t Seek(void* opaque, int64_t offset, int whence);

public:
bool canSeek;

pthread_mutex_t lock;
pthread_cond_t cond;
bool paused;
int err;
bool eof;
int64_t maxBufferedBytes;
int64_t totalBufferredBytes;
std::vector<ReadBuffer*> buffers;
};

}; // io
Expand Down
1 change: 1 addition & 0 deletions src/io/streamwriter.cpp
Expand Up @@ -38,6 +38,7 @@ void StreamWriter::Close() {
Local<Function>::Cast(this->source->Get(String::New("end")));
end->Call(this->source, 0, NULL);

av_free(this->context->buffer);
av_free(this->context);
this->context = NULL;
}
Expand Down
Empty file added src/queryinfo.cpp
Empty file.
Empty file added src/queryinfo.h
Empty file.

0 comments on commit 065c601

Please sign in to comment.