[GStreamer][WebRTC] Tighten EndPoint pipeline with playback pipeline
https://bugs.webkit.org/show_bug.cgi?id=256041

Reviewed by Xabier Rodriguez-Calvar.

The RTP depayloaders and parsers are now wrapped inside the RealtimeIncomingSourceGStreamer sub-classes,
using parsebin. The coupling between the incoming source appsink and the mediastreamsrc appsrc
elements is also tighter: latency is now properly propagated and upstream queries are relayed. All of
this helps improve lip-sync of incoming WebRTC tracks. Two illustrative sketches follow, one here and
one after the file list below.
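
To make the new topology concrete, here is a minimal standalone sketch of the bin each incoming
source now builds (valve ! parsebin ! tee). This is illustrative, not part of the commit: it uses
the plain C API rather than WebKit's GRefPtr wrappers, the bin is never put into a running
pipeline, and the keyframe properties are only set when the auto-plugged depayloader exposes them,
mirroring the diff below.

// Sketch only: approximates the RealtimeIncomingSourceGStreamer bin topology.
#include <gst/gst.h>

static void onParsebinElementAdded(GstBin*, GstElement* element, gpointer)
{
    // parsebin plugs depayloaders/parsers dynamically; configure them as they appear.
    const gchar* klass = gst_element_get_metadata(element, GST_ELEMENT_METADATA_KLASS);
    if (!klass || !g_strrstr(klass, "Depayloader"))
        return;
    if (g_object_class_find_property(G_OBJECT_GET_CLASS(element), "request-keyframe"))
        g_object_set(element, "request-keyframe", TRUE, nullptr);
    if (g_object_class_find_property(G_OBJECT_GET_CLASS(element), "wait-for-keyframe"))
        g_object_set(element, "wait-for-keyframe", TRUE, nullptr);
}

static void onParsebinPadAdded(GstElement*, GstPad* pad, gpointer userData)
{
    // Link each dynamically created parsebin source pad to the fan-out tee.
    GstPad* sinkPad = gst_element_get_static_pad(GST_ELEMENT(userData), "sink");
    gst_pad_link(pad, sinkPad);
    gst_object_unref(sinkPad);
}

int main(int argc, char** argv)
{
    gst_init(&argc, &argv);

    GstElement* bin = gst_bin_new("incoming-source");
    GstElement* valve = gst_element_factory_make("valve", nullptr);
    GstElement* parsebin = gst_element_factory_make("parsebin", nullptr);
    GstElement* tee = gst_element_factory_make("tee", nullptr);
    g_object_set(tee, "allow-not-linked", TRUE, nullptr);

    g_signal_connect(parsebin, "element-added", G_CALLBACK(onParsebinElementAdded), nullptr);
    g_signal_connect(parsebin, "pad-added", G_CALLBACK(onParsebinPadAdded), tee);

    gst_bin_add_many(GST_BIN(bin), valve, parsebin, tee, nullptr);
    gst_element_link(valve, parsebin);

    // Expose the valve sink pad so the WebRTC endpoint can feed RTP into the bin.
    GstPad* valveSink = gst_element_get_static_pad(valve, "sink");
    gst_element_add_pad(bin, gst_ghost_pad_new("sink", valveSink));
    gst_object_unref(valveSink);

    gst_object_unref(bin);
    return 0;
}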

* Source/WebCore/platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp:
(WebCore::MediaPlayerPrivateGStreamer::configureElement):
(WebCore::MediaPlayerPrivateGStreamer::configureDepayloader): Deleted.
* Source/WebCore/platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.h:
* Source/WebCore/platform/mediastream/gstreamer/GStreamerMediaStreamSource.cpp:
(webkitMediaStreamSrcAddTrack):
* Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingSourceGStreamer.cpp:
(WebCore::RealtimeIncomingSourceGStreamer::RealtimeIncomingSourceGStreamer):
(WebCore::RealtimeIncomingSourceGStreamer::registerClient):
(WebCore::RealtimeIncomingSourceGStreamer::unregisterClient):
(WebCore::RealtimeIncomingSourceGStreamer::handleUpstreamEvent):
(WebCore::RealtimeIncomingSourceGStreamer::handleUpstreamQuery):
* Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingSourceGStreamer.h:
(WebCore::RealtimeIncomingSourceGStreamer::bin):

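The latency propagation mentioned above works by catching GST_EVENT_LATENCY on each per-client
appsink and mirroring the sink's negotiated latency onto the paired appsrc in the playback
pipeline. The sketch below is modeled on the new_event callback the commit installs through
gst_app_sink_set_callbacks(); it is illustrative only: it assumes GStreamer 1.20+ for
gst_app_sink_pull_object(), and passes the paired appsrc as user data where the real code goes
through a client registry keyed by id.

// Sketch only: relay the playback sink's computed latency back to the feeding appsrc.
#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <gst/base/gstbasesink.h>

static gboolean onNewSerializedEvent(GstAppSink* sink, gpointer userData)
{
    GstElement* appsrc = GST_ELEMENT(userData);
    GstEvent* event = GST_EVENT_CAST(gst_app_sink_pull_object(sink));
    if (GST_EVENT_TYPE(event) == GST_EVENT_LATENCY) {
        GstClockTime minLatency, maxLatency;
        // Ask the base sink for the latency the downstream pipeline negotiated...
        if (gst_base_sink_query_latency(GST_BASE_SINK(sink), nullptr, nullptr, &minLatency, &maxLatency)) {
            // ...and mirror it on the appsrc so both pipelines schedule buffers
            // against the same latency budget, keeping audio and video in sync.
            g_object_set(appsrc, "min-latency", static_cast<gint64>(minLatency),
                "max-latency", static_cast<gint64>(maxLatency), nullptr);
        }
    }
    gst_event_unref(event);
    return FALSE; // Not consumed here; the sink still sees the event.
}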
Canonical link: https://commits.webkit.org/263499@main
philn committed Apr 28, 2023
1 parent 2dc265c commit 72fe397
Showing 5 changed files with 181 additions and 71 deletions.
Source/WebCore/platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.cpp
@@ -2169,11 +2169,6 @@ void MediaPlayerPrivateGStreamer::configureElement(GstElement* element)
         return;
     }
 
-    if (classifiers.contains("Depayloader"_s)) {
-        configureDepayloader(element);
-        return;
-    }
-
     if (isMediaStreamPlayer())
         return;
 
@@ -2932,17 +2927,6 @@ void MediaPlayerPrivateGStreamer::createGSTPlayBin(const URL& url)
 #endif
 }
 
-void MediaPlayerPrivateGStreamer::configureDepayloader(GstElement* depayloader)
-{
-    if (!isMediaStreamPlayer())
-        return;
-
-    if (gstObjectHasProperty(depayloader, "request-keyframe"))
-        g_object_set(depayloader, "request-keyframe", TRUE, nullptr);
-    if (gstObjectHasProperty(depayloader, "wait-for-keyframe"))
-        g_object_set(depayloader, "wait-for-keyframe", TRUE, nullptr);
-}
-
 void MediaPlayerPrivateGStreamer::configureVideoDecoder(GstElement* decoder)
 {
     GUniquePtr<char> name(gst_element_get_name(decoder));
Source/WebCore/platform/graphics/gstreamer/MediaPlayerPrivateGStreamer.h
@@ -485,7 +485,6 @@ class MediaPlayerPrivateGStreamer : public MediaPlayerPrivateInterface
     void configureDownloadBuffer(GstElement*);
     static void downloadBufferFileCreatedCallback(MediaPlayerPrivateGStreamer*);
 
-    void configureDepayloader(GstElement*);
     void configureVideoDecoder(GstElement*);
     void configureElement(GstElement*);
 #if PLATFORM(BROADCOM) || USE(WESTEROS_SINK) || PLATFORM(AMLOGIC) || PLATFORM(REALTEK)
Source/WebCore/platform/mediastream/gstreamer/GStreamerMediaStreamSource.cpp
@@ -47,10 +47,10 @@
 using namespace WebCore;
 
 static GstStaticPadTemplate videoSrcTemplate = GST_STATIC_PAD_TEMPLATE("video_src%u", GST_PAD_SRC, GST_PAD_SOMETIMES,
-    GST_STATIC_CAPS("video/x-raw;video/x-h264;video/x-vp8;video/x-vp9;application/x-rtp, media=(string)video"));
+    GST_STATIC_CAPS_ANY);
 
 static GstStaticPadTemplate audioSrcTemplate = GST_STATIC_PAD_TEMPLATE("audio_src%u", GST_PAD_SRC, GST_PAD_SOMETIMES,
-    GST_STATIC_CAPS("audio/x-raw(ANY);application/x-rtp, media=(string)audio"));
+    GST_STATIC_CAPS_ANY);
 
 GST_DEBUG_CATEGORY_STATIC(webkitMediaStreamSrcDebug);
 #define GST_CAT_DEFAULT webkitMediaStreamSrcDebug
@@ -152,18 +152,38 @@ class InternalSource final : public MediaStreamTrackPrivate::Observer,
     }), this);
 
 #if USE(GSTREAMER_WEBRTC)
-    auto pad = adoptGRef(gst_element_get_static_pad(m_src.get(), "src"));
-    gst_pad_add_probe(pad.get(), GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, reinterpret_cast<GstPadProbeCallback>(+[](GstPad*, GstPadProbeInfo* info, InternalSource* internalSource) -> GstPadProbeReturn {
-        auto& trackSource = internalSource->m_track.source();
+    auto& trackSource = m_track.source();
+    if (trackSource.isIncomingAudioSource() || trackSource.isIncomingVideoSource()) {
+
         if (trackSource.isIncomingAudioSource()) {
             auto& source = static_cast<RealtimeIncomingAudioSourceGStreamer&>(trackSource);
-            source.handleUpstreamEvent(GRefPtr<GstEvent>(GST_PAD_PROBE_INFO_EVENT(info)));
+            m_webrtcSourceClientId = source.registerClient(GRefPtr<GstElement>(m_src));
         } else if (trackSource.isIncomingVideoSource()) {
             auto& source = static_cast<RealtimeIncomingVideoSourceGStreamer&>(trackSource);
-            source.handleUpstreamEvent(GRefPtr<GstEvent>(GST_PAD_PROBE_INFO_EVENT(info)));
+            m_webrtcSourceClientId = source.registerClient(GRefPtr<GstElement>(m_src));
         }
-        return GST_PAD_PROBE_OK;
-    }), this, nullptr);
+
+        auto pad = adoptGRef(gst_element_get_static_pad(m_src.get(), "src"));
+        gst_pad_add_probe(pad.get(), static_cast<GstPadProbeType>(GST_PAD_PROBE_TYPE_EVENT_UPSTREAM | GST_PAD_PROBE_TYPE_QUERY_UPSTREAM), reinterpret_cast<GstPadProbeCallback>(+[](GstPad*, GstPadProbeInfo* info, InternalSource* internalSource) -> GstPadProbeReturn {
+            auto& trackSource = internalSource->m_track.source();
+            ASSERT(internalSource->m_webrtcSourceClientId.has_value());
+            auto clientId = internalSource->m_webrtcSourceClientId.value();
+            if (trackSource.isIncomingAudioSource()) {
+                auto& source = static_cast<RealtimeIncomingAudioSourceGStreamer&>(trackSource);
+                if (GST_IS_EVENT(info->data))
+                    source.handleUpstreamEvent(GRefPtr<GstEvent>(GST_PAD_PROBE_INFO_EVENT(info)), clientId);
+                else if (source.handleUpstreamQuery(GST_PAD_PROBE_INFO_QUERY(info), clientId))
+                    return GST_PAD_PROBE_HANDLED;
+            } else if (trackSource.isIncomingVideoSource()) {
+                auto& source = static_cast<RealtimeIncomingVideoSourceGStreamer&>(trackSource);
+                if (GST_IS_EVENT(info->data))
+                    source.handleUpstreamEvent(GRefPtr<GstEvent>(GST_PAD_PROBE_INFO_EVENT(info)), clientId);
+                else if (source.handleUpstreamQuery(GST_PAD_PROBE_INFO_QUERY(info), clientId))
+                    return GST_PAD_PROBE_HANDLED;
+            }
+            return GST_PAD_PROBE_OK;
+        }), this, nullptr);
+    }
 #endif
 }
 
@@ -176,6 +196,20 @@ class InternalSource final : public MediaStreamTrackPrivate::Observer,
 
         if (m_src)
             g_signal_handlers_disconnect_matched(m_src.get(), G_SIGNAL_MATCH_DATA, 0, 0, nullptr, nullptr, this);
+
+#if USE(GSTREAMER_WEBRTC)
+        if (!m_webrtcSourceClientId)
+            return;
+
+        auto& trackSource = m_track.source();
+        if (trackSource.isIncomingAudioSource()) {
+            auto& source = static_cast<RealtimeIncomingAudioSourceGStreamer&>(trackSource);
+            source.unregisterClient(*m_webrtcSourceClientId);
+        } else if (trackSource.isIncomingVideoSource()) {
+            auto& source = static_cast<RealtimeIncomingVideoSourceGStreamer&>(trackSource);
+            source.unregisterClient(*m_webrtcSourceClientId);
+        }
+#endif
     }
 
     const MediaStreamTrackPrivate& track() const { return m_track; }
@@ -482,6 +516,7 @@ class InternalSource final : public MediaStreamTrackPrivate::Observer,
     Condition m_eosCondition;
     Lock m_eosLock;
     bool m_eosPending WTF_GUARDED_BY_LOCK(m_eosLock) { false };
+    std::optional<int> m_webrtcSourceClientId;
 };
 
 struct _WebKitMediaStreamSrcPrivate {
@@ -900,16 +935,6 @@ void webkitMediaStreamSrcAddTrack(WebKitMediaStreamSrc* self, MediaStreamTrackPr
         counter = self->priv->videoPadCounter.exchangeAdd(1);
     }
 
-#if USE(GSTREAMER_WEBRTC)
-    if (track->source().isIncomingAudioSource()) {
-        auto& source = static_cast<RealtimeIncomingAudioSourceGStreamer&>(track->source());
-        source.registerClient();
-    } else if (track->source().isIncomingVideoSource()) {
-        auto& source = static_cast<RealtimeIncomingVideoSourceGStreamer&>(track->source());
-        source.registerClient();
-    }
-#endif
-
     GST_DEBUG_OBJECT(self, "Setup %s source for track %s, only track: %s", sourceType, track->id().utf8().data(), boolForPrinting(onlyTrack));
 
     auto padName = makeString(sourceType, "_src", counter);
Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingSourceGStreamer.cpp
@@ -41,10 +41,34 @@ RealtimeIncomingSourceGStreamer::RealtimeIncomingSourceGStreamer(const CaptureDe
     m_bin = gst_bin_new(nullptr);
     m_valve = gst_element_factory_make("valve", nullptr);
     m_tee = gst_element_factory_make("tee", nullptr);
-    g_object_set(m_tee.get(), "allow-not-linked", true, nullptr);
+    g_object_set(m_tee.get(), "allow-not-linked", TRUE, nullptr);
 
+    auto* parsebin = makeGStreamerElement("parsebin", nullptr);
+
+    g_signal_connect(parsebin, "element-added", G_CALLBACK(+[](GstBin*, GstElement* element, gpointer) {
+        auto elementClass = makeString(gst_element_get_metadata(element, GST_ELEMENT_METADATA_KLASS));
+        auto classifiers = elementClass.split('/');
+        if (!classifiers.contains("Depayloader"_s))
+            return;
+
+        if (gstObjectHasProperty(element, "request-keyframe"))
+            g_object_set(element, "request-keyframe", TRUE, nullptr);
+        if (gstObjectHasProperty(element, "wait-for-keyframe"))
+            g_object_set(element, "wait-for-keyframe", TRUE, nullptr);
+        g_object_set(element, "auto-header-extension", FALSE, nullptr);
+    }), nullptr);
+
+    g_signal_connect_swapped(parsebin, "pad-added", G_CALLBACK(+[](RealtimeIncomingSourceGStreamer* source, GstPad* pad) {
+        auto sinkPad = adoptGRef(gst_element_get_static_pad(source->m_tee.get(), "sink"));
+        gst_pad_link(pad, sinkPad.get());
+
+        gst_bin_sync_children_states(GST_BIN_CAST(source->m_bin.get()));
+        GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN_CAST(source->m_bin.get()), GST_DEBUG_GRAPH_SHOW_ALL, GST_OBJECT_NAME(source->m_bin.get()));
+    }), this);
+
-    gst_bin_add_many(GST_BIN_CAST(m_bin.get()), m_valve.get(), m_tee.get(), nullptr);
-    gst_element_link(m_valve.get(), m_tee.get());
+    gst_bin_add_many(GST_BIN_CAST(m_bin.get()), m_valve.get(), parsebin, m_tee.get(), nullptr);
+    gst_element_link(m_valve.get(), parsebin);
 
     auto sinkPad = adoptGRef(gst_element_get_static_pad(m_valve.get(), "sink"));
     gst_element_add_pad(m_bin.get(), gst_ghost_pad_new("sink", sinkPad.get()));
@@ -69,52 +93,125 @@ const RealtimeMediaSourceCapabilities& RealtimeIncomingSourceGStreamer::capabili
     return RealtimeMediaSourceCapabilities::emptyCapabilities();
 }
 
-void RealtimeIncomingSourceGStreamer::registerClient()
+int RealtimeIncomingSourceGStreamer::registerClient(GRefPtr<GstElement>&& appsrc)
 {
-    GST_DEBUG_OBJECT(m_bin.get(), "Registering new client");
-    auto* queue = gst_element_factory_make("queue", nullptr);
-    auto* sink = makeGStreamerElement("appsink", nullptr);
-    g_object_set(sink, "enable-last-sample", FALSE, "emit-signals", TRUE, "sync", FALSE, nullptr);
-    g_signal_connect_swapped(sink, "new-sample", G_CALLBACK(+[](RealtimeIncomingSourceGStreamer* self, GstElement* sink) -> GstFlowReturn {
-        auto sample = adoptGRef(gst_app_sink_pull_sample(GST_APP_SINK(sink)));
-        self->dispatchSample(WTFMove(sample));
-        return GST_FLOW_OK;
-    }), this);
-
-    g_signal_connect_swapped(sink, "new-preroll", G_CALLBACK(+[](RealtimeIncomingSourceGStreamer* self, GstElement* sink) -> GstFlowReturn {
-        auto sample = adoptGRef(gst_app_sink_pull_preroll(GST_APP_SINK(sink)));
-        self->dispatchSample(WTFMove(sample));
-        return GST_FLOW_OK;
-    }), this);
-
-    g_signal_connect_swapped(sink, "new-serialized-event", G_CALLBACK(+[](RealtimeIncomingSourceGStreamer* self, GstElement* sink) -> gboolean {
-        auto event = adoptGRef(GST_EVENT_CAST(gst_app_sink_pull_object(GST_APP_SINK(sink))));
-        switch (GST_EVENT_TYPE(event.get())) {
-        case GST_EVENT_STREAM_START:
-        case GST_EVENT_CAPS:
-        case GST_EVENT_SEGMENT:
-            return false;
-        default:
-            break;
-        }
-        self->handleDownstreamEvent(WTFMove(event));
-        return true;
-    }), this);
+    static Atomic<int> counter = 1;
+    auto clientId = counter.exchangeAdd(1);
+
+    auto* queue = gst_element_factory_make("queue", makeString("queue-"_s, clientId).ascii().data());
+    auto* sink = makeGStreamerElement("appsink", makeString("sink-"_s, clientId).ascii().data());
+    g_object_set(sink, "enable-last-sample", FALSE, nullptr);
+
+    if (!m_clientQuark)
+        m_clientQuark = g_quark_from_static_string("client-id");
+    g_object_set_qdata(G_OBJECT(sink), m_clientQuark, GINT_TO_POINTER(clientId));
+    GST_DEBUG_OBJECT(m_bin.get(), "Client %" GST_PTR_FORMAT " associated to new sink %" GST_PTR_FORMAT, appsrc.get(), sink);
+    m_clients.add(clientId, WTFMove(appsrc));
+
+    static GstAppSinkCallbacks callbacks = {
+        nullptr, // eos
+        [](GstAppSink* sink, gpointer userData) -> GstFlowReturn {
+            auto* self = reinterpret_cast<RealtimeIncomingSourceGStreamer*>(userData);
+            auto sample = adoptGRef(gst_app_sink_pull_preroll(sink));
+            self->dispatchSample(WTFMove(sample));
+            return GST_FLOW_OK;
+        },
+        [](GstAppSink* sink, gpointer userData) -> GstFlowReturn {
+            auto* self = reinterpret_cast<RealtimeIncomingSourceGStreamer*>(userData);
+            auto sample = adoptGRef(gst_app_sink_pull_sample(sink));
+            self->dispatchSample(WTFMove(sample));
+            return GST_FLOW_OK;
+        },
+        [](GstAppSink* sink, gpointer userData) -> gboolean {
+            auto* self = reinterpret_cast<RealtimeIncomingSourceGStreamer*>(userData);
+            auto event = adoptGRef(GST_EVENT_CAST(gst_app_sink_pull_object(sink)));
+            switch (GST_EVENT_TYPE(event.get())) {
+            case GST_EVENT_STREAM_START:
+            case GST_EVENT_CAPS:
+            case GST_EVENT_SEGMENT:
+                return false;
+            case GST_EVENT_LATENCY: {
+                GstClockTime minLatency, maxLatency;
+                if (gst_base_sink_query_latency(GST_BASE_SINK(sink), nullptr, nullptr, &minLatency, &maxLatency)) {
+                    if (int clientId = GPOINTER_TO_INT(g_object_get_qdata(G_OBJECT(sink), self->m_clientQuark))) {
+                        GST_DEBUG_OBJECT(sink, "Setting client latency to min %" GST_TIME_FORMAT " max %" GST_TIME_FORMAT, GST_TIME_ARGS(minLatency), GST_TIME_ARGS(maxLatency));
+                        auto appsrc = self->m_clients.get(clientId);
+                        g_object_set(appsrc, "min-latency", minLatency, "max-latency", maxLatency, nullptr);
+                    }
+                }
+                return false;
+            }
+            default:
+                break;
+            }
+            self->handleDownstreamEvent(WTFMove(event));
+            return false;
+        },
+#if GST_CHECK_VERSION(1, 23, 0)
+        // propose_allocation
+        nullptr,
+#endif
+        { nullptr }
+    };
+    gst_app_sink_set_callbacks(GST_APP_SINK(sink), &callbacks, this, nullptr);
+
+    auto sinkPad = adoptGRef(gst_element_get_static_pad(sink, "sink"));
+    gst_pad_add_probe(sinkPad.get(), GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(+[](GstPad* pad, GstPadProbeInfo* info, RealtimeIncomingSourceGStreamer* self) -> GstPadProbeReturn {
+        auto sink = adoptGRef(gst_pad_get_parent_element(pad));
+        int clientId = GPOINTER_TO_INT(g_object_get_qdata(G_OBJECT(sink.get()), self->m_clientQuark));
+        if (!clientId)
+            return GST_PAD_PROBE_OK;
+
+        auto appsrc = self->m_clients.get(clientId);
+        auto srcSrcPad = adoptGRef(gst_element_get_static_pad(appsrc, "src"));
+        if (gst_pad_peer_query(srcSrcPad.get(), GST_QUERY_CAST(info->data)))
+            return GST_PAD_PROBE_HANDLED;
+
+        return GST_PAD_PROBE_OK;
+    }), this, nullptr);
 
     gst_bin_add_many(GST_BIN_CAST(m_bin.get()), queue, sink, nullptr);
     gst_element_link_many(m_tee.get(), queue, sink, nullptr);
     gst_element_sync_state_with_parent(queue);
     gst_element_sync_state_with_parent(sink);
+
+    GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN_CAST(m_bin.get()), GST_DEBUG_GRAPH_SHOW_ALL, GST_OBJECT_NAME(m_bin.get()));
+    return clientId;
 }
 
-void RealtimeIncomingSourceGStreamer::handleUpstreamEvent(GRefPtr<GstEvent>&& event)
+void RealtimeIncomingSourceGStreamer::unregisterClient(int clientId)
 {
-    GST_DEBUG_OBJECT(m_bin.get(), "Handling %" GST_PTR_FORMAT, event.get());
-    auto pad = adoptGRef(gst_element_get_static_pad(m_tee.get(), "sink"));
-    gst_pad_push_event(pad.get(), event.leakRef());
+    GST_DEBUG_OBJECT(m_bin.get(), "Unregistering client %d", clientId);
+    auto sink = adoptGRef(gst_bin_get_by_name(GST_BIN_CAST(m_bin.get()), makeString("sink-", clientId).ascii().data()));
+    auto queue = adoptGRef(gst_bin_get_by_name(GST_BIN_CAST(m_bin.get()), makeString("queue-", clientId).ascii().data()));
+    auto queueSinkPad = adoptGRef(gst_element_get_static_pad(queue.get(), "sink"));
+    auto teeSrcPad = adoptGRef(gst_pad_get_peer(queueSinkPad.get()));
+
+    gst_element_set_locked_state(m_bin.get(), TRUE);
+    gst_element_set_state(queue.get(), GST_STATE_NULL);
+    gst_element_set_state(sink.get(), GST_STATE_NULL);
+    gst_element_unlink_many(m_tee.get(), queue.get(), sink.get(), nullptr);
+    gst_bin_remove_many(GST_BIN_CAST(m_bin.get()), queue.get(), sink.get(), nullptr);
+    gst_element_release_request_pad(m_tee.get(), teeSrcPad.get());
+    gst_element_set_locked_state(m_bin.get(), FALSE);
+    m_clients.remove(clientId);
+}
+
+void RealtimeIncomingSourceGStreamer::handleUpstreamEvent(GRefPtr<GstEvent>&& event, int clientId)
+{
+    GST_DEBUG_OBJECT(m_bin.get(), "Handling %" GST_PTR_FORMAT, event.get());
+    auto sink = adoptGRef(gst_bin_get_by_name(GST_BIN_CAST(m_bin.get()), makeString("sink-", clientId).ascii().data()));
+    auto pad = adoptGRef(gst_element_get_static_pad(sink.get(), "sink"));
+    gst_pad_push_event(pad.get(), event.leakRef());
+}
+
+bool RealtimeIncomingSourceGStreamer::handleUpstreamQuery(GstQuery* query, int clientId)
+{
+    auto sink = adoptGRef(gst_bin_get_by_name(GST_BIN_CAST(m_bin.get()), makeString("sink-", clientId).ascii().data()));
+    auto pad = adoptGRef(gst_element_get_static_pad(sink.get(), "sink"));
+    return gst_pad_peer_query(pad.get(), query);
 }
 
 void RealtimeIncomingSourceGStreamer::handleDownstreamEvent(GRefPtr<GstEvent>&& event)
 {
     GST_DEBUG_OBJECT(bin(), "Handling %" GST_PTR_FORMAT, event.get());
Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingSourceGStreamer.h
@@ -29,9 +29,12 @@ namespace WebCore {
 class RealtimeIncomingSourceGStreamer : public RealtimeMediaSource {
 public:
     GstElement* bin() { return m_bin.get(); }
-    void registerClient();
 
-    void handleUpstreamEvent(GRefPtr<GstEvent>&&);
+    int registerClient(GRefPtr<GstElement>&&);
+    void unregisterClient(int);
+
+    void handleUpstreamEvent(GRefPtr<GstEvent>&&, int clientId);
+    bool handleUpstreamQuery(GstQuery*, int clientId);
 
 protected:
     RealtimeIncomingSourceGStreamer(const CaptureDevice&);
@@ -48,6 +51,8 @@ class RealtimeIncomingSourceGStreamer : public RealtimeMediaSource {
     GRefPtr<GstElement> m_bin;
     GRefPtr<GstElement> m_valve;
     GRefPtr<GstElement> m_tee;
+    GQuark m_clientQuark { 0 };
+    HashMap<int, GRefPtr<GstElement>> m_clients;
 };
 
 } // namespace WebCore
