Skip to content
Permalink
Browse files
Teach CoreIPC the right way to send large messages on Windows
r63776 added support for ::WriteFile failing with ERROR_IO_PENDING,
but it had a major flaw: we didn't ensure that the data being sent
(which is owned by the ArgumentEncoder) stayed around until the write
finished. We'd destroy the data immediately, leading to ::WriteFile
accessing that freed memory later. This seemed to always manifest
itself as a crash in ::WaitForMultipleObjects.

The correct solution (as hinted above) is to make sure that the data
being written is not destroyed until the write completes. When
::WriteFile fails with ERROR_IO_PENDING, we store the data being sent
in Connection::m_pendingWriteArguments, and don't send any more
messages until that write completes. We use an event in the OVERLAPPED
structure passed to ::WriteFile to detect when the write has completed
(similar to what we do for reads).

Fixes <http://webkit.org/b/42785> <rdar://problem/8218522> Crash in
WebKit2WebProcess in WaitForMultipleObjects beneath
WorkQueue::workQueueThreadBody when running tests that produce a lot
of output

Reviewed by Anders Carlsson.

* Platform/CoreIPC/Connection.cpp:
(CoreIPC::Connection::canSendOutgoingMessages): Added. This calls out
to a platform-specific function to allow each platform to have its own
policy for when messages can and can't be sent.
(CoreIPC::Connection::sendOutgoingMessages): Use the new
canSendOutgoingMessages to determine whether we can send any messages
right now. We now remove one message at a time from m_outgoingMessages
and send it. We stop sending messages when sendOutgoingMessage returns
false.

* Platform/CoreIPC/Connection.h: Added m_pendingWriteArguments and
m_writeState on Windows.
(CoreIPC::Connection::Message::Message): Added this default
constructor.

* Platform/CoreIPC/MessageID.h:
(CoreIPC::MessageID::MessageID): Made the default constructor public
for Message's benefit.

* Platform/CoreIPC/mac/ConnectionMac.cpp:
(CoreIPC::Connection::platformCanSendOutgoingMessages): Added. Always
returns true.
(CoreIPC::Connection::sendOutgoingMessage): Changed to return a
boolean indicating whether more messages can be sent at this time.

* Platform/CoreIPC/qt/ConnectionQt.cpp:
(CoreIPC::Connection::platformCanSendOutgoingMessages): Added. Returns
true if we have a socket.
(CoreIPC::Connection::sendOutgoingMessage): Changed a null-check of
m_socket to an assertion since it should be checked for null in
platformCanSendOutgoingMessages. Changed to return a boolean
indicating whether more messages can be sent at this time.

* Platform/CoreIPC/win/ConnectionWin.cpp:
(CoreIPC::Connection::platformInitialize): Added initialization of
m_writeState.
(CoreIPC::Connection::platformInvalidate): Close m_writeState's event
handle.
(CoreIPC::Connection::writeEventHandler): Added. Checks if the pending
write has completed, cleans up our pending write state, and sends any
remaining messages.
(CoreIPC::Connection::open): Register our write event with the
WorkQueue so that writeEventHandler will be called when the event is
signaled.
(CoreIPC::Connection::platformCanSendOutgoingMessages): Added. We can
only send messages if there isn't a write pending.
(CoreIPC::Connection::sendOutgoingMessage): Changed to return a
boolean indicating whether more messages can be sent at this time. We
now pass m_writeState to ::WriteFile instead of an empty OVERLAPPED
struct so that our write event will be signaled when the write
completes. We also no longer pass a pointer to receive how many bytes
were written, as recommended by MSDN. If ::WriteFile fails with
ERROR_IO_PENDING, we save the ArgumentEncoder for this message and
return false to indicate that no more messages can be sent at this
time.

Canonical link: https://commits.webkit.org/55048@main
git-svn-id: https://svn.webkit.org/repository/webkit/trunk@64223 268f45cc-cd09-0410-ab3c-d52691b4dbfc
  • Loading branch information
aroben committed Jul 28, 2010
1 parent e6c5414 commit 9a07ad451584f93a4a13c08a7908ee76421a98ab
@@ -1,3 +1,85 @@
2010-07-28 Adam Roben <aroben@apple.com>

Teach CoreIPC the right way to send large messages on Windows

