Skip to content

Commit

Permalink
osd, messenger, librados: lttng enhancements
Browse files Browse the repository at this point in the history
Few critical functions have been instrumented with function tracing
and oid tracing calls to help routing performance analysis.
Oid tracing events are heavily dependent on functional changes and may need
to be revised as and when core data flow logic changes.

Signed-off-by: Anjaneya Chagam <anjaneya.chagam@intel.com>
  • Loading branch information
rchagam authored and yehudasa committed Dec 14, 2016
1 parent 948c34f commit a67630d
Show file tree
Hide file tree
Showing 16 changed files with 160 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/librados/CMakeLists.txt
Expand Up @@ -40,3 +40,6 @@ if(WITH_EMBEDDED)
$<TARGET_OBJECTS:librados_api_obj>
$<TARGET_OBJECTS:librados_objs>)
endif()
if(WITH_LTTNG AND WITH_EVENTTRACE)
add_dependencies(librados_api_obj eventtrace_tp)
endif()
69 changes: 69 additions & 0 deletions src/librados/IoCtxImpl.cc
Expand Up @@ -21,6 +21,7 @@
#include "librados/RadosClient.h"
#include "include/assert.h"
#include "common/valgrind.h"
#include "common/EventTrace.h"

#define dout_subsys ceph_subsys_rados
#undef dout_prefix
Expand Down Expand Up @@ -745,8 +746,12 @@ int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
int flags,
bufferlist *pbl)
{
FUNCTRACE();
Context *onack = new C_aio_Ack(c);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
((C_aio_Ack *) onack)->oid = oid;
#endif
c->is_read = true;
c->io = this;

Expand All @@ -761,13 +766,20 @@ int librados::IoCtxImpl::aio_operate(const object_t& oid,
::ObjectOperation *o, AioCompletionImpl *c,
const SnapContext& snap_context, int flags)
{
FUNCTRACE();
OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
auto ut = ceph::real_clock::now(client->cct);
/* can't write to a snapshot */
if (snap_seq != CEPH_NOSNAP)
return -EROFS;

Context *onack = c->wants_ack() ? new C_aio_Ack(c) : NULL;
Context *oncommit = new C_aio_Safe(c);
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
if (onack)
((C_aio_Ack *) onack)->oid = oid;
((C_aio_Safe *) oncommit)->oid = oid;
#endif

c->io = this;
queue_aio_write(c);
Expand All @@ -784,11 +796,16 @@ int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
bufferlist *pbl, size_t len, uint64_t off,
uint64_t snapid)
{
FUNCTRACE();
if (len > (size_t) INT_MAX)
return -EDOM;

OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
Context *onack = new C_aio_Ack(c);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
((C_aio_Ack *) onack)->oid = oid;
#endif
c->is_read = true;
c->io = this;
c->blp = pbl;
Expand All @@ -805,11 +822,16 @@ int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
char *buf, size_t len, uint64_t off,
uint64_t snapid)
{
FUNCTRACE();
if (len > (size_t) INT_MAX)
return -EDOM;

OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
Context *onack = new C_aio_Ack(c);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
((C_aio_Ack *) onack)->oid = oid;
#endif
c->is_read = true;
c->io = this;
c->bl.clear();
Expand Down Expand Up @@ -842,12 +864,16 @@ int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
bufferlist *data_bl, size_t len,
uint64_t off, uint64_t snapid)
{
FUNCTRACE();
if (len > (size_t) INT_MAX)
return -EDOM;

Context *nested = new C_aio_Ack(c);
C_ObjectOperation *onack = new C_ObjectOperation(nested);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
((C_aio_Ack *) nested)->oid = oid;
#endif
c->is_read = true;
c->io = this;

Expand All @@ -865,8 +891,10 @@ int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl, size_t len,
uint64_t off)
{
FUNCTRACE();
auto ut = ceph::real_clock::now(client->cct);
ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl;
OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");

if (len > UINT_MAX/2)
return -E2BIG;
Expand All @@ -877,6 +905,11 @@ int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
Context *onack = c->wants_ack() ? new C_aio_Ack(c) : NULL;
Context *onsafe = new C_aio_Safe(c);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
if (onack)
((C_aio_Ack *) onack)->oid = oid;
((C_aio_Safe *) onsafe)->oid = oid;
#endif
c->io = this;
queue_aio_write(c);

Expand All @@ -892,6 +925,7 @@ int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,
const bufferlist& bl, size_t len)
{
FUNCTRACE();
auto ut = ceph::real_clock::now(client->cct);

if (len > UINT_MAX/2)
Expand All @@ -902,6 +936,11 @@ int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,

Context *onack = c->wants_ack() ? new C_aio_Ack(c) : NULL;
Context *onsafe = new C_aio_Safe(c);
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
if (onack)
((C_aio_Ack *) onack)->oid = oid;
((C_aio_Safe *) onsafe)->oid = oid;
#endif

c->io = this;
queue_aio_write(c);
Expand All @@ -919,6 +958,7 @@ int librados::IoCtxImpl::aio_write_full(const object_t &oid,
AioCompletionImpl *c,
const bufferlist& bl)
{
FUNCTRACE();
auto ut = ceph::real_clock::now(client->cct);

if (bl.length() > UINT_MAX/2)
Expand All @@ -928,6 +968,10 @@ int librados::IoCtxImpl::aio_write_full(const object_t &oid,
return -EROFS;

Context *onack = c->wants_ack() ? new C_aio_Ack(c) : NULL;
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
if (onack)
((C_aio_Ack *) onack)->oid = oid;
#endif
Context *onsafe = new C_aio_Safe(c);

c->io = this;
Expand All @@ -948,6 +992,7 @@ int librados::IoCtxImpl::aio_writesame(const object_t &oid,
size_t write_len,
uint64_t off)
{
FUNCTRACE();
auto ut = ceph::real_clock::now(client->cct);

if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
Expand All @@ -961,6 +1006,10 @@ int librados::IoCtxImpl::aio_writesame(const object_t &oid,
Context *onack = c->wants_ack() ? new C_aio_Ack(c) : NULL;
Context *onsafe = new C_aio_Safe(c);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
if (onack)
((C_aio_Ack *) onack)->oid = oid;
#endif
c->io = this;
queue_aio_write(c);

Expand All @@ -976,6 +1025,7 @@ int librados::IoCtxImpl::aio_writesame(const object_t &oid,

int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, int flags)
{
FUNCTRACE();
auto ut = ceph::real_clock::now(client->cct);

/* can't write to a snapshot */
Expand All @@ -985,6 +1035,10 @@ int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, i
Context *onack = c->wants_ack() ? new C_aio_Ack(c) : NULL;
Context *onsafe = new C_aio_Safe(c);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
if (onack)
((C_aio_Ack *) onack)->oid = oid;
#endif
c->io = this;
queue_aio_write(c);

Expand Down Expand Up @@ -1244,8 +1298,12 @@ int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
const char *cls, const char *method,
bufferlist& inbl, bufferlist *outbl)
{
FUNCTRACE();
Context *onack = new C_aio_Ack(c);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
((C_aio_Ack *) onack)->oid = oid;
#endif
c->is_read = true;
c->io = this;

Expand All @@ -1262,8 +1320,12 @@ int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
const char *cls, const char *method,
bufferlist& inbl, char *buf, size_t out_len)
{
FUNCTRACE();
Context *onack = new C_aio_Ack(c);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
((C_aio_Ack *) onack)->oid = oid;
#endif
c->is_read = true;
c->io = this;
c->bl.clear();
Expand All @@ -1285,6 +1347,7 @@ int librados::IoCtxImpl::read(const object_t& oid,
{
if (len > (size_t) INT_MAX)
return -EDOM;
OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");

::ObjectOperation rd;
prepare_assert_ops(&rd);
Expand Down Expand Up @@ -1796,6 +1859,9 @@ void librados::IoCtxImpl::C_aio_Ack::finish(int r)
c->io->client->finisher.queue(new C_AioSafe(c));
}

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE");
#endif
c->put_unlock();
}

Expand Down Expand Up @@ -1878,6 +1944,9 @@ void librados::IoCtxImpl::C_aio_Safe::finish(int r)

c->io->complete_aio_write(c);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE");
#endif
c->put_unlock();
}

Expand Down
6 changes: 6 additions & 0 deletions src/librados/IoCtxImpl.h
Expand Up @@ -160,6 +160,9 @@ struct librados::IoCtxImpl {
AioCompletionImpl *c, int flags, bufferlist *pbl);

struct C_aio_Ack : public Context {
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
object_t oid;
#endif
librados::AioCompletionImpl *c;
explicit C_aio_Ack(AioCompletionImpl *_c);
void finish(int r);
Expand All @@ -182,6 +185,9 @@ struct librados::IoCtxImpl {
};

struct C_aio_Safe : public Context {
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
object_t oid;
#endif
AioCompletionImpl *c;
explicit C_aio_Safe(AioCompletionImpl *_c);
void finish(int r);
Expand Down
1 change: 1 addition & 0 deletions src/librados/RadosClient.cc
Expand Up @@ -47,6 +47,7 @@
#include "RadosClient.h"

#include "include/assert.h"
#include "common/EventTrace.h"

#define dout_subsys ceph_subsys_rados
#undef dout_prefix
Expand Down
33 changes: 33 additions & 0 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -21,6 +21,10 @@
#include "AsyncMessenger.h"
#include "AsyncConnection.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/EventTrace.h"

// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
#define SEQ_MASK 0x7fffffff

Expand Down Expand Up @@ -324,6 +328,9 @@ void AsyncConnection::process()
{
ssize_t r = 0;
int prev_state = state;
#ifdef WITH_LTTNG
utime_t ltt_recv_stamp = ceph_clock_now(async_msgr->cct);
#endif
bool need_dispatch_writer = false;
std::lock_guard<std::mutex> l(lock);
last_active = ceph::coarse_mono_clock::now();
Expand Down Expand Up @@ -425,6 +432,9 @@ void AsyncConnection::process()

case STATE_OPEN_MESSAGE_HEADER:
{
#ifdef WITH_LTTNG
ltt_recv_stamp = ceph_clock_now(async_msgr->cct);
#endif
ldout(async_msgr->cct, 20) << __func__ << " begin MSG" << dendl;
ceph_msg_header header;
ceph_msg_header_old oldheader;
Expand Down Expand Up @@ -740,6 +750,18 @@ void AsyncConnection::process()

message->set_connection(this);

#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
if (message->get_type() == CEPH_MSG_OSD_OP || message->get_type() == CEPH_MSG_OSD_OPREPLY) {
utime_t ltt_processed_stamp = ceph_clock_now(async_msgr->cct);
double usecs_elapsed = (ltt_processed_stamp.to_nsec()-ltt_recv_stamp.to_nsec())/1000;
ostringstream buf;
if (message->get_type() == CEPH_MSG_OSD_OP)
OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP", false);
else
OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY", false);
}
#endif

// note last received message.
in_seq.set(message->get_seq());
ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq "
Expand Down Expand Up @@ -1850,6 +1872,7 @@ void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)

int AsyncConnection::send_message(Message *m)
{
FUNCTRACE();
lgeneric_subdout(async_msgr->cct, ms,
1) << "-- " << async_msgr->get_myaddr() << " --> "
<< get_peer_addr() << " -- "
Expand All @@ -1864,6 +1887,11 @@ int AsyncConnection::send_message(Message *m)
m->get_header().src = async_msgr->get_myname();
m->set_connection(this);

if (m->get_type() == CEPH_MSG_OSD_OP)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);

if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
std::lock_guard<std::mutex> l(write_lock);
Expand Down Expand Up @@ -2147,6 +2175,7 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer

ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
{
FUNCTRACE();
assert(can_write == WriteStatus::CANWRITE);
m->set_seq(out_seq.inc());

Expand Down Expand Up @@ -2242,6 +2271,10 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
} else {
ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl;
}
if (m->get_type() == CEPH_MSG_OSD_OP)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
m->put();

return rc;
Expand Down
10 changes: 10 additions & 0 deletions src/msg/async/AsyncMessenger.cc
Expand Up @@ -25,6 +25,10 @@
#include "common/Timer.h"
#include "common/errno.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "common/EventTrace.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
Expand Down Expand Up @@ -500,6 +504,12 @@ ConnectionRef AsyncMessenger::get_loopback_connection()

int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
{
FUNCTRACE();
if (m && m->get_type() == CEPH_MSG_OSD_OP)
OID_EVENT_TRACE(((MOSDOp *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP");
else if (m && m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE(((MOSDOpReply *)m)->get_oid().name.c_str(), "SEND_MSG_OSD_OP_REPLY");

ldout(cct, 1) << __func__ << "--> " << dest.name << " "
<< dest.addr << " -- " << *m << " -- ?+"
<< m->get_data().length() << " " << m << dendl;
Expand Down

0 comments on commit a67630d

Please sign in to comment.