Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions integration_tests/sampling.bats
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ teardown_file() {
}

@test "sampling_heartbeat" {
LTTNG_UST_SAMPLING_ENERGY=0 LTTNG_UST_SAMPLING_HEARTBEAT=1 $THAPI_BIN_DIR/iprof --no-analysis --sample --trace-output heartbeat_trace -- bash -c 'sleep 2'
$THAPI_BIN_DIR/babeltrace_thapi --no-restrict heartbeat_trace | grep heartbeat
rm -rf heartbeat_trace
LTTNG_UST_ZE_SAMPLING_ENERGY=0 LTTNG_UST_SAMPLING_HEARTBEAT=1 \
$THAPI_BIN_DIR/iprof --no-analysis --sample --trace-output heartbeat_trace --\
bash -c 'sleep 2'
$THAPI_BIN_DIR/babeltrace_thapi --no-restrict heartbeat_trace | grep "{foo: 16}"
[ $("$THAPI_BIN_DIR"/babeltrace_thapi --no-restrict heartbeat_trace | grep -c "{foo: 32}") == 1 ]
}

29 changes: 23 additions & 6 deletions sampling/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,34 @@ CLEANFILES = \
BUILT_SOURCES = \
$(SAMPLING_STATIC_PROBES_INCL)

nodist_libThapiSampling_la_SOURCES = \
$(SAMPLING_STATIC_PROBES_INCL)

libThapiSampling_la_SOURCES = \
thapi_sampling.h \
thapi_sampling.c

libThapiSampling_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) -I$(top_srcdir)/utils/include $(LTTNG_UST_CFLAGS)
libThapiSampling_la_CPPFLAGS = -I$(top_srcdir)/utils/include
libThapiSampling_la_CFLAGS = -Wall -Wextra $(WERROR)
libThapiSampling_la_LDFLAGS = -lpthread -version-info 1:0:0 $(LTTNG_UST_LIBS)
libThapiSampling_la_LIBADD = libsamplingtracepoints.la

bin_PROGRAMS = thapi_sampling_daemon

thapi_sampling_daemon_SOURCES = \
thapi_sampling.h \
thapi_sampling_daemon.cpp

thapi_sampling_daemon_CPPFLAGS = -I$(top_srcdir)/utils/include
thapi_sampling_daemon_CXXFLAGS = -Wall -Wextra $(WERROR)

libHeartbeatSampling_la_SOURCES = heartbeat_sampling_plugin.c

nodist_libHeartbeatSampling_la_SOURCES = \
$(SAMPLING_STATIC_PROBES_INCL)

libHeartbeatSampling_la_CPPFLAGS = -I$(top_srcdir)/utils/include
libHeartbeatSampling_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) $(LTTNG_UST_CFLAGS)
libHeartbeatSampling_la_LDFLAGS = -avoid-version -module
libHeartbeatSampling_la_LIBADD = libThapiSampling.la libsamplingtracepoints.la -ldl $(LTTNG_UST_LIBS)

samplingdir = $(pkglibdir)/sampling
sampling_LTLIBRARIES = libThapiSampling.la
sampling_LTLIBRARIES = libThapiSampling.la libHeartbeatSampling.la

CLEANFILES += $(bin_PROGRAMS)
33 changes: 33 additions & 0 deletions sampling/heartbeat_sampling_plugin.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include "sampling.h"
#include "thapi_sampling.h"
#include <time.h>

static void thapi_sampling_heartbeat() { do_tracepoint(lttng_ust_sampling, heartbeat, 16); }
static void thapi_sampling_heartbeat2() { do_tracepoint(lttng_ust_sampling, heartbeat2); }

static void *plugin_handle_heartbeat = NULL;
static void *plugin_handle_heartbeat2 = NULL;

void thapi_initialize_sampling_plugin(void) {
if (getenv("LTTNG_UST_SAMPLING_HEARTBEAT")) {
struct timespec interval;
interval.tv_sec = 1;
interval.tv_nsec = 0;
plugin_handle_heartbeat = thapi_register_sampling(&thapi_sampling_heartbeat, &interval);
}
if (getenv("LTTNG_UST_SAMPLING_HEARTBEAT2")) {
struct timespec interval;
interval.tv_sec = 0;
interval.tv_nsec = 30000000;
plugin_handle_heartbeat2 = thapi_register_sampling(&thapi_sampling_heartbeat2, &interval);
}
}

