Skip to content

Commit

Permalink
Cherry-pick 273851@main (d1fb588). https://bugs.webkit.org/show_bug.c…
Browse files Browse the repository at this point in the history
…gi?id=268362

    Unthrottled IPC::Connection loses messages when connection is closed
    https://bugs.webkit.org/show_bug.cgi?id=268362
    rdar://121910136

    Reviewed by Matt Woodrow.

    When IPC::Connection::invalidate() would be called, some of the messages
    already sent might have been lost:
    1. Unsent messages due to outgoing messages buffering
    2. Messages not delivered at the recipient because message delivery
       would check isValid()

    Fix 1. by adding a blocking IPC::Connection::flushSentMessages()
    that will wait until send list flips to zero.
    The flushSentMessages() call is distinct from invalidate() to preserve
    the ability to call invalidate() in non-blocking manner.

    Fix 2. by not checking for isValid() but for m_client / m_syncState.

    isValid() flips immediately in the IPC receive queue when OS signals
    that the connection was closed.
    m_client, m_syncState flips to nullptr when client signals that
    they do not want to receive messages anymore, via invalidate().
    By contract, IPC::Connection invalidates itself after Client::didClose(),
    too.

    Fixes mostly upcoming GPUP cases where one connection is closed, but
    not the whole per-WP session (GPUConnectionToWebProcess). The
    individual connections might carry important messages up until
    the disconnection, so all must be played back before handling the
    connection closing.

    * Source/WebKit/Platform/IPC/Connection.cpp:
    (IPC::Connection::flushSentMessages):
    (IPC::Connection::connectionDidClose):
    (IPC::Connection::sendOutgoingMessages):
    (IPC::Connection::dispatchMessage):
    * Source/WebKit/Platform/IPC/Connection.h:
    * Source/WebKit/Platform/IPC/StreamClientConnection.cpp:
    (IPC::StreamClientConnection::flushSentMessages):
    * Source/WebKit/Platform/IPC/StreamClientConnection.h:
    * Tools/TestWebKitAPI/Tests/IPC/ConnectionTests.cpp:
    (TestWebKitAPI::TEST_P):
    * Tools/TestWebKitAPI/Tests/IPC/StreamConnectionTests.cpp:
    (TestWebKitAPI::TEST_P):

    Canonical link: https://commits.webkit.org/273851@main

Canonical link: https://commits.webkit.org/266719.359@webkitglib/2.42
  • Loading branch information
kkinnunen-apple authored and aperezdc committed Mar 14, 2024
1 parent bc083b6 commit 6d9f342
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 2 deletions.
25 changes: 23 additions & 2 deletions Source/WebKit/Platform/IPC/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,19 @@ bool Connection::platformPrepareForOpen()
}
#endif

Error Connection::flushSentMessages(Timeout timeout)
{
Locker locker { m_outgoingMessagesLock };
do {
if (!isValid())
return Error::InvalidConnection;
if (m_outgoingMessages.isEmpty())
return Error::NoError;
m_outgoingMessagesEmptyCondition.waitUntil(m_outgoingMessagesLock, timeout.deadline());
} while (!timeout.didTimeOut());
return Error::Timeout;
}

void Connection::invalidate()
{
m_isValid = false;
Expand Down Expand Up @@ -1041,6 +1054,12 @@ void Connection::connectionDidClose()
}
m_waitForMessageCondition.notifyAll();

{
Locker locker { m_outgoingMessagesLock };
m_outgoingMessages.clear();
m_outgoingMessagesEmptyCondition.notifyAll();
}

if (m_didCloseOnConnectionWorkQueueCallback)
m_didCloseOnConnectionWorkQueueCallback(this);

Expand All @@ -1062,8 +1081,10 @@ void Connection::sendOutgoingMessages()

