From 8b5e1344b3c2704aa7e75f05f37f229e923f2608 Mon Sep 17 00:00:00 2001 From: Leonardo Romor Date: Fri, 27 Apr 2018 22:54:37 -0700 Subject: [PATCH] Use gcode-streamer in toplevel machine control. o Use the asynchronous file descriptor multiplexing to pass data to the gcode parser, but also be ready for further services that we want to multiplex on. --- src/common/fd-mux.cc | 2 +- src/machine-control.cc | 89 +++++++++++++++++++++++++++--------------- 2 files changed, 58 insertions(+), 33 deletions(-) diff --git a/src/common/fd-mux.cc b/src/common/fd-mux.cc index 4d358236..5837f053 100644 --- a/src/common/fd-mux.cc +++ b/src/common/fd-mux.cc @@ -104,7 +104,7 @@ bool FDMultiplexer::SingleCycle(unsigned int timeout_ms) { // file descriptors only can be registred from within handlers // or before running the Loop(). If no filedesctiptors are left, // there is no chance for any to re-appear, so we can exit. - fprintf(stderr, "No filedescriptor registered. Exiting loop()"); + Log_info("Exiting loop() after last file descriptor is gone."); return false; } diff --git a/src/machine-control.cc b/src/machine-control.cc index cd9dbdb7..92905cd8 100644 --- a/src/machine-control.cc +++ b/src/machine-control.cc @@ -36,11 +36,13 @@ #include #include "gcode-parser/gcode-parser.h" +#include "common/fd-mux.h" #include "common/logging.h" #include "common/string-util.h" #include "config-parser.h" #include "gcode-machine-control.h" +#include "gcode-streamer.h" #include "hardware-mapping.h" #include "pru-hardware-interface.h" #include "motion-queue.h" @@ -127,14 +129,16 @@ static bool drop_privileges(StringPiece privs) { // Reads the given "gcode_filename" with GCode and operates machine with it. // If "loop_count" is >= 0, repeats this number after the first execution. -static int send_file_to_machine(GCodeMachineControl *machine, - GCodeParser *parser, +static int send_file_to_machine(FDMultiplexer *event_server, + GCodeMachineControl *machine, + GCodeStreamer *streamer, const char *gcode_filename, int loop_count) { int ret; machine->SetMsgOut(stderr); while (loop_count < 0 || loop_count-- > 0) { int fd = open(gcode_filename, O_RDONLY); - ret = parser->ParseStream(fd, stderr); + streamer->ConnectStream(fd, stderr); + ret = event_server->Loop(); if (ret != 0) break; } @@ -173,16 +177,16 @@ static int open_server(const char *bind_addr, int port) { return s; } -static void eatsignal(int sig) {} +//static void eatsignal(int sig) {} // Accept connections and receive GCode. // Only one connection can be active at a time. // Socket must already be opened by open_server(). "bind_addr" and "port" // are just FYI information for nicer log-messages. -static int run_server(int listen_socket, - GCodeMachineControl *machine, GCodeParser *parser, +static int run_server(int listen_socket, FDMultiplexer *event_server, + GCodeMachineControl *machine, GCodeStreamer *streamer, const char *bind_addr, int port) { - signal(SIGPIPE, SIG_IGN); // Pesky clients, closing connections... + //signal(SIGPIPE, SIG_IGN); // Pesky clients, closing connections... if (listen(listen_socket, 2) < 0) { Log_error("listen() failed: %s", strerror(errno)); @@ -192,41 +196,53 @@ static int run_server(int listen_socket, Log_info("Ready to accept GCode-connections on %s:%d", bind_addr ? bind_addr : "0.0.0.0", port); - int process_result; - do { + event_server->RunOnReadable(listen_socket, + [listen_socket,machine,streamer]() { struct sockaddr_in client; socklen_t socklen = sizeof(client); - struct sigaction sa = {}; - sa.sa_handler = eatsignal; - sa.sa_flags = SA_RESETHAND | SA_NODEFER; // oneshot, no restart - sigaction(SIGINT, &sa, NULL); - sigaction(SIGTERM, &sa, NULL); int connection = accept(listen_socket, (struct sockaddr*) &client, &socklen); if (connection < 0) { Log_error("accept(): %s", strerror(errno)); - return (errno == EINTR) ? 2 : 1; + return true; + } + + // We need to set the fd to non blocking in order to avoid + // blocking reads caused by spurious situations in Linux. + // http://man7.org/linux/man-pages/man2/select.2.html#BUGS + const int flags = fcntl(connection, F_GETFL, 0); + if (flags < 0) { + Log_error("fcntl(): %s", strerror(errno)); + return true; + } + if (fcntl(connection, F_SETFL, flags | O_NONBLOCK) < 0) { + Log_error("fcntl(): %s", strerror(errno)); + return true; + } + + if (streamer->IsStreaming()) { + // For now, only one. Though we could have multiple. + dprintf(connection, "// Sorry, can only handle one connection at a time." + "There is only one machine after all.\n"); + close(connection); + return true; } - sa.sa_handler = SIG_DFL; - sigaction(SIGINT, &sa, NULL); - sigaction(SIGTERM, &sa, NULL); char ip_buffer[INET_ADDRSTRLEN]; const char *print_ip = inet_ntop(AF_INET, &client.sin_addr, ip_buffer, sizeof(ip_buffer)); Log_info("Accepting new connection from %s\n", print_ip); - FILE *msg_stream = fdopen(connection, "w"); - machine->SetMsgOut(msg_stream); - process_result = parser->ParseStream(connection, msg_stream); - fclose(msg_stream); - Log_info("Connection to %s closed.\n", print_ip); - } while (process_result == 0); + FILE *msg_stream = fdopen(connection, "w"); + // Stuff written to msg-stream needs to be unbuffered, otherwise + // they'll never make it. + if (msg_stream) setvbuf(msg_stream, NULL, _IONBF, 0); - close(listen_socket); - Log_error("Error gcode_machine_control_from_stream() == %d. Exiting\n", - process_result); + machine->SetMsgOut(msg_stream); + streamer->ConnectStream(connection, msg_stream); + return true; + }); - return process_result; + return event_server->Loop(); } // Create an absolute filename from a path, without the file not needed @@ -430,6 +446,7 @@ int main(int argc, char *argv[]) { Log_error("Can't become daemon: %s", strerror(errno)); } + FDMultiplexer event_server; // Open socket early, so that we // (a) can bail out early before messing with GPIO/PRU settings if // someone is alrady listening (starting as daemon twice?). @@ -494,21 +511,29 @@ int main(int argc, char *argv[]) { GCodeParser *parser = new GCodeParser(parser_cfg, machine_control->ParseEventReceiver(), allow_m111); - + GCodeStreamer *streamer = + new GCodeStreamer(&event_server, parser, + machine_control->ParseEventReceiver()); int ret = 0; if (has_filename) { const char *filename = argv[optind]; - ret = send_file_to_machine(machine_control, parser, + ret = send_file_to_machine(&event_server, machine_control, streamer, filename, file_loop_count); } else { - ret = run_server(listen_socket, machine_control, parser, + ret = run_server(listen_socket, &event_server, machine_control, streamer, bind_addr, listen_port); + if (listen_socket >= 0) { + close(listen_socket); + } } + Log_info("Exiting server"); + + delete streamer; delete parser; delete machine_control; - const bool caught_signal = (ret == 2); + const bool caught_signal = (ret == 1); if (caught_signal) { Log_info("Caught signal: immediate exit. " "Skipping potential remaining queue.");