Permalink
1540 lines (1301 sloc) 41.6 KB
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2014
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef _MSC_VER
# define _CRT_SECURE_NO_WARNINGS
#endif
#include "amqp_private.h"
#include "amqp_socket.h"
#include "amqp_table.h"
#include "amqp_time.h"
#include <assert.h>
#include <limits.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
# ifndef WIN32_LEAN_AND_MEAN
# define WIN32_LEAN_AND_MEAN
# endif
# include <winsock2.h>
# include <ws2tcpip.h>
#else
# include <sys/types.h> /* On older BSD this must come before net includes */
# include <netinet/in.h>
# include <netinet/tcp.h>
# ifdef HAVE_SELECT
# include <sys/select.h>
# endif
# include <sys/socket.h>
# include <netdb.h>
# include <sys/uio.h>
# include <fcntl.h>
# ifdef HAVE_POLL
# include <poll.h>
# endif
# include <unistd.h>
#endif
static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list );
static int
amqp_os_socket_init(void)
{
#ifdef _WIN32
static int called_wsastartup = 0;
if (!called_wsastartup) {
WSADATA data;
int res = WSAStartup(0x0202, &data);
if (res) {
return AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR;
}
called_wsastartup = 1;
}
return AMQP_STATUS_OK;
#else
return AMQP_STATUS_OK;
#endif
}
static int
amqp_os_socket_socket(int domain, int type, int protocol)
{
#ifdef _WIN32
/*
This cast is to squash warnings on Win64, see:
http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64
*/
return (int)socket(domain, type, protocol);
#else
int flags;
int s = socket(domain, type, protocol);
if (s < 0) {
return s;
}
/* Always enable CLOEXEC on the socket */
flags = fcntl(s, F_GETFD);
if (flags == -1
|| fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
int e = errno;
close(s);
errno = e;
return -1;
}
return s;
#endif
}
static int
amqp_os_socket_setsockopt(int sock, int level, int optname,
const void *optval, size_t optlen)
{
#ifdef _WIN32
/* the winsock setsockopt function has its 4th argument as a
const char * */
return setsockopt(sock, level, optname, (const char *)optval, (int)optlen);
#else
return setsockopt(sock, level, optname, optval, optlen);
#endif
}
static int
amqp_os_socket_setsockblock(int sock, int block)
{
#ifdef _WIN32
u_long nonblock = !block;
if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) {
return AMQP_STATUS_SOCKET_ERROR;
} else {
return AMQP_STATUS_OK;
}
#else
long arg;
if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) {
return AMQP_STATUS_SOCKET_ERROR;
}
if (block) {
arg &= (~O_NONBLOCK);
} else {
arg |= O_NONBLOCK;
}
if (fcntl(sock, F_SETFL, arg) < 0) {
return AMQP_STATUS_SOCKET_ERROR;
}
return AMQP_STATUS_OK;
#endif
}
int
amqp_os_socket_error(void)
{
#ifdef _WIN32
return WSAGetLastError();
#else
return errno;
#endif
}
int
amqp_os_socket_close(int sockfd)
{
#ifdef _WIN32
return closesocket(sockfd);
#else
return close(sockfd);
#endif
}
ssize_t
amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags)
{
assert(self);
assert(self->klass->send);
return self->klass->send(self, buf, len, flags);
}
ssize_t
amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags)
{
assert(self);
assert(self->klass->recv);
return self->klass->recv(self, buf, len, flags);
}
int
amqp_socket_open(amqp_socket_t *self, const char *host, int port)
{
assert(self);
assert(self->klass->open);
return self->klass->open(self, host, port, NULL);
}
int
amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout)
{
assert(self);
assert(self->klass->open);
return self->klass->open(self, host, port, timeout);
}
int
amqp_socket_close(amqp_socket_t *self, amqp_socket_close_enum force)
{
assert(self);
assert(self->klass->close);
return self->klass->close(self, force);
}
void
amqp_socket_delete(amqp_socket_t *self)
{
if (self) {
assert(self->klass->delete);
self->klass->delete(self);
}
}
int
amqp_socket_get_sockfd(amqp_socket_t *self)
{
assert(self);
assert(self->klass->get_sockfd);
return self->klass->get_sockfd(self);
}
int amqp_poll(int fd, int event, amqp_time_t deadline) {
#ifdef HAVE_POLL
struct pollfd pfd;
int res;
int timeout_ms;
/* Function should only ever be called with one of these two */
assert(event == AMQP_SF_POLLIN || event == AMQP_SF_POLLOUT);
start_poll:
pfd.fd = fd;
switch (event) {
case AMQP_SF_POLLIN:
pfd.events = POLLIN;
break;
case AMQP_SF_POLLOUT:
pfd.events = POLLOUT;
break;
}
timeout_ms = amqp_time_ms_until(deadline);
if (-1 > timeout_ms) {
return timeout_ms;
}
res = poll(&pfd, 1, timeout_ms);
if (0 < res) {
/* TODO: optimize this a bit by returning the AMQP_STATUS_SOCKET_ERROR or
* equivalent when pdf.revent is POLLHUP or POLLERR, so an extra syscall
* doesn't need to be made. */
return AMQP_STATUS_OK;
} else if (0 == res) {
return AMQP_STATUS_TIMEOUT;
} else {
switch (amqp_os_socket_error()) {
case EINTR:
goto start_poll;
default:
return AMQP_STATUS_SOCKET_ERROR;
}
}
return AMQP_STATUS_OK;
#elif defined(HAVE_SELECT)
fd_set fds;
fd_set exceptfds;
fd_set *exceptfdsp;
int res;
struct timeval tv;
struct timeval *tvp;
assert((0 != (event & AMQP_SF_POLLIN)) || (0 != (event & AMQP_SF_POLLOUT)));
#ifndef _WIN32
/* On Win32 connect() failure is indicated through the exceptfds, it does not
* make any sense to allow POLLERR on any other platform or condition */
assert(0 == (event & AMQP_SF_POLLERR));
#endif
start_select:
FD_ZERO(&fds);
FD_SET(fd, &fds);
if (event & AMQP_SF_POLLERR) {
FD_ZERO(&exceptfds);
FD_SET(fd, &exceptfds);
exceptfdsp = &exceptfds;
} else {
exceptfdsp = NULL;
}
res = amqp_time_tv_until(deadline, &tv, &tvp);
if (res != AMQP_STATUS_OK) {
return res;
}
if (event & AMQP_SF_POLLIN) {
res = select(fd + 1, &fds, NULL, exceptfdsp, tvp);
} else if (event & AMQP_SF_POLLOUT) {
res = select(fd + 1, NULL, &fds, exceptfdsp, tvp);
}
if (0 < res) {
return AMQP_STATUS_OK;
} else if (0 == res) {
return AMQP_STATUS_TIMEOUT;
} else {
switch (amqp_os_socket_error()) {
case EINTR:
goto start_select;
default:
return AMQP_STATUS_SOCKET_ERROR;
}
}
#else
# error "poll() or select() is needed to compile rabbitmq-c"
#endif
}
static ssize_t do_poll(amqp_connection_state_t state, ssize_t res,
amqp_time_t deadline) {
int fd = amqp_get_sockfd(state);
if (-1 == fd) {
return AMQP_STATUS_SOCKET_CLOSED;
}
switch (res) {
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
res = amqp_poll(fd, AMQP_SF_POLLIN, deadline);
break;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
res = amqp_poll(fd, AMQP_SF_POLLOUT, deadline);
break;
}
return res;
}
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
size_t len, amqp_time_t deadline, int flags) {
ssize_t res;
void* buf_left = (void*)buf;
/* Assume that len is not going to be larger than ssize_t can hold. */
ssize_t len_left = (size_t)len;
start_send:
res = amqp_socket_send(state->socket, buf_left, len_left, flags);
if (res > 0) {
len_left -= res;
buf_left = (char*)buf_left + res;
if (0 == len_left) {
return (ssize_t)len;
}
goto start_send;
}
res = do_poll(state, res, deadline);
if (AMQP_STATUS_OK == res) {
goto start_send;
}
if (AMQP_STATUS_TIMEOUT == res) {
return (ssize_t)len - len_left;
}
return res;
}
int
amqp_open_socket(char const *hostname,
int portnumber)
{
return amqp_open_socket_inner(hostname, portnumber,
amqp_time_infinite());
}
int amqp_open_socket_noblock(char const *hostname,
int portnumber,
struct timeval *timeout)
{
amqp_time_t deadline;
int res = amqp_time_from_now(&deadline, timeout);
if (AMQP_STATUS_OK != res) {
return res;
}
return amqp_open_socket_inner(hostname, portnumber, deadline);
}
int amqp_open_socket_inner(char const *hostname,
int portnumber,
amqp_time_t deadline) {
struct addrinfo hint;
struct addrinfo *address_list;
struct addrinfo *addr;
char portnumber_string[33];
int sockfd = -1;
int last_error;
int one = 1; /* for setsockopt */
int res;
last_error = amqp_os_socket_init();
if (AMQP_STATUS_OK != last_error) {
return last_error;
}
memset(&hint, 0, sizeof(hint));
hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */
hint.ai_socktype = SOCK_STREAM;
hint.ai_protocol = IPPROTO_TCP;
(void)sprintf(portnumber_string, "%d", portnumber);
last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list);
if (0 != last_error) {
return AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED;
}
for (addr = address_list; addr; addr = addr->ai_next) {
if (-1 != sockfd) {
amqp_os_socket_close(sockfd);
}
sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (-1 == sockfd) {
last_error = AMQP_STATUS_SOCKET_ERROR;
continue;
}
#ifdef SO_NOSIGPIPE
if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
continue;
}
#endif /* SO_NOSIGPIPE */
if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
continue;
}
if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) {
last_error = AMQP_STATUS_SOCKET_ERROR;
continue;
}
#ifdef _WIN32
res = connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen);
#else
res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
#endif
if (0 == res) {
last_error = AMQP_STATUS_OK;
break;
}
#ifdef _WIN32
if (WSAEWOULDBLOCK == amqp_os_socket_error()) {
int event = AMQP_SF_POLLOUT | AMQP_SF_POLLERR;
#else
if (EINPROGRESS == amqp_os_socket_error()) {
int event = AMQP_SF_POLLOUT;
#endif
last_error = amqp_poll(sockfd, event, deadline);
if (AMQP_STATUS_OK == last_error) {
int result;
socklen_t result_len = sizeof(result);
#ifdef _WIN32
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&result,
(int *)&result_len) < 0
#else
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0
#endif
|| result != 0) {
last_error = AMQP_STATUS_SOCKET_ERROR;
} else {
last_error = AMQP_STATUS_OK;
}
}
if (last_error == AMQP_STATUS_OK || last_error == AMQP_STATUS_TIMEOUT ||
last_error == AMQP_STATUS_TIMER_FAILURE) {
/* Exit for loop on timer errors or when connection established */
break;
}
} else {
/* Error connecting */
last_error = AMQP_STATUS_SOCKET_ERROR;
break;
}
}
freeaddrinfo(address_list);
if (last_error != AMQP_STATUS_OK) {
if (-1 != sockfd) {
amqp_os_socket_close(sockfd);
}
return last_error;
}
return sockfd;
}
static int send_header_inner(amqp_connection_state_t state,
amqp_time_t deadline) {
ssize_t res;
static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0,
AMQP_PROTOCOL_VERSION_MAJOR,
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE);
if (sizeof(header) == res) {
return AMQP_STATUS_OK;
}
return (int)res;
}
int amqp_send_header(amqp_connection_state_t state) {
return send_header_inner(state, amqp_time_infinite());
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
{
amqp_bytes_t res;
switch (method) {
case AMQP_SASL_METHOD_PLAIN:
res = amqp_cstring_bytes("PLAIN");
break;
case AMQP_SASL_METHOD_EXTERNAL:
res = amqp_cstring_bytes("EXTERNAL");
break;
default:
amqp_abort("Invalid SASL method: %d", (int) method);
}
return res;
}
static int bytes_equal(amqp_bytes_t l, amqp_bytes_t r)
{
if (l.len == r.len) {
if (l.bytes && r.bytes) {
if (0 == memcmp(l.bytes, r.bytes, l.len)) {
return 1;
}
}
}
return 0;
}
int sasl_mechanism_in_list(amqp_bytes_t mechanisms,
amqp_sasl_method_enum method)
{
amqp_bytes_t mechanism;
amqp_bytes_t supported_mechanism;
uint8_t* start;
uint8_t* end;
uint8_t* current;
assert(NULL != mechanisms.bytes);
mechanism = sasl_method_name(method);
start = (uint8_t*)mechanisms.bytes;
current = start;
end = start + mechanisms.len;
for ( ; current != end; start = current + 1) {
/* HACK: SASL states that we should be parsing this string as a UTF-8
* string, which we're plainly not doing here. At this point its not worth
* dragging an entire UTF-8 parser for this one case, and this should work
* most of the time */
current = memchr(start, ' ', end - start);
if (NULL == current) {
current = end;
}
supported_mechanism.bytes = start;
supported_mechanism.len = current - start;
if (bytes_equal(mechanism, supported_mechanism)) {
return 1;
}
}
return 0;
}
static amqp_bytes_t sasl_response(amqp_pool_t *pool,
amqp_sasl_method_enum method,
va_list args)
{
amqp_bytes_t response;
switch (method) {
case AMQP_SASL_METHOD_PLAIN: {
char *username = va_arg(args, char *);
size_t username_len = strlen(username);
char *password = va_arg(args, char *);
size_t password_len = strlen(password);
char *response_buf;
amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response);
if (response.bytes == NULL)
/* We never request a zero-length block, because of the +2
above, so a NULL here really is ENOMEM. */
{
return response;
}
response_buf = response.bytes;
response_buf[0] = 0;
memcpy(response_buf + 1, username, username_len);
response_buf[username_len + 1] = 0;
memcpy(response_buf + username_len + 2, password, password_len);
break;
}
case AMQP_SASL_METHOD_EXTERNAL: {
char *identity = va_arg(args, char *);
size_t identity_len = strlen(identity);
amqp_pool_alloc_bytes(pool, identity_len, &response);
if (response.bytes == NULL) {
return response;
}
memcpy(response.bytes, identity, identity_len);
break;
}
default:
amqp_abort("Invalid SASL method: %d", (int) method);
}
return response;
}
amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state)
{
return (state->first_queued_frame != NULL);
}
/*
* Check to see if we have data in our buffer. If this returns 1, we
* will avoid an immediate blocking read in amqp_simple_wait_frame.
*/
amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state)
{
return (state->sock_inbound_offset < state->sock_inbound_limit);
}
static int consume_one_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame)
{
int res;
amqp_bytes_t buffer;
buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
res = amqp_handle_input(state, buffer, decoded_frame);
if (res < 0) {
return res;
}
state->sock_inbound_offset += res;
return AMQP_STATUS_OK;
}
static int recv_with_timeout(amqp_connection_state_t state, amqp_time_t timeout) {
ssize_t res;
int fd;
start_recv:
res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
state->sock_inbound_buffer.len, 0);
if (res < 0) {
fd = amqp_get_sockfd(state);
if (-1 == fd) {
return AMQP_STATUS_CONNECTION_CLOSED;
}
switch (res) {
default:
return (int)res;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
res = amqp_poll(fd, AMQP_SF_POLLIN, timeout);
break;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
res = amqp_poll(fd, AMQP_SF_POLLOUT, timeout);
break;
}
if (AMQP_STATUS_OK == res) {
goto start_recv;
}
return (int)res;
}
state->sock_inbound_limit = res;
state->sock_inbound_offset = 0;
res = amqp_time_s_from_now(&state->next_recv_heartbeat,
amqp_heartbeat_recv(state));
if (AMQP_STATUS_OK != res) {
return (int)res;
}
return AMQP_STATUS_OK;
}
int amqp_try_recv(amqp_connection_state_t state) {
amqp_time_t timeout;
while (amqp_data_in_buffer(state)) {
amqp_frame_t frame;
int res = consume_one_frame(state, &frame);
if (AMQP_STATUS_OK != res) {
return res;
}
if (frame.frame_type != 0) {
amqp_pool_t *channel_pool;
amqp_frame_t *frame_copy;
amqp_link_t *link;
channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
if (NULL == channel_pool) {
return AMQP_STATUS_NO_MEMORY;
}
frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
if (frame_copy == NULL || link == NULL) {
return AMQP_STATUS_NO_MEMORY;
}
*frame_copy = frame;
link->next = NULL;
link->data = frame_copy;
if (state->last_queued_frame == NULL) {
state->first_queued_frame = link;
} else {
state->last_queued_frame->next = link;
}
state->last_queued_frame = link;
}
}
timeout = amqp_time_immediate();
return recv_with_timeout(state, timeout);
}
static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
amqp_time_t timeout_deadline) {
amqp_time_t deadline;
int res;
for (;;) {
while (amqp_data_in_buffer(state)) {
res = consume_one_frame(state, decoded_frame);
if (AMQP_STATUS_OK != res) {
return res;
}
if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) {
amqp_maybe_release_buffers_on_channel(state, 0);
continue;
}
if (decoded_frame->frame_type != 0) {
/* Complete frame was read. Return it. */
return AMQP_STATUS_OK;
}
}
beginrecv:
res = amqp_time_has_past(state->next_send_heartbeat);
if (AMQP_STATUS_TIMER_FAILURE == res) {
return res;
} else if (AMQP_STATUS_TIMEOUT == res) {
amqp_frame_t heartbeat;
heartbeat.channel = 0;
heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
res = amqp_send_frame(state, &heartbeat);
if (AMQP_STATUS_OK != res) {
return res;
}
}
deadline = amqp_time_first(timeout_deadline,
amqp_time_first(state->next_recv_heartbeat,
state->next_send_heartbeat));
/* TODO this needs to wait for a _frame_ and not anything written from the
* socket */
res = recv_with_timeout(state, deadline);
if (AMQP_STATUS_TIMEOUT == res) {
if (amqp_time_equal(deadline, state->next_recv_heartbeat)) {
amqp_socket_close(state->socket, AMQP_SC_FORCE);
return AMQP_STATUS_HEARTBEAT_TIMEOUT;
} else if (amqp_time_equal(deadline, timeout_deadline)) {
return AMQP_STATUS_TIMEOUT;
} else if (amqp_time_equal(deadline, state->next_send_heartbeat)) {
/* send heartbeat happens before we do recv_with_timeout */
goto beginrecv;
} else {
amqp_abort("Internal error: unable to determine timeout reason");
}
} else if (AMQP_STATUS_OK != res) {
return res;
}
}
}
static amqp_link_t * amqp_create_link_for_frame(amqp_connection_state_t state, amqp_frame_t *frame)
{
amqp_link_t *link;
amqp_frame_t *frame_copy;
amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel);
if (NULL == channel_pool) {
return NULL;
}
link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
if (NULL == link || NULL == frame_copy) {
return NULL;
}
*frame_copy = *frame;
link->data = frame_copy;
return link;
}
int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame)
{
amqp_link_t *link = amqp_create_link_for_frame(state, frame);
if (NULL == link) {
return AMQP_STATUS_NO_MEMORY;
}
if (NULL == state->first_queued_frame) {
state->first_queued_frame = link;
} else {
state->last_queued_frame->next = link;
}
link->next = NULL;
state->last_queued_frame = link;
return AMQP_STATUS_OK;
}
int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame)
{
amqp_link_t *link = amqp_create_link_for_frame(state, frame);
if (NULL == link) {
return AMQP_STATUS_NO_MEMORY;
}
if (NULL == state->first_queued_frame) {
state->first_queued_frame = link;
state->last_queued_frame = link;
link->next = NULL;
} else {
link->next = state->first_queued_frame;
state->first_queued_frame = link;
}
return AMQP_STATUS_OK;
}
int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_frame_t *decoded_frame)
{
amqp_frame_t *frame_ptr;
amqp_link_t *cur;
int res;
for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) {
frame_ptr = cur->data;
if (channel == frame_ptr->channel) {
state->first_queued_frame = cur->next;
if (NULL == state->first_queued_frame) {
state->last_queued_frame = NULL;
}
*decoded_frame = *frame_ptr;
return AMQP_STATUS_OK;
}
}
for (;;) {
res = wait_frame_inner(state, decoded_frame, amqp_time_infinite());
if (AMQP_STATUS_OK != res) {
return res;
}
if (channel == decoded_frame->channel) {
return AMQP_STATUS_OK;
} else {
res = amqp_queue_frame(state, decoded_frame);
if (res != AMQP_STATUS_OK) {
return res;
}
}
}
}
int amqp_simple_wait_frame(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{
return amqp_simple_wait_frame_noblock(state, decoded_frame, NULL);
}
int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
struct timeval *timeout)
{
amqp_time_t deadline;
int res = amqp_time_from_now(&deadline, timeout);
if (AMQP_STATUS_OK != res) {
return res;
}
if (state->first_queued_frame != NULL) {
amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data;
state->first_queued_frame = state->first_queued_frame->next;
if (state->first_queued_frame == NULL) {
state->last_queued_frame = NULL;
}
*decoded_frame = *f;
return AMQP_STATUS_OK;
} else {
return wait_frame_inner(state, decoded_frame, deadline);
}
}
static int amqp_simple_wait_method_list(amqp_connection_state_t state,
amqp_channel_t expected_channel,
amqp_method_number_t *expected_methods,
amqp_time_t deadline,
amqp_method_t *output) {
amqp_frame_t frame;
struct timeval tv;
struct timeval *tvp;
int res = amqp_time_tv_until(deadline, &tv, &tvp);
if (res != AMQP_STATUS_OK) {
return res;
}
res = amqp_simple_wait_frame_noblock(state, &frame, tvp);
if (AMQP_STATUS_OK != res) {
return res;
}
if (AMQP_FRAME_METHOD != frame.frame_type ||
expected_channel != frame.channel ||
!amqp_id_in_reply_list(frame.payload.method.id, expected_methods)) {
return AMQP_STATUS_WRONG_METHOD;
}
*output = frame.payload.method;
return AMQP_STATUS_OK;
}
static int simple_wait_method_inner(amqp_connection_state_t state,
amqp_channel_t expected_channel,
amqp_method_number_t expected_method,
amqp_time_t deadline,
amqp_method_t *output) {
amqp_method_number_t expected_methods[] = {expected_method, 0};
return amqp_simple_wait_method_list(state, expected_channel, expected_methods,
deadline, output);
}
int amqp_simple_wait_method(amqp_connection_state_t state,
amqp_channel_t expected_channel,
amqp_method_number_t expected_method,
amqp_method_t *output)
{
return simple_wait_method_inner(state, expected_channel, expected_method,
amqp_time_infinite(), output);
}
int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel,
amqp_method_number_t id, void *decoded) {
return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE,
amqp_time_infinite());
}
int amqp_send_method_inner(amqp_connection_state_t state,
amqp_channel_t channel, amqp_method_number_t id,
void *decoded, int flags, amqp_time_t deadline) {
amqp_frame_t frame;
frame.frame_type = AMQP_FRAME_METHOD;
frame.channel = channel;
frame.payload.method.id = id;
frame.payload.method.decoded = decoded;
return amqp_send_frame_inner(state, &frame, flags, deadline);
}
static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )
{
while ( *list != 0 ) {
if ( *list == expected ) {
return 1;
}
list++;
}
return 0;
}
static amqp_rpc_reply_t simple_rpc_inner(
amqp_connection_state_t state, amqp_channel_t channel,
amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids,
void *decoded_request_method, amqp_time_t deadline) {
int status;
amqp_rpc_reply_t result;
memset(&result, 0, sizeof(result));
status = amqp_send_method(state, channel, request_id, decoded_request_method);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.library_error = status;
return result;
}
{
amqp_frame_t frame;
retry:
status = wait_frame_inner(state, &frame, deadline);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.library_error = status;
return result;
}
/*
* We store the frame for later processing unless it's something
* that directly affects us here, namely a method frame that is
* either
* - on the channel we want, and of the expected type, or
* - on the channel we want, and a channel.close frame, or
* - on channel zero, and a connection.close frame.
*/
if (!((frame.frame_type == AMQP_FRAME_METHOD)
&& (
((frame.channel == channel)
&& (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)
|| (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
||
((frame.channel == 0)
&& (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))
)
)) {
amqp_pool_t *channel_pool;
amqp_frame_t *frame_copy;
amqp_link_t *link;
channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
if (NULL == channel_pool) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.library_error = AMQP_STATUS_NO_MEMORY;
return result;
}
frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
if (frame_copy == NULL || link == NULL) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.library_error = AMQP_STATUS_NO_MEMORY;
return result;
}
*frame_copy = frame;
link->next = NULL;
link->data = frame_copy;
if (state->last_queued_frame == NULL) {
state->first_queued_frame = link;
} else {
state->last_queued_frame->next = link;
}
state->last_queued_frame = link;
goto retry;
}
result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids))
? AMQP_RESPONSE_NORMAL
: AMQP_RESPONSE_SERVER_EXCEPTION;
result.reply = frame.payload.method;
return result;
}
}
amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_method_number_t request_id,
amqp_method_number_t *expected_reply_ids,
void *decoded_request_method) {
return simple_rpc_inner(state, channel, request_id, expected_reply_ids,
decoded_request_method, amqp_time_infinite());
}
static void *simple_rpc_decoded_inner(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_method_number_t request_id,
amqp_method_number_t reply_id,
void *decoded_request_method,
amqp_time_t deadline) {
amqp_method_number_t replies[2];
replies[0] = reply_id;
replies[1] = 0;
state->most_recent_api_result = simple_rpc_inner(
state, channel, request_id, replies, decoded_request_method, deadline);
if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) {
return state->most_recent_api_result.reply.decoded;
} else {
return NULL;
}
}
void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_method_number_t request_id,
amqp_method_number_t reply_id,
void *decoded_request_method) {
return simple_rpc_decoded_inner(state, channel, request_id, reply_id,
decoded_request_method, amqp_time_infinite());
}
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
{
return state->most_recent_api_result;
}
/*
* Merge base and add tables. If the two tables contain an entry with the same
* key, the entry from the add table takes precedence. For entries that are both
* tables with the same key, the table is recursively merged.
*/
int amqp_merge_capabilities(const amqp_table_t *base, const amqp_table_t *add,
amqp_table_t *result, amqp_pool_t *pool) {
int i;
int res;
amqp_pool_t temp_pool;
amqp_table_t temp_result;
assert(base != NULL);
assert(result != NULL);
assert(pool != NULL);
if (NULL == add) {
return amqp_table_clone(base, result, pool);
}
init_amqp_pool(&temp_pool, 4096);
temp_result.num_entries = 0;
temp_result.entries =
amqp_pool_alloc(&temp_pool, sizeof(amqp_table_entry_t) *
(base->num_entries + add->num_entries));
if (NULL == temp_result.entries) {
res = AMQP_STATUS_NO_MEMORY;
goto error_out;
}
for (i = 0; i < base->num_entries; ++i) {
temp_result.entries[temp_result.num_entries] = base->entries[i];
temp_result.num_entries++;
}
for (i = 0; i < add->num_entries; ++i) {
amqp_table_entry_t *e =
amqp_table_get_entry_by_key(&temp_result, add->entries[i].key);
if (NULL != e) {
if (AMQP_FIELD_KIND_TABLE == add->entries[i].value.kind &&
AMQP_FIELD_KIND_TABLE == e->value.kind) {
amqp_table_entry_t *be =
amqp_table_get_entry_by_key(base, add->entries[i].key);
res = amqp_merge_capabilities(&be->value.value.table,
&add->entries[i].value.value.table,
&e->value.value.table, &temp_pool);
if (AMQP_STATUS_OK != res) {
goto error_out;
}
} else {
e->value = add->entries[i].value;
}
} else {
temp_result.entries[temp_result.num_entries] = add->entries[i];
temp_result.num_entries++;
}
}
res = amqp_table_clone(&temp_result, result, pool);
error_out:
empty_amqp_pool(&temp_pool);
return res;
}
static amqp_rpc_reply_t amqp_login_inner(
amqp_connection_state_t state, char const *vhost, int channel_max,
int frame_max, int heartbeat, const amqp_table_t *client_properties,
struct timeval *timeout, amqp_sasl_method_enum sasl_method, va_list vl) {
int res;
amqp_method_t method;
uint16_t client_channel_max;
uint32_t client_frame_max;
uint16_t client_heartbeat;
uint16_t server_channel_max;
uint32_t server_frame_max;
uint16_t server_heartbeat;
amqp_rpc_reply_t result;
amqp_time_t deadline;
if (channel_max < 0 || channel_max > UINT16_MAX) {
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
}
client_channel_max = (uint16_t)channel_max;
if (frame_max < 0) {
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
}
client_frame_max = (uint32_t)frame_max;
if (heartbeat < 0 || heartbeat > UINT16_MAX) {
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
}
client_heartbeat = (uint16_t)heartbeat;
res = amqp_time_from_now(&deadline, timeout);
if (AMQP_STATUS_OK != res) {
goto error_res;
}
res = send_header_inner(state, deadline);
if (AMQP_STATUS_OK != res) {
goto error_res;
}
res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD,
deadline, &method);
if (AMQP_STATUS_OK != res) {
goto error_res;
}
{
amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR)
|| (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
res = AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION;
goto error_res;
}
res = amqp_table_clone(&s->server_properties, &state->server_properties,
&state->properties_pool);
if (AMQP_STATUS_OK != res) {
goto error_res;
}
/* TODO: check that our chosen SASL mechanism is in the list of
acceptable mechanisms. Or even let the application choose from
the list! */
if (!sasl_mechanism_in_list(s->mechanisms, sasl_method)) {
res = AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD;
goto error_res;
}
}
{
amqp_table_entry_t default_properties[6];
amqp_table_t default_table;
amqp_table_entry_t client_capabilities[1];
amqp_table_t client_capabilities_table;
amqp_connection_start_ok_t s;
amqp_pool_t *channel_pool;
amqp_bytes_t response_bytes;
channel_pool = amqp_get_or_create_channel_pool(state, 0);
if (NULL == channel_pool) {
res = AMQP_STATUS_NO_MEMORY;
goto error_res;
}
response_bytes = sasl_response(channel_pool,
sasl_method, vl);
if (response_bytes.bytes == NULL) {
res = AMQP_STATUS_NO_MEMORY;
goto error_res;
}
client_capabilities[0] =
amqp_table_construct_bool_entry("authentication_failure_close", 1);
client_capabilities_table.entries = client_capabilities;
client_capabilities_table.num_entries =
sizeof(client_capabilities) / sizeof(amqp_table_entry_t);
default_properties[0] =
amqp_table_construct_utf8_entry("product", "rabbitmq-c");
default_properties[1] =
amqp_table_construct_utf8_entry("version", AMQP_VERSION_STRING);
default_properties[2] =
amqp_table_construct_utf8_entry("platform", AMQ_PLATFORM);
default_properties[3] =
amqp_table_construct_utf8_entry("copyright", AMQ_COPYRIGHT);
default_properties[4] = amqp_table_construct_utf8_entry(
"information", "See https://github.com/alanxz/rabbitmq-c");
default_properties[5] = amqp_table_construct_table_entry(
"capabilities", &client_capabilities_table);
default_table.entries = default_properties;
default_table.num_entries =
sizeof(default_properties) / sizeof(amqp_table_entry_t);
res = amqp_merge_capabilities(&default_table, client_properties,
&state->client_properties, channel_pool);
if (AMQP_STATUS_OK != res) {
goto error_res;
}
s.client_properties = state->client_properties;
s.mechanism = sasl_method_name(sasl_method);
s.response = response_bytes;
s.locale = amqp_cstring_bytes("en_US");
res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s,
AMQP_SF_NONE, deadline);
if (res < 0) {
goto error_res;
}
}
amqp_release_buffers(state);
{
amqp_method_number_t expected[] = { AMQP_CONNECTION_TUNE_METHOD,
AMQP_CONNECTION_CLOSE_METHOD, 0 };
res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method);
if (AMQP_STATUS_OK != res) {
goto error_res;
}
}
if (AMQP_CONNECTION_CLOSE_METHOD == method.id) {
result.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
result.reply = method;
result.library_error = 0;
goto out;
}
{
amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
server_channel_max = s->channel_max;
server_frame_max = s->frame_max;
server_heartbeat = s->heartbeat;
}
if (server_channel_max != 0 &&
(server_channel_max < client_channel_max || client_channel_max == 0)) {
client_channel_max = server_channel_max;
} else if (server_channel_max == 0 && client_channel_max == 0) {
client_channel_max = UINT16_MAX;
}
if (server_frame_max != 0 && server_frame_max < client_frame_max) {
client_frame_max = server_frame_max;
}
if (server_heartbeat != 0 && server_heartbeat < client_heartbeat) {
client_heartbeat = server_heartbeat;
}
res = amqp_tune_connection(state, client_channel_max, client_frame_max,
client_heartbeat);
if (res < 0) {
goto error_res;
}
{
amqp_connection_tune_ok_t s;
s.frame_max = client_frame_max;
s.channel_max = client_channel_max;
s.heartbeat = client_heartbeat;
res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s,
AMQP_SF_NONE, deadline);
if (res < 0) {
goto error_res;
}
}
amqp_release_buffers(state);
{
amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 };
amqp_connection_open_t s;
s.virtual_host = amqp_cstring_bytes(vhost);
s.capabilities = amqp_empty_bytes;
s.insist = 1;
result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies,
&s, deadline);
if (result.reply_type != AMQP_RESPONSE_NORMAL) {
goto out;
}
}
result.reply_type = AMQP_RESPONSE_NORMAL;
result.reply.id = 0;
result.reply.decoded = NULL;
result.library_error = 0;
amqp_maybe_release_buffers(state);
out:
return result;
error_res:
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.reply.id = 0;
result.reply.decoded = NULL;
result.library_error = res;
goto out;
}
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
char const *vhost,
int channel_max,
int frame_max,
int heartbeat,
amqp_sasl_method_enum sasl_method,
...)
{
va_list vl;
amqp_rpc_reply_t ret;
va_start(vl, sasl_method);
ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
&amqp_empty_table, state->handshake_timeout,
sasl_method, vl);
va_end(vl);
return ret;
}
amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state,
char const *vhost,
int channel_max,
int frame_max,
int heartbeat,
const amqp_table_t *client_properties,
amqp_sasl_method_enum sasl_method,
...)
{
va_list vl;
amqp_rpc_reply_t ret;
va_start(vl, sasl_method);
ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
client_properties, state->handshake_timeout,
sasl_method, vl);
va_end(vl);
return ret;
}