Skip to content
Lightweight workers and task management library for Python
Python
Branch: master
Clone or download
Latest commit 72b4e5b Dec 17, 2019
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
doc workers.rst Dec 17, 2019
neotasker 0.0.21 Dec 17, 2019
tests
.gitignore initial import Dec 9, 2019
LICENSE Initial commit Dec 9, 2019
README.md README.md Dec 15, 2019
setup.py 0.0.21 Dec 17, 2019

README.md

neotasker

Lightweight Python library for modern thread / multiprocessing pooling and task processing via asyncio.

Neotasker is lightweight variation of atasker library: tasks don't have priorities, go directly to ThreadPoolExecutor and are standard Python future objects. This library is useful for the high-load projects with lightweight tasks as majority tasks are directly proxied to pool.

Neotasker works on top of ThreadPoolExecutor and asyncio and provides additional features:

  • Easy thread pool and asyncio loops initialization
  • Interval, queue and event-based workers
  • Built-in integration with aiosched

Install

pip3 install neotasker

Sources: https://github.com/alttch/neotasker

Documentation: https://neotasker.readthedocs.io/

Code examples

Start/stop

from neotasker import task_supervisor

# set pool size
# min_size='max' means pre-spawn all pool threads
task_supervisor.set_thread_pool(min_size='max', max_size=20)
task_supervisor.start()
# ...
# start workers, other threads etc.
# ...
# optionally block current thread
task_supervisor.block()

# stop from any thread
task_supervisor.stop()

Executing future

You may work with neotasker.thread_pool object directly or use task_supervisor.spawn function, which's directly mapped to thread_pool.submit)

from neotasker import thread_pool

thread_pool.start()

def mytask(a, b, c):
    print(f'I am working in the background! {a} {b} {c}')
    return 777

task = task_supervisor.spawn(mytask, 1, 2, c=3)

# get future result
result = task.result()

Creating async io loop

from neotasker import thread_pool

thread_pool.start()
task_supervisor.create_aloop('default', default=True)

# The loop will until supervisor is stopped
# Spawn coroutine from another thread:

task_supervisor.get_aloop().spawn_coroutine_threadsafe(coro)

Worker examples

from neotasker import background_worker, task_supervisor

task_supervisor.start()
# we need to create at least one aloop to start workers
task_supervisor.create_aloop('default', default=True)
# create one more async loop
task_supervisor.create_aloop('loop2')

@background_worker
def worker1(**kwargs):
    print('I am a simple background worker')

@background_worker
async def worker_async(**kwargs):
    print('I am async background worker')

@background_worker(interval=1, loop='loop2')
def worker2(**kwargs):
    print('I run every second!')

@background_worker(queue=True)
def worker3(task, **kwargs):
    print('I run when there is a task in my queue')

@background_worker(event=True)
def worker4(**kwargs):
    print('I run when triggered')

worker1.start()
worker_async.start()
worker2.start()
worker3.start()
worker4.start()

worker3.put_threadsafe('todo1')
worker4.trigger_threadsafe()

from neotasker import BackgroundIntervalWorker

class MyWorker(BackgroundIntervalWorker):

    def run(self, **kwargs):
        print('I am custom worker class')

worker5 = MyWorker(interval=0.1, name='worker5')
worker5.start()
You can’t perform that action at this time.