void thapi_finalize_sampling_plugin(void) {
if (plugin_handle_heartbeat != NULL) {
do_tracepoint(lttng_ust_sampling, heartbeat, 32);
thapi_unregister_sampling(plugin_handle_heartbeat);
}
if (plugin_handle_heartbeat2 != NULL)
thapi_unregister_sampling(plugin_handle_heartbeat2);
}
50 changes: 20 additions & 30 deletions sampling/thapi_sampling.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <inttypes.h>
#include <stdio.h>
#include "thapi_sampling.h"
#include "sampling.h"
#include "utarray.h"

struct sampling_entry {
Expand All @@ -16,28 +15,30 @@ struct sampling_entry {


static pthread_mutex_t thapi_sampling_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t thapi_sampling_cond = PTHREAD_COND_INITIALIZER;
static UT_array *thapi_sampling_events = NULL;

static pthread_once_t thapi_init_once = PTHREAD_ONCE_INIT;
static volatile int thapi_sampling_finished = 0;
static volatile int thapi_sampling_initialized = 0;
static int thapi_sampling_finished = 0;
static int thapi_sampling_initialized = 0;
static pthread_t thapi_sampling_thread;

static void __attribute__((destructor))
thapi_sampling_cleanup() {
if (!thapi_sampling_initialized)
return;
thapi_sampling_finished = 1;
pthread_join(thapi_sampling_thread, NULL);
pthread_mutex_lock(&thapi_sampling_mutex);
thapi_sampling_finished = 1;
struct sampling_entry **entry = NULL;
while ((entry = (struct sampling_entry **)utarray_next(thapi_sampling_events, entry)))
free(*entry);
utarray_free(thapi_sampling_events);
pthread_cond_signal(&thapi_sampling_cond);
pthread_mutex_unlock(&thapi_sampling_mutex);
pthread_join(thapi_sampling_thread, NULL);
}

static inline int time_cmp(const struct timespec * t1, const struct timespec * t2) {
static inline int time_cmp(const struct timespec *t1, const struct timespec *t2) {
if (t1->tv_sec < t2->tv_sec)
return -1;
if (t1->tv_sec > t2->tv_sec)
Expand All @@ -53,7 +54,7 @@ static inline int sampling_entry_cmp(const struct sampling_entry **e1, const str
return time_cmp(&(*e1)->next, &(*e2)->next);
}

static inline int sampling_entry_cmpw(const void * t1, const void * t2) {
static inline int sampling_entry_cmpw(const void *t1, const void *t2) {
return sampling_entry_cmp((const struct sampling_entry **)t1, (const struct sampling_entry **)t2);
}

Expand All @@ -70,9 +71,14 @@ void * thapi_sampling_loop(void *args) {
(void)args;
while(!thapi_sampling_finished) {
struct timespec now;
struct timespec target;
struct sampling_entry **entry = NULL;

pthread_mutex_lock(&thapi_sampling_mutex);
while(!thapi_sampling_finished && utarray_len(thapi_sampling_events)==0)
pthread_cond_wait(&thapi_sampling_cond, &thapi_sampling_mutex);
if (thapi_sampling_finished)
break;
clock_gettime(CLOCK_REALTIME, &now);
while ((entry = (struct sampling_entry **)utarray_next(thapi_sampling_events, entry)) &&
time_cmp(&(*entry)->next, &now) < 0) {
Expand All @@ -83,42 +89,23 @@ void * thapi_sampling_loop(void *args) {
}
utarray_sort(thapi_sampling_events, sampling_entry_cmpw);
entry = (struct sampling_entry **)utarray_front(thapi_sampling_events);
target = (*entry)->next;
pthread_mutex_unlock(&thapi_sampling_mutex);
if (entry)
while (clock_nanosleep(CLOCK_REALTIME, TIMER_ABSTIME, &(*entry)->next, NULL) && !thapi_sampling_finished)
;
while (clock_nanosleep(CLOCK_REALTIME, TIMER_ABSTIME, &target, NULL) && !thapi_sampling_finished)
;
}
return NULL;
}

static void thapi_sampling_heartbeat() {
do_tracepoint(lttng_ust_sampling, heartbeat, 16);
}

static void thapi_sampling_heartbeat2() {
do_tracepoint(lttng_ust_sampling, heartbeat2);
}

void thapi_sampling_init_once() {
struct timespec interval;
utarray_new(thapi_sampling_events, &ut_ptr_icd);
if (!thapi_sampling_events)
return;
if (getenv("LTTNG_UST_SAMPLING_HEARTBEAT")) {
interval.tv_sec = 0;
interval.tv_nsec = 100000000;
thapi_register_sampling(&thapi_sampling_heartbeat, &interval);
}
if (getenv("LTTNG_UST_SAMPLING_HEARTBEAT2")) {
interval.tv_sec = 0;
interval.tv_nsec = 30000000;
thapi_register_sampling(&thapi_sampling_heartbeat2, &interval);
}
if (!pthread_create(&thapi_sampling_thread, NULL, &thapi_sampling_loop, NULL))
thapi_sampling_initialized = 1;
}

int thapi_sampling_init() {
static inline int thapi_sampling_init() {
if (getenv("LTTNG_UST_SAMPLING"))
pthread_once(&thapi_init_once, &thapi_sampling_init_once);
return 1;
Expand All @@ -127,6 +114,7 @@ int thapi_sampling_init() {
thapi_sampling_handle_t thapi_register_sampling(void (*pfn)(void), struct timespec *interval) {
struct sampling_entry *entry = NULL;
struct timespec now, next;
thapi_sampling_init();
if(clock_gettime(CLOCK_REALTIME, &now))
return NULL;
time_add(&next, &now, interval);
Expand All @@ -142,6 +130,7 @@ thapi_sampling_handle_t thapi_register_sampling(void (*pfn)(void), struct timesp
entry->next = next;
utarray_push_back(thapi_sampling_events, &entry);
utarray_sort(thapi_sampling_events, sampling_entry_cmpw);
pthread_cond_signal(&thapi_sampling_cond);
end:
pthread_mutex_unlock(&thapi_sampling_mutex);
return entry;
Expand All @@ -159,6 +148,7 @@ void thapi_unregister_sampling(thapi_sampling_handle_t handle)
(struct sampling_entry **)utarray_eltptr(thapi_sampling_events, i);
if (*p == entry) {
utarray_erase(thapi_sampling_events, i, 1);
free(entry);
break;
}
}
Expand Down
14 changes: 12 additions & 2 deletions sampling/thapi_sampling.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
#pragma once

#include <time.h>

typedef void * thapi_sampling_handle_t;

extern int thapi_sampling_init();
#ifdef __cplusplus
extern "C" {
#endif

typedef void * thapi_sampling_handle_t;

extern thapi_sampling_handle_t
thapi_register_sampling(
Expand All @@ -12,3 +17,8 @@ thapi_register_sampling(
extern void
thapi_unregister_sampling(
thapi_sampling_handle_t handle);

#ifdef __cplusplus
}
#endif

72 changes: 72 additions & 0 deletions sampling/thapi_sampling_daemon.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include <csignal>
#include <cstdlib>
#include <dlfcn.h>
#include <iostream>
#include <vector>
#define RT_SIGNAL_READY (SIGRTMIN)
#define RT_SIGNAL_FINISH (SIGRTMIN + 3)

typedef void (*plugin_initialize_func)(void);
typedef void (*plugin_finalize_func)(void);

int main(int argc, char **argv) {

// Setup signaling, to exit the sampling loop
int parent_pid = std::atoi(argv[1]);
sigset_t signal_set;
sigemptyset(&signal_set);
sigaddset(&signal_set, RT_SIGNAL_FINISH);
sigprocmask(SIG_BLOCK, &signal_set, NULL);

// DL Open
struct Plugin {
void *handle;
plugin_initialize_func initialize;
plugin_finalize_func finalize;
};

std::vector<Plugin> plugins;

for (int i = 2; i < argc; ++i) {
void *handle = dlopen(argv[i], RTLD_LAZY | RTLD_LOCAL | RTLD_DEEPBIND);
if (!handle) {
std::cerr << "Failed to load " << argv[i] << ": " << dlerror() << std::endl;
continue;
}
plugin_initialize_func init_func =
reinterpret_cast<plugin_initialize_func>(dlsym(handle, "thapi_initialize_sampling_plugin"));

plugin_finalize_func fini_func =
reinterpret_cast<plugin_finalize_func>(dlsym(handle, "thapi_finalize_sampling_plugin"));

plugins.push_back({handle, init_func, fini_func});
}

// User pluging
for (const auto &plugin : plugins) {
plugin.initialize();
}

// Signal Ready to manager
kill(parent_pid, RT_SIGNAL_READY);

// Wait for to finish
while (true) {
int signum;
sigwait(&signal_set, &signum);
if (signum == RT_SIGNAL_FINISH)
break;
}

// Finalization
for (const auto &plugin : plugins) {
if (plugin.finalize)
plugin.finalize();
dlclose(plugin.handle);
}

kill(parent_pid, RT_SIGNAL_READY);

// Will call the destructor, who will finalize all the not unregistered plugin
return 0;
}
Loading
Loading