Skip to content

Commit

Permalink
[global] create our own copy of mmsghdr in preparation to drop sendmm…
Browse files Browse the repository at this point in the history
…sg and recvmmsg

knet_bench will drop mmsghdr when dropping sendmmsg and recvmmsg

Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Feb 15, 2017
1 parent 989e7e3 commit 51c36b2
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 27 deletions.
5 changes: 0 additions & 5 deletions configure.ac
Expand Up @@ -197,11 +197,6 @@ if test "x$ac_cv_header_sys_epoll_h" = xyes && test "x$ac_cv_func_kevent" = xyes
AC_MSG_ERROR([Both epoll and kevent available on this OS, please contact the maintainers to fix the code])
fi

# Check entries in specific structs
AC_CHECK_MEMBER([struct mmsghdr.msg_hdr],
[AC_DEFINE_UNQUOTED([HAVE_MMSGHDR], [1], [struct mmsghdr exists])],
[], [[#include <sys/socket.h>]])

# checks (for kronosnetd)
if test "x$enable_kronosnetd" = xyes; then

Expand Down
7 changes: 0 additions & 7 deletions libknet/compat.h
Expand Up @@ -22,13 +22,6 @@
#undef HAVE_SENDMMSG
#endif

#ifndef HAVE_MMSGHDR
struct mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of bytes transmitted */
};
#endif

#ifndef MSG_WAITFORONE
#define MSG_WAITFORONE 0x10000
#endif
Expand Down
7 changes: 6 additions & 1 deletion libknet/internals.h
Expand Up @@ -32,6 +32,11 @@ typedef void *knet_transport_link_t; /* per link transport handle */
typedef void *knet_transport_t; /* per knet_h transport handle */
struct knet_transport_ops; /* Forward because of circular dependancy */

struct knet_mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of bytes transmitted */
};

struct knet_link {
/* required */
struct sockaddr_storage src_addr;
Expand Down Expand Up @@ -285,7 +290,7 @@ typedef struct knet_transport_ops {
* transport_rx_is_data is invoked with both global_rwlock
* and fd_tracker read lock (from RX thread)
*/
int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg);
int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg);
} knet_transport_ops_t;

socklen_t sockaddr_len(const struct sockaddr_storage *ss);
Expand Down
8 changes: 4 additions & 4 deletions libknet/threads_rx.c
Expand Up @@ -223,7 +223,7 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t
return 1;
}

static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct mmsghdr *msg)
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
int err = 0, savederrno = 0;
ssize_t outlen;
Expand Down Expand Up @@ -601,7 +601,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
}
}

static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg)
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
int err, savederrno;
int i, msg_recv, transport;
Expand Down Expand Up @@ -630,7 +630,7 @@ static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct mms
msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
}

msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
msg_recv = recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
savederrno = errno;

/*
Expand Down Expand Up @@ -698,7 +698,7 @@ void *_handle_recv_from_links_thread(void *data)
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_FRAG_MAX];
struct mmsghdr msg[PCKT_FRAG_MAX];
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
struct iovec iov_in[PCKT_FRAG_MAX];

memset(&msg, 0, sizeof(msg));
Expand Down
16 changes: 8 additions & 8 deletions libknet/threads_tx.c
Expand Up @@ -29,11 +29,11 @@
* SEND
*/

static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct mmsghdr *msg, int msgs_to_send)
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
int link_idx, msg_idx, sent_msgs, prev_sent, progress;
int err = 0, savederrno = 0;
struct mmsghdr *cur;
struct knet_mmsghdr *cur;

for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
sent_msgs = 0;
Expand All @@ -50,7 +50,7 @@ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host,
cur = &msg[prev_sent];

sent_msgs = sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
cur, msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
(struct mmsghdr *)&cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
savederrno = errno;

err = knet_h->transport_ops[dst_host->link[dst_host->active_links[link_idx]].transport_type]->transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].outsock, sent_msgs, savederrno);
Expand Down Expand Up @@ -126,7 +126,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, int buf_idx, ssize_t inle
int savederrno = 0;
int err = 0;
seq_num_t tx_seq_num;
struct mmsghdr msg[PCKT_FRAG_MAX];
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
int msgs_to_send, msg_idx;

inbuf = knet_h->recv_from_sock_buf[buf_idx];
Expand Down Expand Up @@ -469,7 +469,7 @@ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len
return err;
}

static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct mmsghdr *msg, int type)
static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel, struct knet_mmsghdr *msg, int type)
{
ssize_t inlen = 0;
struct iovec iov_in;
Expand All @@ -495,7 +495,7 @@ static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t chann
knet_h->recv_from_sock_buf[0]->kh_type = type;
_parse_recv_from_sock(knet_h, 0, inlen, channel, 0);
} else {
msg_recv = recvmmsg(sockfd, msg, PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
msg_recv = recvmmsg(sockfd, (struct mmsghdr *)&msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL, NULL);
if (msg_recv < 0) {
inlen = msg_recv;
savederrno = errno;
Expand Down Expand Up @@ -546,12 +546,12 @@ void *_handle_send_to_links_thread(void *data)
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_FRAG_MAX];
struct mmsghdr msg[PCKT_FRAG_MAX];
struct knet_mmsghdr msg[PCKT_FRAG_MAX];
struct iovec iov_in[PCKT_FRAG_MAX];
int i, nev, type;
int8_t channel;

memset(&msg, 0, sizeof(struct mmsghdr));
memset(&msg, 0, sizeof(msg));

/* preparing data buffer */
for (i = 0; i < PCKT_FRAG_MAX; i++) {
Expand Down
2 changes: 1 addition & 1 deletion libknet/transport_sctp.c
Expand Up @@ -415,7 +415,7 @@ static int sctp_transport_rx_sock_error(knet_handle_t knet_h, int sockfd, int re
* delegate any FD error management to sctp_transport_rx_sock_error
* and keep this code to parsing incoming data only
*/
static int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg)
static int sctp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
int i;
struct iovec *iov = msg->msg_hdr.msg_iov;
Expand Down
2 changes: 1 addition & 1 deletion libknet/transport_udp.c
Expand Up @@ -374,7 +374,7 @@ static int udp_transport_tx_sock_error(knet_handle_t knet_h, int sockfd, int rec
return 0;
}

static int udp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct mmsghdr *msg)
static int udp_transport_rx_is_data(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
if (msg->msg_len == 0)
return 0;
Expand Down

0 comments on commit 51c36b2

Please sign in to comment.