diff --git a/src/coreclr/src/debug/debug-pal/unix/diagnosticsipc.cpp b/src/coreclr/src/debug/debug-pal/unix/diagnosticsipc.cpp index e70f35884f510..7d3b6b55fda1e 100644 --- a/src/coreclr/src/debug/debug-pal/unix/diagnosticsipc.cpp +++ b/src/coreclr/src/debug/debug-pal/unix/diagnosticsipc.cpp @@ -12,13 +12,20 @@ #include "diagnosticsipc.h" #include "processdescriptor.h" -IpcStream::DiagnosticsIpc::DiagnosticsIpc(const int serverSocket, sockaddr_un *const pServerAddress) : +#if __GNUC__ + #include +#else + #include +#endif // __GNUC__ + +IpcStream::DiagnosticsIpc::DiagnosticsIpc(const int serverSocket, sockaddr_un *const pServerAddress, ConnectionMode mode) : + mode(mode), _serverSocket(serverSocket), _pServerAddress(new sockaddr_un), - _isClosed(false) + _isClosed(false), + _isListening(false) { _ASSERTE(_pServerAddress != nullptr); - _ASSERTE(_serverSocket != -1); _ASSERTE(pServerAddress != nullptr); if (_pServerAddress == nullptr || pServerAddress == nullptr) @@ -32,24 +39,8 @@ IpcStream::DiagnosticsIpc::~DiagnosticsIpc() delete _pServerAddress; } -IpcStream::DiagnosticsIpc *IpcStream::DiagnosticsIpc::Create(const char *const pIpcName, ErrorCallback callback) +IpcStream::DiagnosticsIpc *IpcStream::DiagnosticsIpc::Create(const char *const pIpcName, ConnectionMode mode, ErrorCallback callback) { -#ifdef __APPLE__ - mode_t prev_mask = umask(~(S_IRUSR | S_IWUSR)); // This will set the default permission bit to 600 -#endif // __APPLE__ - - const int serverSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); - if (serverSocket == -1) - { - if (callback != nullptr) - callback(strerror(errno), errno); -#ifdef __APPLE__ - umask(prev_mask); -#endif // __APPLE__ - _ASSERTE(!"Failed to create diagnostics IPC socket."); - return nullptr; - } - sockaddr_un serverAddress{}; serverAddress.sun_family = AF_UNIX; @@ -71,6 +62,24 @@ IpcStream::DiagnosticsIpc *IpcStream::DiagnosticsIpc::Create(const char *const p "socket"); } + if (mode == ConnectionMode::CLIENT) + return new IpcStream::DiagnosticsIpc(-1, &serverAddress, ConnectionMode::CLIENT); + +#ifdef __APPLE__ + mode_t prev_mask = umask(~(S_IRUSR | S_IWUSR)); // This will set the default permission bit to 600 +#endif // __APPLE__ + + const int serverSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); + if (serverSocket == -1) + { + if (callback != nullptr) + callback(strerror(errno), errno); +#ifdef __APPLE__ + umask(prev_mask); +#endif // __APPLE__ + _ASSERTE(!"Failed to create diagnostics IPC socket."); + return nullptr; + } #ifndef __APPLE__ if (fchmod(serverSocket, S_IRUSR | S_IWUSR) == -1) @@ -99,33 +108,52 @@ IpcStream::DiagnosticsIpc *IpcStream::DiagnosticsIpc::Create(const char *const p return nullptr; } - const int fSuccessfulListen = ::listen(serverSocket, /* backlog */ 255); +#ifdef __APPLE__ + umask(prev_mask); +#endif // __APPLE__ + + return new IpcStream::DiagnosticsIpc(serverSocket, &serverAddress, mode); +} + +bool IpcStream::DiagnosticsIpc::Listen(ErrorCallback callback) +{ + _ASSERTE(mode == ConnectionMode::SERVER); + if (mode != ConnectionMode::SERVER) + { + if (callback != nullptr) + callback("Cannot call Listen on a client connection", -1); + return false; + } + + if (_isListening) + return true; + + const int fSuccessfulListen = ::listen(_serverSocket, /* backlog */ 255); if (fSuccessfulListen == -1) { if (callback != nullptr) callback(strerror(errno), errno); _ASSERTE(fSuccessfulListen != -1); - const int fSuccessUnlink = ::unlink(serverAddress.sun_path); + const int fSuccessUnlink = ::unlink(_pServerAddress->sun_path); _ASSERTE(fSuccessUnlink != -1); - const int fSuccessClose = ::close(serverSocket); + const int fSuccessClose = ::close(_serverSocket); _ASSERTE(fSuccessClose != -1); -#ifdef __APPLE__ - umask(prev_mask); -#endif // __APPLE__ - return nullptr; + return false; + } + else + { + _isListening = true; + return true; } - -#ifdef __APPLE__ - umask(prev_mask); -#endif // __APPLE__ - - return new IpcStream::DiagnosticsIpc(serverSocket, &serverAddress); } -IpcStream *IpcStream::DiagnosticsIpc::Accept(ErrorCallback callback) const +IpcStream *IpcStream::DiagnosticsIpc::Accept(ErrorCallback callback) { + _ASSERTE(mode == ConnectionMode::SERVER); + _ASSERTE(_isListening); + sockaddr_un from; socklen_t fromlen = sizeof(from); const int clientSocket = ::accept(_serverSocket, (sockaddr *)&from, &fromlen); @@ -136,7 +164,114 @@ IpcStream *IpcStream::DiagnosticsIpc::Accept(ErrorCallback callback) const return nullptr; } - return new IpcStream(clientSocket); + return new IpcStream(clientSocket, mode); +} + +IpcStream *IpcStream::DiagnosticsIpc::Connect(ErrorCallback callback) +{ + _ASSERTE(mode == ConnectionMode::CLIENT); + + sockaddr_un clientAddress{}; + clientAddress.sun_family = AF_UNIX; + const int clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); + if (clientSocket == -1) + { + if (callback != nullptr) + callback(strerror(errno), errno); + return nullptr; + } + + // We don't expect this to block since this is a Unix Domain Socket. `connect` may block until the + // TCP handshake is complete for TCP/IP sockets, but UDS don't use TCP. `connect` will return even if + // the server hasn't called `accept`. + if (::connect(clientSocket, (struct sockaddr *)_pServerAddress, sizeof(*_pServerAddress)) < 0) + { + if (callback != nullptr) + callback(strerror(errno), errno); + return nullptr; + } + + return new IpcStream(clientSocket, ConnectionMode::CLIENT); +} + +int32_t IpcStream::DiagnosticsIpc::Poll(IpcPollHandle *rgIpcPollHandles, uint32_t nHandles, int32_t timeoutMs, ErrorCallback callback) +{ + // prepare the pollfd structs + pollfd *pollfds = new pollfd[nHandles]; + for (uint32_t i = 0; i < nHandles; i++) + { + rgIpcPollHandles[i].revents = 0; // ignore any values in revents + int fd = -1; + if (rgIpcPollHandles[i].pIpc != nullptr) + { + // SERVER + _ASSERTE(rgIpcPollHandles[i].pIpc->mode == ConnectionMode::SERVER); + fd = rgIpcPollHandles[i].pIpc->_serverSocket; + } + else + { + // CLIENT + _ASSERTE(rgIpcPollHandles[i].pStream != nullptr); + fd = rgIpcPollHandles[i].pStream->_clientSocket; + } + + pollfds[i].fd = fd; + pollfds[i].events = POLLIN; + } + + int retval = poll(pollfds, nHandles, timeoutMs); + + // Check results + if (retval < 0) + { + for (uint32_t i = 0; i < nHandles; i++) + { + if ((pollfds[i].revents & POLLERR) && callback != nullptr) + callback(strerror(errno), errno); + rgIpcPollHandles[i].revents = (uint8_t)PollEvents::ERR; + } + delete[] pollfds; + return -1; + } + else if (retval == 0) + { + // we timed out + delete[] pollfds; + return 0; + } + + for (uint32_t i = 0; i < nHandles; i++) + { + if (pollfds[i].revents != 0) + { + // error check FIRST + if (pollfds[i].revents & POLLHUP) + { + // check for hangup first because a closed socket + // will technically meet the requirements for POLLIN + // i.e., a call to recv/read won't block + rgIpcPollHandles[i].revents = (uint8_t)PollEvents::HANGUP; + delete[] pollfds; + return -1; + } + else if ((pollfds[i].revents & (POLLERR|POLLNVAL))) + { + if (callback != nullptr) + callback("Poll error", (uint32_t)pollfds[i].revents); + rgIpcPollHandles[i].revents = (uint8_t)PollEvents::ERR; + delete[] pollfds; + return -1; + } + else if (pollfds[i].revents & POLLIN) + { + rgIpcPollHandles[i].revents = (uint8_t)PollEvents::SIGNALED; + break; + } + } + } + + delete[] pollfds; + return 1; } void IpcStream::DiagnosticsIpc::Close(ErrorCallback callback) @@ -172,6 +307,11 @@ void IpcStream::DiagnosticsIpc::Unlink(ErrorCallback callback) } IpcStream::~IpcStream() +{ + Close(); +} + +void IpcStream::Close(ErrorCallback) { if (_clientSocket != -1) { @@ -179,38 +319,89 @@ IpcStream::~IpcStream() const int fSuccessClose = ::close(_clientSocket); _ASSERTE(fSuccessClose != -1); + _clientSocket = -1; } } -bool IpcStream::Read(void *lpBuffer, const uint32_t nBytesToRead, uint32_t &nBytesRead) const +bool IpcStream::Read(void *lpBuffer, const uint32_t nBytesToRead, uint32_t &nBytesRead, const int32_t timeoutMs) { _ASSERTE(lpBuffer != nullptr); - const ssize_t ssize = ::recv(_clientSocket, lpBuffer, nBytesToRead, 0); - const bool fSuccess = ssize != -1; + if (timeoutMs != InfiniteTimeout) + { + pollfd pfd; + pfd.fd = _clientSocket; + pfd.events = POLLIN; + int retval = poll(&pfd, 1, timeoutMs); + if (retval <= 0 || pfd.revents != POLLIN) + { + // timeout or error + return false; + } + // else fallthrough + } + + uint8_t *lpBufferCursor = (uint8_t*)lpBuffer; + ssize_t currentBytesRead = 0; + ssize_t totalBytesRead = 0; + bool fSuccess = true; + while (fSuccess && nBytesToRead - totalBytesRead > 0) + { + currentBytesRead = ::recv(_clientSocket, lpBufferCursor, nBytesToRead - totalBytesRead, 0); + fSuccess = currentBytesRead != 0; + if (!fSuccess) + break; + totalBytesRead += currentBytesRead; + lpBufferCursor += currentBytesRead; + } if (!fSuccess) { // TODO: Add error handling. } - nBytesRead = static_cast(ssize); + nBytesRead = static_cast(totalBytesRead); return fSuccess; } -bool IpcStream::Write(const void *lpBuffer, const uint32_t nBytesToWrite, uint32_t &nBytesWritten) const +bool IpcStream::Write(const void *lpBuffer, const uint32_t nBytesToWrite, uint32_t &nBytesWritten, const int32_t timeoutMs) { _ASSERTE(lpBuffer != nullptr); - const ssize_t ssize = ::send(_clientSocket, lpBuffer, nBytesToWrite, 0); - const bool fSuccess = ssize != -1; + if (timeoutMs != InfiniteTimeout) + { + pollfd pfd; + pfd.fd = _clientSocket; + pfd.events = POLLOUT; + int retval = poll(&pfd, 1, timeoutMs); + if (retval <= 0 || pfd.revents != POLLOUT) + { + // timeout or error + return false; + } + // else fallthrough + } + + uint8_t *lpBufferCursor = (uint8_t*)lpBuffer; + ssize_t currentBytesWritten = 0; + ssize_t totalBytesWritten = 0; + bool fSuccess = true; + while (fSuccess && nBytesToWrite - totalBytesWritten > 0) + { + currentBytesWritten = ::send(_clientSocket, lpBufferCursor, nBytesToWrite - totalBytesWritten, 0); + fSuccess = currentBytesWritten != -1; + if (!fSuccess) + break; + lpBufferCursor += currentBytesWritten; + totalBytesWritten += currentBytesWritten; + } if (!fSuccess) { // TODO: Add error handling. } - nBytesWritten = static_cast(ssize); + nBytesWritten = static_cast(totalBytesWritten); return fSuccess; } diff --git a/src/coreclr/src/debug/debug-pal/win/diagnosticsipc.cpp b/src/coreclr/src/debug/debug-pal/win/diagnosticsipc.cpp index 36c11857cabe9..81c325f2d867f 100644 --- a/src/coreclr/src/debug/debug-pal/win/diagnosticsipc.cpp +++ b/src/coreclr/src/debug/debug-pal/win/diagnosticsipc.cpp @@ -7,9 +7,14 @@ #include #include "diagnosticsipc.h" -IpcStream::DiagnosticsIpc::DiagnosticsIpc(const char(&namedPipeName)[MaxNamedPipeNameLength]) +#define _ASSERTE assert + +IpcStream::DiagnosticsIpc::DiagnosticsIpc(const char(&namedPipeName)[MaxNamedPipeNameLength], ConnectionMode mode) : + mode(mode), + _isListening(false) { memcpy(_pNamedPipeName, namedPipeName, sizeof(_pNamedPipeName)); + memset(&_oOverlap, 0, sizeof(OVERLAPPED)); } IpcStream::DiagnosticsIpc::~DiagnosticsIpc() @@ -17,7 +22,7 @@ IpcStream::DiagnosticsIpc::~DiagnosticsIpc() Close(); } -IpcStream::DiagnosticsIpc *IpcStream::DiagnosticsIpc::Create(const char *const pIpcName, ErrorCallback callback) +IpcStream::DiagnosticsIpc *IpcStream::DiagnosticsIpc::Create(const char *const pIpcName, ConnectionMode mode, ErrorCallback callback) { char namedPipeName[MaxNamedPipeNameLength]{}; int nCharactersWritten = -1; @@ -43,20 +48,32 @@ IpcStream::DiagnosticsIpc *IpcStream::DiagnosticsIpc::Create(const char *const p { if (callback != nullptr) callback("Failed to generate the named pipe name", nCharactersWritten); - assert(nCharactersWritten != -1); + _ASSERTE(nCharactersWritten != -1); return nullptr; } - return new IpcStream::DiagnosticsIpc(namedPipeName); + return new IpcStream::DiagnosticsIpc(namedPipeName, mode); } -IpcStream *IpcStream::DiagnosticsIpc::Accept(ErrorCallback callback) const +bool IpcStream::DiagnosticsIpc::Listen(ErrorCallback callback) { + _ASSERTE(mode == ConnectionMode::SERVER); + if (mode != ConnectionMode::SERVER) + { + if (callback != nullptr) + callback("Cannot call Listen on a client connection", -1); + return false; + } + + if (_isListening) + return true; + const uint32_t nInBufferSize = 16 * 1024; const uint32_t nOutBufferSize = 16 * 1024; - HANDLE hPipe = ::CreateNamedPipeA( + _hPipe = ::CreateNamedPipeA( _pNamedPipeName, // pipe name - PIPE_ACCESS_DUPLEX, // read/write access + PIPE_ACCESS_DUPLEX | // read/write access + FILE_FLAG_OVERLAPPED, // async listening PIPE_TYPE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS, // message type pipe, message-read and blocking mode PIPE_UNLIMITED_INSTANCES, // max. instances nOutBufferSize, // output buffer size @@ -64,19 +81,32 @@ IpcStream *IpcStream::DiagnosticsIpc::Accept(ErrorCallback callback) const 0, // default client time-out NULL); // default security attribute - if (hPipe == INVALID_HANDLE_VALUE) + if (_hPipe == INVALID_HANDLE_VALUE) { if (callback != nullptr) callback("Failed to create an instance of a named pipe.", ::GetLastError()); - return nullptr; + return false; } - const BOOL fSuccess = ::ConnectNamedPipe(hPipe, NULL) != 0; + HANDLE hOverlapEvent = CreateEvent(NULL, true, false, NULL); + if (hOverlapEvent == NULL) + { + if (callback != nullptr) + callback("Failed to create overlap event", ::GetLastError()); + ::CloseHandle(_hPipe); + _hPipe = INVALID_HANDLE_VALUE; + return false; + } + _oOverlap.hEvent = hOverlapEvent; + + BOOL fSuccess = ::ConnectNamedPipe(_hPipe, &_oOverlap) != 0; if (!fSuccess) { const DWORD errorCode = ::GetLastError(); switch (errorCode) { + case ERROR_IO_PENDING: + // There was a pending connection that can be waited on (will happen in poll) case ERROR_PIPE_CONNECTED: // Occurs when a client connects before the function is called. // In this case, there is a connection between client and @@ -86,46 +116,329 @@ IpcStream *IpcStream::DiagnosticsIpc::Accept(ErrorCallback callback) const default: if (callback != nullptr) callback("A client process failed to connect.", errorCode); - ::CloseHandle(hPipe); - return nullptr; + ::CloseHandle(_hPipe); + _hPipe = INVALID_HANDLE_VALUE; + ::CloseHandle(_oOverlap.hEvent); + _oOverlap.hEvent = INVALID_HANDLE_VALUE; + return false; } } - return new IpcStream(hPipe); + _isListening = true; + return true; +} + +IpcStream *IpcStream::DiagnosticsIpc::Accept(ErrorCallback callback) +{ + _ASSERTE(_isListening); + _ASSERTE(mode == ConnectionMode::SERVER); + + DWORD dwDummy = 0; + bool fSuccess = GetOverlappedResult( + _hPipe, // handle + &_oOverlap, // overlapped + &dwDummy, // throw-away dword + true); // wait till event signals + + if (!fSuccess) + { + if (callback != nullptr) + callback("Failed to GetOverlappedResults for NamedPipe server", ::GetLastError()); + return nullptr; + } + + // create new IpcStream using handle and reset the Server object so it can listen again + IpcStream *pStream = new IpcStream(_hPipe, ConnectionMode::SERVER); + + // reset the server + _hPipe = INVALID_HANDLE_VALUE; + _isListening = false; + ::CloseHandle(_oOverlap.hEvent); + memset(&_oOverlap, 0, sizeof(OVERLAPPED)); // clear the overlapped objects state + fSuccess = Listen(callback); + if (!fSuccess) + { + delete pStream; + return nullptr; + } + + return pStream; +} + +IpcStream *IpcStream::DiagnosticsIpc::Connect(ErrorCallback callback) +{ + _ASSERTE(mode == ConnectionMode::CLIENT); + if (mode != ConnectionMode::CLIENT) + { + if (callback != nullptr) + callback("Cannot call connect on a server connection", 0); + return nullptr; + } + + HANDLE hPipe = ::CreateFileA( + _pNamedPipeName, // pipe name + PIPE_ACCESS_DUPLEX, // read/write access + 0, // no sharing + NULL, // default security attributes + OPEN_EXISTING, // opens existing pipe + FILE_FLAG_OVERLAPPED, // Overlapped + NULL); // no template file + + if (hPipe == INVALID_HANDLE_VALUE) + { + if (callback != nullptr) + callback("Failed to connect to named pipe.", ::GetLastError()); + return nullptr; + } + + return new IpcStream(hPipe, mode); } void IpcStream::DiagnosticsIpc::Close(ErrorCallback) { + if (_hPipe != INVALID_HANDLE_VALUE) + { + if (mode == DiagnosticsIpc::ConnectionMode::SERVER) + { + const BOOL fSuccessDisconnectNamedPipe = ::DisconnectNamedPipe(_hPipe); + _ASSERTE(fSuccessDisconnectNamedPipe != 0); + } + + const BOOL fSuccessCloseHandle = ::CloseHandle(_hPipe); + _ASSERTE(fSuccessCloseHandle != 0); + } + + if (_oOverlap.hEvent != INVALID_HANDLE_VALUE) + { + ::CloseHandle(_oOverlap.hEvent); + } +} + +IpcStream::IpcStream(HANDLE hPipe, DiagnosticsIpc::ConnectionMode mode) : + _hPipe(hPipe), + _mode(mode) +{ + memset(&_oOverlap, 0, sizeof(OVERLAPPED)); + _oOverlap.hEvent = CreateEvent(NULL, true, false, NULL); } IpcStream::~IpcStream() +{ + Close(); +} + +void IpcStream::Close(ErrorCallback) { if (_hPipe != INVALID_HANDLE_VALUE) { Flush(); - const BOOL fSuccessDisconnectNamedPipe = ::DisconnectNamedPipe(_hPipe); - assert(fSuccessDisconnectNamedPipe != 0); + if (_mode == DiagnosticsIpc::ConnectionMode::SERVER) + { + const BOOL fSuccessDisconnectNamedPipe = ::DisconnectNamedPipe(_hPipe); + _ASSERTE(fSuccessDisconnectNamedPipe != 0); + } const BOOL fSuccessCloseHandle = ::CloseHandle(_hPipe); - assert(fSuccessCloseHandle != 0); + _ASSERTE(fSuccessCloseHandle != 0); + } + + if (_oOverlap.hEvent != INVALID_HANDLE_VALUE) + { + ::CloseHandle(_oOverlap.hEvent); } } -bool IpcStream::Read(void *lpBuffer, const uint32_t nBytesToRead, uint32_t &nBytesRead) const +int32_t IpcStream::DiagnosticsIpc::Poll(IpcPollHandle *rgIpcPollHandles, uint32_t nHandles, int32_t timeoutMs, ErrorCallback callback) { - assert(lpBuffer != nullptr); + // load up an array of handles + HANDLE *pHandles = new HANDLE[nHandles]; + for (uint32_t i = 0; i < nHandles; i++) + { + rgIpcPollHandles[i].revents = 0; // ignore any inputs on revents + if (rgIpcPollHandles[i].pIpc != nullptr) + { + // SERVER + _ASSERTE(rgIpcPollHandles[i].pIpc->mode == DiagnosticsIpc::ConnectionMode::SERVER); + pHandles[i] = rgIpcPollHandles[i].pIpc->_oOverlap.hEvent; + } + else + { + // CLIENT + bool fSuccess = false; + DWORD dwDummy = 0; + if (!rgIpcPollHandles[i].pStream->_isTestReading) + { + // check for data by doing an asynchronous 0 byte read. + // This will signal if the pipe closes (hangup) or the server + // sends new data + fSuccess = ::ReadFile( + rgIpcPollHandles[i].pStream->_hPipe, // handle + nullptr, // null buffer + 0, // read 0 bytes + &dwDummy, // dummy variable + &rgIpcPollHandles[i].pStream->_oOverlap); // overlap object to use + rgIpcPollHandles[i].pStream->_isTestReading = true; + if (!fSuccess) + { + DWORD error = ::GetLastError(); + switch (error) + { + case ERROR_IO_PENDING: + pHandles[i] = rgIpcPollHandles[i].pStream->_oOverlap.hEvent; + break; + case ERROR_PIPE_NOT_CONNECTED: + // hangup + rgIpcPollHandles[i].revents = (uint8_t)PollEvents::HANGUP; + delete[] pHandles; + return -1; + default: + if (callback != nullptr) + callback("0 byte async read on client connection failed", error); + delete[] pHandles; + return -1; + } + } + } + else + { + pHandles[i] = rgIpcPollHandles[i].pStream->_oOverlap.hEvent; + } + } + } + + // call wait for multiple obj + DWORD dwWait = WaitForMultipleObjects( + nHandles, // count + pHandles, // handles + false, // Don't wait-all + timeoutMs); + + if (dwWait == WAIT_TIMEOUT) + { + // we timed out + delete[] pHandles; + return 0; + } + + if (dwWait == WAIT_FAILED) + { + // we errored + if (callback != nullptr) + callback("WaitForMultipleObjects failed", ::GetLastError()); + delete[] pHandles; + return -1; + } + + // determine which of the streams signaled + DWORD index = dwWait - WAIT_OBJECT_0; + // error check the index + if (index < 0 || index > (nHandles - 1)) + { + // check if we abandoned something + DWORD abandonedIndex = dwWait - WAIT_ABANDONED_0; + if (abandonedIndex > 0 || abandonedIndex < (nHandles - 1)) + { + rgIpcPollHandles[abandonedIndex].revents = (uint8_t)IpcStream::DiagnosticsIpc::PollEvents::HANGUP; + delete[] pHandles; + return -1; + } + else + { + if (callback != nullptr) + callback("WaitForMultipleObjects failed", ::GetLastError()); + delete[] pHandles; + return -1; + } + } + + // Set revents depending on what signaled the stream + if (rgIpcPollHandles[index].pIpc == nullptr) + { + // CLIENT + // check if the connection got hung up + DWORD dwDummy = 0; + bool fSuccess = GetOverlappedResult(rgIpcPollHandles[index].pStream->_hPipe, + &rgIpcPollHandles[index].pStream->_oOverlap, + &dwDummy, + true); + rgIpcPollHandles[index].pStream->_isTestReading = false; + if (!fSuccess) + { + DWORD error = ::GetLastError(); + if (error == ERROR_PIPE_NOT_CONNECTED) + rgIpcPollHandles[index].revents = (uint8_t)IpcStream::DiagnosticsIpc::PollEvents::HANGUP; + else + { + if (callback != nullptr) + callback("Client connection error", -1); + rgIpcPollHandles[index].revents = (uint8_t)IpcStream::DiagnosticsIpc::PollEvents::ERR; + delete[] pHandles; + return -1; + } + } + else + { + rgIpcPollHandles[index].revents = (uint8_t)IpcStream::DiagnosticsIpc::PollEvents::SIGNALED; + } + } + else + { + // SERVER + rgIpcPollHandles[index].revents = (uint8_t)IpcStream::DiagnosticsIpc::PollEvents::SIGNALED; + } + + delete[] pHandles; + return 1; +} + +bool IpcStream::Read(void *lpBuffer, const uint32_t nBytesToRead, uint32_t &nBytesRead, const int32_t timeoutMs) +{ + _ASSERTE(lpBuffer != nullptr); DWORD nNumberOfBytesRead = 0; - const bool fSuccess = ::ReadFile( + LPOVERLAPPED overlap = &_oOverlap; + bool fSuccess = ::ReadFile( _hPipe, // handle to pipe lpBuffer, // buffer to receive data nBytesToRead, // size of buffer &nNumberOfBytesRead, // number of bytes read - NULL) != 0; // not overlapped I/O + overlap) != 0; // overlapped I/O if (!fSuccess) { + if (timeoutMs == InfiniteTimeout) + { + fSuccess = GetOverlappedResult(_hPipe, + overlap, + &nNumberOfBytesRead, + true) != 0; + } + else + { + DWORD dwError = GetLastError(); + if (dwError == ERROR_IO_PENDING) + { + DWORD dwWait = WaitForSingleObject(_oOverlap.hEvent, (DWORD)timeoutMs); + if (dwWait == WAIT_OBJECT_0) + { + // get the result + fSuccess = GetOverlappedResult(_hPipe, + overlap, + &nNumberOfBytesRead, + true) != 0; + } + else + { + // cancel IO and ensure the cancel happened + if (CancelIo(_hPipe)) + { + // check if the async write beat the cancellation + fSuccess = GetOverlappedResult(_hPipe, overlap, &nNumberOfBytesRead, true) != 0; + } + } + } + } // TODO: Add error handling. } @@ -133,20 +446,54 @@ bool IpcStream::Read(void *lpBuffer, const uint32_t nBytesToRead, uint32_t &nByt return fSuccess; } -bool IpcStream::Write(const void *lpBuffer, const uint32_t nBytesToWrite, uint32_t &nBytesWritten) const +bool IpcStream::Write(const void *lpBuffer, const uint32_t nBytesToWrite, uint32_t &nBytesWritten, const int32_t timeoutMs) { - assert(lpBuffer != nullptr); + _ASSERTE(lpBuffer != nullptr); DWORD nNumberOfBytesWritten = 0; - const bool fSuccess = ::WriteFile( + LPOVERLAPPED overlap = &_oOverlap; + bool fSuccess = ::WriteFile( _hPipe, // handle to pipe lpBuffer, // buffer to write from nBytesToWrite, // number of bytes to write &nNumberOfBytesWritten, // number of bytes written - NULL) != 0; // not overlapped I/O + overlap) != 0; // overlapped I/O if (!fSuccess) { + DWORD dwError = GetLastError(); + if (dwError == ERROR_IO_PENDING) + { + if (timeoutMs == InfiniteTimeout) + { + // if we're waiting infinitely, don't bother with extra kernel call + fSuccess = GetOverlappedResult(_hPipe, + overlap, + &nNumberOfBytesWritten, + true) != 0; + } + else + { + DWORD dwWait = WaitForSingleObject(_oOverlap.hEvent, (DWORD)timeoutMs); + if (dwWait == WAIT_OBJECT_0) + { + // get the result + fSuccess = GetOverlappedResult(_hPipe, + overlap, + &nNumberOfBytesWritten, + true) != 0; + } + else + { + // cancel IO and ensure the cancel happened + if (CancelIo(_hPipe)) + { + // check if the async write beat the cancellation + fSuccess = GetOverlappedResult(_hPipe, overlap, &nNumberOfBytesWritten, true) != 0; + } + } + } + } // TODO: Add error handling. } diff --git a/src/coreclr/src/debug/inc/diagnosticsipc.h b/src/coreclr/src/debug/inc/diagnosticsipc.h index eabea6c3ceaea..94e817fa70fd6 100644 --- a/src/coreclr/src/debug/inc/diagnosticsipc.h +++ b/src/coreclr/src/debug/inc/diagnosticsipc.h @@ -18,23 +18,78 @@ typedef void (*ErrorCallback)(const char *szMessage, uint32_t code); class IpcStream final { public: + static constexpr int32_t InfiniteTimeout = -1; ~IpcStream(); - bool Read(void *lpBuffer, const uint32_t nBytesToRead, uint32_t &nBytesRead) const; - bool Write(const void *lpBuffer, const uint32_t nBytesToWrite, uint32_t &nBytesWritten) const; + bool Read(void *lpBuffer, const uint32_t nBytesToRead, uint32_t &nBytesRead, const int32_t timeoutMs = IpcStream::InfiniteTimeout); + bool Write(const void *lpBuffer, const uint32_t nBytesToWrite, uint32_t &nBytesWritten, const int32_t timeoutMs = IpcStream::InfiniteTimeout); bool Flush() const; + void Close(ErrorCallback callback = nullptr); class DiagnosticsIpc final { public: + enum ConnectionMode + { + CLIENT, + SERVER + }; + + enum class PollEvents : uint8_t + { + TIMEOUT = 0x00, // implies timeout + SIGNALED = 0x01, // ready for use + HANGUP = 0x02, // connection remotely closed + ERR = 0x04 // other error + }; + + // The bookeeping struct used for polling on server and client structs + struct IpcPollHandle + { + // Only one of these will be non-null, treat as a union + DiagnosticsIpc *pIpc; + IpcStream *pStream; + + // contains some set of PollEvents + // will be set by Poll + // Any values here are ignored by Poll + uint8_t revents; + + // a cookie assignable by upstream users for additional bookkeeping + void *pUserData; + }; + + // Poll + // Paramters: + // - IpcPollHandle * rgpIpcPollHandles: Array of IpcPollHandles to poll + // - uint32_t nHandles: The number of handles to poll + // - int32_t timeoutMs: The timeout in milliseconds for the poll (-1 == infinite) + // Returns: + // int32_t: -1 on error, 0 on timeout, >0 on successful poll + // Remarks: + // Check the events returned in revents for each IpcPollHandle to find the signaled handle. + // Signaled DiagnosticsIpcs can call Accept() without blocking. + // Signaled IpcStreams can call Read(...) without blocking. + // The caller is responsible for cleaning up "hung up" connections. + static int32_t Poll(IpcPollHandle *rgIpcPollHandles, uint32_t nHandles, int32_t timeoutMs, ErrorCallback callback = nullptr); + + ConnectionMode mode; + ~DiagnosticsIpc(); - //! Creates an IPC object - static DiagnosticsIpc *Create(const char *const pIpcName, ErrorCallback callback = nullptr); + // Creates an IPC object + static DiagnosticsIpc *Create(const char *const pIpcName, ConnectionMode mode, ErrorCallback callback = nullptr); + + // puts the DiagnosticsIpc into Listening Mode + // Re-entrant safe + bool Listen(ErrorCallback callback = nullptr); + + // produces a connected stream from a server-mode DiagnosticsIpc. Blocks until a connection is available. + IpcStream *Accept(ErrorCallback callback = nullptr); - //! Enables the underlaying IPC implementation to accept connection. - IpcStream *Accept(ErrorCallback callback = nullptr) const; + // Connect to a server and returns a connected stream + IpcStream *Connect(ErrorCallback callback = nullptr); - //! Closes an open IPC. + //!Closes an open IPC. void Close(ErrorCallback callback = nullptr); private: @@ -44,18 +99,22 @@ class IpcStream final sockaddr_un *const _pServerAddress; bool _isClosed; - DiagnosticsIpc(const int serverSocket, sockaddr_un *const pServerAddress); + DiagnosticsIpc(const int serverSocket, sockaddr_un *const pServerAddress, ConnectionMode mode = ConnectionMode::SERVER); - //! Used to unlink the socket so it can be removed from the filesystem - //! when the last reference to it is closed. + // Used to unlink the socket so it can be removed from the filesystem + // when the last reference to it is closed. void Unlink(ErrorCallback callback = nullptr); #else static const uint32_t MaxNamedPipeNameLength = 256; char _pNamedPipeName[MaxNamedPipeNameLength]; // https://docs.microsoft.com/en-us/windows/desktop/api/winbase/nf-winbase-createnamedpipea + HANDLE _hPipe = INVALID_HANDLE_VALUE; + OVERLAPPED _oOverlap = {}; - DiagnosticsIpc(const char(&namedPipeName)[MaxNamedPipeNameLength]); + DiagnosticsIpc(const char(&namedPipeName)[MaxNamedPipeNameLength], ConnectionMode mode = ConnectionMode::SERVER); #endif /* TARGET_UNIX */ + bool _isListening; + DiagnosticsIpc() = delete; DiagnosticsIpc(const DiagnosticsIpc &src) = delete; DiagnosticsIpc(DiagnosticsIpc &&src) = delete; @@ -66,12 +125,17 @@ class IpcStream final private: #ifdef TARGET_UNIX int _clientSocket = -1; - IpcStream(int clientSocket) : _clientSocket(clientSocket) {} + IpcStream(int clientSocket, int serverSocket, DiagnosticsIpc::ConnectionMode mode = DiagnosticsIpc::ConnectionMode::SERVER) + : _clientSocket(clientSocket), _mode(mode) {} #else HANDLE _hPipe = INVALID_HANDLE_VALUE; - IpcStream(HANDLE hPipe) : _hPipe(hPipe) {} + OVERLAPPED _oOverlap = {}; + BOOL _isTestReading = false; // used to check whether we are already doing a 0-byte read to test for data + IpcStream(HANDLE hPipe, DiagnosticsIpc::ConnectionMode mode = DiagnosticsIpc::ConnectionMode::SERVER); #endif /* TARGET_UNIX */ + DiagnosticsIpc::ConnectionMode _mode; + IpcStream() = delete; IpcStream(const IpcStream &src) = delete; IpcStream(IpcStream &&src) = delete; diff --git a/src/coreclr/src/inc/clrconfigvalues.h b/src/coreclr/src/inc/clrconfigvalues.h index df0d44af48e92..ed3431c7f5df9 100644 --- a/src/coreclr/src/inc/clrconfigvalues.h +++ b/src/coreclr/src/inc/clrconfigvalues.h @@ -726,7 +726,7 @@ RETAIL_CONFIG_DWORD_INFO(INTERNAL_EventPipeProcNumbers, W("EventPipeProcNumbers" // // Diagnostics Server // -RETAIL_CONFIG_STRING_INFO_EX(EXTERNAL_DOTNET_DiagnosticsServerAddress, W("DOTNET_DiagnosticsServerAddress"), "The full path including filename for the OS transport (NamedPipe on Windows; Unix Domain Socket on Linux) to be used by the Diagnostics Server", CLRConfig::DontPrependCOMPlus_); +RETAIL_CONFIG_STRING_INFO_EX(EXTERNAL_DOTNET_DiagnosticsMonitorAddress, W("DOTNET_DiagnosticsMonitorAddress"), "NamedPipe path without '\\\\.\\pipe\\' on Windows; Full path of Unix Domain Socket on Linux/Unix. Used for Diagnostics Monitoring Agents.", CLRConfig::DontPrependCOMPlus_); // // LTTng diff --git a/src/coreclr/src/inc/corhlprpriv.h b/src/coreclr/src/inc/corhlprpriv.h index 8fcafd08d93c3..7b9e5f1f88567 100644 --- a/src/coreclr/src/inc/corhlprpriv.h +++ b/src/coreclr/src/inc/corhlprpriv.h @@ -507,6 +507,7 @@ class CQuickArrayList : protected CQuickArray using CQuickArray::AllocNoThrow; using CQuickArray::ReSizeNoThrow; using CQuickArray::MaxSize; + using CQuickArray::Ptr; CQuickArrayList() : m_curSize(0) diff --git a/src/coreclr/src/vm/CMakeLists.txt b/src/coreclr/src/vm/CMakeLists.txt index 2ce12c7250b46..3e37cebdc10cf 100644 --- a/src/coreclr/src/vm/CMakeLists.txt +++ b/src/coreclr/src/vm/CMakeLists.txt @@ -364,6 +364,7 @@ set(VM_SOURCES_WKS interoputil.cpp interpreter.cpp invokeutil.cpp + ipcstreamfactory.cpp jithelpers.cpp managedmdimport.cpp marshalnative.cpp @@ -482,6 +483,7 @@ set(VM_HEADERS_WKS interpreter.h interpreter.hpp invokeutil.h + ipcstreamfactory.h managedmdimport.hpp marshalnative.h methodtablebuilder.h diff --git a/src/coreclr/src/vm/diagnosticserver.cpp b/src/coreclr/src/vm/diagnosticserver.cpp index 5a80396179ae0..cb01fce5b4bbe 100644 --- a/src/coreclr/src/vm/diagnosticserver.cpp +++ b/src/coreclr/src/vm/diagnosticserver.cpp @@ -4,6 +4,7 @@ #include "common.h" #include "diagnosticserver.h" +#include "ipcstreamfactory.h" #include "eventpipeprotocolhelper.h" #include "dumpdiagnosticprotocolhelper.h" #include "profilerdiagnosticprotocolhelper.h" @@ -19,7 +20,6 @@ #ifdef FEATURE_PERFTRACING -IpcStream::DiagnosticsIpc *DiagnosticServer::s_pIpc = nullptr; Volatile DiagnosticServer::s_shuttingDown(false); DWORD WINAPI DiagnosticServer::DiagnosticsServerThread(LPVOID) @@ -29,11 +29,11 @@ DWORD WINAPI DiagnosticServer::DiagnosticsServerThread(LPVOID) NOTHROW; GC_TRIGGERS; MODE_PREEMPTIVE; - PRECONDITION(s_pIpc != nullptr); + PRECONDITION(IpcStreamFactory::HasActiveConnections()); } CONTRACTL_END; - if (s_pIpc == nullptr) + if (!IpcStreamFactory::HasActiveConnections()) { STRESS_LOG0(LF_DIAGNOSTICS_PORT, LL_ERROR, "Diagnostics IPC listener was undefined\n"); return 1; @@ -47,8 +47,7 @@ DWORD WINAPI DiagnosticServer::DiagnosticsServerThread(LPVOID) { while (!s_shuttingDown) { - // FIXME: Ideally this would be something like a std::shared_ptr - IpcStream *pStream = s_pIpc->Accept(LoggingCallback); + IpcStream *pStream = IpcStreamFactory::GetNextAvailableStream(LoggingCallback); if (pStream == nullptr) continue; @@ -136,7 +135,7 @@ bool DiagnosticServer::Initialize() }; NewArrayHolder address = nullptr; - CLRConfigStringHolder wAddress = CLRConfig::GetConfigValue(CLRConfig::EXTERNAL_DOTNET_DiagnosticsServerAddress); + CLRConfigStringHolder wAddress = CLRConfig::GetConfigValue(CLRConfig::EXTERNAL_DOTNET_DiagnosticsMonitorAddress); int nCharactersWritten = 0; if (wAddress != nullptr) { @@ -147,12 +146,14 @@ bool DiagnosticServer::Initialize() nCharactersWritten = WideCharToMultiByte(CP_UTF8, 0, wAddress, -1, address, nCharactersWritten, NULL, NULL); assert(nCharactersWritten != 0); } + + // Create the client mode connection + fSuccess &= IpcStreamFactory::CreateClient(address, ErrorCallback); } - // TODO: Should we handle/assert that (s_pIpc == nullptr)? - s_pIpc = IpcStream::DiagnosticsIpc::Create(address, ErrorCallback); + fSuccess &= IpcStreamFactory::CreateServer(nullptr, ErrorCallback); - if (s_pIpc != nullptr) + if (IpcStreamFactory::HasActiveConnections()) { #ifdef FEATURE_AUTO_TRACE auto_trace_init(); @@ -163,14 +164,13 @@ bool DiagnosticServer::Initialize() nullptr, // no security attribute 0, // default stack size DiagnosticsServerThread, // thread proc - (LPVOID)s_pIpc, // thread parameter + nullptr, // thread parameter 0, // not suspended &dwThreadId); // returns thread ID if (hServerThread == NULL) { - delete s_pIpc; - s_pIpc = nullptr; + IpcStreamFactory::CloseConnections(); // Failed to create IPC thread. STRESS_LOG1( @@ -215,7 +215,7 @@ bool DiagnosticServer::Shutdown() EX_TRY { - if (s_pIpc != nullptr) + if (IpcStreamFactory::HasActiveConnections()) { auto ErrorCallback = [](const char *szMessage, uint32_t code) { STRESS_LOG2( @@ -225,7 +225,8 @@ bool DiagnosticServer::Shutdown() code, // data1 szMessage); // data2 }; - s_pIpc->Close(ErrorCallback); // This will break the accept waiting for client connection. + + IpcStreamFactory::CloseConnections(); } fSuccess = true; } diff --git a/src/coreclr/src/vm/diagnosticserver.h b/src/coreclr/src/vm/diagnosticserver.h index 393fbda0bd9ae..a5b8f07f7847b 100644 --- a/src/coreclr/src/vm/diagnosticserver.h +++ b/src/coreclr/src/vm/diagnosticserver.h @@ -46,7 +46,6 @@ class DiagnosticServer final static DWORD WINAPI DiagnosticsServerThread(LPVOID lpThreadParameter); private: - static IpcStream::DiagnosticsIpc *s_pIpc; static Volatile s_shuttingDown; }; diff --git a/src/coreclr/src/vm/diagnosticsprotocol.h b/src/coreclr/src/vm/diagnosticsprotocol.h index bbc622a6411a3..e6bd3d4e89a8d 100644 --- a/src/coreclr/src/vm/diagnosticsprotocol.h +++ b/src/coreclr/src/vm/diagnosticsprotocol.h @@ -103,6 +103,60 @@ namespace DiagnosticsIpc const MagicVersion DotnetIpcMagic_V1 = { "DOTNET_IPC_V1" }; + /** + * ==ADVERTISE PROTOCOL== + * Before standard IPC Protocol communication can occur on a client-mode connection + * the runtime must advertise itself over the connection. ALL SUBSEQUENT COMMUNICATION + * IS STANDARD DIAGNOSTICS IPC PROTOCOL COMMUNICATION. + * + * See spec in: dotnet/diagnostics@documentation/design-docs/ipc-spec.md + * + * The flow for Advertise is a one-way burst of 24 bytes consisting of + * 8 bytes - "ADVR_V1\0" (ASCII chars + null byte) + * 16 bytes - random 128 bit number cookie (little-endian) + * 8 bytes - PID (little-endian) + * 2 bytes - unused 2 byte field for futureproofing + */ + + const uint8_t AdvertiseMagic_V1[8] = "ADVR_V1"; + + const uint32_t AdvertiseSize = 34; + + static GUID AdvertiseCookie_V1 = GUID_NULL; + + inline GUID GetAdvertiseCookie_V1() + { + if (AdvertiseCookie_V1 == GUID_NULL) + { + CoCreateGuid(&AdvertiseCookie_V1); + } + + return AdvertiseCookie_V1; + } + + inline bool SendIpcAdvertise_V1(IpcStream *pStream) + { + uint8_t advertiseBuffer[DiagnosticsIpc::AdvertiseSize]; + GUID cookie = GetAdvertiseCookie_V1(); + uint64_t pid = GetCurrentProcessId(); + + uint64_t *buffer = (uint64_t*)advertiseBuffer; + buffer[0] = *(uint64_t*)AdvertiseMagic_V1; + buffer[1] = (((uint64_t)VAL32(cookie.Data1) << 32) | ((uint64_t)VAL16(cookie.Data2) << 16) | VAL16((uint64_t)cookie.Data3)); + buffer[2] = *(uint64_t*)cookie.Data4; + buffer[3] = VAL64(pid); + + // zero out unused field + ((uint16_t*)advertiseBuffer)[16] = VAL16(0); + + uint32_t nBytesWritten = 0; + if (!pStream->Write(advertiseBuffer, sizeof(advertiseBuffer), nBytesWritten, 100 /* ms */)) + return false; + + _ASSERTE(nBytesWritten == sizeof(advertiseBuffer)); + return nBytesWritten == sizeof(advertiseBuffer); + } + const IpcHeader GenericSuccessHeader = { { DotnetIpcMagic_V1 }, diff --git a/src/coreclr/src/vm/ipcstreamfactory.cpp b/src/coreclr/src/vm/ipcstreamfactory.cpp new file mode 100644 index 0000000000000..07e9d4f17ff2d --- /dev/null +++ b/src/coreclr/src/vm/ipcstreamfactory.cpp @@ -0,0 +1,198 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +#include "common.h" +#include "diagnosticsprotocol.h" +#include "ipcstreamfactory.h" + +#ifdef FEATURE_PERFTRACING + +CQuickArrayList IpcStreamFactory::s_rgpConnectionStates = CQuickArrayList(); +Volatile IpcStreamFactory::s_isShutdown = false; + +bool IpcStreamFactory::ClientConnectionState::GetIpcPollHandle(IpcStream::DiagnosticsIpc::IpcPollHandle *pIpcPollHandle, ErrorCallback callback) +{ + if (_pStream == nullptr) + { + // cache is empty, reconnect, e.g., there was a disconnect + IpcStream *pConnection = _pIpc->Connect(callback); + if (pConnection == nullptr) + { + if (callback != nullptr) + callback("Failed to connect to client connection", -1); + return false; + } + if (!DiagnosticsIpc::SendIpcAdvertise_V1(pConnection)) + { + if (callback != nullptr) + callback("Failed to send advertise message", -1); + delete pConnection; + return false; + } + + _pStream = pConnection; + } + *pIpcPollHandle = { nullptr, _pStream, 0, this }; + return true; +} + +IpcStream *IpcStreamFactory::ClientConnectionState::GetConnectedStream(ErrorCallback callback) +{ + IpcStream *pStream = _pStream; + _pStream = nullptr; + return pStream; +} + +void IpcStreamFactory::ClientConnectionState::Reset(ErrorCallback callback) +{ + delete _pStream; + _pStream = nullptr; +} + +bool IpcStreamFactory::ServerConnectionState::GetIpcPollHandle(IpcStream::DiagnosticsIpc::IpcPollHandle *pIpcPollHandle, ErrorCallback callback) +{ + *pIpcPollHandle = { _pIpc, nullptr, 0, this }; + return true; +} + +IpcStream *IpcStreamFactory::ServerConnectionState::GetConnectedStream(ErrorCallback callback) +{ + return _pIpc->Accept(callback); +} + +// noop for server +void IpcStreamFactory::ServerConnectionState::Reset(ErrorCallback) +{ + return; +} + +bool IpcStreamFactory::CreateServer(const char *const pIpcName, ErrorCallback callback) +{ + IpcStream::DiagnosticsIpc *pIpc = IpcStream::DiagnosticsIpc::Create(pIpcName, IpcStream::DiagnosticsIpc::ConnectionMode::SERVER, callback); + if (pIpc != nullptr) + { + if (pIpc->Listen(callback)) + { + s_rgpConnectionStates.Push(new ServerConnectionState(pIpc)); + return true; + } + else + { + delete pIpc; + return false; + } + } + else + { + return false; + } +} + +bool IpcStreamFactory::CreateClient(const char *const pIpcName, ErrorCallback callback) +{ + IpcStream::DiagnosticsIpc *pIpc = IpcStream::DiagnosticsIpc::Create(pIpcName, IpcStream::DiagnosticsIpc::ConnectionMode::CLIENT, callback); + if (pIpc != nullptr) + { + s_rgpConnectionStates.Push(new ClientConnectionState(pIpc)); + return true; + } + else + { + return false; + } +} + +bool IpcStreamFactory::HasActiveConnections() +{ + return !s_isShutdown && s_rgpConnectionStates.Size() > 0; +} + +void IpcStreamFactory::CloseConnections(ErrorCallback callback) +{ + s_isShutdown = true; + for (uint32_t i = 0; i < (uint32_t)s_rgpConnectionStates.Size(); i++) + s_rgpConnectionStates[i]->Close(callback); +} + +// helper function for getting timeout +int32_t IpcStreamFactory::GetNextTimeout(int32_t currentTimeoutMs) +{ + if (currentTimeoutMs == s_pollTimeoutInfinite) + { + return s_pollTimeoutMinMs; + } + else + { + return (currentTimeoutMs >= s_pollTimeoutMaxMs) ? + s_pollTimeoutMaxMs : + (int32_t)((float)currentTimeoutMs * s_pollTimeoutFalloffFactor); + } +} + +IpcStream *IpcStreamFactory::GetNextAvailableStream(ErrorCallback callback) +{ + IpcStream *pStream = nullptr; + CQuickArrayList rgIpcPollHandles; + + int32_t pollTimeoutMs = s_pollTimeoutInfinite; + bool fConnectSuccess = true; + uint32_t nPollAttempts = 0; + + while (pStream == nullptr) + { + fConnectSuccess = true; + for (uint32_t i = 0; i < (uint32_t)s_rgpConnectionStates.Size(); i++) + { + IpcStream::DiagnosticsIpc::IpcPollHandle pollHandle = {}; + if (s_rgpConnectionStates[i]->GetIpcPollHandle(&pollHandle, callback)) + { + rgIpcPollHandles.Push(pollHandle); + } + else + { + fConnectSuccess = false; + } + } + + pollTimeoutMs = fConnectSuccess ? + s_pollTimeoutInfinite : + GetNextTimeout(pollTimeoutMs); + + int32_t retval = IpcStream::DiagnosticsIpc::Poll(rgIpcPollHandles.Ptr(), (uint32_t)rgIpcPollHandles.Size(), pollTimeoutMs, callback); + nPollAttempts++; + STRESS_LOG2(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - Poll attempt: %d, timeout: %dms.\n", nPollAttempts, pollTimeoutMs); + + if (retval != 0) + { + for (uint32_t i = 0; i < (uint32_t)rgIpcPollHandles.Size(); i++) + { + switch ((IpcStream::DiagnosticsIpc::PollEvents)rgIpcPollHandles[i].revents) + { + case IpcStream::DiagnosticsIpc::PollEvents::HANGUP: + ((ConnectionState*)(rgIpcPollHandles[i].pUserData))->Reset(callback); + STRESS_LOG1(LF_DIAGNOSTICS_PORT, LL_INFO10, "IpcStreamFactory::GetNextAvailableStream - Poll attempt: %d, connection hung up.\n", nPollAttempts); + pollTimeoutMs = s_pollTimeoutMinMs; + break; + case IpcStream::DiagnosticsIpc::PollEvents::SIGNALED: + if (pStream == nullptr) // only use first signaled stream; will get others on subsequent calls + pStream = ((ConnectionState*)(rgIpcPollHandles[i].pUserData))->GetConnectedStream(callback); + break; + case IpcStream::DiagnosticsIpc::PollEvents::ERR: + return nullptr; + default: + // TODO: Error handling + break; + } + } + } + + // clear the view + while (rgIpcPollHandles.Size() > 0) + rgIpcPollHandles.Pop(); + } + + return pStream; +} + +#endif // FEATURE_PERFTRACING \ No newline at end of file diff --git a/src/coreclr/src/vm/ipcstreamfactory.h b/src/coreclr/src/vm/ipcstreamfactory.h new file mode 100644 index 0000000000000..f39aa92f46cd2 --- /dev/null +++ b/src/coreclr/src/vm/ipcstreamfactory.h @@ -0,0 +1,101 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +#ifndef __IPC_STREAM_FACTORY_H__ +#define __IPC_STREAM_FACTORY_H__ + +#ifdef FEATURE_PERFTRACING + +#include "diagnosticsipc.h" + +class IpcStreamFactory +{ +public: + struct ConnectionState + { + public: + ConnectionState(IpcStream::DiagnosticsIpc *pIpc) : + _pIpc(pIpc), + _pStream(nullptr) + { } + + // returns a pollable handle and performs any preparation required + // e.g., as a side-effect, will connect and advertise on reverse connections + virtual bool GetIpcPollHandle(IpcStream::DiagnosticsIpc::IpcPollHandle *pIpcPollHandle, ErrorCallback callback = nullptr) = 0; + + // Returns the signaled stream in a usable state + virtual IpcStream *GetConnectedStream(ErrorCallback callback = nullptr) = 0; + + // Resets the connection in the event of a hangup + virtual void Reset(ErrorCallback callback = nullptr) = 0; + + // closes the underlying connections + void Close(ErrorCallback callback = nullptr) + { + if (_pIpc != nullptr) + _pIpc->Close(callback); + if (_pStream != nullptr) + _pStream->Close(callback); + } + + protected: + IpcStream::DiagnosticsIpc *_pIpc; + IpcStream *_pStream; + }; + + struct ClientConnectionState : public ConnectionState + { + ClientConnectionState(IpcStream::DiagnosticsIpc *pIpc) : ConnectionState(pIpc) { } + + // returns a pollable handle and performs any preparation required + bool GetIpcPollHandle(IpcStream::DiagnosticsIpc::IpcPollHandle *pIpcPollHandle, ErrorCallback callback = nullptr) override; + + // Returns the signaled stream in a usable state + IpcStream *GetConnectedStream(ErrorCallback callback = nullptr) override; + + // Resets the connection in the event of a hangup + void Reset(ErrorCallback callback = nullptr) override; + }; + + struct ServerConnectionState : public ConnectionState + { + ServerConnectionState(IpcStream::DiagnosticsIpc *pIpc) : ConnectionState(pIpc) { } + + // returns a pollable handle and performs any preparation required + bool GetIpcPollHandle(IpcStream::DiagnosticsIpc::IpcPollHandle *pIpcPollHandle, ErrorCallback callback = nullptr) override; + + // Returns the signaled stream in a usable state + IpcStream *GetConnectedStream(ErrorCallback callback = nullptr) override; + + // Resets the connection in the event of a hangup + void Reset(ErrorCallback callback = nullptr) override; + }; + + static bool CreateServer(const char *const pIpcName, ErrorCallback = nullptr); + static bool CreateClient(const char *const pIpcName, ErrorCallback = nullptr); + static IpcStream *GetNextAvailableStream(ErrorCallback = nullptr); + static bool HasActiveConnections(); + static void CloseConnections(ErrorCallback callback = nullptr); +private: + static CQuickArrayList s_rgpConnectionStates; + static Volatile s_isShutdown; + + // Polling timeout semantics + // If client connection is opted in + // and connection succeeds => set timeout to infinite + // and connection fails => set timeout to minimum and scale by falloff factor + // else => set timeout to -1 (infinite) + // + // If an agent closes its socket while we're still connected, + // Poll will return and let us know which connection hung up + static int32_t GetNextTimeout(int32_t currentTimeoutMs); + constexpr static float s_pollTimeoutFalloffFactor = 1.25; + constexpr static int32_t s_pollTimeoutInfinite = -1; + constexpr static int32_t s_pollTimeoutMinMs = 10; + constexpr static int32_t s_pollTimeoutMaxMs = 500; +}; + +#endif // FEATURE_PERFTRACING + +#endif // __IPC_STREAM_FACTORY_H__ \ No newline at end of file diff --git a/src/coreclr/tests/src/tracing/eventpipe/common/Reverse.cs b/src/coreclr/tests/src/tracing/eventpipe/common/Reverse.cs new file mode 100644 index 0000000000000..c4f1c9464e609 --- /dev/null +++ b/src/coreclr/tests/src/tracing/eventpipe/common/Reverse.cs @@ -0,0 +1,178 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.IO; +using System.IO.Pipes; +using System.Net.Sockets; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +namespace Tracing.Tests.Common +{ + /** + * ==ADVERTISE PROTOCOL== + * Before standard IPC Protocol communication can occur on a client-mode connection + * the runtime must advertise itself over the connection. ALL SUBSEQUENT COMMUNICATION + * IS STANDARD DIAGNOSTICS IPC PROTOCOL COMMUNICATION. + * + * The flow for Advertise is a one-way burst of 32 bytes consisting of + * 8 bytes - "ADVR_V1\0" (ASCII chars + null byte) + * 16 bytes - CLR Instance Cookie (little-endian) + * 8 bytes - PID (little-endian) + * 2 bytes - unused for futureproofing + */ + + public class IpcAdvertise + { + public static int Size_V1 => 34; + public static byte[] Magic_V1 => System.Text.Encoding.ASCII.GetBytes("ADVR_V1" + '\0'); + public static int MagicSize_V1 => 8; + + public byte[] Magic = Magic_V1; + public UInt64 ProcessId; + public Guid RuntimeInstanceCookie; + public UInt16 Unused; + + /// + /// + /// + /// (pid, clrInstanceId) + public static IpcAdvertise Parse(Stream stream) + { + var binaryReader = new BinaryReader(stream); + var advertise = new IpcAdvertise() + { + Magic = binaryReader.ReadBytes(Magic_V1.Length), + RuntimeInstanceCookie = new Guid(binaryReader.ReadBytes(16)), + ProcessId = binaryReader.ReadUInt64(), + Unused = binaryReader.ReadUInt16() + }; + + for (int i = 0; i < Magic_V1.Length; i++) + if (advertise.Magic[i] != Magic_V1[i]) + throw new Exception("Invalid advertise message from client connection"); + + // FUTURE: switch on incoming magic and change if version ever increments + return advertise; + } + + override public string ToString() + { + return $"{{ Magic={Magic}; ClrInstanceId={RuntimeInstanceCookie}; ProcessId={ProcessId}; Unused={Unused}; }}"; + } + } + public class ReverseServer + { + public static string MakeServerAddress() + { + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + return "DOTNET_TRACE_TESTS_" + Path.GetRandomFileName(); + } + else + { + return Path.Combine(Path.GetTempPath(), "DOTNET_TRACE_TESTS_" + Path.GetRandomFileName()); + } + } + + private object _server; // _server ::= socket | NamedPipeServerStream + private Socket _clientSocket; // only used on non-Windows + private string _serverAddress; + + public ReverseServer(string serverAddress, int bufferSize = 16 * 1024) + { + _serverAddress = serverAddress; + if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) + { + _server = new NamedPipeServerStream( + serverAddress, + PipeDirection.InOut, + NamedPipeServerStream.MaxAllowedServerInstances, + PipeTransmissionMode.Byte, + PipeOptions.None, + bufferSize, + bufferSize); + } + else + { + if (File.Exists(serverAddress)) + File.Delete(serverAddress); + var remoteEP = new UnixDomainSocketEndPoint(serverAddress); + + var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified); + // socket(7) states that SO_RCVBUF has a minimum of 128 and SO_SNDBUF has minimum of 1024 + socket.SendBufferSize = Math.Max(bufferSize, 1024); + socket.ReceiveBufferSize = Math.Max(bufferSize, 128); + socket.Bind(remoteEP); + socket.Listen(255); + socket.LingerState.Enabled = false; + _server = socket; + } + } + + public async Task AcceptAsync() + { + switch (_server) + { + case NamedPipeServerStream serverStream: + await serverStream.WaitForConnectionAsync(); + return serverStream; + case Socket socket: + _clientSocket = await socket.AcceptAsync(); + return new NetworkStream(_clientSocket); + default: + throw new ArgumentException("Invalid server type"); + } + } + + public void Shutdown() + { + switch (_server) + { + case NamedPipeServerStream serverStream: + try + { + serverStream.Disconnect(); + } + catch {} + finally + { + serverStream.Dispose(); + } + break; + case Socket socket: + try + { + socket.Shutdown(SocketShutdown.Both); + } + catch {} + finally + { + _clientSocket?.Close(); + socket.Close(); + socket.Dispose(); + _clientSocket?.Dispose(); + if (File.Exists(_serverAddress)) + File.Delete(_serverAddress); + } + break; + default: + throw new ArgumentException("Invalid server type"); + } + } + + // Creates the server, listens, and closes the server + public static async Task CreateServerAndReceiveAdvertisement(string serverAddress) + { + var server = new ReverseServer(serverAddress); + Logger.logger.Log("Waiting for connection"); + Stream stream = await server.AcceptAsync(); + Logger.logger.Log("Got a connection"); + IpcAdvertise advertise = IpcAdvertise.Parse(stream); + server.Shutdown(); + return advertise; + } + } +} \ No newline at end of file diff --git a/src/coreclr/tests/src/tracing/eventpipe/common/common.csproj b/src/coreclr/tests/src/tracing/eventpipe/common/common.csproj index 0df38b391df23..a0b36c8336dcc 100644 --- a/src/coreclr/tests/src/tracing/eventpipe/common/common.csproj +++ b/src/coreclr/tests/src/tracing/eventpipe/common/common.csproj @@ -9,5 +9,6 @@ + diff --git a/src/coreclr/tests/src/tracing/eventpipe/reverse/reverse.cs b/src/coreclr/tests/src/tracing/eventpipe/reverse/reverse.cs new file mode 100644 index 0000000000000..5fdc7ca11a235 --- /dev/null +++ b/src/coreclr/tests/src/tracing/eventpipe/reverse/reverse.cs @@ -0,0 +1,339 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using System.Diagnostics.Tracing; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Reflection; +using Microsoft.Diagnostics.Tools.RuntimeClient; +using Tracing.Tests.Common; +using System.Threading; +using System.IO; +using Microsoft.Diagnostics.Tracing; + +namespace Tracing.Tests.ReverseValidation +{ + public class ReverseValidation + { + // The runtime will do an exponential falloff by a factor of 1.25 starting at 10ms with a max of 500ms + // We can time tests out after waiting 30s which should have sufficient attempts + private static int _maxPollTimeMS = 30_000; + + private static async Task WaitTillTimeout(Task task, TimeSpan timeout) + { + using var cts = new CancellationTokenSource(); + var completedTask = await Task.WhenAny(task, Task.Delay(timeout, cts.Token)); + if (completedTask == task) + { + cts.Cancel(); + return await task; + } + else + { + throw new TimeoutException("Task timed out"); + } + } + + public static async Task RunSubprocess(string serverName, Func beforeExecution = null, Func duringExecution = null, Func afterExecution = null) + { + using (var process = new Process()) + { + if (beforeExecution != null) + await beforeExecution(); + + process.StartInfo.UseShellExecute = false; + process.StartInfo.CreateNoWindow = true; + process.StartInfo.Environment.Add("DOTNET_DiagnosticsMonitorAddress", serverName); + process.StartInfo.FileName = Process.GetCurrentProcess().MainModule.FileName; + process.StartInfo.Arguments = new Uri(Assembly.GetExecutingAssembly().CodeBase).LocalPath + " 0"; + Logger.logger.Log($"running sub-process: {process.StartInfo.FileName} {process.StartInfo.Arguments}"); + bool fSuccess = process.Start(); + Logger.logger.Log($"subprocess started: {fSuccess}"); + Logger.logger.Log($"subprocess PID: {process.Id}"); + + while (!EventPipeClient.ListAvailablePorts().Contains(process.Id)) + await Task.Delay(100); + try + { + if (duringExecution != null) + await duringExecution(process.Id); + } + finally + { + process.Kill(); + } + + + if (afterExecution != null) + await afterExecution(); + } + } + + public static async Task TEST_RuntimeIsResilientToServerClosing() + { + string serverName = ReverseServer.MakeServerAddress(); + Logger.logger.Log($"Server name is '{serverName}'"); + await RunSubprocess( + serverName: serverName, + duringExecution: async (_) => + { + var ad1 = await WaitTillTimeout(ReverseServer.CreateServerAndReceiveAdvertisement(serverName), TimeSpan.FromMilliseconds(_maxPollTimeMS)); + Logger.logger.Log(ad1.ToString()); + var ad2 = await WaitTillTimeout(ReverseServer.CreateServerAndReceiveAdvertisement(serverName), TimeSpan.FromMilliseconds(_maxPollTimeMS)); + Logger.logger.Log(ad2.ToString()); + var ad3 = await WaitTillTimeout(ReverseServer.CreateServerAndReceiveAdvertisement(serverName), TimeSpan.FromMilliseconds(_maxPollTimeMS)); + Logger.logger.Log(ad3.ToString()); + var ad4 = await WaitTillTimeout(ReverseServer.CreateServerAndReceiveAdvertisement(serverName), TimeSpan.FromMilliseconds(_maxPollTimeMS)); + Logger.logger.Log(ad4.ToString()); + } + ); + + return true; + } + + public static async Task TEST_RuntimeConnectsToExistingServer() + { + string serverName = ReverseServer.MakeServerAddress(); + Task advertiseTask = ReverseServer.CreateServerAndReceiveAdvertisement(serverName); + Logger.logger.Log($"Server name is `{serverName}`"); + await RunSubprocess( + serverName: serverName, + duringExecution: async (_) => + { + IpcAdvertise advertise = await WaitTillTimeout(advertiseTask, TimeSpan.FromMilliseconds(_maxPollTimeMS)); + Logger.logger.Log(advertise.ToString()); + } + ); + + return true; + } + + + public static async Task TEST_CanConnectServerAndClientAtSameTime() + { + string serverName = ReverseServer.MakeServerAddress(); + Logger.logger.Log($"Server name is '{serverName}'"); + var server = new ReverseServer(serverName); + await RunSubprocess( + serverName: serverName, + duringExecution: async (int pid) => + { + Task reverseTask = Task.Run(async () => + { + Logger.logger.Log($"Waiting for reverse connection"); + Stream reverseStream = await server.AcceptAsync(); + Logger.logger.Log("Got reverse connection"); + IpcAdvertise advertise = IpcAdvertise.Parse(reverseStream); + Logger.logger.Log(advertise.ToString()); + }); + + Task regularTask = Task.Run(async () => + { + var config = new SessionConfiguration( + circularBufferSizeMB: 1000, + format: EventPipeSerializationFormat.NetTrace, + providers: new List { + new Provider("Microsoft-DotNETCore-SampleProfiler") + }); + Logger.logger.Log("Starting EventPipeSession over standard connection"); + using Stream stream = EventPipeClient.CollectTracing(pid, config, out var sessionId); + Logger.logger.Log($"Started EventPipeSession over standard connection with session id: 0x{sessionId:x}"); + using var source = new EventPipeEventSource(stream); + Task readerTask = Task.Run(() => source.Process()); + await Task.Delay(500); + Logger.logger.Log("Stopping EventPipeSession over standard connection"); + EventPipeClient.StopTracing(pid, sessionId); + await readerTask; + Logger.logger.Log("Stopped EventPipeSession over standard connection"); + }); + + await Task.WhenAll(reverseTask, regularTask); + } + ); + + server.Shutdown(); + + return true; + } + + public static async Task TEST_ServerWorksIfClientDoesntAccept() + { + string serverName = ReverseServer.MakeServerAddress(); + Logger.logger.Log($"Server name is '{serverName}'"); + var server = new ReverseServer(serverName); + await RunSubprocess( + serverName: serverName, + duringExecution: async (int pid) => + { + var config = new SessionConfiguration( + circularBufferSizeMB: 10, + format: EventPipeSerializationFormat.NetTrace, + providers: new List { + new Provider("Microsoft-DotNETCore-SampleProfiler") + }); + Logger.logger.Log("Starting EventPipeSession over standard connection"); + using Stream stream = EventPipeClient.CollectTracing(pid, config, out var sessionId); + Logger.logger.Log($"Started EventPipeSession over standard connection with session id: 0x{sessionId:x}"); + using var source = new EventPipeEventSource(stream); + Task readerTask = Task.Run(() => source.Process()); + await Task.Delay(500); + Logger.logger.Log("Stopping EventPipeSession over standard connection"); + EventPipeClient.StopTracing(pid, sessionId); + await readerTask; + Logger.logger.Log("Stopped EventPipeSession over standard connection"); + } + ); + + server.Shutdown(); + + return true; + } + + public static async Task TEST_ServerIsResilientToNoBufferAgent() + { + // N.B. - this test is only testing behavior on Windows since Unix Domain Sockets get their buffer size from the + // system configuration and isn't set here. Tests passing on Windows should indicate it would pass on Unix systems as well. + string serverName = ReverseServer.MakeServerAddress(); + Logger.logger.Log($"Server name is '{serverName}'"); + var server = new ReverseServer(serverName, 0); + await RunSubprocess( + serverName: serverName, + duringExecution: async (int pid) => + { + var config = new SessionConfiguration( + circularBufferSizeMB: 10, + format: EventPipeSerializationFormat.NetTrace, + providers: new List { + new Provider("Microsoft-DotNETCore-SampleProfiler") + }); + Logger.logger.Log("Starting EventPipeSession over standard connection"); + using Stream stream = EventPipeClient.CollectTracing(pid, config, out var sessionId); + Logger.logger.Log($"Started EventPipeSession over standard connection with session id: 0x{sessionId:x}"); + using var source = new EventPipeEventSource(stream); + Task readerTask = Task.Run(() => source.Process()); + await Task.Delay(500); + Logger.logger.Log("Stopping EventPipeSession over standard connection"); + EventPipeClient.StopTracing(pid, sessionId); + await readerTask; + Logger.logger.Log("Stopped EventPipeSession over standard connection"); + } + ); + + server.Shutdown(); + + return true; + } + + public static async Task TEST_ReverseConnectionCanRecycleWhileTracing() + { + string serverName = ReverseServer.MakeServerAddress(); + Logger.logger.Log($"Server name is '{serverName}'"); + await RunSubprocess( + serverName: serverName, + duringExecution: async (int pid) => + { + Task regularTask = Task.Run(async () => + { + var config = new SessionConfiguration( + circularBufferSizeMB: 1000, + format: EventPipeSerializationFormat.NetTrace, + providers: new List { + new Provider("Microsoft-DotNETCore-SampleProfiler") + }); + Logger.logger.Log("Starting EventPipeSession over standard connection"); + using Stream stream = EventPipeClient.CollectTracing(pid, config, out var sessionId); + Logger.logger.Log($"Started EventPipeSession over standard connection with session id: 0x{sessionId:x}"); + using var source = new EventPipeEventSource(stream); + Task readerTask = Task.Run(() => source.Process()); + await Task.Delay(500); + Logger.logger.Log("Stopping EventPipeSession over standard connection"); + EventPipeClient.StopTracing(pid, sessionId); + await readerTask; + Logger.logger.Log("Stopped EventPipeSession over standard connection"); + }); + + Task reverseTask = Task.Run(async () => + { + var ad1 = await WaitTillTimeout(ReverseServer.CreateServerAndReceiveAdvertisement(serverName), TimeSpan.FromMilliseconds(_maxPollTimeMS)); + Logger.logger.Log(ad1.ToString()); + var ad2 = await WaitTillTimeout(ReverseServer.CreateServerAndReceiveAdvertisement(serverName), TimeSpan.FromMilliseconds(_maxPollTimeMS)); + Logger.logger.Log(ad2.ToString()); + var ad3 = await WaitTillTimeout(ReverseServer.CreateServerAndReceiveAdvertisement(serverName), TimeSpan.FromMilliseconds(_maxPollTimeMS)); + Logger.logger.Log(ad3.ToString()); + var ad4 = await WaitTillTimeout(ReverseServer.CreateServerAndReceiveAdvertisement(serverName), TimeSpan.FromMilliseconds(_maxPollTimeMS)); + Logger.logger.Log(ad4.ToString()); + }); + + await Task.WhenAll(reverseTask, regularTask); + } + ); + + return true; + } + + public static async Task TEST_StandardConnectionStillWorksIfReverseConnectionIsBroken() + { + string serverName = ReverseServer.MakeServerAddress(); + Logger.logger.Log($"Server name is '{serverName}'"); + await RunSubprocess( + serverName: serverName, + duringExecution: async (int pid) => + { + var config = new SessionConfiguration( + circularBufferSizeMB: 1000, + format: EventPipeSerializationFormat.NetTrace, + providers: new List { + new Provider("Microsoft-DotNETCore-SampleProfiler") + }); + Logger.logger.Log("Starting EventPipeSession over standard connection"); + using Stream stream = EventPipeClient.CollectTracing(pid, config, out var sessionId); + Logger.logger.Log($"Started EventPipeSession over standard connection with session id: 0x{sessionId:x}"); + using var source = new EventPipeEventSource(stream); + Task readerTask = Task.Run(() => source.Process()); + await Task.Delay(500); + Logger.logger.Log("Stopping EventPipeSession over standard connection"); + EventPipeClient.StopTracing(pid, sessionId); + await readerTask; + Logger.logger.Log("Stopped EventPipeSession over standard connection"); + } + ); + + return true; + } + + public static async Task Main(string[] args) + { + if (args.Length >= 1) + { + await Task.Delay(TimeSpan.FromMinutes(10)); // will be killed in test + return 1; + } + + bool fSuccess = true; + IEnumerable tests = typeof(ReverseValidation).GetMethods().Where(mi => mi.Name.StartsWith("TEST_")); + foreach (var test in tests) + { + Logger.logger.Log($"::== Running test: {test.Name}"); + bool result = true; + try + { + result = await (Task)test.Invoke(null, new object[] {}); + } + catch (Exception e) + { + result = false; + Logger.logger.Log(e.ToString()); + } + fSuccess &= result; + Logger.logger.Log($"Test passed: {result}"); + Logger.logger.Log($""); + + } + return fSuccess ? 100 : -1; + } + } +} \ No newline at end of file diff --git a/src/coreclr/tests/src/tracing/eventpipe/reverse/reverse.csproj b/src/coreclr/tests/src/tracing/eventpipe/reverse/reverse.csproj new file mode 100644 index 0000000000000..2c10c6ed46533 --- /dev/null +++ b/src/coreclr/tests/src/tracing/eventpipe/reverse/reverse.csproj @@ -0,0 +1,15 @@ + + + .NETCoreApp + exe + BuildAndRun + true + 0 + true + true + + + + + + \ No newline at end of file