Skip to content

Commit

Permalink
THRIFT-1361 Optional replacement of pthread by boost::thread
Browse files Browse the repository at this point in the history
Patch: alexandre parenteau

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1178176 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
bufferoverflow committed Oct 2, 2011
1 parent 3516e0e commit 3faaedf
Show file tree
Hide file tree
Showing 27 changed files with 762 additions and 83 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
/lib/rb/ext/thrift_native.so
/lib/rb/spec/gen-*
/lib/rb/test/
/lib/rb/thrift-*.gem
/lib/php/Makefile
/lib/php/Makefile.in
/lib/php/src/ext/thrift_protocol/.deps
Expand Down
17 changes: 17 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,23 @@ AC_SUBST(GCOV_CFLAGS)
AC_SUBST(GCOV_CXXFLAGS)
AC_SUBST(GCOV_LDFLAGS)

AC_ARG_ENABLE(boostthreads,
[ --enable-boostthreads use boost threads, instead of POSIX pthread (experimental) ],
[case "${enableval}" in
yes) ENABLE_BOOSTTHREADS=1 ;;
no) ENABLE_BOOSTTHREADS=0 ;;
*) AC_MSG_ERROR(bad value ${enableval} for --enable-cov) ;;
esac],
[ENABLE_BOOSTTHREADS=2])


if test "x[$]ENABLE_BOOSTTHREADS" = "x1"; then
AC_MSG_WARN(enable boostthreads)
AC_DEFINE([USE_BOOST_THREAD], [1], [experimental --enable-boostthreads that replaces POSIX pthread by boost::thread])
fi

AM_CONDITIONAL([WITH_BOOSTTHREADS], [test "x[$]ENABLE_BOOSTTHREADS" = "x1"])

AC_CONFIG_HEADERS(config.h:config.hin)

