Skip to content

Commit

Permalink
refactor(ffmpeg_producer): decoder threads
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed May 24, 2021
1 parent 5dcd50c commit b657428
Showing 1 changed file with 170 additions and 81 deletions.
251 changes: 170 additions & 81 deletions src/modules/ffmpeg/producer/av_producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,29 @@ struct Frame
// TODO (fix) Handle ts discontinuities.
// TODO (feat) Forward options.

struct Decoder
class Decoder
{
AVStream* st = nullptr;
std::shared_ptr<AVCodecContext> ctx;
int64_t next_pts = AV_NOPTS_VALUE;
Decoder(const Decoder&) = delete;
Decoder& operator=(const Decoder&) = delete;

AVStream* st = nullptr;
int64_t next_pts = AV_NOPTS_VALUE;
std::atomic<bool> eof = { false };

std::queue<std::shared_ptr<AVPacket>> input;
std::shared_ptr<AVFrame> frame;
bool eof = false;
mutable boost::mutex input_mutex;
boost::condition_variable input_cond;
int input_capacity = 2;

std::queue<std::shared_ptr<AVFrame>> output;
mutable boost::mutex output_mutex;
boost::condition_variable output_cond;
int output_capacity = 8;

boost::thread thread;

public:
std::shared_ptr<AVCodecContext> ctx;

Decoder() = default;

Expand Down Expand Up @@ -124,63 +139,135 @@ struct Decoder
}

FF(avcodec_open2(ctx.get(), codec, nullptr));

// Decoder worker thread.
//
// Repeatedly drains the codec with avcodec_receive_frame():
//   * EAGAIN      -> the codec needs more data: block until a packet is queued
//                    in `input`, then feed it with avcodec_send_packet().
//   * AVERROR_EOF -> flush the codec, mark `eof` and publish one final frame that
//                    carries only `next_pts` (the decoder wrote no picture/sample
//                    data into it).
//   * otherwise   -> a decoded frame: fix up pts/duration and publish it.
//
// Both publish paths wait until `output` holds fewer than `output_capacity`
// frames, so a fast decoder cannot queue an unbounded number of frames ahead
// of the consumer. The condition-variable waits are Boost interruption points,
// so `thread.interrupt()` unblocks them by throwing boost::thread_interrupted,
// which ends the loop below.
thread = boost::thread([=]()
{
    try {
        // Ask the *current* thread for its interruption state instead of reading the
        // `thread` member, which the constructor may still be assigning while this
        // thread is already running.
        while (!boost::this_thread::interruption_requested()) {
            auto av_frame = alloc_frame();
            auto ret = avcodec_receive_frame(ctx.get(), av_frame.get());

            if (ret == AVERROR(EAGAIN)) {
                std::shared_ptr<AVPacket> packet;
                {
                    boost::unique_lock<boost::mutex> lock(input_mutex);
                    input_cond.wait(lock, [&]() { return !input.empty(); });
                    packet = std::move(input.front());
                    input.pop();
                }
                // Send outside the lock so producers are not blocked while the codec works.
                FF(avcodec_send_packet(ctx.get(), packet.get()));
            } else if (ret == AVERROR_EOF) {
                avcodec_flush_buffers(ctx.get());
                av_frame->pts = next_pts;
                next_pts = AV_NOPTS_VALUE;
                eof = true;

                {
                    boost::unique_lock<boost::mutex> lock(output_mutex);
                    output_cond.wait(lock, [&]() { return output.size() < output_capacity; });
                    output.push(std::move(av_frame));
                }
            } else {
                FF_RET(ret, "avcodec_receive_frame");

                // NOTE This is a workaround for DVCPRO HD.
                if (av_frame->width > 1024 && av_frame->interlaced_frame) {
                    av_frame->top_field_first = 1;
                }

                // TODO (fix) is this always best?
                av_frame->pts = av_frame->best_effort_timestamp;

                // Fall back to a duration derived from the codec parameters when the
                // packet did not carry one.
                auto duration_pts = av_frame->pkt_duration;
                if (duration_pts <= 0) {
                    if (ctx->codec_type == AVMEDIA_TYPE_VIDEO) {
                        const auto ticks =
                            av_stream_get_parser(st) ? av_stream_get_parser(st)->repeat_pict + 1 : ctx->ticks_per_frame;
                        duration_pts = static_cast<int64_t>(AV_TIME_BASE) * ctx->framerate.den * ticks /
                                       ctx->framerate.num / ctx->ticks_per_frame;
                        duration_pts = av_rescale_q(duration_pts, { 1, AV_TIME_BASE }, st->time_base);
                    } else if (ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
                        duration_pts = av_rescale_q(av_frame->nb_samples, { 1, ctx->sample_rate }, st->time_base);
                    }
                }

                // Remember where the next frame is expected to start; this value is
                // stamped onto the final frame published at end of stream.
                if (duration_pts > 0) {
                    next_pts = av_frame->pts + duration_pts;
                } else {
                    next_pts = AV_NOPTS_VALUE;
                }

                {
                    boost::unique_lock<boost::mutex> lock(output_mutex);
                    // Apply the same back-pressure as the EOF path: block until the
                    // consumer has popped a frame and signalled `output_cond`.
                    output_cond.wait(lock, [&]() { return output.size() < output_capacity; });
                    output.push(std::move(av_frame));
                }
            }
        }
    } catch (boost::thread_interrupted&) {
        // Do nothing...
    }
});
}

