Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GStreamer][WebRTC] Additional data-channel improvements and initial layout tests coverage #2060

Merged
merged 1 commit into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 18 additions & 6 deletions LayoutTests/platform/glib/TestExpectations
Original file line number Diff line number Diff line change
Expand Up @@ -883,16 +883,28 @@ webkit.org/b/234084 media/track/video-track-configuration.html [ Failure ]

webkit.org/b/237901 media/media-source/media-source-interruption-with-resume-allowing-play.html [ Slow ]

# DataChannel GstWebRTC implementation incomplete
webkit.org/b/235885 webrtc/datachannel [ Skip ]
webkit.org/b/235885 http/wpt/webrtc/transfer-datachannel-service-worker.https.html [ Skip ]
# DataChannel GstWebRTC implementation incomplete, missing stats support.
webkit.org/b/235885 webrtc/datachannel/datachannel-stats.html [ Skip ]
webkit.org/b/235885 webrtc/datachannel/getStats-no-prflx-remote-candidate.html [ Skip ]
webkit.org/b/235885 fast/mediastream/RTCPeerConnection-statsSelector.html [ Skip ]
webkit.org/b/235885 fast/mediastream/RTCPeerConnection-datachannel.html [ Skip ]

# Expected to pass since bug #242025 was fixed.
webrtc/datachannel/creation.html [ Pass ]
# GStreamer's DTLS agent currently generates RSA certificates only. DTLS 1.2 is not supported yet (AFAIK).
webrtc/datachannel/dtls10.html [ Failure ]

# Too slow with filtering implemented in WebKit. Should be done directly by GstWebRTC.
webrtc/datachannel/filter-ice-candidate.html [ Skip ]
webrtc/datachannel/mdns-ice-candidates.html [ Skip ]

# Expected to pass/fail in GStreamer 1.22, and a bit slow for us, creating 256 end-points in a row.
webkit.org/b/235885 webrtc/datachannel/multiple-connections.html [ Pass Failure Timeout ]

# Expectations for GStreamer 1.20. Remove these when updating to GStreamer 1.22.
webrtc/datachannel/basic.html [ Pass Failure Timeout ]
webrtc/datachannel/multi-channel.html [ Failure ]
webrtc/datachannel/binary.html [ Failure ]

# The GstWebRTC backend doesn't support transforms yet.
webkit.org/b/235885 http/wpt/webrtc/no-webrtc-transform.html [ Skip ]
webkit.org/b/235885 http/wpt/webrtc/audiovideo-script-transform.html [ Skip ]
webkit.org/b/235885 http/wpt/webrtc/video-script-transform-keyframe-only.html [ Skip ]
webkit.org/b/235885 http/wpt/webrtc/video-script-transform.html [ Skip ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
closeConnections();
}

var longString = "abcdefgh";
var longString = "abcd";
for (var cptr = 0; cptr < 14; ++cptr)
longString += longString;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@

#include <wtf/MainThread.h>

GST_DEBUG_CATEGORY_EXTERN(webkit_webrtc_endpoint_debug);
#define GST_CAT_DEFAULT webkit_webrtc_endpoint_debug
GST_DEBUG_CATEGORY(webkit_webrtc_data_channel_debug);
#define GST_CAT_DEFAULT webkit_webrtc_data_channel_debug

