Skip to content

Commit

Permalink
Merge 256149@main - [GStreamer][WebRTC] Events forwarding between end…
Browse files Browse the repository at this point in the history
…-point and its consumers

https://bugs.webkit.org/show_bug.cgi?id=247126

Reviewed by Xabier Rodriguez-Calvar.

We need to relay upstream events coming from depayloaders to webrtcbin and we also need to relay
downstream events from webrtcbin to the incoming media sources. This should help improving RTP
retransmission, for instance.

RealtimeIncomingSourceGStreamer now directly inherits from RealtimeMediaSource in order to avoid
duplication of the downstream event handling in each sub-class.

* Source/WebCore/platform/mediastream/RealtimeMediaSource.h:
* Source/WebCore/platform/mediastream/gstreamer/GStreamerMediaStreamSource.cpp:
* Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingAudioSourceGStreamer.cpp:
(WebCore::RealtimeIncomingAudioSourceGStreamer::RealtimeIncomingAudioSourceGStreamer):
* Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingAudioSourceGStreamer.h:
* Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingSourceGStreamer.cpp:
(WebCore::RealtimeIncomingSourceGStreamer::RealtimeIncomingSourceGStreamer):
(WebCore::RealtimeIncomingSourceGStreamer::registerClient):
(WebCore::RealtimeIncomingSourceGStreamer::handleUpstreamEvent):
(WebCore::RealtimeIncomingSourceGStreamer::handleDownstreamEvent):
* Source/WebCore/platform/mediastream/gstreamer/RealtimeIncomingSourceGStreamer.h:

Canonical link: https://commits.webkit.org/256149@main

(cherry picked from commit e6b4012)
  • Loading branch information
philn authored and carlosgcampos committed Nov 3, 2022
1 parent e1799c4 commit fb4db1a
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 49 deletions.
8 changes: 8 additions & 0 deletions Source/WebCore/platform/mediastream/RealtimeMediaSource.h
Expand Up @@ -51,6 +51,10 @@
#include <wtf/WeakHashSet.h>
#include <wtf/text/WTFString.h>

#if USE(GSTREAMER)
typedef struct _GstEvent GstEvent;
#endif

namespace WTF {
class MediaTime;
}
Expand Down Expand Up @@ -90,6 +94,10 @@ class WEBCORE_EXPORT RealtimeMediaSource
virtual bool preventSourceFromStopping() { return false; }

virtual void hasStartedProducingData() { }

#if USE(GSTREAMER)
virtual void handleDownstreamEvent(GRefPtr<GstEvent>&&) { }
#endif
};
class AudioSampleObserver {
public:
Expand Down
Expand Up @@ -127,6 +127,7 @@ class WebKitMediaStreamObserver : public MediaStreamPrivate::Observer {
static void webkitMediaStreamSrcEnsureStreamCollectionPosted(WebKitMediaStreamSrc*);

class InternalSource final : public MediaStreamTrackPrivate::Observer,
public RealtimeMediaSource::Observer,
public RealtimeMediaSource::AudioSampleObserver,
public RealtimeMediaSource::VideoFrameObserver {
WTF_MAKE_FAST_ALLOCATED;
Expand Down Expand Up @@ -162,6 +163,21 @@ class InternalSource final : public MediaStreamTrackPrivate::Observer,
g_signal_connect(m_src.get(), "need-data", G_CALLBACK(+[](GstElement*, unsigned, InternalSource* data) {
data->m_enoughData = false;
}), this);

#if USE(GSTREAMER_WEBRTC)
auto pad = adoptGRef(gst_element_get_static_pad(m_src.get(), "src"));
gst_pad_add_probe(pad.get(), GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, reinterpret_cast<GstPadProbeCallback>(+[](GstPad*, GstPadProbeInfo* info, InternalSource* internalSource) -> GstPadProbeReturn {
auto& trackSource = internalSource->m_track.source();
if (trackSource.isIncomingAudioSource()) {
auto& source = static_cast<RealtimeIncomingAudioSourceGStreamer&>(trackSource);
source.handleUpstreamEvent(GRefPtr<GstEvent>(GST_PAD_PROBE_INFO_EVENT(info)));
} else if (trackSource.isIncomingVideoSource()) {
auto& source = static_cast<RealtimeIncomingVideoSourceGStreamer&>(trackSource);
source.handleUpstreamEvent(GRefPtr<GstEvent>(GST_PAD_PROBE_INFO_EVENT(info)));
}
return GST_PAD_PROBE_OK;
}), this, nullptr);
#endif
}

virtual ~InternalSource()
Expand Down Expand Up @@ -297,6 +313,12 @@ class InternalSource final : public MediaStreamTrackPrivate::Observer,
}
}

