From 4f45d5ab26f7f6a4905323134cd9b01b22895787 Mon Sep 17 00:00:00 2001 From: Artem Martynovich Date: Tue, 21 May 2019 01:24:47 +0600 Subject: [PATCH 1/5] Add periodic process runner. deb: 0.1.5 --- agent/__main__.py | 19 ++++++- agent/executor.py | 126 ++++++++++++++++++++++++++++++++++++++++++++++ debian/changelog | 6 +++ setup.py | 2 +- 4 files changed, 150 insertions(+), 3 deletions(-) create mode 100644 agent/executor.py diff --git a/agent/__main__.py b/agent/__main__.py index 0907b64..b77e9b8 100644 --- a/agent/__main__.py +++ b/agent/__main__.py @@ -1,6 +1,6 @@ import argparse from . import run, get_device_id, get_open_ports, say_hello,\ - get_claim_token, get_claim_url + get_claim_token, get_claim_url, executor def main(): @@ -9,7 +9,8 @@ def main(): 'portscan': (get_open_ports, "Print open ports."), 'test-cert': (say_hello, "Validate device certificate."), 'claim-token': (get_claim_token, "Print claim token."), - 'claim-url': (get_claim_url, "Print claim URL.") + 'claim-url': (get_claim_url, "Print claim URL."), + 'daemon': (run_daemon, "Run as daemon") } help_string = "One of the following:\n\n" + "\n".join( ["{: <12} {: <}".format(k, v[1]) for k, v in actions.items()]) @@ -37,12 +38,26 @@ def main(): action="store_true", help="Developer mode: work with locally running server.") args = parser.parse_args() + if not args.action: run(ping=True, debug=args.debug, dev=args.dev) + elif args.action == 'daemon': + run_daemon(debug=args.debug, dev=args.dev) else: run(ping=False, debug=args.debug, dev=args.dev) print(actions[args.action][0]()) +PING_INTERVAL = 60*60 +PING_TIMEOUT = 10*60 + + +def run_daemon(debug, dev): + exe = executor.Executor(PING_INTERVAL, run, (True, debug, dev), + timeout=PING_TIMEOUT) + executor.schedule(exe) + executor.spin() + + if __name__ == '__main__': main() diff --git a/agent/executor.py b/agent/executor.py new file mode 100644 index 0000000..c5675df --- /dev/null +++ b/agent/executor.py @@ -0,0 +1,126 @@ +import asyncio +import os +from concurrent.futures import ThreadPoolExecutor +from multiprocessing import Process +from multiprocessing import Queue +from typing import Callable +from typing import Dict +from typing import Any +import logging + + +L = logging.getLogger() + + +class Executor(): + MAX_WORKERS = 10 + processes = MAX_WORKERS or os.cpu_count() + executor = ThreadPoolExecutor(max_workers=processes) + + def __init__(self, interval, + func, fargs, + timeout=None, callback_timeout=None, daemon=False): + """ + Periodic process executor. Calls func and sleeps for interval, + repeatedly. Kills the process after a timeout. + Call schedule() to put it into asyncio loop. + :param interval: sleep interval between calls, in seconds + :param func: the function to call + :param fargs: function args (tuple) or a single arg + :param timeout: kill the process after this many seconds + :param callback_timeout: will be called if the process gets killed on timeout + :param daemon: + """ + self.interval = interval + self.params = {'func': func, 'fn_args': fargs, "p_kwargs": {}, + 'timeout': timeout, 'callback_timeout': callback_timeout, + 'daemon': daemon} + self.process = None + self.oneshot = interval is None + self.shouldStop = False + + async def start(self): + """ start calling the process periodically """ + while not self.shouldStop: + self.executor.submit(self._submit_unpack_kwargs, self.params) + if self.oneshot: + break + await asyncio.sleep(self.interval) + + def stop(self): + """ terminate running process """ + self.shouldStop = True + if self.process: + self.process.terminate() + + def _submit_unpack_kwargs(self, params): + """ unpack the kwargs and call submit """ + return self._submit(**params) + + def _submit(self, + func: Callable, + fn_args: Any, + p_kwargs: Dict, + timeout: float, + callback_timeout: Callable[[Any], Any], + daemon: bool): + """ + Submits a callable to be executed with the given arguments. + Schedules the callable to be executed as func(*args, **kwargs) in a new + process. + :param func: the function to execute + :param fn_args: the arguments to pass to the function. Can be one argument + or a tuple of multiple args. + :param p_kwargs: the kwargs to pass to the function + :param timeout: after this time, the process executing the function + will be killed if it did not finish + :param callback_timeout: this function will be called with the same + arguments, if the task times out. + :param daemon: run the child process as daemon + :return: the result of the function, or None if the process failed or + timed out + """ + p_args = fn_args if isinstance(fn_args, tuple) else (fn_args,) + queue = Queue() + p = Process(target=self._process_run, + args=(queue, func, *p_args,), kwargs=p_kwargs) + + if daemon: + p.deamon = True + self.process = p + + p.start() + p.join(timeout=timeout) + if not queue.empty(): + return queue.get() + if callback_timeout: + callback_timeout(*p_args, **p_kwargs) + if p.is_alive(): + logging.info('terminate') + p.terminate() + p.join() + + @staticmethod + def _process_run(queue: Queue, func: Callable[[Any], Any] = None, + *args, **kwargs): + """ + Executes the specified function as func(*args, **kwargs). + The result will be stored in the shared dictionary + :param func: the function to execute + :param queue: a Queue + """ + L.info('args: {}'.format(args)) + queue.put(func(*args, **kwargs)) + + +def schedule(executor: Executor) -> asyncio.Future: + """ + Put executor into asyncio loop. + :param executor: + :return: executor.start() wrapped in Future + """ + return asyncio.ensure_future(executor.start()) + + +def spin(): + asyncio.get_event_loop().run_forever() diff --git a/debian/changelog b/debian/changelog index 701ff0d..4d62024 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +wott-agent (0.1.5.0) stable; urgency=medium + + * Homegrown periodic process runner. + + -- Artem Martynovich Mon, 20 May 2019 19:22:32 +0000 + wott-agent (0.1.4.9) stable; urgency=medium * Merges in multiple PRs. diff --git a/setup.py b/setup.py index ac41d74..0c6c7d7 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name="wott-agent", - version='0.1.4', + version='0.1.5', author="Viktor Petersson", author_email="v@viktopia.io", From ba0d435e1074bd53c0ebbf3f60863f62e561aebd Mon Sep 17 00:00:00 2001 From: Artem Martynovich Date: Tue, 21 May 2019 01:32:28 +0600 Subject: [PATCH 2/5] Add debug option to Executor. --- agent/__main__.py | 2 +- agent/executor.py | 21 ++++++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/agent/__main__.py b/agent/__main__.py index b77e9b8..ccec7dc 100644 --- a/agent/__main__.py +++ b/agent/__main__.py @@ -54,7 +54,7 @@ def main(): def run_daemon(debug, dev): exe = executor.Executor(PING_INTERVAL, run, (True, debug, dev), - timeout=PING_TIMEOUT) + timeout=PING_TIMEOUT, debug=debug) executor.schedule(exe) executor.spin() diff --git a/agent/executor.py b/agent/executor.py index c5675df..216dd69 100644 --- a/agent/executor.py +++ b/agent/executor.py @@ -6,10 +6,6 @@ from typing import Callable from typing import Dict from typing import Any -import logging - - -L = logging.getLogger() class Executor(): @@ -17,9 +13,13 @@ class Executor(): processes = MAX_WORKERS or os.cpu_count() executor = ThreadPoolExecutor(max_workers=processes) - def __init__(self, interval, + def __init__(self, + interval, func, fargs, - timeout=None, callback_timeout=None, daemon=False): + timeout=None, + callback_timeout=None, + daemon=False, + debug=False): """ Periodic process executor. Calls func and sleeps for interval, repeatedly. Kills the process after a timeout. @@ -38,6 +38,7 @@ def __init__(self, interval, self.process = None self.oneshot = interval is None self.shouldStop = False + self.debug = debug async def start(self): """ start calling the process periodically """ @@ -82,11 +83,13 @@ def _submit(self, """ p_args = fn_args if isinstance(fn_args, tuple) else (fn_args,) queue = Queue() + if self.debug: + print("Executor: starting {} {}".format(func.__name__, p_args)) p = Process(target=self._process_run, args=(queue, func, *p_args,), kwargs=p_kwargs) if daemon: - p.deamon = True + p.daemon = True self.process = p p.start() @@ -96,7 +99,8 @@ def _submit(self, if callback_timeout: callback_timeout(*p_args, **p_kwargs) if p.is_alive(): - logging.info('terminate') + if self.debug: + print('Executor: terminating by timeout') p.terminate() p.join() @@ -109,7 +113,6 @@ def _process_run(queue: Queue, func: Callable[[Any], Any] = None, :param func: the function to execute :param queue: a Queue """ - L.info('args: {}'.format(args)) queue.put(func(*args, **kwargs)) From 0266b4ee6ccd868e3292ecc3d8dc1afad2541bd1 Mon Sep 17 00:00:00 2001 From: Artem Martynovich Date: Tue, 21 May 2019 01:37:47 +0600 Subject: [PATCH 3/5] Fix code style. Add important comment to Executor --- agent/__main__.py | 4 ++-- agent/executor.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/agent/__main__.py b/agent/__main__.py index ccec7dc..da4688b 100644 --- a/agent/__main__.py +++ b/agent/__main__.py @@ -48,8 +48,8 @@ def main(): print(actions[args.action][0]()) -PING_INTERVAL = 60*60 -PING_TIMEOUT = 10*60 +PING_INTERVAL = 60 * 60 +PING_TIMEOUT = 10 * 60 def run_daemon(debug, dev): diff --git a/agent/executor.py b/agent/executor.py index 216dd69..e4546e0 100644 --- a/agent/executor.py +++ b/agent/executor.py @@ -24,7 +24,7 @@ def __init__(self, Periodic process executor. Calls func and sleeps for interval, repeatedly. Kills the process after a timeout. Call schedule() to put it into asyncio loop. - :param interval: sleep interval between calls, in seconds + :param interval: sleep interval between calls, in seconds. If None, Executor will only execute once. :param func: the function to call :param fargs: function args (tuple) or a single arg :param timeout: kill the process after this many seconds From 61ccadf354c8a99173f58e41ddd047d90c08de02 Mon Sep 17 00:00:00 2001 From: Artem Martynovich Date: Tue, 21 May 2019 14:27:09 +0600 Subject: [PATCH 4/5] Remove systemd timer. Make wott-agent a full-fledged service. --- debian/install | 1 - debian/wott-agent.service | 12 +++++++++++- debian/wott-agent.timer | 9 --------- 3 files changed, 11 insertions(+), 11 deletions(-) delete mode 100644 debian/wott-agent.timer diff --git a/debian/install b/debian/install index 9aba6c8..95db861 100644 --- a/debian/install +++ b/debian/install @@ -1,2 +1 @@ -debian/wott-agent.timer lib/systemd/system misc/pass_hashes.txt opt/wott \ No newline at end of file diff --git a/debian/wott-agent.service b/debian/wott-agent.service index 6d2aa5f..344fd03 100644 --- a/debian/wott-agent.service +++ b/debian/wott-agent.service @@ -1,6 +1,16 @@ [Unit] Description=WoTT Agent +Wants=network-online.target +After=network-online.target +StartLimitIntervalSec=200 +StartLimitBurst=5 [Service] Type=simple -ExecStart=/usr/bin/wott-agent +ExecStart=/usr/bin/wott-agent daemon +Restart=always +RestartSec=1 + +[Install] +Alias=wott-agent +WantedBy=multi-user.target \ No newline at end of file diff --git a/debian/wott-agent.timer b/debian/wott-agent.timer deleted file mode 100644 index ab46398..0000000 --- a/debian/wott-agent.timer +++ /dev/null @@ -1,9 +0,0 @@ -[Unit] -Description=Run wott-agent hourly - -[Timer] -OnCalendar=hourly -Persistent=true - -[Install] -WantedBy=timers.target \ No newline at end of file From 1ae05e5df9fc50d2253ad3962ca5e34159d32e10 Mon Sep 17 00:00:00 2001 From: Artem Martynovich Date: Tue, 21 May 2019 19:00:30 +0600 Subject: [PATCH 5/5] No camelCase. --- agent/executor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/agent/executor.py b/agent/executor.py index e4546e0..04e88d0 100644 --- a/agent/executor.py +++ b/agent/executor.py @@ -37,12 +37,12 @@ def __init__(self, 'daemon': daemon} self.process = None self.oneshot = interval is None - self.shouldStop = False + self.should_stop = False self.debug = debug async def start(self): """ start calling the process periodically """ - while not self.shouldStop: + while not self.should_stop: self.executor.submit(self._submit_unpack_kwargs, self.params) if self.oneshot: break @@ -50,7 +50,7 @@ async def start(self): def stop(self): """ terminate running process """ - self.shouldStop = True + self.should_stop = True if self.process: self.process.terminate()