Skip to content

Commit

Permalink
SIGPIPE suppression for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
Browse files Browse the repository at this point in the history
Signed-off-by: Rohan Mars <code@rohanmars.com>
  • Loading branch information
rohanmars committed Nov 13, 2015
1 parent f83f10a commit 131deb3
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 17 deletions.
5 changes: 5 additions & 0 deletions COPYING
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,8 @@ Files: src/include/timegm.h
Copyright (C) Copyright Howard Hinnant
Copyright (C) Copyright 2010-2011 Vicente J. Botet Escriba
License: Boost Software License, Version 1.0

Files: src/msg/async/AsyncConnection.cc, src/msg/simple/Pipe.cc (sigpipe suppression)
Copyright (C) 2010 Tomash Brechko. All rights reserved.
License: GPL3

12 changes: 0 additions & 12 deletions src/include/sock_compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,4 @@
# define MSG_MORE 0
#endif

/*
* On BSD SO_NOSIGPIPE can be set via setsockopt to block SIGPIPE.
*/
#ifndef MSG_NOSIGNAL
# define MSG_NOSIGNAL 0
# ifdef SO_NOSIGPIPE
# define CEPH_USE_SO_NOSIGPIPE
# else
# error "Cannot block SIGPIPE!"
# endif
#endif

#endif
73 changes: 72 additions & 1 deletion src/msg/async/AsyncConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,82 @@ int AsyncConnection::read_bulk(int fd, char *buf, int len)
return nread;
}

/*
SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
*/
void AsyncConnection::suppress_sigpipe()
{
#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
/*
We want to ignore possible SIGPIPE that we can generate on write.
SIGPIPE is delivered *synchronously* and *only* to the thread
doing the write. So if it is reported as already pending (which
means the thread blocks it), then we do nothing: if we generate
SIGPIPE, it will be merged with the pending one (there's no
queuing), and that suits us well. If it is not pending, we block
it in this thread (and we avoid changing signal action, because it
is per-process).
*/
sigset_t pending;
sigemptyset(&pending);
sigpending(&pending);
sigpipe_pending = sigismember(&pending, SIGPIPE);
if (!sigpipe_pending) {
sigset_t blocked;
sigemptyset(&blocked);
pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);

/* Maybe is was blocked already? */
sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
}
#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
}


void AsyncConnection::restore_sigpipe()
{
#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
/*
If SIGPIPE was pending already we do nothing. Otherwise, if it
become pending (i.e., we generated it), then we sigwait() it (thus
clearing pending status). Then we unblock SIGPIPE, but only if it
were us who blocked it.
*/
if (!sigpipe_pending) {
sigset_t pending;
sigemptyset(&pending);
sigpending(&pending);
if (sigismember(&pending, SIGPIPE)) {
/*
Protect ourselves from a situation when SIGPIPE was sent
by the user to the whole process, and was delivered to
other thread before we had a chance to wait for it.
*/
static const struct timespec nowait = { 0, 0 };
TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
}

if (sigpipe_unblock)
pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
}
#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
}

// return the length of msg needed to be sent,
// < 0 means error occured
int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
{
suppress_sigpipe();

while (len > 0) {
int r = ::sendmsg(sd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
int r;
#if defined(MSG_NOSIGNAL)
r = ::sendmsg(sd, &msg, MSG_NOSIGNAL);
#else
r = ::sendmsg(sd, &msg, 0);
#endif /* defined(MSG_NOSIGNAL) */

if (r == 0) {
ldout(async_msgr->cct, 10) << __func__ << " sendmsg got r==0!" << dendl;
Expand Down Expand Up @@ -266,6 +336,7 @@ int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
break;
}
}
restore_sigpipe();
}
return len;
}
Expand Down
9 changes: 9 additions & 0 deletions src/msg/async/AsyncConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define CEPH_MSG_ASYNCCONNECTION_H

#include <pthread.h>
#include <signal.h>
#include <climits>
#include <list>
#include <map>
Expand Down Expand Up @@ -45,6 +46,8 @@ class AsyncMessenger;
class AsyncConnection : public Connection {

int read_bulk(int fd, char *buf, int len);
void suppress_sigpipe();
void restore_sigpipe();
int do_sendmsg(struct msghdr &msg, int len, bool more);
int try_send(bufferlist &bl, bool send=true) {
Mutex::Locker l(write_lock);
Expand Down Expand Up @@ -291,6 +294,12 @@ class AsyncConnection : public Connection {
EventCenter *center;
ceph::shared_ptr<AuthSessionHandler> session_security;

#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
sigset_t sigpipe_mask;
bool sigpipe_pending;
bool sigpipe_unblock;
#endif

public:
// used by eventcallback
void handle_write();
Expand Down
2 changes: 1 addition & 1 deletion src/msg/async/net_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void NetHandler::set_socket_options(int sd)
}

// block ESIGPIPE
#ifdef CEPH_USE_SO_NOSIGPIPE
#ifdef SO_NOSIGPIPE
int val = 1;
int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
if (r) {
Expand Down
90 changes: 87 additions & 3 deletions src/msg/simple/Pipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ void Pipe::set_socket_options()
}

// block ESIGPIPE
#ifdef CEPH_USE_SO_NOSIGPIPE
#if defined(SO_NOSIGPIPE)
int val = 1;
int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
if (r) {
Expand All @@ -847,6 +847,7 @@ void Pipe::set_socket_options()
<< ": " << cpp_strerror(errno) << dendl;
}
#endif
#if defined(SO_PRIORITY)
// setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
// See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
// We need to call setsockopt(SO_PRIORITY) after it.
Expand All @@ -857,6 +858,7 @@ void Pipe::set_socket_options()
ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio
<< ": " << cpp_strerror(errno) << dendl;
}
#endif
}
}

