Permalink
Browse files

Added passthrough back into async_parallel, since its abscence appear…

…s to cause a leak
  • Loading branch information...
1 parent 7883502 commit 7d05c25e9ce601d1357c60a12ee4f8c3c5275e33 @srstrong srstrong committed Jan 24, 2014
Showing with 55 additions and 36 deletions.
  1. +55 −36 c_src/async_parallel_filter.c
@@ -119,6 +119,7 @@ typedef struct _thread_struct_t
typedef struct _codec_t
{
AVClass *av_class;
+ int pass_through;
thread_struct *threads;
} codec_t;
@@ -134,21 +135,28 @@ static void process(ID3ASFilterContext *context, AVFrame *frame, AVRational time
}
return;
*/
- codec_t *this = context->priv_data;
- for (int i = 0; i < context->num_downstream_filters; i++) {
-
- frame_entry_t *frame_entry = (frame_entry_t *) malloc(sizeof(frame_entry_t));
- frame_entry->timebase = timebase;
- frame_entry->frame = av_frame_clone(frame);
- frame_entry->exit_thread = 0;
-
- ADD_TO_QUEUE(context, this->threads[i].inbound_frame_queue, frame_entry);
- }
+ codec_t *this = context->priv_data;
- if (sync_mode) {
+ if (this->pass_through) {
+ send_to_graph(context, frame, timebase);
+ }
+ else {
+
for (int i = 0; i < context->num_downstream_filters; i++) {
- pthread_cond_wait(&this->threads[i].complete, &this->threads[i].complete_mutex);
+
+ frame_entry_t *frame_entry = (frame_entry_t *) malloc(sizeof(frame_entry_t));
+ frame_entry->timebase = timebase;
+ frame_entry->frame = av_frame_clone(frame);
+ frame_entry->exit_thread = 0;
+
+ ADD_TO_QUEUE(context, this->threads[i].inbound_frame_queue, frame_entry);
+ }
+
+ if (sync_mode) {
+ for (int i = 0; i < context->num_downstream_filters; i++) {
+ pthread_cond_wait(&this->threads[i].complete, &this->threads[i].complete_mutex);
+ }
}
}
}
@@ -157,16 +165,21 @@ static void flush(ID3ASFilterContext *context)
{
codec_t *this = context->priv_data;
- for (int i = 0; i < context->num_downstream_filters; i++) {
-
- frame_entry_t *frame_entry = (frame_entry_t *) malloc(sizeof(frame_entry_t));
- frame_entry->exit_thread = 1;
-
- ADD_TO_QUEUE(context, this->threads[i].inbound_frame_queue, frame_entry);
+ if (this->pass_through) {
+ flush_graph(context);
}
-
- for (int i = 0; i < context->num_downstream_filters; i++) {
- pthread_join(this->threads[i].thread, NULL);
+ else {
+ for (int i = 0; i < context->num_downstream_filters; i++) {
+
+ frame_entry_t *frame_entry = (frame_entry_t *) malloc(sizeof(frame_entry_t));
+ frame_entry->exit_thread = 1;
+
+ ADD_TO_QUEUE(context, this->threads[i].inbound_frame_queue, frame_entry);
+ }
+
+ for (int i = 0; i < context->num_downstream_filters; i++) {
+ pthread_join(this->threads[i].thread, NULL);
+ }
}
}
@@ -215,22 +228,28 @@ static void init(ID3ASFilterContext *context, AVDictionary *codec_options)
{
codec_t *this = context->priv_data;
- this->threads = (thread_struct *) malloc(sizeof(thread_struct) * context->num_downstream_filters);
+ if (context->num_downstream_filters < 2) {
+ this->pass_through = 1;
+ }
+ else {
+ this->pass_through = 0;
+ this->threads = (thread_struct *) malloc(sizeof(thread_struct) * context->num_downstream_filters);
- for (int i = 0; i < context->num_downstream_filters; i++)
- {
- this->threads[i].thread_id = thread_id++;
- this->threads[i].context = context;
- this->threads[i].downstream_filter = context->downstream_filters[i];
- this->threads[i].codec_t = this;
- pthread_cond_init(&this->threads[i].complete, NULL);
- pthread_mutex_init(&this->threads[i].complete_mutex, NULL);
- pthread_mutex_lock(&this->threads[i].complete_mutex);
-
- INIT_QUEUE(this->threads[i].inbound_frame_queue);
-
- pthread_create(&this->threads[i].thread, NULL, &thread_proc, &this->threads[i]);
- }
+ for (int i = 0; i < context->num_downstream_filters; i++)
+ {
+ this->threads[i].thread_id = thread_id++;
+ this->threads[i].context = context;
+ this->threads[i].downstream_filter = context->downstream_filters[i];
+ this->threads[i].codec_t = this;
+ pthread_cond_init(&this->threads[i].complete, NULL);
+ pthread_mutex_init(&this->threads[i].complete_mutex, NULL);
+ pthread_mutex_lock(&this->threads[i].complete_mutex);
+
+ INIT_QUEUE(this->threads[i].inbound_frame_queue);
+
+ pthread_create(&this->threads[i].thread, NULL, &thread_proc, &this->threads[i]);
+ }
+ }
}
static const AVOption options[] = {

0 comments on commit 7d05c25

Please sign in to comment.