Skip to content

Commit

Permalink
[eBPF] Reducing CPU consumption for process events exec/exit
Browse files Browse the repository at this point in the history
Replace the `sched_process_fork` tracepoint with the `sys_exit_fork` and `sys_exit_clone` tracepoints. `sched_process_fork` cannot distinguish between processes and threads, so a large number of thread-creation events were pushed to the upper layer unnecessarily. The `sys_exit_fork` and `sys_exit_clone` handlers filter out threads and push only process information.

Protect the process event lists with spin locks instead of pthread mutexes to avoid frequent context switches.
  • Loading branch information
yinjiping committed Apr 23, 2024
1 parent 1947919 commit 5b80375
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 51 deletions.
47 changes: 24 additions & 23 deletions agent/src/ebpf/kernel/uprobe_base.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -641,44 +641,45 @@ int bpf_func_sched_process_exit(struct sched_comm_exit_ctx *ctx)
return 0;
}

// /sys/kernel/debug/tracing/events/sched/sched_process_fork/format
SEC("tracepoint/sched/sched_process_fork")
int bpf_func_sched_process_fork(struct sched_comm_fork_ctx *ctx)
static inline int kernel_clone_exit(struct syscall_comm_exit_ctx *ctx)
{
/*
* When you find that the golang process starts, sometimes you
* don't get the process start information, all you get is
* threads. Take the following example:
*
* # pstree -p 4157
* deepflow-server(4157)─┬─{deepflow-server}(4214)
* ├─{deepflow-server}(4216)
* ├─{deepflow-server}(4217)
* ├─{deepflow-server}(4218)
* ├─{deepflow-server}(4219)
* ├─{deepflow-server}(4229)
*
* fetch data:
* .... 296916.616252: 0: parent_pid 4216 child_pid 4218
* .... 296916.616366: 0: parent_pid 4218 child_pid 4219
*
* To get process startup information we add probe 'sched_process_exec'.
*/
__u64 id = bpf_get_current_pid_tgid();
long ret = ctx->ret;

// error or parent process
if (ret != 0)
return 0;

int pid = (int)id;
int tgid = (int)(id >> 32);
// filter threads
if (pid != tgid)
return 0;

struct member_fields_offset *offset = retrieve_ready_kern_offset();
if (offset == NULL)
return 0;

struct process_event_t data;
data.meta.event_type = EVENT_TYPE_PROC_EXEC;
data.pid = ctx->child_pid;
data.pid = pid;
bpf_get_current_comm(data.name, sizeof(data.name));
bpf_perf_event_output(ctx, &NAME(socket_data),
BPF_F_CURRENT_CPU, &data, sizeof(data));

return 0;
}

// /sys/kernel/debug/tracing/events/syscalls/sys_exit_fork/format
// Program declared through the TPPROG macro for the sys_exit_fork event;
// all work is delegated to kernel_clone_exit(), whose result is returned.
// NOTE(review): TPPROG's expansion (section/function name) is not visible
// here -- confirm it matches "tracepoint/syscalls/sys_exit_fork".
TPPROG(sys_exit_fork) (struct syscall_comm_exit_ctx * ctx) {
return kernel_clone_exit(ctx);
}

// /sys/kernel/debug/tracing/events/syscalls/sys_exit_clone/format
// Program declared through the TPPROG macro for the sys_exit_clone event;
// all work is delegated to kernel_clone_exit(), whose result is returned.
// NOTE(review): TPPROG's expansion (section/function name) is not visible
// here -- confirm it matches "tracepoint/syscalls/sys_exit_clone".
TPPROG(sys_exit_clone) (struct syscall_comm_exit_ctx * ctx) {
return kernel_clone_exit(ctx);
}

