Skip to content

Commit

Permalink
Merge r236409 - [MSE][GStreamer] Pull demuxed samples in batches
Browse files Browse the repository at this point in the history
https://bugs.webkit.org/show_bug.cgi?id=189871

Reviewed by Xabier Rodriguez-Calvar.

After this patch, only the notifications of "new samples available"
(appsink-new-sample bus messages) travel from the streaming thread to
the main thread through the bus and the main thread is the responsible
of pulling as many samples as it can from appsink. Before, the samples
were pulled from appsink in the non-main thread and traveled to the
main thread through the bus one by one.

This reduces drastically the amount of context switches and waiting
time in the streaming thread, resulting in a noticeable performance
improvement.

This fixes stutter while loading YouTube videos.

* platform/graphics/gstreamer/MediaSampleGStreamer.cpp:
(WebCore::MediaSampleGStreamer::MediaSampleGStreamer):
* platform/graphics/gstreamer/mse/AppendPipeline.cpp:
(WebCore::AppendPipeline::AppendPipeline):
(WebCore::AppendPipeline::~AppendPipeline):
(WebCore::AppendPipeline::clearPlayerPrivate):
(WebCore::AppendPipeline::handleApplicationMessage):
(WebCore::AppendPipeline::appsinkNewSample):
(WebCore::AppendPipeline::consumeAppSinkAvailableSamples):
(WebCore::AppendPipeline::resetPipeline):
(WebCore::AppendPipeline::handleNewAppsinkSample):
* platform/graphics/gstreamer/mse/AppendPipeline.h:
  • Loading branch information
ntrrgc authored and aperezdc committed Oct 1, 2018
1 parent 398b3e0 commit 69c7153
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 90 deletions.
33 changes: 33 additions & 0 deletions Source/WebCore/ChangeLog
@@ -1,3 +1,36 @@
2018-09-24 Alicia Boya García <aboya@igalia.com>

[MSE][GStreamer] Pull demuxed samples in batches
https://bugs.webkit.org/show_bug.cgi?id=189871

Reviewed by Xabier Rodriguez-Calvar.

After this patch, only the notifications of "new samples available"
(appsink-new-sample bus messages) travel from the streaming thread to
the main thread through the bus and the main thread is the responsible
of pulling as many samples as it can from appsink. Before, the samples
were pulled from appsink in the non-main thread and traveled to the
main thread through the bus one by one.

This reduces drastically the amount of context switches and waiting
time in the streaming thread, resulting in a noticeable performance
improvement.

This fixes stutter while loading YouTube videos.

* platform/graphics/gstreamer/MediaSampleGStreamer.cpp:
(WebCore::MediaSampleGStreamer::MediaSampleGStreamer):
* platform/graphics/gstreamer/mse/AppendPipeline.cpp:
(WebCore::AppendPipeline::AppendPipeline):
(WebCore::AppendPipeline::~AppendPipeline):
(WebCore::AppendPipeline::clearPlayerPrivate):
(WebCore::AppendPipeline::handleApplicationMessage):
(WebCore::AppendPipeline::appsinkNewSample):
(WebCore::AppendPipeline::consumeAppSinkAvailableSamples):
(WebCore::AppendPipeline::resetPipeline):
(WebCore::AppendPipeline::handleNewAppsinkSample):
* platform/graphics/gstreamer/mse/AppendPipeline.h:

2018-09-24 Enrique Ocaña González <eocanha@igalia.com>

