Skip to content

Commit

Permalink
Fix SIGPIPE handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenick committed Aug 17, 2016
1 parent 6288295 commit c0be102
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -50,6 +50,7 @@ Testing
Debug
demo/cassandra_demo
test/unit_tests/cassandra_test
src/cassconfig.hpp

# API docs files
Doxyfile
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Expand Up @@ -173,6 +173,7 @@ set(CASS_SOURCE_DIR ${PROJECT_SOURCE_DIR})
CassSetCompilerFlags()
CassAddIncludes()
CassFindSourceFiles()
CassConfigure()

set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR})
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR})
Expand Down
10 changes: 10 additions & 0 deletions cassconfig.hpp.in
@@ -0,0 +1,10 @@
#ifndef __CASSANDRA_CONFIG_HPP_INCLUDED__
#define __CASSANDRA_CONFIG_HPP_INCLUDED__

#cmakedefine HAVE_OPENSSL
#cmakedefine HAVE_STD_ATOMIC
#cmakedefine HAVE_BOOST_ATOMIC
#cmakedefine HAVE_NOSIGPIPE
#cmakedefine HAVE_SIGTIMEDWAIT

#endif
16 changes: 13 additions & 3 deletions cmake/modules/CppDriver.cmake
Expand Up @@ -4,6 +4,7 @@ cmake_minimum_required(VERSION 2.6.4)
# Includes
#-----------
include(FindPackageHandleStandardArgs)
include(CheckCXXSymbolExists)

#-----------
# Policies
Expand Down Expand Up @@ -305,10 +306,10 @@ macro(CassSetCompilerFlags)
# Enable specific cass::Atomic implementation
if(CASS_USE_BOOST_ATOMIC)
message(STATUS "Using boost::atomic implementation for atomic operations")
add_definitions(-DCASS_USE_BOOST_ATOMIC)
set(HAVE_BOOST_ATOMIC 1)
elseif(CASS_USE_STD_ATOMIC)
message(STATUS "Using std::atomic implementation for atomic operations")
add_definitions(-DCASS_USE_STD_ATOMIC)
set(HAVE_STD_ATOMIC 1)
endif()

