Skip to content

Commit

Permalink
Use dgram sockets for message oriented communications
Browse files Browse the repository at this point in the history
This is to prevent partial sends and gettting stuck in
retry loops.

Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
  • Loading branch information
asalkeld committed Feb 18, 2013
1 parent 481bd0c commit 31d9f09
Show file tree
Hide file tree
Showing 7 changed files with 461 additions and 267 deletions.
5 changes: 3 additions & 2 deletions lib/ipc_int.h
Expand Up @@ -70,6 +70,7 @@ struct qb_ipc_one_way {
union {
struct {
int32_t sock;
char *sock_name;
void* shared_data;
char shared_file_name[NAME_MAX];
} us;
Expand Down Expand Up @@ -105,7 +106,8 @@ int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r);
ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len);
ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout);
int32_t qb_ipc_us_ready(struct qb_ipc_one_way *one_way, int32_t ms_timeout, int32_t events);
int32_t qb_ipc_us_ready(struct qb_ipc_one_way *ow_data, struct qb_ipc_one_way *ow_conn,
int32_t ms_timeout, int32_t events);

void qb_ipcc_us_sock_close(int32_t sock);

Expand Down Expand Up @@ -194,7 +196,6 @@ int32_t qb_ipcc_us_sock_connect(const char *socket_name, int32_t * sock_pt);
int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data);
int32_t qb_ipcs_dispatch_service_request(int32_t fd, int32_t revents, void *data);
struct qb_ipcs_connection* qb_ipcs_connection_alloc(struct qb_ipcs_service *s);
void qb_ipcs_sockets_disconnect(struct qb_ipcs_connection *c);

int32_t qb_ipcs_process_request(struct qb_ipcs_service *s,
struct qb_ipc_request_header *hdr);
Expand Down
182 changes: 58 additions & 124 deletions lib/ipc_setup.c
Expand Up @@ -50,7 +50,6 @@ struct ipc_auth_ugp {

static int32_t qb_ipcs_us_connection_acceptor(int fd, int revent, void *data);


#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
Expand All @@ -66,9 +65,7 @@ qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len)

retry_send:
result = send(one_way->u.us.sock,
&rbuf[processed],
len - processed,
MSG_NOSIGNAL);
&rbuf[processed], len - processed, MSG_NOSIGNAL);

if (result == -1) {
if (errno == EAGAIN && processed > 0) {
Expand Down Expand Up @@ -144,30 +141,49 @@ qb_ipc_us_sock_error_is_disconnected(int err)
}

int32_t
qb_ipc_us_ready(struct qb_ipc_one_way * one_way,
qb_ipc_us_ready(struct qb_ipc_one_way * ow_data,
struct qb_ipc_one_way * ow_conn,
int32_t ms_timeout, int32_t events)
{
struct pollfd ufds;
struct pollfd ufds[2];
int32_t poll_events;
int numfds = 1;
int i;

ufds.fd = one_way->u.us.sock;
ufds.events = events;
ufds.revents = 0;
ufds[0].fd = ow_data->u.us.sock;
ufds[0].events = events;
ufds[0].revents = 0;

poll_events = poll(&ufds, 1, ms_timeout);
if (ow_conn && ow_data != ow_conn) {
numfds++;
ufds[1].fd = ow_conn->u.us.sock;
ufds[1].events = POLLIN;
ufds[1].revents = 0;
}
poll_events = poll(ufds, numfds, ms_timeout);
if ((poll_events == -1 && errno == EINTR) || poll_events == 0) {
return -EAGAIN;
} else if (poll_events == -1) {
return -errno;
} else if (poll_events == 1 && (ufds.revents & POLLERR)) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLERR", one_way->u.us.sock);
return -ENOTCONN;
} else if (poll_events == 1 && (ufds.revents & POLLHUP)) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLHUP", one_way->u.us.sock);
return -ENOTCONN;
} else if (poll_events == 1 && (ufds.revents & POLLNVAL)) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLNVAL", one_way->u.us.sock);
return -ENOTCONN;
}
for (i = 0; i < poll_events; i++) {
if (ufds[i].revents & POLLERR) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLERR",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents & POLLHUP) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLHUP",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents & POLLNVAL) {
qb_util_log(LOG_DEBUG, "poll(fd %d) got POLLNVAL",
ufds[i].fd);
return -ENOTCONN;
} else if (ufds[i].revents == 0) {
qb_util_log(LOG_DEBUG, "poll(fd %d) zero revents",
ufds[i].fd);
return -ENOTCONN;
}
}
return 0;
}
Expand All @@ -192,10 +208,8 @@ qb_ipc_us_recv(struct qb_ipc_one_way * one_way,
MSG_NOSIGNAL | MSG_WAITALL);