{
Locker locker { m_outgoingMessagesLock };
if (m_outgoingMessages.isEmpty())
if (m_outgoingMessages.isEmpty()) {
m_outgoingMessagesEmptyCondition.notifyAll();
break;
}
message = m_outgoingMessages.takeFirst().moveToUniquePtr();
}
ASSERT(message);
Expand Down Expand Up @@ -1235,7 +1256,7 @@ void Connection::dispatchMessage(Decoder& decoder)

void Connection::dispatchMessage(std::unique_ptr<Decoder> message)
{
if (!isValid())
if (!m_syncState)
return;
assertIsCurrent(dispatcher());
{
Expand Down
3 changes: 3 additions & 0 deletions Source/WebKit/Platform/IPC/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ class Connection : public ThreadSafeRefCountedAndCanMakeThreadSafeWeakPtr<Connec
void removeMessageReceiver(ReceiverName, uint64_t destinationID = 0);

bool open(Client&, SerialFunctionDispatcher& = RunLoop::current());
// Ensures that messages sent prior to the call are not affected by invalidate() or crash done after the call returns.
Error flushSentMessages(Timeout);
void invalidate();
void markCurrentlyDispatchedMessageAsInvalid();

Expand Down Expand Up @@ -576,6 +578,7 @@ class Connection : public ThreadSafeRefCountedAndCanMakeThreadSafeWeakPtr<Connec
// Outgoing messages.
Lock m_outgoingMessagesLock;
Deque<UniqueRef<Encoder>> m_outgoingMessages WTF_GUARDED_BY_LOCK(m_outgoingMessagesLock);
Condition m_outgoingMessagesEmptyCondition WTF_GUARDED_BY_LOCK(m_outgoingMessagesLock);

Condition m_waitForMessageCondition;
Lock m_waitForMessageLock;
Expand Down
6 changes: 6 additions & 0 deletions Source/WebKit/Platform/IPC/StreamClientConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ void StreamClientConnection::open(Connection::Client& receiver, SerialFunctionDi
m_connection->open(*m_dedicatedConnectionClient, dispatcher);
}

Error StreamClientConnection::flushSentMessages(Timeout timeout)
{
wakeUpServer(WakeUpServer::Yes);
return m_connection->flushSentMessages(WTFMove(timeout));
}

void StreamClientConnection::invalidate()
{
m_connection->invalidate();
Expand Down
1 change: 1 addition & 0 deletions Source/WebKit/Platform/IPC/StreamClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class StreamClientConnection final : public ThreadSafeRefCounted<StreamClientCon
void setMaxBatchSize(unsigned);

void open(Connection::Client&, SerialFunctionDispatcher& = RunLoop::current());
Error flushSentMessages(Timeout);
void invalidate();

template<typename T, typename U, typename V> Error send(T&& message, ObjectIdentifierGeneric<U, V> destinationID, Timeout);
Expand Down
50 changes: 50 additions & 0 deletions Tools/TestWebKitAPI/Tests/IPC/ConnectionTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ static constexpr Seconds kWaitForAbsenceTimeout = 300_ms;

struct MockTestMessageWithConnection {
static constexpr bool isSync = false;
static constexpr bool canDispatchOutOfOrder = false;
static constexpr bool replyCanDispatchOutOfOrder = false;
static constexpr IPC::MessageName name() { return static_cast<IPC::MessageName>(123); }
auto&& arguments() { return WTFMove(m_arguments); }
MockTestMessageWithConnection(IPC::Connection::Handle&& handle)
Expand All @@ -47,6 +49,24 @@ struct MockTestMessageWithConnection {
std::tuple<IPC::Connection::Handle&&> m_arguments;
};

struct MockTestSyncMessage {
static constexpr bool isSync = true;
static constexpr bool canDispatchOutOfOrder = false;
static constexpr bool replyCanDispatchOutOfOrder = false;
static constexpr IPC::MessageName name() { return static_cast<IPC::MessageName>(124); }
using ReplyArguments = std::tuple<>;
auto&& arguments()
{
return WTFMove(m_arguments);
}

MockTestSyncMessage()
{
}

std::tuple<> m_arguments;
};

namespace {
class SimpleConnectionTest : public testing::Test {
public:
Expand Down Expand Up @@ -88,6 +108,36 @@ TEST_F(SimpleConnectionTest, ConnectLocalConnection)
clientConnection->invalidate();
}

TEST_F(SimpleConnectionTest, ClearOutgoingMessages)
{
// Create a connection, but leave the client
// handle pending.
auto firstIdentifiers = IPC::Connection::createConnectionIdentifierPair();
ASSERT_NE(firstIdentifiers, std::nullopt);
Ref<IPC::Connection> firstServerConnection = IPC::Connection::createServerConnection(WTFMove(firstIdentifiers->server));
firstServerConnection->open(m_mockServerClient);

// Create a second connection, and send the client
// handle in a message over the first connection (such
// that it will be stored as a pending message).
auto secondIdentifiers = IPC::Connection::createConnectionIdentifierPair();
ASSERT_NE(secondIdentifiers, std::nullopt);
Ref<IPC::Connection> secondServerConnection = IPC::Connection::createServerConnection(WTFMove(secondIdentifiers->server));
MockConnectionClient mockSecondServerClient;
secondServerConnection->open(mockSecondServerClient);

firstServerConnection->send(MockTestMessageWithConnection { WTFMove(secondIdentifiers->client) }, 0);

// Invalidate the first connection's client handle,
// which should clear pending messages and also invalidate
// the second connection.
firstIdentifiers->client = IPC::Connection::Handle();

// Try a sync send over the second connection, which should
// fail immediately if the client handle has been released.
secondServerConnection->sendSync(MockTestSyncMessage(), 0, IPC::Timeout::infinity(), IPC::SendSyncOption::UseFullySynchronousModeForTesting);
}

class ConnectionTest : public testing::Test, protected ConnectionTestBase {
public:
void SetUp() override
Expand Down
32 changes: 32 additions & 0 deletions Tools/TestWebKitAPI/Tests/IPC/StreamConnectionTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ struct MockStreamTestMessage1 {
std::tuple<> arguments() { return { }; }
};

struct MockStreamTestMessage2 {
static constexpr bool isSync = false;
static constexpr bool isStreamEncodable = false;
static constexpr IPC::MessageName name() { return IPC::MessageName::RemoteRenderingBackend_ReleaseAllDrawingResources; }
explicit MockStreamTestMessage2(IPC::Semaphore&& s)
: semaphore(WTFMove(s))
{
}
std::tuple<IPC::Semaphore> arguments() { return { WTFMove(semaphore) }; }
IPC::Semaphore semaphore;
};

struct MockStreamTestMessageWithAsyncReply1 {
static constexpr bool isSync = false;
static constexpr bool isStreamEncodable = true;
Expand Down Expand Up @@ -365,6 +377,26 @@ TEST_P(StreamMessageTest, SendWithSwitchingDestinationIDs)
}
}

TEST_P(StreamMessageTest, SendAndInvalidate)
{
const uint64_t messageCount = 2004;
auto cleanup = localReferenceBarrier();

for (uint64_t i = 0u; i < messageCount; ++i) {
auto result = m_clientConnection->send(MockStreamTestMessage2 { IPC::Semaphore { } }, defaultDestinationID(), defaultSendTimeout);
EXPECT_EQ(result, IPC::Error::NoError);
}
auto flushResult = m_clientConnection->flushSentMessages(defaultSendTimeout);
EXPECT_EQ(flushResult, IPC::Error::NoError);
m_clientConnection->invalidate();

for (uint64_t i = 0u; i < messageCount; ++i) {
auto message = m_mockServerReceiver->waitForMessage();
EXPECT_EQ(message.messageName, MockStreamTestMessage2::name());
EXPECT_EQ(message.destinationID, defaultDestinationID().toUInt64());
}
}

TEST_P(StreamMessageTest, SendAsyncReply)
{
auto cleanup = localReferenceBarrier();
Expand Down

0 comments on commit 6d9f342

Please sign in to comment.