Skip to content

Commit

Permalink
Added PriorityQueue in sonicprobe.libs.workerpool.
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrien Delle Cave committed Jan 10, 2020
1 parent 515e15a commit b7f7a54
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 22 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
python-sonicprobe (0.3.20) unstable; urgency=medium

* Added PriorityQueue in sonicprobe.libs.workerpool.

-- Adrien DELLE CAVE (Decryptus) <adc@doowan.net> Fri, 10 Jan 2020 12:54:09 +0100

python-sonicprobe (0.3.19) unstable; urgency=medium

* Reviewed requirements.
Expand Down
2 changes: 1 addition & 1 deletion RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.19
0.3.20
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.19
0.3.20
6 changes: 3 additions & 3 deletions setup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ name: sonicprobe
description: sonicprobe
author: Adrien Delle Cave
author_email: pypi@doowan.net
copyright: '2019 Adrien Delle Cave'
release: '0.3.19'
version: '0.3.19'
copyright: '2020 Adrien Delle Cave'
release: '0.3.20'
version: '0.3.20'
license: License GPL-3
url: https://github.com/decryptus/sonicprobe
python_requires:
Expand Down
51 changes: 34 additions & 17 deletions sonicprobe/libs/workerpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

LOG = logging.getLogger('sonicprobe.workerpool')

DEFAULT_MAX_WORKERS = 10
DEFAULT_MAX_WORKERS = 10
DEFAULT_EXIT_PRIORITY = -9999


class WorkerExit(object): # pylint: disable=useless-object-inheritance,too-few-public-methods
Expand Down Expand Up @@ -64,7 +65,10 @@ def run(self):
continue

try:
task = self.pool.tasks.get_nowait()
if not self.pool.is_qpriority:
task = self.pool.tasks.get_nowait()
else:
qpriority, task = self.pool.tasks.get_nowait()
except _queue.Empty:
continue

Expand Down Expand Up @@ -126,21 +130,23 @@ def run(self):

class WorkerPool(object): # pylint: disable=useless-object-inheritance
def __init__(self, queue = None, max_workers = DEFAULT_MAX_WORKERS, life_time = None, name = None, max_tasks = None, auto_gc = True):
self.tasks = queue or _queue.Queue()
self.workers = 0
self.working = 0
self.max_workers = int(max_workers)
self.tasks = queue or _queue.Queue()
self.workers = 0
self.working = 0
self.max_workers = int(max_workers)
if self.max_workers < 1:
self.max_workers = DEFAULT_MAX_WORKERS
self.life_time = life_time
self.name = name
self.max_tasks = max_tasks
self.auto_gc = auto_gc
self.id_list = []
self.life_time = life_time
self.name = name
self.max_tasks = max_tasks
self.auto_gc = auto_gc
self.id_list = []

self.exit = False
self.kill_event = threading.Event()
self.count_lock = threading.RLock()

self.exit = False
self.kill_event = threading.Event()
self.count_lock = threading.RLock()
self.is_qpriority = isinstance(self.tasks, _queue.PriorityQueue)

self.kill_event.set()

Expand Down Expand Up @@ -168,7 +174,10 @@ def kill(self, nb = 1):
nb = self.workers
self.count_lock.release()
for x in xrange(nb): # pylint: disable=unused-variable
self.tasks.put(WorkerExit())
if not self.is_qpriority:
self.tasks.put(WorkerExit())
else:
self.tasks.put((DEFAULT_EXIT_PRIORITY, WorkerExit()))

def set_max_workers(self, nb):
"""
Expand Down Expand Up @@ -220,21 +229,29 @@ def add(self, nb = 1, name = None, xid = None):
w.setName(self.get_name(xid, name))
w.start()

def run(self, target, callback = None, name = None, complete = None, *args, **kargs):
def run(self, target, callback = None, name = None, complete = None, qpriority = None, *args, **kargs):
"""
Start task.
@target: callable to run with *args and **kargs arguments.
@callback: callable executed after target.
@name: thread name
@complete: complete executed after target in finally
@qpriority: priority for PriorityQueue
"""
self.count_lock.acquire()
if not self.workers:
self.count_lock.release()
self.add(name = name)
else:
self.count_lock.release()
self.tasks.put((target, callback, name, complete, args, kargs))

if not self.is_qpriority:
self.tasks.put((target, callback, name, complete, args, kargs))
return

if qpriority is None:
qpriority = time.time()
self.tasks.put((qpriority, (target, callback, name, complete, args, kargs)))

def killall(self, wait = None):
"""
Expand Down

0 comments on commit b7f7a54

Please sign in to comment.