bool operator()()
~Decoder()
{
if (frame || eof || !st) {
try {
if (thread.joinable()) {
thread.interrupt();
thread.join();
}
} catch (boost::thread_interrupted&) {
// Do nothing...
}
}

bool want_packet() const
{
if (eof) {
return false;
}

auto av_frame = alloc_frame();
auto ret = avcodec_receive_frame(ctx.get(), av_frame.get());
{
boost::lock_guard<boost::mutex> lock(input_mutex);
return input.size() < input_capacity;
}
}

if (ret == AVERROR(EAGAIN)) {
if (input.empty()) {
return false;
}
FF(avcodec_send_packet(ctx.get(), input.front().get()));
input.pop();
} else if (ret == AVERROR_EOF) {
avcodec_flush_buffers(ctx.get());
av_frame->pts = next_pts;
eof = true;
next_pts = AV_NOPTS_VALUE;
frame = std::move(av_frame);
} else {
FF_RET(ret, "avcodec_receive_frame");
void push(std::shared_ptr<AVPacket> packet)
{
if (eof) {
return;
}

// NOTE This is a workaround for DVCPRO HD.
if (av_frame->width > 1024 && av_frame->interlaced_frame) {
av_frame->top_field_first = 1;
}
{
boost::lock_guard<boost::mutex> lock(input_mutex);
input.push(std::move(packet));
}

// TODO (fix) is this always best?
av_frame->pts = av_frame->best_effort_timestamp;

auto duration_pts = av_frame->pkt_duration;
if (duration_pts <= 0) {
if (ctx->codec_type == AVMEDIA_TYPE_VIDEO) {
const auto ticks =
av_stream_get_parser(st) ? av_stream_get_parser(st)->repeat_pict + 1 : ctx->ticks_per_frame;
duration_pts = static_cast<int64_t>(AV_TIME_BASE) * ctx->framerate.den * ticks /
ctx->framerate.num / ctx->ticks_per_frame;
duration_pts = av_rescale_q(duration_pts, {1, AV_TIME_BASE}, st->time_base);
} else if (ctx->codec_type == AVMEDIA_TYPE_AUDIO) {
duration_pts = av_rescale_q(av_frame->nb_samples, {1, ctx->sample_rate}, st->time_base);
}
}
input_cond.notify_all();
}

if (duration_pts > 0) {
next_pts = av_frame->pts + duration_pts;
} else {
next_pts = AV_NOPTS_VALUE;
std::shared_ptr<AVFrame> pop()
{
std::shared_ptr<AVFrame> frame;

{
boost::lock_guard<boost::mutex> lock(output_mutex);

if (!output.empty()) {
frame = std::move(output.front());
output.pop();
}
}

frame = std::move(av_frame);
if (frame) {
output_cond.notify_all();
} else if (eof) {
frame = alloc_frame();
}

return true;
return frame;
}
};

Expand Down Expand Up @@ -540,13 +627,13 @@ struct AVProducer::Impl
boost::condition_variable buffer_cond_;
std::atomic<bool> buffer_eof_{false};
int buffer_capacity_ = static_cast<int>(format_desc_.fps) / 4;

std::vector<std::unique_ptr<caspar::executor>> thread_pool_;

boost::optional<caspar::executor> video_executor_;
boost::optional<caspar::executor> audio_executor_;

int latency_ = 0;

boost::thread thread_;
std::atomic<bool> abort_request_{false};
boost::thread thread_;

Impl(std::shared_ptr<core::frame_factory> frame_factory,
core::video_format_desc format_desc,
Expand All @@ -570,6 +657,8 @@ struct AVProducer::Impl
, afilter_(afilter)
, vfilter_(vfilter)
, seekable_(seekable)
, video_executor_(L"video-executor")
, audio_executor_(L"audio-executor")
{
diagnostics::register_graph(graph_);
graph_->set_color("underflow", diagnostics::color(0.6f, 0.3f, 0.9f));
Expand Down Expand Up @@ -604,11 +693,21 @@ struct AVProducer::Impl

~Impl()
{
abort_request_ = true;
input_.abort();
buffer_cond_.notify_all();
thread_.join();
thread_pool_.clear();

try {
if (thread_.joinable()) {
thread_.interrupt();
thread_.join();
}
} catch (boost::thread_interrupted&) {
// Do nothing...
}

video_executor_.reset();
audio_executor_.reset();

CASPAR_LOG(debug) << print() << " Joined";
}

void run()
Expand Down Expand Up @@ -659,7 +758,7 @@ struct AVProducer::Impl

int warning_debounce = 0;

while (!abort_request_) {
while (!thread_.interruption_requested()) {
{
const auto seek = seek_.exchange(AV_NOPTS_VALUE);

Expand Down Expand Up @@ -703,24 +802,13 @@ struct AVProducer::Impl

std::vector<std::future<bool>> futures;

// TODO (refactor): Use some form of proper thread pool. Maybe global?
while (thread_pool_.size() < decoders_.size() + 2) {
thread_pool_.push_back(std::make_unique<caspar::executor>(L"av_producer"));
}

auto n = 0;
for (auto& decoder : decoders_) {
if (!decoder.second.frame) {
futures.push_back(thread_pool_[n++]->begin_invoke([&]() { return decoder.second(); }));
}
}

if (!video_filter_.frame) {
futures.push_back(thread_pool_[n++]->begin_invoke([&]() { return video_filter_(); }));
futures.push_back(video_executor_->begin_invoke([&]() { return video_filter_(); }));
}

if (!audio_filter_.frame) {
futures.push_back(thread_pool_[n++]->begin_invoke([&]() { return audio_filter_(audio_cadence[0]); }));
futures.push_back(audio_executor_->begin_invoke([&]() { return audio_filter_(audio_cadence[0]); }));
}

for (auto& future : futures) {
Expand Down Expand Up @@ -780,7 +868,7 @@ struct AVProducer::Impl

{
boost::unique_lock<boost::mutex> buffer_lock(buffer_mutex_);
buffer_cond_.wait(buffer_lock, [&] { return buffer_.size() < buffer_capacity_ || abort_request_; });
buffer_cond_.wait(buffer_lock, [&] { return buffer_.size() < buffer_capacity_; });
if (seek_ == AV_NOPTS_VALUE) {
buffer_.push_back(frame);
}
Expand Down Expand Up @@ -852,7 +940,7 @@ struct AVProducer::Impl
}

if (latency_ != -1) {
CASPAR_LOG(debug) << print() << " latency: " << latency_;
CASPAR_LOG(warning) << print() << " Latency: " << latency_;
latency_ = -1;
}

Expand Down Expand Up @@ -940,7 +1028,7 @@ struct AVProducer::Impl
bool want_packet()
{
return std::any_of(
decoders_.begin(), decoders_.end(), [](auto& p) { return p.second.input.size() < 2 && !p.second.eof; });
decoders_.begin(), decoders_.end(), [](auto& p) { return p.second.want_packet(); });
}

bool schedule()
Expand All @@ -953,15 +1041,13 @@ struct AVProducer::Impl

if (!packet) {
for (auto& p : decoders_) {
if (!p.second.eof) {
p.second.input.push(nullptr);
}
p.second.push(nullptr);
}
} else if (sources_.find(packet->stream_index) != sources_.end()) {
auto it = decoders_.find(packet->stream_index);
if (it != decoders_.end()) {
// TODO (fix): limit it->second.input.size()?
it->second.input.push(std::move(packet));
it->second.push(std::move(packet));
}
}
}
Expand All @@ -970,7 +1056,7 @@ struct AVProducer::Impl

for (auto& p : sources_) {
auto it = decoders_.find(p.first);
if (it == decoders_.end() || !it->second.frame) {
if (it == decoders_.end()) {
continue;
}

Expand All @@ -983,10 +1069,13 @@ struct AVProducer::Impl
continue;
}

auto frame = std::move(it->second.frame);
auto frame = it->second.pop();
if (!frame) {
continue;
}

for (auto& source : p.second) {
if (frame && !frame->data[0]) {
if (!frame->data[0]) {
FF(av_buffersrc_close(source, frame->pts, 0));
} else {
// TODO (fix) Guard against overflow?
Expand Down

0 comments on commit b657428

Please sign in to comment.