Skip to content

Commit

Permalink
winegstreamer: Use a GstAtomicQueue for wg_transform output queue.
Browse files Browse the repository at this point in the history
Some GStreamer video plugins such as vaapi do not expect their buffers
to be appended to each other, and it breaks some of their assertions.

We will also later need to queue caps change events, and the atomic
queue will let us queue GstSample instead to hold the caps too. It also
removes the need of a mutex or locking operations.

Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=45988
Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=47084
Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=49715
Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=52183
Signed-off-by: Rémi Bernon <rbernon@codeweavers.com>
Signed-off-by: Nikolay Sivov <nsivov@codeweavers.com>
Signed-off-by: Zebediah Figura <zfigura@codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard@winehq.org>
  • Loading branch information
rbernon authored and julliard committed Apr 4, 2022
1 parent ad44ede commit b5d6c59
Showing 1 changed file with 33 additions and 34 deletions.
67 changes: 33 additions & 34 deletions dlls/winegstreamer/wg_transform.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ struct wg_transform
GstSegment segment;
GstBufferList *input;
guint input_max_length;

pthread_mutex_t mutex;
GstBuffer *output;
GstAtomicQueue *output_queue;
GstBuffer *output_buffer;
};

static GstFlowReturn transform_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer)
Expand All @@ -61,32 +60,32 @@ static GstFlowReturn transform_sink_chain_cb(GstPad *pad, GstObject *parent, Gst

GST_LOG("transform %p, buffer %p.", transform, buffer);

pthread_mutex_lock(&transform->mutex);
if (transform->output)
transform->output = gst_buffer_append(transform->output, buffer);
else
transform->output = buffer;
pthread_mutex_unlock(&transform->mutex);
gst_atomic_queue_push(transform->output_queue, buffer);

return GST_FLOW_OK;
}

NTSTATUS wg_transform_destroy(void *args)
{
struct wg_transform *transform = args;
GstBuffer *buffer;

if (transform->input)
gst_buffer_list_unref(transform->input);
if (transform->output)
gst_buffer_unref(transform->output);

gst_element_set_state(transform->container, GST_STATE_NULL);

if (transform->output_buffer)
gst_buffer_unref(transform->output_buffer);
while ((buffer = gst_atomic_queue_pop(transform->output_queue)))
gst_buffer_unref(buffer);

g_object_unref(transform->their_sink);
g_object_unref(transform->their_src);
g_object_unref(transform->container);
g_object_unref(transform->my_sink);
g_object_unref(transform->my_src);
pthread_mutex_destroy(&transform->mutex);
gst_atomic_queue_unref(transform->output_queue);
free(transform);

return STATUS_SUCCESS;
Expand Down Expand Up @@ -182,6 +181,8 @@ NTSTATUS wg_transform_create(void *args)
goto out;
if (!(transform->input = gst_buffer_list_new()))
goto out;
if (!(transform->output_queue = gst_atomic_queue_new(8)))
goto out;
transform->input_max_length = 1;

if (!(src_caps = wg_format_to_caps(&input_format)))
Expand Down Expand Up @@ -308,8 +309,6 @@ NTSTATUS wg_transform_create(void *args)
gst_caps_unref(sink_caps);
gst_caps_unref(src_caps);

pthread_mutex_init(&transform->mutex, NULL);

GST_INFO("Created winegstreamer transform %p.", transform);
params->transform = transform;
return STATUS_SUCCESS;
Expand All @@ -327,6 +326,8 @@ NTSTATUS wg_transform_create(void *args)
gst_object_unref(transform->my_src);
if (src_caps)
gst_caps_unref(src_caps);
if (transform->output_queue)
gst_atomic_queue_unref(transform->output_queue);
if (transform->input)
gst_buffer_list_unref(transform->input);
if (transform->container)
Expand Down Expand Up @@ -368,10 +369,8 @@ NTSTATUS wg_transform_push_data(void *args)
return STATUS_SUCCESS;
}

static NTSTATUS read_transform_output_data(struct wg_transform *transform,
struct wg_sample *sample)
static NTSTATUS read_transform_output_data(GstBuffer *buffer, struct wg_sample *sample)
{
GstBuffer *buffer = transform->output;
GstMapInfo info;

if (!gst_buffer_map(buffer, &info, GST_MAP_READ))
Expand All @@ -392,12 +391,6 @@ static NTSTATUS read_transform_output_data(struct wg_transform *transform,
gst_buffer_unmap(buffer, &info);
gst_buffer_resize(buffer, sample->size, -1);

if (info.size <= sample->size)
{
gst_buffer_unref(transform->output);
transform->output = NULL;
}

GST_INFO("Copied %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags);
return STATUS_SUCCESS;
}
Expand All @@ -424,20 +417,26 @@ NTSTATUS wg_transform_read_data(void *args)
return STATUS_UNSUCCESSFUL;
}

sample->size = 0;
pthread_mutex_lock(&transform->mutex);
if (transform->output)
{
params->result = S_OK;
status = read_transform_output_data(transform, sample);
}
else
if (!transform->output_buffer && !(transform->output_buffer = gst_atomic_queue_pop(transform->output_queue)))
{
sample->size = 0;
params->result = MF_E_TRANSFORM_NEED_MORE_INPUT;
status = STATUS_SUCCESS;
GST_INFO("Cannot read %u bytes, no output available", sample->max_size);
return STATUS_SUCCESS;
}
pthread_mutex_unlock(&transform->mutex);

return status;
if ((status = read_transform_output_data(transform->output_buffer, sample)))
{
sample->size = 0;
return status;
}

if (!(sample->flags & WG_SAMPLE_FLAG_INCOMPLETE))
{
gst_buffer_unref(transform->output_buffer);
transform->output_buffer = NULL;
}

params->result = S_OK;
return STATUS_SUCCESS;
}

0 comments on commit b5d6c59

Please sign in to comment.