Skip to content

Commit

Permalink
Merge pull request #4931 from yuyuyu101/wip-perf-msgr
Browse files Browse the repository at this point in the history
PerfMsgr: A tool to benchmark messenger module

Reviewed-by: Greg Farnum <gfarnum@redhat.com>
  • Loading branch information
gregsfortytwo committed Jun 18, 2015
2 parents 952836a + ce86b0a commit 459369e
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 0 deletions.
2 changes: 2 additions & 0 deletions ceph.spec.in
Expand Up @@ -949,6 +949,8 @@ ln -sf %{_libdir}/librbd.so.1 /usr/lib64/qemu/librbd.so.1
%{_bindir}/ceph_omapbench
%{_bindir}/ceph_perf_objectstore
%{_bindir}/ceph_perf_local
%{_bindir}/ceph_perf_msgr_client
%{_bindir}/ceph_perf_msgr_server
%{_bindir}/ceph_psim
%{_bindir}/ceph_radosacl
%{_bindir}/ceph_rgw_jsonparser
Expand Down
2 changes: 2 additions & 0 deletions debian/ceph-test.install
Expand Up @@ -7,6 +7,8 @@ usr/bin/ceph_erasure_code_benchmark
usr/bin/ceph_omapbench
usr/bin/ceph_perf_objectstore
usr/bin/ceph_perf_local
usr/bin/ceph_perf_msgr_client
usr/bin/ceph_perf_msgr_server
usr/bin/ceph_psim
usr/bin/ceph_radosacl
usr/bin/ceph_rgw_jsonparser
Expand Down
33 changes: 33 additions & 0 deletions doc/dev/messenger.rst
@@ -0,0 +1,33 @@
============================
Messenger notes
============================

Messenger is the Ceph network layer implementation. Currently Ceph supports
three messenger types: "simple", "async" and "xio". The latter two are both
experimental features and should not be used in production environments.

ceph_perf_msgr
==============

ceph_perf_msgr is used to benchmark the messenger module in isolation and can
help to find bottlenecks or time-consuming paths within the messenger module.
It works just like "iperf": we need to start the server-side program first:

# ./ceph_perf_msgr_server 172.16.30.181:10001 0

The first argument is the ip:port pair giving the address the server listens
on; clients must specify the same pair to connect. The second argument sets the
"think time" (in microseconds) used when dispatching messages. Since Giant, the
CEPH_OSD_OP message, which carries the actual client read/write io request, is
fast dispatched without being queued to the Dispatcher in order to achieve
better performance, so CEPH_OSD_OP messages are processed inline; the
"think time" is used to mock this inline processing.

# ./ceph_perf_msgr_client 172.16.30.181:10001 1 32 10000 10 4096

The first argument specifies the server ip:port, and the second argument
specifies the number of client threads. The third argument specifies the
concurrency (the max number of inflight messages for each client thread), and
the fourth argument specifies the number of ios issued to the server per client
thread. The fifth argument indicates the "think time" (in microseconds) for a
client thread when receiving messages; this is also used to mock the client
fast dispatch process. The last argument specifies the message data length to
issue.
10 changes: 10 additions & 0 deletions src/test/Makefile-server.am
Expand Up @@ -39,6 +39,16 @@ ceph_perf_local_CXXFLAGS = ${AM_CXXFLAGS} \
noinst_HEADERS += test/perf_helper.h
bin_DEBUGPROGRAMS += ceph_perf_local

ceph_perf_msgr_server_SOURCES = test/msgr/perf_msgr_server.cc
ceph_perf_msgr_server_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL)
ceph_perf_msgr_server_CXXFLAGS = $(UNITTEST_CXXFLAGS)
bin_DEBUGPROGRAMS += ceph_perf_msgr_server

ceph_perf_msgr_client_SOURCES = test/msgr/perf_msgr_client.cc
ceph_perf_msgr_client_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL)
ceph_perf_msgr_client_CXXFLAGS = $(UNITTEST_CXXFLAGS)
bin_DEBUGPROGRAMS += ceph_perf_msgr_client

if LINUX
ceph_test_objectstore_SOURCES = test/objectstore/store_test.cc
ceph_test_objectstore_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL)
Expand Down
205 changes: 205 additions & 0 deletions src/test/msgr/perf_msgr_client.cc
@@ -0,0 +1,205 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Haomai Wang
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#include <stdlib.h>
#include <stdint.h>
#include <string>
#include <unistd.h>
#include <iostream>

using namespace std;

#include "include/atomic.h"
#include "common/ceph_argparse.h"
#include "common/debug.h"
#include "common/Cycles.h"
#include "global/global_init.h"
#include "msg/Messenger.h"
#include "messages/MOSDOp.h"

class MessengerClient {
class ClientThread;
class ClientDispatcher : public Dispatcher {
uint64_t think_time;
ClientThread *thread;

public:
ClientDispatcher(uint64_t delay, ClientThread *t): Dispatcher(g_ceph_context), think_time(delay), thread(t) {}
bool ms_can_fast_dispatch_any() const { return true; }
bool ms_can_fast_dispatch(Message *m) const {
switch (m->get_type()) {
case CEPH_MSG_OSD_OPREPLY:
return true;
default:
return false;
}
}

void ms_handle_fast_connect(Connection *con) {}
void ms_handle_fast_accept(Connection *con) {}
bool ms_dispatch(Message *m) { return true; }
void ms_fast_dispatch(Message *m);
bool ms_handle_reset(Connection *con) { return true; }
void ms_handle_remote_reset(Connection *con) {}
bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
bufferlist& authorizer, bufferlist& authorizer_reply,
bool& isvalid, CryptoKey& session_key) {
isvalid = true;
return true;
}
};

