Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Memory leak in frame_info_queue fixed

  • Loading branch information...
commit 7883502ec1983780f450b6e23394b5ae838e2fc2 1 parent c748be0
@srstrong srstrong authored
View
127 c_src/async_parallel_filter.c
@@ -6,10 +6,11 @@ struct _codec_t;
#define INIT_QUEUE(QUEUE)\
(QUEUE).head = NULL;\
- (QUEUE).tail = NULL;\
- INITIALISE_MUTEX(&(QUEUE).mutex);\
- INITIALISE_COND(&(QUEUE).cond);\
- (QUEUE).active = 1;
+ (QUEUE).tail = NULL;\
+ (QUEUE).len = 0;\
+ INITIALISE_MUTEX(&(QUEUE).mutex);\
+ INITIALISE_COND(&(QUEUE).cond);\
+ (QUEUE).active = 1;
#define DESTROY_QUEUE(QUEUE, FUN)\
i_mutex_lock(&(QUEUE).mutex);\
@@ -23,7 +24,7 @@ struct _codec_t;
}\
i_mutex_unlock(&(QUEUE).mutex);
-#define ADD_TO_QUEUE(QUEUE, ITEM)\
+#define ADD_TO_QUEUE(CONTEXT, QUEUE, ITEM) \
i_mutex_lock(&(QUEUE).mutex);\
(ITEM)->queue_entry.next = NULL;\
if ((QUEUE).head == NULL)\
@@ -35,9 +36,12 @@ struct _codec_t;
(QUEUE).tail->next = &(ITEM)->queue_entry;\
}\
(QUEUE).tail = &(ITEM)->queue_entry;\
+ (QUEUE).len++;\
i_mutex_signal(&(QUEUE).cond);\
i_mutex_unlock(&(QUEUE).mutex);
+ // TRACEFMT("%s queue len is %d\n", (CONTEXT)->downstream_filters[0]->filter->name, (QUEUE).len);
+
#define WAIT_FOR_QUEUE(QUEUE)\
i_mutex_lock(&(QUEUE).mutex);\
i_mutex_wait(&(QUEUE).cond, &(QUEUE).mutex);\
@@ -65,6 +69,7 @@ struct _codec_t;
{\
(QUEUE).head = t->next;\
}\
+ (QUEUE).len--;\
ITEM = (void *)t;\
}\
i_mutex_unlock(&(QUEUE).mutex);\
@@ -83,6 +88,7 @@ typedef struct _queue_root_t
int active;
struct _queue_entry_t *head;
struct _queue_entry_t *tail;
+ int len;
} queue_root_t;
@@ -113,34 +119,36 @@ typedef struct _thread_struct_t
typedef struct _codec_t
{
AVClass *av_class;
- int pass_through;
thread_struct *threads;
} codec_t;
static void process(ID3ASFilterContext *context, AVFrame *frame, AVRational timebase)
{
+ /*
+ for (int i = 0; i < context->num_downstream_filters; i++)
+ {
+ AVFrame *foo = av_frame_clone(frame);
+ context->downstream_filters[i]->filter->execute(context->downstream_filters[i], foo, timebase);
+ av_frame_free(&foo);
+ }
+ return;
+ */
codec_t *this = context->priv_data;
- if (this->pass_through) {
- send_to_graph(context, frame, timebase);
+ for (int i = 0; i < context->num_downstream_filters; i++) {
+
+ frame_entry_t *frame_entry = (frame_entry_t *) malloc(sizeof(frame_entry_t));
+ frame_entry->timebase = timebase;
+ frame_entry->frame = av_frame_clone(frame);
+ frame_entry->exit_thread = 0;
+
+ ADD_TO_QUEUE(context, this->threads[i].inbound_frame_queue, frame_entry);
}
- else {
-
+
+ if (sync_mode) {
for (int i = 0; i < context->num_downstream_filters; i++) {
-
- frame_entry_t *frame_entry = (frame_entry_t *) malloc(sizeof(frame_entry_t));
- frame_entry->timebase = timebase;
- frame_entry->frame = av_frame_clone(frame);
- frame_entry->exit_thread = 0;
-
- ADD_TO_QUEUE(this->threads[i].inbound_frame_queue, frame_entry);
- }
-
- if (sync_mode) {
- for (int i = 0; i < context->num_downstream_filters; i++) {
- pthread_cond_wait(&this->threads[i].complete, &this->threads[i].complete_mutex);
- }
+ pthread_cond_wait(&this->threads[i].complete, &this->threads[i].complete_mutex);
}
}
}
@@ -149,21 +157,16 @@ static void flush(ID3ASFilterContext *context)
{
codec_t *this = context->priv_data;
- if (!this->pass_through) {
- for (int i = 0; i < context->num_downstream_filters; i++) {
-
- frame_entry_t *frame_entry = (frame_entry_t *) malloc(sizeof(frame_entry_t));
- frame_entry->exit_thread = 1;
-
- ADD_TO_QUEUE(this->threads[i].inbound_frame_queue, frame_entry);
- }
+ for (int i = 0; i < context->num_downstream_filters; i++) {
- for (int i = 0; i < context->num_downstream_filters; i++) {
- pthread_join(this->threads[i].thread, NULL);
- }
+ frame_entry_t *frame_entry = (frame_entry_t *) malloc(sizeof(frame_entry_t));
+ frame_entry->exit_thread = 1;
+
+ ADD_TO_QUEUE(context, this->threads[i].inbound_frame_queue, frame_entry);
}
- else {
- flush_graph(context);
+
+ for (int i = 0; i < context->num_downstream_filters; i++) {
+ pthread_join(this->threads[i].thread, NULL);
}
}
@@ -193,14 +196,15 @@ static void *thread_proc(void *data)
av_frame_free(&inbound->frame);
free(inbound);
+
+ if (sync_mode) {
+ pthread_mutex_lock(&this->complete_mutex);
+ pthread_cond_signal(&this->complete);
+ pthread_mutex_unlock(&this->complete_mutex);
+ }
}
- } while (inbound != NULL);
- if (sync_mode) {
- pthread_mutex_lock(&this->complete_mutex);
- pthread_cond_signal(&this->complete);
- pthread_mutex_unlock(&this->complete_mutex);
- }
+ } while (inbound != NULL);
} while(1);
@@ -211,29 +215,22 @@ static void init(ID3ASFilterContext *context, AVDictionary *codec_options)
{
codec_t *this = context->priv_data;
- if (context->num_downstream_filters < 2) {
- this->pass_through = 1;
- }
- else {
- this->pass_through = 0;
- this->threads = (thread_struct *) malloc(sizeof(thread_struct) * context->num_downstream_filters);
-
- for (int i = 0; i < context->num_downstream_filters; i++)
- {
- this->threads[i].thread_id = thread_id++;
- this->threads[i].context = context;
- this->threads[i].downstream_filter = context->downstream_filters[i];
- this->threads[i].codec_t = this;
- pthread_cond_init(&this->threads[i].complete, NULL);
- pthread_mutex_init(&this->threads[i].complete_mutex, NULL);
- pthread_mutex_lock(&this->threads[i].complete_mutex);
-
- INIT_QUEUE(this->threads[i].inbound_frame_queue);
-
- pthread_create(&this->threads[i].thread, NULL, &thread_proc, &this->threads[i]);
- }
-
- }
+ this->threads = (thread_struct *) malloc(sizeof(thread_struct) * context->num_downstream_filters);
+
+ for (int i = 0; i < context->num_downstream_filters; i++)
+ {
+ this->threads[i].thread_id = thread_id++;
+ this->threads[i].context = context;
+ this->threads[i].downstream_filter = context->downstream_filters[i];
+ this->threads[i].codec_t = this;
+ pthread_cond_init(&this->threads[i].complete, NULL);
+ pthread_mutex_init(&this->threads[i].complete_mutex, NULL);
+ pthread_mutex_lock(&this->threads[i].complete_mutex);
+
+ INIT_QUEUE(this->threads[i].inbound_frame_queue);
+
+ pthread_create(&this->threads[i].thread, NULL, &thread_proc, &this->threads[i]);
+ }
}
static const AVOption options[] = {
View
24 c_src/effects_processor.c
@@ -34,7 +34,7 @@ static void process(ID3ASFilterContext *context, AVFrame *frame, AVRational time
while (1) {
ret = av_buffersink_get_frame(this->buffersink_ctx, this->output_frame);
- if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
+ if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
break;
if (ret < 0) {
ERROR("Error from get_frame");
@@ -55,18 +55,8 @@ static void flush(ID3ASFilterContext *context)
static void init(ID3ASFilterContext *context, AVDictionary *codec_options)
{
codec_t *this = context->priv_data;
- int ret;
this->initialised = 0;
- this->filter_graph = avfilter_graph_alloc();
- this->output_frame = av_frame_alloc();
-
- ret = avfilter_graph_create_filter(&this->buffersink_ctx, avfilter_get_by_name("buffersink"), "out", NULL, NULL, this->filter_graph);
-
- if (ret < 0) {
- ERROR("Cannot create buffer sink\n");
- exit(-1);
- }
}
static void do_init(codec_t *this, AVFrame *frame, AVRational timebase)
@@ -76,9 +66,19 @@ static void do_init(codec_t *this, AVFrame *frame, AVRational timebase)
}
char args[512];
- int ret;
AVFilterInOut *outputs = avfilter_inout_alloc();
AVFilterInOut *inputs = avfilter_inout_alloc();
+ int ret;
+
+ this->filter_graph = avfilter_graph_alloc();
+ this->output_frame = av_frame_alloc();
+
+ ret = avfilter_graph_create_filter(&this->buffersink_ctx, avfilter_get_by_name("buffersink"), "out", NULL, NULL, this->filter_graph);
+
+ if (ret < 0) {
+ ERROR("Cannot create buffer sink\n");
+ exit(-1);
+ }
snprintf(args, sizeof(args),
"%d:%d:%d:%d:%d:%d:%d",
View
61 c_src/frame_info_queue.c
@@ -61,42 +61,61 @@ void queue_frame_info(frame_info_queue *queue, unsigned char *frame_info_data, u
void add_frame_info_to_frame(frame_info_queue *queue, AVFrame *frame)
{
- frame_info *frame_inf = get_frame_info(queue, frame->pkt_pts);
+ frame_info *frame_inf = get_frame_info(queue, frame->pkt_pts, 1);
AVFrameSideData *side_data = av_frame_new_side_data(frame, FRAME_INFO_SIDE_DATA_TYPE, sizeof(frame_info) + frame_inf->buffer_size);
memcpy(side_data->data, frame_inf, sizeof(frame_info) + frame_inf->buffer_size);
+
+ free(frame_inf);
+}
+
+static frame_info *remove_from_queue(frame_info_queue *queue, frame_info_queue_item *item, frame_info_queue_item *prev)
+{
+ frame_info *frame_info;
+
+ if (queue->inbound_list_head == item) {
+ queue->inbound_list_head = item->next;
+ }
+ else {
+ prev->next = item->next;
+ }
+
+ if (queue->inbound_list_tail == item) {
+ queue->inbound_list_tail = prev;
+ }
+
+ frame_info = item->frame_info;
+ free(item);
+
+ return frame_info;
}
-frame_info *get_frame_info(frame_info_queue *queue, int64_t pts)
+frame_info *get_frame_info(frame_info_queue *queue, int64_t pts, int drop_old_pts)
{
frame_info_queue_item *prev = NULL;
frame_info_queue_item *current = queue->inbound_list_head;
- frame_info *frame_info;
while (current)
{
- if (current->pts == pts)
+ if ((current->pts < pts) && drop_old_pts)
{
- if (queue->inbound_list_head == current) {
- queue->inbound_list_head = current->next;
- }
- else {
- prev->next = current->next;
- }
-
- if (queue->inbound_list_tail == current) {
- queue->inbound_list_tail = prev;
- }
+ frame_info_queue_item *t = current;
+ current = t->next;
- frame_info = current->frame_info;
- free(current);
-
- return frame_info;
+ frame_info *frame_info = remove_from_queue(queue, t, prev);
+
+ free(frame_info);
+ }
+ else if (current->pts == pts)
+ {
+ return remove_from_queue(queue, current, prev);
+ }
+ else
+ {
+ prev = current;
+ current = current->next;
}
-
- prev = current;
- current = current->next;
}
ERRORFMT("Failed to find frame_info for %ld", pts);
View
2  c_src/id3as_libav.h
@@ -106,4 +106,4 @@ void queue_frame_info_from_frame(frame_info_queue *queue, AVFrame *frame);
void queue_frame_info(frame_info_queue *queue, unsigned char *frame_info, unsigned int frame_info_size, int64_t pts);
void add_frame_info_to_frame(frame_info_queue *queue, AVFrame *frame);
void init_frame_info_queue(frame_info_queue **queue);
-frame_info *get_frame_info(frame_info_queue *queue, int64_t pts);
+frame_info *get_frame_info(frame_info_queue *queue, int64_t pts, int drop_old_pts);
View
14 c_src/main.c
@@ -1,3 +1,7 @@
+#define _GNU_SOURCE /* See feature_test_macros(7) */
+#include <pthread.h>
+#include <sched.h>
+
#include "id3as_libav.h"
#include <libavfilter/avfilter.h>
@@ -17,6 +21,14 @@ void initialise(char *mode, void *initialisation_data, int length)
sync_mode = (strncmp(mode, "async", 5) != 0);
input = build_graph((char *) initialisation_data);
+ /*
+ cpu_set_t cpuset;
+ CPU_ZERO(&cpuset);
+ CPU_SET(0, &cpuset);
+
+ pthread_t current_thread = pthread_self();
+ pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset);
+ */
}
void process_frame(void *metadata, int metadata_size, void *frame_info, int frame_info_size)
@@ -132,6 +144,8 @@ static ID3ASFilterContext *read_filter(char *buf, int *index)
ID3ASFilter *filter = find_filter(name);
+ free(name);
+
return allocate_instance(filter, params, codec_params, downstream_filters, num_downstream_filters);
}
View
8 c_src/video_encoded_output.c
@@ -46,7 +46,7 @@ static int encode(ID3ASFilterContext *context, AVFrame *frame, AVPacket *pkt)
if (got_packet_ptr)
{
- frame_info *frame_info = get_frame_info(this->frame_info_queue, pkt->pts);
+ frame_info *frame_info = get_frame_info(this->frame_info_queue, pkt->pts, 0);
// And rescale back to "erlang time"
pkt->pts = av_rescale_q(pkt->pts, this->context->time_base, NINETY_KHZ);
@@ -111,6 +111,8 @@ static void do_init(codec_t *this, AVFrame *frame)
i_mutex_lock(&mutex);
+ this->codec = get_encoder(this->codec_name);
+
AVDictionaryEntry *flagsEntry = av_dict_get(this->codec_options, "flags", NULL, 0);
char flags[255];
strcpy(flags, flagsEntry ? flagsEntry-> value : "");
@@ -140,10 +142,8 @@ static void do_init(codec_t *this, AVFrame *frame)
static void init(ID3ASFilterContext *context, AVDictionary *codec_options)
{
codec_t *this = context->priv_data;
- this->initialised = 0;
-
- this->codec = get_encoder(this->codec_name);
this->codec_options = codec_options;
+ this->initialised = 0;
}
static const AVOption options[] = {
Please sign in to comment.
Something went wrong with that request. Please try again.