[MSE][GStreamer] Don't update duration when it was not previously NaN
Expand Down
Expand Up @@ -36,8 +36,7 @@ MediaSampleGStreamer::MediaSampleGStreamer(GRefPtr<GstSample>&& sample, const Fl
{
ASSERT(sample);
GstBuffer* buffer = gst_sample_get_buffer(sample.get());
if (!buffer)
return;
RELEASE_ASSERT(buffer);

auto createMediaTime =
[](GstClockTime time) -> MediaTime {
Expand Down
150 changes: 67 additions & 83 deletions Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.cpp
Expand Up @@ -108,6 +108,7 @@ AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceCli
, m_sourceBufferPrivate(sourceBufferPrivate.get())
, m_playerPrivate(&playerPrivate)
, m_id(0)
, m_wasBusAlreadyNotifiedOfAvailableSamples(false)
, m_appsrcAtLeastABufferLeft(false)
, m_appsrcNeedDataReceived(false)
, m_appsrcDataLeavingProbeId(0)
Expand Down Expand Up @@ -188,11 +189,7 @@ AppendPipeline::~AppendPipeline()
{
ASSERT(WTF::isMainThread());

{
LockHolder locker(m_newSampleLock);
setAppendState(AppendState::Invalid);
m_newSampleCondition.notifyOne();
}
setAppendState(AppendState::Invalid);

{
LockHolder locker(m_padAddRemoveLock);
Expand Down Expand Up @@ -253,15 +250,9 @@ void AppendPipeline::clearPlayerPrivate()
ASSERT(WTF::isMainThread());
GST_DEBUG("cleaning private player");

{
LockHolder locker(m_newSampleLock);
// Make sure that AppendPipeline won't process more data from now on and
// instruct handleNewSample to abort itself from now on as well.
setAppendState(AppendState::Invalid);

// Awake any pending handleNewSample operation in the streaming thread.
m_newSampleCondition.notifyOne();
}
// Make sure that AppendPipeline won't process more data from now on and
// instruct handleNewSample to abort itself from now on as well.
setAppendState(AppendState::Invalid);

{
LockHolder locker(m_padAddRemoveLock);
Expand Down Expand Up @@ -316,10 +307,8 @@ void AppendPipeline::handleApplicationMessage(GstMessage* message)
}

if (gst_structure_has_name(structure, "appsink-new-sample")) {
GRefPtr<GstSample> newSample;
gst_structure_get(structure, "new-sample", GST_TYPE_SAMPLE, &newSample.outPtr(), nullptr);

appsinkNewSample(newSample.get());
m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
consumeAppsinkAvailableSamples();
return;
}

Expand Down Expand Up @@ -626,58 +615,48 @@ void AppendPipeline::checkEndOfAppend()
}
}

void AppendPipeline::appsinkNewSample(GstSample* sample)
void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
{
ASSERT(WTF::isMainThread());

{
LockHolder locker(m_newSampleLock);

// Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
// FIXME: Return ERROR and find a more robust way to detect that all the
// data has been processed, so we don't need to resort to these hacks.
// All in all, return OK, even if it's not the proper thing to do. We don't want to break the demuxer.
m_flowReturn = GST_FLOW_OK;
m_newSampleCondition.notifyOne();
return;
}
// Ignore samples if we're not expecting them. Refuse processing if we're in Invalid state.
if (m_appendState != AppendState::Ongoing && m_appendState != AppendState::Sampling) {
GST_WARNING("Unexpected sample, appendState=%s", dumpAppendState(m_appendState));
// FIXME: Return ERROR and find a more robust way to detect that all the
// data has been processed, so we don't need to resort to these hacks.
return;
}

RefPtr<MediaSampleGStreamer> mediaSample = WebCore::MediaSampleGStreamer::create(sample, m_presentationSize, trackId());

GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
mediaSample->trackID().string().utf8().data(),
mediaSample->presentationTime().toString().utf8().data(),
mediaSample->decodeTime().toString().utf8().data(),
mediaSample->duration().toString().utf8().data(),
mediaSample->presentationSize().width(), mediaSample->presentationSize().height());

// If we're beyond the duration, ignore this sample and the remaining ones.
MediaTime duration = m_mediaSourceClient->duration();
if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
setAppendState(AppendState::LastSample);
m_flowReturn = GST_FLOW_OK;
m_newSampleCondition.notifyOne();
return;
}
if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
GST_WARNING("Received sample without buffer from appsink.");
return;
}

// Add a gap sample if a gap is detected before the first sample.
if (mediaSample->decodeTime() == MediaTime::zeroTime()
&& mediaSample->presentationTime() > MediaTime::zeroTime()
&& mediaSample->presentationTime() <= MediaTime(1, 10)) {
GST_DEBUG("Adding gap offset");
mediaSample->applyPtsOffset(MediaTime::zeroTime());
}
RefPtr<MediaSampleGStreamer> mediaSample = WebCore::MediaSampleGStreamer::create(WTFMove(sample), m_presentationSize, trackId());

GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
mediaSample->trackID().string().utf8().data(),
mediaSample->presentationTime().toString().utf8().data(),
mediaSample->decodeTime().toString().utf8().data(),
mediaSample->duration().toString().utf8().data(),
mediaSample->presentationSize().width(), mediaSample->presentationSize().height());

// If we're beyond the duration, ignore this sample and the remaining ones.
MediaTime duration = m_mediaSourceClient->duration();
if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
GST_DEBUG("Detected sample (%f) beyond the duration (%f), declaring LastSample", mediaSample->presentationTime().toFloat(), duration.toFloat());
setAppendState(AppendState::LastSample);
return;
}

m_sourceBufferPrivate->didReceiveSample(*mediaSample);
setAppendState(AppendState::Sampling);
m_flowReturn = GST_FLOW_OK;
m_newSampleCondition.notifyOne();
// Add a gap sample if a gap is detected before the first sample.
if (mediaSample->decodeTime() == MediaTime::zeroTime() && mediaSample->presentationTime() > MediaTime::zeroTime() && mediaSample->presentationTime() <= MediaTime(1, 10)) {
GST_DEBUG("Adding gap offset");
mediaSample->applyPtsOffset(MediaTime::zeroTime());
}

checkEndOfAppend();
m_sourceBufferPrivate->didReceiveSample(*mediaSample);
setAppendState(AppendState::Sampling);
}

void AppendPipeline::appsinkEOS()
Expand Down Expand Up @@ -743,19 +722,32 @@ AtomicString AppendPipeline::trackId()
return m_track->id();
}

