Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use proc_events on Java processes #208

Merged
merged 27 commits into from Nov 26, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
118340b
Add proc_events
YishaiZinkin Nov 14, 2021
c795706
Run lint.sh
YishaiZinkin Nov 21, 2021
a122366
Remove unused import
YishaiZinkin Nov 21, 2021
4697752
Move to class and done some fixes
YishaiZinkin Nov 22, 2021
fe2373e
Add stop functionality
YishaiZinkin Nov 22, 2021
5513350
Integrate with java profiler
YishaiZinkin Nov 22, 2021
916ace5
select -> selectors
YishaiZinkin Nov 22, 2021
77088e8
Merge branch 'master' into feature/track-java-processes-termination
YishaiZinkin Nov 22, 2021
ffe742e
Typo
YishaiZinkin Nov 23, 2021
6dc4140
Raise in case of error in socket.bind
YishaiZinkin Nov 23, 2021
6d3f4c1
Add tid to callback
YishaiZinkin Nov 23, 2021
2032625
Fix: Don't call socket.close while in select()
YishaiZinkin Nov 23, 2021
728c9c8
Remove ProcEventsListener object from the API
YishaiZinkin Nov 23, 2021
c8b2097
Typo
YishaiZinkin Nov 24, 2021
b68c289
Cleanup properly
YishaiZinkin Nov 24, 2021
7520777
Differ thread start to first registration
YishaiZinkin Nov 24, 2021
8f7d5ce
Merge branch 'master' into feature/track-java-processes-termination
YishaiZinkin Nov 24, 2021
b06b654
Add unregister
YishaiZinkin Nov 24, 2021
2cc5df1
Add type hints
YishaiZinkin Nov 24, 2021
def3098
Add type hints
YishaiZinkin Nov 24, 2021
2b22419
Add module doc
YishaiZinkin Nov 24, 2021
4675511
Use granulate-utils
YishaiZinkin Nov 25, 2021
c4e3a4c
Fix linting
YishaiZinkin Nov 25, 2021
02f4cf0
Log the error code of the main thread only
YishaiZinkin Nov 25, 2021
7cdc3d4
Add git to Dockerfile
YishaiZinkin Nov 25, 2021
dfca758
Add package name in requirements.txt
YishaiZinkin Nov 25, 2021
99a1b59
Merge branch 'master' into feature/track-java-processes-termination
YishaiZinkin Nov 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
155 changes: 155 additions & 0 deletions gprofiler/proc_events.py
@@ -0,0 +1,155 @@
import os
import selectors
import socket
import struct
import threading


def _raise_if_not_running(func):
def wrapper(self, *args, **kwargs):
if not self.is_alive():
raise RuntimeError("Process Events Listener wasn't started")
return func(self, *args, **kwargs)

return wrapper


class ProcEventsListener(threading.Thread):
# linux/netlink.h:
_NETLINK_CONNECTOR = 11

# linux/netlink.h:
_NLMSG_DONE = 0x3 # End of a dump

# struct nlmsghdr {
# __u32 nlmsg_len; /* Length of message including header */
# __u16 nlmsg_type; /* Message content */
# __u16 nlmsg_flags; /* Additional flags */
# __u32 nlmsg_seq; /* Sequence number */
# __u32 nlmsg_pid; /* Sending process port ID */
# };
_nlmsghdr = struct.Struct("=I2H2I")

# linux/connector.h:
_CN_IDX_PROC = 0x1
_CN_VAL_PROC = 0x1

# struct cn_msg {
# struct cb_id id;
#
# __u32 seq;
# __u32 ack;
#
# __u16 len; /* Length of the following data */
# __u16 flags;
# __u8 data[0];
# };
_cn_msg = struct.Struct("=4I2H")

# linux/cn_proc.h:
# struct proc_event {
# enum what {
# ...
# } what;
# __u32 cpu;
# __u64 __attribute__((aligned(8))) timestamp_ns;
# /* Number of nano seconds since system boot */
# union { /* must be last field of proc_event struct */
# ...
# } event_data;
# };
_base_proc_event = struct.Struct("=2IQ")

# From enum what
_PROC_EVENT_EXIT = 0x80000000

# From enum proc_cn_mcast_op
_PROC_CN_MCAST_LISTEN = 1

# struct exit_proc_event {
# __kernel_pid_t process_pid;
# __kernel_pid_t process_tgid;
# __u32 exit_code, exit_signal;
# } exit;
_exit_proc_event = struct.Struct("=4I")

def __init__(self):
self._socket = socket.socket(socket.AF_NETLINK, socket.SOCK_DGRAM, self._NETLINK_CONNECTOR)
self._exit_callbacks = []
self._should_stop = False

super().__init__(target=self._proc_events_listener, name="Process Events Listener", daemon=True)