Expand Down Expand Up @@ -2120,8 +2122,73 @@ int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
return ret;
}

/*
SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
*/
void Pipe::suppress_sigpipe()
{
#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
/*
We want to ignore possible SIGPIPE that we can generate on write.
SIGPIPE is delivered *synchronously* and *only* to the thread
doing the write. So if it is reported as already pending (which
means the thread blocks it), then we do nothing: if we generate
SIGPIPE, it will be merged with the pending one (there's no
queuing), and that suits us well. If it is not pending, we block
it in this thread (and we avoid changing signal action, because it
is per-process).
*/
sigset_t pending;
sigemptyset(&pending);
sigpending(&pending);
sigpipe_pending = sigismember(&pending, SIGPIPE);
if (!sigpipe_pending) {
sigset_t blocked;
sigemptyset(&blocked);
pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);

/* Maybe is was blocked already? */
sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
}
#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
}


void Pipe::restore_sigpipe()
{
#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
/*
If SIGPIPE was pending already we do nothing. Otherwise, if it
become pending (i.e., we generated it), then we sigwait() it (thus
clearing pending status). Then we unblock SIGPIPE, but only if it
were us who blocked it.
*/
if (!sigpipe_pending) {
sigset_t pending;
sigemptyset(&pending);
sigpending(&pending);
if (sigismember(&pending, SIGPIPE)) {
/*
Protect ourselves from a situation when SIGPIPE was sent
by the user to the whole process, and was delivered to
other thread before we had a chance to wait for it.
*/
static const struct timespec nowait = { 0, 0 };
TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
}

if (sigpipe_unblock)
pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
}
#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
}


int Pipe::do_sendmsg(struct msghdr *msg, int len, bool more)
{
suppress_sigpipe();
while (len > 0) {
if (0) { // sanity
int l = 0;
Expand All @@ -2130,16 +2197,23 @@ int Pipe::do_sendmsg(struct msghdr *msg, int len, bool more)
assert(l == len);
}

int r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
int r;
#if defined(MSG_NOSIGNAL)
r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
#else
r = ::sendmsg(sd, msg, (more ? MSG_MORE : 0));
#endif
if (r == 0)
ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
if (r < 0) {
ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(errno) << dendl;
restore_sigpipe();
return -1;
}
if (state == STATE_CLOSED) {
ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
errno = EINTR;
restore_sigpipe();
return -1; // close enough
}

Expand All @@ -2164,6 +2238,7 @@ int Pipe::do_sendmsg(struct msghdr *msg, int len, bool more)
}
}
}
restore_sigpipe();
return 0;
}

Expand Down Expand Up @@ -2527,8 +2602,15 @@ int Pipe::tcp_write(const char *buf, int len)

//lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
assert(len > 0);
suppress_sigpipe();

while (len > 0) {
int did = ::send( sd, buf, len, MSG_NOSIGNAL );
int did;
#if defined(MSG_NOSIGNAL)
did = ::send( sd, buf, len, MSG_NOSIGNAL );
#else
did = ::send( sd, buf, len, 0);
#endif
if (did < 0) {
//lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
//lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
Expand All @@ -2538,5 +2620,7 @@ int Pipe::tcp_write(const char *buf, int len)
buf += did;
//lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
}
restore_sigpipe();

return 0;
}
9 changes: 9 additions & 0 deletions src/msg/simple/Pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ class DispatchQueue;
private:
int sd;
struct iovec msgvec[IOV_MAX];
#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
sigset_t sigpipe_mask;
bool sigpipe_pending;
bool sigpipe_unblock;
#endif

public:
int port;
Expand Down Expand Up @@ -247,6 +252,10 @@ class DispatchQueue;
int write_keepalive();
int write_keepalive2(char tag, const utime_t &t);

void suppress_sigpipe();
void restore_sigpipe();


void fault(bool reader=false);

void was_session_reset();
Expand Down

0 comments on commit 131deb3

Please sign in to comment.