namespace WebCore {

Expand Down Expand Up @@ -71,33 +71,20 @@ GUniquePtr<GstStructure> GStreamerDataChannelHandler::fromRTCDataChannelInit(con
return init;
}

Ref<RTCDataChannelEvent> GStreamerDataChannelHandler::createDataChannelEvent(Document& document, GRefPtr<GstWebRTCDataChannel>&& dataChannel)
{
GUniqueOutPtr<char> label;
GUniqueOutPtr<char> protocol;
gboolean ordered, negotiated;
gint maxPacketLifeTime, maxRetransmits, id;
g_object_get(dataChannel.get(), "ordered", &ordered, "label", &label.outPtr(),
"max-packet-lifetime", &maxPacketLifeTime, "max-retransmits", &maxRetransmits,
"protocol", &protocol.outPtr(), "negotiated", &negotiated, "id", &id, nullptr);

RTCDataChannelInit init;
init.ordered = ordered;
init.maxPacketLifeTime = maxPacketLifeTime;
init.maxRetransmits = maxRetransmits;
init.protocol = String::fromLatin1(protocol.get());
init.negotiated = negotiated;
init.id = id;

auto handler = WTF::makeUnique<GStreamerDataChannelHandler>(WTFMove(dataChannel));
auto channel = RTCDataChannel::create(document, WTFMove(handler), String::fromUTF8(label.get()), WTFMove(init));
return RTCDataChannelEvent::create(eventNames().datachannelEvent, Event::CanBubble::No, Event::IsCancelable::No, WTFMove(channel));
}

GStreamerDataChannelHandler::GStreamerDataChannelHandler(GRefPtr<GstWebRTCDataChannel>&& channel)
: m_channel(WTFMove(channel))
{
ASSERT(m_channel);
static std::once_flag debugRegisteredFlag;
std::call_once(debugRegisteredFlag, [] {
GST_DEBUG_CATEGORY_INIT(webkit_webrtc_data_channel_debug, "webkitwebrtcdatachannel", 0, "WebKit WebRTC data-channel");
});
GST_DEBUG("New GStreamerDataChannelHandler for channel %p", m_channel.get());

{
Locker locker { m_clientLock };
checkState();
}

g_signal_connect_swapped(m_channel.get(), "notify::ready-state", G_CALLBACK(+[](GStreamerDataChannelHandler* handler) {
handler->readyStateChanged();
Expand All @@ -114,33 +101,67 @@ GStreamerDataChannelHandler::GStreamerDataChannelHandler(GRefPtr<GstWebRTCDataCh
g_signal_connect_swapped(m_channel.get(), "on-error", G_CALLBACK(+[](GStreamerDataChannelHandler* handler, GError* error) {
handler->onError(error);
}), this);
g_signal_connect_swapped(m_channel.get(), "on-close", G_CALLBACK(+[](GStreamerDataChannelHandler* handler) {
handler->onClose();
}), this);
}

GStreamerDataChannelHandler::~GStreamerDataChannelHandler()
{
g_signal_handlers_disconnect_by_data(m_channel.get(), this);
GST_DEBUG("Deleting GStreamerDataChannelHandler for channel %p", m_channel.get());
if (m_channel)
g_signal_handlers_disconnect_by_data(m_channel.get(), this);
}

RTCDataChannelInit GStreamerDataChannelHandler::dataChannelInit() const
{
GUniqueOutPtr<char> protocol;
gboolean ordered, negotiated;
gint maxPacketLifeTime, maxRetransmits, id;
g_object_get(m_channel.get(), "ordered", &ordered, "max-packet-lifetime", &maxPacketLifeTime, "max-retransmits", &maxRetransmits,
"protocol", &protocol.outPtr(), "negotiated", &negotiated, "id", &id, nullptr);

RTCDataChannelInit init;
init.ordered = ordered;
init.maxPacketLifeTime = maxPacketLifeTime;
init.maxRetransmits = maxRetransmits;
init.protocol = String::fromLatin1(protocol.get());
init.negotiated = negotiated;
init.id = id;
return init;
}

String GStreamerDataChannelHandler::label() const
{
GUniqueOutPtr<char> label;
g_object_get(m_channel.get(), "label", &label.outPtr(), nullptr);
return String::fromUTF8(label.get());
}

void GStreamerDataChannelHandler::setClient(RTCDataChannelHandlerClient& client, ScriptExecutionContextIdentifier contextIdentifier)
{
Locker locker { m_clientLock };
ASSERT(!m_client);
GST_DEBUG("Setting client on channel %p", m_channel.get());
m_client = client;
m_contextIdentifier = contextIdentifier;

checkState();

for (auto& message : m_pendingMessages) {
switchOn(message, [&](Ref<FragmentedSharedBuffer>& data) {
GST_DEBUG("Notifying queued raw data (size: %zu)", data->size());
client.didReceiveRawData(data->makeContiguous()->data(), data->size());
}, [&](String& text) {
GST_DEBUG("Notifying queued string %s", text.ascii().data());
GST_DEBUG("Notifying queued string of size %d", text.sizeInBytes());
GST_TRACE("Notifying queued string %s", text.ascii().data());
client.didReceiveStringData(text);
}, [&](StateChange stateChange) {
if (stateChange.error) {
if (auto rtcError = toRTCError(*stateChange.error))
client.didDetectError(rtcError.releaseNonNull());
}
GST_DEBUG("Dispatching state change to %d", static_cast<int>(stateChange.state));
GST_DEBUG("Dispatching state change to %d on channel %p", static_cast<int>(stateChange.state), m_channel.get());
client.didChangeReadyState(stateChange.state);
});
}
Expand All @@ -149,21 +170,30 @@ void GStreamerDataChannelHandler::setClient(RTCDataChannelHandlerClient& client,

bool GStreamerDataChannelHandler::sendStringData(const CString& text)
{
GST_DEBUG("Sending string %s", text.data());
GST_DEBUG("Sending string of length: %zu", text.length());
GST_TRACE("Sending string %s", text.data());
g_signal_emit_by_name(m_channel.get(), "send-string", text.data());
return true;
}

bool GStreamerDataChannelHandler::sendRawData(const uint8_t* data, size_t length)
{
GST_DEBUG("Sending raw data of length: %zu", length);
auto bytes = adoptGRef(g_bytes_new(data, length));
g_signal_emit_by_name(m_channel.get(), "send-data", bytes.get());
return true;
}

void GStreamerDataChannelHandler::close()
{
g_signal_emit_by_name(m_channel.get(), "close");
GST_DEBUG("Closing channel %p", m_channel.get());
m_closing = true;

GstWebRTCDataChannelState channelState;
g_object_get(m_channel.get(), "ready-state", &channelState, nullptr);

if (channelState == GST_WEBRTC_DATA_CHANNEL_STATE_OPEN)
g_signal_emit_by_name(m_channel.get(), "close");
}

std::optional<unsigned short> GStreamerDataChannelHandler::id() const
Expand All @@ -177,6 +207,9 @@ void GStreamerDataChannelHandler::checkState()
{
ASSERT(m_clientLock.isHeld());

if (!m_channel)
return;

GstWebRTCDataChannelState channelState;
g_object_get(m_channel.get(), "ready-state", &channelState, nullptr);

Expand All @@ -201,18 +234,25 @@ void GStreamerDataChannelHandler::checkState()
}

if (!m_client) {
GST_DEBUG("No client yet, queueing state");
GST_DEBUG("No client yet on channel %p, queueing state", m_channel.get());
m_pendingMessages.append(StateChange { state, { } });
return;
}

if (channelState == GST_WEBRTC_DATA_CHANNEL_STATE_OPEN && m_closing) {
GST_DEBUG("Ignoring open state notification on channel %p because it was pending to be closed", m_channel.get());
return;
}

if (!*m_client)
return;

GST_DEBUG("Dispatching state change to %d", static_cast<int>(state));
GST_DEBUG("Dispatching state change to %d on channel %p", static_cast<int>(state), m_channel.get());
postTask([client = m_client, state] {
if (!*client)
if (!*client) {
GST_DEBUG("No client");
return;
}
client.value()->didChangeReadyState(state);
});
}
Expand All @@ -227,10 +267,11 @@ void GStreamerDataChannelHandler::bufferedAmountChanged()
{
Locker locker { m_clientLock };

uint64_t bufferedAmount;
g_object_get(m_channel.get(), "buffered-amount", &bufferedAmount, nullptr);
uint64_t currentBufferedAmount;
g_object_get(m_channel.get(), "buffered-amount", &currentBufferedAmount, nullptr);

GST_DEBUG("New buffered amount on channel %p: %" G_GUINT64_FORMAT " old: %" G_GUINT64_FORMAT, m_channel.get(), bufferedAmount, m_cachedBufferedAmount ? *m_cachedBufferedAmount : -1);
auto bufferedAmount = static_cast<size_t>(currentBufferedAmount);
GST_DEBUG("New buffered amount on channel %p: %" G_GSIZE_FORMAT " old: %" G_GSIZE_FORMAT, m_channel.get(), bufferedAmount, m_cachedBufferedAmount ? *m_cachedBufferedAmount : -1);

if (m_cachedBufferedAmount && (*m_cachedBufferedAmount >= bufferedAmount)) {
GST_DEBUG("Buffered amount getting low on channel %p", m_channel.get());
Expand All @@ -239,13 +280,13 @@ void GStreamerDataChannelHandler::bufferedAmountChanged()
return;
}

auto bufferedAmount = m_client.value()->bufferedAmount();
uint64_t amount = std::min(*m_cachedBufferedAmount - bufferedAmount, bufferedAmount);
postTask([client = m_client, amount] {
postTask([client = m_client, amount = *m_cachedBufferedAmount - bufferedAmount] {
if (!client)
return;

client.value()->bufferedAmountIsDecreasing(amount);
size_t clientAmount = client.value()->bufferedAmount();
size_t clampedAmount = amount > clientAmount ? clientAmount : amount;
client.value()->bufferedAmountIsDecreasing(clampedAmount);
});
}

Expand All @@ -254,10 +295,12 @@ void GStreamerDataChannelHandler::bufferedAmountChanged()

void GStreamerDataChannelHandler::onMessageData(GBytes* bytes)
{
auto size = g_bytes_get_size(bytes);
GST_DEBUG("Incoming data of size: %zu", size);
Locker locker { m_clientLock };

if (!m_client) {
m_pendingMessages.append(FragmentedSharedBuffer::create(bytes));
m_pendingMessages.append(SharedBuffer::create(bytes));
return;
}

Expand All @@ -277,7 +320,7 @@ void GStreamerDataChannelHandler::onMessageString(const char* message)
{
Locker locker { m_clientLock };

GST_DEBUG("Incoming string: %s", message);
GST_TRACE("Incoming string: %s", message);
if (!m_client) {
GST_DEBUG("No client yet, keeping as buffered message");
m_pendingMessages.append(String::fromUTF8(message));
Expand All @@ -287,13 +330,12 @@ void GStreamerDataChannelHandler::onMessageString(const char* message)
if (!*m_client)
return;

GST_DEBUG("Dispatching payload");
GST_DEBUG("Dispatching string of size %zu", strlen(message));
postTask([client = m_client, string = String::fromUTF8(message)] {
if (!*client)
return;

client.value()->didReceiveStringData(string);
GST_DEBUG("Done");
});
}

Expand All @@ -316,6 +358,13 @@ void GStreamerDataChannelHandler::onError(GError* error)
});
}

void GStreamerDataChannelHandler::onClose()
{
Locker locker { m_clientLock };
GST_DEBUG("Channel %p closed!", m_channel.get());
checkState();
}

void GStreamerDataChannelHandler::postTask(Function<void()>&& function)
{
ASSERT(m_clientLock.isHeld());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ class GStreamerDataChannelHandler final : public RTCDataChannelHandler {
explicit GStreamerDataChannelHandler(GRefPtr<GstWebRTCDataChannel>&&);
~GStreamerDataChannelHandler();

RTCDataChannelInit dataChannelInit() const;
String label() const;

static GUniquePtr<GstStructure> fromRTCDataChannelInit(const RTCDataChannelInit&);
static Ref<RTCDataChannelEvent> createDataChannelEvent(Document&, GRefPtr<GstWebRTCDataChannel>&&);

const GstWebRTCDataChannel* channel() const { return m_channel.get(); }

private:
// RTCDataChannelHandler API
Expand All @@ -58,6 +62,8 @@ class GStreamerDataChannelHandler final : public RTCDataChannelHandler {
void onMessageData(GBytes*);
void onMessageString(const char*);
void onError(GError*);
void onClose();

void readyStateChanged();
void bufferedAmountChanged();
void checkState();
Expand All @@ -74,12 +80,10 @@ class GStreamerDataChannelHandler final : public RTCDataChannelHandler {
GRefPtr<GstWebRTCDataChannel> m_channel;
std::optional<WeakPtr<RTCDataChannelHandlerClient>> m_client WTF_GUARDED_BY_LOCK(m_clientLock);
ScriptExecutionContextIdentifier m_contextIdentifier;
std::optional<uint64_t> m_previousBufferedAmount;
PendingMessages m_pendingMessages WTF_GUARDED_BY_LOCK(m_clientLock);

Lock m_openLock;
Condition m_openCondition WTF_GUARDED_BY_LOCK(m_openLock);
std::optional<uint64_t> m_cachedBufferedAmount;
std::optional<size_t> m_cachedBufferedAmount;
bool m_closing { false };
};

} // namespace WebCore
Expand Down