From d5573e6f982a4a2f7a0d333bb2972dc35b62a795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Mon, 21 Aug 2017 11:49:03 +0300 Subject: [PATCH 01/11] drivers/unix_socket: Let clients to freely connect/disconnect In order to allow external controllers (connected through a UNIX socket) truly independent of bessd, client disconnection must always be detected, regardless of whether the port is used or not. This is done by keeping a thread around (using epoll). Before, this driver would only reliably detect disconnects when the port is used to receive packets. When used to send packets, the port would only detect disconnects on an actual send, which depends on the processed packets; this led to only some instances accepting new clients, making external program reconnection impossible. --- core/drivers/unix_socket.cc | 99 ++++++++++++++++++++++++------------- core/drivers/unix_socket.h | 16 +++--- 2 files changed, 70 insertions(+), 45 deletions(-) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index aa47eced1..a56f0dfe9 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -28,6 +28,8 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. +#include + #include "unix_socket.h" // TODO(barath): Clarify these comments. @@ -43,50 +45,65 @@ void UnixSocketPort::AcceptNewClient() { int ret; - for (;;) { - ret = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); - if (ret >= 0) { - break; + while (true) { + for (;;) { + ret = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); + if (ret >= 0) { + // New client while we already have an active one; drop connection + if (client_fd_ != kNotConnectedFd) { + close(ret); + } + break; + } + + if (errno != EINTR) { + PLOG(ERROR) << "[UnixSocket]:accept4()"; + } } - if (errno != EINTR) { - PLOG(ERROR) << "[UnixSocket]:accept4()"; + recv_skip_cnt_ = 0; + client_fd_ = ret; + + // Detect when connection is dropped by client + struct epoll_event event; + event.events = EPOLLRDHUP; + ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, ret, &event); + if (ret < 0) { + PLOG(ERROR) << "[UnixSocket]:epoll_ctl()"; + break; } - } - recv_skip_cnt_ = 0; + while (true) { + ret = epoll_wait(epoll_fd_, &event, 1, -1); + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + PLOG(ERROR) << "[UnixSocket]:epoll_wait()"; + break; + } + } - if (old_client_fd_ != kNotConnectedFd) { - // Reuse the old file descriptor number by atomically exchanging the new fd - // with the - // old one. The zombie socket is closed silently (see dup2). - dup2(ret, client_fd_); - close(ret); - } else { - client_fd_ = ret; + if (event.events & EPOLLRDHUP) { + break; + } + } + + // Close dropped connection (Send/Recv are resilient) + epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd_, &event); + close(client_fd_); + client_fd_ = -1; } } // This accept thread terminates once a new client is connected. void *AcceptThreadMain(void *arg) { + // TODO: should make sure it's pinned to a non-worker CPU UnixSocketPort *p = reinterpret_cast(arg); p->AcceptNewClient(); return nullptr; } -// The file descriptor for the connection will not be closed, until we have a -// new client. -// This is to avoid race condition in TX process. -void UnixSocketPort::CloseConnection() { - // Keep client_fd, since it may be being used in unix_send_pkts(). - old_client_fd_ = client_fd_; - client_fd_ = kNotConnectedFd; - - // Relaunch the accept thread. - std::thread accept_thread(AcceptThreadMain, reinterpret_cast(this)); - accept_thread.detach(); -} - CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { const std::string path = arg.path(); int num_txq = num_queues[PACKET_DIR_OUT]; @@ -97,12 +114,17 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { int ret; client_fd_ = kNotConnectedFd; - old_client_fd_ = kNotConnectedFd; if (num_txq > 1 || num_rxq > 1) { return CommandFailure(EINVAL, "Cannot have more than 1 queue per RX/TX"); } + // TODO: close in error paths to avoid leaking FDs + epoll_fd_ = epoll_create(1); + if (epoll_fd_ < 0) { + return CommandFailure(errno, "epoll_create(1) failed"); + } + listen_fd_ = socket(AF_UNIX, SOCK_SEQPACKET, 0); if (listen_fd_ < 0) { return CommandFailure(errno, "socket(AF_UNIX) failed"); @@ -146,6 +168,7 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { void UnixSocketPort::DeInit() { close(listen_fd_); + close(epoll_fd_); if (client_fd_ >= 0) { close(client_fd_); @@ -153,9 +176,11 @@ void UnixSocketPort::DeInit() { } int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { + int client_fd = client_fd_; + DCHECK_EQ(qid, 0); - if (client_fd_ == kNotConnectedFd) { + if (client_fd == kNotConnectedFd) { return 0; } @@ -174,7 +199,7 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { } // Datagrams larger than 2KB will be truncated. - ret = recv(client_fd_, pkt->data(), SNBUF_DATA, 0); + ret = recv(client_fd, pkt->data(), SNBUF_DATA, 0); if (ret > 0) { pkt->append(ret); @@ -185,7 +210,7 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { bess::Packet::Free(pkt); if (ret < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EBADF) { break; } @@ -195,7 +220,6 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { } // Connection closed. - CloseConnection(); break; } @@ -208,9 +232,14 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { int UnixSocketPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) { int sent = 0; + int client_fd = client_fd_; DCHECK_EQ(qid, 0); + if (client_fd == kNotConnectedFd) { + return 0; + } + for (int i = 0; i < cnt; i++) { bess::Packet *pkt = pkts[i]; @@ -229,7 +258,7 @@ int UnixSocketPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) { pkt = pkt->next(); } - ret = sendmsg(client_fd_, &msg, 0); + ret = sendmsg(client_fd, &msg, 0); if (ret < 0) { break; } diff --git a/core/drivers/unix_socket.h b/core/drivers/unix_socket.h index e4c2fc709..c3a1cd5bb 100644 --- a/core/drivers/unix_socket.h +++ b/core/drivers/unix_socket.h @@ -57,8 +57,7 @@ class UnixSocketPort final : public Port { recv_skip_cnt_(), listen_fd_(), addr_(), - client_fd_(), - old_client_fd_() {} + client_fd_() {} /*! * Initialize the port, ie, open the socket. @@ -116,11 +115,6 @@ class UnixSocketPort final : public Port { // Value for a disconnected socket. static const int kNotConnectedFd = -1; - /*! - * Closes the client connection but does not shut down the listener fd. - */ - void CloseConnection(); - /*! * Calling recv() system call is expensive so we only do it every * RECV_SKIP_TICKS times -- this counter keeps track of how many ticks its been @@ -138,13 +132,15 @@ class UnixSocketPort final : public Port { */ struct sockaddr_un addr_; + /*! + * The epoll fd -- detect when client closes. + */ + int epoll_fd_; + // NOTE: three threads (accept / recv / send) may race on this, so use // volatile. /* FD for client connection.*/ volatile int client_fd_; - /* If client FD is not connected, what was the fd the last time we were - * connected to a client? */ - int old_client_fd_; }; #endif // BESS_DRIVERS_UNIXSOCKET_H_ From 55ea0285780bded3f13f6699a42e78f3607068b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Tue, 22 Aug 2017 14:07:52 +0300 Subject: [PATCH 02/11] drivers/unix_socket: Make sure no FDs are leaked during initialization error --- core/drivers/unix_socket.cc | 16 +++++++++++----- core/drivers/unix_socket.h | 5 +++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index a56f0dfe9..f6b608c7b 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -119,14 +119,15 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { return CommandFailure(EINVAL, "Cannot have more than 1 queue per RX/TX"); } - // TODO: close in error paths to avoid leaking FDs epoll_fd_ = epoll_create(1); if (epoll_fd_ < 0) { + DeInit(); return CommandFailure(errno, "epoll_create(1) failed"); } listen_fd_ = socket(AF_UNIX, SOCK_SEQPACKET, 0); if (listen_fd_ < 0) { + DeInit(); return CommandFailure(errno, "socket(AF_UNIX) failed"); } @@ -152,11 +153,13 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { ret = bind(listen_fd_, reinterpret_cast(&addr_), addrlen); if (ret < 0) { + DeInit(); return CommandFailure(errno, "bind(%s) failed", addr_.sun_path); } ret = listen(listen_fd_, 1); if (ret < 0) { + DeInit(); return CommandFailure(errno, "listen() failed"); } @@ -167,10 +170,13 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { } void UnixSocketPort::DeInit() { - close(listen_fd_); - close(epoll_fd_); - - if (client_fd_ >= 0) { + if (listen_fd_ != kNotConnectedFd) { + close(listen_fd_); + } + if (epoll_fd_ != kNotConnectedFd) { + close(epoll_fd_); + } + if (client_fd_ != kNotConnectedFd) { close(client_fd_); } } diff --git a/core/drivers/unix_socket.h b/core/drivers/unix_socket.h index c3a1cd5bb..1d71ec2aa 100644 --- a/core/drivers/unix_socket.h +++ b/core/drivers/unix_socket.h @@ -55,9 +55,10 @@ class UnixSocketPort final : public Port { UnixSocketPort() : Port(), recv_skip_cnt_(), - listen_fd_(), + listen_fd_(kNotConnectedFd), addr_(), - client_fd_() {} + epoll_fd_(kNotConnectedFd), + client_fd_(kNotConnectedFd) {} /*! * Initialize the port, ie, open the socket. From a610eb5e14d03242b900a76fc3e13d7a76d2dca4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Tue, 22 Aug 2017 19:52:32 +0300 Subject: [PATCH 03/11] drivers/unix_socket: Ensure accept thread and FDs are properly cleaned up --- core/drivers/unix_socket.cc | 178 +++++++++++++++++++++++------------- core/drivers/unix_socket.h | 22 ++++- 2 files changed, 129 insertions(+), 71 deletions(-) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index f6b608c7b..cc03e8068 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -28,6 +28,7 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. +#include #include #include "unix_socket.h" @@ -42,66 +43,20 @@ #define RECV_SKIP_TICKS 256 #define MAX_TX_FRAGS 8 -void UnixSocketPort::AcceptNewClient() { - int ret; - - while (true) { - for (;;) { - ret = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); - if (ret >= 0) { - // New client while we already have an active one; drop connection - if (client_fd_ != kNotConnectedFd) { - close(ret); - } - break; - } - - if (errno != EINTR) { - PLOG(ERROR) << "[UnixSocket]:accept4()"; - } - } +#define SIG_THREAD_EXIT SIGUSR2 - recv_skip_cnt_ = 0; - client_fd_ = ret; - // Detect when connection is dropped by client +static void EpollAdd(int epoll_fd, int new_fd, uint32_t events) +{ struct epoll_event event; - event.events = EPOLLRDHUP; - ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, ret, &event); - if (ret < 0) { - PLOG(ERROR) << "[UnixSocket]:epoll_ctl()"; - break; - } - - while (true) { - ret = epoll_wait(epoll_fd_, &event, 1, -1); - if (ret < 0) { - if (errno == EINTR) { - continue; - } else { - PLOG(ERROR) << "[UnixSocket]:epoll_wait()"; - break; - } - } - - if (event.events & EPOLLRDHUP) { - break; - } - } - - // Close dropped connection (Send/Recv are resilient) - epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd_, &event); - close(client_fd_); - client_fd_ = -1; - } + event.events = events; + event.data.fd = new_fd; + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_fd, &event); } -// This accept thread terminates once a new client is connected. -void *AcceptThreadMain(void *arg) { - // TODO: should make sure it's pinned to a non-worker CPU - UnixSocketPort *p = reinterpret_cast(arg); - p->AcceptNewClient(); - return nullptr; +static void EpollDel(int epoll_fd, int fd) +{ + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr); } CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { @@ -119,12 +74,6 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { return CommandFailure(EINVAL, "Cannot have more than 1 queue per RX/TX"); } - epoll_fd_ = epoll_create(1); - if (epoll_fd_ < 0) { - DeInit(); - return CommandFailure(errno, "epoll_create(1) failed"); - } - listen_fd_ = socket(AF_UNIX, SOCK_SEQPACKET, 0); if (listen_fd_ < 0) { DeInit(); @@ -163,22 +112,119 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { return CommandFailure(errno, "listen() failed"); } - std::thread accept_thread(AcceptThreadMain, reinterpret_cast(this)); - accept_thread.detach(); + + int control_fds[2]; + ret = pipe2(control_fds, 0); + if (ret < 0) { + DeInit(); + return CommandFailure(errno, "pipe2() failed"); + } + accept_thread_stopped_fd_ = control_fds[0]; + accept_thread_stop_fd_ = control_fds[1]; + + + epoll_fd_ = epoll_create(1); + if (epoll_fd_ < 0) { + DeInit(); + return CommandFailure(errno, "epoll_create(1) failed"); + } + EpollAdd(epoll_fd_, listen_fd_, EPOLLIN); + EpollAdd(epoll_fd_, accept_thread_stopped_fd_, EPOLLIN); + + + accept_thread_ = std::thread([&]() { + while (true) { + struct epoll_event event; + if (epoll_wait(epoll_fd_, &event, 1, -1) < 0) { + if (errno == EINTR) { + continue; + } else { + PLOG(ERROR) << "[UnixSocketPort]:epoll_wait()"; + } + } else { + if (event.data.fd == accept_thread_stopped_fd_) { + if (event.events & EPOLLIN) { + // thread requested to stop + break; + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected event " << event.events + << " in accept_thread_stopped_fd_"; + } + + } else if (event.data.fd == listen_fd_) { + if (event.events & EPOLLIN) { + // new client connected + int fd; + while (true) { + fd = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); + if (fd >= 0 || errno != EINTR) { + break; + } + } + if (fd < 0) { + PLOG(ERROR) << "[UnixSocketPort]:accept4()"; + } else { + EpollAdd(epoll_fd_, fd, EPOLLRDHUP); + client_fd_ = fd; + } + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected event " << event.events + << " in listen_fd_"; + } + + } else if (event.data.fd == client_fd_) { + if (event.events & EPOLLRDHUP) { + // connection dropped by client + int fd = client_fd_; + client_fd_ = -1; + EpollDel(epoll_fd_, fd); + close(fd); + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected event " << event.events + << " in client_fd_"; + } + + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected fd " << event.data.fd; + } + } + } + }); return CommandSuccess(); } void UnixSocketPort::DeInit() { + if (accept_thread_.joinable()) { + LOG(INFO) << "test: signalling\n"; + char c = 0; + if (write(accept_thread_stop_fd_, &c, sizeof(c)) < 0) { + PLOG(ERROR) << "[UnixSocketPort]:write(accept_thread_stop_fd_)"; + } + LOG(INFO) << "test: join\n"; + accept_thread_.join(); + LOG(INFO) << "test: joined\n"; + } + + if (accept_thread_stopped_fd_ != kNotConnectedFd) { + close(accept_thread_stopped_fd_); + } + if (accept_thread_stop_fd_ != kNotConnectedFd) { + close(accept_thread_stop_fd_); + } if (listen_fd_ != kNotConnectedFd) { close(listen_fd_); } - if (epoll_fd_ != kNotConnectedFd) { - close(epoll_fd_); - } if (client_fd_ != kNotConnectedFd) { close(client_fd_); } + if (epoll_fd_ != kNotConnectedFd) { + close(epoll_fd_); + } } int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { diff --git a/core/drivers/unix_socket.h b/core/drivers/unix_socket.h index 1d71ec2aa..24d57937f 100644 --- a/core/drivers/unix_socket.h +++ b/core/drivers/unix_socket.h @@ -55,6 +55,8 @@ class UnixSocketPort final : public Port { UnixSocketPort() : Port(), recv_skip_cnt_(), + accept_thread_stopped_fd_(kNotConnectedFd), + accept_thread_stop_fd_(kNotConnectedFd), listen_fd_(kNotConnectedFd), addr_(), epoll_fd_(kNotConnectedFd), @@ -107,11 +109,6 @@ class UnixSocketPort final : public Port { */ int SendPackets(queue_t qid, bess::Packet **pkts, int cnt) override; - /*! - * Waits for a client to connect to the socket. - */ - void AcceptNewClient(); - private: // Value for a disconnected socket. static const int kNotConnectedFd = -1; @@ -123,6 +120,21 @@ class UnixSocketPort final : public Port { * */ uint32_t recv_skip_cnt_; + /*! + * Thread accepting and monitoring clients. + */ + std::thread accept_thread_; + + /*! + * FD to read stop signal in accept thread. + */ + int accept_thread_stopped_fd_; + + /*! + * FD to signal accept thread to stop. + */ + int accept_thread_stop_fd_; + /*! * The listener fd -- listen for new connections here. */ From c1c1b55a21138b5d503984bbcad6304c0eb68067 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Tue, 22 Aug 2017 21:25:28 +0300 Subject: [PATCH 04/11] testing: [vlan] Limit number of UnixSocketPort instances created by test It seems that 32-bit builds reach the limit of open file descriptors. --- bessctl/conf/testing/module_tests/vlan.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/bessctl/conf/testing/module_tests/vlan.py b/bessctl/conf/testing/module_tests/vlan.py index 24435a7be..fbf7e8c41 100644 --- a/bessctl/conf/testing/module_tests/vlan.py +++ b/bessctl/conf/testing/module_tests/vlan.py @@ -81,8 +81,7 @@ def gen_untagged_packet(): 'output_port': default_gate, 'input_packet': q, 'output_packet': q}) - return [VLANSplit(), 1, 150, expected] + return [VLANSplit(), 1, 30, expected] - OUTPUT_TEST_INPUTS.append(output_test([1, 100, 77, -1, 149, 50, 100, -1])) - OUTPUT_TEST_INPUTS.append(output_test([100, 77, -1, 149, 50, 100, -1, 33, 70])) - OUTPUT_TEST_INPUTS.append(output_test([100, 77, -1, 149, 50, 100, -1, 33, 70], True)) + OUTPUT_TEST_INPUTS.append(output_test([1, 17, -1, 29, 10, 13, 7])) + OUTPUT_TEST_INPUTS.append(output_test([1, 17, -1, 29, 10, 13, 7], True)) From 520f66b49fb5d8145185b4d312752fcb1a507e28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Tue, 22 Aug 2017 23:09:10 +0300 Subject: [PATCH 05/11] drivers/unix_socket: Cleanup stray code --- core/drivers/unix_socket.cc | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index cc03e8068..1a306ee0a 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -41,9 +41,6 @@ // TODO: Revise this once the interrupt mode is implemented. #define RECV_SKIP_TICKS 256 -#define MAX_TX_FRAGS 8 - -#define SIG_THREAD_EXIT SIGUSR2 static void EpollAdd(int epoll_fd, int new_fd, uint32_t events) @@ -68,8 +65,6 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { int ret; - client_fd_ = kNotConnectedFd; - if (num_txq > 1 || num_rxq > 1) { return CommandFailure(EINVAL, "Cannot have more than 1 queue per RX/TX"); } From fc9d8e4024ee967806296b49addc89f65f1642bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Wed, 23 Aug 2017 01:27:43 +0300 Subject: [PATCH 06/11] drivers/unix_socket: Move accept thread body to a separate function --- core/drivers/unix_socket.cc | 124 +++++++++++++++++++----------------- core/drivers/unix_socket.h | 7 +- 2 files changed, 70 insertions(+), 61 deletions(-) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index 1a306ee0a..82a521046 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -56,6 +56,69 @@ static void EpollDel(int epoll_fd, int fd) epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr); } +void UnixSocketPort::AcceptThread() { + while (true) { + struct epoll_event event; + if (epoll_wait(epoll_fd_, &event, 1, -1) < 0) { + if (errno == EINTR) { + continue; + } else { + PLOG(ERROR) << "[UnixSocketPort]:epoll_wait()"; + } + } else { + if (event.data.fd == accept_thread_stopped_fd_) { + if (event.events & EPOLLIN) { + // thread requested to stop + break; + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected event " << event.events + << " in accept_thread_stopped_fd_"; + } + + } else if (event.data.fd == listen_fd_) { + if (event.events & EPOLLIN) { + // new client connected + int fd; + while (true) { + fd = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); + if (fd >= 0 || errno != EINTR) { + break; + } + } + if (fd < 0) { + PLOG(ERROR) << "[UnixSocketPort]:accept4()"; + } else { + EpollAdd(epoll_fd_, fd, EPOLLRDHUP); + client_fd_ = fd; + } + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected event " << event.events + << " in listen_fd_"; + } + + } else if (event.data.fd == client_fd_) { + if (event.events & EPOLLRDHUP) { + // connection dropped by client + int fd = client_fd_; + client_fd_ = -1; + EpollDel(epoll_fd_, fd); + close(fd); + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected event " << event.events + << " in client_fd_"; + } + + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected fd " << event.data.fd; + } + } + } +} + CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { const std::string path = arg.path(); int num_txq = num_queues[PACKET_DIR_OUT]; @@ -128,66 +191,7 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { accept_thread_ = std::thread([&]() { - while (true) { - struct epoll_event event; - if (epoll_wait(epoll_fd_, &event, 1, -1) < 0) { - if (errno == EINTR) { - continue; - } else { - PLOG(ERROR) << "[UnixSocketPort]:epoll_wait()"; - } - } else { - if (event.data.fd == accept_thread_stopped_fd_) { - if (event.events & EPOLLIN) { - // thread requested to stop - break; - } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected event " << event.events - << " in accept_thread_stopped_fd_"; - } - - } else if (event.data.fd == listen_fd_) { - if (event.events & EPOLLIN) { - // new client connected - int fd; - while (true) { - fd = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); - if (fd >= 0 || errno != EINTR) { - break; - } - } - if (fd < 0) { - PLOG(ERROR) << "[UnixSocketPort]:accept4()"; - } else { - EpollAdd(epoll_fd_, fd, EPOLLRDHUP); - client_fd_ = fd; - } - } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected event " << event.events - << " in listen_fd_"; - } - - } else if (event.data.fd == client_fd_) { - if (event.events & EPOLLRDHUP) { - // connection dropped by client - int fd = client_fd_; - client_fd_ = -1; - EpollDel(epoll_fd_, fd); - close(fd); - } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected event " << event.events - << " in client_fd_"; - } - - } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected fd " << event.data.fd; - } - } - } + this->AcceptThread(); }); return CommandSuccess(); diff --git a/core/drivers/unix_socket.h b/core/drivers/unix_socket.h index 24d57937f..1b489172f 100644 --- a/core/drivers/unix_socket.h +++ b/core/drivers/unix_socket.h @@ -121,7 +121,12 @@ class UnixSocketPort final : public Port { uint32_t recv_skip_cnt_; /*! - * Thread accepting and monitoring clients. + * Function for the thread accepting and monitoring clients (accept thread). + */ + void AcceptThread(); + + /*! + * Accept thread handle. */ std::thread accept_thread_; From c9c62183323eeff50d939b768dd9b1426fa968a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Wed, 23 Aug 2017 02:39:38 +0300 Subject: [PATCH 07/11] drivers/unix_socket: Use a signal to activate exit of accept thread Using signals instead of pipes minimizes the number of open file descriptors (two FDs per instance). --- core/drivers/unix_socket.cc | 145 ++++++++++++++++-------------------- core/drivers/unix_socket.h | 13 +--- 2 files changed, 70 insertions(+), 88 deletions(-) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index 82a521046..f5885ad7d 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -41,82 +41,82 @@ // TODO: Revise this once the interrupt mode is implemented. #define RECV_SKIP_TICKS 256 +#define SIG_THREAD_EXIT SIGUSR2 -static void EpollAdd(int epoll_fd, int new_fd, uint32_t events) -{ +static void EpollAdd(int epoll_fd, int new_fd, uint32_t events) { struct epoll_event event; event.events = events; event.data.fd = new_fd; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_fd, &event); } -static void EpollDel(int epoll_fd, int fd) -{ +static void EpollDel(int epoll_fd, int fd) { epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr); } void UnixSocketPort::AcceptThread() { + sigset_t sigset; + sigfillset(&sigset); + sigdelset(&sigset, SIG_THREAD_EXIT); + while (true) { struct epoll_event event; - if (epoll_wait(epoll_fd_, &event, 1, -1) < 0) { + if (epoll_pwait(epoll_fd_, &event, 1, -1, &sigset) < 0) { if (errno == EINTR) { - continue; + if (accept_thread_stop_req_) { + break; + } else { + continue; + } } else { PLOG(ERROR) << "[UnixSocketPort]:epoll_wait()"; } - } else { - if (event.data.fd == accept_thread_stopped_fd_) { - if (event.events & EPOLLIN) { - // thread requested to stop - break; - } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected event " << event.events - << " in accept_thread_stopped_fd_"; - } - - } else if (event.data.fd == listen_fd_) { - if (event.events & EPOLLIN) { - // new client connected - int fd; - while (true) { - fd = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); - if (fd >= 0 || errno != EINTR) { - break; - } - } - if (fd < 0) { - PLOG(ERROR) << "[UnixSocketPort]:accept4()"; - } else { - EpollAdd(epoll_fd_, fd, EPOLLRDHUP); - client_fd_ = fd; - } - } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected event " << event.events - << " in listen_fd_"; - } - - } else if (event.data.fd == client_fd_) { - if (event.events & EPOLLRDHUP) { - // connection dropped by client - int fd = client_fd_; - client_fd_ = -1; - EpollDel(epoll_fd_, fd); - close(fd); - } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected event " << event.events - << " in client_fd_"; - } - - } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected fd " << event.data.fd; + + } else if (event.data.fd == listen_fd_) { + if (event.events & EPOLLIN) { + // new client connected + int fd; + while (true) { + fd = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); + if (fd >= 0 || errno != EINTR) { + break; } } + if (fd < 0) { + PLOG(ERROR) << "[UnixSocketPort]:accept4()"; + } else { + EpollAdd(epoll_fd_, fd, EPOLLRDHUP); + client_fd_ = fd; + } + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected event " << event.events + << " in listen_fd_"; + } + + } else if (event.data.fd == client_fd_) { + if (event.events & EPOLLRDHUP) { + // connection dropped by client + int fd = client_fd_; + client_fd_ = -1; + EpollDel(epoll_fd_, fd); + close(fd); + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected event " << event.events + << " in client_fd_"; } + + } else { + LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " + << "Unexpected fd " << event.data.fd; + } + } +} + +static void AcceptThreadHandler(int sig) { + sig = sig; } CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { @@ -171,23 +171,21 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { } - int control_fds[2]; - ret = pipe2(control_fds, 0); - if (ret < 0) { - DeInit(); - return CommandFailure(errno, "pipe2() failed"); - } - accept_thread_stopped_fd_ = control_fds[0]; - accept_thread_stop_fd_ = control_fds[1]; - - epoll_fd_ = epoll_create(1); if (epoll_fd_ < 0) { DeInit(); return CommandFailure(errno, "epoll_create(1) failed"); } EpollAdd(epoll_fd_, listen_fd_, EPOLLIN); - EpollAdd(epoll_fd_, accept_thread_stopped_fd_, EPOLLIN); + + + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = AcceptThreadHandler; + if (sigaction(SIG_THREAD_EXIT, &sa, NULL) < 0) { + DeInit(); + return CommandFailure(errno, "sigaction(SIG_THREAD_EXIT) failed"); + } accept_thread_ = std::thread([&]() { @@ -199,22 +197,11 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { void UnixSocketPort::DeInit() { if (accept_thread_.joinable()) { - LOG(INFO) << "test: signalling\n"; - char c = 0; - if (write(accept_thread_stop_fd_, &c, sizeof(c)) < 0) { - PLOG(ERROR) << "[UnixSocketPort]:write(accept_thread_stop_fd_)"; - } - LOG(INFO) << "test: join\n"; + accept_thread_stop_req_ = true; + pthread_kill(accept_thread_.native_handle(), SIG_THREAD_EXIT); accept_thread_.join(); - LOG(INFO) << "test: joined\n"; } - if (accept_thread_stopped_fd_ != kNotConnectedFd) { - close(accept_thread_stopped_fd_); - } - if (accept_thread_stop_fd_ != kNotConnectedFd) { - close(accept_thread_stop_fd_); - } if (listen_fd_ != kNotConnectedFd) { close(listen_fd_); } diff --git a/core/drivers/unix_socket.h b/core/drivers/unix_socket.h index 1b489172f..9ca4ae0f0 100644 --- a/core/drivers/unix_socket.h +++ b/core/drivers/unix_socket.h @@ -32,6 +32,7 @@ #define BESS_DRIVERS_UNIXSOCKET_H_ #include +#include #include #include #include @@ -55,8 +56,7 @@ class UnixSocketPort final : public Port { UnixSocketPort() : Port(), recv_skip_cnt_(), - accept_thread_stopped_fd_(kNotConnectedFd), - accept_thread_stop_fd_(kNotConnectedFd), + accept_thread_stop_req_(false), listen_fd_(kNotConnectedFd), addr_(), epoll_fd_(kNotConnectedFd), @@ -131,14 +131,9 @@ class UnixSocketPort final : public Port { std::thread accept_thread_; /*! - * FD to read stop signal in accept thread. + * Sent stop request to accept thread. */ - int accept_thread_stopped_fd_; - - /*! - * FD to signal accept thread to stop. - */ - int accept_thread_stop_fd_; + std::atomic accept_thread_stop_req_; /*! * The listener fd -- listen for new connections here. From 36c7ed7c219a6c655e7537d0977da83e0f3c7f43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Wed, 23 Aug 2017 02:58:39 +0300 Subject: [PATCH 08/11] drivers/unix_socket: Use ppoll to monitor file descriptors Using ppoll() instead of epoll_pwait() saves us from using one additional file descriptor per instance, without introducing any measurable performance overhead. --- core/drivers/unix_socket.cc | 93 ++++++++++++------------------------- core/drivers/unix_socket.h | 6 --- 2 files changed, 30 insertions(+), 69 deletions(-) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index f5885ad7d..0b9b9179a 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -29,7 +29,7 @@ // POSSIBILITY OF SUCH DAMAGE. #include -#include +#include #include "unix_socket.h" @@ -44,73 +44,51 @@ #define SIG_THREAD_EXIT SIGUSR2 -static void EpollAdd(int epoll_fd, int new_fd, uint32_t events) { - struct epoll_event event; - event.events = events; - event.data.fd = new_fd; - epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_fd, &event); -} - -static void EpollDel(int epoll_fd, int fd) { - epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr); -} - void UnixSocketPort::AcceptThread() { sigset_t sigset; sigfillset(&sigset); sigdelset(&sigset, SIG_THREAD_EXIT); + struct pollfd fds[2]; + memset(fds, 0, sizeof(fds)); + fds[0].fd = listen_fd_; + fds[0].events = POLLIN; + fds[1].events = POLLRDHUP; + while (true) { - struct epoll_event event; - if (epoll_pwait(epoll_fd_, &event, 1, -1, &sigset) < 0) { + fds[1].fd = client_fd_; + int res = ppoll(fds, 2, nullptr, &sigset); + + if (accept_thread_stop_req_) { + return; + + } else if (res < 0) { if (errno == EINTR) { - if (accept_thread_stop_req_) { - break; - } else { - continue; - } + continue; } else { PLOG(ERROR) << "[UnixSocketPort]:epoll_wait()"; } - } else if (event.data.fd == listen_fd_) { - if (event.events & EPOLLIN) { - // new client connected - int fd; - while (true) { - fd = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); - if (fd >= 0 || errno != EINTR) { - break; - } - } - if (fd < 0) { - PLOG(ERROR) << "[UnixSocketPort]:accept4()"; - } else { - EpollAdd(epoll_fd_, fd, EPOLLRDHUP); - client_fd_ = fd; + } else if (fds[0].revents & POLLIN) { + // new client connected + int fd; + while (true) { + fd = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK); + if (fd >= 0 || errno != EINTR) { + break; } - } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected event " << event.events - << " in listen_fd_"; } - - } else if (event.data.fd == client_fd_) { - if (event.events & EPOLLRDHUP) { - // connection dropped by client - int fd = client_fd_; - client_fd_ = -1; - EpollDel(epoll_fd_, fd); - close(fd); + if (fd < 0) { + PLOG(ERROR) << "[UnixSocketPort]:accept4()"; } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected event " << event.events - << " in client_fd_"; + client_fd_ = fd; } - } else { - LOG(ERROR) << "[UnixSocketPort]:epoll_wait(): " - << "Unexpected fd " << event.data.fd; + } else if (fds[1].revents & (POLLRDHUP | POLLHUP)) { + // connection dropped by client + int fd = client_fd_; + client_fd_ = -1; + close(fd); } } } @@ -171,14 +149,6 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { } - epoll_fd_ = epoll_create(1); - if (epoll_fd_ < 0) { - DeInit(); - return CommandFailure(errno, "epoll_create(1) failed"); - } - EpollAdd(epoll_fd_, listen_fd_, EPOLLIN); - - struct sigaction sa; memset(&sa, 0, sizeof(sa)); sa.sa_handler = AcceptThreadHandler; @@ -208,9 +178,6 @@ void UnixSocketPort::DeInit() { if (client_fd_ != kNotConnectedFd) { close(client_fd_); } - if (epoll_fd_ != kNotConnectedFd) { - close(epoll_fd_); - } } int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) { diff --git a/core/drivers/unix_socket.h b/core/drivers/unix_socket.h index 9ca4ae0f0..ae6dba128 100644 --- a/core/drivers/unix_socket.h +++ b/core/drivers/unix_socket.h @@ -59,7 +59,6 @@ class UnixSocketPort final : public Port { accept_thread_stop_req_(false), listen_fd_(kNotConnectedFd), addr_(), - epoll_fd_(kNotConnectedFd), client_fd_(kNotConnectedFd) {} /*! @@ -145,11 +144,6 @@ class UnixSocketPort final : public Port { */ struct sockaddr_un addr_; - /*! - * The epoll fd -- detect when client closes. - */ - int epoll_fd_; - // NOTE: three threads (accept / recv / send) may race on this, so use // volatile. /* FD for client connection.*/ From 5f4dbcb7d8a33f695f6a212cf756626b71fe414e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Wed, 23 Aug 2017 03:10:18 +0300 Subject: [PATCH 09/11] drivers/unix_socket: Ignore additional clients Only one client connection is supported now, so ignore new ones as long as the current connection is not severed. --- core/drivers/unix_socket.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index 0b9b9179a..fa3d0ed26 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -80,6 +80,9 @@ void UnixSocketPort::AcceptThread() { } if (fd < 0) { PLOG(ERROR) << "[UnixSocketPort]:accept4()"; + } else if (client_fd_ != kNotConnectedFd) { + LOG(WARNING) << "[UnixSocketPort]: Ignoring additional client\n"; + close(fd); } else { client_fd_ = fd; } From c8e9e988b6ee97d32eb84ee6130f545244ff9dad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Wed, 23 Aug 2017 03:12:13 +0300 Subject: [PATCH 10/11] drivers/unix_socket: Keep compiler happy about warnings --- core/drivers/unix_socket.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index fa3d0ed26..04741e8b7 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -97,7 +97,7 @@ void UnixSocketPort::AcceptThread() { } static void AcceptThreadHandler(int sig) { - sig = sig; + int arg __attribute__((unused)) = sig; // keep compiler happy } CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { From 8cd0de6d70e2950fa7643a7d66bddca4680ccd5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Llu=C3=ADs=20Vilanova?= Date: Thu, 24 Aug 2017 16:25:34 +0300 Subject: [PATCH 11/11] drivers/unix_socket: Code cleanup --- core/drivers/unix_socket.cc | 22 ++++++++++++++-------- core/drivers/unix_socket.h | 10 ++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/core/drivers/unix_socket.cc b/core/drivers/unix_socket.cc index 04741e8b7..e8d828ada 100644 --- a/core/drivers/unix_socket.cc +++ b/core/drivers/unix_socket.cc @@ -28,8 +28,13 @@ // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE // POSSIBILITY OF SUCH DAMAGE. -#include + +#include #include +#include + +#include +#include #include "unix_socket.h" @@ -56,6 +61,7 @@ void UnixSocketPort::AcceptThread() { fds[1].events = POLLRDHUP; while (true) { + // negative FDs are ignored by ppoll() fds[1].fd = client_fd_; int res = ppoll(fds, 2, nullptr, &sigset); @@ -66,7 +72,7 @@ void UnixSocketPort::AcceptThread() { if (errno == EINTR) { continue; } else { - PLOG(ERROR) << "[UnixSocketPort]:epoll_wait()"; + PLOG(ERROR) << "ppoll()"; } } else if (fds[0].revents & POLLIN) { @@ -79,9 +85,9 @@ void UnixSocketPort::AcceptThread() { } } if (fd < 0) { - PLOG(ERROR) << "[UnixSocketPort]:accept4()"; + PLOG(ERROR) << "accept4()"; } else if (client_fd_ != kNotConnectedFd) { - LOG(WARNING) << "[UnixSocketPort]: Ignoring additional client\n"; + LOG(WARNING) << "Ignoring additional client\n"; close(fd); } else { client_fd_ = fd; @@ -90,14 +96,14 @@ void UnixSocketPort::AcceptThread() { } else if (fds[1].revents & (POLLRDHUP | POLLHUP)) { // connection dropped by client int fd = client_fd_; - client_fd_ = -1; + client_fd_ = kNotConnectedFd; close(fd); } } } -static void AcceptThreadHandler(int sig) { - int arg __attribute__((unused)) = sig; // keep compiler happy +static void AcceptThreadHandler(int) { + // empty handler, we only care about blocking syscalls being interrupted } CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { @@ -161,7 +167,7 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) { } - accept_thread_ = std::thread([&]() { + accept_thread_ = std::thread([this]() { this->AcceptThread(); }); diff --git a/core/drivers/unix_socket.h b/core/drivers/unix_socket.h index ae6dba128..f7f3cf8dc 100644 --- a/core/drivers/unix_socket.h +++ b/core/drivers/unix_socket.h @@ -31,18 +31,12 @@ #ifndef BESS_DRIVERS_UNIXSOCKET_H_ #define BESS_DRIVERS_UNIXSOCKET_H_ -#include -#include -#include -#include -#include -#include #include #include -#include #include -#include +#include +#include #include "../message.h" #include "../port.h"