Navigation Menu

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Caliper based profiling #741

Merged
merged 8 commits into from Aug 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions configure.ac
Expand Up @@ -20,6 +20,11 @@ AM_MAINTAINER_MODE
AC_DEFINE([_GNU_SOURCE], 1,
[Define _GNU_SOURCE so that we get all necessary prototypes])

##
# Initialize pkg-config for PKG_CHECK_MODULES to avoid conditional issues
##
PKG_PROG_PKG_CONFIG

##
# Checks for programs
##
Expand Down Expand Up @@ -112,6 +117,19 @@ LX_FIND_MPI
AM_CONDITIONAL([HAVE_MPI], [test "$have_C_mpi" = yes])
AX_CODE_COVERAGE

##
# Optional Caliper profiling support (disabled unless --enable-caliper)
##
AC_ARG_ENABLE([caliper],
  [AS_HELP_STRING([--enable-caliper@<:@=OPTS@:>@],
    [Use caliper for profiling. @<:@default=no@:>@ @<:@OPTS=no/yes@:>@])],
  [],
  [enable_caliper="no"])

if test "$enable_caliper" = "yes"; then
  PKG_CHECK_MODULES([CALIPER], [caliper], [], [])
  CFLAGS="${CFLAGS} ${CALIPER_CFLAGS} "
  # Do not use CALIPER_LIBS, only link to libcaliper-stub.
  # Query the library search path through $PKG_CONFIG, i.e. the same
  # pkg-config binary that configure selected for the module check above,
  # rather than whatever "pkg-config" happens to be first in PATH.
  LIBS="${LIBS} $($PKG_CONFIG --libs-only-L caliper) -lcaliper-stub -lrt "
  AC_DEFINE([HAVE_CALIPER], [1], [Define if you have libcaliper])
fi


##
# Embedded libev
##
Expand Down
6 changes: 6 additions & 0 deletions doc/man1/flux-start.adoc
Expand Up @@ -48,6 +48,12 @@ Display commands before executing them.
*-X, --noexec*::
Don't execute anything. This option is most useful with -v.

*--caliper-profile*='PROFILE'::
Run brokers with Caliper profiling enabled, using a Caliper
configuration profile named 'PROFILE'. Requires a version of Flux
built with --enable-caliper. Unless CALI_LOG_VERBOSITY is already
set in the environment, it will default to 0 for all brokers.

EXAMPLES
--------

Expand Down
25 changes: 25 additions & 0 deletions src/broker/broker.c
Expand Up @@ -54,6 +54,10 @@
#error gperftools headers not configured
#endif
#endif /* WITH_TCMALLOC */
#if HAVE_CALIPER
#include <caliper/cali.h>
#include <sys/syscall.h>
#endif

#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"
Expand Down Expand Up @@ -238,6 +242,23 @@ static void usage (void)
exit (1);
}

/* Annotate the calling (broker main) thread for Caliper profiling.
 *
 * program - string recorded under the "binary" attribute
 * rank    - integer recorded under the "flux.rank" attribute
 *
 * When HAVE_CALIPER is not set this is a no-op.  Always returns 0.
 */
static int setup_profiling (const char *program, int rank)
{
#if HAVE_CALIPER
    /* Kernel thread id of the caller, recorded as "flux.tid" */
    const int tid = syscall (SYS_gettid);

    cali_begin_string_byname ("flux.type", "main");
    cali_begin_int_byname ("flux.tid", tid);
    cali_begin_string_byname ("binary", program);
    cali_begin_int_byname ("flux.rank", rank);

    /* TODO: stopgap until there is better control over instrumentation
     * in child processes.  Drop the Caliper settings from the environment
     * here; to observe children that load libflux, this step would have
     * to be disabled.
     */
    unsetenv ("CALI_SERVICES_ENABLE");
    unsetenv ("CALI_CONFIG_PROFILE");
#endif
    return 0;
}


