Skip to content

Commit

Permalink
tests/server/sockfilt.c: avoid race condition without a mutex
Browse files Browse the repository at this point in the history
Avoid loosing any triggered handles by first aborting and joining
the waiting threads before evaluating the individual signal state.

This removes the race condition and therefore need for a mutex.
  • Loading branch information
mback2k committed Jun 16, 2022
1 parent 3084f87 commit a2b7457
Showing 1 changed file with 69 additions and 124 deletions.
193 changes: 69 additions & 124 deletions tests/server/sockfilt.c
Expand Up @@ -404,13 +404,12 @@ static void lograw(unsigned char *buffer, ssize_t len)
struct select_ws_wait_data {
HANDLE handle; /* actual handle to wait for during select */
HANDLE signal; /* internal event to signal handle trigger */
HANDLE abort; /* internal event to abort waiting thread */
HANDLE mutex; /* mutex to prevent event race-condition */
HANDLE abort; /* internal event to abort waiting threads */
};
static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter)
{
struct select_ws_wait_data *data;
HANDLE mutex, signal, handle, handles[2];
HANDLE signal, handle, handles[2];
INPUT_RECORD inputrecord;
LARGE_INTEGER size, pos;
DWORD type, length, ret;
Expand All @@ -422,7 +421,6 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter)
handles[0] = data->abort;
handles[1] = handle;
signal = data->signal;
mutex = data->mutex;
free(data);
}
else
Expand All @@ -442,41 +440,29 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter)
*/
while(WaitForMultipleObjectsEx(1, handles, FALSE, 0, FALSE)
== WAIT_TIMEOUT) {
ret = WaitForSingleObjectEx(mutex, 0, FALSE);
if(ret == WAIT_OBJECT_0) {
/* get total size of file */
length = 0;
size.QuadPart = 0;
size.LowPart = GetFileSize(handle, &length);
if((size.LowPart != INVALID_FILE_SIZE) ||
(GetLastError() == NO_ERROR)) {
size.HighPart = length;
/* get the current position within the file */
pos.QuadPart = 0;
pos.LowPart = SetFilePointer(handle, 0, &pos.HighPart,
FILE_CURRENT);
if((pos.LowPart != INVALID_SET_FILE_POINTER) ||
(GetLastError() == NO_ERROR)) {
/* compare position with size, abort if not equal */
if(size.QuadPart == pos.QuadPart) {
/* sleep and continue waiting */
SleepEx(0, FALSE);
ReleaseMutex(mutex);
continue;
}
/* get total size of file */
length = 0;
size.QuadPart = 0;
size.LowPart = GetFileSize(handle, &length);
if((size.LowPart != INVALID_FILE_SIZE) ||
(GetLastError() == NO_ERROR)) {
size.HighPart = length;
/* get the current position within the file */
pos.QuadPart = 0;
pos.LowPart = SetFilePointer(handle, 0, &pos.HighPart, FILE_CURRENT);
if((pos.LowPart != INVALID_SET_FILE_POINTER) ||
(GetLastError() == NO_ERROR)) {
/* compare position with size, abort if not equal */
if(size.QuadPart == pos.QuadPart) {
/* sleep and continue waiting */
SleepEx(0, FALSE);
continue;
}
}
/* there is some data available, stop waiting */
logmsg("[select_ws_wait_thread] data available, DISK: %p", handle);
SetEvent(signal);
ReleaseMutex(mutex);
break;
}
else if(ret == WAIT_ABANDONED) {
/* we are not allowed to process this event, because select_ws
is post-processing the signalled events and we must exit. */
break;
}
/* there is some data available, stop waiting */
logmsg("[select_ws_wait_thread] data available, DISK: %p", handle);
SetEvent(signal);
}
break;

Expand All @@ -490,33 +476,22 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter)
*/
while(WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, FALSE)
== WAIT_OBJECT_0 + 1) {
ret = WaitForSingleObjectEx(mutex, 0, FALSE);
if(ret == WAIT_OBJECT_0) {
/* check if this is an actual console handle */
if(GetConsoleMode(handle, &ret)) {
/* retrieve an event from the console buffer */
length = 0;
if(PeekConsoleInput(handle, &inputrecord, 1, &length)) {
/* check if the event is not an actual key-event */
if(length == 1 && inputrecord.EventType != KEY_EVENT) {
/* purge the non-key-event and continue waiting */
ReadConsoleInput(handle, &inputrecord, 1, &length);
ReleaseMutex(mutex);
continue;
}
/* check if this is an actual console handle */
if(GetConsoleMode(handle, &ret)) {
/* retrieve an event from the console buffer */
length = 0;
if(PeekConsoleInput(handle, &inputrecord, 1, &length)) {
/* check if the event is not an actual key-event */
if(length == 1 && inputrecord.EventType != KEY_EVENT) {
/* purge the non-key-event and continue waiting */
ReadConsoleInput(handle, &inputrecord, 1, &length);
continue;
}
}
/* there is some data available, stop waiting */
logmsg("[select_ws_wait_thread] data available, CHAR: %p", handle);
SetEvent(signal);
ReleaseMutex(mutex);
break;
}
else if(ret == WAIT_ABANDONED) {
/* we are not allowed to process this event, because select_ws
is post-processing the signalled events and we must exit. */
break;
}
/* there is some data available, stop waiting */
logmsg("[select_ws_wait_thread] data available, CHAR: %p", handle);
SetEvent(signal);
}
break;

Expand All @@ -530,65 +505,49 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter)
*/
while(WaitForMultipleObjectsEx(1, handles, FALSE, 0, FALSE)
== WAIT_TIMEOUT) {
ret = WaitForSingleObjectEx(mutex, 0, FALSE);
if(ret == WAIT_OBJECT_0) {
/* peek into the pipe and retrieve the amount of data available */
length = 0;
if(PeekNamedPipe(handle, NULL, 0, NULL, &length, NULL)) {
/* if there is no data available, sleep and continue waiting */
if(length == 0) {
SleepEx(0, FALSE);
ReleaseMutex(mutex);
continue;
}
else {
logmsg("[select_ws_wait_thread] PeekNamedPipe len: %d", length);
}
/* peek into the pipe and retrieve the amount of data available */
length = 0;
if(PeekNamedPipe(handle, NULL, 0, NULL, &length, NULL)) {
/* if there is no data available, sleep and continue waiting */
if(length == 0) {
SleepEx(0, FALSE);
continue;
}
else {
/* if the pipe has NOT been closed, sleep and continue waiting */
ret = GetLastError();
if(ret != ERROR_BROKEN_PIPE) {
logmsg("[select_ws_wait_thread] PeekNamedPipe error: %d", ret);
SleepEx(0, FALSE);
ReleaseMutex(mutex);
continue;
}
else {
logmsg("[select_ws_wait_thread] pipe closed, PIPE: %p", handle);
}
logmsg("[select_ws_wait_thread] PeekNamedPipe len: %d", length);
}
/* there is some data available, stop waiting */
logmsg("[select_ws_wait_thread] data available, PIPE: %p", handle);
SetEvent(signal);
ReleaseMutex(mutex);
break;
}
else if(ret == WAIT_ABANDONED) {
/* we are not allowed to process this event, because select_ws
is post-processing the signalled events and we must exit. */
break;
else {
/* if the pipe has NOT been closed, sleep and continue waiting */
ret = GetLastError();
if(ret != ERROR_BROKEN_PIPE) {
logmsg("[select_ws_wait_thread] PeekNamedPipe error: %d", ret);
SleepEx(0, FALSE);
continue;
}
else {
logmsg("[select_ws_wait_thread] pipe closed, PIPE: %p", handle);
}
}
/* there is some data available, stop waiting */
logmsg("[select_ws_wait_thread] data available, PIPE: %p", handle);
SetEvent(signal);
}
break;

default:
/* The handle has an unknown type, try to wait on it */
if(WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, FALSE)
== WAIT_OBJECT_0 + 1) {
if(WaitForSingleObjectEx(mutex, 0, FALSE) == WAIT_OBJECT_0) {
logmsg("[select_ws_wait_thread] data available, HANDLE: %p", handle);
SetEvent(signal);
ReleaseMutex(mutex);
}
logmsg("[select_ws_wait_thread] data available, HANDLE: %p", handle);
SetEvent(signal);
}
break;
}

return 0;
}
static HANDLE select_ws_wait(HANDLE handle, HANDLE signal,
HANDLE abort, HANDLE mutex)
static HANDLE select_ws_wait(HANDLE handle, HANDLE signal, HANDLE abort)
{
struct select_ws_wait_data *data;
HANDLE thread = NULL;
Expand All @@ -599,7 +558,6 @@ static HANDLE select_ws_wait(HANDLE handle, HANDLE signal,
data->handle = handle;
data->signal = signal;
data->abort = abort;
data->mutex = mutex;

/* launch waiting thread */
thread = CreateThread(NULL, 0,
Expand All @@ -625,8 +583,8 @@ struct select_ws_data {
static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *tv)
{
HANDLE abort, mutex, signal, handle, *handles;
DWORD timeout_ms, wait, nfd, nth, nws, i;
HANDLE abort, signal, handle, *handles;
fd_set readsock, writesock, exceptsock;
struct select_ws_data *data;
WSANETWORKEVENTS wsaevents;
Expand Down Expand Up @@ -661,19 +619,10 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
return -1;
}

/* create internal mutex to lock event handling in threads */
mutex = CreateMutex(NULL, FALSE, NULL);
if(!mutex) {
CloseHandle(abort);
errno = ENOMEM;
return -1;
}

/* allocate internal array for the internal data */
data = calloc(nfds, sizeof(struct select_ws_data));
if(!data) {
CloseHandle(abort);
CloseHandle(mutex);
errno = ENOMEM;
return -1;
}
Expand All @@ -682,7 +631,6 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
handles = calloc(nfds + 1, sizeof(HANDLE));
if(!handles) {
CloseHandle(abort);
CloseHandle(mutex);
free(data);
errno = ENOMEM;
return -1;
Expand Down Expand Up @@ -723,7 +671,7 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
signal = CreateEvent(NULL, TRUE, FALSE, NULL);
if(signal) {
handle = GetStdHandle(STD_INPUT_HANDLE);
handle = select_ws_wait(handle, signal, abort, mutex);
handle = select_ws_wait(handle, signal, abort);
if(handle) {
handles[nfd] = signal;
data[nth].signal = signal;
Expand Down Expand Up @@ -777,7 +725,7 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
signal = CreateEvent(NULL, TRUE, FALSE, NULL);
if(signal) {
handle = (HANDLE)wsasock;
handle = select_ws_wait(handle, signal, abort, mutex);
handle = select_ws_wait(handle, signal, abort);
if(handle) {
handles[nfd] = signal;
data[nth].signal = signal;
Expand Down Expand Up @@ -808,8 +756,12 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
/* wait for one of the internal handles to trigger */
wait = WaitForMultipleObjectsEx(wait, handles, FALSE, timeout_ms, FALSE);

/* wait for internal mutex to lock event handling in threads */
WaitForSingleObjectEx(mutex, INFINITE, FALSE);
/* signal the abort event handle and join the other waiting threads */
SetEvent(abort);
for(i = 0; i < nth; i++) {
WaitForSingleObjectEx(data[i].thread, INFINITE, FALSE);
CloseHandle(data[i].thread);
}

/* loop over the internal handles returned in the descriptors */
ret = 0; /* number of ready file descriptors */
Expand Down Expand Up @@ -868,9 +820,6 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
}
}

/* signal the event handle for the other waiting threads */
SetEvent(abort);

for(fd = 0; fd < nfds; fd++) {
if(FD_ISSET(fd, readfds))
logmsg("[select_ws] %d is readable", fd);
Expand All @@ -886,13 +835,9 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
}

for(i = 0; i < nth; i++) {
WaitForSingleObjectEx(data[i].thread, INFINITE, FALSE);
CloseHandle(data[i].thread);
CloseHandle(data[i].signal);
}

CloseHandle(abort);
CloseHandle(mutex);

free(handles);
free(data);
Expand Down

0 comments on commit a2b7457

Please sign in to comment.