Skip to content

Commit

Permalink
more refactoring for streaming, adding packet fifo for sorting packet…
Browse files Browse the repository at this point in the history
…s by timestamp/stream
  • Loading branch information
benvanik committed Nov 30, 2011
1 parent 7a92463 commit 4fdc642
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 22 deletions.
106 changes: 106 additions & 0 deletions src/packetfifo.cpp
@@ -0,0 +1,106 @@
#include "packetfifo.h"
#include <float.h>

using namespace std;
using namespace transcoding;

StreamPacket::StreamPacket(AVPacket& packet, double timestamp) :
packet(packet), timestamp(timestamp) {
}

StreamPacket::~StreamPacket() {
}

StreamPacketList::StreamPacketList() {
}

StreamPacketList::~StreamPacketList() {
this->DropAllPackets();
}

int StreamPacketList::GetCount() {
return this->packets.size();
}

void StreamPacketList::QueuePacket(AVPacket& packet, double timestamp) {
// NOTE: assuming that packets in a stream come in order
this->packets.push_back(new StreamPacket(packet, timestamp));
}

double StreamPacketList::GetNextTimestamp() {
if (this->packets.size()) {
this->packets[0]->timestamp;
} else {
return -1;
}
}

bool StreamPacketList::DequeuePacket(AVPacket& packet) {
if (!this->packets.size()) {
return false;
}
StreamPacket* streamPacket = this->packets.front();
this->packets.erase(this->packets.begin());
packet = streamPacket->packet;
delete streamPacket;
return true;
}

void StreamPacketList::DropAllPackets() {
AVPacket packet;
while (this->DequeuePacket(packet)) {
av_free_packet(&packet);
}
}

PacketFifo::PacketFifo(int streamCount) :
count(0) {
for (int n = 0; n < streamCount; n++) {
this->streams.push_back(new StreamPacketList());
}
}

PacketFifo::~PacketFifo() {
while(this->streams.size()) {
StreamPacketList* list = this->streams.back();
this->streams.pop_back();
delete list;
}
}

int PacketFifo::GetCount() {
return this->count;
}

void PacketFifo::QueuePacket(int stream, AVPacket& packet, double timestamp) {
this->count++;
this->streams[stream]->QueuePacket(packet, timestamp);
}

bool PacketFifo::DequeuePacket(AVPacket& packet) {
printf("dequeue: %d\n", this->count);
int stream = -1;
double lowestTimestamp = DBL_MAX;
for (int n = 0; n < this->streams.size(); n++) {
if (this->streams[n]->GetCount()) {
double timestamp = this->streams[n]->GetNextTimestamp();
if (timestamp < lowestTimestamp) {
stream = n;
lowestTimestamp = timestamp;
}
}
}
if (stream == -1) {
printf("no stream\n");
return false;
}
this->streams[stream]->DequeuePacket(packet);
this->count--;
return true;
}

void PacketFifo::DropAllPackets() {
for (int n = 0; n < this->streams.size(); n++) {
this->streams[n]->DropAllPackets();
}
}
56 changes: 56 additions & 0 deletions src/packetfifo.h
@@ -0,0 +1,56 @@
#include <node.h>
#include <v8.h>
#include <vector>
#include "utils.h"

#ifndef NODE_TRANSCODING_PACKETFIFO
#define NODE_TRANSCODING_PACKETFIFO

using namespace v8;

namespace transcoding {

class StreamPacket {
public:
StreamPacket(AVPacket& packet, double timestamp);
~StreamPacket();

double timestamp;
AVPacket packet;
};

class StreamPacketList {
public:
StreamPacketList();
~StreamPacketList();

int GetCount();

void QueuePacket(AVPacket& packet, double timestamp);
double GetNextTimestamp();
bool DequeuePacket(AVPacket& packet);
void DropAllPackets();

private:
std::vector<StreamPacket*> packets;
};

class PacketFifo {
public:
PacketFifo(int streamCount);
~PacketFifo();

int GetCount();

void QueuePacket(int stream, AVPacket& packet, double timestamp);
bool DequeuePacket(AVPacket& packet);
void DropAllPackets();

private:
int count;
std::vector<StreamPacketList*> streams;
};

}; // transcoding

#endif // NODE_TRANSCODING_PACKETFIFO
143 changes: 122 additions & 21 deletions src/taskcontext.cpp
Expand Up @@ -3,16 +3,23 @@
using namespace transcoding;
using namespace transcoding::io;

// Number of packets to keep in the FIFO
#define FIFO_MIN_COUNT 64

