From 1605fe57c9466c47a23bfee980a6023d984163ab Mon Sep 17 00:00:00 2001 From: sbharati Date: Tue, 31 May 2022 23:13:15 -0400 Subject: [PATCH 01/25] added inotify support to cmonitor. --- tools/common-code/cmonitor_launcher.py | 189 +++++++++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 tools/common-code/cmonitor_launcher.py diff --git a/tools/common-code/cmonitor_launcher.py b/tools/common-code/cmonitor_launcher.py new file mode 100644 index 0000000..f961b1f --- /dev/null +++ b/tools/common-code/cmonitor_launcher.py @@ -0,0 +1,189 @@ +# ======================================================================================================= +# cmonitor_launcher.py +# +# Author: Satyabrata Bharati +# Created: April 2022 +# +# ======================================================================================================= + +import inotify.adapters +import multiprocessing +from threading import Thread, Lock +from queue import Queue +from subprocess import Popen +import os +import time + +# ======================================================================================================= +# CmonitorLauncher +# +# - Watch all files below a directory and nottify an event for changes. +# - Retrieves all the process and extract the process name "/proc//stat. +# - check the process name against the white-list given in the filter list. +# - Execute command to launch CMonitor if the process name mathes with the filter. +# +# ======================================================================================================= +class CmonitorLauncher: + def __init__(self): + self.path = "" + self.filter = "" + self.command = "" + + def get_cgroup_version(self): + proc_self_mount = "/proc/self/mounts" + ncgroup_v1 = 0 + ncgroup_v2 = 0 + with open(proc_self_mount) as file: + for line in file: + row = line.split() + fs_spec = row[0] + fs_file = row[1] + fs_vfstype = row[2] + if ( + fs_spec == "cgroup" or fs_spec == "cgroup2" + ) and fs_vfstype == "cgroup2": + ncgroup_v2 += 1 + else: + ncgroup_v1 += 1 + + if ncgroup_v1 == 0 and ncgroup_v2 > 0: + cgroup_versopn = "v2" + return cgroup_version + else: + cgroup_version = "v1" + return cgroup_version + + def get_process_name(self, pid): + cgroup_version = self.get_cgroup_version() + if cgroup_version == "v1": + proc_filename = "/proc" + "/" + pid + "/stat" + else: + proc_filename = "/proc" + "/" + pid + "/cgroup.procs" + with open(proc_filename) as file: + for line in file: + parts = line.split() + process_name = parts[1].strip("()") + return process_name + + def get_pid_list(self, filename): + list = [] + with open(filename) as file: + for line in file: + list.append(line.strip()) + return list + + def get_list_of_files(self, dir): + listOfFiles = os.listdir(dir) + allFiles = list() + for entry in listOfFiles: + fullpath = os.path.join(dir, entry) + if os.path.isdir(fullpath): + allFiles = allFiles + self.get_list_of_files(fullpath) + else: + allFiles.append(fullpath) + + return allFiles + + def process_task_files(self, dir): + time.sleep(5) + allFiles = self.get_list_of_files(dir) + for file in allFiles: + if file.endswith("tasks"): + list = self.get_pid_list(file) + if list: + for pid in list: + process_name = self.get_process_name(pid) + match = self.check_filter(process_name) + if match is True: + print("Found match:", process_name) + print("Launchig CMonitor", process_name, self.filter) + self.launch_cmonitor(file) + + def launch_cmonitor(self, filename): + for c in self.command: + cmd = c.strip() + base_path = os.path.dirname(filename) + path = "/".join(base_path.split("/")[5:]) + cmd = cmd + " " + "--cgroup-name=" + path + print("Launch CMonitor : with command:", cmd) + # p = Popen(cmd) + # print("process id:",p.pid) #FIXME : need to delete the current cmonitor already running fot this process..!!! + os.system(cmd) + + def check_filter(self, process_name): + for e in self.filter: + if process_name in e: + return True + + def inotify_events(self, input_path, queue): + lock = multiprocessing.Lock() + i = inotify.adapters.Inotify() + i.add_watch(input_path) + try: + for event in i.event_gen(): + if event is not None: + if "IN_CREATE" in event[1]: + (header, type_names, path, filename) = event + print(header, type_names, path, filename) + lock.acquire() + dir = path + filename + queue.put(dir) + lock.release() + finally: + i.remove_watch(path) + + def process_events(self, event, filter, command): + self.filter = filter + self.command = command + lock = multiprocessing.Lock() + entry = 1 + while True: + lock.acquire() + filename = event.get() + print(" in process events entry:", entry) + print(" in process events", filename) + time.sleep(50) + self.process_task_files(filename) + entry = entry + 1 + lock.release() + + +def main(): + import argparse + parser = argparse.ArgumentParser(description="Processsome integers.") + parser.add_argument("-p", "--path", help="path to watch") + parser.add_argument( + "-filter", + "--filter", + nargs="*", + help="cmonitor triggers for matching pattern", + ) + parser.add_argument( + "-command", + "--command", + nargs="*", + help="cmonitor input command parameters", + ) + args = parser.parse_args() + input_path = args.path + print("Input path to watch:", input_path) + filter = args.filter + command = args.command + + queue = multiprocessing.Queue() + process_1 = multiprocessing.Process( + target=CmonitorLauncher().inotify_events, args=(input_path, queue) + ) + process_2 = multiprocessing.Process( + target=CmonitorLauncher().process_events, args=(queue, filter, command) + ) + + process_1.start() + process_2.start() + + process_1.join() + process_2.join() + + +if __name__ == "__main__": + main() From 40fe079e1050b40c36a792a096cd6d83c16304a5 Mon Sep 17 00:00:00 2001 From: sbharati Date: Mon, 6 Jun 2022 07:57:47 -0400 Subject: [PATCH 02/25] minor change. --- tools/common-code/cmonitor_launcher.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/tools/common-code/cmonitor_launcher.py b/tools/common-code/cmonitor_launcher.py index f961b1f..4232706 100644 --- a/tools/common-code/cmonitor_launcher.py +++ b/tools/common-code/cmonitor_launcher.py @@ -115,8 +115,7 @@ def check_filter(self, process_name): if process_name in e: return True - def inotify_events(self, input_path, queue): - lock = multiprocessing.Lock() + def inotify_events(self, input_path, lock, queue): i = inotify.adapters.Inotify() i.add_watch(input_path) try: @@ -125,27 +124,26 @@ def inotify_events(self, input_path, queue): if "IN_CREATE" in event[1]: (header, type_names, path, filename) = event print(header, type_names, path, filename) - lock.acquire() dir = path + filename + lock.acquire() queue.put(dir) lock.release() finally: i.remove_watch(path) - def process_events(self, event, filter, command): + def process_events(self, lock, event, filter, command): self.filter = filter self.command = command - lock = multiprocessing.Lock() entry = 1 while True: - lock.acquire() + #lock.acquire() filename = event.get() + #lock.release() print(" in process events entry:", entry) print(" in process events", filename) time.sleep(50) self.process_task_files(filename) entry = entry + 1 - lock.release() def main(): @@ -153,13 +151,13 @@ def main(): parser = argparse.ArgumentParser(description="Processsome integers.") parser.add_argument("-p", "--path", help="path to watch") parser.add_argument( - "-filter", + "-f", "--filter", nargs="*", help="cmonitor triggers for matching pattern", ) parser.add_argument( - "-command", + "-c", "--command", nargs="*", help="cmonitor input command parameters", @@ -170,12 +168,13 @@ def main(): filter = args.filter command = args.command + lock = multiprocessing.Lock() queue = multiprocessing.Queue() process_1 = multiprocessing.Process( - target=CmonitorLauncher().inotify_events, args=(input_path, queue) + target=CmonitorLauncher().inotify_events, args=(input_path, lock, queue) ) process_2 = multiprocessing.Process( - target=CmonitorLauncher().process_events, args=(queue, filter, command) + target=CmonitorLauncher().process_events, args=(lock, queue, filter, command) ) process_1.start() From 09e529a4b999fbb9a66ebbbe25432eee2a1b9e5f Mon Sep 17 00:00:00 2001 From: sbharati Date: Tue, 14 Jun 2022 08:26:05 -0400 Subject: [PATCH 03/25] added concurrent futures over multiprocessing. --- tools/common-code/cmonitor_launcher.py | 133 +++++++++++++++++-------- 1 file changed, 89 insertions(+), 44 deletions(-) diff --git a/tools/common-code/cmonitor_launcher.py b/tools/common-code/cmonitor_launcher.py index 4232706..8eb7953 100644 --- a/tools/common-code/cmonitor_launcher.py +++ b/tools/common-code/cmonitor_launcher.py @@ -7,27 +7,48 @@ # ======================================================================================================= import inotify.adapters -import multiprocessing -from threading import Thread, Lock -from queue import Queue +import concurrent.futures +from concurrent.futures import ProcessPoolExecutor from subprocess import Popen +import queue import os import time +import signal +#import subprocess +queue = queue.Queue() # ======================================================================================================= # CmonitorLauncher # -# - Watch all files below a directory and nottify an event for changes. +# - Watch all files below a directory and notify an event for changes. # - Retrieves all the process and extract the process name "/proc//stat. # - check the process name against the white-list given in the filter list. -# - Execute command to launch CMonitor if the process name mathes with the filter. +# - Execute command to launch CMonitor if the process name matches with the filter. # # ======================================================================================================= class CmonitorLauncher: - def __init__(self): - self.path = "" - self.filter = "" - self.command = "" + def __init__(self,path, filter, ip , port, command): + self.path = path + self.filter = filter + self.ip = ip + self.port = port + self.command = command + self.process_host_dict = {} + self.prev_process_dict= {} + self.prev_file = "" + + """ + Should add the list of IPs as key to the dictionary + """ + tmp_ip = self.ip + for key in self.filter: + for value in tmp_ip: + self.process_host_dict[key] = value + tmp_ip.remove(value) + # Printing resultant dictionary + print("Input [filter-host]: " + str(self.process_host_dict)) + print("Input [port]: " + str(self.port)) + def get_cgroup_version(self): proc_self_mount = "/proc/self/mounts" @@ -96,51 +117,72 @@ def process_task_files(self, dir): match = self.check_filter(process_name) if match is True: print("Found match:", process_name) - print("Launchig CMonitor", process_name, self.filter) - self.launch_cmonitor(file) - - def launch_cmonitor(self, filename): + # remove already running cmonitor for the same process + if self.prev_file: + self.remove_process(process_name, self.prev_file) + # launch cmonitor for new process + self.ip = self.process_host_dict.get(process_name) + self.launch_cmonitor(file, self.ip) + # store the new process task file + cur_pod = file.split("/")[7] + self.prev_process_dict[process_name] = cur_pod + # store the old process task file + self.prev_file = file + + def remove_process(self, process_name, prev_file): + prev_pod = prev_file.split("/")[7] + print("Previous running process:", self.prev_process_dict) + if process_name in self.prev_process_dict: + for line in os.popen("ps -aef | grep " + prev_pod + " | grep -v grep"): + fields = line.split() + pid = fields[1] + # terminating process + print("Terminating cMonitor running process with ID:", process_name,pid) + os.kill(int(pid), signal.SIGKILL) + print("Process Successfully terminated") + + + def launch_cmonitor(self, filename, ip): for c in self.command: cmd = c.strip() + port = self.port base_path = os.path.dirname(filename) path = "/".join(base_path.split("/")[5:]) - cmd = cmd + " " + "--cgroup-name=" + path - print("Launch CMonitor : with command:", cmd) - # p = Popen(cmd) - # print("process id:",p.pid) #FIXME : need to delete the current cmonitor already running fot this process..!!! + cmd = cmd + " " + "--cgroup-name=" + path + " " + "-A" + " " + ip + " " + "-S" + " " + port + #cmd = cmd + " " + "--cgroup-name=" + path + " " + "--remote-ip" + " " + ip + " " + "--remote-port" + " " + port + print("Launch cMonitor with command:", cmd) os.system(cmd) + # FIXME : need to delete the current cmonitor already running for this process..!!! + # pid = subprocess.Popen(cmd.split(), shell=True) + # print("cmd PID:", pid.pid) + # As it does not store the pid of the actual command , need to find another way to + # kill the already running process : using pod ID def check_filter(self, process_name): for e in self.filter: if process_name in e: return True - def inotify_events(self, input_path, lock, queue): + def inotify_events(self, queue): i = inotify.adapters.Inotify() - i.add_watch(input_path) + i.add_watch(self.path) try: for event in i.event_gen(): if event is not None: if "IN_CREATE" in event[1]: (header, type_names, path, filename) = event - print(header, type_names, path, filename) + # print(header, type_names, path, filename) dir = path + filename - lock.acquire() queue.put(dir) - lock.release() finally: i.remove_watch(path) - def process_events(self, lock, event, filter, command): - self.filter = filter - self.command = command + def process_events(self, event): entry = 1 while True: - #lock.acquire() filename = event.get() - #lock.release() - print(" in process events entry:", entry) - print(" in process events", filename) + print("In process events entry:", entry, filename) + # print("In process events", filename) time.sleep(50) self.process_task_files(filename) entry = entry + 1 @@ -162,27 +204,30 @@ def main(): nargs="*", help="cmonitor input command parameters", ) + parser.add_argument( + "-i", + "--ip", + nargs="*", + help="cmonitor input IP", + ) + parser.add_argument( + "-r", + "--port", + help="cmonitor input Port", + ) args = parser.parse_args() input_path = args.path - print("Input path to watch:", input_path) + print("Input [path]:", input_path) filter = args.filter command = args.command + ip = args.ip + port = args.port - lock = multiprocessing.Lock() - queue = multiprocessing.Queue() - process_1 = multiprocessing.Process( - target=CmonitorLauncher().inotify_events, args=(input_path, lock, queue) - ) - process_2 = multiprocessing.Process( - target=CmonitorLauncher().process_events, args=(lock, queue, filter, command) - ) - - process_1.start() - process_2.start() - - process_1.join() - process_2.join() + cMonitorLauncher = CmonitorLauncher(input_path, filter, ip , port, command) + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + executor.submit(cMonitorLauncher.inotify_events, queue) + executor.submit(cMonitorLauncher.process_events, queue) if __name__ == "__main__": main() From 4c5945fcf615acea6ee7938f3fa02aa37146292c Mon Sep 17 00:00:00 2001 From: sbharati Date: Thu, 16 Jun 2022 09:59:24 -0400 Subject: [PATCH 04/25] minor change. --- tools/common-code/cmonitor_launcher.py | 54 +++++--------------------- 1 file changed, 10 insertions(+), 44 deletions(-) diff --git a/tools/common-code/cmonitor_launcher.py b/tools/common-code/cmonitor_launcher.py index 8eb7953..ec7afb7 100644 --- a/tools/common-code/cmonitor_launcher.py +++ b/tools/common-code/cmonitor_launcher.py @@ -27,15 +27,12 @@ # # ======================================================================================================= class CmonitorLauncher: - def __init__(self,path, filter, ip , port, command): + def __init__(self,path, filter, ip , command): self.path = path self.filter = filter self.ip = ip - self.port = port self.command = command self.process_host_dict = {} - self.prev_process_dict= {} - self.prev_file = "" """ Should add the list of IPs as key to the dictionary @@ -47,8 +44,6 @@ def __init__(self,path, filter, ip , port, command): tmp_ip.remove(value) # Printing resultant dictionary print("Input [filter-host]: " + str(self.process_host_dict)) - print("Input [port]: " + str(self.port)) - def get_cgroup_version(self): proc_self_mount = "/proc/self/mounts" @@ -117,46 +112,23 @@ def process_task_files(self, dir): match = self.check_filter(process_name) if match is True: print("Found match:", process_name) - # remove already running cmonitor for the same process - if self.prev_file: - self.remove_process(process_name, self.prev_file) - # launch cmonitor for new process + # print("Launchig cMonitor:", process_name) self.ip = self.process_host_dict.get(process_name) + # print("Launchig cMonitor with IP :", str(self.ip)) self.launch_cmonitor(file, self.ip) - # store the new process task file - cur_pod = file.split("/")[7] - self.prev_process_dict[process_name] = cur_pod - # store the old process task file - self.prev_file = file - - def remove_process(self, process_name, prev_file): - prev_pod = prev_file.split("/")[7] - print("Previous running process:", self.prev_process_dict) - if process_name in self.prev_process_dict: - for line in os.popen("ps -aef | grep " + prev_pod + " | grep -v grep"): - fields = line.split() - pid = fields[1] - # terminating process - print("Terminating cMonitor running process with ID:", process_name,pid) - os.kill(int(pid), signal.SIGKILL) - print("Process Successfully terminated") def launch_cmonitor(self, filename, ip): for c in self.command: cmd = c.strip() - port = self.port + ip_port = ip.split(":"); + ip = ip_port[0] + port = ip_port[1] base_path = os.path.dirname(filename) path = "/".join(base_path.split("/")[5:]) cmd = cmd + " " + "--cgroup-name=" + path + " " + "-A" + " " + ip + " " + "-S" + " " + port - #cmd = cmd + " " + "--cgroup-name=" + path + " " + "--remote-ip" + " " + ip + " " + "--remote-port" + " " + port print("Launch cMonitor with command:", cmd) os.system(cmd) - # FIXME : need to delete the current cmonitor already running for this process..!!! - # pid = subprocess.Popen(cmd.split(), shell=True) - # print("cmd PID:", pid.pid) - # As it does not store the pid of the actual command , need to find another way to - # kill the already running process : using pod ID def check_filter(self, process_name): for e in self.filter: @@ -206,24 +178,18 @@ def main(): ) parser.add_argument( "-i", - "--ip", + "--ip-port", nargs="*", - help="cmonitor input IP", - ) - parser.add_argument( - "-r", - "--port", - help="cmonitor input Port", + help="cmonitor input ", ) args = parser.parse_args() input_path = args.path print("Input [path]:", input_path) filter = args.filter command = args.command - ip = args.ip - port = args.port + ip = args.ip_port - cMonitorLauncher = CmonitorLauncher(input_path, filter, ip , port, command) + cMonitorLauncher = CmonitorLauncher(input_path, filter, ip , command) with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: executor.submit(cMonitorLauncher.inotify_events, queue) From 0e12c0619f1dd03bb1e9e436963cf85a400205c1 Mon Sep 17 00:00:00 2001 From: sbharati Date: Thu, 16 Jun 2022 10:11:02 -0400 Subject: [PATCH 05/25] minor change. --- tools/common-code/cmonitor_launcher.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/common-code/cmonitor_launcher.py b/tools/common-code/cmonitor_launcher.py index ec7afb7..ba5a0ad 100644 --- a/tools/common-code/cmonitor_launcher.py +++ b/tools/common-code/cmonitor_launcher.py @@ -13,8 +13,6 @@ import queue import os import time -import signal -#import subprocess queue = queue.Queue() # ======================================================================================================= From 69dcff45e902849f7f47c3c9a9d2e085732b290f Mon Sep 17 00:00:00 2001 From: sbharati Date: Fri, 17 Jun 2022 08:26:36 -0400 Subject: [PATCH 06/25] minor change. --- tools/common-code/cmonitor_launcher.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tools/common-code/cmonitor_launcher.py b/tools/common-code/cmonitor_launcher.py index ba5a0ad..f3dd9f8 100644 --- a/tools/common-code/cmonitor_launcher.py +++ b/tools/common-code/cmonitor_launcher.py @@ -110,9 +110,7 @@ def process_task_files(self, dir): match = self.check_filter(process_name) if match is True: print("Found match:", process_name) - # print("Launchig cMonitor:", process_name) self.ip = self.process_host_dict.get(process_name) - # print("Launchig cMonitor with IP :", str(self.ip)) self.launch_cmonitor(file, self.ip) @@ -124,7 +122,7 @@ def launch_cmonitor(self, filename, ip): port = ip_port[1] base_path = os.path.dirname(filename) path = "/".join(base_path.split("/")[5:]) - cmd = cmd + " " + "--cgroup-name=" + path + " " + "-A" + " " + ip + " " + "-S" + " " + port + cmd = f"{cmd} --cgroup-name={path} -A {ip} -S {port}" print("Launch cMonitor with command:", cmd) os.system(cmd) From 702384fdee0d85725e74a479c4d033a892546b73 Mon Sep 17 00:00:00 2001 From: sbharati Date: Fri, 17 Jun 2022 11:42:01 -0400 Subject: [PATCH 07/25] minor change. --- tools/common-code/cmonitor_launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/common-code/cmonitor_launcher.py b/tools/common-code/cmonitor_launcher.py index f3dd9f8..39c7a9c 100644 --- a/tools/common-code/cmonitor_launcher.py +++ b/tools/common-code/cmonitor_launcher.py @@ -122,7 +122,7 @@ def launch_cmonitor(self, filename, ip): port = ip_port[1] base_path = os.path.dirname(filename) path = "/".join(base_path.split("/")[5:]) - cmd = f"{cmd} --cgroup-name={path} -A {ip} -S {port}" + cmd = f"{cmd} --cgroup-name={path} --remote-ip {ip} --remote-port {port}" print("Launch cMonitor with command:", cmd) os.system(cmd) From 150a8637df6906304ecdad934e0960f93ef6a4a8 Mon Sep 17 00:00:00 2001 From: sbharati Date: Fri, 19 Aug 2022 05:21:55 -0400 Subject: [PATCH 08/25] rework on review comments. --- tools/common-code/cmonitor_launcher.py | 195 ----------------------- tools/launcher/cmonitor_launcher.py | 211 +++++++++++++++++++++++++ tools/launcher/cmonitor_watcher.py | 190 ++++++++++++++++++++++ 3 files changed, 401 insertions(+), 195 deletions(-) delete mode 100644 tools/common-code/cmonitor_launcher.py create mode 100644 tools/launcher/cmonitor_launcher.py create mode 100644 tools/launcher/cmonitor_watcher.py diff --git a/tools/common-code/cmonitor_launcher.py b/tools/common-code/cmonitor_launcher.py deleted file mode 100644 index 39c7a9c..0000000 --- a/tools/common-code/cmonitor_launcher.py +++ /dev/null @@ -1,195 +0,0 @@ -# ======================================================================================================= -# cmonitor_launcher.py -# -# Author: Satyabrata Bharati -# Created: April 2022 -# -# ======================================================================================================= - -import inotify.adapters -import concurrent.futures -from concurrent.futures import ProcessPoolExecutor -from subprocess import Popen -import queue -import os -import time - -queue = queue.Queue() -# ======================================================================================================= -# CmonitorLauncher -# -# - Watch all files below a directory and notify an event for changes. -# - Retrieves all the process and extract the process name "/proc//stat. -# - check the process name against the white-list given in the filter list. -# - Execute command to launch CMonitor if the process name matches with the filter. -# -# ======================================================================================================= -class CmonitorLauncher: - def __init__(self,path, filter, ip , command): - self.path = path - self.filter = filter - self.ip = ip - self.command = command - self.process_host_dict = {} - - """ - Should add the list of IPs as key to the dictionary - """ - tmp_ip = self.ip - for key in self.filter: - for value in tmp_ip: - self.process_host_dict[key] = value - tmp_ip.remove(value) - # Printing resultant dictionary - print("Input [filter-host]: " + str(self.process_host_dict)) - - def get_cgroup_version(self): - proc_self_mount = "/proc/self/mounts" - ncgroup_v1 = 0 - ncgroup_v2 = 0 - with open(proc_self_mount) as file: - for line in file: - row = line.split() - fs_spec = row[0] - fs_file = row[1] - fs_vfstype = row[2] - if ( - fs_spec == "cgroup" or fs_spec == "cgroup2" - ) and fs_vfstype == "cgroup2": - ncgroup_v2 += 1 - else: - ncgroup_v1 += 1 - - if ncgroup_v1 == 0 and ncgroup_v2 > 0: - cgroup_versopn = "v2" - return cgroup_version - else: - cgroup_version = "v1" - return cgroup_version - - def get_process_name(self, pid): - cgroup_version = self.get_cgroup_version() - if cgroup_version == "v1": - proc_filename = "/proc" + "/" + pid + "/stat" - else: - proc_filename = "/proc" + "/" + pid + "/cgroup.procs" - with open(proc_filename) as file: - for line in file: - parts = line.split() - process_name = parts[1].strip("()") - return process_name - - def get_pid_list(self, filename): - list = [] - with open(filename) as file: - for line in file: - list.append(line.strip()) - return list - - def get_list_of_files(self, dir): - listOfFiles = os.listdir(dir) - allFiles = list() - for entry in listOfFiles: - fullpath = os.path.join(dir, entry) - if os.path.isdir(fullpath): - allFiles = allFiles + self.get_list_of_files(fullpath) - else: - allFiles.append(fullpath) - - return allFiles - - def process_task_files(self, dir): - time.sleep(5) - allFiles = self.get_list_of_files(dir) - for file in allFiles: - if file.endswith("tasks"): - list = self.get_pid_list(file) - if list: - for pid in list: - process_name = self.get_process_name(pid) - match = self.check_filter(process_name) - if match is True: - print("Found match:", process_name) - self.ip = self.process_host_dict.get(process_name) - self.launch_cmonitor(file, self.ip) - - - def launch_cmonitor(self, filename, ip): - for c in self.command: - cmd = c.strip() - ip_port = ip.split(":"); - ip = ip_port[0] - port = ip_port[1] - base_path = os.path.dirname(filename) - path = "/".join(base_path.split("/")[5:]) - cmd = f"{cmd} --cgroup-name={path} --remote-ip {ip} --remote-port {port}" - print("Launch cMonitor with command:", cmd) - os.system(cmd) - - def check_filter(self, process_name): - for e in self.filter: - if process_name in e: - return True - - def inotify_events(self, queue): - i = inotify.adapters.Inotify() - i.add_watch(self.path) - try: - for event in i.event_gen(): - if event is not None: - if "IN_CREATE" in event[1]: - (header, type_names, path, filename) = event - # print(header, type_names, path, filename) - dir = path + filename - queue.put(dir) - finally: - i.remove_watch(path) - - def process_events(self, event): - entry = 1 - while True: - filename = event.get() - print("In process events entry:", entry, filename) - # print("In process events", filename) - time.sleep(50) - self.process_task_files(filename) - entry = entry + 1 - - -def main(): - import argparse - parser = argparse.ArgumentParser(description="Processsome integers.") - parser.add_argument("-p", "--path", help="path to watch") - parser.add_argument( - "-f", - "--filter", - nargs="*", - help="cmonitor triggers for matching pattern", - ) - parser.add_argument( - "-c", - "--command", - nargs="*", - help="cmonitor input command parameters", - ) - parser.add_argument( - "-i", - "--ip-port", - nargs="*", - help="cmonitor input ", - ) - args = parser.parse_args() - input_path = args.path - print("Input [path]:", input_path) - filter = args.filter - command = args.command - ip = args.ip_port - - cMonitorLauncher = CmonitorLauncher(input_path, filter, ip , command) - - with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: - executor.submit(cMonitorLauncher.inotify_events, queue) - executor.submit(cMonitorLauncher.process_events, queue) - -if __name__ == "__main__": - main() diff --git a/tools/launcher/cmonitor_launcher.py b/tools/launcher/cmonitor_launcher.py new file mode 100644 index 0000000..292ff15 --- /dev/null +++ b/tools/launcher/cmonitor_launcher.py @@ -0,0 +1,211 @@ +#!/usr/bin/python3 + +# +# cmonitor_launcher.py +# +# Author: Satyabrata Bharati +# Created: April 2022 +# + +import concurrent.futures +from concurrent.futures import ProcessPoolExecutor +import subprocess +from subprocess import Popen +import argparse +import queue +import os +import sys +import time +import logging +from datetime import datetime + +from cmonitor_watcher import CgroupWatcher +from argparse import RawTextHelpFormatter + +queue = queue.Queue() +logger = logging.getLogger(__name__) + +# default sleep timeout +default_sleep_timeout = 20 +# ======================================================================================================= +# CmonitorLauncher +# ======================================================================================================= +class CmonitorLauncher: + """ + - Retrieves all the events from the Queue. + - Lauch cMonitor with appropriate command. + """ + + def __init__(self, path, filter, ip, command, timeout): + self.path = path + self.filter = filter + self.ip = ip + self.command = command + self.timeout = timeout + self.process_host_dict = {} + + """ + Should add the list of IPs as key to the dictionary + """ + tmp_ip = self.ip + for key in self.filter: + for value in tmp_ip: + self.process_host_dict[key] = value + tmp_ip.remove(value) + # Printing resultant dictionary + print("Input [filter-host]: " + str(self.process_host_dict)) + logging.info(f"Input [filter-host] {str(self.process_host_dict)}") + + def process_events(self, event): + """Main thread function for processing input events from the queue. + Args: + event: events to read from this queue. + The events from this queue will be processed by this threading function to + launch cMonitor with appropriate command input. + + """ + try: + entry = 1 + while True: + if not event.empty(): + fileList = event.get() + for key, value in fileList.items(): + filename = key + process_name = value + logging.info(f"In processing event from the Queue - event: {entry},file: {filename},process_name: {process_name}") + logging.info(f"Launching cMonitor with: {filename} with IP :{self.process_host_dict.get(process_name)}") + self.__launch_cmonitor(filename, self.process_host_dict.get(process_name)) + entry = entry + 1 + else: + # time.sleep(10) + time.sleep(self.timeout) + logging.info(f"In processing event Queue is empty - sleeping: {self.timeout} sec") + except event.Empty(): + pass + + def __launch_cmonitor(self, filename, ip): + """ + - Lauch cMonitor with appropriate command. + """ + + for c in self.command: + cmd = c.strip() + ip_port = ip.split(":") + ip = ip_port[0] + port = ip_port[1] + base_path = os.path.dirname(filename) + path = "/".join(base_path.split("/")[5:]) + cmd = f"{cmd} --cgroup-name={path} --remote-ip {ip} --remote-port {port}" + print("Launch cMonitor with command:", cmd) + logging.info(f"Launch cMonitor with command: {cmd}") + # os.system(cmd) + subprocess.run(cmd, shell=True) + + +def parse_command_line(): + """Parses the command line and returns the configuration as dictionary object.""" + parser = argparse.ArgumentParser( + description="Utility to Lauch cMonitor with appropriate command.", + epilog=""" +Example: + cmonitor_launcher.py --path /sys/fs/cgroup/memory/kubepods/burstable/ + --filter process_1 process_2 + --ip-port 172.0.0.1:9090 172.0.0.2:9099 + --command "./cmonitor_collector --num-samples=until-cgroup-alive + --deep-collect --collect=cgroup_threads,cgroup_cpu,cgroup_memory,cgroup_network + --score-threshold=0 --sampling-interval=3 --output-directory=/home + --allow-multiple-instances --remote prometheus" + --log /home + --timeout 20 +""", + formatter_class=RawTextHelpFormatter, + ) + parser.add_argument("-p", "--path", help="path to watch", default=None) + parser.add_argument( + "-f", + "--filter", + nargs="*", + help="cmonitor triggers for matching pattern", + default=None, + ) + parser.add_argument( + "-c", + "--command", + nargs="*", + help="cmonitor input command parameters", + default=None, + ) + parser.add_argument("-i", "--ip-port", nargs="*", help="cmonitor input ", default=None) + parser.add_argument("-l", "--log", help="path to logfile") + parser.add_argument("-t", "--timeout", type=int, help="sleep time interval") + args = parser.parse_args() + + if args.path is None: + print("Please provide the input path to watch for iNotify events to be monitored") + parser.print_help() + sys.exit(os.EX_USAGE) + + if args.filter is None: + print("Please provide the input filter for white-listing events") + parser.print_help() + sys.exit(os.EX_USAGE) + + if args.command is None: + print("Please provide the input comamnd to launch cMonitor with") + parser.print_help() + sys.exit(os.EX_USAGE) + + if args.ip_port is None: + print("Please provide the input ip:port to launch cMonitor") + parser.print_help() + sys.exit(os.EX_USAGE) + + return { + "input_path": args.path, + "input_filter": args.filter, + "input_command": args.command, + "input_ip": args.ip_port, + "input_log": args.log, + "input_timeout": args.timeout, + } + + +def main(): + config = parse_command_line() + # default sleep timeout + timeout = default_sleep_timeout + + # command line inputs + input_path = config["input_path"] + print("Input [path]:", input_path) + filter = config["input_filter"] + command = config["input_command"] + ip = config["input_ip"] + log_dir = config["input_log"] + timeout = config["input_timeout"] + + if log_dir: + log_file_name = os.path.join(log_dir, datetime.now().strftime("cmonitor_launcher_%Y%m%d_%H%M%S.log")) + else: + log_file_name = datetime.now().strftime("cmonitor_launcher_%Y%m%d_%H%M%S.log") + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-8s %(message)s", + datefmt="%a, %d %b %Y %H:%M:%S", + filename=log_file_name, + filemode="w", + ) + logging.info("Started") + logging.info(f"timeout set for sleep: {timeout}") + + cGroupWatcher = CgroupWatcher(input_path, filter, timeout) + cMonitorLauncher = CmonitorLauncher(input_path, filter, ip, command, timeout) + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + executor.submit(cGroupWatcher.inotify_events, queue) + executor.submit(cMonitorLauncher.process_events, queue) + + +if __name__ == "__main__": + main() diff --git a/tools/launcher/cmonitor_watcher.py b/tools/launcher/cmonitor_watcher.py new file mode 100644 index 0000000..9791d34 --- /dev/null +++ b/tools/launcher/cmonitor_watcher.py @@ -0,0 +1,190 @@ +#!/usr/bin/python3 + +# +# cmonitor_watcher.py +# +# Author: Satyabrata Bharati +# Created: April 2022 +# +import inotify.adapters +import queue +import os +import time +import logging +from datetime import datetime + +# ======================================================================================================= +# CgroupWatcher : Basic inotify class +# ======================================================================================================= +class CgroupWatcher: + """ + - Watch all files below a directory and notify an event for changes. + - Retrieves all the process and extract the process name "/proc//stat. + - check the process name against the white-list given in the filter list. + - store the events in Queue. + """ + + def __init__(self, path, filter, timeout): + """Initialize CgroupWatcher + Args: + path: path to watch for events. + filter: white-list against which the process-event is filtered. + + """ + self.path = path + self.filter = filter + self.timeout = timeout + self.myFileList = {} + + def __get_cgroup_version(self): + """ + Detect the cgroup version. + """ + proc_self_mount = "/proc/self/mounts" + ncgroup_v1 = 0 + ncgroup_v2 = 0 + with open(proc_self_mount) as file: + for line in file: + row = line.split() + fs_spec = row[0] + fs_file = row[1] + fs_vfstype = row[2] + if (fs_spec == "cgroup" or fs_spec == "cgroup2") and fs_vfstype == "cgroup2": + ncgroup_v2 += 1 + else: + ncgroup_v1 += 1 + + if ncgroup_v1 == 0 and ncgroup_v2 > 0: + cgroup_versopn = "v2" + return cgroup_version + else: + cgroup_version = "v1" + return cgroup_version + + def __get_process_name(self, pid): + """Returns the process name for the process id. + Args: + pid: process id. + + Returns: + The process name. + + """ + cgroup_version = self.__get_cgroup_version() + if cgroup_version == "v1": + proc_filename = "/proc" + "/" + pid + "/stat" + else: + proc_filename = "/proc" + "/" + pid + "/cgroup.procs" + with open(proc_filename) as file: + for line in file: + parts = line.split() + process_name = parts[1].strip("()") + return process_name + + def __get_pid_list(self, filename): + """Get the list of the process ids belong to a tasks file. + Args: + filename: the tasks file. + + Returns: + The list of PIDs within the tasks file. + + """ + list = [] + with open(filename) as file: + for line in file: + list.append(line.strip()) + return list + + def __get_list_of_files(self, dir): + """Returns the list of the files created for the event within the watched dir. + Args: + filename: dir to be watched. + + Returns: + The list of files created within the watched dir. + + """ + listOfFiles = os.listdir(dir) + allFiles = list() + for entry in listOfFiles: + fullpath = os.path.join(dir, entry) + if os.path.isdir(fullpath): + allFiles = allFiles + self.__get_list_of_files(fullpath) + else: + allFiles.append(fullpath) + + return allFiles + + def __process_task_files(self, dir): + """Process all the files for triggered-event within the watched dir. + Finds the process Ids and filter out the process name against the + provided white-list. If the process Id matches the whilte-listing + process from command-line , it store and return the file anlog with the process-name. + The process name later will be used to get the ip and port from the + command-line for the specific process. + Args: + dir: dir to be watched. + + Returns: + The file along with the process name which will be used to launch cmonitor. + + """ + # time.sleep(20) + time.sleep(self.timeout) + logging.info(f"watcher process file sleep: {self.timeout}") + allFiles = self.__get_list_of_files(dir) + for file in allFiles: + if file.endswith("tasks"): + list = self.__get_pid_list(file) + if list: + for pid in list: + process_name = self.__get_process_name(pid) + logging.info(f"processing task file: {file} with pid: {pid}, process name: {process_name}") + match = self.__check_filter(process_name) + if match is True: + logging.info(f"Found match: {process_name}") + self.myFileList = {file: process_name} + return self.myFileList + + def __check_filter(self, process_name): + """Check process name against the whilte-list. + Args: + process_name: process name to be matched against the whilte-list from command-line. + + Returns: + True if process_name matches with the white-list. + + """ + for e in self.filter: + if process_name in e: + return True + + def inotify_events(self, queue): + """Main thread function for notifying events. + Monitored events that match with the white-list provided will be stored in this queue. + The events from this queue will be processed by cMonitorLauncher threading function to + launch cMonitor with appropriate command input + Args: + queue: monitored events will be stored in this queue. + + Returns: + + """ + logging.info(f"CgroupWatcher calling inotify_event") + i = inotify.adapters.Inotify() + i.add_watch(self.path) + try: + for event in i.event_gen(): + if event is not None: + if "IN_CREATE" in event[1]: + (header, type_names, path, filename) = event + logging.info(f"CgroupWatcher event triggered:{path},{filename}") + dir = path + filename + logging.info(f"CgroupWatcher event created:{filename}") + fileList = self.__process_task_files(dir) + if fileList: + logging.info(f"CgroupWatcher event in Queue:{fileList}") + queue.put(fileList) + finally: + i.remove_watch(path) From d1d7246612af6406d8a002e32209a64cfcac7952 Mon Sep 17 00:00:00 2001 From: sbharati Date: Sun, 21 Aug 2022 23:49:49 -0400 Subject: [PATCH 09/25] updated Makefile for cMonitor_launcher rpm installation. --- tools/Makefile | 6 ++++-- tools/{launcher => common-code}/cmonitor_watcher.py | 0 2 files changed, 4 insertions(+), 2 deletions(-) rename tools/{launcher => common-code}/cmonitor_watcher.py (100%) diff --git a/tools/Makefile b/tools/Makefile index 465197b..637e89d 100644 --- a/tools/Makefile +++ b/tools/Makefile @@ -12,11 +12,13 @@ include $(ROOT_DIR)/Constants.mk TOOLS = \ chart/cmonitor_chart.py \ filter/cmonitor_filter.py \ - statistics/cmonitor_statistics.py + statistics/cmonitor_statistics.py \ + launcher/cmonitor_launcher.py SYMLINKS = \ chart/cmonitor_chart \ filter/cmonitor_filter \ - statistics/cmonitor_statistics + statistics/cmonitor_statistics \ + launcher/cmonitor_launcher # cmonitor_version.py has to be listed explicitly because it does not exist yet # when the $(wilcard) runs (after a clean checkout) diff --git a/tools/launcher/cmonitor_watcher.py b/tools/common-code/cmonitor_watcher.py similarity index 100% rename from tools/launcher/cmonitor_watcher.py rename to tools/common-code/cmonitor_watcher.py From ef2b049831b9a4a9e99524403366cba5765bba9c Mon Sep 17 00:00:00 2001 From: sbharati Date: Tue, 23 Aug 2022 08:30:48 -0400 Subject: [PATCH 10/25] added unit test case. --- tools/Makefile | 1 + tools/common-code/cmonitor_watcher.py | 6 +- tools/launcher/Makefile | 11 ++ tools/launcher/cmonitor_launcher.py | 6 +- ...cgroup_watcher.cpython-36-pytest-7.0.1.pyc | Bin 0 -> 2836 bytes .../test_dymmy.cpython-36-pytest-7.0.1.pyc | Bin 0 -> 435 bytes ...r_by_task_name.cpython-36-pytest-7.0.1.pyc | Bin 0 -> 3113 bytes .../test_launcher.cpython-36-pytest-7.0.1.pyc | Bin 0 -> 2830 bytes tools/launcher/tests/dummy.py | 17 +++ tools/launcher/tests/test_cgroup_watcher.py | 103 ++++++++++++++++++ 10 files changed, 141 insertions(+), 3 deletions(-) create mode 100644 tools/launcher/Makefile create mode 100644 tools/launcher/tests/__pycache__/test_cgroup_watcher.cpython-36-pytest-7.0.1.pyc create mode 100644 tools/launcher/tests/__pycache__/test_dymmy.cpython-36-pytest-7.0.1.pyc create mode 100644 tools/launcher/tests/__pycache__/test_filter_by_task_name.cpython-36-pytest-7.0.1.pyc create mode 100644 tools/launcher/tests/__pycache__/test_launcher.cpython-36-pytest-7.0.1.pyc create mode 100644 tools/launcher/tests/dummy.py create mode 100644 tools/launcher/tests/test_cgroup_watcher.py diff --git a/tools/Makefile b/tools/Makefile index 637e89d..838f201 100644 --- a/tools/Makefile +++ b/tools/Makefile @@ -57,6 +57,7 @@ endif test: $(MAKE) -C filter test $(MAKE) -C statistics test + $(MAKE) -C launcher test # FIXME: # $(MAKE) -C chart test diff --git a/tools/common-code/cmonitor_watcher.py b/tools/common-code/cmonitor_watcher.py index 9791d34..483130f 100644 --- a/tools/common-code/cmonitor_watcher.py +++ b/tools/common-code/cmonitor_watcher.py @@ -160,7 +160,7 @@ def __check_filter(self, process_name): if process_name in e: return True - def inotify_events(self, queue): + def inotify_events(self, queue, exit_flag): """Main thread function for notifying events. Monitored events that match with the white-list provided will be stored in this queue. The events from this queue will be processed by cMonitorLauncher threading function to @@ -186,5 +186,9 @@ def inotify_events(self, queue): if fileList: logging.info(f"CgroupWatcher event in Queue:{fileList}") queue.put(fileList) + # global exit_flag + if exit_flag is True: + exit(0) + finally: i.remove_watch(path) diff --git a/tools/launcher/Makefile b/tools/launcher/Makefile new file mode 100644 index 0000000..ea7a876 --- /dev/null +++ b/tools/launcher/Makefile @@ -0,0 +1,11 @@ +ROOT_DIR:=$(shell readlink -f ../..) +PYTHON_COMMON_CODE=$(ROOT_DIR)/tools/common-code + +run: + export PYTHONPATH=$(PYTHON_COMMON_CODE) ; \ + ./cmonitor_launcher.py $(ARGS) + +test: + cd tests && \ + export PYTHONPATH=$(PYTHON_COMMON_CODE) && \ + pytest --capture=no -vv diff --git a/tools/launcher/cmonitor_launcher.py b/tools/launcher/cmonitor_launcher.py index 292ff15..6eebd28 100644 --- a/tools/launcher/cmonitor_launcher.py +++ b/tools/launcher/cmonitor_launcher.py @@ -81,7 +81,7 @@ def process_events(self, event): time.sleep(self.timeout) logging.info(f"In processing event Queue is empty - sleeping: {self.timeout} sec") except event.Empty(): - pass + pass def __launch_cmonitor(self, filename, ip): """ @@ -199,11 +199,13 @@ def main(): logging.info("Started") logging.info(f"timeout set for sleep: {timeout}") + exit_flag = False + cGroupWatcher = CgroupWatcher(input_path, filter, timeout) cMonitorLauncher = CmonitorLauncher(input_path, filter, ip, command, timeout) with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: - executor.submit(cGroupWatcher.inotify_events, queue) + executor.submit(cGroupWatcher.inotify_events, queue, exit_flag) executor.submit(cMonitorLauncher.process_events, queue) diff --git a/tools/launcher/tests/__pycache__/test_cgroup_watcher.cpython-36-pytest-7.0.1.pyc b/tools/launcher/tests/__pycache__/test_cgroup_watcher.cpython-36-pytest-7.0.1.pyc new file mode 100644 index 0000000000000000000000000000000000000000..961d93776537a491ffda973aa50961430950ff56 GIT binary patch literal 2836 zcmai0Uyt0j5hwM3wc5SCz4Q6bO$P8`&MkU-Fh+*|3S z)taL0TR{sH-MltEYcz*eJALs-VI#UcY`kU zde+Uoz{Bl2y`T>~BkSc`!It8g*&yEzwlShkI$WZ_Nl>&yEMgPqd+h{AyQD+huTgM` zbcqN39<|fU=|1Vv1A2w;E-}0s>65Lm(GmwgA=+a!8q{C7K0F^*d{VI6{^N+xqhl8F z$p2^(L7yn`pD0VlbDvMBe@rLQv!q~tG4}7}#WdjsdlK<@LYe=`wBVrk;2E8A_a%4* zJse?SJ}PE(3dGJMR>ZU{9~DLR;EcuFUvPEB)fLxJT(iM2!+K#Z3?(z93@9yyS;Ssy3$ro7!Op|l zX1V_k8~e4Xtgo&7lqAfr^_ckezT&;VQjtd}->31U@av9$_pYxPwfp+$BSUCKDXel{ za+(Wc7V(KN(ju9TECv~hgg#41V8l7Oq1v0OJra)ejC8|45bJ~9tB(#RMNSWsHfD#f z-3-ww53{J6%9tL?7?i3FV-=_Hc^mEPv$+_=j7FTkcKuc9(g8FGd)U?BGltOjF_X-T z2uMR^6jTJh1%hDd`9@3e5|VSFH+rgpXTg_^A$`dWqBSOXTyKo;&Hq0=a&U(hvkDnmWwU;PzV0LGT*f*eO$=Jb;?h--N z(PQcNU>KhY{4DS`uZqf$ zw%L+e+iT&)9lHhW@(pN^W?)aVG#Bq`4j#g2-*7OzFk(NKle;p7STzk3au#Db1s=Jq zOhc6ce+Mo^A040tmYF~R&K5ey>Nde$EZ+s%Xm2GqAqTWQ$P&13R--jKF_!wmASPsq zojPepLgHWX-`-qU5No$F(k}NJ6Lf4j=DkMa9*mqP5a&Ezm(An`iZLi}fjzEx#b_A} zKc4_5kjsbCp_q{&^4s8B$n(zE+sDo+mp?%TJ!@z8EHkt#C8}*Sr#%nV{wts zB1VM=;_$diGM-EU_p<^tO0gf7lcLIqQVmbC;yB6_n21hTMq?^h5d9C8kCI~gfU$y! zop4;RJmMjg$1DPP0Lf^J`Cvt0uP*7z0|0D*A%VTZdSQ&S=p=CHs%o!Ke`8JDR54P% z54zIgHas^s2RGV5Ys0&ngZF-nH*YD26cwLU{6{|-oBtSp1k20!ph3C=z+=gp17PG~ z0}tTc2kxHcAO~vA$H)V`0a#L_Kd2FtAU1kK*kMSDI1Gie_A`42B;p`!BXhky40!80 zqt6)tujpK`guUXi-^w{wJEe`ZOevv5sTM9I#0o=DPz*Kdj&7EsfSr*k6WQ{yxr=klf12u-Tq^81T(ewl`AIiQu>4uXf@-l_&&hPVr jR2jW1?{6t@ivb?;QLoGbqU!wv|Y literal 0 HcmV?d00001 diff --git a/tools/launcher/tests/__pycache__/test_dymmy.cpython-36-pytest-7.0.1.pyc b/tools/launcher/tests/__pycache__/test_dymmy.cpython-36-pytest-7.0.1.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5c65d13f74a80375500cc412e9b91b3ef865e7e0 GIT binary patch literal 435 zcmYLEy-ve05I#Fela`i=g@wIKBY}j3gis}R76zm&R)`ICiS1y!f~=HB;2jXJS-Tm?t~r!cEoT-a6fDba#aCdgx(IIN1^c&-4!OAdBAYPH;A-A@)*< zkhofi<_m0s-5!1}@|95{m)dyw(B_pgT6$x1Z%pm-y4>h$C9o9EyB-;Dm1-AF8x9@~ zGn)09ouG>mfKXynj3Hx8l~OY{ABXIIBWo|U3)5xOvY|R=P1{K?O6P?2(&&X1FIIXH x#zQNl``!e}-8w6+jqt{BNS8`5_W!NFY}7yFQW?If#Z^ae6NC(=kkKs3$S)$=ZtVa7 literal 0 HcmV?d00001 diff --git a/tools/launcher/tests/__pycache__/test_filter_by_task_name.cpython-36-pytest-7.0.1.pyc b/tools/launcher/tests/__pycache__/test_filter_by_task_name.cpython-36-pytest-7.0.1.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0f835d15a5aaaf85b3c9dd377a148051b4867a1e GIT binary patch literal 3113 zcma)8TW=f36`sA3%jHEPsjKD7Dyd_$MI=SClg5yYq;^syNDa3!T3~^8vEq!RrS`(i z3}s7QqD2{?K!E`6yZ=Dn`_v!Mr#|;#Ui;*~pr@WQOUg`}79lY+XU?2C^Uaxa*}Z1d z+dQ`Vf2rVYEadq8@PI`1vNc-fd`|N zcxgST8@S(28fi0VYT8L!=|Zre=}NMgwu82&t4SyA23<{?$$LI8IX0p-RvoMluF)pjVAti^#DKp^TXf-TGBJW1)clh47B8N9KKwo&$&;Kv@IU2I z)@P!h`#;MwnR5~A8{&V+!bE@%Odn^3lbYeePX;c_&(6rk`8&77K{^4+bSR z88_i%YjA5q#*4)wT?Hgf=@(5v)&^v2Fz*!Yq65gfMfasCQ2!deU33B2JGi2eU7JJp z50Kc9zDD*>M1wRSyM{Ds)?<4*b^z(dD;v-)(L3};v80hMPmzN4-XI0EZD`$bhu);O zz9uE!$7sc-+`Pi<{}<-J!-;G3uEyNE!n}Ef*%ZGAi4D04m`(mC#D_Fs-qcwCALbu^ zZ_EVp%a5LL7u@_=&kWk~Bo?8BSPsw1U{>BCL;5fmtY`2R2wp4?&mj|Kdp<@^NYB&s zd~0}qQGd#F$V+WH!t2mong<7$o!fVZ=Q};&9~}5d?)4z5F~q0g z-!qj30s(Y;id?oLAsClh^nOnu`+iWJrUTwUtvjgZUQc+xof3Yj84kINoZX)2dE?o6 zAHaE50%_FdORyUi?qYWjyAQr~-iPIRA6_LING-&H_9Zm22Hi0vnqMzMq9Newka8x* zpUtmqcqSug^mSG(vm87Iio@N^tsP~&_zU6JVSf43?UOuZ+i{jlC{tQyOz+5Arxu zHWozXJm;}w09112^b>G?j^!X|P3;$ zai$uu93GCq_;DswrL>h=3&lx3N~jhMkCXf`O3HFqgNk*;P=jiHtUVZN?I)b)Ty?{v zoTrh5@;OYRERwJiw80Y7w}r|`BN=Ij;kV!(y%pZX>r@dR)V`pGC0Gju6{f!tl+{nE zGKTyP+R?@0tE;YgG~~DS-sfd?=UtrD@it{(?aoi$D<>tsd-Id`gJ#s1Be-7aO!K?; z<|psZPwvmF_;H|(SSo*SM+ZRJ-9XU2@9nC0L0tR>I%2ujb*tgnM#E^B*n7}gy01Xr zGF%gREej~(LO^^@8l**B%QbOc@&e6UG&=vm;rnnx@20ATA$k*M&v#)(t%s$@ zY?X$DSKQ*P^u*c~OI4>lzJq%zPmZ?|=L9jAQ0<%yrUs#7puyD1A zLBzO@s9^DmJFNpcy5`uk$JbG@0N%w$GETO@vmsOflpX`UAmfyUQ-fmgP{_6_11<-Y+hRQ^5F Nt(t~uLbJT;zX5AFemDRC literal 0 HcmV?d00001 diff --git a/tools/launcher/tests/__pycache__/test_launcher.cpython-36-pytest-7.0.1.pyc b/tools/launcher/tests/__pycache__/test_launcher.cpython-36-pytest-7.0.1.pyc new file mode 100644 index 0000000000000000000000000000000000000000..63b182200d614a6593e9f7f9143809bb94950150 GIT binary patch literal 2830 zcmai0PmkNi6`$dMQIuAzU9WfDv}M~kuG+*~d*d9O*hp$SKzeYUIH&_60m0>rR^*bR zJRI69f&z*v*QSeH6JL7k7w9MGr~i&=3?Lf{B*lKvTV*8mSprO11-A^-kcZz7up*?*^{wyFnLv zJ?rLP;Nf;mU+Za(N9WGJeBq-V;7O{!*y>^15UD6@$*C@C| zy2OKikJ{ZU{9~DLR;EcuY0U=14C{rtFqF-ZHjuOwW)XX-EzHIQfSrf6 z&2s-8Huh^%oxZm6Q<5;h)??z=`%3rz%0+%c`96&&gl)@_K zC8xPCW)YtVBQ28Y$YPM8Na(YK1V)^b8>+pj+9TmeWF!p#K&%gbuRb`O6gfRi+L#@_ zh8dz$9%fNBl`%b(F(_3Vu65UEb1{e+jW~S``c(<$02+in>}v2DL+JaMN#zCh@`w_D zh5ff65G*lov=lEPEf;#Dry5`iC^m*fksCy7Ou$-ijPKA3g>!>bbBPw#+1rhkT1#{v zedT|tE$oHUIExOvVT zEkqw3paqthK!C^=I>+iB!7VJ`1=(otBR3%jv^~fYxM^0SH99et`obV4WQm|`!`6J zq%_sQF6HhLliv664T~PRL?MGh7$~xym^BC|966?=MpJkv4e+_sQx=GGfAcO z<@*T(8ZOw}zjoCxulX%MkeBkpfBsTt^#}*uJUR=X7wnX>vNrD*Q~C=g+2ECS=gzC) z)nCu%zZ;eQojbl1k47c?shp<;Iv8YgB+#pVRF;%+<#KCOO7$&fgCF~u+Jyb8?cS8C zxBiD}U)~fbE?EKz>PSz;ofbUjtiDY zJcPoSML-W=8Er8iP7&CvOS(b;hz&R-u%EXzT^QpmItd)Qs@LnUzp)l>su(HkgRVSr z8=jk+gB$IjJ;S@3gZF;SH*YCGii*!F{-eJaoBuBU2q!P!g9hmi5RWBm4v>+D4LpEn zAGCX#gB++eA0rR=253o*{-8!olGx}CVTU0p;xH7>8fW$nSj0itM&^3^F@V-}MxQev zUeUQw345huzm;>Wc1jy*nNmWBA}w4D@ z7nrI}w7GLlI?JNklB|(O>{N6iWuV4zmef?3ESjDG=R?6)U%COZWL~D2-T5FdNR`pM f^8S|cwitkzCy`n`9`v5>=?*qb4ZE&0aIX9Z?h*zJ literal 0 HcmV?d00001 diff --git a/tools/launcher/tests/dummy.py b/tools/launcher/tests/dummy.py new file mode 100644 index 0000000..2f2ec60 --- /dev/null +++ b/tools/launcher/tests/dummy.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 + +""" + Author: Satyabrata Bharati + Verify the behavior of CmonitorCgroupWatcher InotifyEvent : dummy process to monitor +""" + +import time + + +def call_sleep(): + while True: + time.sleep(30) + + +if __name__ == "__main__": + call_sleep() diff --git a/tools/launcher/tests/test_cgroup_watcher.py b/tools/launcher/tests/test_cgroup_watcher.py new file mode 100644 index 0000000..77cc915 --- /dev/null +++ b/tools/launcher/tests/test_cgroup_watcher.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +""" + Author: Satyabrata Bharati + Verify the behavior of CmonitorWatcher InotifyEvent +""" + +import pytest +import queue +import os +import sys +import subprocess +from subprocess import Popen +import concurrent.futures +from concurrent.futures import ProcessPoolExecutor + +# import cmonitor_watcher +from cmonitor_watcher import CgroupWatcher + +# import dateutil.parser as datetime_parser + +queue = queue.Queue() +myDict = {} + +test_list = [ + # run0 + { + "expected_task_file": "/tmp/unit_test_cmonitor/task/tasks", + "expected_process_name": "python3", + }, + # run1 + # { + # "expected_task_file": "/tmp/unit_test_cmonitor/task/tasks", + # "expected_process_name": "python3", + # }, +] + + +def create_task_file(path, pid): + cmd1 = f"rm -rf {path}/task" + os.system(cmd1) + + cmd2 = f"mkdir {path}/task" + os.system(cmd2) + + # create the task file with the process id of the dummy process + # /tmp/unit_test_cmonitor/task/tasks + cmd3 = f"cd {path}/task;rm -rf tasks;echo {pid} >> tasks" + os.system(cmd3) + + filename = os.path.join(path, "task/tasks") + return filename + + +def process_task_file(path, queue): + # create the dummy process + cmd = "python3 dummy.py" + p = Popen(cmd.split()) + # process id of the dummy process + pid = p.pid + + filename = create_task_file(path, pid) + print(f"Created: task file {filename} with process Id: {pid}") + + d = queue.get() + for k, v in d.items(): + # print(k, v) + process_name = v + print(f"Read from Queue filename :{k}, process_name {process_name}") + + # store the task file and the process name in the dictionary + global myDict + myDict = d.copy() + if queue.empty(): + print("Queue is Empty") + # terminate the dummy process + p.terminate() + return + + +@pytest.mark.parametrize("testrun_idx", range(len(test_list))) +def test_outputCmonitorWatcherInotifyEvent(testrun_idx): + global test_list + testrun = test_list[testrun_idx] + + path = "/tmp/unit_test_cmonitor/" + filter = ["python3"] + if not os.path.exists(path): + os.mkdir(path) + print("Directory '% s' created" % path) + + watcher = CgroupWatcher(path, filter, 10) + flag = True + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future1 = executor.submit(watcher.inotify_events, queue, flag) + future2 = executor.submit(process_task_file, path, queue) + + # both threads completely executed + print("Done!") + + for k, v in myDict.items(): + # print(k, v) + assert k == testrun["expected_task_file"] + assert v == testrun["expected_process_name"] From 95461555194ffb4e74f7bf96cdeb8cbf04ee7697 Mon Sep 17 00:00:00 2001 From: sbharati Date: Tue, 23 Aug 2022 08:35:28 -0400 Subject: [PATCH 11/25] minor change. --- ...st_cgroup_watcher.cpython-36-pytest-7.0.1.pyc | Bin 2836 -> 0 bytes .../test_dymmy.cpython-36-pytest-7.0.1.pyc | Bin 435 -> 0 bytes ...lter_by_task_name.cpython-36-pytest-7.0.1.pyc | Bin 3113 -> 0 bytes .../test_launcher.cpython-36-pytest-7.0.1.pyc | Bin 2830 -> 0 bytes 4 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 tools/launcher/tests/__pycache__/test_cgroup_watcher.cpython-36-pytest-7.0.1.pyc delete mode 100644 tools/launcher/tests/__pycache__/test_dymmy.cpython-36-pytest-7.0.1.pyc delete mode 100644 tools/launcher/tests/__pycache__/test_filter_by_task_name.cpython-36-pytest-7.0.1.pyc delete mode 100644 tools/launcher/tests/__pycache__/test_launcher.cpython-36-pytest-7.0.1.pyc diff --git a/tools/launcher/tests/__pycache__/test_cgroup_watcher.cpython-36-pytest-7.0.1.pyc b/tools/launcher/tests/__pycache__/test_cgroup_watcher.cpython-36-pytest-7.0.1.pyc deleted file mode 100644 index 961d93776537a491ffda973aa50961430950ff56..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2836 zcmai0Uyt0j5hwM3wc5SCz4Q6bO$P8`&MkU-Fh+*|3S z)taL0TR{sH-MltEYcz*eJALs-VI#UcY`kU zde+Uoz{Bl2y`T>~BkSc`!It8g*&yEzwlShkI$WZ_Nl>&yEMgPqd+h{AyQD+huTgM` zbcqN39<|fU=|1Vv1A2w;E-}0s>65Lm(GmwgA=+a!8q{C7K0F^*d{VI6{^N+xqhl8F z$p2^(L7yn`pD0VlbDvMBe@rLQv!q~tG4}7}#WdjsdlK<@LYe=`wBVrk;2E8A_a%4* zJse?SJ}PE(3dGJMR>ZU{9~DLR;EcuFUvPEB)fLxJT(iM2!+K#Z3?(z93@9yyS;Ssy3$ro7!Op|l zX1V_k8~e4Xtgo&7lqAfr^_ckezT&;VQjtd}->31U@av9$_pYxPwfp+$BSUCKDXel{ za+(Wc7V(KN(ju9TECv~hgg#41V8l7Oq1v0OJra)ejC8|45bJ~9tB(#RMNSWsHfD#f z-3-ww53{J6%9tL?7?i3FV-=_Hc^mEPv$+_=j7FTkcKuc9(g8FGd)U?BGltOjF_X-T z2uMR^6jTJh1%hDd`9@3e5|VSFH+rgpXTg_^A$`dWqBSOXTyKo;&Hq0=a&U(hvkDnmWwU;PzV0LGT*f*eO$=Jb;?h--N z(PQcNU>KhY{4DS`uZqf$ zw%L+e+iT&)9lHhW@(pN^W?)aVG#Bq`4j#g2-*7OzFk(NKle;p7STzk3au#Db1s=Jq zOhc6ce+Mo^A040tmYF~R&K5ey>Nde$EZ+s%Xm2GqAqTWQ$P&13R--jKF_!wmASPsq zojPepLgHWX-`-qU5No$F(k}NJ6Lf4j=DkMa9*mqP5a&Ezm(An`iZLi}fjzEx#b_A} zKc4_5kjsbCp_q{&^4s8B$n(zE+sDo+mp?%TJ!@z8EHkt#C8}*Sr#%nV{wts zB1VM=;_$diGM-EU_p<^tO0gf7lcLIqQVmbC;yB6_n21hTMq?^h5d9C8kCI~gfU$y! zop4;RJmMjg$1DPP0Lf^J`Cvt0uP*7z0|0D*A%VTZdSQ&S=p=CHs%o!Ke`8JDR54P% z54zIgHas^s2RGV5Ys0&ngZF-nH*YD26cwLU{6{|-oBtSp1k20!ph3C=z+=gp17PG~ z0}tTc2kxHcAO~vA$H)V`0a#L_Kd2FtAU1kK*kMSDI1Gie_A`42B;p`!BXhky40!80 zqt6)tujpK`guUXi-^w{wJEe`ZOevv5sTM9I#0o=DPz*Kdj&7EsfSr*k6WQ{yxr=klf12u-Tq^81T(ewl`AIiQu>4uXf@-l_&&hPVr jR2jW1?{6t@ivb?;QLoGbqU!wv|Y diff --git a/tools/launcher/tests/__pycache__/test_dymmy.cpython-36-pytest-7.0.1.pyc b/tools/launcher/tests/__pycache__/test_dymmy.cpython-36-pytest-7.0.1.pyc deleted file mode 100644 index 5c65d13f74a80375500cc412e9b91b3ef865e7e0..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 435 zcmYLEy-ve05I#Fela`i=g@wIKBY}j3gis}R76zm&R)`ICiS1y!f~=HB;2jXJS-Tm?t~r!cEoT-a6fDba#aCdgx(IIN1^c&-4!OAdBAYPH;A-A@)*< zkhofi<_m0s-5!1}@|95{m)dyw(B_pgT6$x1Z%pm-y4>h$C9o9EyB-;Dm1-AF8x9@~ zGn)09ouG>mfKXynj3Hx8l~OY{ABXIIBWo|U3)5xOvY|R=P1{K?O6P?2(&&X1FIIXH x#zQNl``!e}-8w6+jqt{BNS8`5_W!NFY}7yFQW?If#Z^ae6NC(=kkKs3$S)$=ZtVa7 diff --git a/tools/launcher/tests/__pycache__/test_filter_by_task_name.cpython-36-pytest-7.0.1.pyc b/tools/launcher/tests/__pycache__/test_filter_by_task_name.cpython-36-pytest-7.0.1.pyc deleted file mode 100644 index 0f835d15a5aaaf85b3c9dd377a148051b4867a1e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3113 zcma)8TW=f36`sA3%jHEPsjKD7Dyd_$MI=SClg5yYq;^syNDa3!T3~^8vEq!RrS`(i z3}s7QqD2{?K!E`6yZ=Dn`_v!Mr#|;#Ui;*~pr@WQOUg`}79lY+XU?2C^Uaxa*}Z1d z+dQ`Vf2rVYEadq8@PI`1vNc-fd`|N zcxgST8@S(28fi0VYT8L!=|Zre=}NMgwu82&t4SyA23<{?$$LI8IX0p-RvoMluF)pjVAti^#DKp^TXf-TGBJW1)clh47B8N9KKwo&$&;Kv@IU2I z)@P!h`#;MwnR5~A8{&V+!bE@%Odn^3lbYeePX;c_&(6rk`8&77K{^4+bSR z88_i%YjA5q#*4)wT?Hgf=@(5v)&^v2Fz*!Yq65gfMfasCQ2!deU33B2JGi2eU7JJp z50Kc9zDD*>M1wRSyM{Ds)?<4*b^z(dD;v-)(L3};v80hMPmzN4-XI0EZD`$bhu);O zz9uE!$7sc-+`Pi<{}<-J!-;G3uEyNE!n}Ef*%ZGAi4D04m`(mC#D_Fs-qcwCALbu^ zZ_EVp%a5LL7u@_=&kWk~Bo?8BSPsw1U{>BCL;5fmtY`2R2wp4?&mj|Kdp<@^NYB&s zd~0}qQGd#F$V+WH!t2mong<7$o!fVZ=Q};&9~}5d?)4z5F~q0g z-!qj30s(Y;id?oLAsClh^nOnu`+iWJrUTwUtvjgZUQc+xof3Yj84kINoZX)2dE?o6 zAHaE50%_FdORyUi?qYWjyAQr~-iPIRA6_LING-&H_9Zm22Hi0vnqMzMq9Newka8x* zpUtmqcqSug^mSG(vm87Iio@N^tsP~&_zU6JVSf43?UOuZ+i{jlC{tQyOz+5Arxu zHWozXJm;}w09112^b>G?j^!X|P3;$ zai$uu93GCq_;DswrL>h=3&lx3N~jhMkCXf`O3HFqgNk*;P=jiHtUVZN?I)b)Ty?{v zoTrh5@;OYRERwJiw80Y7w}r|`BN=Ij;kV!(y%pZX>r@dR)V`pGC0Gju6{f!tl+{nE zGKTyP+R?@0tE;YgG~~DS-sfd?=UtrD@it{(?aoi$D<>tsd-Id`gJ#s1Be-7aO!K?; z<|psZPwvmF_;H|(SSo*SM+ZRJ-9XU2@9nC0L0tR>I%2ujb*tgnM#E^B*n7}gy01Xr zGF%gREej~(LO^^@8l**B%QbOc@&e6UG&=vm;rnnx@20ATA$k*M&v#)(t%s$@ zY?X$DSKQ*P^u*c~OI4>lzJq%zPmZ?|=L9jAQ0<%yrUs#7puyD1A zLBzO@s9^DmJFNpcy5`uk$JbG@0N%w$GETO@vmsOflpX`UAmfyUQ-fmgP{_6_11<-Y+hRQ^5F Nt(t~uLbJT;zX5AFemDRC diff --git a/tools/launcher/tests/__pycache__/test_launcher.cpython-36-pytest-7.0.1.pyc b/tools/launcher/tests/__pycache__/test_launcher.cpython-36-pytest-7.0.1.pyc deleted file mode 100644 index 63b182200d614a6593e9f7f9143809bb94950150..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2830 zcmai0PmkNi6`$dMQIuAzU9WfDv}M~kuG+*~d*d9O*hp$SKzeYUIH&_60m0>rR^*bR zJRI69f&z*v*QSeH6JL7k7w9MGr~i&=3?Lf{B*lKvTV*8mSprO11-A^-kcZz7up*?*^{wyFnLv zJ?rLP;Nf;mU+Za(N9WGJeBq-V;7O{!*y>^15UD6@$*C@C| zy2OKikJ{ZU{9~DLR;EcuY0U=14C{rtFqF-ZHjuOwW)XX-EzHIQfSrf6 z&2s-8Huh^%oxZm6Q<5;h)??z=`%3rz%0+%c`96&&gl)@_K zC8xPCW)YtVBQ28Y$YPM8Na(YK1V)^b8>+pj+9TmeWF!p#K&%gbuRb`O6gfRi+L#@_ zh8dz$9%fNBl`%b(F(_3Vu65UEb1{e+jW~S``c(<$02+in>}v2DL+JaMN#zCh@`w_D zh5ff65G*lov=lEPEf;#Dry5`iC^m*fksCy7Ou$-ijPKA3g>!>bbBPw#+1rhkT1#{v zedT|tE$oHUIExOvVT zEkqw3paqthK!C^=I>+iB!7VJ`1=(otBR3%jv^~fYxM^0SH99et`obV4WQm|`!`6J zq%_sQF6HhLliv664T~PRL?MGh7$~xym^BC|966?=MpJkv4e+_sQx=GGfAcO z<@*T(8ZOw}zjoCxulX%MkeBkpfBsTt^#}*uJUR=X7wnX>vNrD*Q~C=g+2ECS=gzC) z)nCu%zZ;eQojbl1k47c?shp<;Iv8YgB+#pVRF;%+<#KCOO7$&fgCF~u+Jyb8?cS8C zxBiD}U)~fbE?EKz>PSz;ofbUjtiDY zJcPoSML-W=8Er8iP7&CvOS(b;hz&R-u%EXzT^QpmItd)Qs@LnUzp)l>su(HkgRVSr z8=jk+gB$IjJ;S@3gZF;SH*YCGii*!F{-eJaoBuBU2q!P!g9hmi5RWBm4v>+D4LpEn zAGCX#gB++eA0rR=253o*{-8!olGx}CVTU0p;xH7>8fW$nSj0itM&^3^F@V-}MxQev zUeUQw345huzm;>Wc1jy*nNmWBA}w4D@ z7nrI}w7GLlI?JNklB|(O>{N6iWuV4zmef?3ESjDG=R?6)U%COZWL~D2-T5FdNR`pM f^8S|cwitkzCy`n`9`v5>=?*qb4ZE&0aIX9Z?h*zJ From c7c2854a150a8ad75f797b3bb7d8b5f6c827c052 Mon Sep 17 00:00:00 2001 From: sbharati Date: Tue, 23 Aug 2022 08:56:03 -0400 Subject: [PATCH 12/25] added install module for inotify --- .github/workflows/main.yml | 2 +- Makefile | 2 +- tools/spec/tools.spec | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f1d4a2c..f1e3266 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -13,7 +13,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: install deps - run: sudo apt install -y libgtest-dev libbenchmark-dev libfmt-dev tidy git python3 python3-dateutil python3-pip + run: sudo apt install -y libgtest-dev libbenchmark-dev libfmt-dev tidy git python3 python3-dateutil python3-pip inotify - name: install PyDeps run: sudo pip3 install pytest - name: make all diff --git a/Makefile b/Makefile index e03281a..5391164 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ all: centos_install_prereq: # this is just the list present in "BuildRequires" field of the RPM spec file: - yum install gcc-c++ make gtest-devel fmt-devel git + yum install gcc-c++ make gtest-devel fmt-devel git inotify test: $(MAKE) -C collector test diff --git a/tools/spec/tools.spec b/tools/spec/tools.spec index f9393fc..a0e2fbf 100644 --- a/tools/spec/tools.spec +++ b/tools/spec/tools.spec @@ -12,7 +12,7 @@ Source0: cmonitor-tools-__RPM_VERSION__.tar.gz # these are the requirements that we need on COPR builds: # IMPORTANT: python3-devel provide macros like %{python3_sitelib} -BuildRequires: gcc-c++, make, python3-devel +BuildRequires: gcc-c++, make, python3-devel, inotify # cmonitor_filter uses dateutil library to parse dates.. of course to make our life easier the same python library # RPM has different names on different distro versions... From c5a58a6b5f867bd4f97340b95c338d467439e1f3 Mon Sep 17 00:00:00 2001 From: sbharati Date: Tue, 23 Aug 2022 09:02:42 -0400 Subject: [PATCH 13/25] minor change. --- .github/workflows/main.yml | 2 +- Makefile | 2 +- tools/spec/tools.spec | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index f1e3266..7c50f46 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -13,7 +13,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: install deps - run: sudo apt install -y libgtest-dev libbenchmark-dev libfmt-dev tidy git python3 python3-dateutil python3-pip inotify + run: sudo apt install -y libgtest-dev libbenchmark-dev libfmt-dev tidy git python3 python3-dateutil python3-pip inotify-tools - name: install PyDeps run: sudo pip3 install pytest - name: make all diff --git a/Makefile b/Makefile index 5391164..0ad9259 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ all: centos_install_prereq: # this is just the list present in "BuildRequires" field of the RPM spec file: - yum install gcc-c++ make gtest-devel fmt-devel git inotify + yum install gcc-c++ make gtest-devel fmt-devel git inotify-tools test: $(MAKE) -C collector test diff --git a/tools/spec/tools.spec b/tools/spec/tools.spec index a0e2fbf..c575a66 100644 --- a/tools/spec/tools.spec +++ b/tools/spec/tools.spec @@ -12,7 +12,7 @@ Source0: cmonitor-tools-__RPM_VERSION__.tar.gz # these are the requirements that we need on COPR builds: # IMPORTANT: python3-devel provide macros like %{python3_sitelib} -BuildRequires: gcc-c++, make, python3-devel, inotify +BuildRequires: gcc-c++, make, python3-devel, inotify-tools # cmonitor_filter uses dateutil library to parse dates.. of course to make our life easier the same python library # RPM has different names on different distro versions... From a5b356125b598f2fe81c6b486fccca1f6639831c Mon Sep 17 00:00:00 2001 From: sbharati Date: Tue, 23 Aug 2022 09:27:50 -0400 Subject: [PATCH 14/25] added inotify module --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 0039aab..ab8f0ec 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -15,7 +15,7 @@ jobs: - name: install deps run: sudo apt install -y libgtest-dev libbenchmark-dev libfmt-dev tidy git python3 python3-dateutil python3-pip inotify-tools - name: install PyDeps - run: sudo pip3 install pytest + run: sudo pip3 install pytest inotify - name: install Prometheus run: | sudo pip3 install conan From ae66b1550562da0a0797fb6be1b2c6bbf54aa488 Mon Sep 17 00:00:00 2001 From: sbharati Date: Tue, 23 Aug 2022 09:41:04 -0400 Subject: [PATCH 15/25] minor change. --- tools/launcher/cmonitor_launcher.py | 2 ++ tools/launcher/tests/dummy.py | 3 --- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tools/launcher/cmonitor_launcher.py b/tools/launcher/cmonitor_launcher.py index 6eebd28..62d0230 100644 --- a/tools/launcher/cmonitor_launcher.py +++ b/tools/launcher/cmonitor_launcher.py @@ -199,6 +199,8 @@ def main(): logging.info("Started") logging.info(f"timeout set for sleep: {timeout}") + # flag has to be set in case inotify_events() needed to be unblocked + # default False : keep blocking exit_flag = False cGroupWatcher = CgroupWatcher(input_path, filter, timeout) diff --git a/tools/launcher/tests/dummy.py b/tools/launcher/tests/dummy.py index 2f2ec60..26db655 100644 --- a/tools/launcher/tests/dummy.py +++ b/tools/launcher/tests/dummy.py @@ -4,14 +4,11 @@ Author: Satyabrata Bharati Verify the behavior of CmonitorCgroupWatcher InotifyEvent : dummy process to monitor """ - import time - def call_sleep(): while True: time.sleep(30) - if __name__ == "__main__": call_sleep() From ea020f08a0564708add8d5536ca69f4a0f16b345 Mon Sep 17 00:00:00 2001 From: sbharati Date: Thu, 25 Aug 2022 05:43:26 -0400 Subject: [PATCH 16/25] added Readme. --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index 4b88e45..a35cafa 100644 --- a/README.md +++ b/README.md @@ -403,6 +403,27 @@ cmonitor_collector \ --output-filename=pod-performances.json ``` +### CMonitor helper tool: +cmonitor_launcher tool can be used to automate the monitoring the Kubernetes PODs. + +It will perform following steps: + + - Watch all files below a directory and notify an event for changes Pod restart or creation of a new Pod. + - check the process name against the white-list given in the filter list. + - Execute command to launch CMonitor if the process name matches with the filter. + +``` +Example: + cmonitor_launcher.py --path /sys/fs/cgroup/memory/kubepods/burstable/ + --filter process_1 process_2 + --ip-port 172.0.0.1:9090 172.0.0.2:9099 + --command "./cmonitor_collector --num-samples=until-cgroup-alive + --deep-collect --collect=cgroup_threads,cgroup_cpu,cgroup_memory,cgroup_network + --score-threshold=0 --sampling-interval=3 --output-directory=/home + --allow-multiple-instances --remote prometheus" + --log /home + --timeout 20 +``` ### Connecting with InfluxDB and Grafana From 6239457910c8f2960ccb5911e11a59e5ea304311 Mon Sep 17 00:00:00 2001 From: sbharati Date: Fri, 26 Aug 2022 01:19:54 -0400 Subject: [PATCH 17/25] update unit tc for cgroup_wathcher. --- tools/launcher/tests/dummy.py | 14 -------------- tools/launcher/tests/test_cgroup_watcher.py | 15 ++++++--------- 2 files changed, 6 insertions(+), 23 deletions(-) delete mode 100644 tools/launcher/tests/dummy.py diff --git a/tools/launcher/tests/dummy.py b/tools/launcher/tests/dummy.py deleted file mode 100644 index 26db655..0000000 --- a/tools/launcher/tests/dummy.py +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env python3 - -""" - Author: Satyabrata Bharati - Verify the behavior of CmonitorCgroupWatcher InotifyEvent : dummy process to monitor -""" -import time - -def call_sleep(): - while True: - time.sleep(30) - -if __name__ == "__main__": - call_sleep() diff --git a/tools/launcher/tests/test_cgroup_watcher.py b/tools/launcher/tests/test_cgroup_watcher.py index 77cc915..cf48336 100644 --- a/tools/launcher/tests/test_cgroup_watcher.py +++ b/tools/launcher/tests/test_cgroup_watcher.py @@ -7,6 +7,7 @@ import pytest import queue import os +import time import sys import subprocess from subprocess import Popen @@ -24,14 +25,9 @@ test_list = [ # run0 { - "expected_task_file": "/tmp/unit_test_cmonitor/task/tasks", + "expected_task_file": "/tmp/unit_test_cmonitor/cgroup_memory_kubepods/task/tasks", "expected_process_name": "python3", }, - # run1 - # { - # "expected_task_file": "/tmp/unit_test_cmonitor/task/tasks", - # "expected_process_name": "python3", - # }, ] @@ -53,7 +49,7 @@ def create_task_file(path, pid): def process_task_file(path, queue): # create the dummy process - cmd = "python3 dummy.py" + cmd = "python3 -c 'time.sleep(5)'" p = Popen(cmd.split()) # process id of the dummy process pid = p.pid @@ -82,10 +78,11 @@ def test_outputCmonitorWatcherInotifyEvent(testrun_idx): global test_list testrun = test_list[testrun_idx] - path = "/tmp/unit_test_cmonitor/" + path = "/tmp/unit_test_cmonitor/cgroup_memory_kubepods/" filter = ["python3"] if not os.path.exists(path): - os.mkdir(path) + #os.mkdir(path) + os.makedirs(path) print("Directory '% s' created" % path) watcher = CgroupWatcher(path, filter, 10) From c2de21e748a6768a23540a70e15c227f2483d9b7 Mon Sep 17 00:00:00 2001 From: sbharati Date: Fri, 26 Aug 2022 01:35:16 -0400 Subject: [PATCH 18/25] minor change. --- tools/launcher/tests/test_cgroup_watcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/launcher/tests/test_cgroup_watcher.py b/tools/launcher/tests/test_cgroup_watcher.py index cf48336..13f707b 100644 --- a/tools/launcher/tests/test_cgroup_watcher.py +++ b/tools/launcher/tests/test_cgroup_watcher.py @@ -81,7 +81,6 @@ def test_outputCmonitorWatcherInotifyEvent(testrun_idx): path = "/tmp/unit_test_cmonitor/cgroup_memory_kubepods/" filter = ["python3"] if not os.path.exists(path): - #os.mkdir(path) os.makedirs(path) print("Directory '% s' created" % path) From 74d8369dfd2877db9df469ecdd30b27adf7a2e88 Mon Sep 17 00:00:00 2001 From: sbharati Date: Fri, 26 Aug 2022 02:17:41 -0400 Subject: [PATCH 19/25] minor change. --- README.md | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 32323a8..f058c24 100644 --- a/README.md +++ b/README.md @@ -428,28 +428,6 @@ cmonitor_collector \ --output-filename=pod-performances.json ``` -### CMonitor helper tool: -cmonitor_launcher tool can be used to automate the monitoring the Kubernetes PODs. - -It will perform following steps: - - - Watch all files below a directory and notify an event for changes Pod restart or creation of a new Pod. - - check the process name against the white-list given in the filter list. - - Execute command to launch CMonitor if the process name matches with the filter. - -``` -Example: - cmonitor_launcher.py --path /sys/fs/cgroup/memory/kubepods/burstable/ - --filter process_1 process_2 - --ip-port 172.0.0.1:9090 172.0.0.2:9099 - --command "./cmonitor_collector --num-samples=until-cgroup-alive - --deep-collect --collect=cgroup_threads,cgroup_cpu,cgroup_memory,cgroup_network - --score-threshold=0 --sampling-interval=3 --output-directory=/home - --allow-multiple-instances --remote prometheus" - --log /home - --timeout 20 -``` - ### Connecting with InfluxDB and Grafana The `cmonitor_collector` can be connected to an [InfluxDB](https://www.influxdata.com/) deployment to store collected data (this can happen @@ -490,6 +468,27 @@ cmonitor_collector \ The Prometheus instance can then be used as data source for graphing tools like [Grafana](https://grafana.com/) which allow you to create nice interactive dashboards (see examples in InfluxDB section). +### CMonitor helper tool: +cmonitor_launcher tool can be used to automate the monitoring the Kubernetes PODs. + +It will perform following steps: + + - Watch all files below a directory and notify an event for changes Pod restart or creation of a new Pod. + - check the process name against the white-list given in the filter list. + - Execute command to launch CMonitor if the process name matches with the filter. + +``` +Example: + cmonitor_launcher --path /sys/fs/cgroup/memory/kubepods/burstable/ + --filter process_1 process_2 + --ip-port 172.0.0.1:9090 172.0.0.2:9099 + --command "./cmonitor_collector --num-samples=until-cgroup-alive + --deep-collect --collect=cgroup_threads,cgroup_cpu,cgroup_memory,cgroup_network + --score-threshold=0 --sampling-interval=3 --output-directory=/home + --allow-multiple-instances --remote prometheus" + --log /home + --timeout 20 +``` ### Reference Manual From 952a11a5681c2b18716af1751a567f86d0094c81 Mon Sep 17 00:00:00 2001 From: sbharati Date: Fri, 26 Aug 2022 02:26:36 -0400 Subject: [PATCH 20/25] minor change. --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f058c24..712022e 100644 --- a/README.md +++ b/README.md @@ -473,7 +473,7 @@ cmonitor_launcher tool can be used to automate the monitoring the Kubernetes POD It will perform following steps: - - Watch all files below a directory and notify an event for changes Pod restart or creation of a new Pod. + - Watch all files below a directory and notify an event for changes of a Pod restart or creation of a new Pod. - check the process name against the white-list given in the filter list. - Execute command to launch CMonitor if the process name matches with the filter. @@ -489,6 +489,7 @@ Example: --log /home --timeout 20 ``` +In the above example,cmonitor_collector will be lauched automatically for process_1 and process_2 with prometheus instance at 172.0.0.1:9090 and 172.0.0.2:9099 respectively. ### Reference Manual From 26320824702f6ccbbf6e69c15a178af5a4b44179 Mon Sep 17 00:00:00 2001 From: sbharati Date: Fri, 26 Aug 2022 02:28:43 -0400 Subject: [PATCH 21/25] minor change. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 712022e..8e0ac0d 100644 --- a/README.md +++ b/README.md @@ -489,7 +489,7 @@ Example: --log /home --timeout 20 ``` -In the above example,cmonitor_collector will be lauched automatically for process_1 and process_2 with prometheus instance at 172.0.0.1:9090 and 172.0.0.2:9099 respectively. +In the above example, cmonitor_collector will be launched automatically for process_1 and process_2 with Prometheus instance at 172.0.0.1:9090 and 172.0.0.2:9099 respectively. ### Reference Manual From b274e70e4d0112d92e1d0e8f8d14086fc164cf60 Mon Sep 17 00:00:00 2001 From: sbharati Date: Fri, 26 Aug 2022 08:28:17 -0400 Subject: [PATCH 22/25] updated README. --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 8e0ac0d..9f1b14c 100644 --- a/README.md +++ b/README.md @@ -474,14 +474,14 @@ cmonitor_launcher tool can be used to automate the monitoring the Kubernetes POD It will perform following steps: - Watch all files below a directory and notify an event for changes of a Pod restart or creation of a new Pod. - - check the process name against the white-list given in the filter list. + - Check the process name against the white-list given in the filter list. - Execute command to launch CMonitor if the process name matches with the filter. ``` Example: cmonitor_launcher --path /sys/fs/cgroup/memory/kubepods/burstable/ --filter process_1 process_2 - --ip-port 172.0.0.1:9090 172.0.0.2:9099 + --ip-port 172.0.0.1:9090 172.0.0.2:9090 --command "./cmonitor_collector --num-samples=until-cgroup-alive --deep-collect --collect=cgroup_threads,cgroup_cpu,cgroup_memory,cgroup_network --score-threshold=0 --sampling-interval=3 --output-directory=/home @@ -489,7 +489,7 @@ Example: --log /home --timeout 20 ``` -In the above example, cmonitor_collector will be launched automatically for process_1 and process_2 with Prometheus instance at 172.0.0.1:9090 and 172.0.0.2:9099 respectively. +In the above example, cmonitor_collector will be launched automatically for process_1 and process_2 with Prometheus instance at 172.0.0.1:9090 and 172.0.0.2:9090 respectively. ### Reference Manual From fb0c93e8ca10d38cfc810ae5543ac3116305e532 Mon Sep 17 00:00:00 2001 From: sbharati Date: Fri, 26 Aug 2022 13:08:38 -0400 Subject: [PATCH 23/25] minor change. --- tools/launcher/cmonitor_launcher.py | 47 +++++++++++++++++++---------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/tools/launcher/cmonitor_launcher.py b/tools/launcher/cmonitor_launcher.py index 62d0230..c6530e3 100644 --- a/tools/launcher/cmonitor_launcher.py +++ b/tools/launcher/cmonitor_launcher.py @@ -19,6 +19,8 @@ import logging from datetime import datetime +import signal + from cmonitor_watcher import CgroupWatcher from argparse import RawTextHelpFormatter @@ -43,6 +45,7 @@ def __init__(self, path, filter, ip, command, timeout): self.command = command self.timeout = timeout self.process_host_dict = {} + self.monitored_processes = {} """ Should add the list of IPs as key to the dictionary @@ -74,32 +77,48 @@ def process_events(self, event): process_name = value logging.info(f"In processing event from the Queue - event: {entry},file: {filename},process_name: {process_name}") logging.info(f"Launching cMonitor with: {filename} with IP :{self.process_host_dict.get(process_name)}") - self.__launch_cmonitor(filename, self.process_host_dict.get(process_name)) + # self.__launch_cmonitor(filename, self.process_host_dict.get(process_name)) + self.__launch_cmonitor(filename, process_name) entry = entry + 1 else: - # time.sleep(10) time.sleep(self.timeout) logging.info(f"In processing event Queue is empty - sleeping: {self.timeout} sec") except event.Empty(): pass - def __launch_cmonitor(self, filename, ip): + def __launch_cmonitor(self, filename, process_name): """ - Lauch cMonitor with appropriate command. """ for c in self.command: cmd = c.strip() + ip = self.process_host_dict.get(process_name) ip_port = ip.split(":") ip = ip_port[0] port = ip_port[1] base_path = os.path.dirname(filename) path = "/".join(base_path.split("/")[5:]) - cmd = f"{cmd} --cgroup-name={path} --remote-ip {ip} --remote-port {port}" - print("Launch cMonitor with command:", cmd) - logging.info(f"Launch cMonitor with command: {cmd}") - # os.system(cmd) - subprocess.run(cmd, shell=True) + monitor_cmd = f"{cmd} --cgroup-name={path} --remote-ip {ip} --remote-port {port}" + monitor_cmd = [ + f"{cmd}", + f"--cgroup-name={path}", + f"--remote-ip {ip}", + f"--remote-port {port}", + ] + print("Launch cMonitor with command:", monitor_cmd) + logging.info(f"Launch cMonitor with command: {monitor_cmd }") + # monitor_process = subprocess.Popen(monitor_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + monitor_process = subprocess.Popen(monitor_cmd, shell=True) + self.monitored_processes[process_name] = monitor_process + + def handler(self, signum, frame): + for process_name, monitor_process in self.monitored_processes.items(): + logging.info(f"Stopping cmonitor_collector from pid '{monitor_process.pid}' of container '{process_name}'") + # monitor_process is a subprocess.Popen object: + monitor_process.terminate() + monitor_process.wait() + exit(1) def parse_command_line(): @@ -155,11 +174,6 @@ def parse_command_line(): parser.print_help() sys.exit(os.EX_USAGE) - if args.ip_port is None: - print("Please provide the input ip:port to launch cMonitor") - parser.print_help() - sys.exit(os.EX_USAGE) - return { "input_path": args.path, "input_filter": args.filter, @@ -196,7 +210,7 @@ def main(): filename=log_file_name, filemode="w", ) - logging.info("Started") + logging.info("Started cMonitor Launcher") logging.info(f"timeout set for sleep: {timeout}") # flag has to be set in case inotify_events() needed to be unblocked @@ -206,9 +220,10 @@ def main(): cGroupWatcher = CgroupWatcher(input_path, filter, timeout) cMonitorLauncher = CmonitorLauncher(input_path, filter, ip, command, timeout) + signal.signal(signal.SIGINT, cMonitorLauncher.handler) with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: - executor.submit(cGroupWatcher.inotify_events, queue, exit_flag) - executor.submit(cMonitorLauncher.process_events, queue) + executer1 = executor.submit(cGroupWatcher.inotify_events, queue, exit_flag) + executer2 = executor.submit(cMonitorLauncher.process_events, queue) if __name__ == "__main__": From 42850c1cac49013a2d6835a4e26b50a8c4127d7c Mon Sep 17 00:00:00 2001 From: sbharati Date: Tue, 30 Aug 2022 04:10:05 -0400 Subject: [PATCH 24/25] minir changes. --- tools/common-code/cmonitor_watcher.py | 11 +++++++---- tools/launcher/cmonitor_launcher.py | 11 +++++++++-- tools/launcher/tests/test_cgroup_watcher.py | 6 ++++-- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/tools/common-code/cmonitor_watcher.py b/tools/common-code/cmonitor_watcher.py index 483130f..49a0630 100644 --- a/tools/common-code/cmonitor_watcher.py +++ b/tools/common-code/cmonitor_watcher.py @@ -13,6 +13,7 @@ import logging from datetime import datetime +exit_flag = False # ======================================================================================================= # CgroupWatcher : Basic inotify class # ======================================================================================================= @@ -160,7 +161,7 @@ def __check_filter(self, process_name): if process_name in e: return True - def inotify_events(self, queue, exit_flag): + def inotify_events(self, queue): """Main thread function for notifying events. Monitored events that match with the white-list provided will be stored in this queue. The events from this queue will be processed by cMonitorLauncher threading function to @@ -186,9 +187,11 @@ def inotify_events(self, queue, exit_flag): if fileList: logging.info(f"CgroupWatcher event in Queue:{fileList}") queue.put(fileList) - # global exit_flag - if exit_flag is True: - exit(0) + # global exit_flag + if exit_flag is True: + logging.info(f"CgroupWatcher exit_flag {exit_flag}") + exit(1) + finally: i.remove_watch(path) diff --git a/tools/launcher/cmonitor_launcher.py b/tools/launcher/cmonitor_launcher.py index c6530e3..41ecb96 100644 --- a/tools/launcher/cmonitor_launcher.py +++ b/tools/launcher/cmonitor_launcher.py @@ -21,6 +21,7 @@ import signal +import cmonitor_watcher from cmonitor_watcher import CgroupWatcher from argparse import RawTextHelpFormatter @@ -83,6 +84,10 @@ def process_events(self, event): else: time.sleep(self.timeout) logging.info(f"In processing event Queue is empty - sleeping: {self.timeout} sec") + if cmonitor_watcher.exit_flag is True: + logging.info(f"In processing event_flag set to {cmonitor_watcher.exit_flag}") + exit(1) + except event.Empty(): pass @@ -116,6 +121,8 @@ def handler(self, signum, frame): for process_name, monitor_process in self.monitored_processes.items(): logging.info(f"Stopping cmonitor_collector from pid '{monitor_process.pid}' of container '{process_name}'") # monitor_process is a subprocess.Popen object: + cmonitor_watcher.exit_flag = True + time.sleep(10) monitor_process.terminate() monitor_process.wait() exit(1) @@ -215,14 +222,14 @@ def main(): # flag has to be set in case inotify_events() needed to be unblocked # default False : keep blocking - exit_flag = False + #exit_flag = False cGroupWatcher = CgroupWatcher(input_path, filter, timeout) cMonitorLauncher = CmonitorLauncher(input_path, filter, ip, command, timeout) signal.signal(signal.SIGINT, cMonitorLauncher.handler) with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: - executer1 = executor.submit(cGroupWatcher.inotify_events, queue, exit_flag) + executer1 = executor.submit(cGroupWatcher.inotify_events, queue) executer2 = executor.submit(cMonitorLauncher.process_events, queue) diff --git a/tools/launcher/tests/test_cgroup_watcher.py b/tools/launcher/tests/test_cgroup_watcher.py index 13f707b..4f53c8a 100644 --- a/tools/launcher/tests/test_cgroup_watcher.py +++ b/tools/launcher/tests/test_cgroup_watcher.py @@ -15,6 +15,7 @@ from concurrent.futures import ProcessPoolExecutor # import cmonitor_watcher +import cmonitor_watcher from cmonitor_watcher import CgroupWatcher # import dateutil.parser as datetime_parser @@ -68,6 +69,7 @@ def process_task_file(path, queue): myDict = d.copy() if queue.empty(): print("Queue is Empty") + cmonitor_watcher.exit_flag = True # terminate the dummy process p.terminate() return @@ -85,9 +87,9 @@ def test_outputCmonitorWatcherInotifyEvent(testrun_idx): print("Directory '% s' created" % path) watcher = CgroupWatcher(path, filter, 10) - flag = True + #flag = True with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: - future1 = executor.submit(watcher.inotify_events, queue, flag) + future1 = executor.submit(watcher.inotify_events, queue) future2 = executor.submit(process_task_file, path, queue) # both threads completely executed From a2c359be246686a58429b42bcc5f5baf9d51f06a Mon Sep 17 00:00:00 2001 From: sbharati Date: Tue, 30 Aug 2022 04:23:16 -0400 Subject: [PATCH 25/25] run black. --- tools/common-code/cmonitor_watcher.py | 5 ++--- tools/launcher/cmonitor_launcher.py | 6 +++--- tools/launcher/tests/test_cgroup_watcher.py | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/tools/common-code/cmonitor_watcher.py b/tools/common-code/cmonitor_watcher.py index 49a0630..493811c 100644 --- a/tools/common-code/cmonitor_watcher.py +++ b/tools/common-code/cmonitor_watcher.py @@ -189,9 +189,8 @@ def inotify_events(self, queue): queue.put(fileList) # global exit_flag if exit_flag is True: - logging.info(f"CgroupWatcher exit_flag {exit_flag}") - exit(1) - + logging.info(f"CgroupWatcher exit_flag {exit_flag}") + exit(1) finally: i.remove_watch(path) diff --git a/tools/launcher/cmonitor_launcher.py b/tools/launcher/cmonitor_launcher.py index 41ecb96..364c248 100644 --- a/tools/launcher/cmonitor_launcher.py +++ b/tools/launcher/cmonitor_launcher.py @@ -85,8 +85,8 @@ def process_events(self, event): time.sleep(self.timeout) logging.info(f"In processing event Queue is empty - sleeping: {self.timeout} sec") if cmonitor_watcher.exit_flag is True: - logging.info(f"In processing event_flag set to {cmonitor_watcher.exit_flag}") - exit(1) + logging.info(f"In processing event_flag set to {cmonitor_watcher.exit_flag}") + exit(1) except event.Empty(): pass @@ -222,7 +222,7 @@ def main(): # flag has to be set in case inotify_events() needed to be unblocked # default False : keep blocking - #exit_flag = False + # exit_flag = False cGroupWatcher = CgroupWatcher(input_path, filter, timeout) cMonitorLauncher = CmonitorLauncher(input_path, filter, ip, command, timeout) diff --git a/tools/launcher/tests/test_cgroup_watcher.py b/tools/launcher/tests/test_cgroup_watcher.py index 4f53c8a..6c5bd1b 100644 --- a/tools/launcher/tests/test_cgroup_watcher.py +++ b/tools/launcher/tests/test_cgroup_watcher.py @@ -87,7 +87,7 @@ def test_outputCmonitorWatcherInotifyEvent(testrun_idx): print("Directory '% s' created" % path) watcher = CgroupWatcher(path, filter, 10) - #flag = True + # flag = True with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: future1 = executor.submit(watcher.inotify_events, queue) future2 = executor.submit(process_task_file, path, queue)