diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index 7001f762c98..0326db5ec88 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -396,7 +396,6 @@ pub struct EbpfYamlConfig { pub io_event_collect_mode: usize, #[serde(with = "humantime_serde")] pub io_event_minimal_duration: Duration, - pub java_symbol_file_max_space_limit: u8, #[serde(with = "humantime_serde")] pub java_symbol_file_refresh_defer_interval: Duration, pub on_cpu_profile: OnCpuProfile, @@ -425,8 +424,7 @@ impl Default for EbpfYamlConfig { go_tracing_timeout: 120, io_event_collect_mode: 1, io_event_minimal_duration: Duration::from_millis(1), - java_symbol_file_max_space_limit: 10, - java_symbol_file_refresh_defer_interval: Duration::from_secs(600), + java_symbol_file_refresh_defer_interval: Duration::from_secs(60), on_cpu_profile: OnCpuProfile::default(), off_cpu_profile: OffCpuProfile::default(), memory_profile: MemoryProfile::default(), @@ -810,23 +808,18 @@ impl YamlConfig { c.ebpf.ring_size = 65536; } if c.ebpf.max_socket_entries < 100000 || c.ebpf.max_socket_entries > 2000000 { - c.ebpf.max_socket_entries = 524288; + c.ebpf.max_socket_entries = 131072; } if c.ebpf.socket_map_max_reclaim < 100000 || c.ebpf.socket_map_max_reclaim > 2000000 { - c.ebpf.socket_map_max_reclaim = 520000; + c.ebpf.socket_map_max_reclaim = 120000; } if c.ebpf.max_trace_entries < 100000 || c.ebpf.max_trace_entries > 2000000 { - c.ebpf.max_trace_entries = 524288; - } - if c.ebpf.java_symbol_file_max_space_limit < 2 - || c.ebpf.java_symbol_file_max_space_limit > 100 - { - c.ebpf.java_symbol_file_max_space_limit = 10 + c.ebpf.max_trace_entries = 131072; } if c.ebpf.java_symbol_file_refresh_defer_interval < Duration::from_secs(5) || c.ebpf.java_symbol_file_refresh_defer_interval > Duration::from_secs(3600) { - c.ebpf.java_symbol_file_refresh_defer_interval = Duration::from_secs(600) + c.ebpf.java_symbol_file_refresh_defer_interval = Duration::from_secs(60) } c.ebpf.off_cpu_profile.min_block = c .ebpf diff --git a/agent/src/ebpf/Makefile b/agent/src/ebpf/Makefile index b604bd66515..ac8a5c3b9d7 100644 --- a/agent/src/ebpf/Makefile +++ b/agent/src/ebpf/Makefile @@ -94,15 +94,15 @@ OBJS := user/elf.o \ $(patsubst %.c,%.o,$(wildcard user/extended/profile/*.c)) \ user/profile/perf_profiler.o \ user/profile/stringifier.o \ - user/profile/java/df_jattach.o \ - user/profile/java/gen_syms_file.o + user/profile/java/jvm_symbol_collect.o \ + user/profile/java/collect_symbol_files.o JAVA_TOOL := deepflow-jattach -JAVA_AGENT_VERSION := 1 +JAVA_AGENT_VERSION := 2 JAVA_AGENT_GNU_SO := df_java_agent_v$(JAVA_AGENT_VERSION).so JAVA_AGENT_MUSL_SO := df_java_agent_musl_v$(JAVA_AGENT_VERSION).so JAVA_AGENT_SO := $(JAVA_AGENT_GNU_SO) $(JAVA_AGENT_MUSL_SO) -JAVA_AGENT_SRC := user/profile/java/agent.c +JAVA_AGENT_SRC := user/profile/java/symbol_collect_agent.c JAVA_AGENT_MACROS := -DAGENT_LIB_NAME="\"$(JAVA_AGENT_GNU_SO)\"" -DAGENT_MUSL_LIB_NAME="\"$(JAVA_AGENT_MUSL_SO)\"" STATIC_OBJS := $(addprefix $(STATIC_OBJDIR)/,$(OBJS)) @@ -135,8 +135,7 @@ endif all: build -SOCKET_TRACE_ELFS := \ - user/socket_trace_bpf_common.c \ +SOCKET_TRACE_ELFS := user/socket_trace_bpf_common.c \ user/socket_trace_bpf_3_10_0.c \ user/socket_trace_bpf_5_2_plus.c \ user/socket_trace_bpf_kylin.c @@ -216,9 +215,9 @@ tools: $(LIBTRACE) $(call msg,TOOLS,deepflow-ebpfctl) $(Q)$(CC) $(CFLAGS) --static -g -O2 user/ctrl_tracer.c user/ctrl.c $(LIBTRACE) -o deepflow-ebpfctl -lelf -lz -lpthread -$(JAVA_TOOL): $(JAVA_AGENT_SO) user/profile/java/df_jattach.c user/log.c user/common.c libs/jattach/build/libjattach.a +$(JAVA_TOOL): $(JAVA_AGENT_SO) user/log.c user/common.c user/mem.c user/vec.c user/profile/java/jvm_symbol_collect.c libs/jattach/build/libjattach.a $(call msg,TOOLS,$@) - @$(GNU_CC) $(CFLAGS) -DJAVA_AGENT_ATTACH_TOOL user/profile/java/df_jattach.c user/log.c user/common.c libs/jattach/build/libjattach.a -o $@ -ldl -lpthread + @$(GNU_CC) $(CFLAGS) -DJAVA_AGENT_ATTACH_TOOL user/log.c user/common.c user/mem.c user/vec.c user/profile/java/jvm_symbol_collect.c libs/jattach/build/libjattach.a -o $@ -ldl -lpthread @rm -rf user/profile/deepflow_jattach_bin.c @./tools/bintobuffer ./$@ user/profile/deepflow_jattach_bin.c deepflow_jattach_bin diff --git a/agent/src/ebpf/mod.rs b/agent/src/ebpf/mod.rs index 05e5d9300a7..12cf9fcbf1f 100644 --- a/agent/src/ebpf/mod.rs +++ b/agent/src/ebpf/mod.rs @@ -573,11 +573,6 @@ extern "C" { /* * start continuous profiler * @freq sample frequency, Hertz. (e.g. 99 profile stack traces at 99 Hertz) - * @java_syms_space_limit The maximum space occupied by the Java symbol files - * in the '/' directory of the target POD container.The recommended range for - * values is [2, 100], which means it falls within the interval of 2Mi to 100Mi. - * If the configuration value is outside this range, the default value of - * 10(10Mi), will be used. * @java_syms_update_delay To allow Java to run for an extended period and gather * more symbol information, we delay symbol retrieval when encountering unknown * symbols. The unit of measurement used is seconds. @@ -587,7 +582,6 @@ extern "C" { */ pub fn start_continuous_profiler( freq: c_int, - java_syms_space_limit: c_int, java_syms_update_delay: c_int, callback: extern "C" fn(_data: *mut stack_profile_data), ) -> c_int; @@ -709,6 +703,7 @@ extern "C" { pub fn enable_oncpu_profiler() -> c_int; pub fn disable_oncpu_profiler() -> c_int; + pub fn show_collect_pool(); cfg_if::cfg_if! { if #[cfg(feature = "extended_profile")] { diff --git a/agent/src/ebpf/samples/rust/profiler/src/main.rs b/agent/src/ebpf/samples/rust/profiler/src/main.rs index 95219c49106..e46f8976663 100644 --- a/agent/src/ebpf/samples/rust/profiler/src/main.rs +++ b/agent/src/ebpf/samples/rust/profiler/src/main.rs @@ -79,6 +79,7 @@ fn cp_container_id_safe(cp: *mut stack_profile_data) -> String { } } +#[allow(dead_code)] fn increment_counter(num: u32, counter_type: u32) { if counter_type == 0 { let mut counter = COUNTER.lock().unwrap(); @@ -121,7 +122,7 @@ extern "C" fn socket_trace_callback(_sd: *mut SK_BPF_DATA) {} extern "C" fn continuous_profiler_callback(cp: *mut stack_profile_data) { unsafe { - process_stack_trace_data_for_flame_graph(cp); + //process_stack_trace_data_for_flame_graph(cp); increment_counter((*cp).count, 1); increment_counter(1, 0); //let data = sk_data_str_safe(cp); @@ -143,6 +144,7 @@ extern "C" fn continuous_profiler_callback(cp: *mut stack_profile_data) { } } +#[allow(dead_code)] fn get_counter(counter_type: u32) -> u32 { if counter_type == 0 { *COUNTER.lock().unwrap() @@ -173,9 +175,9 @@ fn main() { 1, /* Number of worker threads, indicating how many user-space threads participate in data processing */ 64, /* Number of page frames occupied by kernel-shared memory, must be a power of 2. Used for perf data transfer */ 65536, /* Size of the circular buffer queue, must be a power of 2. e.g: 2, 4, 8, 16, 32, 64, 128 */ - 524288, /* Maximum number of hash table entries for socket tracing, depends on the actual number of concurrent requests in the scenario */ - 524288, /* Maximum number of hash table entries for thread/coroutine tracing sessions */ - 520000, /* Maximum threshold for cleaning socket map entries. If the current number of map entries exceeds this value, map cleaning operation is performed */ + 131072, /* Maximum number of hash table entries for socket tracing, depends on the actual number of concurrent requests in the scenario */ + 131072, /* Maximum number of hash table entries for thread/coroutine tracing sessions */ + 120000, /* Maximum threshold for cleaning socket map entries. If the current number of map entries exceeds this value, map cleaning operation is performed */ ) != 0 { println!("running_socket_tracer() error."); @@ -184,7 +186,7 @@ fn main() { // Used to test our DeepFlow products, written as 97 frequency, so that // it will not affect the sampling test of deepflow agent (using 99Hz). - if start_continuous_profiler(97, 10, 300, continuous_profiler_callback) != 0 { + if start_continuous_profiler(97, 60, continuous_profiler_callback) != 0 { println!("start_continuous_profiler() error."); ::std::process::exit(1); } @@ -215,17 +217,20 @@ fn main() { std::thread::sleep(Duration::from_secs(1)); } - thread::sleep(Duration::from_secs(150)); - stop_continuous_profiler(); - print!( - "====== capture count {}, sum {}\n", - get_counter(0), - get_counter(1) - ); - release_flame_graph_hash(); + //thread::sleep(Duration::from_secs(150)); + //stop_continuous_profiler(); + //print!( + // "====== capture count {}, sum {}\n", + // get_counter(0), + // get_counter(1) + //); + //release_flame_graph_hash(); } loop { - thread::sleep(Duration::from_secs(5)); + thread::sleep(Duration::from_secs(15)); + unsafe { + show_collect_pool(); + } } } diff --git a/agent/src/ebpf/test/Makefile b/agent/src/ebpf/test/Makefile index 86940c98466..5e5ff68a313 100644 --- a/agent/src/ebpf/test/Makefile +++ b/agent/src/ebpf/test/Makefile @@ -24,7 +24,7 @@ ARCH ?= $(shell uname -m) CC ?= gcc CFLAGS ?= -std=gnu99 --static -g -O2 -ffunction-sections -fdata-sections -fPIC -fno-omit-frame-pointer -Wall -Wno-sign-compare -Wno-unused-parameter -Wno-missing-field-initializers -EXECS := test_symbol test_offset test_insns_cnt test_bihash test_vec test_fetch_container_id test_parse_range test_set_ports_bitmap +EXECS := test_symbol test_offset test_insns_cnt test_bihash test_vec test_fetch_container_id test_parse_range test_set_ports_bitmap test_pid_check ifeq ($(ARCH), x86_64) #-lbcc -lstdc++ LDLIBS += ../libtrace.a -ljattach -lbcc_bpf -lGoReSym -lbddisasm -ldwarf -lelf -lz -lpthread -lbcc -lstdc++ -ldl diff --git a/agent/src/ebpf/test/test_pid_check.c b/agent/src/ebpf/test/test_pid_check.c new file mode 100644 index 00000000000..845c17119de --- /dev/null +++ b/agent/src/ebpf/test/test_pid_check.c @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "../user/config.h" +#include "../user/utils.h" +#include "../user/common.h" +#include "../user/mem.h" +#include "../user/log.h" +#include "../user/types.h" +#include "../user/vec.h" +#include "../user/tracer.h" +#include "../user/socket.h" +#include "../user/profile/perf_profiler.h" +#include "../user/elf.h" +#include "../user/load.h" + +#define TEST_NAME "test_pid_check" +static int check_test_running_pid(void) +{ + int pid = find_pid_by_name(TEST_NAME, getpid()); + if (pid > 0) { + ebpf_warning("The deepflow-agent with process ID %d is already " + "running. You can disable the continuous profiling " + "feature of the deepflow-agent to skip this check.\n", + pid); + return ETR_EXIST; + } + + return ETR_NOTEXIST; +} + +int main(void) +{ + bpf_tracer_init(NULL, true); + if (check_test_running_pid() == ETR_EXIST) + return 0; + + char buf[1024]; + exec_command("./test_pid_check", "", buf, sizeof(buf)); + ebpf_info("--- %s\n", buf); + if (strstr(buf, "is already running")) { + ebpf_info("TEST success.\n"); + return 0; + } + return -1; +} diff --git a/agent/src/ebpf/user/common.c b/agent/src/ebpf/user/common.c index 80431208948..ff67a6b9942 100644 --- a/agent/src/ebpf/user/common.c +++ b/agent/src/ebpf/user/common.c @@ -21,6 +21,8 @@ #include #include #include +#include +#include /* isdigit() */ #include #include #include @@ -1116,17 +1118,23 @@ int df_enter_ns(int pid, const char *type, int *self_fd) int newns; newns = open(path, O_RDONLY); if (newns < 0) { + ebpf_warning("open() failed with %s(%d)\n", + strerror(errno), errno); return -1; } *self_fd = open(selfpath, O_RDONLY); if (*self_fd < 0) { + ebpf_warning("open() failed with %s(%d)\n", + strerror(errno), errno); return -1; } // Some ancient Linux distributions do not have setns() function int result = syscall(__NR_setns, newns, 0); close(newns); if (result < 0) { + ebpf_warning("setns(%s) failed with %s(%d)\n", + type, strerror(errno), errno); close(*self_fd); *self_fd = -1; } @@ -1305,6 +1313,286 @@ u64 kallsyms_lookup_name(const char *name) return 0; } +static inline bool __is_same_ns(int target_pid, const char *tag) +{ + struct stat self_st, target_st; + char path[64]; + snprintf(path, sizeof(path), "/proc/self/ns/%s", tag); + if (stat(path, &self_st) != 0) + return false; + + snprintf(path, sizeof(path), "/proc/%d/ns/%s", target_pid, tag); + if (stat(path, &target_st) != 0) + return false; + + if (self_st.st_ino == target_st.st_ino) { + return true; + } + + return false; +} + +bool is_same_netns(int pid) +{ + return __is_same_ns(pid, "net"); +} + +bool is_same_mntns(int pid) +{ + return __is_same_ns(pid, "mnt"); +} + +// Function to get the inode number of a Unix socket from /proc/net/unix +static ino_t get_unix_socket_inode(const char *socket_path) +{ + FILE *fp = fopen("/proc/net/unix", "r"); + if (!fp) { + perror("fopen /proc/net/unix"); + return (ino_t) - 1; + } + + char line[PATH_MAX + 100]; + while (fgets(line, sizeof(line), fp)) { + if (strstr(line, socket_path)) { + ino_t inode; + sscanf(line, "%*p: %*s %*s %*s %*s %*s %lu", &inode); + fclose(fp); + return inode; + } + } + + fclose(fp); + return (ino_t) - 1; +} + +// Function to check if a process has the specified Unix socket open +static int is_process_using_unix_socket(const char *file_path, const char *pid) +{ + char fd_dir_path[PATH_MAX]; + char target_path[PATH_MAX]; + snprintf(fd_dir_path, sizeof(fd_dir_path), "/proc/%s/fd", pid); + + DIR *dir = opendir(fd_dir_path); + if (!dir) { + return 0; + } + + struct dirent *entry; + while ((entry = readdir(dir)) != NULL) { + if (entry->d_type == DT_LNK) { + char link_path[PATH_MAX]; + ssize_t len; + snprintf(link_path, sizeof(link_path), "%s/%s", + fd_dir_path, entry->d_name); + if ((len = + readlink(link_path, target_path, + sizeof(target_path) - 1)) == -1) { + continue; + } + target_path[len] = '\0'; + + char *inode_start = strstr(target_path, "socket:["); + if (inode_start) { + inode_start += strlen("socket:["); + char *inode_end = strchr(inode_start, ']'); + if (inode_end) { + *inode_end = '\0'; + ino_t inode_number = + strtoul(inode_start, NULL, 10); + ino_t target_inode = + get_unix_socket_inode(file_path); + if (target_inode != (ino_t) - 1 + && inode_number == target_inode) { + closedir(dir); + ebpf_info + ("File '%s' is opened by another process (PID: %s).\n", + file_path, pid); + return 1; + } + } + } + } + } + + closedir(dir); + return 0; +} + +// Function to check if a regular file is opened by other processes +int is_file_opened_by_other_processes(const char *filepath) +{ + struct stat file_stat; + if (stat(filepath, &file_stat) == -1) { + return -1; + } + + if (!S_ISREG(file_stat.st_mode) && !S_ISSOCK(file_stat.st_mode)) { + fprintf(stderr, + "The specified file is neither a regular file nor a Unix socket.\n"); + return -1; + } + + DIR *proc_dir = opendir("/proc"); + if (!proc_dir) { + perror("opendir /proc"); + return -1; + } + + struct dirent *proc_entry; + while ((proc_entry = readdir(proc_dir)) != NULL) { + if (!isdigit(proc_entry->d_name[0])) + continue; // Skip non-numeric entries + + if (S_ISSOCK(file_stat.st_mode)) { + if (is_process_using_unix_socket + (filepath, proc_entry->d_name) == 1) { + closedir(proc_dir); + return 1; + } + continue; + } + + char fd_dir_path[PATH_MAX]; + snprintf(fd_dir_path, sizeof(fd_dir_path), "/proc/%s/fd", + proc_entry->d_name); + + DIR *fd_dir = opendir(fd_dir_path); + if (!fd_dir) + continue; // Skip if unable to open fd directory + + struct dirent *fd_entry; + while ((fd_entry = readdir(fd_dir)) != NULL) { + if (fd_entry->d_type != DT_LNK) + continue; // Skip non-symlink entries + + char link_path[PATH_MAX], resolved_path[PATH_MAX]; + snprintf(link_path, sizeof(link_path), "%s/%s", + fd_dir_path, fd_entry->d_name); + + ssize_t len = readlink(link_path, resolved_path, + sizeof(resolved_path) - 1); + if (len == -1) + continue; + + resolved_path[len] = '\0'; + + struct stat link_stat; + if (stat(resolved_path, &link_stat) == -1) + continue; // Skip if unable to stat the resolved path + + // Compare device and inode numbers + if (file_stat.st_dev == link_stat.st_dev + && file_stat.st_ino == link_stat.st_ino) { + if (atoi(proc_entry->d_name) != getpid()) { + closedir(fd_dir); + closedir(proc_dir); + ebpf_info + ("File '%s' is opened by another process (PID: %s).\n", + filepath, proc_entry->d_name); + return 1; // File is opened by another process + } + } + } + + closedir(fd_dir); + } + + closedir(proc_dir); + return 0; // File is not opened by any other process +} + +// Check if the substring starts with the main string +bool substring_starts_with(const char *haystack, const char *needle) +{ + int needle_len = strlen(needle); // Length of the substring + int haystack_len = strlen(haystack); // Length of the main string + + // If the substring length is greater than the main string length, return false + if (needle_len > haystack_len) { + return false; + } + // Compare the first needle_len characters + if (strncmp(haystack, needle, needle_len) == 0) { + return true; // Substring starts with the main string + } + + return false; // Substring does not start with the main string +} + +static int find_proc_form_status_file(const char *status_path, + const char *process_name) +{ +#define LINE_SIZE 256 +#define NAME_SIZE 16 +#define STATUS_PATH_SIZE 256 + + FILE *status_file = fopen(status_path, "r"); + if (status_file == NULL) { + ebpf_warning + ("Failed to open status file:%s, with %s(%d)\n", + status_file, strerror(errno), errno); + return -1; + } + + char line[LINE_SIZE]; + while (fgets(line, sizeof(line), status_file)) { + if (strncmp(line, "Name:", 5) == 0) { + char name[NAME_SIZE]; + if (sscanf(line, "Name:\t%15s", name) == 1) { + if (strcmp(name, process_name) + == 0) { + return 0; + } + } + break; + } + } + + fclose(status_file); + return -1; +} + +int find_pid_by_name(const char *process_name, int exclude_pid) +{ + struct dirent *entry; + DIR *proc = opendir("/proc"); + if (proc == NULL) { + ebpf_warning("Failed to open /proc directory, with %s(%d)\n", + strerror(errno), errno); + return -1; + } + + while ((entry = readdir(proc)) != NULL) { + if (entry->d_type == DT_DIR) { + // Check if the directory name is a number (process ID) + char *endptr; + int pid = (int)strtol(entry->d_name, &endptr, 10); + if (exclude_pid > 0 && pid == exclude_pid) + continue; + + if (*endptr == '\0' && pid > 0) { + char status_path[STATUS_PATH_SIZE]; + snprintf(status_path, sizeof(status_path), + "/proc/%d/status", pid); + if (find_proc_form_status_file + (status_path, process_name) == 0) + return pid; + } + } + } + + if (closedir(proc) == -1) { + ebpf_warning("Failed to close /proc directory, with %s(%d)\n", + strerror(errno), errno); + } + + return -1; + +#undef STATUS_PATH_SIZE +#undef LINE_SIZE +#undef NAME_SIZE +} + #if !defined(AARCH64_MUSL) && !defined(JAVA_AGENT_ATTACH_TOOL) int create_work_thread(const char *name, pthread_t * t, void *fn, void *arg) { diff --git a/agent/src/ebpf/user/common.h b/agent/src/ebpf/user/common.h index b2d0f35238e..24e0b4660e6 100644 --- a/agent/src/ebpf/user/common.h +++ b/agent/src/ebpf/user/common.h @@ -296,6 +296,9 @@ int parse_num_range(const char *config_str, int bytes_count, int parse_num_range_disorder(const char *config_str, int bytes_count, bool ** mask); int generate_random_integer(int max_value); +bool is_same_netns(int pid); +bool is_same_mntns(int pid); +int is_file_opened_by_other_processes(const char *filename); /** * @brief Find the address through kernel symbols. * @@ -304,7 +307,9 @@ int generate_random_integer(int max_value); * a non-zero value represents the address of the kernel symbol. */ u64 kallsyms_lookup_name(const char *name); +bool substring_starts_with(const char *haystack, const char *needle); char *get_timestamp_from_us(u64 microseconds); +int find_pid_by_name(const char *process_name, int exclude_pid); #if !defined(AARCH64_MUSL) && !defined(JAVA_AGENT_ATTACH_TOOL) int create_work_thread(const char *name, pthread_t *t, void *fn, void *arg); #endif /* !defined(AARCH64_MUSL) && !defined(JAVA_AGENT_ATTACH_TOOL) */ diff --git a/agent/src/ebpf/user/config.h b/agent/src/ebpf/user/config.h index 6e76daf25cc..1b7a30a2c71 100644 --- a/agent/src/ebpf/user/config.h +++ b/agent/src/ebpf/user/config.h @@ -71,7 +71,8 @@ enum { THREAD_PROFILER_READER_IDX = 0, THREAD_OFFCPU_READER_IDX = 1, THREAD_MEMORY_READER_IDX = 2, - THREAD_PROC_ACT_IDX_BASE = 3, + THREAD_PROC_EVENTS_HANDLE_IDX = 3, + THREAD_SOCK_READER_IDX_BASE = 4, }; /* diff --git a/agent/src/ebpf/user/perf_reader.h b/agent/src/ebpf/user/perf_reader.h index e0376d56bd6..1d32ae828cc 100644 --- a/agent/src/ebpf/user/perf_reader.h +++ b/agent/src/ebpf/user/perf_reader.h @@ -36,40 +36,39 @@ struct perf_reader { static inline int __reader_epoll_wait(struct bpf_perf_reader *r, - struct epoll_event *events, - int epoll_id, int timeout) + struct epoll_event *events, int epoll_id, int timeout) { - int nfds = epoll_wait(r->epoll_fds[epoll_id], events, - r->readers_count, timeout); + int nfds; +retry: + nfds = epoll_wait(r->epoll_fds[epoll_id], events, + r->readers_count, timeout); if (nfds == -1) { - ebpf_warning("epoll_wait() failed\n"); + ebpf_warning("epoll_wait() failed, with %s(%d)\n", + strerror(errno), errno); + if (errno == EINTR) { + goto retry; + } return ETR_EPOLL; - } + } return nfds; } static inline int reader_epoll_wait(struct bpf_perf_reader *r, - struct epoll_event *events, - int epoll_id) + struct epoll_event *events, int epoll_id) { - return __reader_epoll_wait(r, events, epoll_id, - r->epoll_timeout); + return __reader_epoll_wait(r, events, epoll_id, r->epoll_timeout); } static inline bool reader_epoll_short_wait(struct bpf_perf_reader *r, - struct epoll_event *events, - int epoll_id) + struct epoll_event *events, int epoll_id) { - return __reader_epoll_wait(r, events, epoll_id, - EPOLL_SHORT_TIMEOUT); + return __reader_epoll_wait(r, events, epoll_id, EPOLL_SHORT_TIMEOUT); } -static inline void -reader_event_read(struct epoll_event *events, - int nfds) +static inline void reader_event_read(struct epoll_event *events, int nfds) { int i; for (i = 0; i < nfds; ++i) { @@ -78,8 +77,7 @@ reader_event_read(struct epoll_event *events, } static inline void -reader_event_read_polling(struct epoll_event *events, - int nfds) +reader_event_read_polling(struct epoll_event *events, int nfds) { int i; for (i = 0; i < nfds; ++i) { diff --git a/agent/src/ebpf/user/proc.c b/agent/src/ebpf/user/proc.c index 9073ec37d14..78bcf6dcca9 100644 --- a/agent/src/ebpf/user/proc.c +++ b/agent/src/ebpf/user/proc.c @@ -51,8 +51,8 @@ #endif #include "libGoReSym.h" #include "profile/perf_profiler.h" -#include "profile/java/df_jattach.h" -#include "profile/java/gen_syms_file.h" +#include "profile/java/jvm_symbol_collect.h" +#include "profile/java/collect_symbol_files.h" #include "bihash_8_8.h" #include "profile/stringifier.h" #include "profile/profile_common.h" @@ -537,6 +537,7 @@ static int config_symbolizer_proc_info(struct symbolizer_proc_info *p, int pid) p->new_java_syms_file = false; p->cache_need_update = false; p->gen_java_syms_file_err = false; + p->need_new_symbol_collector = true; p->lock = 0; pthread_mutex_init(&p->mutex, NULL); p->syms_cache = 0; @@ -675,7 +676,9 @@ static void *symbols_cache_update(symbol_caches_hash_t * h, goto exit; } - ebpf_info("cache update PID %d NAME %s\n", kv->k.pid, p->comm); + if (p->need_new_symbol_collector) + ebpf_info("cache update PID %d NAME %s\n", kv->k.pid, p->comm); + add_symcache_count++; exit: diff --git a/agent/src/ebpf/user/proc.h b/agent/src/ebpf/user/proc.h index 1a42ed20ce4..e03c2127120 100644 --- a/agent/src/ebpf/user/proc.h +++ b/agent/src/ebpf/user/proc.h @@ -73,6 +73,8 @@ struct symbolizer_proc_info { /* Unknown symbols was found, and it is currently mainly used to * obtain the match of the Java process.*/ bool unknown_syms_found; + /* Is it necessary to create a new Java symbol collector? */ + bool need_new_symbol_collector; /* Expiration time (in seconds) for updating the Java symbol table */ u64 update_syms_table_time; /* process name */ diff --git a/agent/src/ebpf/user/profile/java/gen_syms_file.c b/agent/src/ebpf/user/profile/java/collect_symbol_files.c similarity index 62% rename from agent/src/ebpf/user/profile/java/gen_syms_file.c rename to agent/src/ebpf/user/profile/java/collect_symbol_files.c index 1a631a41778..05502efd144 100644 --- a/agent/src/ebpf/user/profile/java/gen_syms_file.c +++ b/agent/src/ebpf/user/profile/java/collect_symbol_files.c @@ -25,73 +25,78 @@ #include "../../tracer.h" #include "../../socket.h" #include "../../proc.h" -#include "gen_syms_file.h" +#include "collect_symbol_files.h" #include "config.h" -#include "df_jattach.h" - -extern int g_java_syms_write_bytes_max; +#include "jvm_symbol_collect.h" +#include "../perf_profiler.h" +#include "../../elf.h" +#include "../../load.h" +#include "../../perf_reader.h" +#include "../../bihash_8_8.h" +#include "../stringifier.h" +#include "../profile_common.h" static pthread_mutex_t list_lock; /* For Java symbols update task. */ static struct list_head java_syms_update_tasks_head; -/** Generate Java symbol file. +/** Collect Java symbols. * * @pid Process ID * @ret_val * Return address, used by the caller to determine subsequent processing. * @error_occurred - * 'true' indicates that an error has occurred at some point, + * 'true' indicates that an error has occurred at some point, + * no symbol collection for this process. * 'false' indicates that no error has occurred. */ -void gen_java_symbols_file(int pid, int *ret_val, bool error_occurred) +void collect_java_symbols(int pid, int *ret_val, bool error_occurred) { /* * If an error has occurred at some point, no further retries will * be attempted. */ if (error_occurred) { - goto error; - } - - *ret_val = JAVA_SYMS_OK; - int target_ns_pid = get_nspid(pid); - if (target_ns_pid < 0) { + *ret_val = JAVA_SYMS_COLLECT_ERR; return; } - char args[PERF_PATH_SZ * 2]; - snprintf(args, sizeof(args), - "%d %d," DF_AGENT_LOCAL_PATH_FMT ".map," - DF_AGENT_LOCAL_PATH_FMT ".log", pid, - g_java_syms_write_bytes_max, pid, pid); - - char ret_buf[1024]; - memset(ret_buf, 0, sizeof(ret_buf)); + *ret_val = JAVA_SYMS_COLLECT_OK; + bool is_new_collector; u64 start_time = gettime(CLOCK_MONOTONIC, TIME_TYPE_NAN); - exec_command(DF_JAVA_ATTACH_CMD, args, ret_buf, sizeof(ret_buf)); + if (update_java_symbol_file(pid, &is_new_collector)) + goto error; u64 end_time = gettime(CLOCK_MONOTONIC, TIME_TYPE_NAN); - if (target_symbol_file_access(pid, target_ns_pid, true) != 0) { + if (target_symbol_file_access(pid) != 0) { goto error; } - i64 new_file_sz = get_local_symbol_file_sz(pid, target_ns_pid); + i64 new_file_sz = get_local_symbol_file_sz(pid); if (new_file_sz == 0) { goto error; } - ebpf_info("Refreshing JAVA symbol file: " DF_AGENT_LOCAL_PATH_FMT - ".map, PID %d, size %ld, cost %lu us", - pid, pid, new_file_sz, (end_time - start_time) / 1000ULL); + if (is_new_collector) { + ebpf_info("Refreshing JAVA symbol file: " + DF_AGENT_LOCAL_PATH_FMT + ".map, PID %d, size %ld, cost %lu us", pid, pid, + new_file_sz, (end_time - start_time) / 1000ULL); + *ret_val = JAVA_SYMS_NEW_COLLECTOR; + } else { + *ret_val = JAVA_SYMS_NEED_UPDATE; + } - *ret_val = JAVA_SYMS_NEED_UPDATE; return; + error: - *ret_val = JAVA_SYMS_ERR; - ebpf_warning("Generate Java symbol files failed. PID %d\n%s\n", pid, - ret_buf); + if (is_new_collector) + *ret_val = JAVA_CREATE_COLLECTOR_ERR; + else + *ret_val = JAVA_SYMS_COLLECT_ERR; + + ebpf_warning("Generate Java symbol files failed. PID %d\n", pid); } void clean_local_java_symbols_files(int pid) @@ -119,6 +124,10 @@ void add_java_syms_update_task(struct symbolizer_proc_info *p_info) void java_syms_update_main(void *arg) { + // Ensure the profiler is initialized and currently running + while (!profiler_is_running()) + CLIB_PAUSE(); + pthread_mutex_init(&list_lock, NULL); init_list_head(&java_syms_update_tasks_head); struct java_syms_update_task *task; @@ -141,17 +150,31 @@ void java_syms_update_main(void *arg) /* JAVA process has not exited. */ if (AO_GET(&p->use) > 1) { int ret; - gen_java_symbols_file(p->pid, &ret, - p->gen_java_syms_file_err); - if (ret != JAVA_SYMS_ERR) { - if (ret == JAVA_SYMS_NEED_UPDATE) + collect_java_symbols(p->pid, &ret, + p->gen_java_syms_file_err); + if (ret != JAVA_SYMS_COLLECT_ERR + && ret != JAVA_CREATE_COLLECTOR_ERR) { + if (ret == JAVA_SYMS_NEED_UPDATE + || ret == JAVA_SYMS_NEW_COLLECTOR) p->cache_need_update = true; else p->cache_need_update = false; + if (ret != JAVA_SYMS_NEW_COLLECTOR) { + p->need_new_symbol_collector = + false; + } + p->gen_java_syms_file_err = false; } else { - p->gen_java_syms_file_err = true; + /* + * Mark an error occurred when creating collector, + * no further symbol collection for this process. + */ + if (ret == JAVA_CREATE_COLLECTOR_ERR) + p->gen_java_syms_file_err = + true; + p->cache_need_update = false; } diff --git a/agent/src/ebpf/user/profile/java/gen_syms_file.h b/agent/src/ebpf/user/profile/java/collect_symbol_files.h similarity index 78% rename from agent/src/ebpf/user/profile/java/gen_syms_file.h rename to agent/src/ebpf/user/profile/java/collect_symbol_files.h index 2470c5ea5e3..717b92ef372 100644 --- a/agent/src/ebpf/user/profile/java/gen_syms_file.h +++ b/agent/src/ebpf/user/profile/java/collect_symbol_files.h @@ -14,14 +14,14 @@ * limitations under the License. */ -#ifndef GEN_SYMS_FILE_H -#define GEN_SYMS_FILE_H +#ifndef COLLECT_SYMS_FILE_H +#define COLLECT_SYMS_FILE_H -#define JAVA_SYMS_OK 0 -#define JAVA_SYMS_ERR 1 -#define JAVA_SYMS_NEED_UPDATE 2 - -#define DF_JAVA_ATTACH_CMD "/usr/bin/deepflow-jattach" +#define JAVA_SYMS_COLLECT_OK 0 +#define JAVA_SYMS_COLLECT_ERR 1 +#define JAVA_CREATE_COLLECTOR_ERR 2 +#define JAVA_SYMS_NEED_UPDATE 3 +#define JAVA_SYMS_NEW_COLLECTOR 4 struct java_syms_update_task { struct list_head list; @@ -32,4 +32,4 @@ void gen_java_symbols_file(int pid, int *ret_val, bool error_occurred); void clean_local_java_symbols_files(int pid); void add_java_syms_update_task(struct symbolizer_proc_info *p_info); void java_syms_update_main(void *arg); -#endif /* GEN_SYMS_FILE_H */ +#endif /* COLLECT_SYMS_FILE_H */ diff --git a/agent/src/ebpf/user/profile/java/config.h b/agent/src/ebpf/user/profile/java/config.h index 62b8d524a65..85747a3bea0 100644 --- a/agent/src/ebpf/user/profile/java/config.h +++ b/agent/src/ebpf/user/profile/java/config.h @@ -17,10 +17,33 @@ #ifndef DF_JAVA_CONFIG_H #define DF_JAVA_CONFIG_H +// Maximum length of Java symbol information string +#define STRING_BUFFER_SIZE 2000 +/* + * In Unix domain sockets, the maximum length of the path is defined by + * the macro UNIX_PATH_MAX. For most systems (e.g., Linux), this maximum + * length is typically 108 characters. + */ +#define UNIX_PATH_MAX 108 + +// The upper limit for updating Java symbol files during method unload events. +#define UPDATE_SYMS_FILE_UNLOAD_HIGH_THRESH 100 + +enum event_type { + METHOD_LOAD, + METHOD_UNLOAD, + DYNAMIC_CODE_GEN +}; + +struct symbol_metadata { + unsigned short len; + unsigned short type; +}; + #define TARGET_NS_STORAGE_PATH "/proc/%d/root/deepflow" #if !defined(AGENT_LIB_NAME) || !defined(AGENT_MUSL_LIB_NAME) - #error Makefile should define "AGENT_LIB_NAME" and "AGENT_MUSL_LIB_NAME" +#error Makefile should define "AGENT_LIB_NAME" and "AGENT_MUSL_LIB_NAME" #endif #define AGENT_LIB_SRC_PATH "/tmp/" AGENT_LIB_NAME @@ -29,16 +52,15 @@ #define AGENT_MUSL_LIB_SRC_PATH "/tmp/" AGENT_MUSL_LIB_NAME #define AGENT_MUSL_LIB_TARGET_PATH "/deepflow/" AGENT_MUSL_LIB_NAME -#define JAVA_LOG_TAG "[JAVA]" +#define JAVA_LOG_TAG "[JAVA] " #define PERF_PATH_SZ 256 -#define DF_AGENT_MAP_PATH_FMT "/proc/%d/root/deepflow/df-perf-%d.map" -#define DF_AGENT_LOG_PATH_FMT "/proc/%d/root/deepflow/df-perf-%d.log" +#define DF_AGENT_MAP_SOCKET_PATH_FMT "/proc/%d/root/tmp/.deepflow-java-symbols-pid%d.socket" +#define DF_AGENT_LOG_SOCKET_PATH_FMT "/proc/%d/root/tmp/.deepflow-java-jvmti-logs-pid%d.socket" -#define DF_AGENT_PATH_FMT "/proc/%d/root/deepflow/df-perf-%d" #define DF_AGENT_LOCAL_PATH_FMT "/tmp/perf-%d" -#define PERF_MAP_FILE_FMT "/deepflow/df-perf-%d.map" -#define PERF_MAP_LOG_FILE_FMT "/deepflow/df-perf-%d.log" +#define JVM_AGENT_SYMS_SOCKET_PATH_FMT "/tmp/.deepflow-java-symbols-pid%d.socket" +#define JVM_AGENT_LOG_SOCKET_PATH_FMT "/tmp/.deepflow-java-jvmti-logs-pid%d.socket" #endif /* DF_JAVA_CONFIG_H */ diff --git a/agent/src/ebpf/user/profile/java/df_jattach.c b/agent/src/ebpf/user/profile/java/df_jattach.c deleted file mode 100644 index 29b7328c749..00000000000 --- a/agent/src/ebpf/user/profile/java/df_jattach.c +++ /dev/null @@ -1,773 +0,0 @@ -/* - * Copyright (c) 2024 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "../../config.h" -#include "../../common.h" -#include "../../log.h" -#include "config.h" -#include "df_jattach.h" - -#define jattach_log(fmt, ...) \ - do { \ - fprintf(stdout, fmt, ##__VA_ARGS__); \ - fflush(stdout); \ - } while(0) - -static char agent_lib_so_path[MAX_PATH_LENGTH]; -extern int jattach(int pid, int argc, char **argv); - -static int agent_so_lib_copy(const char *src, const char *dst, int uid, int gid) -{ - if (access(src, F_OK)) { - jattach_log("Fun %s src file '%s' not exist.\n", __func__, src); - return ETR_NOTEXIST; - } - - if (copy_file(src, dst)) { - return ETR_INVAL; - } - - if (chown(dst, uid, gid) != 0) { - jattach_log - ("Failed to change ownership and group. file '%s'\n", dst); - return ETR_INVAL; - } - - return ETR_OK; -} - -static int copy_agent_libs_into_target_ns(pid_t target_pid, int target_uid, - int target_gid) -{ - /* - * Call this function only when the target process is in a subordinate - * namespace. Here, we copy the agent.so to a temporary path within t- - * he mounted namespace. We also change the file ownership so that the - * target process sees itself as the owner of the file (this is neces- - * sary because some versions of Java might reject proxy injection - * otherwise). - */ - int ret; - char copy_target_path[MAX_PATH_LENGTH]; - int len = snprintf(copy_target_path, sizeof(copy_target_path), - TARGET_NS_STORAGE_PATH, target_pid); - if (access(copy_target_path, F_OK)) { - /* - * The purpose of umask(0); is to set the current process's file - * creation mask (umask) to 0, which means that no permission - * bits will be cleared when creating a file or directory. Files - * and directories will have the permission bits specified at the - * time of creation. - */ - umask(0); - - if (mkdir(copy_target_path, 0777) != 0) { - jattach_log(JAVA_LOG_TAG "Fun %s cannot mkdir() '%s'\n", - __func__, copy_target_path); - - return ETR_NOTEXIST; - } - } - - snprintf(copy_target_path + len, sizeof(copy_target_path) - len, - "/%s", AGENT_LIB_NAME); - - if ((ret = - agent_so_lib_copy(AGENT_LIB_SRC_PATH, - copy_target_path, target_uid, - target_gid)) != ETR_OK) { - jattach_log("cp '%s' to '%s' failed.\n", AGENT_LIB_SRC_PATH, - copy_target_path); - return ret; - } - - snprintf(copy_target_path + len, sizeof(copy_target_path) - len, - "/%s", AGENT_MUSL_LIB_NAME); - - if ((ret = - agent_so_lib_copy(AGENT_MUSL_LIB_SRC_PATH, - copy_target_path, target_uid, - target_gid)) != ETR_OK) { - jattach_log("cp '%s' to '%s' failed.\n", - AGENT_MUSL_LIB_SRC_PATH, copy_target_path); - return ret; - } - - return ETR_OK; -} - -bool test_dl_open(const char *so_lib_file_path) -{ - if (access(so_lib_file_path, F_OK)) { - jattach_log(JAVA_LOG_TAG "Fun %s file '%s' not exist.\n", - __func__, so_lib_file_path); - - return false; - } - - /* - * By calling dlerror() before each dlopen()/dlsym() invocation, - * you can clear any prior error state, ensuring that you accur- - * ately obtain error information pertaining to the current - * operation. - */ - dlerror(); - void *h = dlopen(so_lib_file_path, RTLD_LAZY); - - if (h == NULL) { - jattach_log(JAVA_LOG_TAG - "Fuc '%s' dlopen() path %s failure: %s.", __func__, - so_lib_file_path, dlerror()); - return false; - } - - dlerror(); - agent_test_t test_fn = - (uint64_t(*)(void))dlsym(h, "df_java_agent_so_libs_test"); - - if (test_fn == NULL) { - jattach_log(JAVA_LOG_TAG - "Func '%s' dlsym() path %s failure: %s.", __func__, - so_lib_file_path, dlerror()); - return false; - } - - const uint64_t expected_test_fn_result = - JAVA_AGENT_LIBS_TEST_FUN_RET_VAL; - const uint64_t observed_test_fn_result = test_fn(); - - if (observed_test_fn_result != expected_test_fn_result) { - jattach_log(JAVA_LOG_TAG - "%s test '%s' function returned: %lu, expected %lu.", - __func__, so_lib_file_path, observed_test_fn_result, - expected_test_fn_result); - return false; - } - - jattach_log(JAVA_LOG_TAG "%s: Success for %s.\n", __func__, - so_lib_file_path); - return true; -} - -static void select_suitable_agent_lib(pid_t pid, bool is_same_mntns) -{ - /* Enter pid & mount namespace for target pid, - * and use dlopen() in that namespace.*/ - int pid_self_fd, mnt_self_fd; - df_enter_ns(pid, "pid", &pid_self_fd); - df_enter_ns(pid, "mnt", &mnt_self_fd); - - agent_lib_so_path[0] = '\0'; - char test_path[PERF_PATH_SZ]; - if (!is_same_mntns) - snprintf(test_path, sizeof(test_path), "%s", - AGENT_LIB_TARGET_PATH); - else - snprintf(test_path, sizeof(test_path), "%s", - AGENT_LIB_SRC_PATH); - - if (test_dl_open(test_path)) { - snprintf(agent_lib_so_path, MAX_PATH_LENGTH, "%s", test_path); - jattach_log(JAVA_LOG_TAG - "Func %s target PID %d test %s, success.\n", - __func__, pid, test_path); - goto found; - } - - if (!is_same_mntns) - snprintf(test_path, sizeof(test_path), "%s", - AGENT_MUSL_LIB_TARGET_PATH); - else - snprintf(test_path, sizeof(test_path), "%s", - AGENT_MUSL_LIB_SRC_PATH); - - if (test_dl_open(test_path)) { - snprintf(agent_lib_so_path, MAX_PATH_LENGTH, "%s", test_path); - jattach_log(JAVA_LOG_TAG - "Func %s target PID %d test %s, success.\n", - __func__, pid, test_path); - goto found; - } - - jattach_log(JAVA_LOG_TAG "%s test agent so libs, failure.", __func__); - -found: - - if (!is_same_mntns) { - if (strcmp(agent_lib_so_path, AGENT_LIB_TARGET_PATH) == 0) { - clear_target_ns_tmp_file(AGENT_MUSL_LIB_TARGET_PATH); - } else { - clear_target_ns_tmp_file(AGENT_LIB_TARGET_PATH); - } - } - - df_exit_ns(pid_self_fd); - df_exit_ns(mnt_self_fd); -} - -static int attach(pid_t pid, char *opts) -{ - char *argv[] = { "load", agent_lib_so_path, "true", opts }; - int argc = sizeof(argv) / sizeof(argv[0]); - printf("argc %d opts %s\n", argc, argv[3]); - int ret = jattach(pid, argc, (char **)argv); - jattach_log(JAVA_LOG_TAG - "jattach pid %d argv: \"load %s true\" return %d\n", pid, - agent_lib_so_path, ret); - - return ret; -} - -void clear_target_ns_tmp_file(const char *target_path) -{ - if (access(target_path, F_OK) == 0) { - if (unlink(target_path) != 0) - jattach_log(JAVA_LOG_TAG "rm file %s failed\n", - target_path); - } -} - -static inline bool __is_same_ns(int target_pid, const char *tag) -{ - struct stat self_st, target_st; - char path[64]; - snprintf(path, sizeof(path), "/proc/self/ns/%s", tag); - if (stat(path, &self_st) != 0) - return false; - - snprintf(path, sizeof(path), "/proc/%d/ns/%s", target_pid, tag); - if (stat(path, &target_st) != 0) - return false; - - if (self_st.st_ino == target_st.st_ino) { - return true; - } - - return false; -} - -static bool __unused is_same_netns(int pid) -{ - return __is_same_ns(pid, "net"); -} - -bool is_same_mntns(int pid) -{ - return __is_same_ns(pid, "mnt"); -} - -void clear_local_perf_files(int pid) -{ - char local_path[MAX_PATH_LENGTH]; - snprintf(local_path, sizeof(local_path), - DF_AGENT_LOCAL_PATH_FMT ".map", pid); - clear_target_ns_tmp_file(local_path); - - snprintf(local_path, sizeof(local_path), - DF_AGENT_LOCAL_PATH_FMT ".log", pid); - clear_target_ns_tmp_file(local_path); -} - -void clear_target_ns(int pid, int my_pid) -{ - /* - * Delete files: - * path/df-perf-.map - * path/df-perf-.log - * path/df_java_agent.so - * path/df_java_agent_musl.so - */ - - if (is_same_mntns(pid)) - return; - - char target_path[MAX_PATH_LENGTH]; - snprintf(target_path, sizeof(target_path), - DF_AGENT_MAP_PATH_FMT, pid, my_pid); - clear_target_ns_tmp_file(target_path); - snprintf(target_path, sizeof(target_path), - DF_AGENT_LOG_PATH_FMT, pid, my_pid); - clear_target_ns_tmp_file(target_path); - - snprintf(target_path, sizeof(target_path), "/proc/%d/root%s", pid, - AGENT_MUSL_LIB_TARGET_PATH); - clear_target_ns_tmp_file(target_path); - snprintf(target_path, sizeof(target_path), "/proc/%d/root%s", pid, - AGENT_LIB_TARGET_PATH); - clear_target_ns_tmp_file(target_path); - - snprintf(target_path, sizeof(target_path), TARGET_NS_STORAGE_PATH, pid); - rmdir(target_path); -} - -static int get_target_ns_info(const char *tag, struct stat *st) -{ - int fd; - char selfpath[64]; - snprintf(selfpath, sizeof(selfpath), "/proc/self/ns/%s", tag); - if (st != NULL) { - if (stat(selfpath, st) != 0) - return -1; - } - - fd = open(selfpath, O_RDONLY); - if (fd < 0) - return -1; - - return fd; -} - -static inline void get_nsfd_and_stat(const char *tag, struct stat *st, int *fd) -{ - *fd = get_target_ns_info(tag, st); -} - -static inline void switch_to_root_ns(int root_fd) -{ - /* - * If the user of the target namespace is a non-root user, it will be - * impossible to switch the target namespace to the root namespace. - * There may be a better solution. - * (TODO @jiping) - */ - df_exit_ns(root_fd); -} - -static inline i64 get_symbol_file_size(int pid, int ns_pid, bool is_target) -{ - char path[PERF_PATH_SZ]; - if (is_target) - snprintf(path, sizeof(path), DF_AGENT_PATH_FMT ".map", - pid, ns_pid); - else - snprintf(path, sizeof(path), - DF_AGENT_LOCAL_PATH_FMT ".map", pid); - - if (access(path, F_OK)) { - return -1; - } - - struct stat st; - if (stat(path, &st) == 0) { - return (i64) st.st_size; - } - - return -1; -} - -int target_symbol_file_access(int pid, int ns_pid, bool is_same_mnt) -{ - char path[PERF_PATH_SZ]; - if (!is_same_mnt) - snprintf(path, sizeof(path), DF_AGENT_PATH_FMT ".map", - pid, ns_pid); - else - snprintf(path, sizeof(path), - DF_AGENT_LOCAL_PATH_FMT ".map", pid); - - return access(path, F_OK); -} - -i64 get_target_symbol_file_sz(int pid, int ns_pid) -{ - return get_symbol_file_size(pid, ns_pid, true); -} - -i64 get_local_symbol_file_sz(int pid, int ns_pid) -{ - return get_symbol_file_size(pid, ns_pid, false); -} - -int copy_file_from_target_ns(int pid, int ns_pid, const char *file_type) -{ - char target_path[PERF_PATH_SZ]; - char src_path[PERF_PATH_SZ]; - snprintf(src_path, sizeof(src_path), DF_AGENT_PATH_FMT ".%s", - pid, ns_pid, file_type); - snprintf(target_path, sizeof(target_path), - DF_AGENT_LOCAL_PATH_FMT ".%s", pid, file_type); - - if (access(src_path, F_OK)) { - return -1; - } - - if (access(target_path, F_OK) == 0) { - if (unlink(target_path) != 0) { - return -1; - } - } - - if (copy_file(src_path, target_path)) { - jattach_log("Copy '%s' to '%s' failed.\n", src_path, - target_path); - } - - return 0; -} - -// parse comma separated arguments -int parse_config(char *opts, options_t *parsed) -{ - char line[PERF_PATH_SZ * 2]; - strncpy(line, opts, PERF_PATH_SZ * 2); - line[PERF_PATH_SZ * 2 - 1] = '\0'; - - char *token = strtok(line, ","); - if (token == NULL) { - jattach_log("Bad argument line %s\n", opts); - return -1; - } - parsed->perf_map_size_limit = atoi(token); - - token = strtok(NULL, ","); - if (token == NULL) { - jattach_log("Bad argument line %s\n", opts); - return -1; - } - strncpy(parsed->perf_map_path, token, PERF_PATH_SZ); - parsed->perf_map_path[PERF_PATH_SZ - 1] = '\0'; - - token = strtok(NULL, ","); - if (token == NULL) { - jattach_log("Bad argument line %s\n", opts); - return -1; - } - strncpy(parsed->perf_log_path, token, PERF_PATH_SZ); - parsed->perf_log_path[PERF_PATH_SZ - 1] = '\0'; - - return 0; -} - -int java_attach_same_namespace(pid_t pid, options_t *opts) -{ - /* - * If the agent is installed directly on the node or host, - * be careful to delete the perf-pid.map and - * perf-pid.log on them. - */ - clear_target_ns_tmp_file(opts->perf_map_path); - clear_target_ns_tmp_file(opts->perf_log_path); - - /* - * In containers, different libc implementations may be used to compile agent - * libraries, primarily two types: glibc and musl. We must provide both vers- - * ions of the agent library. So, which one should we choose? To determine t- - * his, we need to enter the target process's namespace and test each library - * until we find one that can be successfully loaded using dlopen. - */ - select_suitable_agent_lib(pid, true); - - if (strlen(agent_lib_so_path) == 0) - return -1; - - char buffer[PERF_PATH_SZ * 2]; - snprintf(buffer, PERF_PATH_SZ * 2, "%d,%s,%s", - opts->perf_map_size_limit, - opts->perf_map_path, - opts->perf_log_path); - - /* Invoke the jattach (https://github.com/apangin/jattach) to inject the - * library as a JVMTI agent.*/ - return attach(pid, buffer); -} - -int create_ipc_socket(const char *path) -{ - int sock = -1; - if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { - jattach_log("Create unix socket failed with %d\n", errno); - return -1; - } - - struct sockaddr_un addr = { .sun_family = AF_UNIX }; - strncpy(addr.sun_path, path, UNIX_PATH_MAX - 1); - int len = sizeof(addr.sun_family) + strlen(addr.sun_path); - if (bind(sock, (struct sockaddr *)&addr, len) < 0) { - jattach_log("Bind unix socket failed with %d\n", errno); - return -1; - } - if (listen(sock, 1) < 0) { - jattach_log("Listen on unix socket failed with %d\n", errno); - unlink(path); - return -1; - } - - return sock; -} - -static void *ipc_receiver_thread(void *arguments) -{ - receiver_args_t *args = (receiver_args_t *)arguments; - - FILE *map_fp = fopen(args->opts->perf_map_path, "w"); - if (!map_fp) { - // byte stream in socket needs to be consumed to avoid client stuck - // even if file open fails - jattach_log("fopen() %s failed with %d\n", args->opts->perf_map_path, errno); - } - FILE *log_fp = fopen(args->opts->perf_log_path, "w"); - if (!log_fp) { - // byte stream in socket needs to be consumed to avoid client stuck - // even if file open fails - jattach_log("fopen() %s failed with %d\n", args->opts->perf_log_path, errno); - } - - int map_sock = args->map_socket; - int log_sock = args->log_socket; - int map_client = -1; - int log_client = -1; - bool map_done = false, log_done = false; - - int max_fd = map_sock > log_sock ? map_sock : log_sock; - fd_set fds; - FD_ZERO(&fds); - FD_SET(map_sock, &fds); - FD_SET(log_sock, &fds); - - struct timeval timeout = { .tv_sec = 1, .tv_usec = 0 }; - - char rcv_buf[BUFSIZE]; - - while (!*args->done && !(map_done && log_done)) { - struct timeval tv = timeout; - fd_set read_fds = fds; - if (map_client >= 0) { - FD_SET(map_client, &read_fds); - } - if (log_client >= 0) { - FD_SET(log_client, &read_fds); - } - - int ret = select(max_fd + 1, &read_fds, 0, 0, &tv); - if (ret == -1) { - jattach_log("select() failed with %d\n", errno); - continue; - } - - if (FD_ISSET(map_sock, &read_fds)) { - if (map_client >= 0) { - jattach_log("map socket already accepted\n"); - } else if ((map_client = accept(map_sock, NULL, NULL)) < 0) { - jattach_log("accept() failed with %d\n", errno); - } else { - max_fd = map_client > max_fd ? map_client : max_fd; - } - } - - if (FD_ISSET(log_sock, &read_fds)) { - if (log_client >= 0) { - jattach_log("log socket already accepted\n"); - } else if ((log_client = accept(log_sock, NULL, NULL)) < 0) { - jattach_log("accept() failed with %d\n", errno); - } else { - max_fd = log_client > max_fd ? log_client : max_fd; - } - } - - if (FD_ISSET(map_client, &read_fds)) { - int n = recv(map_client, rcv_buf, sizeof(rcv_buf), 0); - if (n > 0) { - if (map_fp) { - fwrite(rcv_buf, sizeof(char), n, map_fp); - } - } else if (n == 0 || (n < 0 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK)) { - if (n < 0) { - jattach_log("recv() failed with %d\n", errno); - } - close(map_client); - map_client = -1; - map_done = true; - } - } - - if (FD_ISSET(log_client, &read_fds)) { - int n = recv(log_client, rcv_buf, sizeof(rcv_buf), 0); - if (n > 0) { - if (log_fp) { - fwrite(rcv_buf, sizeof(char), n, log_fp); - } - } else if (n == 0 || (n < 0 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK)) { - if (n < 0) { - jattach_log("recv() failed with %d\n", errno); - } - close(log_client); - log_client = -1; - log_done = true; - } - } - } - - if (map_fp) { - fclose(map_fp); - map_fp = NULL; - } - if (log_fp) { - fclose(log_fp); - log_fp = NULL; - } - - return NULL; -} - -int java_attach_different_namespace(pid_t pid, options_t *opts) -{ - int ret = -1; - int uid, gid; - int map_socket = -1, log_socket = -1; - - if (get_target_uid_and_gid(pid, &uid, &gid)) { - return -1; - } - - /* if pid == target_ns_pid, run in same namespace */ - int target_ns_pid = get_nspid(pid); - if (target_ns_pid < 0) { - return -1; - } - - int my_pid = getpid(); - - /* - * Delete the files on the target file system if they - * are not on the same mount point. - */ - clear_target_ns(pid, my_pid); - - /* - * Here, the original method of determination (based on whether the net - * namespace is the same) is modified to use the mnt namespace for comparison, - * thus avoiding situations where both the net namespace and pid namespace are - * the same but the file system is different. - */ - - /* - * If the target Java process is in a subordinate namespace, copy the - * 'agent.so' into the artifacts path (in /tmp) inside of that namespace - * (for visibility to the target process). - */ - jattach_log("[PID %d] copy agent os library ...\n", pid); - if (copy_agent_libs_into_target_ns(pid, uid, gid)) { - jattach_log("[PID %d] copy agent os library failed.\n", - pid); - goto cleanup; - } - jattach_log("[PID %d] copy agent os library success.\n", pid); - - /* - * In containers, different libc implementations may be used to compile agent - * libraries, primarily two types: glibc and musl. We must provide both vers- - * ions of the agent library. So, which one should we choose? To determine t- - * his, we need to enter the target process's namespace and test each library - * until we find one that can be successfully loaded using dlopen. - */ - select_suitable_agent_lib(pid, false); - - if (strlen(agent_lib_so_path) == 0) - goto cleanup; - - /* Invoke the jattach (https://github.com/apangin/jattach) to inject the - * library as a JVMTI agent.*/ - - // make the sockets accessable from unprivileged user in container - umask(0); - - char buffer[PERF_PATH_SZ * 2]; - snprintf(buffer, PERF_PATH_SZ, DF_AGENT_MAP_PATH_FMT, pid, my_pid); - if ((map_socket = create_ipc_socket(buffer)) < 0) { - goto cleanup; - } - snprintf(buffer, PERF_PATH_SZ, DF_AGENT_LOG_PATH_FMT, pid, my_pid); - if ((log_socket = create_ipc_socket(buffer)) < 0) { - goto cleanup; - } - - pthread_t ipc_receiver; - bool done = false; - receiver_args_t args = { - .opts = opts, - .map_socket = map_socket, - .log_socket = log_socket, - .done = &done, - }; - if ((ret = pthread_create(&ipc_receiver, NULL, &ipc_receiver_thread, &args)) < 0) { - jattach_log("Create ipc receiver thread failed with errno(%d)\n", errno); - goto cleanup; - } - - snprintf(buffer, PERF_PATH_SZ * 2, - "%d," PERF_MAP_FILE_FMT "," PERF_MAP_LOG_FILE_FMT, - opts->perf_map_size_limit, my_pid, my_pid); - ret = attach(pid, buffer); - - done = true; - pthread_join(ipc_receiver, NULL); - -cleanup: - if (map_socket >= 0) { - close(map_socket); - } - if (log_socket >= 0) { - close(log_socket); - } - // attach() may change euid/egid, restore them to remove tmp files - if (seteuid(getuid()) < 0) { - jattach_log("seteuid() failed with errno(%d)\n", errno); - } - if (setegid(getgid()) < 0) { - jattach_log("seteuid() failed with errno(%d)\n", errno); - } - clear_target_ns(pid, my_pid); - return ret; -} - -int java_attach(pid_t pid, char *opts) -{ - options_t parsed_opts; - if (parse_config(opts, &parsed_opts) != 0) { - return -1; - } - - if (is_same_mntns(pid)) { - return java_attach_same_namespace(pid, &parsed_opts); - } else { - return java_attach_different_namespace(pid, &parsed_opts); - } -} - -#ifdef JAVA_AGENT_ATTACH_TOOL -int main(int argc, char **argv) -{ - if (argc != 3) { - fprintf(stderr, "Usage: %s \n", argv[0]); - return -1; - } - - log_to_stdout = true; - int pid = atoi(argv[1]); - return java_attach(pid, argv[2]); -} -#endif /* JAVA_AGENT_ATTACH_TOOL */ diff --git a/agent/src/ebpf/user/profile/java/df_jattach.h b/agent/src/ebpf/user/profile/java/df_jattach.h deleted file mode 100644 index 2d0de6ed84b..00000000000 --- a/agent/src/ebpf/user/profile/java/df_jattach.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (c) 2024 Yunshan Networks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef DF_JATTACH_H -#define DF_JATTACH_H - -#include "config.h" - -#define BUFSIZE 1024 -#define UNIX_PATH_MAX 108 - -typedef uint64_t(*agent_test_t) (void); - -typedef struct options { - int perf_map_size_limit; - char perf_map_path[PERF_PATH_SZ]; - char perf_log_path[PERF_PATH_SZ]; -} options_t; - -typedef struct receiver_args { - options_t *opts; - int map_socket; - int log_socket; - bool *done; -} receiver_args_t; - -void clear_target_ns_tmp_file(const char *target_path); -int copy_file_from_target_ns(int pid, int ns_pid, const char *file_type); -void clear_target_ns(int pid, int target_ns_pid); -void clear_target_ns_so(int pid, int target_ns_pid); -void clear_local_perf_files(int pid); -bool is_same_mntns(int target_pid); -i64 get_target_symbol_file_sz(int pid, int ns_pid); -i64 get_local_symbol_file_sz(int pid, int ns_pid); -int target_symbol_file_access(int pid, int ns_pid, bool is_same_mnt); -#endif /* DF_JATTACH_H */ diff --git a/agent/src/ebpf/user/profile/java/jvm_symbol_collect.c b/agent/src/ebpf/user/profile/java/jvm_symbol_collect.c new file mode 100644 index 00000000000..0b8567e6e1c --- /dev/null +++ b/agent/src/ebpf/user/profile/java/jvm_symbol_collect.c @@ -0,0 +1,1429 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "../../config.h" +#include "../../common.h" +#include "../../log.h" +#include "../../mem.h" +#include "../../vec.h" +#include "config.h" +#include "jvm_symbol_collect.h" + +#define SYM_COLLECT_MAX_EVENTS 4 + +// Use thread pool to manage threads for obtaining Java symbols. +symbol_collect_thread_pool_t *g_collect_pool; + +/* + * Use a dynamic array to store the addresses of 'COMPILED_METHOD_UNLOAD' + * sent by the Java JVM. This is a per-thread variable, with each thread + * handling the data sent by the corresponding JVM. + */ +static __thread java_unload_addr_str_t *unload_addrs; +extern int jattach(int pid, int argc, char **argv, int print_output); +static int create_symbol_collect_task(pid_t pid, options_t * opts, + bool is_same_mntns); + +bool test_dl_open(const char *so_lib_file_path) +{ + if (access(so_lib_file_path, F_OK)) { + ebpf_warning(JAVA_LOG_TAG "Fun %s file '%s' not exist.\n", + __func__, so_lib_file_path); + + return false; + } + + /* + * By calling dlerror() before each dlopen()/dlsym() invocation, + * you can clear any prior error state, ensuring that you accur- + * ately obtain error information pertaining to the current + * operation. + */ + dlerror(); + void *h = dlopen(so_lib_file_path, RTLD_LAZY); + + if (h == NULL) { + ebpf_warning(JAVA_LOG_TAG + "Fuc '%s' dlopen() path %s failure: %s.", __func__, + so_lib_file_path, dlerror()); + return false; + } + + dlerror(); + agent_test_t test_fn = + (uint64_t(*)(void))dlsym(h, "df_java_agent_so_libs_test"); + + if (test_fn == NULL) { + ebpf_warning(JAVA_LOG_TAG + "Func '%s' dlsym() path %s failure: %s.", __func__, + so_lib_file_path, dlerror()); + return false; + } + + const uint64_t expected_test_fn_result = + JAVA_AGENT_LIBS_TEST_FUN_RET_VAL; + const uint64_t observed_test_fn_result = test_fn(); + + if (observed_test_fn_result != expected_test_fn_result) { + ebpf_warning(JAVA_LOG_TAG + "%s test '%s' function returned: %lu, expected %lu.", + __func__, so_lib_file_path, + observed_test_fn_result, expected_test_fn_result); + return false; + } + + ebpf_info(JAVA_LOG_TAG "%s: Success for %s.\n", __func__, + so_lib_file_path); + return true; +} + +void clear_target_ns_tmp_file(const char *target_path) +{ + if (access(target_path, F_OK) == 0) { + if (unlink(target_path) != 0) + ebpf_warning(JAVA_LOG_TAG "rm file %s failed\n", + target_path); + } +} + +void clear_local_perf_files(int pid) +{ + char local_path[MAX_PATH_LENGTH]; + snprintf(local_path, sizeof(local_path), + DF_AGENT_LOCAL_PATH_FMT ".map", pid); + clear_target_ns_tmp_file(local_path); + + snprintf(local_path, sizeof(local_path), + DF_AGENT_LOCAL_PATH_FMT ".log", pid); + clear_target_ns_tmp_file(local_path); +} + +int check_and_clear_unix_socket_files(int pid, bool check_in_use) +{ + char target_path[MAX_PATH_LENGTH]; + snprintf(target_path, sizeof(target_path), + DF_AGENT_MAP_SOCKET_PATH_FMT, pid, pid); + + if (check_in_use) { + if (is_file_opened_by_other_processes(target_path) == 1) { + ebpf_warning(JAVA_LOG_TAG + "File '%s' is opened by another process.\n", + target_path); + return -1; + } + } + clear_target_ns_tmp_file(target_path); + snprintf(target_path, sizeof(target_path), + DF_AGENT_LOG_SOCKET_PATH_FMT, pid, pid); + if (check_in_use) { + if (is_file_opened_by_other_processes(target_path) == 1) { + ebpf_warning(JAVA_LOG_TAG + "File '%s' is opened by another process.\n", + target_path); + return -1; + } + } + clear_target_ns_tmp_file(target_path); + + return 0; +} + +static int clear_so_target_ns(int pid, bool check_in_use) +{ + char target_path[MAX_PATH_LENGTH]; + snprintf(target_path, sizeof(target_path), "/proc/%d/root%s", pid, + AGENT_MUSL_LIB_TARGET_PATH); + if (check_in_use) { + if (is_file_opened_by_other_processes(target_path) == 1) { + ebpf_warning(JAVA_LOG_TAG + "File '%s' is opened by another process.\n", + target_path); + return -1; + } + } + clear_target_ns_tmp_file(target_path); + snprintf(target_path, sizeof(target_path), "/proc/%d/root%s", pid, + AGENT_LIB_TARGET_PATH); + if (check_in_use) { + if (is_file_opened_by_other_processes(target_path) == 1) { + ebpf_warning(JAVA_LOG_TAG + "File '%s' is opened by another process.\n", + target_path); + return -1; + } + } + clear_target_ns_tmp_file(target_path); + + snprintf(target_path, sizeof(target_path), TARGET_NS_STORAGE_PATH, pid); + rmdir(target_path); + + return 0; +} + +int check_and_clear_target_ns(int pid, bool check_in_use) +{ + /* + * Delete files: + * path/.deepflow-java-symbols-pid.socket + * path/.deepflow-java-jvmti-logs-ipd.socket + * path/df_java_agent.so + * path/df_java_agent_musl.so + */ + + if (is_same_mntns(pid)) + return 0; + + if (check_and_clear_unix_socket_files(pid, check_in_use) == -1) + return -1; + + return clear_so_target_ns(pid, check_in_use); +} + +static int get_target_ns_info(const char *tag, struct stat *st) +{ + int fd; + char selfpath[64]; + snprintf(selfpath, sizeof(selfpath), "/proc/self/ns/%s", tag); + if (st != NULL) { + if (stat(selfpath, st) != 0) + return -1; + } + + fd = open(selfpath, O_RDONLY); + if (fd < 0) + return -1; + + return fd; +} + +static inline void get_nsfd_and_stat(const char *tag, struct stat *st, int *fd) +{ + *fd = get_target_ns_info(tag, st); +} + +static inline void switch_to_root_ns(int root_fd) +{ + /* + * If the user of the target namespace is a non-root user, it will be + * impossible to switch the target namespace to the root namespace. + * There may be a better solution. + * (TODO @jiping) + */ + df_exit_ns(root_fd); +} + +static inline i64 get_symbol_file_size(int pid) +{ + char path[PERF_PATH_SZ]; + snprintf(path, sizeof(path), DF_AGENT_LOCAL_PATH_FMT ".map", pid); + + if (access(path, F_OK)) { + return -1; + } + + struct stat st; + if (stat(path, &st) == 0) { + return (i64) st.st_size; + } + + return -1; +} + +int target_symbol_file_access(int pid) +{ + char path[PERF_PATH_SZ]; + snprintf(path, sizeof(path), DF_AGENT_LOCAL_PATH_FMT ".map", pid); + + return access(path, F_OK); +} + +i64 get_local_symbol_file_sz(int pid) +{ + return get_symbol_file_size(pid); +} + +// parse comma separated arguments +int parse_config(char *opts, options_t * parsed) +{ + char line[PERF_PATH_SZ * 2]; + strncpy(line, opts, PERF_PATH_SZ * 2); + line[PERF_PATH_SZ * 2 - 1] = '\0'; + + char *token = strtok(line, ","); + if (token == NULL) { + ebpf_warning(JAVA_LOG_TAG "Bad argument line %s\n", opts); + return -1; + } + strncpy(parsed->perf_map_path, token, PERF_PATH_SZ); + parsed->perf_map_path[PERF_PATH_SZ - 1] = '\0'; + + token = strtok(NULL, ","); + if (token == NULL) { + ebpf_warning(JAVA_LOG_TAG "Bad argument line %s\n", opts); + return -1; + } + strncpy(parsed->perf_log_path, token, PERF_PATH_SZ); + parsed->perf_log_path[PERF_PATH_SZ - 1] = '\0'; + + return 0; +} + +int symbol_collect_same_namespace(pid_t pid, options_t * opts) +{ + // Clear '/tmp/' unix domain sockets files. + if (check_and_clear_unix_socket_files(pid, false) == -1) + return -1; + + return create_symbol_collect_task(pid, opts, true); +} + +int create_ipc_socket(const char *path) +{ + int sock = -1; + if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + ebpf_warning(JAVA_LOG_TAG + "Create unix socket failed with '%s(%d)'\n", + strerror(errno), errno); + return -1; + } + + struct sockaddr_un addr = {.sun_family = AF_UNIX }; + strncpy(addr.sun_path, path, UNIX_PATH_MAX - 1); + int len = sizeof(addr.sun_family) + strlen(addr.sun_path); + if (bind(sock, (struct sockaddr *)&addr, len) < 0) { + ebpf_warning(JAVA_LOG_TAG + "Bind unix socket failed with '%s(%d)'\n", + strerror(errno), errno); + return -1; + } + if (listen(sock, 1) < 0) { + ebpf_warning(JAVA_LOG_TAG + "Listen on unix socket failed with '%s(%d)'\n", + strerror(errno), errno); + unlink(path); + return -1; + } + + return sock; +} + +static inline int add_fd_to_epoll(int epoll_fd, int fd) +{ + if (fd <= 0) { + ebpf_warning(JAVA_LOG_TAG + "fd must be a value greater than 0, fd %d\n", fd); + return -1; + } + + struct epoll_event event; + event.events = EPOLLIN; + event.data.fd = fd; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) { + ebpf_warning(JAVA_LOG_TAG + "epoll_ctl() ADD failed with '%s(%d)'\n", + strerror(errno), errno); + return -1; + } + + return 0; +} + +static inline int del_fd_from_epoll(int epoll_fd, int fd) +{ + if (fd <= 0 || epoll_fd <= 0) { + ebpf_warning(JAVA_LOG_TAG + "fd must be a value greater than 0, fd %d, epoll fd %d\n", + fd, epoll_fd); + return -1; + } + + if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1) { + ebpf_warning(JAVA_LOG_TAG + "epoll_ctl() DEL failed with '%s(%d)'\n", + strerror(errno), errno); + return -1; + } + + return 0; +} + +static inline int receive_msg(receiver_args_t * args, int sock_fd, char *buf, + size_t buf_size, bool received_once) +{ + int recv_bytes = 0; + int n = 0; // Initialize n + + do { + if ((n = + recv(sock_fd, buf + recv_bytes, buf_size - recv_bytes, + 0)) == -1) { + if (errno == EINTR) { + // Retry on interrupt or temporary failure + continue; + } else { + // Handle other errors + ebpf_warning(JAVA_LOG_TAG + "Receive Java process(PID: %d) message" + " failed with '%s(%d)'\n", + args->pid, strerror(errno), errno); + return -1; + } + } else if (n == 0) { + ebpf_warning(JAVA_LOG_TAG + "The target Java process (PID: %d) has" + " disconnected. The Java process may have exited.\n", + args->pid); + return -1; + } + + recv_bytes += n; + } while (recv_bytes < buf_size && !received_once); + + return recv_bytes; // Return total bytes received +} + +static bool is_unload_address(const char *sym_str) +{ + java_unload_addr_str_t *jaddr; + vec_foreach(jaddr, unload_addrs) { + if (jaddr->is_verified) + continue; + if (substring_starts_with(sym_str, jaddr->addr)) { + jaddr->is_verified = true; + return true; + } + } + + return false; +} + +static int delete_method_unload_symbol(receiver_args_t * args) +{ + const char *path = args->opts->perf_map_path; + size_t delete_count = 0; + FILE *fp_in = fopen(path, "r"); + if (!fp_in) { + ebpf_warning(JAVA_LOG_TAG + "Error opening input file %s, with '%s(%d)'\n", + path, strerror(errno), errno); + return -1; + } + + char temp_path[MAX_PATH_LENGTH]; + snprintf(temp_path, sizeof(temp_path), "%s.temp", path); + FILE *fp_out = fopen(temp_path, "w"); + if (!fp_out) { + ebpf_warning(JAVA_LOG_TAG + "Error creating temporary file %s, with '%s(%d)'\n", + temp_path, strerror(errno), errno); + fclose(fp_in); + return -1; + } + + char buffer[STRING_BUFFER_SIZE]; + while (fgets(buffer, sizeof(buffer), fp_in)) { + if (!is_unload_address(buffer)) + fputs(buffer, fp_out); + else + delete_count++; + } + + fclose(fp_in); + fclose(fp_out); + + if (remove(path) != 0) { + ebpf_warning(JAVA_LOG_TAG + "Error deleting original file %s, with '%s(%d)'\n", + path, strerror(errno), errno); + return -1; + } + if (rename(temp_path, path) != 0) { + ebpf_warning(JAVA_LOG_TAG + "Error renaming temporary file '%s(%d)'\n", + strerror(errno), errno); + return -1; + } + + return delete_count; +} + +static int update_java_perf_map_file(receiver_args_t * args, char *addr_str) +{ + if (addr_str != NULL) { + int ret = VEC_OK; + java_unload_addr_str_t java_addr = { 0 }; + snprintf(java_addr.addr, sizeof(java_addr.addr), "%s", + addr_str); + vec_add1(unload_addrs, java_addr, ret); + if (ret != VEC_OK) { + ebpf_warning(" Java unload_addrs add failed.\n"); + } + } + + int unload_count = vec_len(unload_addrs); + if ((args->task->need_refresh && unload_count > 0) + || unload_count >= UPDATE_SYMS_FILE_UNLOAD_HIGH_THRESH) { + fclose(args->map_fp); + int count; + if ((count = delete_method_unload_symbol(args)) < 0) { + vec_free(unload_addrs); + return -1; + } + vec_free(unload_addrs); + args->map_fp = fopen(args->opts->perf_map_path, "a"); + if (!args->map_fp) { + ebpf_warning(JAVA_LOG_TAG + "fopen() %s failed with '%s(%d)'\n", + args->opts->perf_map_path, strerror(errno), + errno); + return -1; + } + ebpf_debug + ("=== file update args->task->need_refresh %d pid %d unload_count %d\n", + args->task->need_refresh, args->task->pid, unload_count); + } + + return 0; +} + +static int symbol_msg_process(receiver_args_t * args, int sock_fd) +{ + FILE *fp = args->map_fp; + struct symbol_metadata meta; + int n = receive_msg(args, sock_fd, (char *)&meta, sizeof(meta), false); + if (n != sizeof(meta)) + return -1; + + char rcv_buf[STRING_BUFFER_SIZE]; + if (meta.len > STRING_BUFFER_SIZE) + return -1; + + n = receive_msg(args, sock_fd, rcv_buf, meta.len, false); + if (n != meta.len) + return -1; + rcv_buf[meta.len] = '\0'; + + /* + * If the replay is complete and the event type is + * JVMTI_EVENT_COMPILED_METHOD_UNLOAD, the map file + * needs to be updated. + */ + if (args->replay_done && meta.type == METHOD_UNLOAD) { + if (update_java_perf_map_file(args, rcv_buf)) + return -1; + } else { + int written_count = fwrite(rcv_buf, sizeof(char), n, fp); + if (written_count != n) { + ebpf_warning(JAVA_LOG_TAG "%s(%d)\n", strerror(errno), + errno); + return -1; + } + /* + * Ensure data is written to the file promptly, + * avoiding prolonged residence in the buffer. + */ + fflush(fp); + } + + return 0; +} + +static int symbol_log_process(receiver_args_t * args, int sock_fd) +{ + FILE *fp = args->log_fp; + char rcv_buf[STRING_BUFFER_SIZE]; + int n = receive_msg(args, sock_fd, rcv_buf, sizeof(rcv_buf), true); + if (n == -1) + return -1; + int written_count = fwrite(rcv_buf, sizeof(char), n, fp); + if (written_count != n) { + ebpf_warning(JAVA_LOG_TAG "%s(%d)\n", strerror(errno), errno); + return -1; + } + fflush(fp); + return 0; +} + +int epoll_events_process(receiver_args_t * args, int epoll_fd, + struct epoll_event *ev) +{ + errno = 0; + if (ev->data.fd == args->map_socket) { + if ((args->map_client = accept(ev->data.fd, NULL, NULL)) < 0) { + ebpf_warning(JAVA_LOG_TAG + "accept() failed with '%s(%d)'\n", + strerror(errno), errno); + return -1; + } + if (add_fd_to_epoll(epoll_fd, args->map_client) == -1) + return -1; + } else if (ev->data.fd == args->log_socket) { + if ((args->log_client = accept(ev->data.fd, NULL, NULL)) < 0) { + ebpf_warning(JAVA_LOG_TAG + "accept() failed with '%s(%d)'\n", + strerror(errno), errno); + return -1; + } + if (add_fd_to_epoll(epoll_fd, args->log_client) == -1) + return -1; + } else { + if (ev->data.fd == args->map_client) { + if (symbol_msg_process(args, ev->data.fd)) + return -1; + } else if (ev->data.fd == args->log_client) { + if (symbol_log_process(args, ev->data.fd)) + return -1; + } else { + ebpf_warning(JAVA_LOG_TAG + "Unexpected event, event fd %d\n", + ev->data.fd); + return 0; + } + } + + return 0; +} + +static int destroy_task(symbol_collect_task_t * task, + symbol_collect_thread_pool_t * pool) +{ + receiver_args_t *args = (receiver_args_t *) & task->args; + if (args->map_fp) { + fclose(args->map_fp); + } + + if (args->log_fp) { + fclose(args->log_fp); + } + + if (args->map_client > 0) { + del_fd_from_epoll(args->epoll_fd, args->map_client); + close(args->map_client); + } + + if (args->log_client > 0) { + del_fd_from_epoll(args->epoll_fd, args->log_client); + close(args->log_client); + } + + if (args->map_socket > 0) { + del_fd_from_epoll(args->epoll_fd, args->map_socket); + close(args->map_socket); + } + + if (args->log_socket > 0) { + del_fd_from_epoll(args->epoll_fd, args->log_socket); + close(args->log_socket); + } + + if (args->epoll_fd > 0) { + close(args->epoll_fd); + } + + if (!task->is_local_mntns) + check_and_clear_target_ns(args->pid, false); + else + check_and_clear_unix_socket_files(args->pid, false); + + ebpf_debug(JAVA_LOG_TAG "All resources cleaned up for symbol table" + " management task (associated with JAVA PID: %d).\n", + args->pid); + free(task); + return 0; +} + +static void *ipc_receiver_main(void *arguments) +{ + receiver_args_t *args = (receiver_args_t *) arguments; + + /* + * If the file already exists, opening it in "w" mode will clear its contents + * (truncate it to zero length). If the file does not exist, opening it in "w" + * mode will create a new file. + */ + FILE *map_fp = fopen(args->opts->perf_map_path, "w"); + if (!map_fp) { + // byte stream in socket needs to be consumed to avoid client stuck + // even if file open fails + ebpf_warning(JAVA_LOG_TAG "fopen() %s failed with '%s(%d)'\n", + args->opts->perf_map_path, strerror(errno), errno); + goto cleanup; + } + args->map_fp = map_fp; + + FILE *log_fp = fopen(args->opts->perf_log_path, "w"); + if (!log_fp) { + // byte stream in socket needs to be consumed to avoid client stuck + // even if file open fails + ebpf_warning(JAVA_LOG_TAG "fopen() %s failed with '%s(%d)'\n", + args->opts->perf_log_path, strerror(errno), errno); + goto cleanup; + } + args->log_fp = log_fp; + + int epoll_fd = epoll_create1(0); + if (epoll_fd == -1) { + ebpf_warning(JAVA_LOG_TAG + "epoll_create1() failed with '%s(%d)'\n", + strerror(errno), errno); + goto cleanup; + } + args->epoll_fd = epoll_fd; + + if (add_fd_to_epoll(epoll_fd, args->map_socket) == -1) { + goto cleanup; + } + + if (add_fd_to_epoll(epoll_fd, args->log_socket) == -1) { + goto cleanup; + } + + struct epoll_event events[SYM_COLLECT_MAX_EVENTS]; + while (args->attach_ret == 0) { + int n = epoll_wait(epoll_fd, events, SYM_COLLECT_MAX_EVENTS, + PROFILER_READER_EPOLL_TIMEOUT); + if (n == -1) { + if (errno == EINTR) { + // If epoll_wait was interrupted by a signal, retry + continue; + } else { + ebpf_warning(JAVA_LOG_TAG + "epoll_wait() failed with '%s(%d)'\n", + strerror(errno), errno); + goto cleanup; + } + } + + for (int i = 0; i < n; ++i) { + if (events[i].events & EPOLLIN) { + struct epoll_event *ev = &events[i]; + if (epoll_events_process + (args, epoll_fd, ev) < 0) + goto cleanup; + } + } + + if (args->task->need_refresh) { + int ret_val = update_java_perf_map_file(args, NULL); + pthread_mutex_lock(&args->task->mutex); + args->task->update_status = ret_val; + args->task->need_refresh = false; + pthread_cond_signal(&args->task->cond); + pthread_mutex_unlock(&args->task->mutex); + } + } + +cleanup: + /* Return to worker_thread() to handle unified resource cleanup. */ + return NULL; +} + +static void *worker_thread(void *arg) +{ + symbol_collect_thread_pool_t *pool = arg; + pthread_t thread = pthread_self(); + int thread_idx = pool->thread_index; + + while (1) { + pthread_mutex_lock(&pool->lock); + while (pool->pending_tasks <= 0 && !pool->stop) { + pthread_cond_wait(&pool->cond, &pool->lock); + } + + if (pool->stop && pool->task_count == 0) { + pthread_mutex_unlock(&pool->lock); + pthread_exit(NULL); + } + + if (pool->threads[thread_idx].thread != thread) { + pthread_mutex_unlock(&pool->lock); + pthread_exit(NULL); + } + // Get task from queue + symbol_collect_task_t *task; + task = list_first_entry(&pool->task_list_head, + symbol_collect_task_t, list); + list_head_del(&task->list); + pool->pending_tasks--; + task->thread = thread; + pool->threads[thread_idx].task = task; + pthread_mutex_unlock(&pool->lock); + + // Execute task + ebpf_debug(JAVA_LOG_TAG + "Thread %ld executing task for java processes (PID: %d)\n", + task->thread, task->pid); + task->func(&task->args); + + ebpf_debug(JAVA_LOG_TAG + "Thread %ld finished task for java processes (PID: %d)\n", + task->thread, task->pid); + pthread_mutex_lock(&pool->lock); + pool->threads[thread_idx].task = NULL; + pool->task_count--; + pthread_mutex_unlock(&pool->lock); + destroy_task(task, pool); + } + + return NULL; +} + +static bool check_target_jvmti_attach_files(pid_t pid) +{ + /* + * After a successful attach, the following files will be generated. + * HotSpot: /tmp/.java_pid + * OpenJ9: /tmp/.com_ibm_tools_attach/ + */ + char hotspot_path[MAX_PATH_LENGTH], openj9_path[MAX_PATH_LENGTH]; + pid_t ns_pid = get_nspid(pid); + + // Check for HotSpot JVM dependency file + snprintf(hotspot_path, sizeof(hotspot_path), + "/proc/%d/root/tmp/.java_pid%d", pid, ns_pid); + bool hotspot_exist = (access(hotspot_path, F_OK) == 0); + if (hotspot_exist) { + ebpf_debug(JAVA_LOG_TAG + "Java process (PID:%d) is HotSpot JVM.\n", pid); + return true; + } + // Check for OpenJ9 JVM dependency file + snprintf(openj9_path, sizeof(openj9_path), + "/proc/%d/root/tmp/.com_ibm_tools_attach/%d", pid, ns_pid); + bool openj9_exist = (access(openj9_path, F_OK) == 0); + if (openj9_exist) { + ebpf_debug(JAVA_LOG_TAG + "Java process (PID:%d) is OpenJ9 JVM.\n", pid); + return true; + } + + ebpf_warning(JAVA_LOG_TAG "Check HotSpot JVM, file '%s' not exist.\n" + "Check OpenJ9 JVM, file '%s' not exist.\n", + hotspot_path, openj9_path); + + return false; +} + +static int thread_pool_add_task(symbol_collect_thread_pool_t * pool, + symbol_collect_task_t * task) +{ + pthread_mutex_lock(&pool->lock); + list_add_tail(&task->list, &pool->task_list_head); + pool->task_count++; + pool->pending_tasks++; + + // Wake up threads in the thread pool to execute tasks. + pthread_cond_signal(&pool->cond); + + // If there are no threads available in the thread pool, + // new threads need to be added to the thread pool. + if (pool->task_count > pool->thread_count) { + int ret; + pthread_t thread; + pool->thread_index = pool->thread_count; + + if ((ret = + pthread_create(&thread, NULL, &worker_thread, pool)) < 0) { + ebpf_warning(JAVA_LOG_TAG + "Create worker thread failed with '%s(%d)'\n", + strerror(errno), errno); + pthread_mutex_unlock(&pool->lock); + return -2; + } + + if (pthread_detach(thread) != 0) { + ebpf_warning(JAVA_LOG_TAG + "Failed to detach thread with '%s(%d)'\n", + strerror(errno), errno); + pthread_mutex_unlock(&pool->lock); + return -1; + } + + task_thread_t *new_threads = realloc(pool->threads, + (++pool->thread_count) * + sizeof(task_thread_t)); + if (new_threads == NULL) { + ebpf_warning + (JAVA_LOG_TAG + "Failed to reallocate memory for threads with '%s(%d)'\n", + strerror(errno), errno); + pthread_mutex_unlock(&pool->lock); + return -1; + } + + pool->threads = new_threads; + pool->threads[pool->thread_count - 1].task = NULL; + pool->threads[pool->thread_count - 1].thread = thread; + pool->threads[pool->thread_count - 1].index = + pool->thread_count - 1; + ebpf_debug(JAVA_LOG_TAG + "Created new thread. Current thread count: %d\n", + pool->thread_count); + } + + pthread_mutex_unlock(&pool->lock); + + return 0; +} + +static int create_symbol_collect_task(pid_t pid, options_t * opts, + bool is_same_mntns) +{ + int ret = -1; + symbol_collect_task_t *task = NULL; + int map_socket = -1, log_socket = -1; + + // make the sockets accessable from unprivileged user in container + umask(0); + + char buffer[PERF_PATH_SZ * 2]; + snprintf(buffer, PERF_PATH_SZ, DF_AGENT_MAP_SOCKET_PATH_FMT, pid, pid); + + if ((map_socket = create_ipc_socket(buffer)) < 0) { + goto cleanup; + } + snprintf(buffer, PERF_PATH_SZ, DF_AGENT_LOG_SOCKET_PATH_FMT, pid, pid); + + if ((log_socket = create_ipc_socket(buffer)) < 0) { + goto cleanup; + } + + task = malloc(sizeof(symbol_collect_task_t) + sizeof(*opts)); + if (task == NULL) { + ebpf_warning(JAVA_LOG_TAG "malloc() failed, with %s(%d)\n", + strerror(errno), errno); + goto cleanup; + } + memset(task, 0, sizeof(symbol_collect_task_t) + sizeof(*opts)); + task->pid = pid; + task->is_local_mntns = is_same_mntns; + task->pid_start_time = get_process_starttime_and_comm(pid, NULL, 0); + if (task->pid_start_time == 0) { + ebpf_warning("The Java process(PID: %d) no longer exists.\n", + pid); + goto cleanup; + } + task->func = ipc_receiver_main; + pthread_mutex_init(&task->mutex, NULL); + pthread_cond_init(&task->cond, NULL); + task->need_refresh = false; + options_t *__opts = (options_t *) (task + 1); + *__opts = *opts; + + task->args.pid = pid; + task->args.opts = __opts; + task->args.map_socket = map_socket; + task->args.log_socket = log_socket; + task->args.attach_ret = 0; + task->args.replay_done = false; + task->args.task = task; + + ret = thread_pool_add_task(g_collect_pool, task); + if (ret < 0) { + goto cleanup; + } + + snprintf(buffer, sizeof(buffer), "%d", pid); + char ret_buf[1024]; + memset(ret_buf, 0, sizeof(ret_buf)); + ret = + exec_command(DF_JAVA_ATTACH_CMD, buffer, ret_buf, sizeof(ret_buf)); + if (ret != 0) { + ebpf_warning(JAVA_LOG_TAG "ret %d: %s", ret, ret_buf); + } + task->args.replay_done = true; + task->args.attach_ret = ret; + CLIB_MEMORY_STORE_BARRIER(); + + /* After successfully attaching, clean up the residual .so files + * in the target namespace. */ + if (!is_same_mntns) + clear_so_target_ns(pid, false); + + if (!check_target_jvmti_attach_files(pid)) { + ebpf_warning(JAVA_LOG_TAG + "Miss HotSpot/OpenJ9 JVM dependency file.\n"); + } + + return ret; + +cleanup: + if (task) + free(task); + + if (map_socket >= 0) { + close(map_socket); + } + if (log_socket >= 0) { + close(log_socket); + } + // attach() may change euid/egid, restore them to remove tmp files + if (seteuid(getuid()) < 0) { + ebpf_warning(JAVA_LOG_TAG "seteuid() failed with '%s(%d)'\n", + strerror(errno), errno); + } + if (setegid(getgid()) < 0) { + ebpf_warning(JAVA_LOG_TAG "seteuid() failed with '%s(%d)'\n", + strerror(errno), errno); + } + + if (!is_same_mntns) + check_and_clear_target_ns(pid, false); + else + check_and_clear_unix_socket_files(pid, false); + + return ret; +} + +int symbol_collect_different_namespace(pid_t pid, options_t * opts) +{ + /* + * Delete the files on the target file system if they + * are not on the same mount point. + */ + if (check_and_clear_target_ns(pid, false) == -1) + return -1; + + return create_symbol_collect_task(pid, opts, false); +} + +static int symbol_collect_thread_pool_init(void) +{ + symbol_collect_thread_pool_t *pool = + malloc(sizeof(symbol_collect_thread_pool_t)); + if (pool == NULL) { + ebpf_warning(JAVA_LOG_TAG + "Failed to allocate memory for thread pool\n"); + return -1; + } + + if (pthread_mutex_init(&pool->lock, NULL) != 0) { + ebpf_warning(JAVA_LOG_TAG + "Failed to initialize mutex, %s(%d)\n", + strerror(errno), errno); + free(pool); + return -1; + } + + if (pthread_cond_init(&pool->cond, NULL) != 0) { + ebpf_warning(JAVA_LOG_TAG + "Failed to initialize cond, %s(%d)\n", + strerror(errno), errno); + free(pool); + return -1; + } + + pool->task_count = 0; + pool->thread_count = 0; // Initial thread count is 0 + pool->threads = NULL; // Initial thread array is empty + pool->stop = 0; + pool->pending_tasks = 0; + init_list_head(&pool->task_list_head); + g_collect_pool = pool; + + return 0; +} + +static symbol_collect_task_t *get_task_by_pid(pid_t pid) +{ + if (g_collect_pool == NULL) + return NULL; + + symbol_collect_task_t *task = NULL; + pthread_mutex_lock(&g_collect_pool->lock); + for (int i = 0; i < g_collect_pool->thread_count; i++) { + if (g_collect_pool->threads[i].task == NULL) + continue; + if (g_collect_pool->threads[i].task->pid == pid) { + task = g_collect_pool->threads[i].task; + break; + } + } + pthread_mutex_unlock(&g_collect_pool->lock); + + return task; +} + +int start_java_symbol_collection(pid_t pid, const char *opts) +{ + // Initialize a thread pool for managing Java symbols. + if (g_collect_pool == NULL) { + if (symbol_collect_thread_pool_init()) { + ebpf_warning + ("symbol_collect_thread_pool_init() failed.\n"); + return -1; + } + } + + options_t parsed_opts; + if (parse_config((char *)opts, &parsed_opts) != 0) { + return -1; + } + + if (is_same_mntns(pid)) { + return symbol_collect_same_namespace(pid, &parsed_opts); + } else { + return symbol_collect_different_namespace(pid, &parsed_opts); + } +} + +int update_java_symbol_file(pid_t pid, bool * is_new_collector) +{ + char opts[PERF_PATH_SZ * 2]; + snprintf(opts, sizeof(opts), + DF_AGENT_LOCAL_PATH_FMT ".map," + DF_AGENT_LOCAL_PATH_FMT ".log", pid, pid); + + symbol_collect_task_t *task = get_task_by_pid(pid); + if (task == NULL) { + *is_new_collector = true; + return start_java_symbol_collection(pid, opts); + } + + u64 start_time = get_process_starttime_and_comm(pid, NULL, 0); + if (start_time == 0) { + ebpf_warning("The process with PID %d no longer exists.\n", + pid); + task->args.attach_ret = -1; // Force the thread to exit the task it is executing. + return -1; + } + // The task is stale and needs to be cleaned up. + if (task->pid_start_time != start_time) { + task->args.attach_ret = -1; + ebpf_warning("The task for the process with PID %d" + " is invalid and needs to be recreated.\n", pid); + return -1; + } + // Notify to refresh the file + task->need_refresh = true; + // Refresh the file again; needs to wait for completion. + pthread_mutex_lock(&task->mutex); + pthread_cond_wait(&task->cond, &task->mutex); + pthread_mutex_unlock(&task->mutex); + *is_new_collector = false; + return task->update_status; +} + +void show_collect_pool(void) +{ + if (g_collect_pool == NULL) + return; + + task_thread_t *task_thread; + symbol_collect_task_t *task = NULL; + int online_task_cnt = 0; + pthread_mutex_lock(&g_collect_pool->lock); + fprintf(stdout, + "-------------------------------------------------------\n"); + for (int i = 0; i < g_collect_pool->thread_count; i++) { + if (g_collect_pool->threads[i].task == NULL) + continue; + task_thread = &g_collect_pool->threads[i]; + task = task_thread->task; + fprintf(stdout, "Thread %ld Task %p JavaPID %d\n", + task_thread->thread, task, task->pid); + online_task_cnt++; + } + fprintf(stdout, + "-------------------------------------------------------\n"); + fprintf(stdout, "pool threads %d tasks %d pending_task %d\n", + g_collect_pool->thread_count, g_collect_pool->task_count, + g_collect_pool->pending_tasks); + pthread_mutex_unlock(&g_collect_pool->lock); + fflush(stdout); +} + +#ifdef JAVA_AGENT_ATTACH_TOOL +static char agent_lib_so_path[MAX_PATH_LENGTH]; +static int agent_so_lib_copy(const char *src, const char *dst, int uid, int gid) +{ + if (access(src, F_OK)) { + ebpf_warning(JAVA_LOG_TAG "Fun %s src file '%s' not exist.\n", + __func__, src); + return ETR_NOTEXIST; + } + + if (copy_file(src, dst)) { + return ETR_INVAL; + } + + if (chown(dst, uid, gid) != 0) { + ebpf_warning(JAVA_LOG_TAG + "Failed to change ownership and group. file '%s'\n", + dst); + return ETR_INVAL; + } + + return ETR_OK; +} + +static int copy_agent_libs_into_target_ns(pid_t target_pid, int target_uid, + int target_gid) +{ + + /* + * Call this function only when the target process is in a subordinate + * namespace. Here, we copy the agent.so to a temporary path within t- + * he mounted namespace. We also change the file ownership so that the + * target process sees itself as the owner of the file (this is neces- + * sary because some versions of Java might reject proxy injection + * otherwise). + */ + int ret; + char copy_target_path[MAX_PATH_LENGTH]; + int len = snprintf(copy_target_path, sizeof(copy_target_path), + TARGET_NS_STORAGE_PATH, target_pid); + if (access(copy_target_path, F_OK)) { + /* + * The purpose of umask(0); is to set the current process's file + * creation mask (umask) to 0, which means that no permission + * bits will be cleared when creating a file or directory. Files + * and directories will have the permission bits specified at the + * time of creation. + */ + umask(0); + + if (mkdir(copy_target_path, 0777) != 0) { + ebpf_warning(JAVA_LOG_TAG + "Fun %s cannot mkdir() '%s'\n", __func__, + copy_target_path); + + return ETR_NOTEXIST; + } + } + + snprintf(copy_target_path + len, sizeof(copy_target_path) - len, + "/%s", AGENT_LIB_NAME); + if ((ret = + agent_so_lib_copy(AGENT_LIB_SRC_PATH, + copy_target_path, target_uid, + target_gid)) != ETR_OK) { + ebpf_warning(JAVA_LOG_TAG "cp '%s' to '%s' failed.\n", + AGENT_LIB_SRC_PATH, copy_target_path); + return ret; + } + + snprintf(copy_target_path + len, sizeof(copy_target_path) - len, + "/%s", AGENT_MUSL_LIB_NAME); + + if ((ret = + agent_so_lib_copy(AGENT_MUSL_LIB_SRC_PATH, + copy_target_path, target_uid, + target_gid)) != ETR_OK) { + ebpf_warning(JAVA_LOG_TAG "cp '%s' to '%s' failed.\n", + AGENT_MUSL_LIB_SRC_PATH, copy_target_path); + return ret; + } + + return ETR_OK; +} + +static void select_suitable_agent_lib(pid_t pid, bool is_same_mntns) +{ + /* Enter pid & mount namespace for target pid, + * and use dlopen() in that namespace.*/ + int pid_self_fd, mnt_self_fd; + df_enter_ns(pid, "pid", &pid_self_fd); + df_enter_ns(pid, "mnt", &mnt_self_fd); + + agent_lib_so_path[0] = '\0'; + char test_path[PERF_PATH_SZ]; + if (!is_same_mntns) + snprintf(test_path, sizeof(test_path), "%s", + AGENT_LIB_TARGET_PATH); + else + snprintf(test_path, sizeof(test_path), "%s", + AGENT_LIB_SRC_PATH); + + if (test_dl_open(test_path)) { + snprintf(agent_lib_so_path, MAX_PATH_LENGTH, "%s", test_path); + ebpf_info(JAVA_LOG_TAG + "Func %s target PID %d test %s, success.\n", + __func__, pid, test_path); + goto found; + } + + if (!is_same_mntns) + snprintf(test_path, sizeof(test_path), "%s", + AGENT_MUSL_LIB_TARGET_PATH); + else + snprintf(test_path, sizeof(test_path), "%s", + AGENT_MUSL_LIB_SRC_PATH); + + if (test_dl_open(test_path)) { + snprintf(agent_lib_so_path, MAX_PATH_LENGTH, "%s", test_path); + ebpf_info(JAVA_LOG_TAG + "Func %s target PID %d test %s, success.\n", + __func__, pid, test_path); + goto found; + } + + ebpf_warning(JAVA_LOG_TAG "%s test agent so libs, failure.", __func__); + +found: + + if (!is_same_mntns) { + if (strcmp(agent_lib_so_path, AGENT_LIB_TARGET_PATH) == 0) { + clear_target_ns_tmp_file(AGENT_MUSL_LIB_TARGET_PATH); + } else { + clear_target_ns_tmp_file(AGENT_LIB_TARGET_PATH); + } + } + + df_exit_ns(pid_self_fd); + df_exit_ns(mnt_self_fd); +} + +static int prepare_for_attach_same_ns(pid_t pid) +{ + /* + * In containers, different libc implementations may be used to compile agent + * libraries, primarily two types: glibc and musl. We must provide both vers- + * ions of the agent library. So, which one should we choose? To determine t- + * his, we need to enter the target process's namespace and test each library + * until we find one that can be successfully loaded using dlopen. + */ + select_suitable_agent_lib(pid, true); + + if (strlen(agent_lib_so_path) == 0) + return -1; + return 0; +} + +static int prepare_for_attach_different_ns(pid_t pid) +{ + int uid, gid; + if (get_target_uid_and_gid(pid, &uid, &gid)) { + return -1; + } + + /* if pid == target_ns_pid, run in same namespace */ + int target_ns_pid = get_nspid(pid); + if (target_ns_pid < 0) { + return -1; + } + + /* + * Here, the original method of determination (based on whether the net + * namespace is the same) is modified to use the mnt namespace for comparison, + * thus avoiding situations where both the net namespace and pid namespace are + * the same but the file system is different. + */ + + /* + * If the target Java process is in a subordinate namespace, copy the + * 'agent.so' into the artifacts path (in /tmp) inside of that namespace + * (for visibility to the target process). + */ + ebpf_info(JAVA_LOG_TAG "[PID %d] copy agent so library ...\n", pid); + if (copy_agent_libs_into_target_ns(pid, uid, gid)) { + ebpf_warning(JAVA_LOG_TAG + "[PID %d] copy agent os library failed.\n", pid); + check_and_clear_target_ns(pid, false); + return -1; + } + ebpf_info(JAVA_LOG_TAG "[PID %d] copy agent so library success.\n", + pid); + + /* + * In containers, different libc implementations may be used to compile agent + * libraries, primarily two types: glibc and musl. We must provide both vers- + * ions of the agent library. So, which one should we choose? To determine t- + * his, we need to enter the target process's namespace and test each library + * until we find one that can be successfully loaded using dlopen. + */ + select_suitable_agent_lib(pid, false); + + if (strlen(agent_lib_so_path) == 0) { + ebpf_warning(JAVA_LOG_TAG + "[PID %d] agent_lib_so_path is NULL.\n", pid); + check_and_clear_target_ns(pid, false); + return -1; + } + + return 0; +} + +static int attach(pid_t pid, char *opts) +{ + char *argv[] = { "load", agent_lib_so_path, "true", opts }; + int argc = sizeof(argv) / sizeof(argv[0]); + int ret = jattach(pid, argc, (char **)argv, 1); + ebpf_info(JAVA_LOG_TAG + "jattach pid %d argv: \"load %s true\" return %d\n", pid, + agent_lib_so_path, ret); + + return ret; +} + +int java_attach(pid_t pid) +{ + int ret = -1; + bool is_same_mnt = is_same_mntns(pid); + if (is_same_mnt) { + ret = prepare_for_attach_same_ns(pid); + } else { + /* + * Clean up the '*.so' files to prevent exceptions in + * the target JVM when using jattach. + */ + clear_so_target_ns(pid, false); + ret = prepare_for_attach_different_ns(pid); + } + + if (ret < 0) + return -1; + + char buffer[PERF_PATH_SZ * 2]; + snprintf(buffer, sizeof(buffer), + JVM_AGENT_SYMS_SOCKET_PATH_FMT "," + JVM_AGENT_LOG_SOCKET_PATH_FMT, pid, pid); + + /* Invoke the jattach (https://github.com/apangin/jattach) to inject the + * library as a JVMTI agent.*/ + return attach(pid, buffer); + + /* Resource cleanup is performed in the thread executing 'deepflow-jattach' */ +} + +/* + * Command-line execution, for example: + * cp ./df_java_agent_v2.so /tmp/ + * ./deepflow-jattach $PID + */ +int main(int argc, char **argv) +{ + if (argc != 2) { + fprintf(stderr, "Usage: %s \n", argv[0]); + return -1; + } + + log_to_stdout = true; + int pid = atoi(argv[1]); + return java_attach(pid); +} +#endif /* JAVA_AGENT_ATTACH_TOOL */ diff --git a/agent/src/ebpf/user/profile/java/jvm_symbol_collect.h b/agent/src/ebpf/user/profile/java/jvm_symbol_collect.h new file mode 100644 index 00000000000..d82fe5f0ced --- /dev/null +++ b/agent/src/ebpf/user/profile/java/jvm_symbol_collect.h @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef JVM_SYMBOL_COLLECT_H +#define JVM_SYMBOL_COLLECT_H + +#include "config.h" + +#define STRING_BUFFER_SIZE 2000 +#define UNIX_PATH_MAX 108 +#define JAVA_ADDR_STR_SIZE 13 + +#define DF_JAVA_ATTACH_CMD "/usr/bin/deepflow-jattach" + +/* + * The address range of the 64-bit user space is from 0x0000000000000000 + * to 0x00007fffffffffff, which effectively uses only 48 bits. We use 13 + * bytes to represent the address string, with the last byte used as '\0'. + */ +typedef struct { + char addr[JAVA_ADDR_STR_SIZE]; + bool is_verified; +} java_unload_addr_str_t; + +typedef uint64_t(*agent_test_t) (void); + +typedef struct options { + char perf_map_path[PERF_PATH_SZ]; + char perf_log_path[PERF_PATH_SZ]; +} options_t; + +typedef struct task_s symbol_collect_task_t; + +/** + * @brief Parameters for task processing. + */ +typedef struct receiver_args { + pid_t pid; /**< Java process ID */ + options_t *opts; /**< Parameters for calling jattach, such as the file path for the Unix domain socket */ + int map_socket; /**< For receiving JVM connection requests, establishing a Java symbol data transmission channel */ + int log_socket; /**< For receiving JVM connection requests, establishing a JVM log data transmission channel */ + int map_client; /**< For Java symbol data transmission */ + int log_client; /**< For JVM log data transmission */ + int epoll_fd; /**< epoll listening socket */ + FILE *map_fp; /**< File for saving Java symbol information */ + FILE *log_fp; /**< File for saving JVM log information */ + volatile int attach_ret; /**< To store the return value of jattach */ + volatile bool replay_done; /**< Indicates whether Java symbol replay is complete */ + symbol_collect_task_t *task; /**< Address of the associated task */ +} receiver_args_t; + +/** + * @brief Definition of Java symbol collection task. + */ +struct task_s { + struct list_head list; /**< Task queue */ + pid_t pid; /**< Java process ID to be handled by the task */ + u64 pid_start_time; /**< Process start time; combined with `` to uniquely identify a process */ + bool is_local_mntns; /**< Whether it is in the same mount namespace as deepflow-agent */ + pthread_t thread; /**< Thread handling the task */ + void *(*func) (void *); /**< Callback function for task processing */ + bool need_refresh; /**< Whether the file needs to be refreshed */ + int update_status; /**< Symbol file update status */ + pthread_mutex_t mutex; /**< Mutex for protecting tasks */ + pthread_cond_t cond; /**< Condition variable for notifying updates to files */ + receiver_args_t args; /**< Parameters for task processing */ +}; + +/** + * @brief Definition of task thread + */ +typedef struct { + int index; /**< thread index in pool. */ + pthread_t thread; /**< thread ID */ + symbol_collect_task_t *task; /**< task address */ +} task_thread_t; + +/** + * @brief Definition of the thread pool for Java symbol collection tasks. + */ +typedef struct { + task_thread_t *threads; /**< Array for managing threads */ + int thread_index; /**< Index of the most recent thread */ + pthread_mutex_t lock; /**< Thread pool lock */ + pthread_cond_t cond; /**< Condition variable for waking up threads to execute tasks */ + struct list_head task_list_head; /**< Queue of tasks waiting to be processed */ + int task_count; /**< Total number of tasks currently being processed */ + int pending_tasks; /**< Number of tasks waiting to be processed */ + int thread_count; /**< Number of threads in the thread pool */ + int stop; /**< Thread pool stop flag */ +} symbol_collect_thread_pool_t; + +/** + * @brief Updates the Java symbol file. + * + * Informs the Java symbol collector to collect Java symbols. If a + * collection task is already running, it will update the Java symbol + * file. If no collection task exists, a new task will be created for + * the collection. + * + * @param pid The Java process ID for symbol collection. + * @param is_new_collector Is it a newly created symbol collector, with the result returned to the caller. + * @return 0 if the Java symbol file has been successfully updated, + * otherwise returns a failure code. + */ +int update_java_symbol_file(pid_t pid, bool *is_new_collector); + +/** + * @brief Cleans up a single file in the target namespace. + * + * @param target_path The path of the file to be cleaned. + */ +void clear_target_ns_tmp_file(const char *target_path); + +/** + * @brief Cleans up files in the target namespace. + * + * These files include: + * - path/.deepflow-java-symbols-pid.socket + * - path/.deepflow-java-jvmti-logs-pid.socket + * - path/df_java_agent.so + * - path/df_java_agent_musl.so + * + * @param pid The process ID of the target to clean. + * @param check_in_use Whether to check if the files are being used by + * other processes. If true, the files will be + * checked, and if they are in use, they will not + * be cleaned. + * @return 0 if the cleanup was successful, non-zero if it failed. + */ +int check_and_clear_target_ns(int pid, bool check_in_use); + +/** + * @brief Cleans up dynamic library files in the target namespace. + * + * These files include: + * - path/df_java_agent.so + * - path/df_java_agent_musl.so + * + * @param pid The process ID of the target to clean. + * @param check_in_use Whether to check if the files are being used by + * other processes. If true, the files will be + * checked, and if they are in use, they will not + * be cleaned. + * @return 0 if the cleanup was successful, non-zero if it failed. + */ +int check_and_clear_unix_socket_files(int pid, bool check_in_use); + +/** + * @brief Cleans up local '/tmp' perf files. + * + * These files include: + * - /tmp/perf-.map + * - /tmp/perf-.log + * + * @param pid The process ID of the target to clean. + * @return 0 if the cleanup was successful, non-zero if it failed. + */ +void clear_local_perf_files(int pid); + +/** + * @brief Gets the size of the local symbol file. + * + * File: /tmp/perf-.map + * + * @param pid The process ID associated with the file. + * @return The file size, or a negative value if there was an error. + */ +i64 get_local_symbol_file_sz(int pid); + +/** + * @brief Checks if the symbol file is accessible. + * + * File: /tmp/perf-.map + * + * @param pid The process ID associated with the file. + * @return 0 if the file exists, otherwise returns a non-zero value + * indicating the file does not exist. + */ +int target_symbol_file_access(int pid); + +#endif /* JVM_SYMBOL_COLLECT_H */ diff --git a/agent/src/ebpf/user/profile/java/agent.c b/agent/src/ebpf/user/profile/java/symbol_collect_agent.c similarity index 64% rename from agent/src/ebpf/user/profile/java/agent.c rename to agent/src/ebpf/user/profile/java/symbol_collect_agent.c index 8e458c2aa78..dad28981a6b 100644 --- a/agent/src/ebpf/user/profile/java/agent.c +++ b/agent/src/ebpf/user/profile/java/symbol_collect_agent.c @@ -32,17 +32,16 @@ #include #include #include +#include #include #include #include #include "../../config.h" +#include "config.h" -#define STRING_BUFFER_SIZE 2000 -#define BIG_STRING_BUFFER_SIZE 20000 - -#define UNIX_PATH_MAX 108 +#define LOG_BUF_SZ 512 /* * HotSpot JVM does not support agent unloading. However, you @@ -56,56 +55,63 @@ */ pthread_mutex_t g_df_lock; +jvmtiEnv *g_jvmti; +bool replay_finish; +int replay_count; +char perf_map_socket_path[128]; +char perf_log_socket_path[128]; -char perf_map_file_path[128]; -char perf_log_file_path[128]; - -FILE *perf_map_file_ptr = NULL; -FILE *perf_map_log_file_ptr = NULL; - -int g_perf_map_file_size_limit; -int g_perf_map_file_size; +int perf_map_socket_fd = -1; +int perf_map_log_socket_fd = -1; +jint close_files(void); #define _(e) \ if (e != JNI_OK) { \ df_log("DF java agent failed, %s, error code: %d.", #e, e); \ close_files(); \ - return e; \ - } - -void df_log(const char *format, ...) -{ - if (perf_map_log_file_ptr) { - va_list ap; - va_start(ap, format); - vfprintf(perf_map_log_file_ptr, format, ap); - fprintf(perf_map_log_file_ptr, "\n"); - fflush(perf_map_log_file_ptr); - va_end(ap); + return e; \ } -} -jint df_open_file(pid_t pid, const char *fmt, FILE ** ptr) +#define df_log(format, ...) \ + do { \ + if (perf_map_log_socket_fd > 0) { \ + char str_buf[LOG_BUF_SZ]; \ + int n = snprintf(str_buf, sizeof(str_buf), format, ##__VA_ARGS__); \ + pthread_mutex_lock(&g_df_lock); \ + send_msg(perf_map_log_socket_fd, str_buf, n); \ + pthread_mutex_unlock(&g_df_lock); \ + } \ + } while (0) + +inline int send_msg(int sock_fd, const char *buf, size_t len) { - FILE *file_ptr; - char filename[50]; // Assuming the filename won't exceed 50 characters - - // Create the filename using snprintf - snprintf(filename, sizeof(filename), fmt, pid); - - // Open the file for writing - file_ptr = fopen(filename, "w"); - if (file_ptr == NULL) { - fprintf(stderr, "Couldn't open %s: errno(%d)\n", filename, errno); - return JNI_ERR; - } + int send_bytes = 0; + int n = 0; // Initialize n + + do { + n = send(sock_fd, buf + send_bytes, len - send_bytes, 0); + if (n == -1) { + if (errno == EINTR || errno == EAGAIN + || errno == EWOULDBLOCK) { + // Retry on interrupt or temporary failure + continue; + } else { + close_files(); // Example function call, define as needed + break; + } + } else if (n == 0) { + // Connection closed by peer + close_files(); // Example function call, define as needed + break; + } - *ptr = file_ptr; + send_bytes += n; + } while (send_bytes < len); - return JNI_OK; + return send_bytes; // Return total bytes sent } -jint df_open_socket_as_file(const char *path, FILE **ptr) +jint df_open_socket(const char *path, int *ptr) { int s = socket(AF_UNIX, SOCK_STREAM, 0); if (s == -1) { @@ -113,21 +119,35 @@ jint df_open_socket_as_file(const char *path, FILE **ptr) return JNI_ERR; } - struct sockaddr_un remote = { .sun_family = AF_UNIX }; - strncpy(remote.sun_path, path, UNIX_PATH_MAX - 1); - int len = sizeof(remote.sun_family) + strlen(remote.sun_path); - if (connect(s, (struct sockaddr*)&remote, len) == -1) { - fprintf(stderr, "Call connect() failed: errno(%d)\n", errno); + /* + * The reason for setting non-blocking mode: + * 1 To prevent Java threads from being blocked. + * 2 When attempts to write data to a closed writing port of a pipe or + * socket, the operating system detects this situation and sends the + * SIGPIPE signal to Java process, which causes the program to exit. + * Use non-blocking mode to avoid this issue. + */ + int flags = fcntl(s, F_GETFL, 0); + if (flags == -1) { + fprintf(stderr, "Call fcntl() get failed: errno(%d)\n", errno); + close(s); + return JNI_ERR; + } + if (fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) { + fprintf(stderr, "Call fcntl() set failed: errno(%d)\n", errno); + close(s); return JNI_ERR; } - FILE *file_ptr = fdopen(s, "w"); - if (file_ptr == NULL) { - fprintf(stderr, "Couldn't open %s: errno(%d)\n", path, errno); + struct sockaddr_un remote = {.sun_family = AF_UNIX }; + strncpy(remote.sun_path, path, UNIX_PATH_MAX - 1); + int len = sizeof(remote.sun_family) + strlen(remote.sun_path); + if (connect(s, (struct sockaddr *)&remote, len) == -1) { + fprintf(stderr, "Call connect() failed: errno(%d)\n", errno); return JNI_ERR; } - *ptr = file_ptr; + *ptr = s; return JNI_OK; } @@ -136,6 +156,9 @@ bool is_socket_file(const char *path) { struct stat sb; if (stat(path, &sb) == -1) { + fprintf(stderr, "stat() failed, with %s(%d)\n", strerror(errno), + errno); + fflush(stderr); return false; } @@ -144,65 +167,59 @@ bool is_socket_file(const char *path) jint open_perf_map_file(pid_t pid) { - if (is_socket_file(perf_map_file_path)) { - return df_open_socket_as_file(perf_map_file_path, &perf_map_file_ptr); - } else { - return df_open_file(pid, perf_map_file_path, &perf_map_file_ptr); + if (is_socket_file(perf_map_socket_path)) { + return df_open_socket(perf_map_socket_path, + &perf_map_socket_fd); } + + return JNI_ERR; } jint open_perf_map_log_file(pid_t pid) { - if (is_socket_file(perf_log_file_path)) { - return df_open_socket_as_file(perf_log_file_path, &perf_map_log_file_ptr); - } else { - return df_open_file(pid, perf_log_file_path, &perf_map_log_file_ptr); + if (is_socket_file(perf_log_socket_path)) { + return df_open_socket(perf_log_socket_path, + &perf_map_log_socket_fd); } + + return JNI_ERR; } jint df_agent_config(char *opts) { - g_perf_map_file_size = 0; - char buf[300]; char *start; start = buf; snprintf(buf, sizeof(buf), "%s", opts); - /* g_perf_map_file_size_limit */ + /* perf_map_socket_path[] */ char *p = strchr(start, ','); if (p == NULL) return JNI_ERR; *p = '\0'; - g_perf_map_file_size_limit = atoi(start); - - /* perf_map_file_path[] */ - start = ++p; - p = strchr(start, ','); - if (p == NULL) - return JNI_ERR; - *p = '\0'; - snprintf(perf_map_file_path, sizeof(perf_map_file_path), "%s", start); + snprintf(perf_map_socket_path, sizeof(perf_map_socket_path), "%s", + start); - /* perf_log_file_path[] */ + /* perf_log_socket_path[] */ start = ++p; if (start == NULL) return JNI_ERR; - snprintf(perf_log_file_path, sizeof(perf_log_file_path), "%s", start); + snprintf(perf_log_socket_path, sizeof(perf_log_socket_path), "%s", + start); return JNI_OK; } jint close_files(void) { - if (perf_map_file_ptr) { - fclose(perf_map_file_ptr); - perf_map_file_ptr = 0; + if (perf_map_socket_fd > 0) { + close(perf_map_socket_fd); + perf_map_socket_fd = -1; } - if (perf_map_log_file_ptr) { - fclose(perf_map_log_file_ptr); - perf_map_log_file_ptr = 0; + if (perf_map_log_socket_fd > 0) { + close(perf_map_log_socket_fd); + perf_map_log_socket_fd = -1; } return JNI_OK; @@ -226,7 +243,6 @@ jint get_jvmti_env(JavaVM * jvm, jvmtiEnv ** jvmti) jint error = (*jvm)->GetEnv(jvm, (void **)jvmti, JVMTI_VERSION_1_0); if (error != JNI_OK || *jvmti == NULL) { - df_log("[error] Unable to access JVMTI."); return JNI_ERR; } return JNI_OK; @@ -278,30 +294,35 @@ void deallocate(jvmtiEnv * jvmti, void *string) (*jvmti)->Deallocate(jvmti, (unsigned char *)string); } -void df_write_symbol(const void *code_addr, unsigned int code_size, - const char *entry) +void df_send_symbol(enum event_type type, const void *code_addr, + unsigned int code_size, const char *entry) { - if (!perf_map_file_ptr) { + if (perf_map_socket_fd < 0) { return; } - char symbol_str[1024]; - int bytes_all = 0; - pthread_mutex_lock(&g_df_lock); - snprintf(symbol_str, sizeof(symbol_str), "%lx %x %s\n", - (unsigned long)code_addr, code_size, entry); - bytes_all = g_perf_map_file_size + strlen(symbol_str); - if (bytes_all >= g_perf_map_file_size_limit) { - pthread_mutex_unlock(&g_df_lock); - return; + struct symbol_metadata *meta; + char symbol_str[STRING_BUFFER_SIZE]; + if (type == METHOD_UNLOAD) { + snprintf(symbol_str + sizeof(*meta), + sizeof(symbol_str) - sizeof(*meta), "%lx", + (unsigned long)code_addr); + } else { + snprintf(symbol_str + sizeof(*meta), + sizeof(symbol_str) - sizeof(*meta), "%lx %x %s\n", + (unsigned long)code_addr, code_size, entry); } - fprintf(perf_map_file_ptr, "%s", symbol_str); - fflush(perf_map_file_ptr); - g_perf_map_file_size = bytes_all; + meta = (struct symbol_metadata *)symbol_str; + meta->len = strlen(symbol_str + sizeof(*meta)); + meta->type = type; + pthread_mutex_lock(&g_df_lock); + send_msg(perf_map_socket_fd, symbol_str, meta->len + sizeof(*meta)); + if (!replay_finish) + replay_count++; pthread_mutex_unlock(&g_df_lock); } -void generate_single_entry(jvmtiEnv * jvmti, +void generate_single_entry(enum event_type type, jvmtiEnv * jvmti, jmethodID method, const void *code_addr, jint code_size) { @@ -311,7 +332,6 @@ void generate_single_entry(jvmtiEnv * jvmti, char *csig = NULL; char output[STRING_BUFFER_SIZE]; - char source_info[1000] = ""; char *method_signature = ""; size_t noutput = sizeof(output); @@ -330,8 +350,8 @@ void generate_single_entry(jvmtiEnv * jvmti, } else { memcpy(class_name, csig, sizeof(class_name) - 1); } - snprintf(output, noutput, "%s::%s%s%s", class_name, - method_name, method_signature, source_info); + snprintf(output, noutput, "%s::%s%s", class_name, + method_name, method_signature); deallocate(jvmti, (unsigned char *)csig); } @@ -339,7 +359,7 @@ void generate_single_entry(jvmtiEnv * jvmti, deallocate(jvmti, (unsigned char *)method_name); deallocate(jvmti, (unsigned char *)msig); - df_write_symbol(code_addr, (unsigned int)code_size, output); + df_send_symbol(type, code_addr, (unsigned int)code_size, output); } void JNICALL @@ -350,7 +370,7 @@ cbCompiledMethodLoad(jvmtiEnv * jvmti, jint map_length, const jvmtiAddrLocationMap * map, const void *compile_info) { - generate_single_entry(jvmti, method, code_addr, code_size); + generate_single_entry(METHOD_LOAD, jvmti, method, code_addr, code_size); } void JNICALL @@ -358,14 +378,14 @@ cbDynamicCodeGenerated(jvmtiEnv * jvmti, const char *name, const void *address, jint length) { - df_write_symbol(address, (unsigned int)length, name); + df_send_symbol(DYNAMIC_CODE_GEN, address, (unsigned int)length, name); } void JNICALL cbCompiledMethodUnload(jvmtiEnv * jvmti, jmethodID method, const void *address) { - df_write_symbol(address, 0, ""); + df_send_symbol(METHOD_UNLOAD, address, 0, ""); } jvmtiError set_callback_funs(jvmtiEnv * jvmti) @@ -384,14 +404,12 @@ jvmtiError set_callback_funs(jvmtiEnv * jvmti) "Unable to attach CompiledMethodLoad callback."); return JNI_ERR; } - return JNI_OK; } jint set_notification_modes(jvmtiEnv * jvmti, jvmtiEventMode mode) { jvmtiError error; - error = (*jvmti)->SetEventNotificationMode(jvmti, mode, JVMTI_EVENT_COMPILED_METHOD_LOAD, @@ -465,23 +483,44 @@ JNIEXPORT jint JNICALL Agent_OnAttach(JavaVM * vm, char *options, void *reserved) { jvmtiEnv *jvmti; + if (g_jvmti) { + /* + * Close files during multiple agent.so loads at runtime to prevent + * increased file handle usage by Java programs. + */ + close_files(); + /* + * Terminate the previous replay event to prevent multiple replay events + * from running simultaneously. + */ + _(set_notification_modes(g_jvmti, JVMTI_DISABLE)); + g_jvmti = NULL; + replay_finish = false; + replay_count = 0; + _(get_jvmti_env(vm, &jvmti)); + goto enable_replay; + } + pthread_mutex_init(&g_df_lock, NULL); + _(get_jvmti_env(vm, &jvmti)); + +enable_replay: _(df_agent_config(options)); _(open_perf_map_log_file(getpid())); - df_log("- JVMTI g_perf_map_file_size_limit: %d, " - "perf_map_file_path: %s perf_log_file_path: %s", - g_perf_map_file_size_limit, - perf_map_file_path, perf_log_file_path); _(open_perf_map_file(getpid())); - _(get_jvmti_env(vm, &jvmti)); + df_log("- JVMTI perf_map_socket_path: %s perf_log_socket_path: %s\n", + perf_map_socket_path, perf_log_socket_path); + _(enable_capabilities(jvmti)); _(set_callback_funs(jvmti)); _(set_notification_modes(jvmti, JVMTI_ENABLE)); + if (g_jvmti == NULL) + g_jvmti = jvmti; _(replay_callbacks(jvmti)); - _(set_notification_modes(jvmti, JVMTI_DISABLE)); + replay_finish = true; + df_log + ("- JVMTI symbolization agent startup sequence complete. Replay count %d\n", + replay_count); - df_log("- JVMTI symbolization agent startup sequence complete."); - - close_files(); - return 0; + return JNI_OK; } diff --git a/agent/src/ebpf/user/profile/perf_profiler.c b/agent/src/ebpf/user/profile/perf_profiler.c index 8206ef05504..6563b54c359 100644 --- a/agent/src/ebpf/user/profile/perf_profiler.c +++ b/agent/src/ebpf/user/profile/perf_profiler.c @@ -23,6 +23,7 @@ #ifndef AARCH64_MUSL #include #include +#include /* kill() */ #include #include "../config.h" #include "../utils.h" @@ -33,7 +34,7 @@ #include "../vec.h" #include "../tracer.h" #include "../socket.h" -#include "java/gen_syms_file.h" +#include "java/collect_symbol_files.h" #include "perf_profiler.h" #include "../elf.h" #include "../load.h" @@ -44,7 +45,7 @@ #include "../table.h" #include #include "java/config.h" -#include "java/df_jattach.h" +#include "java/jvm_symbol_collect.h" #include "profile_common.h" #include "../proc.h" @@ -54,14 +55,12 @@ #define CP_PERF_PG_NUM 16 #define ONCPU_PROFILER_NAME "oncpu" #define PROFILER_CTX_ONCPU_IDX THREAD_PROFILER_READER_IDX +#define DEEPFLOW_AGENT_NAME "deepflow-agent" extern int sys_cpus_count; struct profiler_context *g_ctx_array[PROFILER_CTX_NUM]; static struct profiler_context oncpu_ctx; -/* The maximum bytes limit for writing the df_perf-PID.map file by agent.so */ -int g_java_syms_write_bytes_max; - static bool g_enable_oncpu = true; /* Used for handling updates to JAVA symbol files */ @@ -567,11 +566,24 @@ static struct tracer_sockopts cpdbg_sockopts = { .get = cpdbg_sockopt_get, }; +// Function to check if the recorded PID and its start time are correct +int check_profiler_running_pid(void) +{ + int pid = find_pid_by_name(DEEPFLOW_AGENT_NAME, getpid()); + if (pid > 0) { + ebpf_warning("The deepflow-agent with process ID %d is already " + "running. You can disable the continuous profiling " + "feature of the deepflow-agent to skip this check.\n", + pid); + return ETR_EXIST; + } + + return ETR_NOTEXIST; +} + /* * start continuous profiler * @freq sample frequency, Hertz. (e.g. 99 profile stack traces at 99 Hertz) - * @java_syms_space_limit The maximum space occupied by the Java symbol files - * in the target POD. * @java_syms_update_delay To allow Java to run for an extended period and gather * more symbol information, we delay symbol retrieval when * encountering unknown symbols. The default value is @@ -581,16 +593,23 @@ static struct tracer_sockopts cpdbg_sockopts = { * @returns 0 on success, < 0 on error */ -int start_continuous_profiler(int freq, int java_syms_space_limit, - int java_syms_update_delay, +int start_continuous_profiler(int freq, int java_syms_update_delay, tracer_callback_t callback) { char bpf_load_buffer_name[NAME_LEN]; void *bpf_bin_buffer; uword buffer_sz; + /* + * To determine if the profiler is already running, at any given time, only + * one profiler can be active due to the persistence required for Java symbol + * generation, which is incompatible with multiple agents. + */ + if (check_profiler_running_pid() == ETR_EXIST) + exit(EXIT_FAILURE); + if (!run_conditions_check()) - return (-1); + exit(EXIT_FAILURE); memset(g_ctx_array, 0, sizeof(g_ctx_array)); profiler_context_init(&oncpu_ctx, ONCPU_PROFILER_NAME, LOG_CP_TAG, @@ -600,15 +619,6 @@ int start_continuous_profiler(int freq, int java_syms_space_limit, NANOSEC_PER_SEC / freq); g_ctx_array[PROFILER_CTX_ONCPU_IDX] = &oncpu_ctx; - int java_space_bytes = java_syms_space_limit * 1024 * 1024; - if ((java_space_bytes < JAVA_POD_WRITE_FILES_SPACE_MIN) || - (java_space_bytes > JAVA_POD_WRITE_FILES_SPACE_MAX)) - java_space_bytes = JAVA_POD_WRITE_FILES_SPACE_DEF; - g_java_syms_write_bytes_max = - java_space_bytes - JAVA_POD_EXTRA_SPACE_MMA; - ebpf_info("set java_syms_write_bytes_max : %d\n", - g_java_syms_write_bytes_max); - if ((java_syms_update_delay < JAVA_SYMS_UPDATE_DELAY_MIN) || (java_syms_update_delay > JAVA_SYMS_UPDATE_DELAY_MAX)) java_syms_update_delay = JAVA_SYMS_UPDATE_DELAY_DEF; @@ -829,7 +839,6 @@ int disable_oncpu_profiler(void) #include "perf_profiler.h" int start_continuous_profiler(int freq, - int java_syms_space_limit, int java_syms_update_delay, tracer_callback_t callback) { diff --git a/agent/src/ebpf/user/profile/perf_profiler.h b/agent/src/ebpf/user/profile/perf_profiler.h index 6e1d1f33b46..b8166024603 100644 --- a/agent/src/ebpf/user/profile/perf_profiler.h +++ b/agent/src/ebpf/user/profile/perf_profiler.h @@ -22,6 +22,8 @@ #include "../../kernel/include/perf_profiler.h" #define PROFILER_CTX_NUM 3 +// For storing information about continuously running profiling processes. +#define DEEPFLOW_RUNNING_PID_PATH "/tmp/.deepflow-agent-running-pid" /* * stack_trace_msg_hash, used to store stack trace messages and @@ -168,8 +170,7 @@ struct stack_ids_bitmap { } __attribute__((packed)); int stop_continuous_profiler(void); -int start_continuous_profiler(int freq, int java_syms_space_limit, - int java_syms_update_delay, +int start_continuous_profiler(int freq, int java_syms_update_delay, tracer_callback_t callback); void process_stack_trace_data_for_flame_graph(stack_trace_msg_t * val); void release_flame_graph_hash(void); @@ -178,4 +179,5 @@ int set_profiler_cpu_aggregation(int flag); struct bpf_tracer *get_profiler_tracer(void); void set_enable_perf_sample(struct bpf_tracer *t, u64 enable_flag); void cpdbg_process(stack_trace_msg_t * msg); +int check_profiler_running_pid(void); #endif /* DF_USER_PERF_PROFILER_H */ diff --git a/agent/src/ebpf/user/profile/profile_common.c b/agent/src/ebpf/user/profile/profile_common.c index 174b3dee26c..68e55dbba40 100644 --- a/agent/src/ebpf/user/profile/profile_common.c +++ b/agent/src/ebpf/user/profile/profile_common.c @@ -34,7 +34,7 @@ #include "../vec.h" #include "../tracer.h" #include "../socket.h" -#include "java/gen_syms_file.h" +#include "java/collect_symbol_files.h" #include "perf_profiler.h" #include "../elf.h" #include "../load.h" @@ -45,7 +45,7 @@ #include "../table.h" #include #include "java/config.h" -#include "java/df_jattach.h" +#include "java/jvm_symbol_collect.h" #include "profile_common.h" #include "../proc.h" #include "stringifier.h" @@ -382,8 +382,9 @@ static void cleanup_stackmap(struct profiler_context *ctx, struct bpf_tracer *t, } } -static void print_profiler_status(struct profiler_context *ctx, - struct bpf_tracer *t, u64 iter_count) +static void __attribute__ ((__unused__)) +print_profiler_status(struct profiler_context *ctx, + struct bpf_tracer *t, u64 iter_count) { u64 alloc_b, free_b; get_mem_stat(&alloc_b, &free_b); @@ -1094,7 +1095,7 @@ void process_bpf_stacktraces(struct profiler_context *ctx, struct bpf_tracer *t) bpf_table_set_value(t, ctx->state_map_name, sample_count_idx, &sample_cnt_val); - print_profiler_status(ctx, t, count); + //print_profiler_status(ctx, t, count); /* free all elems */ clean_stack_strs(&ctx->stack_str_hash); @@ -1130,3 +1131,16 @@ bool check_profiler_regex(struct profiler_context *ctx, const char *name) return false; } + +bool profiler_is_running(void) +{ + for (int i = 0; i < ARRAY_SIZE(g_ctx_array); i++) { + if (g_ctx_array[i] == NULL) + continue; + if (g_ctx_array[i]->enable_bpf_profile == 1) + return true; + + } + + return false; +} diff --git a/agent/src/ebpf/user/profile/profile_common.h b/agent/src/ebpf/user/profile/profile_common.h index 0a7247f0282..ff77fb1cd56 100644 --- a/agent/src/ebpf/user/profile/profile_common.h +++ b/agent/src/ebpf/user/profile/profile_common.h @@ -178,4 +178,6 @@ void push_and_release_stack_trace_msg(struct profiler_context *ctx, * @return true successfully passed the check, false otherwise */ bool check_profiler_regex(struct profiler_context *ctx, const char *name); +// Check if the profiler is currently running. +bool profiler_is_running(void); #endif /*DF_USER_PROFILE_COMMON_H */ diff --git a/agent/src/ebpf/user/profile/stringifier.c b/agent/src/ebpf/user/profile/stringifier.c index 61c7ae905ff..429ea558e81 100644 --- a/agent/src/ebpf/user/profile/stringifier.c +++ b/agent/src/ebpf/user/profile/stringifier.c @@ -56,7 +56,7 @@ #include "../table.h" #include "../bihash_8_8.h" #include "../bihash_16_8.h" -#include "java/gen_syms_file.h" +#include "java/collect_symbol_files.h" #include "stringifier.h" #include #include "../proc.h" diff --git a/agent/src/ebpf/user/socket.c b/agent/src/ebpf/user/socket.c index 4edbce787a6..4e802d79932 100644 --- a/agent/src/ebpf/user/socket.c +++ b/agent/src/ebpf/user/socket.c @@ -60,6 +60,7 @@ static pthread_t proc_events_pthread; // Process exec/exit thread */ static volatile uint64_t probes_act; +extern __thread uword thread_index; // for symbol pid caches hash extern int sys_cpus_count; extern bool *cpu_online; extern uint32_t attach_failed_count; @@ -1216,6 +1217,7 @@ static void check_datadump_timeout(void) static void process_events_handle_main(__unused void *arg) { prctl(PR_SET_NAME, "proc-events"); + thread_index = THREAD_PROC_EVENTS_HANDLE_IDX; struct bpf_tracer *t = arg; for (;;) { /* @@ -1799,7 +1801,6 @@ static_always_inline uint64_t clib_cpu_time_now(void) } #endif -extern __thread uword thread_index; // for symbol pid caches hash static void perf_buffer_read(void *arg) { /* @@ -1807,7 +1808,7 @@ static void perf_buffer_read(void *arg) * to monitor the perf buffer belonging to its jurisdiction. */ uint64_t epoll_id = (uint64_t) arg; - thread_index = THREAD_PROC_ACT_IDX_BASE + epoll_id; // for bihash + thread_index = THREAD_SOCK_READER_IDX_BASE + epoll_id; // for bihash struct bpf_tracer *tracer = find_bpf_tracer(SK_TRACER_NAME); if (tracer == NULL) { ebpf_warning("find_bpf_tracer() error\n"); diff --git a/agent/src/ebpf/user/symbol.c b/agent/src/ebpf/user/symbol.c index ef443b89a08..56ed5b3b238 100644 --- a/agent/src/ebpf/user/symbol.c +++ b/agent/src/ebpf/user/symbol.c @@ -48,8 +48,8 @@ #endif #include "libGoReSym.h" #include "profile/perf_profiler.h" -#include "profile/java/df_jattach.h" -#include "profile/java/gen_syms_file.h" +#include "profile/java/jvm_symbol_collect.h" +#include "profile/java/collect_symbol_files.h" #include "bihash_8_8.h" #include "profile/stringifier.h" #include "profile/profile_common.h" diff --git a/agent/src/ebpf/user/tracer.c b/agent/src/ebpf/user/tracer.c index 29deb5ef1a7..6a22860e36c 100644 --- a/agent/src/ebpf/user/tracer.c +++ b/agent/src/ebpf/user/tracer.c @@ -1225,7 +1225,6 @@ static int perf_reader_setup(struct bpf_perf_reader *perf_reader, int thread_nr) reader_idx = perf_reader->readers_count++; perf_reader->reader_fds[reader_idx] = perf_fd; perf_reader->readers[reader_idx] = reader; - event.data.fd = perf_fd; event.data.ptr = reader; event.events = EPOLLIN; diff --git a/agent/src/ebpf_dispatcher/ebpf_dispatcher.rs b/agent/src/ebpf_dispatcher/ebpf_dispatcher.rs index ad5ba06516b..b2d12f74393 100644 --- a/agent/src/ebpf_dispatcher/ebpf_dispatcher.rs +++ b/agent/src/ebpf_dispatcher/ebpf_dispatcher.rs @@ -736,7 +736,6 @@ impl EbpfCollector { if ebpf::start_continuous_profiler( on_cpu.frequency as i32, - ebpf_conf.java_symbol_file_max_space_limit as i32, ebpf_conf.java_symbol_file_refresh_defer_interval.as_secs() as i32, Self::ebpf_profiler_callback, ) != 0 diff --git a/server/agent_config/example.yaml b/server/agent_config/example.yaml index f7838842656..2a705baa764 100644 --- a/server/agent_config/example.yaml +++ b/server/agent_config/example.yaml @@ -1414,23 +1414,19 @@ vtap_group_id: g-xxxxxx #io-event-minimal-duration: 1ms ## Java compliant update latency time - ## Default: 600s. Range: [5, 3600]s + ## Default: 60s. Range: [5, 3600]s ## Note: ## When deepflow-agent finds that an unresolved function name appears in the function call stack ## of a Java process, it will trigger the regeneration of the symbol file of the process. ## Because Java utilizes the Just-In-Time (JIT) compilation mechanism, to obtain more symbols for ## Java processes, the regeneration will be deferred for a period of time. - #java-symbol-file-refresh-defer-interval: 600s - - ## Maximum size limit for Java symbol file. - ## Default: 10. Range: [2, 100] - ## Note: - ## Which means it falls within the interval of 2Mi to 100Mi. If the configuration value is outside - ## this range, the default value of 10(10Mi), will be used. - ## All Java symbol files are stored in the '/tmp' directory mounted by the deepflow-agent. To prevent - ## excessive occupation of host node space due to large Java symbol files, a maximum size limit is set - ## for each generated Java symbol file. - #java-symbol-file-max-space-limit: 10 + ## + ## At the startup of a Java program, the JVM and JIT compiler are in a "warm-up" phase. During this + ## period, symbol changes are typically frequent due to the dynamic compilation and optimization + ## processes. Therefore, deepflow-agent delay symbol collection for one minute after the Java program + ## starts, allowing the JVM and JIT to "warm up" and for symbol churn to be minimized before proceeding + ## with the collection. + #java-symbol-file-refresh-defer-interval: 60s ## On-cpu profile configuration #on-cpu-profile: