diff --git a/agent/__main__.py b/agent/__main__.py index 0907b64..da4688b 100644 --- a/agent/__main__.py +++ b/agent/__main__.py @@ -1,6 +1,6 @@ import argparse from . import run, get_device_id, get_open_ports, say_hello,\ - get_claim_token, get_claim_url + get_claim_token, get_claim_url, executor def main(): @@ -9,7 +9,8 @@ def main(): 'portscan': (get_open_ports, "Print open ports."), 'test-cert': (say_hello, "Validate device certificate."), 'claim-token': (get_claim_token, "Print claim token."), - 'claim-url': (get_claim_url, "Print claim URL.") + 'claim-url': (get_claim_url, "Print claim URL."), + 'daemon': (run_daemon, "Run as daemon") } help_string = "One of the following:\n\n" + "\n".join( ["{: <12} {: <}".format(k, v[1]) for k, v in actions.items()]) @@ -37,12 +38,26 @@ def main(): action="store_true", help="Developer mode: work with locally running server.") args = parser.parse_args() + if not args.action: run(ping=True, debug=args.debug, dev=args.dev) + elif args.action == 'daemon': + run_daemon(debug=args.debug, dev=args.dev) else: run(ping=False, debug=args.debug, dev=args.dev) print(actions[args.action][0]()) +PING_INTERVAL = 60 * 60 +PING_TIMEOUT = 10 * 60 + + +def run_daemon(debug, dev): + exe = executor.Executor(PING_INTERVAL, run, (True, debug, dev), + timeout=PING_TIMEOUT, debug=debug) + executor.schedule(exe) + executor.spin() + + if __name__ == '__main__': main() diff --git a/agent/executor.py b/agent/executor.py new file mode 100644 index 0000000..04e88d0 --- /dev/null +++ b/agent/executor.py @@ -0,0 +1,129 @@ +import asyncio +import os +from concurrent.futures import ThreadPoolExecutor +from multiprocessing import Process +from multiprocessing import Queue +from typing import Callable +from typing import Dict +from typing import Any + + +class Executor(): + MAX_WORKERS = 10 + processes = MAX_WORKERS or os.cpu_count() + executor = ThreadPoolExecutor(max_workers=processes) + + def __init__(self, + interval, + func, fargs, + timeout=None, + callback_timeout=None, + daemon=False, + debug=False): + """ + Periodic process executor. Calls func and sleeps for interval, + repeatedly. Kills the process after a timeout. + Call schedule() to put it into asyncio loop. + :param interval: sleep interval between calls, in seconds. If None, Executor will only execute once. + :param func: the function to call + :param fargs: function args (tuple) or a single arg + :param timeout: kill the process after this many seconds + :param callback_timeout: will be called if the process gets killed on timeout + :param daemon: + """ + self.interval = interval + self.params = {'func': func, 'fn_args': fargs, "p_kwargs": {}, + 'timeout': timeout, 'callback_timeout': callback_timeout, + 'daemon': daemon} + self.process = None + self.oneshot = interval is None + self.should_stop = False + self.debug = debug + + async def start(self): + """ start calling the process periodically """ + while not self.should_stop: + self.executor.submit(self._submit_unpack_kwargs, self.params) + if self.oneshot: + break + await asyncio.sleep(self.interval) + + def stop(self): + """ terminate running process """ + self.should_stop = True + if self.process: + self.process.terminate() + + def _submit_unpack_kwargs(self, params): + """ unpack the kwargs and call submit """ + return self._submit(**params) + + def _submit(self, + func: Callable, + fn_args: Any, + p_kwargs: Dict, + timeout: float, + callback_timeout: Callable[[Any], Any], + daemon: bool): + """ + Submits a callable to be executed with the given arguments. + Schedules the callable to be executed as func(*args, **kwargs) in a new + process. + :param func: the function to execute + :param fn_args: the arguments to pass to the function. Can be one argument + or a tuple of multiple args. + :param p_kwargs: the kwargs to pass to the function + :param timeout: after this time, the process executing the function + will be killed if it did not finish + :param callback_timeout: this function will be called with the same + arguments, if the task times out. + :param daemon: run the child process as daemon + :return: the result of the function, or None if the process failed or + timed out + """ + p_args = fn_args if isinstance(fn_args, tuple) else (fn_args,) + queue = Queue() + if self.debug: + print("Executor: starting {} {}".format(func.__name__, p_args)) + p = Process(target=self._process_run, + args=(queue, func, *p_args,), kwargs=p_kwargs) + + if daemon: + p.daemon = True + self.process = p + + p.start() + p.join(timeout=timeout) + if not queue.empty(): + return queue.get() + if callback_timeout: + callback_timeout(*p_args, **p_kwargs) + if p.is_alive(): + if self.debug: + print('Executor: terminating by timeout') + p.terminate() + p.join() + + @staticmethod + def _process_run(queue: Queue, func: Callable[[Any], Any] = None, + *args, **kwargs): + """ + Executes the specified function as func(*args, **kwargs). + The result will be stored in the shared dictionary + :param func: the function to execute + :param queue: a Queue + """ + queue.put(func(*args, **kwargs)) + + +def schedule(executor: Executor) -> asyncio.Future: + """ + Put executor into asyncio loop. + :param executor: + :return: executor.start() wrapped in Future + """ + return asyncio.ensure_future(executor.start()) + + +def spin(): + asyncio.get_event_loop().run_forever() diff --git a/debian/changelog b/debian/changelog index 701ff0d..4d62024 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +wott-agent (0.1.5.0) stable; urgency=medium + + * Homegrown periodic process runner. + + -- Artem Martynovich Mon, 20 May 2019 19:22:32 +0000 + wott-agent (0.1.4.9) stable; urgency=medium * Merges in multiple PRs. diff --git a/debian/install b/debian/install index 9aba6c8..95db861 100644 --- a/debian/install +++ b/debian/install @@ -1,2 +1 @@ -debian/wott-agent.timer lib/systemd/system misc/pass_hashes.txt opt/wott \ No newline at end of file diff --git a/debian/wott-agent.service b/debian/wott-agent.service index 6d2aa5f..344fd03 100644 --- a/debian/wott-agent.service +++ b/debian/wott-agent.service @@ -1,6 +1,16 @@ [Unit] Description=WoTT Agent +Wants=network-online.target +After=network-online.target +StartLimitIntervalSec=200 +StartLimitBurst=5 [Service] Type=simple -ExecStart=/usr/bin/wott-agent +ExecStart=/usr/bin/wott-agent daemon +Restart=always +RestartSec=1 + +[Install] +Alias=wott-agent +WantedBy=multi-user.target \ No newline at end of file diff --git a/debian/wott-agent.timer b/debian/wott-agent.timer deleted file mode 100644 index ab46398..0000000 --- a/debian/wott-agent.timer +++ /dev/null @@ -1,9 +0,0 @@ -[Unit] -Description=Run wott-agent hourly - -[Timer] -OnCalendar=hourly -Persistent=true - -[Install] -WantedBy=timers.target \ No newline at end of file diff --git a/setup.py b/setup.py index ac41d74..0c6c7d7 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name="wott-agent", - version='0.1.4', + version='0.1.5', author="Viktor Petersson", author_email="v@viktopia.io",