void handleDownstreamEvent(GRefPtr<GstEvent>&& event) final
{
auto pad = adoptGRef(gst_element_get_static_pad(m_src.get(), "src"));
gst_pad_push_event(pad.get(), event.leakRef());
}

void videoFrameAvailable(VideoFrame& videoFrame, VideoFrameTimeMetadata) final
{
if (!m_parent || !m_isObserving)
Expand Down
Expand Up @@ -31,8 +31,7 @@ GST_DEBUG_CATEGORY_EXTERN(webkit_webrtc_endpoint_debug);
namespace WebCore {

RealtimeIncomingAudioSourceGStreamer::RealtimeIncomingAudioSourceGStreamer(AtomString&& audioTrackId)
: RealtimeMediaSource(RealtimeMediaSource::Type::Audio, WTFMove(audioTrackId))
, RealtimeIncomingSourceGStreamer()
: RealtimeIncomingSourceGStreamer(RealtimeMediaSource::Type::Audio, WTFMove(audioTrackId))
{
static Atomic<uint64_t> sourceCounter = 0;
gst_element_set_name(bin(), makeString("incoming-audio-source-", sourceCounter.exchangeAdd(1)).ascii().data());
Expand All @@ -45,21 +44,6 @@ RealtimeIncomingAudioSourceGStreamer::~RealtimeIncomingAudioSourceGStreamer()
stop();
}

void RealtimeIncomingAudioSourceGStreamer::startProducingData()
{
openValve();
}

void RealtimeIncomingAudioSourceGStreamer::stopProducingData()
{
closeValve();
}

const RealtimeMediaSourceCapabilities& RealtimeIncomingAudioSourceGStreamer::capabilities()
{
return RealtimeMediaSourceCapabilities::emptyCapabilities();
}

const RealtimeMediaSourceSettings& RealtimeIncomingAudioSourceGStreamer::settings()
{
return m_currentSettings;
Expand Down
Expand Up @@ -22,11 +22,10 @@
#if USE(GSTREAMER_WEBRTC)

#include "RealtimeIncomingSourceGStreamer.h"
#include "RealtimeMediaSource.h"

namespace WebCore {

class RealtimeIncomingAudioSourceGStreamer : public RealtimeMediaSource, public RealtimeIncomingSourceGStreamer {
class RealtimeIncomingAudioSourceGStreamer : public RealtimeIncomingSourceGStreamer {
public:
static Ref<RealtimeIncomingAudioSourceGStreamer> create(AtomString&& audioTrackId) { return adoptRef(*new RealtimeIncomingAudioSourceGStreamer(WTFMove(audioTrackId))); }

Expand All @@ -36,9 +35,6 @@ class RealtimeIncomingAudioSourceGStreamer : public RealtimeMediaSource, public

private:
// RealtimeMediaSource API
void startProducingData() final;
void stopProducingData() final;
const RealtimeMediaSourceCapabilities& capabilities() final;
const RealtimeMediaSourceSettings& settings() final;
bool isIncomingAudioSource() const final { return true; }

Expand Down
Expand Up @@ -31,7 +31,8 @@ GST_DEBUG_CATEGORY_EXTERN(webkit_webrtc_endpoint_debug);

namespace WebCore {

RealtimeIncomingSourceGStreamer::RealtimeIncomingSourceGStreamer()
RealtimeIncomingSourceGStreamer::RealtimeIncomingSourceGStreamer(Type type, AtomString&& name)
: RealtimeMediaSource(type, WTFMove(name))
{
m_bin = gst_bin_new(nullptr);
m_valve = gst_element_factory_make("valve", nullptr);
Expand All @@ -47,6 +48,23 @@ RealtimeIncomingSourceGStreamer::RealtimeIncomingSourceGStreamer()
gst_element_add_pad(m_bin.get(), gst_ghost_pad_new("sink", sinkPad.get()));
}

void RealtimeIncomingSourceGStreamer::startProducingData()
{
GST_DEBUG_OBJECT(bin(), "Starting data flow");
openValve();
}

void RealtimeIncomingSourceGStreamer::stopProducingData()
{
GST_DEBUG_OBJECT(bin(), "Stopping data flow");
closeValve();
}

const RealtimeMediaSourceCapabilities& RealtimeIncomingSourceGStreamer::capabilities()
{
return RealtimeMediaSourceCapabilities::emptyCapabilities();
}

void RealtimeIncomingSourceGStreamer::closeValve() const
{
GST_DEBUG_OBJECT(m_bin.get(), "Closing valve");
Expand Down Expand Up @@ -79,6 +97,19 @@ void RealtimeIncomingSourceGStreamer::registerClient()
return GST_FLOW_OK;
}), this);

g_signal_connect_swapped(sink, "new-serialized-event", G_CALLBACK(+[](RealtimeIncomingSourceGStreamer* self, GstElement* sink) -> gboolean {
auto event = adoptGRef(GST_EVENT_CAST(gst_app_sink_pull_object(GST_APP_SINK(sink))));
switch (GST_EVENT_TYPE(event.get())) {
case GST_EVENT_STREAM_START:
case GST_EVENT_CAPS:
return false;
default:
break;
}
self->handleDownstreamEvent(WTFMove(event));
return true;
}), this);

gst_bin_add_many(GST_BIN_CAST(m_bin.get()), queue, sink, nullptr);
gst_element_link_many(m_tee.get(), queue, sink, nullptr);
gst_element_sync_state_with_parent(queue);
Expand All @@ -87,6 +118,21 @@ void RealtimeIncomingSourceGStreamer::registerClient()
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN_CAST(m_bin.get()), GST_DEBUG_GRAPH_SHOW_ALL, GST_OBJECT_NAME(m_bin.get()));
}

void RealtimeIncomingSourceGStreamer::handleUpstreamEvent(GRefPtr<GstEvent>&& event)
{
GST_DEBUG_OBJECT(m_bin.get(), "Handling %" GST_PTR_FORMAT, event.get());
auto pad = adoptGRef(gst_element_get_static_pad(m_tee.get(), "sink"));
gst_pad_push_event(pad.get(), event.leakRef());
}

void RealtimeIncomingSourceGStreamer::handleDownstreamEvent(GRefPtr<GstEvent>&& event)
{
GST_DEBUG_OBJECT(bin(), "Handling %" GST_PTR_FORMAT, event.get());
forEachObserver([event = WTFMove(event)](Observer& observer) {
observer.handleDownstreamEvent(GRefPtr<GstEvent>(event.get()));
});
}

} // namespace WebCore

#endif // USE(GSTREAMER_WEBRTC)
Expand Up @@ -22,24 +22,33 @@
#if USE(GSTREAMER_WEBRTC)

#include "GRefPtrGStreamer.h"
#include "RealtimeMediaSource.h"

namespace WebCore {

class RealtimeIncomingSourceGStreamer {
class RealtimeIncomingSourceGStreamer : public RealtimeMediaSource {
public:
GstElement* bin() { return m_bin.get(); }
void registerClient();

void handleUpstreamEvent(GRefPtr<GstEvent>&&);

protected:
RealtimeIncomingSourceGStreamer();
RealtimeIncomingSourceGStreamer(Type, AtomString&& name);

void closeValve() const;
void openValve() const;

GRefPtr<GstElement> m_valve;

private:
// RealtimeMediaSource API
void startProducingData() final;
void stopProducingData() final;
const RealtimeMediaSourceCapabilities& capabilities() final;

virtual void dispatchSample(GRefPtr<GstSample>&&) { }
void handleDownstreamEvent(GRefPtr<GstEvent>&&);

GRefPtr<GstElement> m_bin;
GRefPtr<GstElement> m_tee;
Expand Down
Expand Up @@ -33,8 +33,7 @@ GST_DEBUG_CATEGORY_EXTERN(webkit_webrtc_endpoint_debug);
namespace WebCore {

RealtimeIncomingVideoSourceGStreamer::RealtimeIncomingVideoSourceGStreamer(AtomString&& videoTrackId)
: RealtimeMediaSource(RealtimeMediaSource::Type::Video, WTFMove(videoTrackId))
, RealtimeIncomingSourceGStreamer()
: RealtimeIncomingSourceGStreamer(RealtimeMediaSource::Type::Video, WTFMove(videoTrackId))
{
static Atomic<uint64_t> sourceCounter = 0;
gst_element_set_name(bin(), makeString("incoming-video-source-", sourceCounter.exchangeAdd(1)).ascii().data());
Expand Down Expand Up @@ -65,23 +64,6 @@ RealtimeIncomingVideoSourceGStreamer::RealtimeIncomingVideoSourceGStreamer(AtomS
start();
}

void RealtimeIncomingVideoSourceGStreamer::startProducingData()
{
GST_DEBUG_OBJECT(bin(), "Starting data flow");
openValve();
}

void RealtimeIncomingVideoSourceGStreamer::stopProducingData()
{
GST_DEBUG_OBJECT(bin(), "Stopping data flow");
closeValve();
}

const RealtimeMediaSourceCapabilities& RealtimeIncomingVideoSourceGStreamer::capabilities()
{
return RealtimeMediaSourceCapabilities::emptyCapabilities();
}

const RealtimeMediaSourceSettings& RealtimeIncomingVideoSourceGStreamer::settings()
{
if (m_currentSettings)
Expand Down
Expand Up @@ -22,11 +22,10 @@
#if USE(GSTREAMER_WEBRTC)

#include "RealtimeIncomingSourceGStreamer.h"
#include "RealtimeMediaSource.h"

namespace WebCore {

class RealtimeIncomingVideoSourceGStreamer : public RealtimeMediaSource, public RealtimeIncomingSourceGStreamer {
class RealtimeIncomingVideoSourceGStreamer : public RealtimeIncomingSourceGStreamer {
public:
static Ref<RealtimeIncomingVideoSourceGStreamer> create(AtomString&& videoTrackId) { return adoptRef(*new RealtimeIncomingVideoSourceGStreamer(WTFMove(videoTrackId))); }
~RealtimeIncomingVideoSourceGStreamer() = default;
Expand All @@ -36,10 +35,7 @@ class RealtimeIncomingVideoSourceGStreamer : public RealtimeMediaSource, public

private:
// RealtimeMediaSource API
void startProducingData() final;
void stopProducingData() final;
void settingsDidChange(OptionSet<RealtimeMediaSourceSettings::Flag>) final;
const RealtimeMediaSourceCapabilities& capabilities() final;
const RealtimeMediaSourceSettings& settings() final;
bool isIncomingVideoSource() const final { return true; }

Expand Down

0 comments on commit fb4db1a

Please sign in to comment.