Skip to content

Commit

Permalink
[transports] initialize transports with knet_handle and protects from…
Browse files Browse the repository at this point in the history
… missing proto ops

Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Dec 19, 2016
1 parent 7b7b362 commit 9f6a0f1
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 64 deletions.
78 changes: 55 additions & 23 deletions libknet/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,52 @@ static void _close_epolls(knet_handle_t knet_h)
close(knet_h->dst_link_handler_epollfd);
}

static int _start_transports(knet_handle_t knet_h)
{
int i, savederrno = 0, err = 0;

for (i=0; i<KNET_MAX_TRANSPORTS; i++) {
switch (i) {
case KNET_TRANSPORT_UDP:
knet_h->transport_ops[i] = get_udp_transport();
break;
#ifdef HAVE_NETINET_SCTP_H
case KNET_TRANSPORT_SCTP:
knet_h->transport_ops[i] = get_sctp_transport();
break;
#endif
}
if ((knet_h->transport_ops[i]) &&
(knet_h->transport_ops[i]->handle_allocate)) {
knet_h->transport_ops[i]->handle_allocate(knet_h, &knet_h->transports[i]);
if (!knet_h->transports[i]) {
savederrno = errno;
log_err(knet_h, KNET_SUB_HANDLE, "Failed to allocate transport handle for %s: %s",
knet_h->transport_ops[i]->transport_name,
strerror(savederrno));
err = -1;
goto out;
}
}
}

out:
errno = savederrno;
return err;
}

static void _stop_transports(knet_handle_t knet_h)
{
int i;

for (i=0; i<KNET_MAX_TRANSPORTS; i++) {
if ((knet_h->transport_ops[i]) &&
(knet_h->transport_ops[i]->handle_free)) {
knet_h->transport_ops[i]->handle_free(knet_h, knet_h->transports[i]);
}
}
}

static int _start_threads(knet_handle_t knet_h)
{
int savederrno = 0;
Expand Down Expand Up @@ -497,29 +543,6 @@ static int _start_threads(knet_handle_t knet_h)
return -1;
}


static void _stop_transports(knet_handle_t knet_h)
{
int i;
knet_transport_ops_t *ops = NULL;

for (i=0; i<KNET_MAX_TRANSPORTS; i++) {
switch (i) {
case KNET_TRANSPORT_UDP:
ops = get_udp_transport();
break;
#ifdef HAVE_NETINET_SCTP_H
case KNET_TRANSPORT_SCTP:
ops = get_sctp_transport();
break;
#endif
}
if (ops) {
ops->handle_free(knet_h, knet_h->transports[i]);
}
}
}

static void _stop_threads(knet_handle_t knet_h)
{
void *retval;
Expand Down Expand Up @@ -665,6 +688,15 @@ knet_handle_t knet_handle_new(uint16_t host_id,
goto exit_fail;
}

/*
* start transports
*/

if (_start_transports(knet_h)) {
savederrno = errno;
goto exit_fail;
}

/*
* start internal threads
*/
Expand Down
39 changes: 6 additions & 33 deletions libknet/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ int knet_link_set_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id
return -1;
}

if (!knet_h->transport_ops[transport]) {
errno = EINVAL;
return -1;
}

savederrno = pthread_rwlock_wrlock(&knet_h->global_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_LINK, "Unable to get write lock: %s",
Expand Down Expand Up @@ -121,39 +126,6 @@ int knet_link_set_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id
goto exit_unlock;
}

link->transport_type = transport;

switch (transport) {
case KNET_TRANSPORT_UDP:
knet_h->transport_ops[link->transport_type] = get_udp_transport();
break;
case KNET_TRANSPORT_SCTP:
#ifdef HAVE_NETINET_SCTP_H
knet_h->transport_ops[link->transport_type] = get_sctp_transport();
break;
#else
log_warn(knet_h, KNET_SUB_LINK,
"SCTP protocol not supported in this build");
#endif
default:
savederrno = EINVAL;
err = -1;
goto exit_unlock;
}

