Permalink
Browse files

pillowfight.cc: use pthreads

Change-Id: Ie9b4847e255bcb5a29c0566329da598920234a1d
Reviewed-on: http://review.couchbase.org/25111
Tested-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Reviewed-by: Trond Norbye <trond.norbye@gmail.com>
  • Loading branch information...
1 parent b0fbf0f commit 1d3182c36c2697a433b693c50401b476a2967b8e @avsej avsej committed with trondn Mar 12, 2013
Showing with 81 additions and 13 deletions.
  1. +1 −1 Makefile.am
  2. +80 −12 example/pillowfight/pillowfight.cc
View
@@ -432,7 +432,7 @@ TESTS=${check_PROGRAMS}
example_pillowfight_pillowfight_SOURCES = tools/commandlineparser.cc \
example/pillowfight/pillowfight.cc
-example_pillowfight_pillowfight_LDADD = libcouchbase.la
+example_pillowfight_pillowfight_LDADD = libcouchbase.la -lpthread
example_debug_debug_SOURCES = example/debug/debug.cc
example_debug_debug_LDADD = libcouchbase_debug.la
@@ -30,6 +30,7 @@
#include <getopt.h>
#include "tools/commandlineparser.h"
#include <signal.h>
+#include <pthread.h>
using namespace std;
@@ -199,12 +200,15 @@ class InstancePool
{
public:
InstancePool(size_t size): io(NULL) {
+ pthread_mutex_init(&mutex, NULL);
+ pthread_cond_init(&cond, NULL);
+
if (config.getNumThreads() == 1) {
/* allow to share IO object in single-thread only */
lcb_error_t err = lcb_create_io_ops(&io, NULL);
if (err != LCB_SUCCESS) {
std::cerr << "Failed to create IO option: "
- << lcb_strerror(NULL, err) << std::endl;
+ << lcb_strerror(NULL, err) << std::endl;
exit(EXIT_FAILURE);
}
}
@@ -261,14 +265,22 @@ class InstancePool
}
lcb_t pop() {
+ pthread_mutex_lock(&mutex);
+ while (queue.empty()) {
+ pthread_cond_wait(&cond, &mutex);
+ }
assert(!queue.empty());
lcb_t ret = queue.front();
queue.pop();
+ pthread_mutex_unlock(&mutex);
return ret;
}
void push(lcb_t inst) {
+ pthread_mutex_lock(&mutex);
queue.push(inst);
+ pthread_cond_signal(&cond);
+ pthread_mutex_unlock(&mutex);
}
static void dumpTimings(lcb_t instance, std::string header) {
@@ -286,6 +298,8 @@ class InstancePool
std::queue<lcb_t> queue;
std::list<lcb_t> handles;
lcb_io_opt_t io;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
};
@@ -301,9 +315,6 @@ class ThreadContext
}
bool run() {
- if (config.isLoop()) {
- std::cerr << "Running in a loop. Press Ctrl-C to terminate..." << std::endl;
- }
do {
bool pending = false;
lcb_t instance = pool->pop();
@@ -590,7 +601,8 @@ static void handle_options(int argc, char **argv)
}
}
-ThreadContext *ctx = NULL;
+std::list<ThreadContext *> contexts;
+InstancePool *pool = NULL;
static void setup_sigint_handler(void (handler)(int));
@@ -601,13 +613,19 @@ extern "C" {
static void cruel_handler(int)
{
- delete ctx;
+ std::list<ThreadContext *>::iterator it;
+ for (it = contexts.begin(); it != contexts.end(); ++it) {
+ delete *it;
+ }
+ delete pool;
exit(EXIT_FAILURE);
}
static void gentle_handler(int)
{
config.setLoop(false);
+ std::cerr << "Termination requested. Waiting threads to finish. "
+ << "Ctrl-C to force termination." << std::endl;
setup_sigint_handler(cruel_handler);
}
@@ -621,6 +639,18 @@ static void setup_sigint_handler(void (handler)(int))
sigaction(SIGINT, &action, NULL);
}
+extern "C" {
+ static void *thread_worker(void *);
+}
+
+static void *thread_worker(void *arg)
+{
+ ThreadContext *ctx = static_cast<ThreadContext *>(arg);
+ ctx->populate(0, config.maxKey);
+ ctx->run();
+ pthread_exit(NULL);
+}
+
/**
* Program entry point
* @param argc argument count
@@ -629,17 +659,55 @@ static void setup_sigint_handler(void (handler)(int))
*/
int main(int argc, char **argv)
{
+ int exit_code = EXIT_SUCCESS, rc;
+
setup_sigint_handler(SIG_IGN);
+
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
handle_options(argc, argv);
- InstancePool pool(config.getNumInstances());
+ pool = new InstancePool(config.getNumInstances());
setup_sigint_handler(gentle_handler);
+ if (config.isLoop()) {
+ std::cerr << "Running in a loop. Press Ctrl-C to terminate..." << std::endl;
+ }
- ctx = new ThreadContext(&pool);
- ctx->populate(0, config.maxKey);
- ctx->run();
+ std::list<pthread_t> threads;
+ for (int ii = 0; ii < config.getNumThreads(); ++ii) {
+ ThreadContext *ctx = new ThreadContext(pool);
+ contexts.push_back(ctx);
- delete ctx;
+ pthread_t tid;
+ rc = pthread_create(&tid, &attr, thread_worker, ctx);
+ if (rc) {
+ std::cerr << "Failed to create thread: " << rc;
+ exit_code = EXIT_FAILURE;
+ break;
+ }
+ threads.push_back(tid);
+ }
- return 0;
+ if (contexts.size() == config.getNumThreads()) {
+ for (std::list<pthread_t>::iterator it = threads.begin();
+ it != threads.end(); ++it) {
+ rc = pthread_join(*it, NULL);
+ if (rc) {
+ std::cerr << "Failed to join thread: " << rc;
+ exit_code = EXIT_FAILURE;
+ break;
+ }
+ }
+ }
+
+ for (std::list<ThreadContext *>::iterator it = contexts.begin();
+ it != contexts.end(); ++it) {
+ delete *it;
+ }
+ delete pool;
+ pthread_attr_destroy(&attr);
+ pthread_exit(NULL);
+ return exit_code;
}

0 comments on commit 1d3182c

Please sign in to comment.