Skip to content

Commit

Permalink
rtpengine: reconnect socket in case of bad fds
Browse files Browse the repository at this point in the history
(cherry picked from commit 2bee3c6)
  • Loading branch information
razvancrainea committed Nov 14, 2018
1 parent bc85982 commit c940412
Showing 1 changed file with 94 additions and 67 deletions.
161 changes: 94 additions & 67 deletions modules/rtpengine/rtpengine.c
Expand Up @@ -1223,12 +1223,65 @@ static int mi_child_init(void)
return 0;
}

static int connect_rtpengines(void)
static inline int rtpengine_connect_node(struct rtpe_node *pnode)
{
int n;
char *cp;
char *hostname;
struct addrinfo hints, *res;

if (pnode->rn_umode == 0) {
rtpe_socks[pnode->idx] = -1;
return 1;
}

hostname = (char*)pkg_malloc(strlen(pnode->rn_address) + 1);
if (hostname==NULL) {
LM_ERR("no more pkg memory\n");
return 0;
}
strcpy(hostname, pnode->rn_address);

cp = strrchr(hostname, ':');
if (cp != NULL) {
*cp = '\0';
cp++;
}
if (cp == NULL || *cp == '\0')
cp = CPORT;

memset(&hints, 0, sizeof(hints));
hints.ai_flags = 0;
hints.ai_family = (pnode->rn_umode == 6) ? AF_INET6 : AF_INET;
hints.ai_socktype = SOCK_DGRAM;
if ((n = getaddrinfo(hostname, cp, &hints, &res)) != 0) {
LM_ERR("%s\n", gai_strerror(n));
pkg_free(hostname);
return 0;
}
pkg_free(hostname);

rtpe_socks[pnode->idx] = socket((pnode->rn_umode == 6)
? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
if ( rtpe_socks[pnode->idx] == -1) {
LM_ERR("can't create socket\n");
freeaddrinfo(res);
return 0;
}

if (connect(rtpe_socks[pnode->idx], res->ai_addr, res->ai_addrlen) == -1) {
LM_ERR("can't connect to a RTP proxy\n");
close( rtpe_socks[pnode->idx] );
rtpe_socks[pnode->idx] = -1;
freeaddrinfo(res);
return 0;
}
freeaddrinfo(res);
return 1;
}

static int connect_rtpengines(void)
{
struct rtpe_set *rtpe_list;
struct rtpe_node *pnode;

Expand All @@ -1251,56 +1304,9 @@ static int connect_rtpengines(void)
rtpe_list = rtpe_list->rset_next){

for (pnode=rtpe_list->rn_first; pnode!=0; pnode = pnode->rn_next){

if (pnode->rn_umode == 0) {
rtpe_socks[pnode->idx] = -1;
goto rptest;
}

hostname = (char*)pkg_malloc(strlen(pnode->rn_address) + 1);
if (hostname==NULL) {
LM_ERR("no more pkg memory\n");
return -1;
}
strcpy(hostname, pnode->rn_address);

cp = strrchr(hostname, ':');
if (cp != NULL) {
*cp = '\0';
cp++;
}
if (cp == NULL || *cp == '\0')
cp = CPORT;

memset(&hints, 0, sizeof(hints));
hints.ai_flags = 0;
hints.ai_family = (pnode->rn_umode == 6) ? AF_INET6 : AF_INET;
hints.ai_socktype = SOCK_DGRAM;
if ((n = getaddrinfo(hostname, cp, &hints, &res)) != 0) {
LM_ERR("%s\n", gai_strerror(n));
pkg_free(hostname);
return -1;
}
pkg_free(hostname);

rtpe_socks[pnode->idx] = socket((pnode->rn_umode == 6)
? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
if ( rtpe_socks[pnode->idx] == -1) {
LM_ERR("can't create socket\n");
freeaddrinfo(res);
return -1;
}

if (connect(rtpe_socks[pnode->idx], res->ai_addr, res->ai_addrlen) == -1) {
LM_ERR("can't connect to a RTP proxy\n");
close( rtpe_socks[pnode->idx] );
rtpe_socks[pnode->idx] = -1;
freeaddrinfo(res);
return -1;
}
freeaddrinfo(res);
rptest:
pnode->rn_disabled = rtpe_test(pnode, 0, 1);
if (rtpengine_connect_node(pnode))
pnode->rn_disabled = rtpe_test(pnode, 0, 1);
/* else, there is an error, and we try to connect the next one */
}
}

Expand Down Expand Up @@ -1330,6 +1336,7 @@ static int update_rtpengines(void)
for (i = 0; i < rtpe_number; i++) {
shutdown(rtpe_socks[i], SHUT_RDWR);
close(rtpe_socks[i]);
rtpe_socks[i] = -1;
}

return connect_rtpengines();
Expand Down Expand Up @@ -1982,6 +1989,15 @@ rtpe_test(struct rtpe_node *node, int isdisabled, int force)
return 1;
}

#define RTPE_IO_ERROR_CLOSE(_fd) \
do { \
if (errno == EPIPE || errno == EBADF) { \
LM_INFO("Closing rtpengine socket %d\n", (_fd)); \
close((_fd)); \
(_fd) = -1; \
} \
} while (0)

static char *
send_rtpe_command(struct rtpe_node *node, bencode_item_t *dict, int *outlen)
{
Expand Down Expand Up @@ -2038,33 +2054,42 @@ send_rtpe_command(struct rtpe_node *node, bencode_item_t *dict, int *outlen)
goto badproxy;
}
} else {
fds[0].fd = rtpe_socks[node->idx];
fds[0].events = POLLIN;
fds[0].revents = 0;
/* Drain input buffer */
while ((poll(fds, 1, 0) == 1) &&
((fds[0].revents & POLLIN) != 0)) {
if (fds[0].revents & (POLLERR|POLLNVAL|POLLHUP)) {
LM_WARN("error on rtpengine socket %d!\n", rtpe_socks[node->idx]);
break;
}
if (rtpe_socks[node->idx] != -1) {
fds[0].fd = rtpe_socks[node->idx];
fds[0].events = POLLIN;
fds[0].revents = 0;
if (recv(rtpe_socks[node->idx], buf, sizeof(buf) - 1, 0) < 0 &&
errno != EINTR) {
LM_WARN("error while draining rtpengine %d!\n", errno);
break;
/* Drain input buffer */
while ((poll(fds, 1, 0) == 1) &&
((fds[0].revents & POLLIN) != 0)) {
if (fds[0].revents & (POLLERR|POLLNVAL|POLLHUP)) {
LM_WARN("error on rtpengine socket %d!\n", rtpe_socks[node->idx]);
RTPE_IO_ERROR_CLOSE(rtpe_socks[node->idx]);
break;
}
fds[0].revents = 0;
if (recv(rtpe_socks[node->idx], buf, sizeof(buf) - 1, 0) < 0 &&
errno != EINTR) {
LM_WARN("error while draining rtpengine %d!\n", errno);
RTPE_IO_ERROR_CLOSE(rtpe_socks[node->idx]);
break;
}
}
}
v[0].iov_base = gencookie();
v[0].iov_len = strlen(v[0].iov_base);
for (i = 0; i < rtpengine_retr; i++) {
if (rtpe_socks[node->idx] == -1 && !rtpengine_connect_node(node)) {
LM_ERR("cannot reconnect RTP engine socket!\n");
goto badproxy;
}
do {
len = writev(rtpe_socks[node->idx], v, vcnt + 1);
} while (len == -1 && (errno == EINTR || errno == ENOBUFS || errno == EMSGSIZE));
if (len <= 0) {
LM_ERR("can't send command to a RTP proxy (%d:%s)\n",
errno, strerror(errno));
goto badproxy;
RTPE_IO_ERROR_CLOSE(rtpe_socks[node->idx]);
continue;
}
while ((poll(fds, 1, rtpengine_tout * 1000) == 1) &&
(fds[0].revents & POLLIN) != 0) {
Expand All @@ -2073,7 +2098,8 @@ send_rtpe_command(struct rtpe_node *node, bencode_item_t *dict, int *outlen)
} while (len == -1 && errno == EINTR);
if (len <= 0) {
LM_ERR("can't read reply from a RTP proxy\n");
goto badproxy;
RTPE_IO_ERROR_CLOSE(rtpe_socks[node->idx]);
continue;
}
if (len >= (v[0].iov_len - 1) &&
memcmp(buf, v[0].iov_base, (v[0].iov_len - 1)) == 0) {
Expand Down Expand Up @@ -2104,6 +2130,7 @@ send_rtpe_command(struct rtpe_node *node, bencode_item_t *dict, int *outlen)
node->rn_recheck_ticks = get_ticks() + rtpengine_disable_tout;

return NULL;
#undef RTPE_IO_ERROR_CLOSE
}

/*
Expand Down

0 comments on commit c940412

Please sign in to comment.