diff --git a/bessctl/conf/testing/module_tests/vlan.py b/bessctl/conf/testing/module_tests/vlan.py index 24435a7be..fbf7e8c41 100644 --- a/bessctl/conf/testing/module_tests/vlan.py +++ b/bessctl/conf/testing/module_tests/vlan.py @@ -81,8 +81,7 @@ def gen_untagged_packet(): 'output_port': default_gate, 'input_packet': q, 'output_packet': q}) - return [VLANSplit(), 1, 150, expected] + return [VLANSplit(), 1, 30, expected] - OUTPUT_TEST_INPUTS.append(output_test([1, 100, 77, -1, 149, 50, 100, -1])) - OUTPUT_TEST_INPUTS.append(output_test([100, 77, -1, 149, 50, 100, -1, 33, 70])) - OUTPUT_TEST_INPUTS.append(output_test([100, 77, -1, 149, 50, 100, -1, 33, 70], True)) + OUTPUT_TEST_INPUTS.append(output_test([1, 17, -1, 29, 10, 13, 7])) + OUTPUT_TEST_INPUTS.append(output_test([1, 17, -1, 29, 10, 13, 7], True)) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index aa47eced1..e8d828ada 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -28,6 +28,14 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. + +#include +#include +#include + +#include +#include + #include "unix_socket.h" // TODO(barath): Clarify these comments. @@ -38,53 +46,64 @@ // TODO: Revise this once the interrupt mode is implemented. #define RECV_SKIP_TICKS 256 -#define MAX_TX_FRAGS 8 +#define SIG_THREAD_EXIT SIGUSR2 -void UnixSocketPort::AcceptNewClient() { - int ret; - for (;;) { - ret = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); - if (ret >= 0) { - break; - } +void UnixSocketPort::AcceptThread() { + sigset_t sigset; + sigfillset(&sigset); + sigdelset(&sigset, SIG_THREAD_EXIT); - if (errno != EINTR) { - PLOG(ERROR) << "[UnixSocket]:accept4()"; - } - } + struct pollfd fds[2]; + memset(fds, 0, sizeof(fds)); + fds[0].fd = listen_fd_; + fds[0].events = POLLIN; + fds[1].events = POLLRDHUP; - recv_skip_cnt_ = 0; + while (true) { + // negative FDs are ignored by ppoll() + fds[1].fd = client_fd_; + int res = ppoll(fds, 2, nullptr, &sigset); - if (old_client_fd_ != kNotConnectedFd) { - // Reuse the old file descriptor number by atomically exchanging the new fd - // with the - // old one. The zombie socket is closed silently (see dup2). - dup2(ret, client_fd_); - close(ret); - } else { - client_fd_ = ret; - } -} + if (accept_thread_stop_req_) { + return; + + } else if (res < 0) { + if (errno == EINTR) { + continue; + } else { + PLOG(ERROR) << "ppoll()"; + } -// This accept thread terminates once a new client is connected. -void *AcceptThreadMain(void *arg) { - UnixSocketPort *p = reinterpret_cast(arg); - p->AcceptNewClient(); - return nullptr; + } else if (fds[0].revents & POLLIN) { + // new client connected + int fd; + while (true) { + fd = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); + if (fd >= 0 || errno != EINTR) { + break; + } + } + if (fd < 0) { + PLOG(ERROR) << "accept4()"; + } else if (client_fd_ != kNotConnectedFd) { + LOG(WARNING) << "Ignoring additional client\n"; + close(fd); + } else { + client_fd_ = fd; + } + + } else if (fds[1].revents & (POLLRDHUP | POLLHUP)) { + // connection dropped by client + int fd = client_fd_; + client_fd_ = kNotConnectedFd; + close(fd); + } + } } -// The file descriptor for the connection will not be closed, until we have a -// new client. -// This is to avoid race condition in TX process. -void UnixSocketPort::CloseConnection() { - // Keep client_fd, since it may be being used in unix_send_pkts(). - old_client_fd_ = client_fd_; - client_fd_ = kNotConnectedFd; - - // Relaunch the accept thread. - std::thread accept_thread(AcceptThreadMain, reinterpret_cast(this)); - accept_thread.detach(); +static void AcceptThreadHandler(int) { + // empty handler, we only care about blocking syscalls being interrupted } CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { @@ -96,15 +115,13 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { int ret; - client_fd_ = kNotConnectedFd; - old_client_fd_ = kNotConnectedFd; - if (num_txq > 1 || num_rxq > 1) { return CommandFailure(EINVAL, "Cannot have more than 1 queue per RX/TX"); } listen_fd_ = socket(AF_UNIX, SOCK_SEQPACKET, 0); if (listen_fd_ < 0) { + DeInit(); return CommandFailure(errno, "socket(AF_UNIX) failed"); } @@ -130,32 +147,54 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { ret = bind(listen_fd_, reinterpret_cast(&addr_), addrlen); if (ret < 0) { + DeInit(); return CommandFailure(errno, "bind(%s) failed", addr_.sun_path); } ret = listen(listen_fd_, 1); if (ret < 0) { + DeInit(); return CommandFailure(errno, "listen() failed"); } - std::thread accept_thread(AcceptThreadMain, reinterpret_cast(this)); - accept_thread.detach(); + + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = AcceptThreadHandler; + if (sigaction(SIG_THREAD_EXIT, &sa, NULL) < 0) { + DeInit(); + return CommandFailure(errno, "sigaction(SIG_THREAD_EXIT) failed"); + } + + + accept_thread_ = std::thread([this]() { + this->AcceptThread(); + }); return CommandSuccess(); } void UnixSocketPort::DeInit() { - close(listen_fd_); + if (accept_thread_.joinable()) { + accept_thread_stop_req_ = true; + pthread_kill(accept_thread_.native_handle(), SIG_THREAD_EXIT); + accept_thread_.join(); + } - if (client_fd_ >= 0) { + if (listen_fd_ != kNotConnectedFd) { + close(listen_fd_); + } + if (client_fd_ != kNotConnectedFd) { close(client_fd_); } } int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { + int client_fd = client_fd_; + DCHECK_EQ(qid, 0); - if (client_fd_ == kNotConnectedFd) { + if (client_fd == kNotConnectedFd) { return 0; } @@ -174,7 +213,7 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { } // Datagrams larger than 2KB will be truncated. - ret = recv(client_fd_, pkt->data(), SNBUF_DATA, 0); + ret = recv(client_fd, pkt->data(), SNBUF_DATA, 0); if (ret > 0) { pkt->append(ret); @@ -185,7 +224,7 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { bess::Packet::Free(pkt); if (ret < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EBADF) { break; } @@ -195,7 +234,6 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { } // Connection closed. - CloseConnection(); break; } @@ -208,9 +246,14 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { int UnixSocketPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) { int sent = 0; + int client_fd = client_fd_; DCHECK_EQ(qid, 0); + if (client_fd == kNotConnectedFd) { + return 0; + } + for (int i = 0; i < cnt; i++) { bess::Packet *pkt = pkts[i]; @@ -229,7 +272,7 @@ int UnixSocketPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) { pkt = pkt->next(); } - ret = sendmsg(client_fd_, &msg, 0); + ret = sendmsg(client_fd, &msg, 0); if (ret < 0) { break; } diff --git a/core/drivers/unix_socket.h b/core/drivers/unix_socket.h index e4c2fc709..f7f3cf8dc 100644 --- a/core/drivers/unix_socket.h +++ b/core/drivers/unix_socket.h @@ -31,17 +31,12 @@ #ifndef BESS_DRIVERS_UNIXSOCKET_H_ #define BESS_DRIVERS_UNIXSOCKET_H_ -#include -#include -#include -#include -#include #include #include -#include #include -#include +#include +#include #include "../message.h" #include "../port.h" @@ -55,10 +50,10 @@ class UnixSocketPort final : public Port { UnixSocketPort() : Port(), recv_skip_cnt_(), - listen_fd_(), + accept_thread_stop_req_(false), + listen_fd_(kNotConnectedFd), addr_(), - client_fd_(), - old_client_fd_() {} + client_fd_(kNotConnectedFd) {} /*! * Initialize the port, ie, open the socket. @@ -107,20 +102,10 @@ class UnixSocketPort final : public Port { */ int SendPackets(queue_t qid, bess::Packet **pkts, int cnt) override; - /*! - * Waits for a client to connect to the socket. - */ - void AcceptNewClient(); - private: // Value for a disconnected socket. static const int kNotConnectedFd = -1; - /*! - * Closes the client connection but does not shut down the listener fd. - */ - void CloseConnection(); - /*! * Calling recv() system call is expensive so we only do it every * RECV_SKIP_TICKS times -- this counter keeps track of how many ticks its been @@ -128,6 +113,21 @@ class UnixSocketPort final : public Port { * */ uint32_t recv_skip_cnt_; + /*! + * Function for the thread accepting and monitoring clients (accept thread). + */ + void AcceptThread(); + + /*! + * Accept thread handle. + */ + std::thread accept_thread_; + + /*! + * Sent stop request to accept thread. + */ + std::atomic accept_thread_stop_req_; + /*! * The listener fd -- listen for new connections here. */ @@ -142,9 +142,6 @@ class UnixSocketPort final : public Port { // volatile. /* FD for client connection.*/ volatile int client_fd_; - /* If client FD is not connected, what was the fd the last time we were - * connected to a client? */ - int old_client_fd_; }; #endif // BESS_DRIVERS_UNIXSOCKET_H_