if (result == -1) {
if (errno == EAGAIN &&
(processed > 0 || timeout == -1)) {
result = qb_ipc_us_ready(one_way, timeout,
POLLIN);
if (errno == EAGAIN && (processed > 0 || timeout == -1)) {
result = qb_ipc_us_ready(one_way, NULL, timeout, POLLIN);
if (result == 0 || result == -EAGAIN) {
goto retry_recv;
}
Expand All @@ -221,13 +235,13 @@ qb_ipc_us_recv(struct qb_ipc_one_way * one_way,
}
final_rc = processed;

cleanup_sigpipe:
cleanup_sigpipe:
qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
return final_rc;
}

int32_t
qb_ipcc_us_sock_connect(const char *socket_name, int32_t * sock_pt)
static int32_t
qb_ipcc_stream_sock_connect(const char *socket_name, int32_t * sock_pt)
{
int32_t request_fd;
struct sockaddr_un address;
Expand Down Expand Up @@ -291,13 +305,13 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
int on = 1;
#endif

res = qb_ipcc_us_sock_connect(c->name, &c->setup.u.us.sock);
res = qb_ipcc_stream_sock_connect(c->name, &c->setup.u.us.sock);
if (res != 0) {
return res;
}

#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &on,
sizeof(on));
#endif

memset(&request, 0, sizeof(request));
Expand All @@ -309,9 +323,9 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return res;
}

#ifdef QB_LINUX
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &off, sizeof(off));
setsockopt(c->setup.u.us.sock, SOL_SOCKET, SO_PASSCRED, &off,
sizeof(off));
#endif

res =
Expand Down Expand Up @@ -452,8 +466,7 @@ handle_new_connection(struct qb_ipcs_service *s,
c->auth.mode = 0600;
c->stats.client_pid = ugp->pid;
snprintf(c->description, CONNECTION_DESCRIPTION,
"%d-%d-%d", s->pid, ugp->pid,
c->setup.u.us.sock);
"%d-%d-%d", s->pid, ugp->pid, c->setup.u.us.sock);

if (auth_result == 0 && c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
Expand All @@ -479,24 +492,6 @@ handle_new_connection(struct qb_ipcs_service *s,
c->state = QB_IPCS_CONNECTION_ACTIVE;
qb_list_add(&c->list, &s->connections);

if (s->needs_sock_for_poll) {
qb_ipcs_connection_ref(c);
res = s->poll_fns.dispatch_add(s->poll_priority,
c->setup.u.us.sock,
POLLIN | POLLPRI | POLLNVAL,
c,
qb_ipcs_dispatch_connection_request);
if (res < 0) {
qb_util_log(LOG_ERR,
"Error adding socket to mainloop (%s).",
c->description);
}
}
if (s->type == QB_IPC_SOCKET) {
c->request.u.us.sock = c->setup.u.us.sock;
c->response.u.us.sock = c->setup.u.us.sock;
}

send_response:
response.hdr.id = QB_IPC_MSG_AUTHENTICATE;
response.hdr.size = sizeof(response);
Expand All @@ -514,65 +509,29 @@ handle_new_connection(struct qb_ipcs_service *s,
}

if (res == 0) {
if (s->type != QB_IPC_SOCKET) {
qb_ipcs_connection_ref(c);
if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c);
}
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->state = QB_IPCS_CONNECTION_ESTABLISHED;
}
qb_ipcs_connection_unref(c);
qb_ipcs_connection_ref(c);
if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c);
}
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->state = QB_IPCS_CONNECTION_ESTABLISHED;
}
qb_ipcs_connection_unref(c);
} else {
if (res == -EACCES) {
qb_util_log(LOG_ERR, "Invalid IPC credentials (%s).",
c->description);
} else {
errno = -res;
qb_util_perror(LOG_ERR, "Error in connection setup (%s)",
qb_util_perror(LOG_ERR,
"Error in connection setup (%s)",
c->description);
}
qb_ipcs_disconnect(c);
}
return res;
}

static void
handle_connection_new_sock(struct qb_ipcs_service *s, int32_t sock, void *msg)
{
struct qb_ipcs_connection *c = NULL;
struct qb_ipc_event_connection_request *req = msg;

c = (struct qb_ipcs_connection *)req->connection;
qb_ipcs_connection_ref(c);
c->event.u.us.sock = sock;
if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
c->state = QB_IPCS_CONNECTION_ESTABLISHED;
}
if (s->serv_fns.connection_created) {
s->serv_fns.connection_created(c);
}

if (c->state == QB_IPCS_CONNECTION_ESTABLISHED &&
s->type == QB_IPC_SOCKET) {
int32_t res;
qb_ipcs_connection_ref(c);
res = s->poll_fns.dispatch_add(s->poll_priority,
c->request.u.us.sock,
POLLIN | POLLPRI | POLLNVAL,
c,
qb_ipcs_dispatch_connection_request);
if (res < 0) {
qb_util_log(LOG_ERR,
"Error adding socket to mainloop (%s).",
c->description);
}
}

