Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions bessctl/conf/testing/module_tests/vlan.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ def gen_untagged_packet():
'output_port': default_gate,
'input_packet': q,
'output_packet': q})
return [VLANSplit(), 1, 150, expected]
return [VLANSplit(), 1, 30, expected]

OUTPUT_TEST_INPUTS.append(output_test([1, 100, 77, -1, 149, 50, 100, -1]))
OUTPUT_TEST_INPUTS.append(output_test([100, 77, -1, 149, 50, 100, -1, 33, 70]))
OUTPUT_TEST_INPUTS.append(output_test([100, 77, -1, 149, 50, 100, -1, 33, 70], True))
OUTPUT_TEST_INPUTS.append(output_test([1, 17, -1, 29, 10, 13, 7]))
OUTPUT_TEST_INPUTS.append(output_test([1, 17, -1, 29, 10, 13, 7], True))
145 changes: 94 additions & 51 deletions core/drivers/unix_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.


#include <glog/logging.h>
#include <poll.h>
#include <signal.h>

#include <cerrno>
#include <cstring>

#include "unix_socket.h"

// TODO(barath): Clarify these comments.
Expand All @@ -38,53 +46,64 @@
// TODO: Revise this once the interrupt mode is implemented.

#define RECV_SKIP_TICKS 256
#define MAX_TX_FRAGS 8
#define SIG_THREAD_EXIT SIGUSR2

void UnixSocketPort::AcceptNewClient() {
int ret;

for (;;) {
ret = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK);
if (ret >= 0) {
break;
}
void UnixSocketPort::AcceptThread() {
sigset_t sigset;
sigfillset(&sigset);
sigdelset(&sigset, SIG_THREAD_EXIT);

if (errno != EINTR) {
PLOG(ERROR) << "[UnixSocket]:accept4()";
}
}
struct pollfd fds[2];
memset(fds, 0, sizeof(fds));
fds[0].fd = listen_fd_;
fds[0].events = POLLIN;
fds[1].events = POLLRDHUP;

recv_skip_cnt_ = 0;
while (true) {
// negative FDs are ignored by ppoll()
fds[1].fd = client_fd_;
int res = ppoll(fds, 2, nullptr, &sigset);

if (old_client_fd_ != kNotConnectedFd) {
// Reuse the old file descriptor number by atomically exchanging the new fd
// with the
// old one. The zombie socket is closed silently (see dup2).
dup2(ret, client_fd_);
close(ret);
} else {
client_fd_ = ret;
}
}
if (accept_thread_stop_req_) {
return;

} else if (res < 0) {
if (errno == EINTR) {
continue;
} else {
PLOG(ERROR) << "ppoll()";
}

// This accept thread terminates once a new client is connected.
void *AcceptThreadMain(void *arg) {
UnixSocketPort *p = reinterpret_cast<UnixSocketPort *>(arg);
p->AcceptNewClient();
return nullptr;
} else if (fds[0].revents & POLLIN) {
// new client connected
int fd;
while (true) {
fd = accept4(listen_fd_, nullptr, nullptr, SOCK_NONBLOCK);
if (fd >= 0 || errno != EINTR) {
break;
}
}
if (fd < 0) {
PLOG(ERROR) << "accept4()";
} else if (client_fd_ != kNotConnectedFd) {
LOG(WARNING) << "Ignoring additional client\n";
close(fd);
} else {
client_fd_ = fd;
}

} else if (fds[1].revents & (POLLRDHUP | POLLHUP)) {
// connection dropped by client
int fd = client_fd_;
client_fd_ = kNotConnectedFd;
close(fd);
}
}
}

