Skip to content

Commit

Permalink
obs-ffmpeg: Add a circlebuf to buffer output in ffmpeg-mux
Browse files Browse the repository at this point in the history
This adds a circular buffer to ffmpeg-mux when writing to a file.
Output from ffmpeg is buffered so that slow disk I/O does not block
ffmpeg writes, as this causes the pipe to become full and OBS stops
sending frames with a misleading "Encoding overloaded!" warning. The
buffer may grow to 256 MB depending on the rate of data coming in and
out, if the buffer is full OBS will start waiting in ffmpeg writes.

A separate I/O thread is responsible for processing the contents of
the buffer and writing them to the output file. It tries to process 1 MB
at a time to minimize small I/O.

Complicating things considerably, some formats in ffmpeg require seeking
on the output, so we can't just treat everything as a stream of bytes.
To handle this, we record offsets of each write and try to buffer as
many contiguous writes as possible. This unfortunately makes the code
quite complicated, but hopefully well commented.
  • Loading branch information
notr1ch authored and jp9000 committed May 30, 2022
1 parent 43867ce commit 898256d
Show file tree
Hide file tree
Showing 2 changed files with 295 additions and 5 deletions.
3 changes: 3 additions & 0 deletions plugins/obs-ffmpeg/ffmpeg-mux/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ target_sources(obs-ffmpeg-mux PRIVATE ffmpeg-mux.c ffmpeg-mux.h)

target_link_libraries(obs-ffmpeg-mux PRIVATE OBS::libobs FFmpeg::avcodec
FFmpeg::avutil FFmpeg::avformat)
if(OS_WINDOWS)
target_link_libraries(obs-ffmpeg-mux PRIVATE OBS::w32-pthreads)
endif()

if(ENABLE_FFMPEG_MUX_DEBUG)
target_compile_definitions(obs-ffmpeg-mux PRIVATE ENABLE_FFMPEG_MUX_DEBUG)
Expand Down
297 changes: 292 additions & 5 deletions plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include <stdlib.h>
#include "ffmpeg-mux.h"

#include <util/threading.h>
#include <util/platform.h>
#include <util/circlebuf.h>
#include <util/dstr.h>
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
Expand All @@ -42,6 +45,8 @@
#define CODEC_FLAG_GLOBAL_H CODEC_FLAG_GLOBAL_HEADER
#endif

#define AVIO_BUFFER_SIZE 65536

/* ------------------------------------------------------------------------- */

static char *global_stream_key = "";
Expand Down Expand Up @@ -117,6 +122,24 @@ struct audio_info {
AVCodecContext *ctx;
};

struct io_header {
uint64_t seek_offset;
uint64_t data_length;
};

struct io_buffer {
bool active;
bool shutdown_requested;
bool output_error;
os_event_t *buffer_space_available_event;
os_event_t *new_data_available_event;
pthread_t io_thread;
pthread_mutex_t data_mutex;
FILE *output_file;
struct circlebuf data;
uint64_t next_pos;
};

struct ffmpeg_mux {
AVFormatContext *output;
AVStream *video_stream;
Expand All @@ -129,7 +152,7 @@ struct ffmpeg_mux {
struct header *audio_header;
int num_audio_streams;
bool initialized;
char error[4096];
struct io_buffer io;
};

static void header_free(struct header *header)
Expand Down Expand Up @@ -167,6 +190,29 @@ static void ffmpeg_mux_free(struct ffmpeg_mux *ffm)
av_write_trailer(ffm->output);
}

// If we're writing to a file with the circlebuf, shut it
// down gracefully
if (ffm->io.active) {
os_atomic_set_bool(&ffm->io.shutdown_requested, true);

// Wakes up the I/O thread and waits for it to finish
pthread_mutex_lock(&ffm->io.data_mutex);
os_event_signal(ffm->io.new_data_available_event);
pthread_mutex_unlock(&ffm->io.data_mutex);
pthread_join(ffm->io.io_thread, NULL);

// Cleanup everything else
av_free(ffm->output->pb->buffer);
avio_context_free(&ffm->output->pb);

os_event_destroy(ffm->io.new_data_available_event);
os_event_destroy(ffm->io.buffer_space_available_event);

pthread_mutex_destroy(&ffm->io.data_mutex);

circlebuf_free(&ffm->io.data);
}

