From eb8952c4247a0dd7c852bf550270cbba84468ff5 Mon Sep 17 00:00:00 2001 From: Adrien Delle Cave Date: Mon, 13 Jan 2020 09:44:36 +0100 Subject: [PATCH] Added method sonicprobe.libs.workerpool.WorkerPool.run_args. --- CHANGELOG | 6 + RELEASE | 2 +- VERSION | 2 +- requirements.txt | 2 +- setup.yml | 4 +- sonicprobe/libs/pworkerpool.py | 240 ------------------------------ sonicprobe/libs/workerpool.py | 14 ++ sonicprobe/sp_logging/handlers.py | 87 +++++++++++ 8 files changed, 112 insertions(+), 245 deletions(-) delete mode 100644 sonicprobe/libs/pworkerpool.py create mode 100644 sonicprobe/sp_logging/handlers.py diff --git a/CHANGELOG b/CHANGELOG index 4d8c8b3..d8fc0d7 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,9 @@ +python-sonicprobe (0.3.24) unstable; urgency=medium + + * Added method sonicprobe.libs.workerpool.WorkerPool.run_args. + + -- Adrien DELLE CAVE (Decryptus) Mon, 13 Jan 2020 09:40:38 +0100 + python-sonicprobe (0.3.23) unstable; urgency=medium * Added function helpers.get_nb_workers diff --git a/RELEASE b/RELEASE index a1dad2a..cfe389e 100644 --- a/RELEASE +++ b/RELEASE @@ -1 +1 @@ -0.3.23 +0.3.24 diff --git a/VERSION b/VERSION index a1dad2a..cfe389e 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.3.23 +0.3.24 diff --git a/requirements.txt b/requirements.txt index 86790fe..8c43c18 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -httpdis>=0.6.10 +httpdis>=0.6.11 pyOpenSSL python-magic psutil>=2.1 diff --git a/setup.yml b/setup.yml index c5c9cc8..d1d31e3 100644 --- a/setup.yml +++ b/setup.yml @@ -4,8 +4,8 @@ description: sonicprobe author: Adrien Delle Cave author_email: pypi@doowan.net copyright: '2020 Adrien Delle Cave' -release: '0.3.23' -version: '0.3.23' +release: '0.3.24' +version: '0.3.24' license: License GPL-3 url: https://github.com/decryptus/sonicprobe python_requires: diff --git a/sonicprobe/libs/pworkerpool.py b/sonicprobe/libs/pworkerpool.py deleted file mode 100644 index 0201cdc..0000000 --- a/sonicprobe/libs/pworkerpool.py +++ /dev/null @@ -1,240 +0,0 @@ -# -*- coding: utf-8 -*- -"""pworkerpool""" - -__author__ = "Adrien DELLE CAVE " -__license__ = """ - Copyright (C) 2017 doowan - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along - with this program; if not, write to the Free Software Foundation, Inc., - 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. -""" - -import inspect -import logging -import multiprocessing -import Queue -import time - - -LOG = logging.getLogger('sonicprobe.pworkerpool') - - -class WorkerAdd(object): - pass - -class WorkerExit(object): - pass - - -class WorkerProc(multiprocessing.Process): - def __init__(self, xid, queue, shared, tasks, max_life_time, count_lock, kill_event): - super(WorkerProc, self).__init__() - self.daemon = False - self.queue = queue - self.shared = shared - self.tasks = tasks - self.max_life_time = max_life_time - self.life_time = time.time() - self.count_lock = count_lock - self.kill_event = kill_event - self.xid = xid - - def start(self): - self.life_time = time.time() - return super(WorkerProc, self).start() - - def get_name(self, xid, name = None): - if name: - return "%s:%d" % (name, xid) - else: - return "pwpool:%d" % xid - - def expired(self): - if self.max_life_time > 0 \ - and self.life_time > 0 \ - and (time.time() - self.life_time) >= self.max_life_time: - return True - - return False - - def run(self): - while True: - try: - task = self.tasks.get(True, 0.1) - except IOError: - break - except Queue.Empty: - continue - - if task is WorkerExit: - break - - self.count_lock.acquire() - self.shared['working'] += 1 - if (self.shared['working'] >= self.shared['workers']) \ - and (self.shared['workers'] < self.shared['max_workers']): - self.count_lock.release() - self.queue.put_nowait(WorkerAdd) - else: - self.count_lock.release() - - func, cb, name, args, kargs = task - self.name = self.get_name(self.xid, name or self.shared['name']) - try: - if isinstance(cb, basestring) and inspect.isclass(func): - c = func(*args, **kargs) - getattr(c, cb)(ret) - else: - ret = func(*args, **kargs) - if cb: - cb(ret) - except Exception, e: - LOG.exception("Unexpected error: %r", e) - - self.count_lock.acquire() - self.shared['working'] -= 1 - self.count_lock.release() - - if self.expired(): - break - - self.count_lock.acquire() - self.shared['workers'] -= 1 - self.count_lock.release() - - if not self.shared['workers']: - self.kill_event.set() - - self.queue = None - self.shared = None - self.tasks = None - self.count_lock = None - self.kill_event = None - - -class PWorkerPool(multiprocessing.Process): - def __init__(self, queue = None, max_workers = 10, life_time = None, name = None, start = True): - super(PWorkerPool, self).__init__() - self.tasks = queue or multiprocessing.Queue() - manager = multiprocessing.Manager() - self.shared = manager.dict() - self.shared['workers'] = 0 - self.shared['working'] = 0 - self.shared['max_workers'] = max_workers - self.shared['name'] = name - self.count_lock = multiprocessing.RLock() - self.kill_event = multiprocessing.Event() - self.life_time = life_time - self.name = name - self.queue = multiprocessing.Queue() - - self.kill_event.set() - - if start: - self.start() - - def count_workers(self): - self.count_lock.acquire() - r = self.shared['workers'] - self.count_lock.release() - return r - - def count_working(self): - self.count_lock.acquire() - r = self.shared['working'] - self.count_lock.release() - return r - - def kill(self, nb = 1): - """ - Kill one or many workers. - """ - self.count_lock.acquire() - if nb > self.shared['workers']: - nb = self.shared['workers'] - self.count_lock.release() - for x in xrange(nb): - self.tasks.put_nowait(WorkerExit) - - def set_max_workers(self, nb): - """ - Set the maximum workers to create. - """ - self.count_lock.acquire() - self.shared['max_workers'] = nb - if self.shared['workers'] > self.shared['max_workers']: - self.kill(self.shared['workers'] - self.shared['max_workers']) - self.count_lock.release() - - def get_name(self, xid, name = None): - if name: - return "%s:%d" % (name, xid) - elif self.name: - return "%s:%d" % (self.name, xid) - else: - return "pwpool:%d" % xid - - def add(self, nb = 1, name = None): - """ - Create one or many workers. - """ - for x in xrange(nb): - self.count_lock.acquire() - self.shared['workers'] += 1 - xid = self.shared['workers'] - self.kill_event.clear() - self.count_lock.release() - w = WorkerProc(xid, self.queue, self.shared, self.tasks, self.life_time, self.count_lock, self.kill_event) - w.name = self.get_name(xid, name) - w.start() - - def run(self): - while True: - try: - task = self.queue.get(True, 0.1) - except IOError: - break - except Queue.Empty: - continue - - if task is WorkerAdd: - self.add() - else: - self.tasks.put_nowait(task) - - def put(self, target, callback = None, name = None, *args, **kargs): - """ - Start task. - @target: callable to run with *args and **kargs arguments. - @callback: callable executed after target. - """ - self.count_lock.acquire() - if not self.shared['workers']: - self.add(name = name) - self.count_lock.release() - self.queue.put_nowait((target, callback, name, args, kargs)) - - def killall(self, wait = None): - """ - Kill all active workers. - @wait: Seconds to wait until last worker ends. - If None it waits forever. - """ - self.queue.close() - self.count_lock.acquire() - self.kill(self.shared['workers']) - self.count_lock.release() - self.tasks.close() - self.terminate() - self.kill_event.wait(wait) diff --git a/sonicprobe/libs/workerpool.py b/sonicprobe/libs/workerpool.py index 59a7112..1108824 100644 --- a/sonicprobe/libs/workerpool.py +++ b/sonicprobe/libs/workerpool.py @@ -253,6 +253,20 @@ def run(self, target, callback = None, name = None, complete = None, qpriority = qpriority = time.time() self.tasks.put((qpriority, (target, callback, name, complete, args, kargs))) + def run_args(self, target, *args, **kwargs): + callback = kwargs.pop('_callback_', None) + name = kwargs.pop('_name_', None) + complete = kwargs.pop('_complete_', None) + qpriority = kwargs.pop('_qpriority_', None) + + self.run(target = target, + callback = callback, + name = name, + complete = complete, + qpriority = qpriority, + *args, + **kwargs) + def killall(self, wait = None): """ Kill all active workers. diff --git a/sonicprobe/sp_logging/handlers.py b/sonicprobe/sp_logging/handlers.py new file mode 100644 index 0000000..b5a8b60 --- /dev/null +++ b/sonicprobe/sp_logging/handlers.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2015-2019 Adrien Delle Cave +# SPDX-License-Identifier: GPL-3.0-or-later +"""sonicprobe.sp_logging.handlers""" + +import smtplib + +try: + from email.utils import formatdate +except ImportError: + from email.Utils import formatdate + +from logging.handlers import SMTPHandler + +import six + + +class QueueSMTPHandler(SMTPHandler): + def __init__(self, mailhost, fromaddr, toaddrs, subject, + credentials = None, secure = None, timeout = 5.0, logger_name = None): + self.queues = {} + self.logger_name = logger_name + self._timeout = timeout + + if six.PY2: + SMTPHandler.__init__(self, mailhost, fromaddr, toaddrs, subject, credentials, secure) + else: + SMTPHandler.__init__(self, mailhost, fromaddr, toaddrs, subject, credentials, secure, timeout) + + def isEmpty(self): + return len(self.queues) == 0 + + def getSubject(self, record): + if self.logger_name: + return "[%s] %s Event" % (self.logger_name, record.levelname) + return "%s Event" % record.levelname + + def emit(self, record): + if record.levelname not in self.queues: + self.queues[record.levelname] = [] + + self.queues[record.levelname].append(record) + + def purge(self): + if self.isEmpty(): + return + + queues = dict(self.queues) + self.queues = {} + + for records in six.itervalues(queues): + msg = "" + record = None + + for record in records: + msg += "%s\n" % self.format(record) + + if not record: + continue + + smtp = None + + try: + port = self.mailport + if not port: + port = smtplib.SMTP_PORT + smtp = smtplib.SMTP(self.mailhost, port, timeout=self._timeout) + msg = ("From: %s\r\nTo: %s\r\nSubject: %s\r\nDate: %s\r\n\r\n%s" % + (self.fromaddr, + ",".join(self.toaddrs), + self.getSubject(record), + formatdate(), + msg)) + if self.username: + if self.secure is not None: + smtp.ehlo() + smtp.starttls(*self.secure) + smtp.ehlo() + smtp.login(self.username, self.password) + smtp.sendmail(self.fromaddr, self.toaddrs, msg) + except (KeyboardInterrupt, SystemExit): + raise + except Exception: + self.handleError(record) + finally: + if smtp: + smtp.quit()