// The file descriptor for the connection will not be closed, until we have a
// new client.
// This is to avoid race condition in TX process.
void UnixSocketPort::CloseConnection() {
// Keep client_fd, since it may be being used in unix_send_pkts().
old_client_fd_ = client_fd_;
client_fd_ = kNotConnectedFd;

// Relaunch the accept thread.
std::thread accept_thread(AcceptThreadMain, reinterpret_cast<void *>(this));
accept_thread.detach();
static void AcceptThreadHandler(int) {
// empty handler, we only care about blocking syscalls being interrupted
}

CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) {
Expand All @@ -96,15 +115,13 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) {

int ret;

client_fd_ = kNotConnectedFd;
old_client_fd_ = kNotConnectedFd;

if (num_txq > 1 || num_rxq > 1) {
return CommandFailure(EINVAL, "Cannot have more than 1 queue per RX/TX");
}

listen_fd_ = socket(AF_UNIX, SOCK_SEQPACKET, 0);
if (listen_fd_ < 0) {
DeInit();
return CommandFailure(errno, "socket(AF_UNIX) failed");
}

Expand All @@ -130,32 +147,54 @@ CommandResponse UnixSocketPort::Init(const bess::pb::UnixSocketPortArg &arg) {

ret = bind(listen_fd_, reinterpret_cast<struct sockaddr *>(&addr_), addrlen);
if (ret < 0) {
DeInit();
return CommandFailure(errno, "bind(%s) failed", addr_.sun_path);
}

ret = listen(listen_fd_, 1);
if (ret < 0) {
DeInit();
return CommandFailure(errno, "listen() failed");
}

std::thread accept_thread(AcceptThreadMain, reinterpret_cast<void *>(this));
accept_thread.detach();

struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = AcceptThreadHandler;
if (sigaction(SIG_THREAD_EXIT, &sa, NULL) < 0) {
DeInit();
return CommandFailure(errno, "sigaction(SIG_THREAD_EXIT) failed");
}


accept_thread_ = std::thread([this]() {
this->AcceptThread();
});

return CommandSuccess();
}

void UnixSocketPort::DeInit() {
close(listen_fd_);
if (accept_thread_.joinable()) {
accept_thread_stop_req_ = true;
pthread_kill(accept_thread_.native_handle(), SIG_THREAD_EXIT);
accept_thread_.join();
}

if (client_fd_ >= 0) {
if (listen_fd_ != kNotConnectedFd) {
close(listen_fd_);
}
if (client_fd_ != kNotConnectedFd) {
close(client_fd_);
}
}

int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
int client_fd = client_fd_;

DCHECK_EQ(qid, 0);

if (client_fd_ == kNotConnectedFd) {
if (client_fd == kNotConnectedFd) {
return 0;
}

Expand All @@ -174,7 +213,7 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
}

// Datagrams larger than 2KB will be truncated.
ret = recv(client_fd_, pkt->data(), SNBUF_DATA, 0);
ret = recv(client_fd, pkt->data(), SNBUF_DATA, 0);

if (ret > 0) {
pkt->append(ret);
Expand All @@ -185,7 +224,7 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
bess::Packet::Free(pkt);

if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EBADF) {
break;
}

Expand All @@ -195,7 +234,6 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {
}

// Connection closed.
CloseConnection();
break;
}

Expand All @@ -208,9 +246,14 @@ int UnixSocketPort::RecvPackets(queue_t qid, bess::Packet **pkts, int cnt) {

int UnixSocketPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) {
int sent = 0;
int client_fd = client_fd_;

DCHECK_EQ(qid, 0);

if (client_fd == kNotConnectedFd) {
return 0;
}

for (int i = 0; i < cnt; i++) {
bess::Packet *pkt = pkts[i];

Expand All @@ -229,7 +272,7 @@ int UnixSocketPort::SendPackets(queue_t qid, bess::Packet **pkts, int cnt) {
pkt = pkt->next();
}

ret = sendmsg(client_fd_, &msg, 0);
ret = sendmsg(client_fd, &msg, 0);
if (ret < 0) {
break;
}
Expand Down
43 changes: 20 additions & 23 deletions core/drivers/unix_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,12 @@
#ifndef BESS_DRIVERS_UNIXSOCKET_H_
#define BESS_DRIVERS_UNIXSOCKET_H_

#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <thread>
#include <unistd.h>

#include <glog/logging.h>
#include <atomic>
#include <thread>

#include "../message.h"
#include "../port.h"
Expand All @@ -55,10 +50,10 @@ class UnixSocketPort final : public Port {
UnixSocketPort()
: Port(),
recv_skip_cnt_(),
listen_fd_(),
accept_thread_stop_req_(false),
listen_fd_(kNotConnectedFd),
addr_(),
client_fd_(),
old_client_fd_() {}
client_fd_(kNotConnectedFd) {}

/*!
* Initialize the port, ie, open the socket.
Expand Down Expand Up @@ -107,27 +102,32 @@ class UnixSocketPort final : public Port {
*/
int SendPackets(queue_t qid, bess::Packet **pkts, int cnt) override;

/*!
* Waits for a client to connect to the socket.
*/
void AcceptNewClient();

private:
// Value for a disconnected socket.
static const int kNotConnectedFd = -1;

/*!
* Closes the client connection but does not shut down the listener fd.
*/
void CloseConnection();

/*!
* Calling recv() system call is expensive so we only do it every
* RECV_SKIP_TICKS times -- this counter keeps track of how many ticks its been
* since we last called recv().
* */
uint32_t recv_skip_cnt_;

/*!
* Function for the thread accepting and monitoring clients (accept thread).
*/
void AcceptThread();

/*!
* Accept thread handle.
*/
std::thread accept_thread_;

/*!
* Sent stop request to accept thread.
*/
std::atomic<bool> accept_thread_stop_req_;

/*!
* The listener fd -- listen for new connections here.
*/
Expand All @@ -142,9 +142,6 @@ class UnixSocketPort final : public Port {
// volatile.
/* FD for client connection.*/
volatile int client_fd_;
/* If client FD is not connected, what was the fd the last time we were
* connected to a client? */
int old_client_fd_;
};

#endif // BESS_DRIVERS_UNIXSOCKET_H_