r63776 added support for ::WriteFile failing with ERROR_IO_PENDING,
but it had a major flaw: we didn't ensure that the data being sent
(which is owned by the ArgumentEncoder) stayed around until the write
finished. We'd destroy the data immediately, leading to ::WriteFile
accessing that freed memory later. This seemed to always manifest
itself as a crash in ::WaitForMultipleObjects.

The correct solution (as hinted above) is to make sure that the data
being written is not destroyed until the write completes. When
::WriteFile fails with ERROR_IO_PENDING, we store the data being sent
in Connection::m_pendingWriteArguments, and don't send any more
messages until that write completes. We use an event in the OVERLAPPED
structure passed to ::WriteFile to detect when the write has completed
(similar to what we do for reads).

Fixes <http://webkit.org/b/42785> <rdar://problem/8218522> Crash in
WebKit2WebProcess in WaitForMultipleObjects beneath
WorkQueue::workQueueThreadBody when running tests that produce a lot
of output

Reviewed by Anders Carlsson.

* Platform/CoreIPC/Connection.cpp:
(CoreIPC::Connection::canSendOutgoingMessages): Added. This calls out
to a platform-specific function to allow each platform to have its own
policy for when messages can and can't be sent.
(CoreIPC::Connection::sendOutgoingMessages): Use the new
canSendOutgoingMessages to determine whether we can send any messages
right now. We now remove one message at a time from m_outgoingMessages
and send it. We stop sending messages when sendOutgoingMessage returns
false.

* Platform/CoreIPC/Connection.h: Added m_pendingWriteArguments and
m_writeState on Windows.
(CoreIPC::Connection::Message::Message): Added this default
constructor.

* Platform/CoreIPC/MessageID.h:
(CoreIPC::MessageID::MessageID): Made the default constructor public
for Message's benefit.

* Platform/CoreIPC/mac/ConnectionMac.cpp:
(CoreIPC::Connection::platformCanSendOutgoingMessages): Added. Always
returns true.
(CoreIPC::Connection::sendOutgoingMessage): Changed to return a
boolean indicating whether more messages can be sent at this time.

* Platform/CoreIPC/qt/ConnectionQt.cpp:
(CoreIPC::Connection::platformCanSendOutgoingMessages): Added. Returns
true if we have a socket.
(CoreIPC::Connection::sendOutgoingMessage): Changed a null-check of
m_socket to an assertion since it should be checked for null in
platformCanSendOutgoingMessages. Changed to return a boolean
indicating whether more messages can be sent at this time.

* Platform/CoreIPC/win/ConnectionWin.cpp:
(CoreIPC::Connection::platformInitialize): Added initialization of
m_writeState.
(CoreIPC::Connection::platformInvalidate): Close m_writeState's event
handle.
(CoreIPC::Connection::writeEventHandler): Added. Checks if the pending
write has completed, cleans up our pending write state, and sends any
remaining messages.
(CoreIPC::Connection::open): Register our write event with the
WorkQueue so that writeEventHandler will be called when the event is
signaled.
(CoreIPC::Connection::platformCanSendOutgoingMessages): Added. We can
only send messages if there isn't a write pending.
(CoreIPC::Connection::sendOutgoingMessage): Changed to return a
boolean indicating whether more messages can be sent at this time. We
now pass m_writeState to ::WriteFile instead of an empty OVERLAPPED
struct so that our write event will be signaled when the write
completes. We also no longer pass a pointer to receive how many bytes
were written, as recommended by MSDN. If ::WriteFile fails with
ERROR_IO_PENDING, we save the ArgumentEncoder for this message and
return false to indicate that no more messages can be sent at this
time.

2010-07-28 Adam Roben <aroben@apple.com>

Stop leaking Connection::m_readState.hEvent on Windows
@@ -207,21 +207,28 @@ void Connection::dispatchConnectionDidClose()
m_client = 0;
}

bool Connection::canSendOutgoingMessages() const
{
return m_isConnected && platformCanSendOutgoingMessages();
}

