diff --git a/src/clparse.cpp b/src/clparse.cpp index f5d60972842..bd895b56dbe 100644 --- a/src/clparse.cpp +++ b/src/clparse.cpp @@ -45,6 +45,7 @@ #include "wrappers.h" #include "multilobbycommands.h" #include "gamehistorylogger.h" +#include "stdinreader.h" #include @@ -89,7 +90,6 @@ static bool wz_autoratingEnable = false; static bool wz_cli_headless = false; static bool wz_streamer_spectator_mode = false; static bool wz_lobby_slashcommands = false; -static WZ_Command_Interface wz_cmd_interface = WZ_Command_Interface::None; static int wz_min_autostart_players = -1; #if defined(WZ_OS_WIN) @@ -435,7 +435,7 @@ static const struct poptOption *getOptionsTable() { "enablelobbyslashcmd", POPT_ARG_NONE, CLI_LOBBY_SLASHCOMMANDS, N_("Enable lobby slash commands (for connecting clients)"), nullptr}, { "addlobbyadminhash", POPT_ARG_STRING, CLI_ADD_LOBBY_ADMINHASH, N_("Add a lobby admin identity hash (for slash commands)"), _("hash string")}, { "addlobbyadminpublickey", POPT_ARG_STRING, CLI_ADD_LOBBY_ADMINPUBLICKEY, N_("Add a lobby admin public key (for slash commands)"), N_("b64-pub-key")}, - { "enablecmdinterface", POPT_ARG_STRING, CLI_COMMAND_INTERFACE, N_("Enable command interface"), N_("(stdin)")}, + { "enablecmdinterface", POPT_ARG_STRING, CLI_COMMAND_INTERFACE, N_("Enable command interface"), N_("(stdin, unixsocket:path)")}, { "startplayers", POPT_ARG_STRING, CLI_STARTPLAYERS, N_("Minimum required players to auto-start game"), N_("startplayers")}, { "gamelog-output", POPT_ARG_STRING, CLI_GAMELOG_OUTPUTMODES, N_("Game history log output mode(s)"), "(log,cmdinterface)"}, { "gamelog-outputkey", POPT_ARG_STRING, CLI_GAMELOG_OUTPUTKEY, N_("Game history log output key"), "[playerindex, playerposition]"}, @@ -1101,20 +1101,42 @@ bool ParseCommandLine(int argc, const char * const *argv) break; case CLI_COMMAND_INTERFACE: - token = poptGetOptArg(poptCon); - if (token == nullptr || strlen(token) == 0) - { - // use default, which is currently "stdin" - token = "stdin"; - } - if (strcmp(token, "stdin") == 0) - { - // enable stdin - wz_cmd_interface = WZ_Command_Interface::StdIn_Interface; - } - else { - qFatal("Unsupported / invalid enablecmdinterface value"); + token = poptGetOptArg(poptCon); + if (token == nullptr || strlen(token) == 0) + { + // use default, which is currently "stdin" + token = "stdin"; + } + WZ_Command_Interface mode = WZ_Command_Interface::None; + std::string value; + if (strcmp(token, "stdin") == 0) + { + mode = WZ_Command_Interface::StdIn_Interface; + } + else if (strncmp(token, "unixsocket", strlen("unixsocket")) == 0) + { + mode = WZ_Command_Interface::Unix_Socket; + // expected form is "unixsocket:path" - parse for the path + if (strlen(token) > strlen("unixsocket")) + { + size_t delimeterIdx = strlen("unixsocket"); + if (token[delimeterIdx] == ':' && token[delimeterIdx+1] != '\0') + { + // grab the rest of the string as the path value + value = &token[delimeterIdx+1]; + } + else + { + qFatal("Invalid enablecmdinterface unixsocket value (expecting unixsocket:path)"); + } + } + } + else + { + qFatal("Unsupported / invalid enablecmdinterface value"); + } + configSetCmdInterface(mode, value); } break; case CLI_STARTPLAYERS: @@ -1287,11 +1309,6 @@ bool lobby_slashcommands_enabled() return wz_lobby_slashcommands; } -WZ_Command_Interface wz_command_interface() -{ - return wz_cmd_interface; -} - int min_autostart_player_count() { return wz_min_autostart_players; diff --git a/src/clparse.h b/src/clparse.h index c55e66bb0aa..298affabdea 100644 --- a/src/clparse.h +++ b/src/clparse.h @@ -39,13 +39,6 @@ bool getAutoratingEnable(); bool streamer_spectator_mode(); bool lobby_slashcommands_enabled(); -enum class WZ_Command_Interface -{ - None, - StdIn_Interface, -}; -WZ_Command_Interface wz_command_interface(); - int min_autostart_player_count(); #endif // __INCLUDED_SRC_CLPARSE_H__ diff --git a/src/main.cpp b/src/main.cpp index 75626d21ce6..0ac9c4f4a20 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1685,19 +1685,15 @@ static bool initializeCrashHandlingContext(optional gfxbackend) static void wzCmdInterfaceInit() { - switch (wz_command_interface()) + if (wz_command_interface_enabled()) { - case WZ_Command_Interface::None: - return; - case WZ_Command_Interface::StdIn_Interface: - stdInThreadInit(); - break; + cmdInterfaceThreadInit(); } } static void wzCmdInterfaceShutdown() { - stdInThreadShutdown(); + cmdInterfaceThreadShutdown(); } static void cleanupOldLogFiles() diff --git a/src/stdinreader.cpp b/src/stdinreader.cpp index 3d898b80578..14849091995 100644 --- a/src/stdinreader.cpp +++ b/src/stdinreader.cpp @@ -29,10 +29,36 @@ #include #include +#include +#include + +#if defined(__clang__) +# pragma clang diagnostic push +# pragma clang diagnostic ignored "-Wcast-align" +#elif defined(__GNUC__) +# pragma GCC diagnostic push +# pragma GCC diagnostic ignored "-Wcast-align" +#endif + +#include <3rdparty/readerwriterqueue/readerwriterqueue.h> + +#if defined(__clang__) +# pragma clang diagnostic pop +#elif defined(__GNUC__) +# pragma GCC diagnostic pop +#endif + +typedef std::vector CmdInterfaceMessageBuffer; +static std::unique_ptr> cmdInterfaceOutputQueue; +static CmdInterfaceMessageBuffer latestWriteBuffer; +constexpr size_t maxReserveMessageBufferSize = 2048; #if defined(WZ_OS_UNIX) # include # include +# include +# include +# include #endif #if defined(HAVE_POLL_H) @@ -54,8 +80,28 @@ #define strncmpl(a, b) strncmp(a, b, strlen(b)) #define errlog(...) do { fprintf(stderr, __VA_ARGS__); fflush(stderr); } while(0); -static WZ_THREAD *stdinThread = nullptr; -static std::atomic stdinThreadQuit; +#define wz_command_interface_output_onmainthread(...) \ +wzAsyncExecOnMainThread([]{ \ + wz_command_interface_output(__VA_ARGS__); \ +}); + +static WZ_Command_Interface wz_cmd_interface = WZ_Command_Interface::None; +static std::string wz_cmd_interface_param; + +WZ_Command_Interface wz_command_interface() +{ + return wz_cmd_interface; +} + +static WZ_THREAD *cmdInputThread = nullptr; +static WZ_THREAD *cmdOutputThread = nullptr; +static std::atomic cmdInputThreadQuit; +static std::atomic cmdOutputThreadQuit; +int readFd = -1; +bool readFdIsSocket = false; +int writeFd = -1; +bool writeFdIsNonBlocking = false; +std::function cleanupIOFunc; #if defined(HAVE_SYS_EVENTFD_H) int quitSignalEventFd = -1; #elif defined(HAVE_UNISTD_H) @@ -73,76 +119,129 @@ constexpr size_t readChunkSize = 1024; static size_t newlineSearchStart = 0; static size_t actualAvailableBytes = 0; -enum class StdInReadyStatus +enum class CmdIOReadyStatus { Error, NotReady, - Ready, + ReadyRead, + ReadyWrite, Exit, }; -static StdInReadyStatus stdInHasDataToBeRead(int quitSignalFd, int msTimeout = 2000) +static CmdIOReadyStatus cmdIOIsReady(optional inputFd, optional outputFd, int quitSignalFd, int msTimeout = 2000) { #if defined(_WIN32) # error "Not supported on Windows" #endif #if defined(HAVE_WORKING_POLL) - struct pollfd pfds[2]; - pfds[0].fd = STDIN_FILENO; - pfds[0].events = POLLIN; - pfds[0].revents = 0; + struct pollfd pfds[3]; + nfds_t nfds = 0; + optional readIdx; + optional writeIdx; + optional quitIdx; + if (inputFd.has_value()) + { + pfds[nfds].fd = inputFd.value(); + pfds[nfds].events = POLLIN; + pfds[nfds].revents = 0; + readIdx = nfds; + nfds++; + } + if (outputFd.has_value()) + { + pfds[nfds].fd = outputFd.value(); + pfds[nfds].events = POLLOUT; + pfds[nfds].revents = 0; + writeIdx = nfds; + nfds++; + } if (quitSignalFd >= 0) { - pfds[1].fd = quitSignalFd; - pfds[1].events = POLLIN; - pfds[1].revents = 0; + pfds[nfds].fd = quitSignalFd; + pfds[nfds].events = POLLIN; + pfds[nfds].revents = 0; + quitIdx = nfds; + nfds++; } int timeoutValue = (quitSignalFd >= 0) ? -1 : msTimeout; - int retval = poll(pfds, (quitSignalFd >= 0) ? 2 : 1, timeoutValue); + int retval = poll(pfds, nfds, timeoutValue); if (retval < 0) { - return StdInReadyStatus::Error; + return CmdIOReadyStatus::Error; } if (retval > 0) { - if (pfds[1].revents & POLLIN) + if (quitIdx.has_value()) { - return StdInReadyStatus::Exit; + if (pfds[quitIdx.value()].revents & POLLIN) + { + return CmdIOReadyStatus::Exit; + } } - if (pfds[0].revents & POLLIN) + if (readIdx.has_value()) { - return StdInReadyStatus::Ready; + if (pfds[readIdx.value()].revents & POLLIN) + { + return CmdIOReadyStatus::ReadyRead; + } + } + if (writeIdx.has_value()) + { + if (pfds[writeIdx.value()].revents & POLLOUT) + { + return CmdIOReadyStatus::ReadyWrite; + } } } #else - fd_set rfds; - FD_ZERO(&rfds); - FD_SET(STDIN_FILENO, &rfds); + fd_set readset; + fd_set writeset; + FD_ZERO(&readset); + FD_ZERO(&writeset); + if (inputFd.has_value()) + { + FD_SET(inputFd.value(), &readset); + } + if (outputFd.has_value()) + { + FD_SET(outputFd.value(), &writeset); + } if (quitSignalFd >= 0) { - FD_SET(quitSignalFd, &rfds); + FD_SET(quitSignalFd, &readset); } + int maxFd = std::max({inputFd.value_or(-1), outputFd.value_or(-1), quitSignalFd}); struct timeval tv; tv.tv_sec = msTimeout / 1000; tv.tv_usec = (msTimeout % 1000) * 1000; - int retval = select(std::max(STDIN_FILENO, quitSignalFd)+1, &rfds, NULL, NULL, (quitSignalFd >= 0) ? NULL : &tv); + int retval = select(maxFd+1, &readset, (outputFd.has_value()) ? &writeset : NULL, NULL, (quitSignalFd >= 0) ? NULL : &tv); if (retval == -1) { - return StdInReadyStatus::Error; + return CmdIOReadyStatus::Error; } - if (retval > 0 && quitSignalFd >= 0 && FD_ISSET(quitSignalFd, &rfds)) + if (retval > 0 && quitSignalFd >= 0 && FD_ISSET(quitSignalFd, &readset)) { - return StdInReadyStatus::Exit; + return CmdIOReadyStatus::Exit; } - if (retval > 0 && FD_ISSET(STDIN_FILENO, &rfds)) + if (inputFd.has_value()) { - return StdInReadyStatus::Ready; + if (retval > 0 && FD_ISSET(inputFd.value(), &readset)) + { + return CmdIOReadyStatus::ReadyRead; + } + } + if (outputFd.has_value()) + { + if (retval > 0 && FD_ISSET(outputFd.value(), &writeset)) + { + return CmdIOReadyStatus::ReadyWrite; + } } #endif - return StdInReadyStatus::NotReady; + return CmdIOReadyStatus::NotReady; } optional getNextLineFromBuffer() @@ -162,17 +261,39 @@ optional getNextLineFromBuffer() return nullopt; } -optional getStdInLine() +// Returns true if read was successful, sets nextLine to the next available line (if any) +// Returns false if there was an unrecoverable error reading (such as a socket peer closing the socket), and no more reads should be attempted +bool getInputLine(int fd, bool isSocketFd, optional &nextLine) { // read more data from STDIN stdInReadBuffer.resize((((stdInReadBuffer.size() + readChunkSize - 1) / readChunkSize) + 1) * readChunkSize); - auto bytesRead = read(STDIN_FILENO, stdInReadBuffer.data() + actualAvailableBytes, readChunkSize); - if (bytesRead <= 0) + ssize_t bytesRead = -1; + if (!isSocketFd) + { + bytesRead = read(fd, stdInReadBuffer.data() + actualAvailableBytes, readChunkSize); + } + else + { + bytesRead = recv(fd, stdInReadBuffer.data() + actualAvailableBytes, readChunkSize, 0); + if (bytesRead == 0) + { + // stream socket peers will cause a return of "0" when there is an orderly shutdown + return false; + } + } + if (bytesRead < 0) { - return nullopt; + int errno_cpy = errno; + errlog("Failed to read/recv with errno: %d\n", errno_cpy); + return false; + } + if (bytesRead == 0) + { + return true; } actualAvailableBytes += static_cast(bytesRead); - return getNextLineFromBuffer(); + nextLine = getNextLineFromBuffer(); + return true; } static void convertEscapedNewlines(std::string& input) @@ -193,10 +314,162 @@ static void convertEscapedNewlines(std::string& input) } } -int stdinThreadFunc(void *) +int cmdOutputThreadFunc(void *) +{ + if (!cmdInterfaceOutputQueue) + { + errlog("WZCMD FAILURE: No output queue?\n"); + return 1; + } + + int quitSignalFd = -1; +#if defined(HAVE_SYS_EVENTFD_H) + quitSignalFd = quitSignalEventFd; +#elif defined(HAVE_UNISTD_H) + quitSignalFd = quitSignalPipeFds[0]; +#endif + +#if defined(MSG_NOSIGNAL) && !defined(__APPLE__) + int writeFlags = MSG_NOSIGNAL; +#else + int writeFlags = 0; +#endif + + // Returns true if it wrote, false if it didn't (and the caller should wait until writing is possible) + // Returns nullopt if an unrecoverable error occurred + auto tryWrite = [](int writeFd, const CmdInterfaceMessageBuffer& msg, int writeFlags) -> optional { + // Attempt to write + ssize_t outputBytes = -1; + do { + outputBytes = send(writeFd, msg.data(), msg.size(), writeFlags); + } while (outputBytes == -1 && errno == EINTR); + + if (outputBytes == -1) + { + switch (errno) + { + case EAGAIN: + #if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: + #endif + // Must wait to be ready to write + return false; + case EPIPE: + // Other end died / closed connection + errlog("WZCMD NOTICE: Other end closed connection\n"); + return nullopt; + default: + // Unrecoverable + errlog("WZCMD FAILURE: Unrecoverable error trying to send data\n"); + return nullopt; + } + } + else + { + // wrote it + return true; + } + }; + + CmdInterfaceMessageBuffer msg; + while (true) + { + if (!cmdInterfaceOutputQueue->wait_dequeue_timed(msg, std::chrono::milliseconds(2000))) + { + // no messages to write - check if signaled to quit + auto result = cmdIOIsReady(nullopt, writeFd, quitSignalFd); + if (result == CmdIOReadyStatus::Exit) + { + // quit thread + return 0; + } + else if (result == CmdIOReadyStatus::NotReady) + { + if (cmdOutputThreadQuit.load()) + { + // quit thread + return 0; + } + } + // loop and check for a msg again + continue; + } + + // Have a message to write + + if (writeFdIsNonBlocking) + { + // Attempt to write + auto writeResult = tryWrite(writeFd, msg, writeFlags); + if (!writeResult.has_value()) + { + // unrecoverable error + return 1; + } + if (writeResult.value()) + { + // write succeeded - continue + continue; + } + else + { + // write failed - need to wait to be ready to write + // fall-through + } + } + + bool successfullyWroteMsg = false; + do { + // Wait to be ready to write + auto result = cmdIOIsReady(nullopt, writeFd, quitSignalFd); + if (result == CmdIOReadyStatus::Exit) + { + // quit thread + return 0; + } + else if (result == CmdIOReadyStatus::NotReady) + { + if (cmdOutputThreadQuit.load()) + { + // quit thread + return 0; + } + } + else if (result == CmdIOReadyStatus::ReadyWrite) + { + // Attempt to write + // Can still fail, so need to handle if it does + auto writeResult = tryWrite(writeFd, msg, writeFlags); + if (!writeResult.has_value()) + { + // unrecoverable error + return 1; + } + if (writeResult.value()) + { + // write succeeded + successfullyWroteMsg = true; + } + else + { + // write failed - need to wait to be ready to write + // loop again + continue; + } + } + else + { + return 1; + } + } while (!successfullyWroteMsg); + } + return 0; +} + +int cmdInputThreadFunc(void *) { fseek(stdin, 0, SEEK_END); - errlog("WZCMD: stdinReadReady\n"); + wz_command_interface_output_onmainthread("WZCMD: stdinReadReady\n"); bool inexit = false; int quitSignalFd = -1; #if defined(HAVE_SYS_EVENTFD_H) @@ -209,28 +482,32 @@ int stdinThreadFunc(void *) optional nextLine = getNextLineFromBuffer(); while (!nextLine.has_value()) { - auto result = stdInHasDataToBeRead(quitSignalFd); - if (result == StdInReadyStatus::Exit) + auto result = cmdIOIsReady(readFd, nullopt, quitSignalFd); + if (result == CmdIOReadyStatus::Exit) { // quit thread return 0; } - else if (result == StdInReadyStatus::NotReady) + else if (result == CmdIOReadyStatus::NotReady) { - if (stdinThreadQuit.load()) + if (cmdInputThreadQuit.load()) { // quit thread return 0; } } - else if (result == StdInReadyStatus::Ready) + else if (result == CmdIOReadyStatus::ReadyRead) { - nextLine = getStdInLine(); + if (!getInputLine(readFd, readFdIsSocket, nextLine)) + { + errlog("WZCMD FAILURE: get input line failed! (did peer close the connection?)\n"); + return 1; + } break; } else { - errlog("WZCMD error: getline failed!\n"); + errlog("WZCMD FAILURE: getline failed!\n"); return 1; } } @@ -244,7 +521,7 @@ int stdinThreadFunc(void *) if(!strncmpl(line, "exit")) { - errlog("WZCMD info: exit command received - stdin reader will now stop procesing commands\n"); + wz_command_interface_output_onmainthread("WZCMD info: exit command received - stdin reader will now stop procesing commands\n"); inexit = true; } else if(!strncmpl(line, "admin add-hash ")) @@ -253,13 +530,13 @@ int stdinThreadFunc(void *) int r = sscanf(line, "admin add-hash %1023[^\n]s", newadmin); if (r != 1) { - errlog("WZCMD error: Failed to add room admin hash! (Expecting one parameter)\n"); + wz_command_interface_output_onmainthread("WZCMD error: Failed to add room admin hash! (Expecting one parameter)\n"); } else { std::string newAdminStrCopy(newadmin); wzAsyncExecOnMainThread([newAdminStrCopy]{ - errlog("WZCMD info: Room admin hash added: %s\n", newAdminStrCopy.c_str()); + wz_command_interface_output("WZCMD info: Room admin hash added: %s\n", newAdminStrCopy.c_str()); addLobbyAdminIdentityHash(newAdminStrCopy); auto roomAdminMessage = astringf("Room admin assigned to: %s", newAdminStrCopy.c_str()); sendRoomSystemMessage(roomAdminMessage.c_str()); @@ -272,13 +549,13 @@ int stdinThreadFunc(void *) int r = sscanf(line, "admin add-public-key %1023[^\n]s", newadmin); if (r != 1) { - errlog("WZCMD error: Failed to add room admin public key! (Expecting one parameter)\n"); + wz_command_interface_output_onmainthread("WZCMD error: Failed to add room admin public key! (Expecting one parameter)\n"); } else { std::string newAdminStrCopy(newadmin); wzAsyncExecOnMainThread([newAdminStrCopy]{ - errlog("WZCMD info: Room admin public key added: %s\n", newAdminStrCopy.c_str()); + wz_command_interface_output("WZCMD info: Room admin public key added: %s\n", newAdminStrCopy.c_str()); addLobbyAdminPublicKey(newAdminStrCopy); auto roomAdminMessage = astringf("Room admin assigned to: %s", newAdminStrCopy.c_str()); sendRoomSystemMessage(roomAdminMessage.c_str()); @@ -291,7 +568,7 @@ int stdinThreadFunc(void *) int r = sscanf(line, "admin remove %1023[^\n]s", newadmin); if (r != 1) { - errlog("WZCMD error: Failed to remove room admin! (Expecting one parameter)\n"); + wz_command_interface_output_onmainthread("WZCMD error: Failed to remove room admin! (Expecting one parameter)\n"); } else { @@ -299,19 +576,19 @@ int stdinThreadFunc(void *) wzAsyncExecOnMainThread([newAdminStrCopy]{ if (removeLobbyAdminPublicKey(newAdminStrCopy)) { - errlog("WZCMD info: Room admin public key removed: %s\n", newAdminStrCopy.c_str()); + wz_command_interface_output("WZCMD info: Room admin public key removed: %s\n", newAdminStrCopy.c_str()); auto roomAdminMessage = astringf("Room admin removed: %s", newAdminStrCopy.c_str()); sendRoomSystemMessage(roomAdminMessage.c_str()); } else if (removeLobbyAdminIdentityHash(newAdminStrCopy)) { - errlog("WZCMD info: Room admin hash removed: %s\n", newAdminStrCopy.c_str()); + wz_command_interface_output("WZCMD info: Room admin hash removed: %s\n", newAdminStrCopy.c_str()); auto roomAdminMessage = astringf("Room admin removed: %s", newAdminStrCopy.c_str()); sendRoomSystemMessage(roomAdminMessage.c_str()); } else { - errlog("WZCMD info: Failed to remove room admin! (Provided parameter not found as either admin hash or public key)\n"); + wz_command_interface_output("WZCMD info: Failed to remove room admin! (Provided parameter not found as either admin hash or public key)\n"); } }); } @@ -323,7 +600,7 @@ int stdinThreadFunc(void *) int r = sscanf(line, "kick identity %1023s %1023[^\n]s", playeridentitystring, kickreasonstr); if (r != 1 && r != 2) { - errlog("WZCMD error: Failed to get player public key or hash!\n"); + wz_command_interface_output_onmainthread("WZCMD error: Failed to get player public key or hash!\n"); } else { @@ -369,7 +646,7 @@ int stdinThreadFunc(void *) { if (i == NetPlay.hostPlayer) { - errlog("WZCMD error: Can't kick host!\n"); + wz_command_interface_output("WZCMD error: Can't kick host!\n"); continue; } kickPlayer(i, kickReasonStrCopy.c_str(), ERROR_KICKED, false); @@ -380,7 +657,7 @@ int stdinThreadFunc(void *) } if (!foundActivePlayer) { - errlog("WZCMD error: Failed to find currently-connected player with matching public key or hash?\n"); + wz_command_interface_output("WZCMD error: Failed to find currently-connected player with matching public key or hash?\n"); } }); } @@ -392,7 +669,7 @@ int stdinThreadFunc(void *) int r = sscanf(line, "ban ip %1023s %1023[^\n]s", tobanip, banreasonstr); if (r != 1 && r != 2) { - errlog("WZCMD error: Failed to get ban ip!\n"); + wz_command_interface_output_onmainthread("WZCMD error: Failed to get ban ip!\n"); } else { @@ -430,7 +707,7 @@ int stdinThreadFunc(void *) int r = sscanf(line, "unban ip %1023[^\n]s", tounbanip); if (r != 1) { - errlog("WZCMD error: Failed to get unban ip!\n"); + wz_command_interface_output_onmainthread("WZCMD error: Failed to get unban ip!\n"); } else { @@ -438,7 +715,7 @@ int stdinThreadFunc(void *) wzAsyncExecOnMainThread([unbanIPStrCopy] { if (!removeIPFromBanList(unbanIPStrCopy.c_str())) { - errlog("WZCMD error: IP was not on the ban list!\n"); + wz_command_interface_output("WZCMD error: IP was not on the ban list!\n"); } }); } @@ -449,7 +726,7 @@ int stdinThreadFunc(void *) int r = sscanf(line, "chat bcast %1023[^\n]s", chatmsg); if (r != 1) { - errlog("WZCMD error: Failed to get bcast message!\n"); + wz_command_interface_output_onmainthread("WZCMD error: Failed to get bcast message!\n"); } else { @@ -458,7 +735,7 @@ int stdinThreadFunc(void *) if (!NetPlay.isHostAlive) { // can't send this message when the host isn't alive - errlog("WZCMD error: Failed to send bcast message because host isn't yet hosting!\n"); + wz_command_interface_output("WZCMD error: Failed to send bcast message because host isn't yet hosting!\n"); } sendRoomSystemMessage(chatmsgstr.c_str()); }); @@ -471,7 +748,7 @@ int stdinThreadFunc(void *) int r = sscanf(line, "chat direct %1023s %1023[^\n]s", playeridentitystring, chatmsg); if (r != 2) { - errlog("WZCMD error: Failed to get chat receiver or message!\n"); + wz_command_interface_output_onmainthread("WZCMD error: Failed to get chat receiver or message!\n"); } else { @@ -481,7 +758,7 @@ int stdinThreadFunc(void *) if (!NetPlay.isHostAlive) { // can't send this message when the host isn't alive - errlog("WZCMD error: Failed to send chat direct message because host isn't yet hosting!\n"); + wz_command_interface_output("WZCMD error: Failed to send chat direct message because host isn't yet hosting!\n"); } bool foundActivePlayer = false; @@ -526,33 +803,149 @@ int stdinThreadFunc(void *) } if (!foundActivePlayer) { - errlog("WZCMD error: Failed to find currently-connected player with matching public key or hash?\n"); + wz_command_interface_output("WZCMD error: Failed to find currently-connected player with matching public key or hash?\n"); } }); } } else if(!strncmpl(line, "shutdown now")) { - errlog("WZCMD info: shutdown now command received - shutting down\n"); - wzQuit(0); inexit = true; + wzAsyncExecOnMainThread([] { + wz_command_interface_output("WZCMD info: shutdown now command received - shutting down\n"); + wzQuit(0); + }); } } return 0; } -void stdInThreadInit() +static void sockBlockSIGPIPE(const int fd, bool block_sigpipe) { - if (!stdinThread) +#if defined(SO_NOSIGPIPE) + const int no_sigpipe = block_sigpipe ? 1 : 0; + + if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &no_sigpipe, sizeof(no_sigpipe)) != 0) + { + errlog("WZCMD INFO: Failed to set SO_NOSIGPIPE on socket, SIGPIPE might be raised when connection gets broken."); + } +#else + // Prevent warnings + (void)fd; + (void)block_sigpipe; +#endif +} + +bool cmdInterfaceCreateUnixSocket(const std::string& local_path) +{ + if (local_path.empty()) + { + errlog("WZCMD FAILURE: Requested socket path is empty - cancelling\n"); + return false; + } + int socket_type = SOCK_STREAM; +#if defined(SOCK_CLOEXEC) + socket_type |= SOCK_CLOEXEC; +#endif + int cmdSockFd = socket(AF_UNIX, socket_type, 0); + if (cmdSockFd == -1) + { + int errno_cpy = errno; + errlog("WZCMD FAILURE: socket() call failed with errno: %d\n", errno_cpy); + return false; + } + sockBlockSIGPIPE(cmdSockFd, true); + + sockaddr_un addr; + socklen_t sockLen = sizeof(addr); + memset(&addr, 0, sizeof(struct sockaddr_un)); + addr.sun_family = AF_UNIX; + snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", local_path.c_str()); + if (strlen(addr.sun_path) != local_path.length()) + { + errlog("WZCMD FAILURE: Requested socket path %s would be truncated to %s - cancelling\n", local_path.c_str(), addr.sun_path); + return false; + } +#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__DragonFly__) || defined(__OpenBSD__) || defined(__NetBSD__) + addr.sun_len = SUN_LEN(&addr); + sockLen = addr.sun_len; +#endif + + unlink(addr.sun_path); + + if (bind(cmdSockFd, reinterpret_cast(&addr), sockLen) != 0) + { + int errno_cpy = errno; + errlog("WZCMD FAILURE: bind() failed with errno: %d\n", errno_cpy); + return false; + } + auto listenResult = listen(cmdSockFd, 1); + if (listenResult != 0) + { + int errno_cpy = errno; + errlog("WZCMD FAILURE: listen() failed with errno: %d\n", errno_cpy); + return false; + } + struct sockaddr_storage ss; + socklen_t slen = sizeof(ss); + int cmdPeerFd = -1; +#if defined(SOCK_CLOEXEC) + cmdPeerFd = accept4(cmdSockFd, (struct sockaddr*)&ss, &slen, SOCK_CLOEXEC); +#else + cmdPeerFd = accept(cmdSockFd, (struct sockaddr*)&ss, &slen); +#endif + if (cmdPeerFd == -1) { - stdinThreadQuit.store(false); + int errno_cpy = errno; + errlog("WZCMD FAILURE: accept() failed with errno: %d\n", errno_cpy); + close(cmdSockFd); + return false; + } + + if (fcntl(cmdPeerFd, F_SETFL, O_NONBLOCK) == 0) + { + writeFdIsNonBlocking = true; + } + else + { + int errno_cpy = errno; + errlog("WZCMD NOTICE: Attempt to set non-blocking flag failed with errno: %d\n", errno_cpy); + writeFdIsNonBlocking = false; + } + sockBlockSIGPIPE(cmdPeerFd, true); + + readFd = cmdPeerFd; + readFdIsSocket = true; + writeFd = cmdPeerFd; + + cleanupIOFunc = [cmdPeerFd, cmdSockFd, addr](){ + close(cmdPeerFd); + close(cmdSockFd); + unlink(addr.sun_path); + }; + + return true; +} + +void cmdInterfaceThreadInit() +{ + if (wz_command_interface() == WZ_Command_Interface::None) + { + return; + } + #if defined(HAVE_SYS_EVENTFD_H) + if (quitSignalEventFd == -1) + { int flags = 0; # if defined(EFD_CLOEXEC) flags = EFD_CLOEXEC; # endif quitSignalEventFd = eventfd(0, flags); + } #elif defined(HAVE_UNISTD_H) + if (quitSignalPipeFds[0] == -1 && quitSignalPipeFds[1] == -1) + { int result = -1; # if defined(HAVE_PIPE2) && defined(O_CLOEXEC) result = pipe2(quitSignalPipeFds, O_CLOEXEC); @@ -564,19 +957,62 @@ void stdInThreadInit() quitSignalPipeFds[0] = -1; quitSignalPipeFds[1] = -1; } + } #endif - stdinThread = wzThreadCreate(stdinThreadFunc, nullptr); - wzThreadStart(stdinThread); + // initialize output queue (if used) + if (wz_command_interface() != WZ_Command_Interface::StdIn_Interface) + { + cmdInterfaceOutputQueue = std::make_unique>(1024); + latestWriteBuffer.reserve(maxReserveMessageBufferSize); + } + + switch (wz_command_interface()) + { + case WZ_Command_Interface::StdIn_Interface: + readFd = STDIN_FILENO; + readFdIsSocket = false; + writeFd = STDERR_FILENO; + writeFdIsNonBlocking = false; + break; + case WZ_Command_Interface::Unix_Socket: + // sets readFd, writeFd, etc as appropriate + if (!cmdInterfaceCreateUnixSocket(wz_cmd_interface_param)) + { + // failed (and logged a failure to stderr) + return; + } + break; + default: + return; + } + + if (!cmdInputThread) + { + cmdInputThreadQuit.store(false); + cmdInputThread = wzThreadCreate(cmdInputThreadFunc, nullptr, "wzCmdInterfaceInputThread"); + wzThreadStart(cmdInputThread); + } + + if (!cmdOutputThread) + { + // Only in non-stdin case (for now) + if (wz_command_interface() != WZ_Command_Interface::StdIn_Interface) + { + cmdOutputThreadQuit.store(false); + cmdOutputThread = wzThreadCreate(cmdOutputThreadFunc, nullptr, "wzCmdInterfaceOutputThread"); + wzThreadStart(cmdOutputThread); + } } } -void stdInThreadShutdown() +void cmdInterfaceThreadShutdown() { - if (stdinThread) + if (cmdInputThread || cmdOutputThread) { - // Signal the stdin thread to quit - stdinThreadQuit.store(true); + // Signal the threads to quit + cmdInputThreadQuit.store(true); + cmdOutputThreadQuit.store(true); int quitSignalFd = -1; #if defined(HAVE_SYS_EVENTFD_H) quitSignalFd = quitSignalEventFd; @@ -592,44 +1028,64 @@ void stdInThreadShutdown() } } - wzThreadJoin(stdinThread); - stdinThread = nullptr; - -#if defined(HAVE_SYS_EVENTFD_H) - if (quitSignalEventFd != -1) + if (cmdInputThread) { - close(quitSignalEventFd); - quitSignalEventFd = -1; + wzThreadJoin(cmdInputThread); + cmdInputThread = nullptr; } -#elif defined(HAVE_UNISTD_H) - if (quitSignalPipeFds[0] != -1) + if (cmdOutputThread) { - close(quitSignalPipeFds[0]); - quitSignalPipeFds[0] = -1; + wzThreadJoin(cmdOutputThread); + cmdOutputThread = nullptr; } - if (quitSignalPipeFds[1] != -1) - { - close(quitSignalPipeFds[1]); - quitSignalPipeFds[1] = -1; - } -#endif } + + if (cleanupIOFunc) + { + cleanupIOFunc(); + cleanupIOFunc = nullptr; + } + +#if defined(HAVE_SYS_EVENTFD_H) + if (quitSignalEventFd != -1) + { + close(quitSignalEventFd); + quitSignalEventFd = -1; + } +#elif defined(HAVE_UNISTD_H) + if (quitSignalPipeFds[0] != -1) + { + close(quitSignalPipeFds[0]); + quitSignalPipeFds[0] = -1; + } + if (quitSignalPipeFds[1] != -1) + { + close(quitSignalPipeFds[1]); + quitSignalPipeFds[1] = -1; + } +#endif } #else // !defined(WZ_STDIN_READER_SUPPORTED) // For unsupported platforms -void stdInThreadInit() +void cmdInterfaceThreadInit() { - if (!stdinThread) + if (wz_command_interface() == WZ_Command_Interface::None) { - debug(LOG_ERROR, "This platform does not support the stdin command reader"); - stdinThreadQuit.store(false); + return; + } + + if (!cmdInputThread && !cmdOutputThread) + { + debug(LOG_ERROR, "This platform does not support the command interface"); + cmdInputThreadQuit.store(false); + cmdOutputThreadQuit.store(true); } } -void stdInThreadShutdown() +void cmdInterfaceThreadShutdown() { // no-op } @@ -648,11 +1104,55 @@ void wz_command_interface_output(const char *str, ...) return; } va_list ap; - static char outputBuffer[2048]; + static char outputBuffer[maxReserveMessageBufferSize]; va_start(ap, str); vssprintf(outputBuffer, str, ap); va_end(ap); - fwrite(outputBuffer, sizeof(char), strlen(outputBuffer), stderr); - fflush(stderr); + + size_t outputBufferLen = strlen(outputBuffer); + if (outputBufferLen == 0) + { + return; + } + + if (wz_command_interface() == WZ_Command_Interface::StdIn_Interface) + { + fwrite(outputBuffer, sizeof(char), outputBufferLen, stderr); + fflush(stderr); + } + else + { + latestWriteBuffer.insert(latestWriteBuffer.end(), outputBuffer, outputBuffer + outputBufferLen); + cmdInterfaceOutputQueue->enqueue(std::move(latestWriteBuffer)); + latestWriteBuffer = std::vector(); + latestWriteBuffer.reserve(maxReserveMessageBufferSize); + } } +void configSetCmdInterface(WZ_Command_Interface mode, std::string value) +{ + if (cmdInputThread || cmdOutputThread) + { + return; + } + + wz_cmd_interface = mode; + if (mode == WZ_Command_Interface::Unix_Socket && value.empty()) + { +#if defined(WZ_OS_UNIX) + char cwdBuff[PATH_MAX] = {0}; + if (getcwd(cwdBuff, PATH_MAX) != nullptr) + { + errlog("WZCMD INFO: No unix socket path specified - will create wz2100.cmd.sock in: %s\n", cwdBuff); + } + else + { + errlog("WZCMD INFO: No unix socket path specified - will create wz2100.cmd.sock in the working directory\n"); + } +#else + errlog("WZCMD INFO: No unix socket path specified - will create ./wz2100.cmd.sock\n"); +#endif + value = "./wz2100.cmd.sock"; + } + wz_cmd_interface_param = value; +} diff --git a/src/stdinreader.h b/src/stdinreader.h index feb24a3b02e..e7b1ca77ebd 100644 --- a/src/stdinreader.h +++ b/src/stdinreader.h @@ -19,11 +19,22 @@ #pragma once -void stdInThreadInit(); -void stdInThreadShutdown(); - +#include #include "lib/framework/wzglobal.h" +enum class WZ_Command_Interface +{ + None, + StdIn_Interface, + Unix_Socket, +}; + +// used from clparse: +void configSetCmdInterface(WZ_Command_Interface mode, std::string value); + +void cmdInterfaceThreadInit(); +void cmdInterfaceThreadShutdown(); + bool wz_command_interface_enabled(); #if defined(WZ_CC_MINGW)