free_avformat(ffm);

header_free(&ffm->video_header);
Expand Down Expand Up @@ -612,6 +658,219 @@ static inline bool ffmpeg_mux_get_extra_data(struct ffmpeg_mux *ffm)
#pragma warning(disable : 4996)
#endif

#define CHUNK_SIZE 1048576

static void *ffmpeg_mux_io_thread(void *data)
{
struct ffmpeg_mux *ffm = data;

// Chunk collects the writes into a larger batch
size_t chunk_used = 0;

unsigned char *chunk = malloc(CHUNK_SIZE);
if (!chunk) {
os_atomic_set_bool(&ffm->io.output_error, true);
fprintf(stderr, "Error allocating memory for output\n");
goto error;
}

bool shutting_down;
bool want_seek = false;
bool force_flush_chunk = false;

// current_seek_position is a virtual position updated as we read from
// the buffer, if it becomes discontinuous due to a seek request from
// ffmpeg, then we flush the chunk. next_seek_position is the actual
// offset we should seek to when we write the chunk.
uint64_t current_seek_position = 0;
uint64_t next_seek_position;

for (;;) {
// Wait for ffmpeg to write data to the buffer
os_event_wait(ffm->io.new_data_available_event);

// Loop to write in chunk_size chunks
for (;;) {
shutting_down = os_atomic_load_bool(
&ffm->io.shutdown_requested);

pthread_mutex_lock(&ffm->io.data_mutex);

// Fetch as many writes as possible from the circlebuf
// and fill up our local chunk. This may involve seeking
// if ffmpeg needs to, so take care of that as well.
for (;;) {
size_t available = ffm->io.data.size;

// Buffer is empty (now) or was already empty (we got
// woken up to exit)
if (!available)
break;

// Get seek offset and data size
struct io_header header;
circlebuf_peek_front(&ffm->io.data, &header,
sizeof(header));

// Do we need to seek?
if (header.seek_offset !=
current_seek_position) {

// If there's already part of a chunk pending,
// flush it at the current offset. Similarly,
// if we already plan to seek, then seek.
if (chunk_used || want_seek) {
force_flush_chunk = true;
break;
}

// Mark that we need to seek and where to
want_seek = true;
next_seek_position = header.seek_offset;

// Update our virtual position
current_seek_position =
header.seek_offset;
}

// Make sure there's enough room for the data, if
// not then force a flush
if (header.data_length + chunk_used >
CHUNK_SIZE) {
force_flush_chunk = true;
break;
}

// Remove header that we already read
circlebuf_pop_front(&ffm->io.data, NULL,
sizeof(header));

// Copy from the buffer to our local chunk
circlebuf_pop_front(&ffm->io.data,
chunk + chunk_used,
header.data_length);

// Update offsets
chunk_used += header.data_length;
current_seek_position += header.data_length;
}

// Signal that there is more room in the buffer
os_event_signal(ffm->io.buffer_space_available_event);

// Try to avoid lots of small writes unless this was the final
// data left in the buffer. The buffer might be entirely empty
// if we were woken up to exit.
if (!force_flush_chunk &&
(!chunk_used ||
(chunk_used < 65536 && !shutting_down))) {
os_event_reset(
ffm->io.new_data_available_event);
pthread_mutex_unlock(&ffm->io.data_mutex);
break;
}

pthread_mutex_unlock(&ffm->io.data_mutex);

// Seek if we need to
if (want_seek) {
os_fseeki64(ffm->io.output_file,
next_seek_position, SEEK_SET);
current_seek_position = next_seek_position;
want_seek = false;
}

// Write the current chunk to the output file
if (fwrite(chunk, chunk_used, 1, ffm->io.output_file) !=
1) {
os_atomic_set_bool(&ffm->io.output_error, true);
fprintf(stderr, "Error writing to '%s', %s\n",
ffm->params.printable_file.array,
strerror(errno));
goto error;
}

chunk_used = 0;
force_flush_chunk = false;
}

// If this was the last chunk, time to exit
if (shutting_down)
break;
}

error:
if (chunk)
free(chunk);

fclose(ffm->io.output_file);
return NULL;
}

