Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Extend pillowfight a bit

Change-Id: I575bcfba3951f1e5327ee0c22aae9043f28cf2db
Reviewed-on: http://review.couchbase.org/24987
Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Tested-by: Trond Norbye <trond.norbye@gmail.com>
  • Loading branch information...
commit ecc4f62efd16a1055113b45c40a6d803e9af1421 1 parent 6c2e985
@trondn trondn authored
Showing with 160 additions and 84 deletions.
  1. +160 −84 example/pillowfight/pillowfight.cc
View
244 example/pillowfight/pillowfight.cc
@@ -1,4 +1,4 @@
-/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2011-2012 Couchbase, Inc.
*
@@ -21,11 +21,12 @@
#include <iostream>
#include <map>
#include <sstream>
+#include <queue>
#include <cstring>
+#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <getopt.h>
-
#include "tools/commandlineparser.h"
@@ -41,7 +42,10 @@ class Configuration
setprc(33),
prefix(""),
maxSize(1024),
- numThreads(1) {
+ numThreads(1),
+ numInstances(1),
+ timings(false),
+ loop(false) {
// @todo initialize the random sequence in seqno
data = static_cast<void *>(new char[maxSize]);
}
@@ -110,6 +114,29 @@ class Configuration
numThreads = val;
}
+ void setNumInstances(uint32_t val) {
+ numInstances = val;
+ }
+
+ uint32_t getNumInstances(void) {
+ return numInstances;
+ }
+
+ bool isTimings(void) {
+ return timings;
+ }
+
+ void setTimings(bool val) {
+ timings = val;
+ }
+
+ bool isLoop(void) {
+ return loop;
+ }
+
+ void setLoop(bool val) {
+ loop = val;
+ }
void *data;
std::string host;
@@ -124,7 +151,9 @@ class Configuration
std::string prefix;
uint32_t maxSize;
uint32_t numThreads;
-
+ uint32_t numInstances;
+ bool timings;
+ bool loop;
} config;
extern "C" {
@@ -141,67 +170,108 @@ extern "C" {
lcb_uint32_t);
}
-class ThreadContext
+class InstancePool
{
public:
- ThreadContext() :
- currSeqno(0), instance(NULL) {
- // @todo fill the random seqnos
-
- }
- ~ThreadContext() {
- if (instance != NULL) {
- lcb_destroy(instance);
+ InstancePool(size_t size, lcb_io_opt_t io) {
+ for (size_t ii = 0; ii < size; ++ii) {
+ lcb_t instance;
+ std::cout << "\rCreating instance " << ii;
+ std::cout.flush();
+
+ struct lcb_create_st options(config.getHost(),
+ config.getUser(),
+ config.getPasswd(),
+ config.getBucket(),
+ io);
+
+ lcb_error_t error = lcb_create(&instance, &options);
+ if (error == LCB_SUCCESS) {
+ (void)lcb_set_store_callback(instance, storageCallback);
+ (void)lcb_set_get_callback(instance, getCallback);
+ queue.push(instance);
+ } else {
+ std::cout << std::endl;
+ std::cerr << "Failed to create instance: "
+ << lcb_strerror(NULL, error) << std::endl;
+ exit(EXIT_FAILURE);
+ }
+ if (config.isTimings()) {
+ if ((error = lcb_enable_timings(instance)) != LCB_SUCCESS) {
+ std::cerr << "Failed to enable timings!: "
+ << lcb_strerror(instance, error) << std::endl;
+ }
+ }
+ lcb_connect(instance);
+ lcb_wait(instance);
+ error = lcb_get_last_error(instance);
+ if (error != LCB_SUCCESS) {
+ std::cerr << "Failed to connect: "
+ << lcb_strerror(instance, error) << std::endl;
+ exit(EXIT_FAILURE);
+ }
}
+ std::cout << std::endl;
}
- bool create(void) {
- lcb_io_opt_t io;
- if (lcb_create_io_ops(&io, NULL) != LCB_SUCCESS) {
- std::cerr << "Failed to create an IO instance" << std::endl;
- return false;
+
+ ~InstancePool() {
+ while (!queue.empty()) {
+ lcb_destroy(queue.front());
+ queue.pop();
}
+ }
- struct lcb_create_st options(config.getHost(), config.getUser(),
- config.getPasswd(), config.getBucket(),
- io);
+ lcb_t pop() {
+ assert(!queue.empty());
+ lcb_t ret = queue.front();
+ queue.pop();
+ return ret;
+ }
- if (lcb_create(&instance, &options) == LCB_SUCCESS) {
- (void)lcb_set_store_callback(instance, storageCallback);
- (void)lcb_set_get_callback(instance, getCallback);
- return true;
- } else {
- return false;
- }
+ void push(lcb_t inst) {
+ queue.push(inst);
}
- bool connect(void) {
- if ((error = lcb_connect(instance)) != LCB_SUCCESS) {
- std::cerr << "Failed to connect: "
- << lcb_strerror(instance, error) << std::endl;
- return false;
- }
+ static void dumpTimings(lcb_t instance, std::string header) {
+ std::stringstream ss;
+ ss << header << std::endl;
+ ss << " +---------+---------+---------+---------+" << std::endl;
+ lcb_get_timings(instance, reinterpret_cast<void *>(&ss),
+ timingsCallback);
+ ss << " +----------------------------------------" << endl;
+ std::cout << ss.str();
+ }
- lcb_wait(instance);
- error = lcb_get_last_error(instance);
- if (error != LCB_SUCCESS) {
- std::cerr << "Failed to connect: "
- << lcb_strerror(instance, error) << std::endl;
- return false;
+
+private:
+ std::queue<lcb_t> queue;
+};
+
+
+class ThreadContext
+{
+public:
+ ThreadContext(size_t poolSize) :
+ currSeqno(0), pool(0) {
+ // @todo fill the random seqnos
+ lcb_io_opt_t io;
+ lcb_error_t err = lcb_create_io_ops(&io, NULL);
+ if (err != LCB_SUCCESS) {
+ std::cerr << "Failed to create IO option: "
+ << lcb_strerror(NULL, err) << std::endl;
+ exit(EXIT_FAILURE);
}
+ pool = new InstancePool(poolSize, io);
+ }
- return true;
+ ~ThreadContext() {
+ delete pool;
}
bool run(bool loop) {
do {
- bool timings = true;
- if ((error = lcb_enable_timings(instance)) != LCB_SUCCESS) {
- std::cerr << "Failed to enable timings!: "
- << lcb_strerror(instance, error) << std::endl;
- timings = false;
- }
-
bool pending = false;
+ lcb_t instance = pool->pop();
for (uint32_t ii = 0; ii < config.iterations; ++ii) {
std::string key;
generateKey(key);
@@ -231,8 +301,7 @@ class ThreadContext
if (ii % 10 == 0) {
lcb_wait(instance);
} else {
- lcb_wait(instance);
- //pending = true;
+ pending = true;
}
}
@@ -240,22 +309,21 @@ class ThreadContext
lcb_wait(instance);
}
- if (timings) {
- dumpTimings("Run");
- lcb_disable_timings(instance);
+ if (config.isTimings()) {
+ pool->dumpTimings(instance, "Run");
}
+
+ pool->push(instance);
+
} while (loop);
return true;
}
bool populate(uint32_t start, uint32_t stop) {
- bool timings = true;
- if ((error = lcb_enable_timings(instance)) != LCB_SUCCESS) {
- std::cerr << "Failed to enable timings!: "
- << lcb_strerror(instance, error) << std::endl;
- timings = false;
- }
+
+ bool timings = config.isTimings();
+ lcb_t instance = pool->pop();
for (uint32_t ii = start; ii < stop; ++ii) {
std::string key;
@@ -277,9 +345,9 @@ class ThreadContext
}
if (timings) {
- dumpTimings("Populate");
- lcb_disable_timings(instance);
+ pool->dumpTimings(instance, "Populate");
}
+ pool->push(instance);
return true;
}
@@ -296,16 +364,6 @@ class ThreadContext
error = e;
}
- void dumpTimings(std::string header) {
- std::stringstream ss;
- ss << header << std::endl;
- ss << " +---------+---------+---------+---------+" << std::endl;
- lcb_get_timings(instance, reinterpret_cast<void *>(&ss),
- timingsCallback);
- ss << " +----------------------------------------" << endl;
- std::cout << ss.str();
- }
-
private:
uint32_t nextSeqno() {
uint32_t ret = seqno[currSeqno];
@@ -331,8 +389,8 @@ class ThreadContext
uint32_t seqno[8192];
uint32_t currSeqno;
- lcb_t instance;
lcb_error_t error;
+ InstancePool *pool;
};
static void storageCallback(lcb_t, const void *cookie,
@@ -410,11 +468,20 @@ static void handle_options(int argc, char **argv)
"Username for the rest port"));
getopt.addOption(new CommandLineOption('P', "password", true,
"password for the rest port"));
- getopt.addOption(new CommandLineOption('i', "iterations", true, "Number of iterations to run"));
- getopt.addOption(new CommandLineOption('I', "num-items", true, "Number of items to operate on"));
- getopt.addOption(new CommandLineOption('p', "key-prefix", true, "Use the following prefix for keys"));
- getopt.addOption(new CommandLineOption('t', "num-threads", true, "The number of threads to use"));
- /* getopt.addOption(new CommandLineOption()); */
+ getopt.addOption(new CommandLineOption('i', "iterations", true,
+ "Number of iterations to run"));
+ getopt.addOption(new CommandLineOption('I', "num-items", true,
+ "Number of items to operate on"));
+ getopt.addOption(new CommandLineOption('p', "key-prefix", true,
+ "Use the following prefix for keys"));
+ getopt.addOption(new CommandLineOption('t', "num-threads", true,
+ "The number of threads to use"));
+ getopt.addOption(new CommandLineOption('Q', "num-instances", true,
+ "The number of instances to use"));
+ getopt.addOption(new CommandLineOption('l', "loop", false,
+ "Loop running load"));
+ getopt.addOption(new CommandLineOption('T', "timings", false,
+ "Enable internal timings"));
/* getopt.addOption(new CommandLineOption()); */
if (!getopt.parse(argc, argv)) {
@@ -458,13 +525,26 @@ static void handle_options(int argc, char **argv)
config.setNumThreads(atoi((*iter)->argument));
break;
+ case 'Q' :
+ config.setNumInstances(atoi((*iter)->argument));
+ break;
+
+ case 'l' :
+ config.setLoop(true);
+ break;
+
+ case 'T' :
+ config.setTimings(true);
+ break;
+
case '?':
getopt.usage(argv[0]);
exit(EXIT_FAILURE);
+
default:
+ std::cerr << "Unhandled option.. Fix the code!" << std::endl;
abort();
}
-
}
}
}
@@ -479,13 +559,9 @@ int main(int argc, char **argv)
{
handle_options(argc, argv);
- ThreadContext ctx;
- if (!ctx.create() || !ctx.connect()) {
- return 1;
- }
-
+ ThreadContext ctx(config.getNumInstances());
ctx.populate(0, config.maxKey);
- ctx.run(true);
+ ctx.run(config.isLoop());
return 0;
}
Please sign in to comment.
Something went wrong with that request. Please try again.