void Connection::sendOutgoingMessages()
{
if (!m_isConnected)
if (!canSendOutgoingMessages())
return;

Vector<OutgoingMessage> outgoingMessages;
while (true) {
OutgoingMessage message;
{
MutexLocker locker(m_outgoingMessagesLock);
if (m_outgoingMessages.isEmpty())
break;
message = m_outgoingMessages.takeFirst();
}

{
MutexLocker locker(m_outgoingMessagesLock);
m_outgoingMessages.swap(outgoingMessages);
if (!sendOutgoingMessage(message.messageID(), adoptPtr(message.arguments())))
break;
}

// Send messages.
for (size_t i = 0; i < outgoingMessages.size(); ++i)
sendOutgoingMessage(outgoingMessages[i].messageID(), adoptPtr(outgoingMessages[i].arguments()));
}

void Connection::dispatchMessages()
@@ -92,6 +92,11 @@ class Connection : public ThreadSafeShared<Connection> {
private:
template<typename T> class Message {
public:
Message()
: m_arguments(0)
{
}

Message(MessageID messageID, PassOwnPtr<T> arguments)
: m_messageID(messageID)
, m_arguments(arguments.leakPtr())
@@ -126,8 +131,10 @@ class Connection : public ThreadSafeShared<Connection> {

// Called on the connection work queue.
void processIncomingMessage(MessageID, PassOwnPtr<ArgumentDecoder>);
bool canSendOutgoingMessages() const;
bool platformCanSendOutgoingMessages() const;
void sendOutgoingMessages();
void sendOutgoingMessage(MessageID, PassOwnPtr<ArgumentEncoder>);
bool sendOutgoingMessage(MessageID, PassOwnPtr<ArgumentEncoder>);
void connectionDidClose();

// Called on the listener thread.
@@ -150,7 +157,7 @@ class Connection : public ThreadSafeShared<Connection> {

// Outgoing messages.
Mutex m_outgoingMessagesLock;
Vector<OutgoingMessage> m_outgoingMessages;
Deque<OutgoingMessage> m_outgoingMessages;

ThreadCondition m_waitForMessageCondition;
Mutex m_waitForMessageMutex;
@@ -166,9 +173,12 @@ class Connection : public ThreadSafeShared<Connection> {
#elif PLATFORM(WIN)
// Called on the connection queue.
void readEventHandler();
void writeEventHandler();

Vector<uint8_t> m_readBuffer;
OVERLAPPED m_readState;
OwnPtr<ArgumentEncoder> m_pendingWriteArguments;
OVERLAPPED m_writeState;
HANDLE m_connectionPipe;
#elif PLATFORM(QT)
// Called on the connection queue.
@@ -67,6 +67,11 @@ class MessageID {
SyncMessage = 1 << 0,
};

MessageID()
: m_messageID(0)
{
}

template <typename EnumType>
explicit MessageID(EnumType messageKind, unsigned char flags = 0)
: m_messageID(stripMostSignificantBit(flags << 24 | (MessageKindTraits<EnumType>::messageClass) << 16 | messageKind))
@@ -113,11 +118,6 @@ class MessageID {
unsigned char getFlags() const { return (m_messageID & 0xff000000) >> 24; }
unsigned char getClass() const { return (m_messageID & 0x00ff0000) >> 16; }

MessageID()
: m_messageID(0)
{
}

unsigned m_messageID;
};

@@ -115,7 +115,12 @@ static inline size_t machMessageSize(size_t bodySize, size_t numberOfPortDescrip
return round_msg(size);
}

void Connection::sendOutgoingMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments)
bool Connection::platformCanSendOutgoingMessages() const
{
return true;
}

bool Connection::sendOutgoingMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments)
{
Vector<Attachment> attachments = arguments->releaseAttachments();

@@ -199,8 +204,9 @@ void Connection::sendOutgoingMessage(MessageID messageID, PassOwnPtr<ArgumentEnc
kern_return_t kr = mach_msg(header, MACH_SEND_MSG, messageSize, 0, MACH_PORT_NULL, MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
if (kr != KERN_SUCCESS) {
// FIXME: What should we do here?
return;
}

return true;
}

void Connection::initializeDeadNameSource()
@@ -104,10 +104,14 @@ bool Connection::open()
return m_isConnected;
}

void Connection::sendOutgoingMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments)
bool Connection::platformCanSendOutgoingMessages() const
{
if (!m_socket)
return;
return m_socket;
}

bool Connection::sendOutgoingMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments)
{
ASSERT(m_socket);

// We put the message ID last.
arguments->encodeUInt32(messageID.toInt());
@@ -121,6 +125,7 @@ void Connection::sendOutgoingMessage(MessageID messageID, PassOwnPtr<ArgumentEnc
qint64 bytesWritten = m_socket->write(reinterpret_cast<char*>(arguments->buffer()), arguments->bufferSize());

ASSERT(bytesWritten == arguments->bufferSize());
return true;
}

} // namespace CoreIPC
@@ -82,7 +82,10 @@ bool Connection::createServerAndClientIdentifiers(HANDLE& serverIdentifier, HAND
void Connection::platformInitialize(Identifier identifier)
{
memset(&m_readState, 0, sizeof(m_readState));
m_readState.hEvent = ::CreateEventW(0, false, false, 0);
m_readState.hEvent = ::CreateEventW(0, FALSE, FALSE, 0);

memset(&m_writeState, 0, sizeof(m_writeState));
m_writeState.hEvent = ::CreateEventW(0, FALSE, FALSE, 0);

m_connectionPipe = identifier;
}
@@ -97,6 +100,9 @@ void Connection::platformInvalidate()
::CloseHandle(m_readState.hEvent);
m_readState.hEvent = 0;

::CloseHandle(m_writeState.hEvent);
m_writeState.hEvent = 0;

::CloseHandle(m_connectionPipe);
m_connectionPipe = INVALID_HANDLE_VALUE;
}
@@ -227,10 +233,27 @@ void Connection::readEventHandler()
}
}

void Connection::writeEventHandler()
{
DWORD numberOfBytesWritten = 0;
if (!::GetOverlappedResult(m_connectionPipe, &m_writeState, &numberOfBytesWritten, FALSE)) {
DWORD error = ::GetLastError();
ASSERT_NOT_REACHED();
}

// The pending write has finished, so we are now done with its arguments. Clearing this member
// will allow us to send messages again.
m_pendingWriteArguments = 0;

// Now that the pending write has finished, we can try to send a new message.
sendOutgoingMessages();
}

bool Connection::open()
{
// Start listening for read state events.
// Start listening for read and write state events.
m_connectionQueue.registerHandle(m_readState.hEvent, WorkItem::create(this, &Connection::readEventHandler));
m_connectionQueue.registerHandle(m_writeState.hEvent, WorkItem::create(this, &Connection::writeEventHandler));

if (m_isServer) {
// Wait for a connection.
@@ -261,31 +284,44 @@ bool Connection::open()
return true;
}

void Connection::sendOutgoingMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments)
bool Connection::platformCanSendOutgoingMessages() const
{
// We only allow sending one asynchronous message at a time. If we wanted to send more than one
// at once, we'd have to use multiple OVERLAPPED structures and hold onto multiple pending
// ArgumentEncoders (one of each for each simultaneous asynchronous message).
return !m_pendingWriteArguments;
}

bool Connection::sendOutgoingMessage(MessageID messageID, PassOwnPtr<ArgumentEncoder> arguments)
{
ASSERT(!m_pendingWriteArguments);

// Just bail if the handle has been closed.
if (m_connectionPipe == INVALID_HANDLE_VALUE)
return;
return false;

// We put the message ID last.
arguments->encodeUInt32(messageID.toInt());

// Write the outgoing message.
OVERLAPPED overlapped = { 0 };

DWORD bytesWritten;
if (::WriteFile(m_connectionPipe, arguments->buffer(), arguments->bufferSize(), &bytesWritten, &overlapped)) {
if (::WriteFile(m_connectionPipe, arguments->buffer(), arguments->bufferSize(), 0, &m_writeState)) {
// We successfully sent this message.
return;
return true;
}

DWORD error = ::GetLastError();
if (error == ERROR_IO_PENDING) {
// The message will be sent soon.
return;
if (error != ERROR_IO_PENDING) {
ASSERT_NOT_REACHED();
return false;
}

ASSERT_NOT_REACHED();
// The message will be sent soon. Hold onto the arguments so that they won't be destroyed
// before the write completes.
m_pendingWriteArguments = arguments;

// We can only send one asynchronous message at a time (see comment in platformCanSendOutgoingMessages).
return false;
}

} // namespace CoreIPC

0 comments on commit 9a07ad4

Please sign in to comment.