Use CheckedLock more in IPC::Connection
https://bugs.webkit.org/show_bug.cgi?id=226021

Reviewed by Darin Adler.

Use CheckedLock more in IPC::Connection. This is heavily multithreaded code and
it would benefit significantly from Clang Thread Safety Analysis.

* Platform/IPC/Connection.cpp:
(IPC::Connection::SyncMessageState::processIncomingMessage):
(IPC::Connection::enqueueMatchingMessagesToMessageReceiveQueue):
(IPC::Connection::addMessageReceiveQueue):
(IPC::Connection::addWorkQueueMessageReceiver):
(IPC::Connection::addThreadMessageReceiver):
(IPC::Connection::removeMessageReceiveQueue):
(IPC::Connection::sendMessage):
(IPC::Connection::waitForMessage):
(IPC::Connection::pushPendingSyncRequestID):
(IPC::Connection::popPendingSyncRequestID):
(IPC::Connection::waitForSyncReply):
(IPC::Connection::processIncomingSyncReply):
(IPC::Connection::processIncomingMessage):
(IPC::Connection::installIncomingSyncMessageCallback):
(IPC::Connection::uninstallIncomingSyncMessageCallback):
(IPC::Connection::hasIncomingSyncMessage):
(IPC::Connection::connectionDidClose):
(IPC::Connection::sendOutgoingMessages):
(IPC::Connection::enqueueIncomingMessage):
(IPC::Connection::dispatchMessage):
(IPC::Connection::dispatchOneIncomingMessage):
(IPC::Connection::dispatchIncomingMessages):
* Platform/IPC/Connection.h:
(IPC::Connection::WTF_GUARDED_BY_LOCK):
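
As a rough illustration of the Clang Thread Safety Analysis mentioned above, here is a minimal, hypothetical sketch of what WTF_GUARDED_BY_LOCK buys once a member is protected by a CheckedLock. The class, includes, and member names below are illustrative assumptions, not WebKit code.

    // Sketch only, not part of this patch; assumes <wtf/CheckedLock.h>, <wtf/Locker.h>,
    // and WTF's thread-safety-analysis macros are available.
    #include <wtf/CheckedLock.h>
    #include <wtf/Deque.h>
    #include <wtf/Locker.h>

    class ExampleQueue {
    public:
        void append(int message)
        {
            Locker locker { m_lock };   // RAII: the analysis treats m_lock as held below.
            m_messages.append(message); // OK: guarded member accessed under its lock.
        }

        bool isEmpty() const
        {
            Locker locker { m_lock };
            return m_messages.isEmpty(); // Without the Locker above, -Wthread-safety would warn here.
        }

    private:
        mutable CheckedLock m_lock;
        // Clang flags any access to m_messages made while m_lock is not held.
        Deque<int> m_messages WTF_GUARDED_BY_LOCK(m_lock);
    };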


Canonical link: https://commits.webkit.org/237961@main
git-svn-id: https://svn.webkit.org/repository/webkit/trunk@277806 268f45cc-cd09-0410-ab3c-d52691b4dbfc
cdumez committed May 20, 2021
1 parent 10935a0 commit 834b774dd66061fd0b4d1d9218e156d3ad232424
Showing 3 changed files with 86 additions and 51 deletions.
  1. +36 −0 Source/WebKit/ChangeLog
  2. +33 −33 Source/WebKit/Platform/IPC/Connection.cpp
  3. +17 −18 Source/WebKit/Platform/IPC/Connection.h
@@ -1,3 +1,39 @@
2021-05-20 Chris Dumez <cdumez@apple.com>

Use CheckedLock more in IPC::Connection
https://bugs.webkit.org/show_bug.cgi?id=226021

Reviewed by Darin Adler.

Use CheckedLock more in IPC::Connection. This is heavily multithreaded code and
it would benefit significantly from Clang Thread Safety Analysis.

* Platform/IPC/Connection.cpp:
(IPC::Connection::SyncMessageState::processIncomingMessage):
(IPC::Connection::enqueueMatchingMessagesToMessageReceiveQueue):
(IPC::Connection::addMessageReceiveQueue):
(IPC::Connection::addWorkQueueMessageReceiver):
(IPC::Connection::addThreadMessageReceiver):
(IPC::Connection::removeMessageReceiveQueue):
(IPC::Connection::sendMessage):
(IPC::Connection::waitForMessage):
(IPC::Connection::pushPendingSyncRequestID):
(IPC::Connection::popPendingSyncRequestID):
(IPC::Connection::waitForSyncReply):
(IPC::Connection::processIncomingSyncReply):
(IPC::Connection::processIncomingMessage):
(IPC::Connection::installIncomingSyncMessageCallback):
(IPC::Connection::uninstallIncomingSyncMessageCallback):
(IPC::Connection::hasIncomingSyncMessage):
(IPC::Connection::connectionDidClose):
(IPC::Connection::sendOutgoingMessages):
(IPC::Connection::enqueueIncomingMessage):
(IPC::Connection::dispatchMessage):
(IPC::Connection::dispatchOneIncomingMessage):
(IPC::Connection::dispatchIncomingMessages):
* Platform/IPC/Connection.h:
(IPC::Connection::WTF_GUARDED_BY_LOCK):

2021-05-20 Geoffrey Garen <ggaren@apple.com>

GraphicsLayer::setName() causes heap fragmentation
@@ -90,7 +90,7 @@ class Connection::SyncMessageState {

// Returns true if this message will be handled on a client thread that is currently
// waiting for a reply to a synchronous message.
bool processIncomingMessage(Connection&, std::unique_ptr<Decoder>&);
bool processIncomingMessage(Connection& connection, std::unique_ptr<Decoder>&) WTF_REQUIRES_LOCK(connection.m_incomingMessagesLock);

// Dispatch pending sync messages.
void dispatchMessages(Function<void(MessageName, uint64_t)>&& willDispatchMessage = { });
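
Note that the new WTF_REQUIRES_LOCK above names a lock owned by the Connection argument rather than by SyncMessageState itself; Clang's analysis accepts lock expressions on other objects, not just on 'this'. A hedged sketch of that shape, with placeholder names (the real SyncMessageState is a nested class of Connection, so it can see the private lock):

    // Sketch only, not part of this patch; names are placeholders.
    struct Endpoint {
        CheckedLock m_incomingMessagesLock;
        Deque<int> m_incomingMessages WTF_GUARDED_BY_LOCK(m_incomingMessagesLock);
    };

    class SyncState {
    public:
        // The caller must already hold the lock that lives on the passed-in Endpoint.
        bool process(Endpoint& endpoint, int message) WTF_REQUIRES_LOCK(endpoint.m_incomingMessagesLock)
        {
            endpoint.m_incomingMessages.append(message); // OK: the required lock guards this deque.
            return true;
        }
    };
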
@@ -173,7 +173,7 @@ bool Connection::SyncMessageState::processIncomingMessage(Connection& connection
{
auto locker = holdLock(m_mutex);
shouldDispatch = m_didScheduleDispatchMessagesWorkSet.add(&connection).isNewEntry;
ASSERT(connection.m_incomingMessagesMutex.isHeld());
ASSERT(connection.m_incomingMessagesLock.isHeld());
if (message->shouldMaintainOrderingWithAsyncMessages()) {
// This sync message should maintain ordering with async messages so we need to process the pending async messages first.
while (!connection.m_incomingMessages.isEmpty())
@@ -333,7 +333,7 @@ void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMess

// Enqueue any pending message to the MessageReceiveQueue that is meant to go on that queue. This is important to maintain the ordering of
// IPC messages as some messages may get received on the IPC thread before the message receiver registered itself on the main thread.
void Connection::enqueueMatchingMessagesToMessageReceiveQueue(Locker<Lock>&, MessageReceiveQueue& receiveQueue, ReceiverName receiverName, uint64_t destinationID)
void Connection::enqueueMatchingMessagesToMessageReceiveQueue(MessageReceiveQueue& receiveQueue, ReceiverName receiverName, uint64_t destinationID)
{
ASSERT(isMainRunLoop());

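The hunk above shows the recurring refactoring in this patch: rather than passing a Locker<Lock>& purely to prove that the caller holds the lock, the function is annotated with WTF_REQUIRES_LOCK so the compiler enforces that proof at every call site. A hedged sketch of the before/after shape, with placeholder names rather than the real Connection members:

    // Sketch only, not part of this patch; names are placeholders.
    class Endpoint {
    public:
        void addReceiveQueue(uint64_t destinationID)
        {
            Locker locker { m_incomingMessagesLock }; // lock taken here...
            enqueueMatching(destinationID);           // ...so this call satisfies the analysis.
        }

    private:
        // Before: a Locker<Lock>& parameter documented, but did not enforce, that the
        // caller already holds the lock:
        //     void enqueueMatching(Locker<Lock>&, uint64_t destinationID);
        //
        // After: the annotation makes Clang reject any call site where it cannot prove
        // that m_incomingMessagesLock is held.
        void enqueueMatching(uint64_t destinationID) WTF_REQUIRES_LOCK(m_incomingMessagesLock)
        {
            // Pending state guarded by the lock would be drained here.
        }

        CheckedLock m_incomingMessagesLock;
    };
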
@@ -351,30 +351,30 @@ void Connection::enqueueMatchingMessagesToMessageReceiveQueue(Locker<Lock>&, Mes

void Connection::addMessageReceiveQueue(MessageReceiveQueue& receiveQueue, ReceiverName receiverName, uint64_t destinationID)
{
auto incomingMessagesLocker = holdLock(m_incomingMessagesMutex);
enqueueMatchingMessagesToMessageReceiveQueue(incomingMessagesLocker, receiveQueue, receiverName, destinationID);
Locker incomingMessagesLocker { m_incomingMessagesLock };
enqueueMatchingMessagesToMessageReceiveQueue(receiveQueue, receiverName, destinationID);
m_receiveQueues.add(receiveQueue, receiverName, destinationID);
}

void Connection::addWorkQueueMessageReceiver(ReceiverName receiverName, WorkQueue& workQueue, WorkQueueMessageReceiver* receiver, uint64_t destinationID)
{
auto receiveQueue = makeUnique<WorkQueueMessageReceiverQueue>(workQueue, *receiver);
auto incomingMessagesLocker = holdLock(m_incomingMessagesMutex);
enqueueMatchingMessagesToMessageReceiveQueue(incomingMessagesLocker, *receiveQueue, receiverName, destinationID);
Locker incomingMessagesLocker { m_incomingMessagesLock };
enqueueMatchingMessagesToMessageReceiveQueue(*receiveQueue, receiverName, destinationID);
m_receiveQueues.add(WTFMove(receiveQueue), receiverName, destinationID);
}

void Connection::addThreadMessageReceiver(ReceiverName receiverName, ThreadMessageReceiver* receiver, uint64_t destinationID)
{
auto receiveQueue = makeUnique<ThreadMessageReceiverQueue>(*receiver);
auto incomingMessagesLocker = holdLock(m_incomingMessagesMutex);
enqueueMatchingMessagesToMessageReceiveQueue(incomingMessagesLocker, *receiveQueue, receiverName, destinationID);
Locker incomingMessagesLocker { m_incomingMessagesLock };
enqueueMatchingMessagesToMessageReceiveQueue(*receiveQueue, receiverName, destinationID);
m_receiveQueues.add(WTFMove(receiveQueue), receiverName, destinationID);
}

void Connection::removeMessageReceiveQueue(ReceiverName receiverName, uint64_t destinationID)
{
auto locker = holdLock(m_incomingMessagesMutex);
Locker locker { m_incomingMessagesLock };
m_receiveQueues.remove(receiverName, destinationID);
}

@@ -481,7 +481,7 @@ bool Connection::sendMessage(UniqueRef<Encoder>&& encoder, OptionSet<SendOption>
encoder->setShouldDispatchMessageWhenWaitingForSyncReply(ShouldDispatchWhenWaitingForSyncReply::YesDuringUnboundedIPC);

{
auto locker = holdLock(m_outgoingMessagesMutex);
Locker locker { m_outgoingMessagesLock };
m_outgoingMessages.append(WTFMove(encoder));
}

@@ -512,7 +512,7 @@ std::unique_ptr<Decoder> Connection::waitForMessage(MessageName messageName, uin
WaitForMessageState waitingForMessage(messageName, destinationID, waitForOptions);

{
Locker locker { m_waitForMessageMutex };
Locker locker { m_waitForMessageLock };

// We don't support having multiple clients waiting for messages.
ASSERT(!m_waitingForMessage);
@@ -528,7 +528,7 @@ std::unique_ptr<Decoder> Connection::waitForMessage(MessageName messageName, uin

// First, check if this message is already in the incoming messages queue.
{
auto locker = holdLock(m_incomingMessagesMutex);
Locker locker { m_incomingMessagesLock };
for (auto it = m_incomingMessages.begin(), end = m_incomingMessages.end(); it != end; ++it) {
std::unique_ptr<Decoder>& message = *it;

@@ -564,7 +564,7 @@ std::unique_ptr<Decoder> Connection::waitForMessage(MessageName messageName, uin
wasMessageToWaitForAlreadyDispatched |= messageName == nameOfMessageToDispatch && destinationID == destinationOfMessageToDispatch;
});

Locker lock { m_waitForMessageMutex };
Locker locker { m_waitForMessageLock };

if (wasMessageToWaitForAlreadyDispatched) {
m_waitingForMessage = nullptr;
@@ -584,7 +584,7 @@ std::unique_ptr<Decoder> Connection::waitForMessage(MessageName messageName, uin
}

// Now we wait.
bool didTimeout = !m_waitForMessageCondition.waitUntil(m_waitForMessageMutex, timeout.deadline());
bool didTimeout = !m_waitForMessageCondition.waitUntil(m_waitForMessageLock, timeout.deadline());
// We timed out, lost our connection, or a sync message came in with InterruptWaitingIfSyncMessageArrives, so stop waiting.
if (didTimeout || m_waitingForMessage->messageWaitingInterrupted) {
m_waitingForMessage = nullptr;
@@ -598,7 +598,7 @@ std::unique_ptr<Decoder> Connection::waitForMessage(MessageName messageName, uin
bool Connection::pushPendingSyncRequestID(SyncRequestID syncRequestID)
{
{
LockHolder locker(m_syncReplyStateMutex);
Locker locker { m_syncReplyStateLock };
if (!m_shouldWaitForSyncReplies)
return false;
m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
@@ -610,7 +610,7 @@ bool Connection::pushPendingSyncRequestID(SyncRequestID syncRequestID)
void Connection::popPendingSyncRequestID(SyncRequestID syncRequestID)
{
--m_inSendSyncCount;
LockHolder locker(m_syncReplyStateMutex);
Locker locker { m_syncReplyStateLock };
ASSERT_UNUSED(syncRequestID, m_pendingSyncReplies.last().syncRequestID == syncRequestID);
m_pendingSyncReplies.removeLast();
}
@@ -665,7 +665,7 @@ std::unique_ptr<Decoder> Connection::waitForSyncReply(SyncRequestID syncRequestI
SyncMessageState::singleton().dispatchMessages();

{
LockHolder locker(m_syncReplyStateMutex);
Locker locker { m_syncReplyStateLock };

// Second, check if there is a sync reply at the top of the stack.
ASSERT(!m_pendingSyncReplies.isEmpty());
@@ -710,7 +710,7 @@ std::unique_ptr<Decoder> Connection::waitForSyncReply(SyncRequestID syncRequestI
void Connection::processIncomingSyncReply(std::unique_ptr<Decoder> decoder)
{
{
LockHolder locker(m_syncReplyStateMutex);
Locker locker { m_syncReplyStateLock };

// Go through the stack of sync requests that have pending replies and see which one
// this reply is for.
@@ -754,16 +754,16 @@ void Connection::processIncomingMessage(std::unique_ptr<Decoder> message)
}

// FIXME: These are practically the same mutex, so maybe they could be merged.
Locker waitForMessagesLocker { m_waitForMessageMutex };
Locker waitForMessagesLocker { m_waitForMessageLock };

auto incomingMessagesLocker = holdLock(m_incomingMessagesMutex);
Locker incomingMessagesLocker { m_incomingMessagesLock };
if (auto* receiveQueue = m_receiveQueues.get(*message)) {
receiveQueue->enqueueMessage(*this, WTFMove(message));
return;
}

if (message->isSyncMessage()) {
auto locker = holdLock(m_incomingSyncMessageCallbackMutex);
Locker locker { m_incomingSyncMessageCallbackLock };

for (auto& callback : m_incomingSyncMessageCallbacks.values())
m_incomingSyncMessageCallbackQueue->dispatch(WTFMove(callback));
@@ -804,7 +804,7 @@ void Connection::processIncomingMessage(std::unique_ptr<Decoder> message)

uint64_t Connection::installIncomingSyncMessageCallback(WTF::Function<void ()>&& callback)
{
auto locker = holdLock(m_incomingSyncMessageCallbackMutex);
Locker locker { m_incomingSyncMessageCallbackLock };

m_nextIncomingSyncMessageCallbackID++;

@@ -818,13 +818,13 @@ uint64_t Connection::installIncomingSyncMessageCallback(WTF::Function<void ()>&&

void Connection::uninstallIncomingSyncMessageCallback(uint64_t callbackID)
{
auto locker = holdLock(m_incomingSyncMessageCallbackMutex);
Locker locker { m_incomingSyncMessageCallbackLock };
m_incomingSyncMessageCallbacks.remove(callbackID);
}

bool Connection::hasIncomingSyncMessage()
{
auto locker = holdLock(m_incomingMessagesMutex);
Locker locker { m_incomingMessagesLock };

for (auto& message : m_incomingMessages) {
if (message->isSyncMessage())
@@ -862,7 +862,7 @@ void Connection::connectionDidClose()
platformInvalidate();

{
LockHolder locker(m_syncReplyStateMutex);
Locker locker { m_syncReplyStateLock };

ASSERT(m_shouldWaitForSyncReplies);
m_shouldWaitForSyncReplies = false;
@@ -872,7 +872,7 @@ void Connection::connectionDidClose()
}

{
Locker locker { m_waitForMessageMutex };
Locker locker { m_waitForMessageLock };

ASSERT(m_shouldWaitForMessages);
m_shouldWaitForMessages = false;
@@ -915,7 +915,7 @@ void Connection::sendOutgoingMessages()
std::unique_ptr<Encoder> message;

{
auto locker = holdLock(m_outgoingMessagesMutex);
Locker locker { m_outgoingMessagesLock };
if (m_outgoingMessages.isEmpty())
break;
message = m_outgoingMessages.takeFirst().moveToUniquePtr();
@@ -992,7 +992,7 @@ void Connection::didFailToSendSyncMessage()

void Connection::enqueueIncomingMessage(std::unique_ptr<Decoder> incomingMessage)
{
ASSERT(m_incomingMessagesMutex.isHeld());
ASSERT(m_incomingMessagesLock.isHeld());
{
#if PLATFORM(COCOA)
if (m_wasKilled)
@@ -1064,7 +1064,7 @@ void Connection::dispatchMessage(std::unique_ptr<Decoder> message)
// SyncState::m_messagesToDispatchWhileWaitingForSyncReply while others
// go to Connection::m_incomingMessages. Should be fixed by adding all
// messages to one list.
auto incomingMessagesLocker = holdLock(m_incomingMessagesMutex);
Locker incomingMessagesLocker { m_incomingMessagesLock };
if (auto* receiveQueue = m_receiveQueues.get(*message)) {
receiveQueue->enqueueMessage(*this, WTFMove(message));
return;
@@ -1155,7 +1155,7 @@ void Connection::dispatchOneIncomingMessage()
{
std::unique_ptr<Decoder> message;
{
auto locker = holdLock(m_incomingMessagesMutex);
Locker locker { m_incomingMessagesLock };
if (m_incomingMessages.isEmpty())
return;

@@ -1173,7 +1173,7 @@ void Connection::dispatchIncomingMessages()

size_t messagesToProcess = 0;
{
auto locker = holdLock(m_incomingMessagesMutex);
Locker locker { m_incomingMessagesLock };
if (m_incomingMessages.isEmpty())
return;

@@ -1201,7 +1201,7 @@ void Connection::dispatchIncomingMessages()

for (size_t i = 1; i < messagesToProcess; ++i) {
{
auto locker = holdLock(m_incomingMessagesMutex);
Locker locker { m_incomingMessagesLock };
if (m_incomingMessages.isEmpty())
return;

@@ -39,7 +39,6 @@
#include <wtf/Deque.h>
#include <wtf/Forward.h>
#include <wtf/HashMap.h>
#include <wtf/Lock.h>
#include <wtf/ObjectIdentifier.h>
#include <wtf/OptionSet.h>
#include <wtf/RunLoop.h>
@@ -334,7 +333,7 @@ class Connection : public ThreadSafeRefCounted<Connection, WTF::DestructionThrea
void popPendingSyncRequestID(SyncRequestID);
std::unique_ptr<Decoder> waitForSyncReply(SyncRequestID, MessageName, Timeout, OptionSet<SendSyncOption>);

void enqueueMatchingMessagesToMessageReceiveQueue(Locker<Lock>& incomingMessagesLocker, MessageReceiveQueue&, ReceiverName, uint64_t destinationID);
void enqueueMatchingMessagesToMessageReceiveQueue(MessageReceiveQueue&, ReceiverName, uint64_t destinationID) WTF_REQUIRES_LOCK(m_incomingMessagesLock);

// Called on the connection work queue.
void processIncomingMessage(std::unique_ptr<Decoder>);
@@ -355,7 +354,7 @@ class Connection : public ThreadSafeRefCounted<Connection, WTF::DestructionThrea
void didFailToSendSyncMessage();

// Can be called on any thread.
void enqueueIncomingMessage(std::unique_ptr<Decoder>);
void enqueueIncomingMessage(std::unique_ptr<Decoder>) WTF_REQUIRES_LOCK(m_incomingMessagesLock);
size_t incomingMessagesDispatchingBatchSize() const;

void willSendSyncMessage(OptionSet<SendSyncOption>);
@@ -405,33 +404,33 @@ class Connection : public ThreadSafeRefCounted<Connection, WTF::DestructionThrea
bool m_didReceiveInvalidMessage { false };

// Incoming messages.
Lock m_incomingMessagesMutex;
Deque<std::unique_ptr<Decoder>> m_incomingMessages;
CheckedLock m_incomingMessagesLock;
Deque<std::unique_ptr<Decoder>> m_incomingMessages WTF_GUARDED_BY_LOCK(m_incomingMessagesLock);
std::unique_ptr<MessagesThrottler> m_incomingMessagesThrottler;
MessageReceiveQueueMap m_receiveQueues;
MessageReceiveQueueMap m_receiveQueues WTF_GUARDED_BY_LOCK(m_incomingMessagesLock);

// Outgoing messages.
Lock m_outgoingMessagesMutex;
Deque<UniqueRef<Encoder>> m_outgoingMessages;
CheckedLock m_outgoingMessagesLock;
Deque<UniqueRef<Encoder>> m_outgoingMessages WTF_GUARDED_BY_LOCK(m_outgoingMessagesLock);

CheckedCondition m_waitForMessageCondition;
CheckedLock m_waitForMessageMutex;
CheckedLock m_waitForMessageLock;

struct WaitForMessageState;
WaitForMessageState* m_waitingForMessage WTF_GUARDED_BY_LOCK(m_waitForMessageMutex) { nullptr }; // NOLINT
WaitForMessageState* m_waitingForMessage WTF_GUARDED_BY_LOCK(m_waitForMessageLock) { nullptr }; // NOLINT

class SyncMessageState;

Lock m_syncReplyStateMutex;
bool m_shouldWaitForSyncReplies { true };
bool m_shouldWaitForMessages WTF_GUARDED_BY_LOCK(m_waitForMessageMutex) { true };
CheckedLock m_syncReplyStateLock;
bool m_shouldWaitForSyncReplies WTF_GUARDED_BY_LOCK(m_syncReplyStateLock) { true };
bool m_shouldWaitForMessages WTF_GUARDED_BY_LOCK(m_waitForMessageLock) { true };
struct PendingSyncReply;
Vector<PendingSyncReply> m_pendingSyncReplies;
Vector<PendingSyncReply> m_pendingSyncReplies WTF_GUARDED_BY_LOCK(m_syncReplyStateLock);

Lock m_incomingSyncMessageCallbackMutex;
HashMap<uint64_t, WTF::Function<void()>> m_incomingSyncMessageCallbacks;
RefPtr<WorkQueue> m_incomingSyncMessageCallbackQueue;
uint64_t m_nextIncomingSyncMessageCallbackID { 0 };
CheckedLock m_incomingSyncMessageCallbackLock;
HashMap<uint64_t, WTF::Function<void()>> m_incomingSyncMessageCallbacks WTF_GUARDED_BY_LOCK(m_incomingSyncMessageCallbackLock);
RefPtr<WorkQueue> m_incomingSyncMessageCallbackQueue WTF_GUARDED_BY_LOCK(m_incomingSyncMessageCallbackLock);
uint64_t m_nextIncomingSyncMessageCallbackID WTF_GUARDED_BY_LOCK(m_incomingSyncMessageCallbackLock) { 0 };

#if ENABLE(IPC_TESTING_API)
Vector<WeakPtr<MessageObserver>> m_messageObservers;