class ClientThread : public Thread {
Messenger *msgr;
int concurrent;
ConnectionRef conn;
atomic_t client_inc;
object_t oid;
object_locator_t oloc;
pg_t pgid;
int msg_len;
bufferlist data;
int ops;
ClientDispatcher dispatcher;

public:
Mutex lock;
Cond cond;
uint64_t inflight;

ClientThread(Messenger *m, int c, ConnectionRef con, int len, int ops, int think_time_us):
msgr(m), concurrent(c), conn(con), client_inc(0), oid("object-name"), oloc(1, 1), msg_len(len), ops(ops),
dispatcher(think_time_us, this), lock("MessengerBenchmark::ClientThread::lock") {
m->add_dispatcher_head(&dispatcher);
bufferptr ptr(msg_len);
memset(ptr.c_str(), 0, msg_len);
data.append(ptr);
}
void *entry() {
lock.Lock();
for (int i = 0; i < ops; ++i) {
if (inflight > uint64_t(concurrent)) {
cond.Wait(lock);
}
MOSDOp *m = new MOSDOp(client_inc.read(), 0, oid, oloc, pgid, 0, 0, 0);
m->write(0, msg_len, data);
inflight++;
conn->send_message(m);
//cerr << __func__ << " send m=" << m << std::endl;
}
lock.Unlock();
msgr->shutdown();
return 0;
}
};

string type;
string serveraddr;
int think_time_us;
vector<Messenger*> msgrs;
vector<ClientThread*> clients;

public:
MessengerClient(string t, string addr, int delay):
type(t), serveraddr(addr), think_time_us(delay) {
}
~MessengerClient() {
for (uint64_t i = 0; i < clients.size(); ++i)
delete clients[i];
for (uint64_t i = 0; i < msgrs.size(); ++i) {
msgrs[i]->shutdown();
msgrs[i]->wait();
}
}
void ready(int c, int jobs, int ops, int msg_len) {
entity_addr_t addr;
addr.parse(serveraddr.c_str());
addr.set_nonce(0);
for (int i = 0; i < jobs; ++i) {
Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i);
msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0));
entity_inst_t inst(entity_name_t::OSD(0), addr);
ConnectionRef conn = msgr->get_connection(inst);
ClientThread *t = new ClientThread(msgr, c, conn, msg_len, ops, think_time_us);
msgrs.push_back(msgr);
clients.push_back(t);
msgr->start();
}
usleep(1000*1000);
}
void start() {
for (uint64_t i = 0; i < clients.size(); ++i)
clients[i]->create();
for (uint64_t i = 0; i < msgrs.size(); ++i)
msgrs[i]->wait();
}
};

// Handle one OSD op reply: mock the client-side processing cost, drop the
// message reference, then release an inflight slot and wake the sender.
void MessengerClient::ClientDispatcher::ms_fast_dispatch(Message *m) {
  usleep(think_time);
  m->put();
  thread->lock.Lock();
  --(thread->inflight);
  thread->cond.Signal();
  thread->lock.Unlock();
}


/**
 * Print command-line usage to stderr.
 * @param name the program name (argv[0])
 */
void usage(const string &name) {
  cerr << "Usage: " << name << " [server ip:port] [numjobs] [concurrency] [ios] [thinktime us] [msg length]" << std::endl;
  cerr << "       [server ip:port]: connect to the ip:port pair" << std::endl;
  // Grammar fixed and the thinktime label made consistent with the
  // "[thinktime us]" placeholder in the Usage line above.
  cerr << "       [numjobs]: how many client threads are spawned to run the benchmark" << std::endl;
  cerr << "       [concurrency]: the max inflight messages (like iodepth in fio)" << std::endl;
  cerr << "       [ios]: how many messages are sent by each client thread" << std::endl;
  cerr << "       [thinktime us]: sleep time when fast dispatching replies (mocks client logic)" << std::endl;
  cerr << "       [msg length]: message data bytes" << std::endl;
}

int main(int argc, char **argv)
{
  vector<const char*> args;
  argv_to_vec(argc, (const char **)argv, args);

  // Standard Ceph tool bootstrap: initialize g_ceph_context from the
  // environment/arguments before anything touches the config.
  global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
  common_init_finish(g_ceph_context);
  g_ceph_context->_conf->apply_changes(NULL);

  if (args.size() < 6) {
    usage(argv[0]);
    return 1;
  }

  // Positional arguments; args[0] is the server ip:port (see usage()).
  // NOTE(review): atoi() silently yields 0 on malformed input — no
  // validation is performed here.
  int numjobs = atoi(args[1]);
  int concurrent = atoi(args[2]);
  int ios = atoi(args[3]);
  int think_time = atoi(args[4]);      // microseconds
  int len = atoi(args[5]);             // message payload bytes

  cerr << " using ms-type " << g_ceph_context->_conf->ms_type << std::endl;
  cerr << " server ip:port " << args[0] << std::endl;
  cerr << " numjobs " << numjobs << std::endl;
  cerr << " concurrency " << concurrent << std::endl;
  cerr << " ios " << ios << std::endl;
  cerr << " thinktime(us) " << think_time << std::endl;
  cerr << " message data bytes " << len << std::endl;
  MessengerClient client(g_ceph_context->_conf->ms_type, args[0], think_time);
  // ready() connects everything first so connection setup is excluded
  // from the timed region below.
  client.ready(concurrent, numjobs, ios, len);
  uint64_t start = Cycles::rdtsc();
  client.start();
  uint64_t stop = Cycles::rdtsc();
  cerr << " Total op " << ios << " run time " << Cycles::to_microseconds(stop - start) << "us." << std::endl;

  return 0;
}

0 comments on commit 459369e

Please sign in to comment.