static int64_t ffmpeg_mux_seek_av_buffer(void *opaque, int64_t offset,
int whence)
{
struct ffmpeg_mux *ffm = opaque;

// If the output thread failed, signal that back up the stack
if (os_atomic_load_bool(&ffm->io.output_error))
return -1;

// Update where the next write should go
pthread_mutex_lock(&ffm->io.data_mutex);
if (whence == SEEK_SET)
ffm->io.next_pos = offset;
else if (whence == SEEK_CUR)
ffm->io.next_pos += offset;
pthread_mutex_unlock(&ffm->io.data_mutex);

return 0;
}

static int ffmpeg_mux_write_av_buffer(void *opaque, uint8_t *buf, int buf_size)
{
struct ffmpeg_mux *ffm = opaque;

// If the output thread failed, signal that back up the stack
if (os_atomic_load_bool(&ffm->io.output_error))
return -1;

for (;;) {
pthread_mutex_lock(&ffm->io.data_mutex);

// Avoid unbounded growth of the circlebuf, cap to 256 MB
if (ffm->io.data.capacity >= 256 * 1048576 &&
ffm->io.data.capacity - ffm->io.data.size <
buf_size + sizeof(struct io_header)) {
// No space, wait for the I/O thread to make space
os_event_reset(ffm->io.buffer_space_available_event);
pthread_mutex_unlock(&ffm->io.data_mutex);
os_event_wait(ffm->io.buffer_space_available_event);
} else {
break;
}
}

struct io_header header;

header.data_length = buf_size;
header.seek_offset = ffm->io.next_pos;

// Copy the data into the buffer
circlebuf_push_back(&ffm->io.data, &header, sizeof(header));
circlebuf_push_back(&ffm->io.data, buf, buf_size);

// Advance the next write position
ffm->io.next_pos += buf_size;

// Tell the I/O thread that there's new data to be written
os_event_signal(ffm->io.new_data_available_event);

pthread_mutex_unlock(&ffm->io.data_mutex);

return buf_size;
}

static inline int open_output_file(struct ffmpeg_mux *ffm)
{
#if LIBAVFORMAT_VERSION_INT < AV_VERSION_INT(59, 0, 100)
Expand All @@ -622,14 +881,42 @@ static inline int open_output_file(struct ffmpeg_mux *ffm)
int ret;

if ((format->flags & AVFMT_NOFILE) == 0) {
ret = avio_open(&ffm->output->pb, ffm->params.file,
AVIO_FLAG_WRITE);
if (ret < 0) {
// If not outputting to a network, write to a circlebuf
// instead of relying on ffmpeg disk output. This hopefully
// works around too small buffers somewhere causing output
// stalls when recording.

// We're in charge of managing the actual file now
ffm->io.output_file = os_fopen(ffm->params.file, "wb");
if (!ffm->io.output_file) {
fprintf(stderr, "Couldn't open '%s', %s\n",
ffm->params.printable_file.array,
av_err2str(ret));
strerror(errno));
return FFM_ERROR;
}

// Start at 1MB, this can grow up to 256 MB depending
// how fast data is going in and out (limited in
// ffmpeg_mux_write_av_buffer)
circlebuf_reserve(&ffm->io.data, 1048576);

pthread_mutex_init(&ffm->io.data_mutex, NULL);

os_event_init(&ffm->io.buffer_space_available_event,
OS_EVENT_TYPE_AUTO);
os_event_init(&ffm->io.new_data_available_event,
OS_EVENT_TYPE_AUTO);

pthread_create(&ffm->io.io_thread, NULL, ffmpeg_mux_io_thread,
ffm);

unsigned char *avio_ctx_buffer = av_malloc(AVIO_BUFFER_SIZE);

ffm->output->pb = avio_alloc_context(
avio_ctx_buffer, AVIO_BUFFER_SIZE, 1, ffm, NULL,
ffmpeg_mux_write_av_buffer, ffmpeg_mux_seek_av_buffer);

ffm->io.active = true;
}

AVDictionary *dict = NULL;
Expand Down

0 comments on commit 898256d

Please sign in to comment.