Skip to content

Commit

Permalink
[send/recv] rework handling of 0 byte read/write on local sockets
Browse files Browse the repository at this point in the history
considering there is no obvious use case trying to send 0 byte packets
and that 0 byte packtes are filtered by iovec calls across the all
code, there is no point trying to handle 0 byte differently from
any other socket error.

this commit makes sure that every time there is an error (-1) or a 0
byte read from the locally provided sockets, a call back is issued.

on read errors (-1) the socket will be removed from the epoll
to avoid spinning and it is safe to call knet_handle_remove_datafd
afterwards to remove it completely.

it is now mandatory to enable a sock_notify callback before adding
datafd.

read libknet.h carefully on what the callback is supposed to do
based on the type of socket your application is adding.

Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Dec 20, 2015
1 parent 19cfe08 commit d2e9894
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 114 deletions.
5 changes: 0 additions & 5 deletions TODO
Expand Up @@ -4,11 +4,6 @@ distro/packaging:
https://bugzilla.redhat.com/show_bug.cgi?id=893015

link/host level:
- (issue) fix 0 byte read from local socket case handling
use flag to determine the behavior:
0 byte -> process/send
0 byte -> do callback
0 byte -> ignore
- (issue) simplify handling of DATA and HOSTINFO code paths in send/recv code
- (issue) need bind to interface for dynamic ip local interfaces vs src ip
address or find a way to autodetect the new ip on that interface
Expand Down
15 changes: 15 additions & 0 deletions kronosnetd/vty_cli_cmds.c
Expand Up @@ -1557,6 +1557,14 @@ static int knet_cmd_no_interface(struct knet_vty *vty)
return err;
}

static void sock_notify_fn(void *private_data, int datafd, int8_t chan, uint8_t tx_rx, int error, int errorno)
{
struct knet_vty *vty = (struct knet_vty *)private_data;

knet_vty_write(vty, "Error: received sock notify, datafd: %d channel: %d direction: %u error: %d errno: %d (%s)%s",
datafd, chan, tx_rx, error, errorno, strerror(errorno), telnet_newline);
}

static int knet_cmd_interface(struct knet_vty *vty)
{
int err = 0, paramlen = 0, paramoffset = 0, found = 0, requested_id, tapfd;
Expand Down Expand Up @@ -1622,6 +1630,13 @@ static int knet_cmd_interface(struct knet_vty *vty)
goto out_clean;
}

if (knet_handle_enable_sock_notify(knet_iface->cfg_ring.knet_h, &vty, sock_notify_fn)) {
knet_vty_write(vty, "Error: Unable to add sock notify callback to to knet_handle %s%s",
strerror(errno), telnet_newline);
err = -1;
goto out_clean;
}

if (knet_handle_add_datafd(knet_iface->cfg_ring.knet_h, &tapfd, &channel) < 0) {
knet_vty_write(vty, "Error: Unable to add tapfd to knet_handle %s%s",
strerror(errno), telnet_newline);
Expand Down
106 changes: 59 additions & 47 deletions libknet/handle.c
Expand Up @@ -691,6 +691,45 @@ int knet_handle_free(knet_handle_t knet_h)
return 0;
}

int knet_handle_enable_sock_notify(knet_handle_t knet_h,
void *sock_notify_fn_private_data,
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno))
{
int savederrno = 0, err = 0;

if (!knet_h) {
errno = EINVAL;
return -1;
}

if (!sock_notify_fn) {
errno = EINVAL;
return -1;
}

savederrno = pthread_rwlock_wrlock(&knet_h->list_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}

knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
knet_h->sock_notify_fn = sock_notify_fn;
log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");

pthread_rwlock_unlock(&knet_h->list_rwlock);

return err;
}

int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
{
int err = 0, savederrno = 0;
Expand Down Expand Up @@ -725,6 +764,13 @@ int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
return -1;
}

if (!knet_h->sock_notify_fn) {
log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
savederrno = EINVAL;
err = -1;
goto out_unlock;
}

if (*datafd > 0) {
for (i = 0; i < KNET_DATAFD_MAX; i++) {
if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
Expand Down Expand Up @@ -761,6 +807,7 @@ int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)

knet_h->sockfd[*channel].is_created = 0;
knet_h->sockfd[*channel].is_socket = 0;
knet_h->sockfd[*channel].has_error = 0;

if (*datafd > 0) {
int sockopt;
Expand Down Expand Up @@ -847,15 +894,17 @@ int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
goto out_unlock;
}

memset(&ev, 0, sizeof(struct epoll_event));
if (!knet_h->sockfd[channel].has_error) {
memset(&ev, 0, sizeof(struct epoll_event));

if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[i].is_created], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[knet_h->sockfd[i].is_created], strerror(savederrno));
goto out_unlock;
if (epoll_ctl(knet_h->send_to_links_epollfd,
EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
goto out_unlock;
}
}

if (knet_h->sockfd[channel].is_created) {
Expand All @@ -870,43 +919,6 @@ int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
return err;
}

int knet_handle_enable_sock_notify(knet_handle_t knet_h,
void *sock_notify_fn_private_data,
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
int error,
int errorno))
{
int savederrno = 0;

if (!knet_h) {
errno = EINVAL;
return -1;
}

savederrno = pthread_rwlock_wrlock(&knet_h->list_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}

knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
knet_h->sock_notify_fn = sock_notify_fn;
if (knet_h->sock_notify_fn) {
log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");
} else {
log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn disabled");
}

pthread_rwlock_unlock(&knet_h->list_rwlock);

return 0;
}