int main (int argc, char *argv[])
{
int c;
Expand All @@ -247,6 +268,7 @@ int main (int argc, char *argv[])
int security_set = 0;
int e;


memset (&ctx, 0, sizeof (ctx));
log_init (argv[0]);

Expand Down Expand Up @@ -431,6 +453,9 @@ int main (int argc, char *argv[])
if (attr_set_flags (ctx.attrs, "session-id", FLUX_ATTRFLAG_IMMUTABLE) < 0)
log_err_exit ("attr_set_flags session-id");

// Setup profiling
setup_profiling (argv[0], ctx.rank);

/* Create directory for sockets, and a subdirectory specific
* to this rank that will contain the pidfile and local connector socket.
* (These may have already been called by boot method)
Expand Down
18 changes: 18 additions & 0 deletions src/broker/module.c
Expand Up @@ -42,6 +42,10 @@
#include <argz.h>
#include <czmq.h>
#include <flux/core.h>
#if HAVE_CALIPER
#include <caliper/cali.h>
#include <sys/syscall.h>
#endif

#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"
Expand All @@ -51,6 +55,7 @@
#include "module.h"
#include "modservice.h"


#define MODULE_MAGIC 0xfeefbe01
struct module_struct {
int magic;
Expand Down Expand Up @@ -99,6 +104,17 @@ struct modhash_struct {
heartbeat_t *heartbeat;
};

/* Annotate the calling thread for Caliper profiling as a module thread.
 *
 * Records the type tag "module", the caller's kernel thread id, and the
 * rank and name fields of module `p`.
 *
 * When HAVE_CALIPER is not set this is a no-op.  Always returns 0.
 */
static int setup_module_profiling (module_t *p)
{
#if HAVE_CALIPER
    /* Kernel thread id of the caller, recorded as "flux.tid" */
    const int tid = syscall (SYS_gettid);

    cali_begin_string_byname ("flux.type", "module");
    cali_begin_int_byname ("flux.tid", tid);
    cali_begin_int_byname ("flux.rank", p->rank);
    cali_begin_string_byname ("flux.name", p->name);
#endif
    return 0;
}

static void *module_thread (void *arg)
{
module_t *p = arg;
Expand All @@ -114,6 +130,8 @@ static void *module_thread (void *arg)

assert (p->zctx);

setup_module_profiling (p);

/* Connect to broker socket, enable logging, register built-in services
*/
if (!(p->h = flux_open (uri, 0)))
Expand Down
51 changes: 51 additions & 0 deletions src/cmd/flux-start.c
Expand Up @@ -38,6 +38,7 @@
#include "src/common/libutil/log.h"
#include "src/common/libutil/oom.h"
#include "src/common/libutil/cleanup.h"
#include "src/common/libutil/setenvf.h"
#include "src/common/libpmi/simple_server.h"
#include "src/common/libpmi/dgetline.h"
#include "src/common/libsubprocess/subprocess.h"
Expand Down Expand Up @@ -79,11 +80,20 @@ char *create_scratch_dir (struct context *ctx);
struct client *client_create (struct context *ctx, int rank, const char *cmd);
void client_destroy (struct client *cli);
char *find_broker (const char *searchpath);
static void setup_profiling_env (struct context *ctx);

const char *default_killer_timeout = "1.0";

const int default_size = 1;

#ifndef HAVE_CALIPER
/* Option callback installed for --caliper-profile when flux-start is built
 * without Caliper support: emits a fatal error message.
 *
 * The arguments (parser, option entry, option argument) are not used.
 */
static int no_caliper_fatal_err (optparse_t *p, struct optparse_option *o,
                                 const char *optarg)
{
    log_msg_exit ("Error: --caliper-profile used but no Caliper support found");
    /* Presumably not reached: log_msg_exit is expected to terminate the
     * process (confirm against its declaration).  The explicit return keeps
     * this non-void function well-formed either way and reports failure
     * to the caller should control ever get here.
     */
    return -1;
}
#endif /* !HAVE_CALIPER */

const char *usage_msg = "[OPTIONS] command ...";
static struct optparse_option opts[] = {
{ .name = "verbose", .key = 'v', .has_arg = 0,
Expand All @@ -96,6 +106,17 @@ static struct optparse_option opts[] = {
.usage = "Add comma-separated broker options, e.g. \"-o,-q\"", },
{ .name = "killer-timeout",.key = 'k', .has_arg = 1, .arginfo = "SECONDS",
.usage = "After a broker exits, kill other brokers after SECONDS", },

/* Option group 1, these options will be listed after those above */
{ .group = 1,
.name = "caliper-profile", .key = 1001, .has_arg = 1,
.arginfo = "PROFILE",
.usage = "Enable profiling in brokers using Caliper configuration "
"profile named `PROFILE'",
#ifndef HAVE_CALIPER
.cb = no_caliper_fatal_err, /* Emit fatal err if not built w/ Caliper */
#endif /* !HAVE_CALIPER */
},
OPTPARSE_TABLE_END,
};

Expand Down Expand Up @@ -131,6 +152,8 @@ int main (int argc, char *argv[])
if (!(ctx->broker_path = find_broker (searchpath)))
log_msg_exit ("Could not locate broker in %s", searchpath);

setup_profiling_env (ctx);

ctx->size = optparse_get_int (ctx->opts, "size", default_size);

if (ctx->size == 1) {
Expand All @@ -151,6 +174,34 @@ int main (int argc, char *argv[])
return status;
}

/* Prepare this process's environment for Caliper profiling, based on the
 * --caliper-profile option in ctx->opts.
 *
 * When the option was given:
 *   - "libcaliper.so" is set as, or appended to, LD_PRELOAD
 *   - CALI_CONFIG_PROFILE is set to the option argument
 *   - CALI_LOG_VERBOSITY is set to "0" unless it is already set
 * Any failure to update the environment is fatal.
 *
 * When HAVE_CALIPER is not set this is a no-op.
 */
static void setup_profiling_env (struct context *ctx)
{
#if HAVE_CALIPER
    const char *profile = NULL;
    /*
     * If --caliper-profile was used, set or append libcaliper.so in
     * LD_PRELOAD to subprocess environment, swapping stub symbols for
     * the actual libcaliper symbols.
     */
    if (optparse_getopt (ctx->opts, "caliper-profile", &profile) == 1) {
        const char *pl = getenv ("LD_PRELOAD");
        /* Only add a separator when there is an existing, non-empty value */
        int have_pl = (pl != NULL && *pl != '\0');
        int rc = setenvf ("LD_PRELOAD", 1, "%s%s%s",
                          have_pl ? pl : "",
                          have_pl ? " " : "",
                          "libcaliper.so");
        if (rc < 0)
            log_err_exit ("Unable to set LD_PRELOAD in environment");

        if ((profile != NULL) &&
            (setenv ("CALI_CONFIG_PROFILE", profile, 1) < 0))
            log_err_exit ("setenv (CALI_CONFIG_PROFILE)");

        /* overwrite=0: keep any verbosity already chosen by the user */
        if (setenv ("CALI_LOG_VERBOSITY", "0", 0) < 0)
            log_err_exit ("setenv (CALI_LOG_VERBOSITY)");
    }
#endif
}



char *find_broker (const char *searchpath)
{
char *cpy = xstrdup (searchpath);
Expand Down
61 changes: 54 additions & 7 deletions src/common/libflux/dispatch.c
Expand Up @@ -26,18 +26,22 @@
#include "config.h"
#endif
#include <czmq.h>
#if HAVE_CALIPER
#include <caliper/cali.h>
#include <sys/syscall.h>
#endif

#include "message.h"
#include "reactor.h"
#include "dispatch.h"
#include "response.h"
#include "info.h"
#include "flog.h"

#include "src/common/libutil/log.h"
#include "src/common/libutil/coproc.h"
#include "src/common/libutil/iterators.h"


/* Fastpath for RPCs:
* fastpath array translates response matchtags to message handlers,
* bypassing the handlers zlist. Since the matchtag pools are LIFO,
Expand All @@ -51,6 +55,7 @@ struct fastpath {
int len;
};


struct dispatch {
flux_t h;
zlist_t *handlers;
Expand All @@ -61,6 +66,11 @@ struct dispatch {
flux_watcher_t *w;
int running_count;
int usecount;
#if HAVE_CALIPER
cali_id_t prof_msg_type;
cali_id_t prof_msg_topic;
cali_id_t prof_msg_dispatch;
#endif
};

#define HANDLER_MAGIC 0x44433322
Expand Down Expand Up @@ -144,7 +154,17 @@ static struct dispatch *dispatch_get (flux_t h)
goto nomem;
fastpath_init (&d->norm);
fastpath_init (&d->group);

#if HAVE_CALIPER
d->prof_msg_type = cali_create_attribute ("flux.message.type",
CALI_TYPE_STRING,
CALI_ATTR_SKIP_EVENTS);
d->prof_msg_topic = cali_create_attribute ("flux.message.topic",
CALI_TYPE_STRING,
CALI_ATTR_SKIP_EVENTS);
d->prof_msg_dispatch = cali_create_attribute ("flux.message.dispatch",
CALI_TYPE_BOOL,
CALI_ATTR_DEFAULT);
#endif
flux_aux_set (h, "flux::dispatch", d, dispatch_destroy);
}
return d;
Expand Down Expand Up @@ -537,8 +557,10 @@ static int delete_items_zlist (zlist_t *l, item_test_f item_test,
return rc;
}

static void handle_cb (flux_reactor_t *r, flux_watcher_t *hw,
int revents, void *arg)
static void handle_cb (flux_reactor_t *r,
flux_watcher_t *hw,
int revents,
void *arg)
{
struct dispatch *d = arg;
flux_msg_t *msg = NULL;
Expand All @@ -556,24 +578,47 @@ static void handle_cb (flux_reactor_t *r, flux_watcher_t *hw,
rc = 0; /* ignore mangled message */
goto done;
}

const char *topic;
flux_msg_get_topic (msg, &topic);
/* Add any new handlers here, making handler creation
* safe to call during handlers list traversal below.
*/
if (transfer_items_zlist (d->handlers_new, d->handlers) < 0)
goto done;

#if defined(HAVE_CALIPER)
cali_begin_string (d->prof_msg_type, flux_msg_typestr (type));
cali_begin_string (d->prof_msg_topic, topic);
cali_begin (d->prof_msg_dispatch);
cali_end (d->prof_msg_topic);
cali_end (d->prof_msg_type);
#endif

if ((flux_flags_get (d->h) & FLUX_O_COPROC))
match = dispatch_message_coproc (d, msg, type);
else
match = dispatch_message (d, msg, type);

#if defined(HAVE_CALIPER)
cali_begin_string (d->prof_msg_type, flux_msg_typestr (type));
cali_begin_string (d->prof_msg_topic, topic);
cali_end (d->prof_msg_dispatch);
cali_end (d->prof_msg_topic);
cali_end (d->prof_msg_type);
#endif

if (match < 0)
goto done;
/* Destroy handlers here, making handler destruction
* safe to call during handlers list traversal above.
*/
if (delete_items_zlist (d->handlers_new, item_test_destroyed,
if (delete_items_zlist (d->handlers_new,
item_test_destroyed,
(flux_free_f)free_msg_handler) < 0)
goto done;
if (delete_items_zlist (d->handlers, item_test_destroyed,
if (delete_items_zlist (d->handlers,
item_test_destroyed,
(flux_free_f)free_msg_handler) < 0)
goto done;
/* Message matched nothing.
Expand All @@ -587,7 +632,9 @@ static void handle_cb (flux_reactor_t *r, flux_watcher_t *hw,
} else if (flux_flags_get (d->h) & FLUX_O_TRACE) {
const char *topic = NULL;
(void)flux_msg_get_topic (msg, &topic);
fprintf (stderr, "nomatch: %s '%s'\n", flux_msg_typestr (type),
fprintf (stderr,
"nomatch: %s '%s'\n",
flux_msg_typestr (type),
topic ? topic : "");
}
}
Expand Down