void AppendPipeline::consumeAppsinkAvailableSamples()
{
ASSERT(WTF::isMainThread());

GRefPtr<GstSample> sample;
int batchedSampleCount = 0;
while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(m_appsink.get()), 0)))) {
appsinkNewSample(WTFMove(sample));
batchedSampleCount++;
}

GST_TRACE_OBJECT(this, "batchedSampleCount = %d", batchedSampleCount);

if (batchedSampleCount > 0)
checkEndOfAppend();
}

void AppendPipeline::resetPipeline()
{
ASSERT(WTF::isMainThread());
GST_DEBUG("resetting pipeline");
m_appsrcAtLeastABufferLeft = false;
setAppsrcDataLeavingProbe();

{
LockHolder locker(m_newSampleLock);
m_newSampleCondition.notifyOne();
gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);
}
gst_element_set_state(m_pipeline.get(), GST_STATE_READY);
gst_element_get_state(m_pipeline.get(), nullptr, nullptr, 0);

#if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
{
Expand Down Expand Up @@ -844,27 +836,19 @@ GstFlowReturn AppendPipeline::handleNewAppsinkSample(GstElement* appsink)
{
ASSERT(!WTF::isMainThread());

// Even if we're disabled, it's important to pull the sample out anyway to
// avoid deadlocks when changing to GST_STATE_NULL having a non empty appsink.
GRefPtr<GstSample> sample = adoptGRef(gst_app_sink_pull_sample(GST_APP_SINK(appsink)));
LockHolder locker(m_newSampleLock);

if (!m_playerPrivate || m_appendState == AppendState::Invalid) {
GST_WARNING("AppendPipeline has been disabled, ignoring this sample");
return GST_FLOW_ERROR;
}

GstStructure* structure = gst_structure_new("appsink-new-sample", "new-sample", GST_TYPE_SAMPLE, sample.get(), nullptr);
GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
gst_bus_post(m_bus.get(), message);
GST_TRACE("appsink-new-sample message posted to bus");

m_newSampleCondition.wait(m_newSampleLock);
// We've been awaken because the sample was processed or because of
// an exceptional condition (entered in Invalid state, destructor, etc.).
// We can't reliably delete info here, appendPipelineAppsinkNewSampleMainThread will do it.
if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) {
GstStructure* structure = gst_structure_new_empty("appsink-new-sample");
GstMessage* message = gst_message_new_application(GST_OBJECT(appsink), structure);
gst_bus_post(m_bus.get(), message);
GST_TRACE("appsink-new-sample message posted to bus");
}

return m_flowReturn;
return GST_FLOW_OK;
}

static GRefPtr<GstElement>
Expand Down
16 changes: 11 additions & 5 deletions Source/WebCore/platform/graphics/gstreamer/mse/AppendPipeline.h
Expand Up @@ -27,6 +27,7 @@
#include "MediaSourceClientGStreamerMSE.h"
#include "SourceBufferPrivateGStreamer.h"

#include <atomic>
#include <gst/gst.h>
#include <wtf/Condition.h>

Expand Down Expand Up @@ -61,7 +62,7 @@ class AppendPipeline : public ThreadSafeRefCounted<AppendPipeline> {
// Takes ownership of caps.
void parseDemuxerSrcPadCaps(GstCaps*);
void appsinkCapsChanged();
void appsinkNewSample(GstSample*);
void appsinkNewSample(GRefPtr<GstSample>&&);
void appsinkEOS();
void didReceiveInitializationSegment();
AtomicString trackId();
Expand Down Expand Up @@ -94,6 +95,8 @@ class AppendPipeline : public ThreadSafeRefCounted<AppendPipeline> {
void removeAppsrcDataLeavingProbe();
void setAppsrcDataLeavingProbe();

void consumeAppsinkAvailableSamples();

Ref<MediaSourceClientGStreamerMSE> m_mediaSourceClient;
Ref<SourceBufferPrivateGStreamer> m_sourceBufferPrivate;
MediaPlayerPrivateGStreamerMSE* m_playerPrivate;
Expand All @@ -103,8 +106,6 @@ class AppendPipeline : public ThreadSafeRefCounted<AppendPipeline> {

MediaTime m_initialDuration;

GstFlowReturn m_flowReturn;

GRefPtr<GstElement> m_pipeline;
GRefPtr<GstBus> m_bus;
GRefPtr<GstElement> m_appsrc;
Expand All @@ -113,8 +114,13 @@ class AppendPipeline : public ThreadSafeRefCounted<AppendPipeline> {
// The demuxer has one src stream only, so only one appsink is needed and linked to it.
GRefPtr<GstElement> m_appsink;

Lock m_newSampleLock;
Condition m_newSampleCondition;
// Used to avoid unnecessary notifications per sample.
// It is read and written from the streaming thread and written from the main thread.
// The main thread must set it to false before actually pulling samples.
// This strategy ensures that at any time, there are at most two notifications in the bus
// queue, instead of it growing unbounded.
std::atomic_flag m_wasBusAlreadyNotifiedOfAvailableSamples;

Lock m_padAddRemoveLock;
Condition m_padAddRemoveCondition;

Expand Down

0 comments on commit 69c7153

Please sign in to comment.