qb_ipcs_connection_unref(c);
}

static int32_t
qb_ipcs_uc_recv_and_auth(int32_t sock, void *msg, size_t len,
struct ipc_auth_ugp *ugp)
Expand Down Expand Up @@ -662,7 +621,8 @@ qb_ipcs_uc_recv_and_auth(int32_t sock, void *msg, size_t len,
struct cmsghdr *cmsg;

res = -EINVAL;
for (cmsg = CMSG_FIRSTHDR(&msg_recv); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg_recv, cmsg)) {
for (cmsg = CMSG_FIRSTHDR(&msg_recv); cmsg != NULL;
cmsg = CMSG_NXTHDR(&msg_recv, cmsg)) {
if (cmsg->cmsg_type != SCM_CREDENTIALS)
continue;

Expand Down Expand Up @@ -701,7 +661,7 @@ qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
struct ipc_auth_ugp ugp;
socklen_t addrlen = sizeof(struct sockaddr_un);

if (revent & (POLLNVAL|POLLHUP|POLLERR)) {
if (revent & (POLLNVAL | POLLHUP | POLLERR)) {
/*
* handle shutdown more cleanly.
*/
Expand Down Expand Up @@ -751,35 +711,9 @@ qb_ipcs_us_connection_acceptor(int fd, int revent, void *data)
if (setup_msg.hdr.id == QB_IPC_MSG_AUTHENTICATE) {
(void)handle_new_connection(s, res, new_fd, &setup_msg,
sizeof(setup_msg), &ugp);
} else if (setup_msg.hdr.id == QB_IPC_MSG_NEW_EVENT_SOCK) {
if (res == 0) {
handle_connection_new_sock(s, new_fd, &setup_msg);
} else {
close(new_fd);
}
} else {
close(new_fd);
}

return 0;
}

void
qb_ipcs_sockets_disconnect(struct qb_ipcs_connection *c)
{
int sock = -1;

qb_enter();
if (c->service->needs_sock_for_poll && c->setup.u.us.sock > 0) {
sock = c->setup.u.us.sock;
qb_ipcc_us_sock_close(sock);
c->setup.u.us.sock = -1;
}
if (c->request.type == QB_IPC_SOCKET) {
sock = c->request.u.us.sock;
}
if (sock > 0) {
(void)c->service->poll_fns.dispatch_del(sock);
qb_ipcs_connection_unref(c);
}
}
45 changes: 35 additions & 10 deletions lib/ipc_shm.c
Expand Up @@ -224,17 +224,29 @@ qb_ipcc_shm_connect(struct qb_ipcc_connection * c,
static void
qb_ipcs_shm_disconnect(struct qb_ipcs_connection *c)
{
if (c->response.u.shm.rb) {
qb_rb_close(c->response.u.shm.rb);
c->response.u.shm.rb = NULL;
}
if (c->event.u.shm.rb) {
qb_rb_close(c->event.u.shm.rb);
c->event.u.shm.rb = NULL;
if (c->state == QB_IPCS_CONNECTION_ESTABLISHED ||
c->state == QB_IPCS_CONNECTION_ACTIVE) {
if (c->setup.u.us.sock > 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
(void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock);
qb_ipcs_connection_unref(c);
c->setup.u.us.sock = -1;
}
}
if (c->request.u.shm.rb) {
qb_rb_close(c->request.u.shm.rb);
c->request.u.shm.rb = NULL;
if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN ||
c->state == QB_IPCS_CONNECTION_ACTIVE) {
if (c->response.u.shm.rb) {
qb_rb_close(c->response.u.shm.rb);
c->response.u.shm.rb = NULL;
}
if (c->event.u.shm.rb) {
qb_rb_close(c->event.u.shm.rb);
c->event.u.shm.rb = NULL;
}
if (c->request.u.shm.rb) {
qb_rb_close(c->request.u.shm.rb);
c->request.u.shm.rb = NULL;
}
}
}

Expand Down Expand Up @@ -306,6 +318,19 @@ qb_ipcs_shm_connect(struct qb_ipcs_service *s,
goto cleanup_request_response;
}

res = s->poll_fns.dispatch_add(s->poll_priority,
c->setup.u.us.sock,
POLLIN | POLLPRI | POLLNVAL,
c, qb_ipcs_dispatch_connection_request);
if (res == 0) {
qb_ipcs_connection_ref(c);
} else {
qb_util_log(LOG_ERR,
"Error adding socket to mainloop (%s).",
c->description);
goto cleanup_request_response;
}

r->hdr.error = 0;
return 0;

Expand Down

0 comments on commit 31d9f09

Please sign in to comment.