// /sys/kernel/debug/tracing/events/sched/sched_process_exec/format
SEC("tracepoint/sched/sched_process_exec")
int bpf_func_sched_process_exec(struct sched_comm_exec_ctx *ctx)
Expand Down
4 changes: 4 additions & 0 deletions agent/src/ebpf/user/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ enum {

/*
 * Thread indices for bihash.
 * The trailing comment on each entry is the name of the thread that
 * uses the index.
 */
enum {
	THREAD_PROFILER_READER_IDX = 0,	// cp-reader-0
	THREAD_PROC_EVENTS_IDX = 1,	// proc-events
	THREAD_PROC_ACT_IDX_BASE = 2	// sk-reader-0 ...
};

Expand Down
28 changes: 19 additions & 9 deletions agent/src/ebpf/user/go_tracer.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ struct process_event {
/* *INDENT-ON* */

extern uint32_t k_version;
extern struct proc_events_record proc_ev_record;

static char build_info_magic[] = "\xff Go buildinf:";
static struct list_head proc_info_head; // For pid-offsets correspondence lists.
static struct list_head proc_events_head; // For process execute/exit events list.
Expand Down Expand Up @@ -744,7 +746,6 @@ int collect_go_uprobe_syms_from_procfs(struct tracer_probes_conf *conf)
struct dirent *entry = NULL;
DIR *fddir = NULL;

init_list_head(&proc_events_head);
init_list_head(&proc_info_head);
pthread_mutex_init(&mutex_proc_events_lock, NULL);

Expand Down Expand Up @@ -971,7 +972,8 @@ static inline void find_and_clear_event_from_list(int pid)
{
struct process_event *pe;
struct list_head *p, *n;
list_for_each_safe(p, n, &proc_events_head) {
struct proc_events_record *r = &proc_ev_record;
list_for_each_safe(p, n, &r->golang_events_head) {
pe = container_of(p, struct process_event, list);
if (pe->pid == pid) {
list_head_del(&pe->list);
Expand All @@ -983,9 +985,10 @@ static inline void find_and_clear_event_from_list(int pid)

static void process_exit_handle(int pid, struct bpf_tracer *tracer)
{
pthread_mutex_lock(&mutex_proc_events_lock);
struct proc_events_record *r = &proc_ev_record;
proc_events_lock(r->golang_list_lock);
find_and_clear_event_from_list(pid);
pthread_mutex_unlock(&mutex_proc_events_lock);
proc_events_unlock(r->golang_list_lock);

// Protect the probes operation in multiple threads, similar to process_execute_handle()
pthread_mutex_lock(&tracer->mutex_probes_lock);
Expand Down Expand Up @@ -1024,10 +1027,10 @@ static void add_event_to_proc_header(struct bpf_tracer *tracer, int pid,
pe->type = type;
pe->expire_time = get_sys_uptime() + PROC_EVENT_DELAY_HANDLE_DEF;

pthread_mutex_lock(&mutex_proc_events_lock);
find_and_clear_event_from_list(pid);
list_add_tail(&pe->list, &proc_events_head);
pthread_mutex_unlock(&mutex_proc_events_lock);
struct proc_events_record *r = &proc_ev_record;
proc_events_lock(r->golang_list_lock);
list_add_tail(&pe->list, &r->golang_events_head);
proc_events_unlock(r->golang_list_lock);
}

/**
Expand Down Expand Up @@ -1104,7 +1107,11 @@ void go_process_events_handle(void)
pthread_mutex_unlock(&mutex_proc_events_lock);
if (type == EVENT_TYPE_PROC_EXEC) {
if (access(path, F_OK) == 0) {
process_execute_handle(pid, tracer);
struct version_info go_version;
memset(&go_version, 0, sizeof(go_version));
if (fetch_go_elf_version(path, &go_version)) {
process_execute_handle(pid, tracer);
}
}
}
free(path);
Expand All @@ -1113,5 +1120,8 @@ void go_process_events_handle(void)
pthread_mutex_unlock(&mutex_proc_events_lock);
break;
}

sleep(1);

} while (true);
}
4 changes: 3 additions & 1 deletion agent/src/ebpf/user/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ static void socket_tracer_set_probes(struct tracer_probes_conf *tps)
tps_set_symbol(tps, "tracepoint/syscalls/sys_exit_accept");
tps_set_symbol(tps, "tracepoint/syscalls/sys_exit_accept4");
// process execute
tps_set_symbol(tps, "tracepoint/sched/sched_process_fork");
tps_set_symbol(tps, "tracepoint/syscalls/sys_exit_fork");
tps_set_symbol(tps, "tracepoint/syscalls/sys_exit_clone");
tps_set_symbol(tps, "tracepoint/sched/sched_process_exec");

// 周期性触发用于缓存的数据的超时检查
Expand Down Expand Up @@ -1183,6 +1184,7 @@ static void check_datadump_timeout(void)
static void process_events_handle_main(__unused void *arg)
{
prctl(PR_SET_NAME, "proc-events");
thread_index = THREAD_PROC_EVENTS_IDX;
struct bpf_tracer *t = arg;
for (;;) {
/*
Expand Down
68 changes: 50 additions & 18 deletions agent/src/ebpf/user/tracer.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ static volatile int ready_flag_cpus[MAX_CPU_NR];

/* Registration of additional transactions 额外事务处理的注册 */
static struct list_head extra_waiting_head;
/* Registration for periodic event handling 周期性事件处理的注册 */
static struct list_head period_events_head;

int sys_cpus_count;
bool *cpu_online; // 用于判断CPU是否是online
Expand All @@ -93,6 +91,12 @@ static int perf_reader_setup(struct bpf_perf_reader *perf_readerm,
int thread_nr);
static void perf_reader_release(struct bpf_perf_reader *perf_reader);

/* Registration for periodic event handling 周期性事件处理的注册 */
static struct list_head period_events_head;

// Detecting process start and exit events.
struct proc_events_record proc_ev_record;

/*
* 内核版本依赖检查
*/
Expand Down Expand Up @@ -254,7 +258,7 @@ int enable_tracer_reader_work(const char *prefix_name, int idx,
char name[TASK_COMM_LEN];
snprintf(name, sizeof(name), "%s-%d", prefix_name, idx);
ret = pthread_create(&tracer->perf_worker[idx], NULL, fn,
(void *)(uint64_t)idx);
(void *)(uint64_t) idx);
if (ret) {
ebpf_warning("tracer reader(%s), pthread_create "
"is error:%s\n", name, strerror(errno));
Expand Down Expand Up @@ -968,21 +972,18 @@ int tracer_hooks_process(struct bpf_tracer *tracer, enum tracer_hook_type type,
if (obj->progs[i].type == BPF_PROG_TYPE_PERF_EVENT) {
errno = 0;
int ret =
program__attach_perf_event(obj->progs[i].
prog_fd,
program__attach_perf_event(obj->
progs[i].prog_fd,
PERF_TYPE_SOFTWARE,
PERF_COUNT_SW_CPU_CLOCK,
0, /* sample_period */
tracer->
sample_freq,
tracer->sample_freq,
-1, /* pid, current process */
-1, /* cpu, no binding */
-1, /* new event group is created */
tracer->
per_cpu_fds,
tracer->per_cpu_fds,
ARRAY_SIZE
(tracer->
per_cpu_fds));
(tracer->per_cpu_fds));
if (!ret) {
ebpf_info
("tracer \"%s\" attach perf event prog successful.\n",
Expand Down Expand Up @@ -1010,8 +1011,8 @@ int tracer_hooks_process(struct bpf_tracer *tracer, enum tracer_hook_type type,
errno = 0;
int ret =
program__detach_perf_event(tracer->per_cpu_fds,
ARRAY_SIZE(tracer->
per_cpu_fds));
ARRAY_SIZE
(tracer->per_cpu_fds));
if (!ret) {
ebpf_info
("tracer \"%s\" detach perf event prog successful.\n",
Expand Down Expand Up @@ -1169,7 +1170,7 @@ static int perf_reader_setup(struct bpf_perf_reader *perf_reader, int thread_nr)
spread_id = 0;

struct reader_forward_info *fwd_info =
malloc(sizeof(struct reader_forward_info));
malloc(sizeof(struct reader_forward_info));
if (fwd_info == NULL) {
ebpf_error("reader_forward_info malloc() failed.\n");
return ETR_NOMEM;
Expand All @@ -1180,12 +1181,10 @@ static int perf_reader_setup(struct bpf_perf_reader *perf_reader, int thread_nr)

ebpf_info("Perf buffer reader cpu(%d) -> queue(%d)\n",
fwd_info->cpu_id, fwd_info->queue_id);
reader =
(struct perf_reader *)
reader = (struct perf_reader *)
bpf_open_perf_buffer(perf_reader->raw_cb,
perf_reader->lost_cb,
(void *)fwd_info, -1, i,
pages_cnt);
(void *)fwd_info, -1, i, pages_cnt);
if (reader == NULL) {
ebpf_error("bpf_open_perf_buffer() failed.\n");
return ETR_NORESOURCE;
Expand Down Expand Up @@ -1581,6 +1580,36 @@ bool is_feature_matched(int feature, const char *path)
return !error;
}

/*
 * Initialise the global process-event record (proc_ev_record):
 *   1. initialise the golang and ssl event list heads,
 *   2. allocate one cache-line-sized, cache-line-aligned lock word for
 *      each list and set it to 0 (unlocked),
 *   3. zero and initialise the lookup hash.
 *
 * @name: name passed to proc_event_hash_init() for the hash table.
 *
 * Returns -1 if either lock allocation fails, otherwise the return value
 * of proc_event_hash_init(). The caller treats any non-zero value as
 * failure.
 */
static int init_proc_events_record(const char *name)
{
	// Both list heads live inside proc_ev_record, the same object the
	// lock/unlock users operate on.
	init_list_head(&proc_ev_record.golang_events_head);
	init_list_head(&proc_ev_record.ssl_events_head);

	proc_ev_record.golang_list_lock =
	    clib_mem_alloc_aligned("go_proc_ev_lock",
				   CLIB_CACHE_LINE_BYTES,
				   CLIB_CACHE_LINE_BYTES, NULL);
	proc_ev_record.ssl_list_lock =
	    clib_mem_alloc_aligned("ssl_proc_ev_lock",
				   CLIB_CACHE_LINE_BYTES,
				   CLIB_CACHE_LINE_BYTES, NULL);
	if (proc_ev_record.golang_list_lock == NULL ||
	    proc_ev_record.ssl_list_lock == NULL) {
		ebpf_error("process events lock alloc memory failed.\n");
		return (-1);
	}

	// 0 == unlocked.
	proc_ev_record.golang_list_lock[0] = 0;
	proc_ev_record.ssl_list_lock[0] = 0;

	proc_event_hash_t *h = &proc_ev_record.hash;
	memset(h, 0, sizeof(*h));
	// The hash reuses the symbolizer-cache sizing constants.
	u32 nbuckets = SYMBOLIZER_CACHES_HASH_BUCKETS_NUM;
	// Original note: 2G bytes -- TODO confirm against the macro definition.
	u64 hash_memory_size = SYMBOLIZER_CACHES_HASH_MEM_SZ;
	return proc_event_hash_init(h, (char *)name, nbuckets,
				    hash_memory_size);
}

int bpf_tracer_init(const char *log_file, bool is_stdout)
{
init_list_head(&extra_waiting_head);
Expand Down Expand Up @@ -1668,6 +1697,9 @@ int bpf_tracer_init(const char *log_file, bool is_stdout)
if ((err = sockopt_register(&trace_sockopts)) != ETR_OK)
return err;

if (init_proc_events_record("proc_event_record"))
return ETR_INVAL;

err = pthread_create(&ctrl_pthread, NULL, (void *)&ctrl_main, NULL);
if (err) {
ebpf_info("<%s> ctrl_pthread, pthread_create is error:%s\n",
Expand Down
30 changes: 30 additions & 0 deletions agent/src/ebpf/user/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,36 @@ static int inline __reclaim_map(int map_fd, struct list_head *h)

#define CACHE_LINE_BYTES 64

/*
 * Process-event hash: thin aliases that map the proc_event_hash_* names
 * onto the clib bihash 8_16 variant (type, key/value pair type and the
 * init/search/add-del/free/foreach/print functions).
 */
#define proc_event_hash_t clib_bihash_8_16_t
#define proc_event_hash_init clib_bihash_init_8_16
#define proc_event_hash_kv clib_bihash_kv_8_16_t
#define print_proc_event_caches print_bihash_8_16
#define proc_event_hash_search clib_bihash_search_8_16
#define proc_event_hash_add_del clib_bihash_add_del_8_16
#define proc_event_hash_free clib_bihash_free_8_16
#define proc_event_hash_key_value_pair_cb clib_bihash_foreach_key_value_pair_cb_8_16
#define proc_event_hash_foreach_key_value_pair clib_bihash_foreach_key_value_pair_8_16

/*
 * Bookkeeping for process start/exit events: a lookup hash plus two
 * event lists (golang and ssl), each paired with its own lock word.
 */
struct proc_events_record {
// For quick search.
proc_event_hash_t hash;
// Lock word passed to proc_events_lock()/proc_events_unlock() around
// accesses to golang_events_head; 0 means unlocked.
volatile uint32_t *golang_list_lock;
// List head for golang process events.
struct list_head golang_events_head;
// Lock word for ssl_events_head (presumably used the same way as
// golang_list_lock -- no user is visible here).
volatile uint32_t *ssl_list_lock;
// List head for ssl process events.
struct list_head ssl_events_head;
};

/*
 * Acquire the spin lock at *lock.
 *
 * Busy-waits: repeats an atomic test-and-set (acquire ordering) until the
 * previous value was "clear", executing CLIB_PAUSE() between attempts.
 * Release with proc_events_unlock().
 *
 * NOTE(review): the GCC documentation describes __atomic_test_and_set as
 * operating on a single byte, while the lock word is a uint32_t -- confirm
 * this is intended.
 */
static inline void proc_events_lock(volatile uint32_t *lock)
{
while (__atomic_test_and_set(lock, __ATOMIC_ACQUIRE))
CLIB_PAUSE();
}

/*
 * Release the spin lock at *lock by atomically clearing it with release
 * ordering. Counterpart of proc_events_lock().
 */
static inline void proc_events_unlock(volatile uint32_t *lock)
{
__atomic_clear(lock, __ATOMIC_RELEASE);
}

int set_allow_port_bitmap(void *bitmap);
int set_bypass_port_bitmap(void *bitmap);
int enable_ebpf_protocol(int protocol);
Expand Down

0 comments on commit 5b80375

Please sign in to comment.