/* First time we've used this transport for this handle */
if (!knet_h->transports[transport]) {
knet_h->transport_ops[link->transport_type]->handle_allocate(knet_h, &knet_h->transports[transport]);
}
if (!knet_h->transports[transport]) {
savederrno = errno;
log_err(knet_h, KNET_SUB_LISTENER, "Failed to allocate transport handle for %s: %s",
knet_h->transport_ops[link->transport_type]->transport_name,
strerror(savederrno));
err = -1;
goto exit_unlock;
}

if (!dst_addr) {
link->dynamic = KNET_LINK_DYNIP;
err = 0;
Expand Down Expand Up @@ -184,6 +156,7 @@ int knet_link_set_config(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id

exit_unlock:
if (!err) {
link->transport_type = transport;
link->configured = 1;
link->pong_count = KNET_LINK_DEFAULT_PONG_COUNT;
link->has_valid_mtu = 0;
Expand Down
6 changes: 4 additions & 2 deletions libknet/listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ int _listener_add(knet_handle_t knet_h, uint16_t host_id, uint8_t link_id)

memset(listener, 0, sizeof(struct knet_listener));
memmove(&listener->address, &lnk->src_addr, sizeof(struct sockaddr_storage));
if (knet_h->transport_ops[lnk->transport_type]->link_listener_start(knet_h, lnk->transport, link_id,
&lnk->src_addr, &lnk->dst_addr) < 0) {
if ((knet_h->transport_ops[lnk->transport_type]) &&
(knet_h->transport_ops[lnk->transport_type]->link_listener_start) &&
(knet_h->transport_ops[lnk->transport_type]->link_listener_start(knet_h, lnk->transport, link_id,
&lnk->src_addr, &lnk->dst_addr) < 0)) {
savederrno = errno;
err = -1;
free(listener);
Expand Down
10 changes: 9 additions & 1 deletion libknet/threads_pmtud.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,15 @@ static int _handle_check_link_pmtud(knet_handle_t knet_h, struct knet_host *dst_
failsafe = 0;
pad_len = 0;

dst_link->last_bad_mtu = knet_h->transport_ops[dst_link->transport_type]->link_get_mtu_overhead(dst_link->transport);
/*
* FIXME: proto overhead should be included in overhead_len
*/
if ((knet_h->transport_ops[dst_link->transport_type]) &&
(knet_h->transport_ops[dst_link->transport_type]->link_get_mtu_overhead)) {
dst_link->last_bad_mtu = knet_h->transport_ops[dst_link->transport_type]->link_get_mtu_overhead(dst_link->transport);
} else {
dst_link->last_bad_mtu = 0;
}

knet_h->pmtudbuf->khp_pmtud_link = dst_link->link_id;

Expand Down
11 changes: 6 additions & 5 deletions libknet/transport_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "transports.h"
#include "../common/netutils.h"


int _configure_transport_socket(knet_handle_t knet_h, int sock, struct sockaddr_storage *address, const char *type)
{
int err = 0;
Expand Down Expand Up @@ -134,8 +133,9 @@ void _close_socket(knet_handle_t knet_h, int sockfd)

/* Tell transport that the FD has been closed */
for (i=0; i<KNET_MAX_TRANSPORTS; i++) {
if (knet_h->transports[i] &&
!knet_h->transport_ops[i]->handle_fd_eof(knet_h, sockfd))
if ((knet_h->transport_ops[i]) &&
(knet_h->transport_ops[i]->handle_fd_eof) &&
(!knet_h->transport_ops[i]->handle_fd_eof(knet_h, sockfd)))
break;
}
}
Expand All @@ -146,8 +146,9 @@ void _handle_socket_notification(knet_handle_t knet_h, int sockfd, struct iovec

/* Find the transport and post the message */
for (i=0; i<KNET_MAX_TRANSPORTS; i++) {
if (knet_h->transports[i] && knet_h->transport_ops[i]->handle_fd_notification &&
knet_h->transport_ops[i]->handle_fd_notification(knet_h, sockfd, iov, iovlen))
if ((knet_h->transport_ops[i]) &&
(knet_h->transport_ops[i]->handle_fd_notification) &&
(knet_h->transport_ops[i]->handle_fd_notification(knet_h, sockfd, iov, iovlen)))
break;
}
}
Expand Down

0 comments on commit 9f6a0f1

Please sign in to comment.