def _register_for_connector_events(self, socket):
"""Notify the kernel that we're listening for events on the connector"""
cn_proc_op = struct.Struct("=I").pack(self._PROC_CN_MCAST_LISTEN)
cn_msg = self._cn_msg.pack(self._CN_IDX_PROC, self._CN_VAL_PROC, 0, 0, len(cn_proc_op), 0) + cn_proc_op
nl_msg = self._nlmsghdr.pack(self._nlmsghdr.size + len(cn_msg), self._NLMSG_DONE, 0, 0, os.getpid()) + cn_msg
Jongy marked this conversation as resolved.
Show resolved Hide resolved

socket.send(nl_msg)

def _proc_events_listener(self):
"""Runs forever and calls registered callbacks on process events"""
try:
self._socket.bind((os.getpid(), self._CN_IDX_PROC))
except PermissionError as e:
raise PermissionError("You don't have permissions to bind to the process events connector") from e
Jongy marked this conversation as resolved.
Show resolved Hide resolved

self._register_for_connector_events(self._socket)
selector = selectors.DefaultSelector()
selector.register(self._socket, selectors.EVENT_READ)

while not self._should_stop:
events = selector.select()
Jongy marked this conversation as resolved.
Show resolved Hide resolved
if self._should_stop:
break

for key, _ in events:
data = key.fileobj.recv(256)

nl_hdr = dict(
zip(("len", "type", "flags", "seq", "pid"), self._nlmsghdr.unpack(data[: self._nlmsghdr.size]))
)
if nl_hdr["type"] != self._NLMSG_DONE:
# Handle only netlink messages
continue

# Strip off headers
data = data[self._nlmsghdr.size : nl_hdr["len"]]
data = data[self._cn_msg.size :]

event = dict(
zip(
("what", "cpu", "timestamp_ns"),
self._base_proc_event.unpack(data[: self._base_proc_event.size]),
)
)

if event["what"] == self._PROC_EVENT_EXIT:
# (exit_signal is the signal that the parent process received on exit)
event_data = dict(
zip(
("pid", "tgid", "exit_code", "exit_signal"),
self._exit_proc_event.unpack(
data[
self._base_proc_event.size : self._base_proc_event.size + self._exit_proc_event.size
]
),
)
)

for callback in self._exit_callbacks:
callback(event_data["pid"], event_data["exit_code"])
Jongy marked this conversation as resolved.
Show resolved Hide resolved

@_raise_if_not_running
def stop(self):
self._should_stop = True
self._socket.close()

@_raise_if_not_running
def register_exit_callback(self, callback):
"""Register a function to be called whenever a process exits

The callback should receive two arguments: pid and exit_code.
"""
self._exit_callbacks.append(callback)
20 changes: 20 additions & 0 deletions gprofiler/profilers/java.py
Expand Up @@ -19,6 +19,7 @@
from gprofiler.gprofiler_types import StackToSampleCount
from gprofiler.log import get_logger_adapter
from gprofiler.merge import parse_one_collapsed
from gprofiler.proc_events import ProcEventsListener
from gprofiler.profilers.profiler_base import ProcessProfilerBase
from gprofiler.profilers.registry import ProfilerArgument, register_profiler
from gprofiler.utils import (
Expand Down Expand Up @@ -110,6 +111,23 @@ def get_ap_log(self) -> str:
return self._ap_log


class AsyncProfiledProcessMonitor:
def __init__(self):
self._attached_processes = []
self._proc_events_listener = ProcEventsListener()
self._proc_events_listener.start()
self._proc_events_listener.register_exit_callback(self._proc_exit_callback)

def _proc_exit_callback(self, pid, exit_code):
if pid in self._attached_processes:
if exit_code != 0:
Jongy marked this conversation as resolved.
Show resolved Hide resolved
logger.warning(f"Async-profiled Java process [{pid}] exited with error code: {exit_code}")
self._attached_processes.remove(pid)
Jongy marked this conversation as resolved.
Show resolved Hide resolved

def register_process(self, pid):
self._attached_processes.append(pid)


@functools.lru_cache(maxsize=1)
def jattach_path() -> str:
return resource_path("java/jattach")
Expand Down Expand Up @@ -533,6 +551,7 @@ def __init__(
self._mode = java_async_profiler_mode
self._safemode = java_async_profiler_safemode
self._saved_mlock: Optional[int] = None
self._process_monitor = AsyncProfiledProcessMonitor()
self._java_safemode = java_safemode
if self._java_safemode:
logger.debug("Java safemode enabled")
Expand Down Expand Up @@ -649,6 +668,7 @@ def _profile_process(self, process: Process) -> Optional[StackToSampleCount]:
with AsyncProfiledProcess(
process, self._storage_dir, self._buildids, self._mode, self._safemode, self._java_safemode
) as ap_proc:
self._process_monitor.register_process(process.pid)
Jongy marked this conversation as resolved.
Show resolved Hide resolved
return self._profile_ap_process(ap_proc)

def _profile_ap_process(self, ap_proc: AsyncProfiledProcess) -> Optional[StackToSampleCount]:
Expand Down