AC_CONFIG_FILES([
Expand Down
19 changes: 16 additions & 3 deletions lib/cpp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,12 @@ endif

AM_CXXFLAGS = -Wall
AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(srcdir)/src
AM_LDFLAGS = $(BOOST_LDFLAGS)

# Define the source files for the module

libthrift_la_SOURCES = src/Thrift.cpp \
src/TApplicationException.cpp \
src/concurrency/Mutex.cpp \
src/concurrency/Monitor.cpp \
src/concurrency/PosixThreadFactory.cpp \
src/concurrency/ThreadManager.cpp \
src/concurrency/TimerManager.cpp \
src/concurrency/Util.cpp \
Expand Down Expand Up @@ -77,6 +75,17 @@ libthrift_la_SOURCES = src/Thrift.cpp \
src/async/TAsyncChannel.cpp \
src/processor/PeekProcessor.cpp

if WITH_BOOSTTHREADS
libthrift_la_SOURCES += src/concurrency/BoostThreadFactory.cpp \
src/concurrency/BoostMonitor.cpp \
src/concurrency/BoostMutex.cpp
else
libthrift_la_SOURCES += src/concurrency/Mutex.cpp \
src/concurrency/Monitor.cpp \
src/concurrency/PosixThreadFactory.cpp
endif


libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp \
src/async/TAsyncProtocolProcessor.cpp \
src/async/TEvhttpServer.cpp \
Expand All @@ -91,6 +100,10 @@ libthriftz_la_CPPFLAGS = $(AM_CPPFLAGS) $(ZLIB_CPPFLAGS)
libthriftnb_la_CXXFLAGS = $(AM_CXXFLAGS)
libthriftz_la_CXXFLAGS = $(AM_CXXFLAGS)

if WITH_BOOSTTHREADS
libthrift_la_LIBADD = -lboost_thread
endif

include_thriftdir = $(includedir)/thrift
include_thrift_HEADERS = \
$(top_builddir)/config.h \
Expand Down
203 changes: 203 additions & 0 deletions lib/cpp/src/concurrency/BoostMonitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include "Monitor.h"
#include "Exception.h"
#include "Util.h"

#include <assert.h>
#include <errno.h>

#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

namespace apache { namespace thrift { namespace concurrency {

using namespace boost::interprocess;

/**
* Monitor implementation using the boost interprocess library
*
* @version $Id:$
*/
class Monitor::Impl : public interprocess_condition {

public:

Impl()
: ownedMutex_(new Mutex()),
mutex_(NULL) {
init(ownedMutex_.get());
}

Impl(Mutex* mutex)
: mutex_(NULL) {
init(mutex);
}

Impl(Monitor* monitor)
: mutex_(NULL) {
init(&(monitor->mutex()));
}

Mutex& mutex() { return *mutex_; }
void lock() { mutex().lock(); }
void unlock() { mutex().unlock(); }

/**
* Exception-throwing version of waitForTimeRelative(), called simply
* wait(int64) for historical reasons. Timeout is in milliseconds.
*
* If the condition occurs, this function returns cleanly; on timeout or
* error an exception is thrown.
*/
void wait(int64_t timeout_ms) {
int result = waitForTimeRelative(timeout_ms);
if (result == ETIMEDOUT) {
throw TimedOutException();
} else if (result != 0) {
throw TException(
"Monitor::wait() failed");
}
}

/**
* Waits until the specified timeout in milliseconds for the condition to
* occur, or waits forever if timeout_ms == 0.
*
* Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
*/
int waitForTimeRelative(int64_t timeout_ms) {
if (timeout_ms == 0LL) {
return waitForever();
}

assert(mutex_);
interprocess_mutex* mutexImpl =
reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);

scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
int res = timed_wait(lock, boost::get_system_time()+boost::posix_time::milliseconds(timeout_ms)) ? 0 : ETIMEDOUT;
lock.release();
return res;
}

/**
* Waits until the absolute time specified using struct timespec.
* Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
*/
int waitForTime(const timespec* abstime) {
assert(mutex_);
interprocess_mutex* mutexImpl =
reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);

struct timespec currenttime;
Util::toTimespec(currenttime, Util::currentTime());

long tv_sec = abstime->tv_sec - currenttime.tv_sec;
long tv_nsec = abstime->tv_nsec - currenttime.tv_nsec;
if(tv_sec < 0)
tv_sec = 0;
if(tv_nsec < 0)
tv_nsec = 0;

scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
int res = timed_wait(lock, boost::get_system_time() +
boost::posix_time::seconds(tv_sec) +
boost::posix_time::microseconds(tv_nsec / 1000)
) ? 0 : ETIMEDOUT;
lock.release();
return res;
}

/**
* Waits forever until the condition occurs.
* Returns 0 if condition occurs, or an error code otherwise.
*/
int waitForever() {
assert(mutex_);
interprocess_mutex* mutexImpl =
reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);

scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
((interprocess_condition*)this)->wait(lock);
lock.release();
return 0;
}


void notify() {
notify_one();
}

void notifyAll() {
notify_all();
}

private:

void init(Mutex* mutex) {
mutex_ = mutex;
}

boost::scoped_ptr<Mutex> ownedMutex_;
Mutex* mutex_;
};

Monitor::Monitor() : impl_(new Monitor::Impl()) {}
Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {}
Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {}

Monitor::~Monitor() { delete impl_; }

Mutex& Monitor::mutex() const { return const_cast<Monitor::Impl*>(impl_)->mutex(); }

void Monitor::lock() const { const_cast<Monitor::Impl*>(impl_)->lock(); }

void Monitor::unlock() const { const_cast<Monitor::Impl*>(impl_)->unlock(); }

void Monitor::wait(int64_t timeout) const { const_cast<Monitor::Impl*>(impl_)->wait(timeout); }

int Monitor::waitForTime(const timespec* abstime) const {
return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
}

int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
}

int Monitor::waitForever() const {
return const_cast<Monitor::Impl*>(impl_)->waitForever();
}

void Monitor::notify() const { const_cast<Monitor::Impl*>(impl_)->notify(); }

void Monitor::notifyAll() const { const_cast<Monitor::Impl*>(impl_)->notifyAll(); }

}}} // apache::thrift::concurrency
59 changes: 59 additions & 0 deletions lib/cpp/src/concurrency/BoostMutex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include "Mutex.h"
#include "Util.h"

#include <cassert>
#include <boost/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>

using namespace boost::interprocess;

namespace apache { namespace thrift { namespace concurrency {

/**
* Implementation of Mutex class using boost interprocess mutex
*
* @version $Id:$
*/
class Mutex::impl : public interprocess_mutex {
};

Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) {}

void* Mutex::getUnderlyingImpl() const { return impl_.get(); }

void Mutex::lock() const { impl_->lock(); }

bool Mutex::trylock() const { return impl_->try_lock(); }

bool Mutex::timedlock(int64_t ms) const { return impl_->timed_lock(boost::get_system_time()+boost::posix_time::milliseconds(ms)); }

void Mutex::unlock() const { impl_->unlock(); }

void Mutex::DEFAULT_INITIALIZER(void* arg) {
}

}}} // apache::thrift::concurrency

Loading

0 comments on commit 3faaedf

Please sign in to comment.