# Assign compiler specific flags
Expand Down Expand Up @@ -478,7 +479,7 @@ macro(CassFindSourceFiles)
set(SRC_FILES ${SRC_FILES}
${CASS_SOURCE_DIR}/src/ssl/ssl_openssl_impl.cpp
${CASS_SOURCE_DIR}/src/ssl/ring_buffer_bio.cpp)
add_definitions(-DCASS_USE_OPENSSL)
set(HAVE_OPENSSL 1)
else()
set(INC_FILES ${INC_FILES}
${CASS_SOURCE_DIR}/src/ssl/ssl_no_impl.hpp)
Expand All @@ -494,3 +495,12 @@ macro(CassFindSourceFiles)
set_source_files_properties(${SRC_FILE} PROPERTIES COMPILE_FLAGS -DLOG_FILE_=\\\"${LOG_FILE_}\\\")
endforeach()
endmacro()

macro(CassConfigure)
check_cxx_symbol_exists(SO_NOSIGPIPE "sys/socket.h;sys/types.h" HAVE_NOSIGPIPE)
check_cxx_symbol_exists(sigtimedwait "signal.h" HAVE_SIGTIMEDWAIT)
if (NOT WIN32 AND NOT HAVE_NOSIGPIPE AND NOT HAVE_SIGTIMEDWAIT)
message(WARNING "Unable to handle SIGPIPE on your platform")
endif()
configure_file(${CASS_SOURCE_DIR}/cassconfig.hpp.in ${CASS_SOURCE_DIR}/src/cassconfig.hpp)
endmacro()
6 changes: 4 additions & 2 deletions src/atomic.hpp
Expand Up @@ -17,9 +17,11 @@
#ifndef __CASS_ATOMIC_HPP_INCLUDED__
#define __CASS_ATOMIC_HPP_INCLUDED__

#if defined(CASS_USE_BOOST_ATOMIC)
#include "cassconfig.hpp"

#if defined(HAVE_BOOST_ATOMIC)
#include "atomic/atomic_boost.hpp"
#elif defined(CASS_USE_STD_ATOMIC)
#elif defined(HAVE_STD_ATOMIC)
#include "atomic/atomic_std.hpp"
#else
#include "atomic/atomic_intrinsics.hpp"
Expand Down
16 changes: 16 additions & 0 deletions src/connection.cpp
Expand Up @@ -20,6 +20,7 @@
#include "auth_requests.hpp"
#include "auth_responses.hpp"
#include "cassandra.h"
#include "cassconfig.hpp"
#include "constants.hpp"
#include "connector.hpp"
#include "timer.hpp"
Expand All @@ -35,6 +36,11 @@
#include "logger.hpp"
#include "utils.hpp"

#ifdef HAVE_NOSIGPIPE
#include <sys/socket.h>
#include <sys/types.h>
#endif

#include <iomanip>
#include <sstream>

Expand Down Expand Up @@ -199,6 +205,16 @@ Connection::Connection(uv_loop_t* loop,
socket_.data = this;
uv_tcp_init(loop_, &socket_);


#ifdef HAVE_NOSIGPIPE
uv_os_fd_t fd = 0;
int enabled = 1;
if (uv_fileno(copy_cast<uv_tcp_t*, uv_handle_t*>(&socket_), &fd) != 0 ||
setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&enabled, sizeof(int)) != 0) {
LOG_WARN("Unable to set socket option SO_NOSIGPIPE");
}
#endif

if (uv_tcp_nodelay(&socket_,
config.tcp_nodelay_enable() ? 1 : 0) != 0) {
LOG_WARN("Unable to set tcp nodelay");
Expand Down
70 changes: 52 additions & 18 deletions src/loop_thread.hpp
Expand Up @@ -17,13 +17,39 @@
#ifndef __CASS_LOOP_THREAD_HPP_INCLUDED__
#define __CASS_LOOP_THREAD_HPP_INCLUDED__

#include "cassconfig.hpp"
#include "logger.hpp"
#include "macros.hpp"

#include <assert.h>
#include <uv.h>

#if !defined(_WIN32)
#include <signal.h>
#endif

namespace cass {

#if defined(HAVE_SIGTIMEDWAIT) && !defined(HAVE_NOSIGPIPE)
static int block_sigpipe() {
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGPIPE);
return pthread_sigmask(SIG_BLOCK, &set, NULL);
}

static void consume_blocked_sigpipe() {
sigset_t set;
sigemptyset(&set);
sigaddset(&set, SIGPIPE);
struct timespec ts = { 0, 0 };
int num = sigtimedwait(&set, NULL, &ts);
if (num > 0) {
LOG_WARN("Caught and ignored SIGPIPE on loop thread");
}
}
#endif

class LoopThread {
public:
LoopThread()
Expand All @@ -34,9 +60,9 @@ class LoopThread {
#endif
, is_joinable_(false) {}

virtual ~LoopThread() {
virtual ~LoopThread() {
#if UV_VERSION_MAJOR == 0
uv_loop_delete(loop_);
uv_loop_delete(loop_);
#else
if (is_loop_initialized_) {
uv_loop_close(&loop_);
Expand All @@ -52,18 +78,21 @@ class LoopThread {
is_loop_initialized_ = true;
#endif

#if !defined(_WIN32)
rc = uv_signal_init(loop(), &sigpipe_);
#if defined(HAVE_SIGTIMEDWAIT) && !defined(HAVE_NOSIGPIPE)
rc = block_sigpipe();
if (rc != 0) return rc;
rc = uv_prepare_init(loop(), &prepare_);
if (rc != 0) return rc;
rc = uv_prepare_start(&prepare_, on_prepare);
if (rc != 0) return rc;
rc = uv_signal_start(&sigpipe_, on_signal, SIGPIPE);
#endif
return rc;
}

void close_handles() {
#if !defined(_WIN32)
uv_signal_stop(&sigpipe_);
uv_close(copy_cast<uv_signal_t*, uv_handle_t*>(&sigpipe_), NULL);
#if defined(HAVE_SIGTIMEDWAIT) && !defined(HAVE_NOSIGPIPE)
uv_prepare_stop(&prepare_);
uv_close(copy_cast<uv_prepare_t*, uv_handle_t*>(&prepare_), NULL);
#endif
}

Expand Down Expand Up @@ -100,25 +129,30 @@ class LoopThread {
thread->on_after_run();
}

#if !defined(_WIN32)
static void on_signal(uv_signal_t* signal, int signum) {
// Ignore SIGPIPE
}
#endif

#if UV_VERSION_MAJOR == 0
uv_loop_t* loop_;
#else
uv_loop_t loop_;
bool is_loop_initialized_;
#endif

uv_thread_t thread_;
bool is_joinable_;
#if defined(HAVE_SIGTIMEDWAIT) && !defined(HAVE_NOSIGPIPE)

#if !defined(_WIN32)
uv_signal_t sigpipe_;
#if UV_VERSION_MAJOR == 0
static void on_prepare(uv_prepare_t *prepare, int status) {
consume_blocked_sigpipe();
}
#else
static void on_prepare(uv_prepare_t *prepare) {
consume_blocked_sigpipe();
}
#endif

uv_prepare_t prepare_;
#endif

uv_thread_t thread_;
bool is_joinable_;
};

} // namespace cass
Expand Down
3 changes: 2 additions & 1 deletion src/mpmc_queue.hpp
Expand Up @@ -23,6 +23,7 @@
#define __CASS_MPMC_QUEUE_INCLUDED__

#include "atomic.hpp"
#include "cassconfig.hpp"
#include "utils.hpp"
#include "macros.hpp"

Expand Down Expand Up @@ -132,7 +133,7 @@ class MPMCQueue {
}

static void memory_fence() {
#if defined(CASS_USE_BOOST_ATOMIC) || defined(CASS_USE_STD_ATOMIC)
#if defined(HAVE_BOOST_ATOMIC) || defined(HAVE_STD_ATOMIC)
atomic_thread_fence(MEMORY_ORDER_SEQ_CST);
#endif
}
Expand Down
3 changes: 2 additions & 1 deletion src/spsc_queue.hpp
Expand Up @@ -30,6 +30,7 @@
#include <assert.h>

#include "atomic.hpp"
#include "cassconfig.hpp"
#include "utils.hpp"
#include "macros.hpp"

Expand Down Expand Up @@ -80,7 +81,7 @@ class SPSCQueue {
// before storing the data into the queue causing the data in the queue
// not to be consumed. This fence ensures that the load happens after the
// data has been store in the queue.
#if defined(CASS_USE_BOOST_ATOMIC) || defined(CASS_USE_STD_ATOMIC)
#if defined(HAVE_BOOST_ATOMIC) || defined(HAVE_STD_ATOMIC)
atomic_thread_fence(MEMORY_ORDER_SEQ_CST);
#endif
}
Expand Down
3 changes: 2 additions & 1 deletion src/ssl.hpp
Expand Up @@ -18,6 +18,7 @@
#define __CASS_SSL_HPP_INCLUDED__

#include "cassandra.h"
#include "cassconfig.hpp"
#include "host.hpp"
#include "ref_counted.hpp"
#include "ring_buffer.hpp"
Expand Down Expand Up @@ -100,7 +101,7 @@ class SslContextFactoryBase {

} // namespace cass

#ifdef CASS_USE_OPENSSL
#ifdef HAVE_OPENSSL
#include "ssl/ssl_openssl_impl.hpp"
#else
#include "ssl/ssl_no_impl.hpp"
Expand Down

0 comments on commit c0be102

Please sign in to comment.