[GStreamer][Playbin3] Stream collection handling fixes
https://bugs.webkit.org/show_bug.cgi?id=222322

Patch by Philippe Normand <pnormand@igalia.com> on 2021-03-01
Reviewed by Xabier Rodriguez-Calvar.

The track orphaning logic, which tried to avoid unnecessary track additions/removals, was
leaving the track set inconsistent with the final stream collection. Also, stream-collection
messages don't need to be handled from a synchronous GstBus handler; only need-context
messages require synchronous handling, so the corresponding code has been refactored.
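
The wholesale track rebuild this describes can be sketched with plain containers (the `Track` and `rebuildTracks` names below are illustrative stand-ins, not the WebKit types): instead of diffing the old track set against the new collection and removing only orphans, all old tracks are dropped and the collection is re-enumerated from scratch, so the internal map always mirrors the collection exactly.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for the real per-stream track objects.
struct Track {
    std::string id;
    bool active = false;
};

// Rebuild the track map from a new stream collection. New collections
// override previous ones, so no attempt is made to reuse old tracks.
std::map<std::string, Track> rebuildTracks(const std::vector<std::string>& collectionIds)
{
    std::map<std::string, Track> tracks; // old tracks are dropped wholesale
    unsigned index = 0;
    for (const auto& streamId : collectionIds) {
        Track track { streamId, index == 0 }; // first track starts active
        tracks.emplace(streamId, track);
        index++;
    }
    return tracks;
}
```

Even when a stream id survives across collections, its track object is recreated, which keeps indices and active flags consistent with the new collection.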

This patch also includes changes and cleanups for the mediastreamsrc element, needed after
the stream collection handling fixes. Most notably the element now keeps an internal list of
tracks, in addition to observing the MediaStreamPrivate for topology changes. Also it emits
a new stream collection whenever a new source pad has been added.
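
The mediastreamsrc behavior described here, keeping an internal track list and posting a fresh stream collection whenever a source pad is added, can be sketched with plain types (the `Element` and `addTrack` names below are illustrative, not the element's real API):

```cpp
#include <string>
#include <vector>

// Minimal stand-in for the element: it owns its track list instead of
// relying solely on observing the MediaStreamPrivate topology.
struct Element {
    std::vector<std::string> tracks;       // internal track list
    std::vector<size_t> postedCollections; // sizes of collections posted to the bus

    // Post a stream collection reflecting the current track list.
    void postStreamCollection() { postedCollections.push_back(tracks.size()); }

    // Adding a track creates a source pad, after which a new collection is
    // posted so downstream always sees the current topology.
    void addTrack(const std::string& trackId)
    {
        tracks.push_back(trackId);
        postStreamCollection();
    }
};
```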

No new tests, but this patch fixes flakiness of http/tests/media/hls/hls-audio-tracks.html
when the pipeline is driven by playbin3.

* platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp:
(WebCore::MediaPlayerPrivateGStreamer::~MediaPlayerPrivateGStreamer):
(WebCore::MediaPlayerPrivateGStreamer::updateTracks):
(WebCore::MediaPlayerPrivateGStreamer::setPipeline):
(WebCore::MediaPlayerPrivateGStreamer::handleStreamCollectionMessage):
(WebCore::MediaPlayerPrivateGStreamer::handleNeedContextMessage):
(WebCore::MediaPlayerPrivateGStreamer::handleMessage):
(WebCore::MediaPlayerPrivateGStreamer::createGSTPlayBin):
* platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.h:
* platform/graphics/gstreamer/mse/AppendPipeline.cpp:
(WebCore::AppendPipeline::handleNeedContextSyncMessage):
* platform/mediastream/gstreamer/GStreamerMediaStreamSource.cpp:
(stopObservingTracks):
(webkitMediaStreamSrcDispose):
(webkitMediaStreamSrcAddPad):
(ProbeData::ProbeData):
(webkitMediaStreamSrcPadProbeCb):
(webkitMediaStreamSrcSetupSrc):
(webkitMediaStreamSrcPostStreamCollection):
(webkitMediaStreamSrcAddTrack):
(webkitMediaStreamSrcSetStream):
(webkitMediaStreamSrcTrackEnded):

Canonical link: https://commits.webkit.org/234684@main
git-svn-id: https://svn.webkit.org/repository/webkit/trunk@273644 268f45cc-cd09-0410-ab3c-d52691b4dbfc
philn authored and webkit-commit-queue committed Mar 1, 2021
1 parent da3dd96 commit 65db70e
Showing 5 changed files with 192 additions and 148 deletions.
@@ -1,3 +1,46 @@
2021-03-01 Philippe Normand <pnormand@igalia.com>

[GStreamer][Playbin3] Stream collection handling fixes
https://bugs.webkit.org/show_bug.cgi?id=222322

Reviewed by Xabier Rodriguez-Calvar.

The track orphaning logic, which tried to avoid unnecessary track additions/removals, was
leaving the track set inconsistent with the final stream collection. Also, stream-collection
messages don't need to be handled from a synchronous GstBus handler; only need-context
messages require synchronous handling, so the corresponding code has been refactored.

This patch also includes changes and cleanups for the mediastreamsrc element, needed after
the stream collection handling fixes. Most notably the element now keeps an internal list of
tracks, in addition to observing the MediaStreamPrivate for topology changes. Also it emits
a new stream collection whenever a new source pad has been added.

No new tests, but this patch fixes flakiness of http/tests/media/hls/hls-audio-tracks.html
when the pipeline is driven by playbin3.

* platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp:
(WebCore::MediaPlayerPrivateGStreamer::~MediaPlayerPrivateGStreamer):
(WebCore::MediaPlayerPrivateGStreamer::updateTracks):
(WebCore::MediaPlayerPrivateGStreamer::setPipeline):
(WebCore::MediaPlayerPrivateGStreamer::handleStreamCollectionMessage):
(WebCore::MediaPlayerPrivateGStreamer::handleNeedContextMessage):
(WebCore::MediaPlayerPrivateGStreamer::handleMessage):
(WebCore::MediaPlayerPrivateGStreamer::createGSTPlayBin):
* platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.h:
* platform/graphics/gstreamer/mse/AppendPipeline.cpp:
(WebCore::AppendPipeline::handleNeedContextSyncMessage):
* platform/mediastream/gstreamer/GStreamerMediaStreamSource.cpp:
(stopObservingTracks):
(webkitMediaStreamSrcDispose):
(webkitMediaStreamSrcAddPad):
(ProbeData::ProbeData):
(webkitMediaStreamSrcPadProbeCb):
(webkitMediaStreamSrcSetupSrc):
(webkitMediaStreamSrcPostStreamCollection):
(webkitMediaStreamSrcAddTrack):
(webkitMediaStreamSrcSetStream):
(webkitMediaStreamSrcTrackEnded):

2021-02-28 Sam Weinig <weinig@apple.com>

Consider removing iOS only CSS property alias -webkit-hyphenate-locale introduced in r80288
@@ -101,20 +101,6 @@
#undef GST_USE_UNSTABLE_API
#endif // ENABLE(VIDEO) && USE(GSTREAMER_MPEGTS)

#define CREATE_TRACK(type, Type) G_STMT_START { \
m_has##Type = true; \
if (!useMediaSource) { \
RefPtr<Type##TrackPrivateGStreamer> track = Type##TrackPrivateGStreamer::create(makeWeakPtr(*this), type##TrackIndex++, stream); \
if (!track->trackIndex()) { \
track->setActive(true); \
m_wanted##Type##StreamId = track->id(); \
m_requested##Type##StreamId = track->id(); \
} \
m_##type##Tracks.add(track->id(), track); \
m_player->add##Type##Track(*track); \
} \
} G_STMT_END

#if USE(GSTREAMER_GL)
#include "GLVideoSinkGStreamer.h"
#include "VideoTextureCopierGStreamer.h"
@@ -148,11 +134,6 @@ using namespace std;
static const FloatSize s_holePunchDefaultFrameSize(1280, 720);
#endif

static void busMessageCallback(GstBus*, GstMessage* message, MediaPlayerPrivateGStreamer* player)
{
player->handleMessage(message);
}

static void initializeDebugCategory()
{
static std::once_flag onceFlag;
@@ -244,11 +225,10 @@ MediaPlayerPrivateGStreamer::~MediaPlayerPrivateGStreamer()
}

if (m_pipeline) {
GRefPtr<GstBus> bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
auto bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
ASSERT(bus);
g_signal_handlers_disconnect_by_func(bus.get(), gpointer(busMessageCallback), this);
g_signal_handlers_disconnect_matched(bus.get(), G_SIGNAL_MATCH_DATA, 0, 0, nullptr, nullptr, this);
gst_bus_remove_signal_watch(bus.get());
gst_bus_set_sync_handler(bus.get(), nullptr, nullptr, nullptr);
g_signal_handlers_disconnect_matched(m_pipeline.get(), G_SIGNAL_MATCH_DATA, 0, 0, nullptr, nullptr, this);
}

@@ -1482,16 +1462,7 @@ void MediaPlayerPrivateGStreamer::playbin3SendSelectStreamsIfAppropriate()
g_list_free_full(streams, reinterpret_cast<GDestroyNotify>(g_free));
}

template<typename K, typename V>
HashSet<K> hashSetFromHashMapKeys(const HashMap<K, V>& hashMap)
{
HashSet<K> keys;
for (auto& key : hashMap.keys())
keys.add(key);
return keys;
}

void MediaPlayerPrivateGStreamer::updateTracks(GRefPtr<GstStreamCollection>&& streamCollection)
void MediaPlayerPrivateGStreamer::updateTracks(const GRefPtr<GstStreamCollection>& streamCollection)
{
ASSERT(!m_isLegacyPlaybin);

@@ -1502,29 +1473,46 @@ void MediaPlayerPrivateGStreamer::updateTracks(GRefPtr<GstStreamCollection>&& st
bool oldHasAudio = m_hasAudio;
bool oldHasVideo = m_hasVideo;

// New stream collections override previous ones.
// New stream collections override previous ones so in order to keep our internal tracks
// consistent with the collection contents, we can't reuse our old tracks.
#define REMOVE_OLD_TRACKS(type, Type) G_STMT_START { \
for (const auto& trackId : m_##type##Tracks.keys()) { \
auto track = m_##type##Tracks.get(trackId); \
m_player->remove##Type##Track(*track); \
} \
m_##type##Tracks.clear(); \
} G_STMT_END

REMOVE_OLD_TRACKS(audio, Audio);
REMOVE_OLD_TRACKS(video, Video);
REMOVE_OLD_TRACKS(text, Text);

unsigned audioTrackIndex = 0;
unsigned videoTrackIndex = 0;
unsigned textTrackIndex = 0;
HashSet<AtomString> orphanedAudioTrackIds = hashSetFromHashMapKeys(m_audioTracks);
HashSet<AtomString> orphanedVideoTrackIds = hashSetFromHashMapKeys(m_videoTracks);
HashSet<AtomString> orphanedTextTrackIds = hashSetFromHashMapKeys(m_textTracks);
for (unsigned i = 0; i < length; i++) {
GRefPtr<GstStream> stream = gst_stream_collection_get_stream(streamCollection.get(), i);
String streamId(gst_stream_get_stream_id(stream.get()));
GstStreamType type = gst_stream_get_stream_type(stream.get());

orphanedAudioTrackIds.remove(streamId);
orphanedVideoTrackIds.remove(streamId);
orphanedTextTrackIds.remove(streamId);
#define CREATE_TRACK(type, Type) G_STMT_START { \
m_has##Type = true; \
if (!useMediaSource) { \
RefPtr<Type##TrackPrivateGStreamer> track = Type##TrackPrivateGStreamer::create(makeWeakPtr(*this), type##TrackIndex, stream); \
auto trackId = track->id(); \
if (!type##TrackIndex) { \
m_wanted##Type##StreamId = trackId; \
m_requested##Type##StreamId = trackId; \
track->setActive(true); \
} \
type##TrackIndex++; \
m_##type##Tracks.add(trackId, track); \
m_player->add##Type##Track(*track); \
} \
} G_STMT_END

for (unsigned i = 0; i < length; i++) {
auto* stream = gst_stream_collection_get_stream(streamCollection.get(), i);
String streamId(gst_stream_get_stream_id(stream));
auto type = gst_stream_get_stream_type(stream);

GST_DEBUG_OBJECT(pipeline(), "Inspecting %s track with ID %s", gst_stream_type_get_name(type), streamId.utf8().data());
if ((type & GST_STREAM_TYPE_AUDIO && m_audioTracks.contains(streamId)) || (type & GST_STREAM_TYPE_VIDEO && m_videoTracks.contains(streamId))
|| (type & GST_STREAM_TYPE_TEXT && m_textTracks.contains(streamId)))
{
GST_DEBUG_OBJECT(pipeline(), "%s track with ID %s already exists, skipping", gst_stream_type_get_name(type), streamId.utf8().data());
continue;
}

if (type & GST_STREAM_TYPE_AUDIO)
CREATE_TRACK(audio, Audio);
@@ -1538,17 +1526,6 @@ void MediaPlayerPrivateGStreamer::updateTracks(GRefPtr<GstStreamCollection>&& st
GST_WARNING("Unknown track type found for stream %s", streamId.utf8().data());
}

#define REMOVE_ORPHANED_TRACKS(type, Type) \
for (auto& trackId : orphaned##Type##TrackIds) { \
auto iter = m_##type##Tracks.find(trackId); \
m_player->remove##Type##Track(*iter->value); \
m_##type##Tracks.remove(iter->key); \
}

REMOVE_ORPHANED_TRACKS(audio, Audio);
REMOVE_ORPHANED_TRACKS(video, Video);
REMOVE_ORPHANED_TRACKS(text, Text);

if (oldHasVideo != m_hasVideo || oldHasAudio != m_hasAudio)
m_player->characteristicChanged();

@@ -1574,52 +1551,37 @@ void MediaPlayerPrivateGStreamer::setPipeline(GstElement* pipeline)
}

m_pipeline = pipeline;

GRefPtr<GstBus> bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
gst_bus_set_sync_handler(bus.get(), [](GstBus*, GstMessage* message, gpointer userData) {
auto& player = *static_cast<MediaPlayerPrivateGStreamer*>(userData);

if (player.handleSyncMessage(message)) {
gst_message_unref(message);
return GST_BUS_DROP;
}

return GST_BUS_PASS;
}, this, nullptr);
}

bool MediaPlayerPrivateGStreamer::handleSyncMessage(GstMessage* message)
void MediaPlayerPrivateGStreamer::handleStreamCollectionMessage(GstMessage* message)
{
if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_STREAM_COLLECTION && !m_isLegacyPlaybin) {
GRefPtr<GstStreamCollection> collection;
gst_message_parse_stream_collection(message, &collection.outPtr());
#ifndef GST_DISABLE_DEBUG
GST_DEBUG_OBJECT(pipeline(), "Received STREAM_COLLECTION message with upstream id \"%s\" defining the following streams:", gst_stream_collection_get_upstream_id(collection.get()));
unsigned numStreams = gst_stream_collection_get_size(collection.get());
for (unsigned i = 0; i < numStreams; i++) {
GstStream* stream = gst_stream_collection_get_stream(collection.get(), i);
GST_DEBUG_OBJECT(pipeline(), "#%u %s %s", i, gst_stream_type_get_name(gst_stream_get_stream_type(stream)), gst_stream_get_stream_id(stream));
}
#endif
if (m_isLegacyPlaybin)
return;

if (collection) {
m_notifier->notify(MainThreadNotification::StreamCollectionChanged, [this, collection = WTFMove(collection)]() mutable {
this->updateTracks(WTFMove(collection));
});
}
ASSERT(GST_MESSAGE_TYPE(message) == GST_MESSAGE_STREAM_COLLECTION);
GRefPtr<GstStreamCollection> collection;
gst_message_parse_stream_collection(message, &collection.outPtr());
#ifndef GST_DISABLE_DEBUG
} else if (GST_MESSAGE_TYPE(message) == GST_MESSAGE_STREAMS_SELECTED && !m_isLegacyPlaybin) {
GST_DEBUG_OBJECT(pipeline(), "Received STREAMS_SELECTED message selecting the following streams:");
unsigned numStreams = gst_message_streams_selected_get_size(message);
for (unsigned i = 0; i < numStreams; i++) {
GstStream* stream = gst_message_streams_selected_get_stream(message, i);
GST_DEBUG_OBJECT(pipeline(), "#%u %s %s", i, gst_stream_type_get_name(gst_stream_get_stream_type(stream)), gst_stream_get_stream_id(stream));
}
#endif
GST_DEBUG_OBJECT(pipeline(), "Received STREAM_COLLECTION message with upstream id \"%s\" defining the following streams:", gst_stream_collection_get_upstream_id(collection.get()));
unsigned numStreams = gst_stream_collection_get_size(collection.get());
for (unsigned i = 0; i < numStreams; i++) {
GstStream* stream = gst_stream_collection_get_stream(collection.get(), i);
GST_DEBUG_OBJECT(pipeline(), "#%u %s %s", i, gst_stream_type_get_name(gst_stream_get_stream_type(stream)), gst_stream_get_stream_id(stream));
}
#endif

if (GST_MESSAGE_TYPE(message) != GST_MESSAGE_NEED_CONTEXT)
return false;
if (!collection)
return;

callOnMainThreadAndWait([player = makeWeakPtr(*this), collection = WTFMove(collection)] {
if (player)
player->updateTracks(collection);
});
}

bool MediaPlayerPrivateGStreamer::handleNeedContextMessage(GstMessage* message)
{
ASSERT(GST_MESSAGE_TYPE(message) == GST_MESSAGE_NEED_CONTEXT);

const gchar* contextType;
if (!gst_message_parse_context_type(message, &contextType))
@@ -1846,7 +1808,6 @@ void MediaPlayerPrivateGStreamer::handleMessage(GstMessage* message)
case GST_MESSAGE_STATE_CHANGED: {
if (!messageSourceIsPlaybin || m_isDelayingLoad)
break;
updateStates();

// Construct a filename for the graphviz dot file output.
GstState newState;
@@ -1857,6 +1818,7 @@ void MediaPlayerPrivateGStreamer::handleMessage(GstMessage* message)

if (!m_isLegacyPlaybin && currentState == GST_STATE_PAUSED && newState == GST_STATE_PLAYING)
playbin3SendSelectStreamsIfAppropriate();
updateStates();

break;
}
@@ -1989,7 +1951,15 @@ void MediaPlayerPrivateGStreamer::handleMessage(GstMessage* message)
if (m_isLegacyPlaybin)
break;

GST_DEBUG_OBJECT(m_pipeline.get(), "Received STREAMS_SELECTED, setting m_waitingForStreamsSelectedEvent to false.");
#ifndef GST_DISABLE_DEBUG
GST_DEBUG_OBJECT(m_pipeline.get(), "Received STREAMS_SELECTED message selecting the following streams:");
unsigned numStreams = gst_message_streams_selected_get_size(message);
for (unsigned i = 0; i < numStreams; i++) {
GstStream* stream = gst_message_streams_selected_get_stream(message, i);
GST_DEBUG_OBJECT(pipeline(), "#%u %s %s", i, gst_stream_type_get_name(gst_stream_get_stream_type(stream)), gst_stream_get_stream_id(stream));
}
#endif
GST_DEBUG_OBJECT(m_pipeline.get(), "Setting m_waitingForStreamsSelectedEvent to false.");
m_waitingForStreamsSelectedEvent = false;

// Unfortunately, STREAMS_SELECTED messages from playbin3 are highly unreliable, often only including the audio
@@ -2004,6 +1974,9 @@ void MediaPlayerPrivateGStreamer::handleMessage(GstMessage* message)
playbin3SendSelectStreamsIfAppropriate();
break;
}
case GST_MESSAGE_STREAM_COLLECTION:
handleStreamCollectionMessage(message);
break;
default:
GST_DEBUG_OBJECT(pipeline(), "Unhandled GStreamer message type: %s", GST_MESSAGE_TYPE_NAME(message));
break;
@@ -2730,9 +2703,15 @@ void MediaPlayerPrivateGStreamer::createGSTPlayBin(const URL& url, const String&
setPlaybackFlags(pipeline());

// Let also other listeners subscribe to (application) messages in this bus.
GRefPtr<GstBus> bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
auto bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
gst_bus_enable_sync_message_emission(bus.get());
gst_bus_add_signal_watch_full(bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
g_signal_connect(bus.get(), "message", G_CALLBACK(busMessageCallback), this);
g_signal_connect_swapped(bus.get(), "message", G_CALLBACK(+[](MediaPlayerPrivateGStreamer* player, GstMessage* message) {
player->handleMessage(message);
}), this);
g_signal_connect_swapped(bus.get(), "sync-message::need-context", G_CALLBACK(+[](MediaPlayerPrivateGStreamer* player, GstMessage* message) {
player->handleNeedContextMessage(message);
}), this);

g_object_set(m_pipeline.get(), "mute", m_player->muted(), nullptr);

@@ -214,8 +214,10 @@ class MediaPlayerPrivateGStreamer : public MediaPlayerPrivateInterface
void playbin3SendSelectStreamsIfAppropriate();

// Append pipeline interface
// FIXME: Use the client interface pattern, AppendPipeline does not need the full interface to this class just for these two functions.
bool handleSyncMessage(GstMessage*);
// FIXME: Use the client interface pattern, AppendPipeline does not need the full interface to this class just for this function.
bool handleNeedContextMessage(GstMessage*);

void handleStreamCollectionMessage(GstMessage*);
void handleMessage(GstMessage*);

void triggerRepaint(GstSample*);
@@ -443,7 +445,7 @@ class MediaPlayerPrivateGStreamer : public MediaPlayerPrivateInterface
void setPlaybinURL(const URL& urlString);
void loadFull(const String& url, const String& pipelineName);

void updateTracks(GRefPtr<GstStreamCollection>&&);
void updateTracks(const GRefPtr<GstStreamCollection>&);
void videoSinkCapsChanged(GstPad*);
void updateVideoSizeAndOrientationFromCaps(const GstCaps*);

@@ -329,12 +329,8 @@ GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo*

void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
{
const gchar* contextType = nullptr;
gst_message_parse_context_type(message, &contextType);
GST_TRACE("context type: %s", contextType);

// MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
m_playerPrivate->handleSyncMessage(message);
m_playerPrivate->handleNeedContextMessage(message);
}

void AppendPipeline::handleStateChangeMessage(GstMessage* message)