TaskContext::TaskContext(IOReader* input, Profile* profile,
TaskOptions* options) :
input(input), profile(profile), options(options),
ictx(NULL), bitStreamFilter(NULL) {
ictx(NULL), bitStreamFilter(NULL), fifo(NULL), doneReading(false) {
TC_LOG_D("TaskContext::TaskContext()\n");
}

TaskContext::~TaskContext() {
TC_LOG_D("TaskContext::~TaskContext()\n");

if (this->fifo) {
delete this->fifo;
}

if (this->bitStreamFilter) {
av_bitstream_filter_close(this->bitStreamFilter);
}
Expand Down Expand Up @@ -72,6 +79,11 @@ int TaskContext::PrepareOutput() {
octx->bit_rate = ictx->bit_rate;
}

// Setup input FIFO
if (!ret) {
this->fifo = new PacketFifo(ictx->nb_streams);
}

// Setup streams
if (!ret) {
for (int n = 0; n < ictx->nb_streams; n++) {
Expand Down Expand Up @@ -249,22 +261,44 @@ AVStream* TaskContext::AddOutputStreamCopy(AVFormatContext* octx,
return NULL;
}

bool TaskContext::Pump(int* pret, Progress* progress) {
//TC_LOG_D("TaskContext::Pump()\n");
bool TaskContext::NextPacket(int* pret, Progress* progress, AVPacket& packet) {
TC_LOG_D("TaskContext::NextPacket()\n");

AVFormatContext* ictx = this->ictx;
AVFormatContext* octx = this->octx;

int ret = 0;

AVPacket packet;
int done = av_read_frame(ictx, &packet);
if (done) {
TC_LOG_D("TaskContext::Pump(): done/failed to read frame (%d)\n", ret);
*pret = 0;
// Read a few packets to fill our queue before we process
if (!this->doneReading) {
while (this->fifo->GetCount() <= FIFO_MIN_COUNT) {
AVPacket readPacket;
int done = av_read_frame(ictx, &readPacket);
if (done) {
TC_LOG_D("TaskContext::NextPacket(): done/failed to read frame (%d)\n",
ret);
*pret = 0;
this->doneReading = true;
break;
} else {
double timestamp = readPacket.pts *
(double)ictx->streams[readPacket.stream_index]->time_base.num /
(double)ictx->streams[readPacket.stream_index]->time_base.den;
this->fifo->QueuePacket(readPacket.stream_index, readPacket, timestamp);
}
}
}
if (this->doneReading && !this->fifo->GetCount()) {
return true;
}

// Grab the next packet
if (!this->fifo->DequeuePacket(packet)) {
return true;
}
double timestamp = packet.pts *
(double)ictx->streams[packet.stream_index]->time_base.num /
(double)ictx->streams[packet.stream_index]->time_base.den;

AVStream* stream = ictx->streams[packet.stream_index];

// Ignore if we don't care about this stream
Expand All @@ -274,7 +308,8 @@ bool TaskContext::Pump(int* pret, Progress* progress) {

ret = av_dup_packet(&packet);
if (ret) {
TC_LOG_D("TaskContext::Pump(): failed to duplicate packet (%d)\n", ret);
TC_LOG_D("TaskContext::NextPacket(): failed to duplicate packet (%d)\n",
ret);
av_free_packet(&packet);
*pret = ret;
return true;
Expand All @@ -298,34 +333,67 @@ bool TaskContext::Pump(int* pret, Progress* progress) {
bsfc = bsfc->next;
}
if (ret) {
TC_LOG_D("TaskContext::Pump(): failed to filter packet (%d)\n", ret);
TC_LOG_D("TaskContext::NextPacket(): failed to filter packet (%d)\n", ret);
*pret = ret;
av_free_packet(&packet);
return true;
}

// Update progress (only on success)
if (!ret) {
progress->timestamp = timestamp;
}

if (ret) {
TC_LOG_D("TaskContext::NextPacket() = %d\n", ret);
}
*pret = ret;
return false;
}

bool TaskContext::WritePacket(int* pret, AVPacket& packet) {
TC_LOG_D("TaskContext::WritePacket()\n");

AVFormatContext* ictx = this->ictx;
AVFormatContext* octx = this->octx;

int ret = 0;

ret = av_interleaved_write_frame(octx, &packet);
if (ret < 0) {
TC_LOG_D("TaskContext::Pump(): could not write frame of stream (%d)\n",
ret);
TC_LOG_D("TaskContext::WritePacket(): could not write frame of "
"stream (%d)\n", ret);
} else if (ret > 0) {
TC_LOG_D("TaskContext::Pump(): end of stream requested (%d)\n", ret);
TC_LOG_D("TaskContext::WritePacket(): end of stream requested (%d)\n", ret);
av_free_packet(&packet);
*pret = ret;
return true;
}

av_free_packet(&packet);
if (ret) {
TC_LOG_D("TaskContext::WritePacket() = %d\n", ret);
}
*pret = ret;
return false;
}

// Update progress (only on success)
if (!ret) {
progress->timestamp = packet.pts / (double)stream->time_base.den;
bool TaskContext::Pump(int* pret, Progress* progress) {
TC_LOG_D("TaskContext::Pump()\n");

AVPacket packet;
if (this->NextPacket(pret, progress, packet)) {
TC_LOG_D("TaskContext::Pump() = %d\n", *pret);
return true;
}

if (ret) {
TC_LOG_D("TaskContext::Pump() = %d\n", ret);
if (this->WritePacket(pret, packet)) {
TC_LOG_D("TaskContext::Pump() = %d\n", *pret);
av_free_packet(&packet);
return true;
}
*pret = ret;

av_free_packet(&packet);

return false;
}

Expand Down Expand Up @@ -407,3 +475,36 @@ LiveStreamingTaskContext::~LiveStreamingTaskContext() {

delete this->playlist;
}

int LiveStreamingTaskContext::PrepareOutput() {
TC_LOG_D("LiveStreamingTaskContext::PrepareOutput()\n");

int ret = 0;

if (ret) {
TC_LOG_D("LiveStreamingTaskContext::PrepareOutput() = %d\n", ret);
}
return ret;
}

bool LiveStreamingTaskContext::Pump(int* pret, Progress* progress) {
TC_LOG_D("LiveStreamingTaskContext::Pump()\n");

AVPacket packet;
if (this->NextPacket(pret, progress, packet)) {
TC_LOG_D("LiveStreamingTaskContext::Pump() = %d\n", *pret);
return true;
}

// TODO: switch segment/etc based on packet timeestamp/duration

if (this->WritePacket(pret, packet)) {
TC_LOG_D("LiveStreamingTaskContext::Pump() = %d\n", *pret);
av_free_packet(&packet);
return true;
}

av_free_packet(&packet);

return false;
}

0 comments on commit 4fdc642

Please sign in to comment.