int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
{
int err = 0, savederrno = 0;
Expand Down Expand Up @@ -1241,7 +1253,7 @@ ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const
return -1;
}

if ((buff == NULL) || (buff_len == 0)) {
if ((buff == NULL) || (buff_len <= 0)) {
errno = EINVAL;
return -1;
}
Expand Down Expand Up @@ -1269,7 +1281,7 @@ ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len,

if ((!knet_h) ||
(buff == NULL) ||
(buff_len == 0) || (buff_len > KNET_MAX_PACKET_SIZE)) {
(buff_len <= 0) || (buff_len > KNET_MAX_PACKET_SIZE)) {
errno = EINVAL;
return -1;
}
Expand Down
3 changes: 3 additions & 0 deletions libknet/internals.h
Expand Up @@ -107,6 +107,8 @@ struct knet_sock {
int is_socket; /* check if it's a socket for recvmmsg usage */
int is_created; /* knet created this socket and has to clean up on exit/del */
int in_use; /* set to 1 if it's use, 0 if free */
int has_error; /* set to 1 if there were errors reading from the sock
* and socket has been removed from epoll */
};

struct knet_handle {
Expand Down Expand Up @@ -190,6 +192,7 @@ struct knet_handle {
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno);
int fini_in_progress;
Expand Down
84 changes: 54 additions & 30 deletions libknet/libknet.h
Expand Up @@ -44,6 +44,13 @@
#define KNET_MAX_HOST_LEN 64
#define KNET_MAX_PORT_LEN 6

/*
* Some notifications can be generated either on TX or RX
*/

#define KNET_NOTIFY_TX 0
#define KNET_NOTIFY_RX 1

typedef struct knet_handle *knet_handle_t;

/*
Expand Down Expand Up @@ -95,9 +102,56 @@ knet_handle_t knet_handle_new(uint16_t host_id,

int knet_handle_free(knet_handle_t knet_h);

/*
* knet_handle_enable_sock_notify
*
* knet_h - pointer to knet_handle_t
*
* sock_notify_fn_private_data
* void pointer to data that can be used to identify
* the callback.
*
* sock_notify_fn
* A callback function that is invoked every time
* a socket in the datafd pool will report an error (-1)
* or an end of read (0) (see socket.7).
* This function MUST NEVER block or add substantial delays.
* The callback is invoked in an internal unlocked area
* to allow calls to knet_handle_add_datafd/knet_handle_remove_datafd
* to swap/replace the bad fd.
* if both err and errno are 0, it means that the socket
* has received a 0 byte packet (EOF?).
* The callback function must either remove the fd from knet
* (by calling knet_handle_remove_fd()) or dup a new fd in its place.
* Failure to do this can cause problems.
*
* knet_handle_enable_sock_notify returns:
*
* 0 on success
* -1 on error and errno is set.
*/

int knet_handle_enable_sock_notify(knet_handle_t knet_h,
void *sock_notify_fn_private_data,
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno)); /* sorry! can't call it errno ;) */

/*
* knet_handle_add_datafd
*
* IMPORTANT: In order to add datafd to knet, knet_handle_enable_sock_notify
* _MUST_ be set and be able to handle both errors (-1) and
* 0 bytes read / write from the provided datafd.
* On read error (< 0) from datafd, the socket is automatically
* removed from polling to avoid spinning on dead sockets.
* It is safe to call knet_handle_remove_datafd even on sockets
* that have been removed.
*
* knet_h - pointer to knet_handle_t
*
* *datafd - read/write file descriptor.
Expand Down Expand Up @@ -201,33 +255,6 @@ int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd);
* -1 on error and errno is set.
*/

int knet_handle_enable_sock_notify(knet_handle_t knet_h,
void *sock_notify_fn_private_data,
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
int error,
int errorno)); /* sorry! can't call it errno ;) */

/*
* knet_handle_get_channel
*
* knet_h - pointer to knet_handle_t
*
* datafd - file descriptor to search
*
* *channel - will contain the result
*
* knet_handle_get_channel returns:
*
* 0 on success
* and *channel will contain the results
*
* -1 on error and errno is set.
* and *channel content is meaningless
*/

int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel);

/*
Expand Down Expand Up @@ -328,9 +355,6 @@ ssize_t knet_send(knet_handle_t knet_h,
* -1 on error and errno is set.
*/

#define KNET_DST_HOST_FILTER_TX 0
#define KNET_DST_HOST_FILTER_RX 1

int knet_handle_enable_filter(knet_handle_t knet_h,
void *dst_host_filter_fn_private_data,
int (*dst_host_filter_fn) (
Expand Down
9 changes: 6 additions & 3 deletions libknet/ping_test.c
Expand Up @@ -297,10 +297,13 @@ static void host_notify(void *private_data, uint16_t host_id, uint8_t reachable,
return;
}

static void sock_notify(void *private_data, int datafd, int8_t chan, int error, int errorno)
static void sock_notify(void *private_data, int datafd, int8_t chan, uint8_t tx_rx, int error, int errorno)
{
printf("Received sock notify, datafd: %d channel: %d error: %d errno: %d (%s)\n",
datafd, chan, error, errorno, strerror(errorno));
printf("Received sock notify, datafd: %d channel: %d direction: %u error: %d errno: %d (%s)\n",
datafd, chan, tx_rx, error, errorno, strerror(errorno));

printf("Something went wrong with our sockets!\n");
exit(EXIT_FAILURE);
}

static void recv_data(knet_handle_t khandle, int inchannel, int has_crypto)
Expand Down

0 comments on commit d2e9894

Please sign in to comment.