Skip to content

Commit

Permalink
Use poll(2) for timeouts on socket
Browse files Browse the repository at this point in the history
Use poll(2) instead of select(2) to do timeout operations on sockets. This helps
with the situation where the fd is larger than FD_MAXSIZE.

Fixes #168
  • Loading branch information
alanxz committed Mar 30, 2014
1 parent a566929 commit 2300531
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
43 changes: 27 additions & 16 deletions librabbitmq/amqp_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "amqp_timer.h"

#include <assert.h>
#include <limits.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
Expand All @@ -64,6 +65,7 @@
# include <netdb.h>
# include <sys/uio.h>
# include <fcntl.h>
# include <poll.h>
# include <unistd.h>
#endif

Expand Down Expand Up @@ -275,7 +277,9 @@ int amqp_open_socket_noblock(char const *hostname,

AMQP_INIT_TIMER(timer)

if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 ||
INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
(uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) {
return AMQP_STATUS_INVALID_PARAMETER;
}

Expand Down Expand Up @@ -349,14 +353,12 @@ int amqp_open_socket_noblock(char const *hostname,
#endif

while(1) {
fd_set write_fd;
fd_set except_fd;

FD_ZERO(&write_fd);
FD_SET(sockfd, &write_fd);
struct pollfd pfd;
int timeout_ms;

FD_ZERO(&except_fd);
FD_SET(sockfd, &except_fd);
pfd.fd = sockfd;
pfd.events = POLLERR | POLLOUT;
pfd.revents = 0;

timer_error = amqp_timer_update(&timer, timeout);

Expand All @@ -365,11 +367,13 @@ int amqp_open_socket_noblock(char const *hostname,
break;
}

timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S +
timer.tv.tv_usec / AMQP_US_PER_MS;
/* Win32 requires except_fds to be passed to detect connection
* failure. Other platforms only need write_fds, passing except_fds
* seems to be harmless otherwise
*/
res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv);
res = poll(&pfd, 1, timeout_ms);

if (res > 0) {
int result;
Expand Down Expand Up @@ -547,22 +551,29 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru

if (timeout) {
int fd;
fd_set read_fd;
fd_set except_fd;

fd = amqp_get_sockfd(state);
if (-1 == fd) {
return AMQP_STATUS_CONNECTION_CLOSED;
}

if (INT_MAX < (uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
(uint64_t)timeout->tv_usec / AMQP_US_PER_MS) {
return AMQP_STATUS_INVALID_PARAMETER;
}

while (1) {
FD_ZERO(&read_fd);
FD_SET(fd, &read_fd);
struct pollfd pfd;
int timeout_ms;

pfd.fd = fd;
pfd.events = POLLIN;
pfd.revents = 0;

FD_ZERO(&except_fd);
FD_SET(fd, &except_fd);
timeout_ms = timeout->tv_sec * AMQP_MS_PER_S +
timeout->tv_usec * AMQP_US_PER_MS;

res = select(fd + 1, &read_fd, NULL, &except_fd, timeout);
res = poll(&pfd, 1, timeout_ms);

if (0 < res) {
break;
Expand Down
5 changes: 4 additions & 1 deletion librabbitmq/amqp_timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
# include <sys/time.h>
#endif

#define AMQP_NS_PER_S 1000000000
#define AMQP_MS_PER_S 1000
#define AMQP_US_PER_MS 1000
#define AMQP_NS_PER_S 1000000000
#define AMQP_NS_PER_MS 1000000
#define AMQP_NS_PER_US 1000

#define AMQP_INIT_TIMER(structure) { \
Expand Down

0 comments on commit 2300531

Please sign in to comment.