Skip to content
This repository has been archived by the owner on Sep 16, 2022. It is now read-only.

Service and Periodic execution of ping #89

Merged
merged 5 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions agent/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import argparse
from . import run, get_device_id, get_open_ports, say_hello,\
get_claim_token, get_claim_url
get_claim_token, get_claim_url, executor


def main():
Expand All @@ -9,7 +9,8 @@ def main():
'portscan': (get_open_ports, "Print open ports."),
'test-cert': (say_hello, "Validate device certificate."),
'claim-token': (get_claim_token, "Print claim token."),
'claim-url': (get_claim_url, "Print claim URL.")
'claim-url': (get_claim_url, "Print claim URL."),
'daemon': (run_daemon, "Run as daemon")
}
help_string = "One of the following:\n\n" + "\n".join(
["{: <12} {: <}".format(k, v[1]) for k, v in actions.items()])
Expand Down Expand Up @@ -37,12 +38,26 @@ def main():
action="store_true",
help="Developer mode: work with locally running server.")
args = parser.parse_args()

if not args.action:
run(ping=True, debug=args.debug, dev=args.dev)
elif args.action == 'daemon':
run_daemon(debug=args.debug, dev=args.dev)
else:
run(ping=False, debug=args.debug, dev=args.dev)
print(actions[args.action][0]())


PING_INTERVAL = 60 * 60
PING_TIMEOUT = 10 * 60


def run_daemon(debug, dev):
exe = executor.Executor(PING_INTERVAL, run, (True, debug, dev),
timeout=PING_TIMEOUT, debug=debug)
executor.schedule(exe)
executor.spin()


if __name__ == '__main__':
main()
129 changes: 129 additions & 0 deletions agent/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process
from multiprocessing import Queue
from typing import Callable
from typing import Dict
from typing import Any


class Executor():
MAX_WORKERS = 10
processes = MAX_WORKERS or os.cpu_count()
executor = ThreadPoolExecutor(max_workers=processes)

def __init__(self,
interval,
func, fargs,
timeout=None,
callback_timeout=None,
daemon=False,
debug=False):
"""
Periodic process executor. Calls func and sleeps for interval,
repeatedly. Kills the process after a timeout.
Call schedule() to put it into asyncio loop.
:param interval: sleep interval between calls, in seconds. If None, Executor will only execute once.
:param func: the function to call
:param fargs: function args (tuple) or a single arg
:param timeout: kill the process after this many seconds
:param callback_timeout: will be called if the process gets killed on timeout
:param daemon:
"""
self.interval = interval
self.params = {'func': func, 'fn_args': fargs, "p_kwargs": {},
'timeout': timeout, 'callback_timeout': callback_timeout,
'daemon': daemon}
self.process = None
self.oneshot = interval is None
self.shouldStop = False
a-martynovich marked this conversation as resolved.
Show resolved Hide resolved
self.debug = debug

async def start(self):
""" start calling the process periodically """
while not self.shouldStop:
self.executor.submit(self._submit_unpack_kwargs, self.params)
if self.oneshot:
break
await asyncio.sleep(self.interval)

def stop(self):
""" terminate running process """
self.shouldStop = True
if self.process:
self.process.terminate()

def _submit_unpack_kwargs(self, params):
""" unpack the kwargs and call submit """
return self._submit(**params)

def _submit(self,
func: Callable,
fn_args: Any,
p_kwargs: Dict,
timeout: float,
callback_timeout: Callable[[Any], Any],
daemon: bool):
"""
Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as func(*args, **kwargs) in a new
process.
:param func: the function to execute
:param fn_args: the arguments to pass to the function. Can be one argument
or a tuple of multiple args.
:param p_kwargs: the kwargs to pass to the function
:param timeout: after this time, the process executing the function
will be killed if it did not finish
:param callback_timeout: this function will be called with the same
arguments, if the task times out.
:param daemon: run the child process as daemon
:return: the result of the function, or None if the process failed or
timed out
"""
p_args = fn_args if isinstance(fn_args, tuple) else (fn_args,)
queue = Queue()
if self.debug:
print("Executor: starting {} {}".format(func.__name__, p_args))
p = Process(target=self._process_run,
args=(queue, func, *p_args,), kwargs=p_kwargs)

if daemon:
p.daemon = True
self.process = p

p.start()
p.join(timeout=timeout)
if not queue.empty():
return queue.get()
if callback_timeout:
callback_timeout(*p_args, **p_kwargs)
if p.is_alive():
if self.debug:
print('Executor: terminating by timeout')
p.terminate()
p.join()

@staticmethod
def _process_run(queue: Queue, func: Callable[[Any], Any] = None,
*args, **kwargs):
"""
Executes the specified function as func(*args, **kwargs).
The result will be stored in the shared dictionary
:param func: the function to execute
:param queue: a Queue
"""
queue.put(func(*args, **kwargs))


def schedule(executor: Executor) -> asyncio.Future:
"""
Put executor into asyncio loop.
:param executor:
:return: executor.start() wrapped in Future
"""
return asyncio.ensure_future(executor.start())


def spin():
asyncio.get_event_loop().run_forever()
6 changes: 6 additions & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
wott-agent (0.1.5.0) stable; urgency=medium

* Homegrown periodic process runner.

-- Artem Martynovich <artem.martynovich@gmail.com> Mon, 20 May 2019 19:22:32 +0000

wott-agent (0.1.4.9) stable; urgency=medium

* Merges in multiple PRs.
Expand Down
1 change: 0 additions & 1 deletion debian/install
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
debian/wott-agent.timer lib/systemd/system
misc/pass_hashes.txt opt/wott
12 changes: 11 additions & 1 deletion debian/wott-agent.service
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
[Unit]
Description=WoTT Agent
Wants=network-online.target
After=network-online.target
StartLimitIntervalSec=200
StartLimitBurst=5

[Service]
Type=simple
ExecStart=/usr/bin/wott-agent
ExecStart=/usr/bin/wott-agent daemon
Restart=always
RestartSec=1

[Install]
Alias=wott-agent
WantedBy=multi-user.target
9 changes: 0 additions & 9 deletions debian/wott-agent.timer

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

setup(
name="wott-agent",
version='0.1.4',
version='0.1.5',

author="Viktor Petersson",
author_email="v@viktopia.io",
Expand Down