mowshon/bounded_pool_executor

Bounded Process&Thread Pool Executor

BoundedSemaphore for ProcessPoolExecutor & ThreadPoolExecutor from concurrent.futures

Installation

pip install bounded-pool-executor

What is the main problem?

If you use the standard "concurrent.futures" module to process several million items, every submitted task is queued immediately, and the queue of pending tasks can take up all the free memory.

On a low-memory VPS, this can exhaust the available memory and get the script killed.

BoundedProcessPoolExecutor VS ProcessPoolExecutor

BoundedProcessPoolExecutor

BoundedProcessPoolExecutor puts a new task in the queue only when a previously submitted task has finished. Until then, submit() blocks.
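The blocking behaviour described above can be sketched with a semaphore around submit(). This is a minimal illustration of the idea and not necessarily how this package is implemented: the class name BoundedThreadPool, the attribute _task_slots and the helper run_bounded_demo are hypothetical, and a thread pool is used only to keep the sketch lightweight.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BoundedThreadPool(ThreadPoolExecutor):
    """Hypothetical sketch: submit() blocks while max_workers tasks are unfinished."""

    def __init__(self, max_workers):
        super().__init__(max_workers=max_workers)
        # One slot per task that may be queued or running at the same time.
        self._task_slots = threading.BoundedSemaphore(max_workers)

    def submit(self, fn, *args, **kwargs):
        self._task_slots.acquire()  # wait here until an earlier task finishes
        try:
            future = super().submit(fn, *args, **kwargs)
        except BaseException:
            self._task_slots.release()  # do not lose the slot if submit fails
            raise
        # Give the slot back as soon as the task is done (or cancelled).
        future.add_done_callback(lambda _future: self._task_slots.release())
        return future


def run_bounded_demo(total_tasks, max_workers):
    """Submit total_tasks jobs and record the largest number of unfinished futures."""
    peak_pending = 0
    futures = []
    with BoundedThreadPool(max_workers=max_workers) as pool:
        for n in range(total_tasks):
            futures.append(pool.submit(pow, n, 2))
            pending = sum(1 for f in futures if not f.done())
            peak_pending = max(peak_pending, pending)
    return peak_pending, [f.result() for f in futures]


peak, squares = run_bounded_demo(total_tasks=200, max_workers=3)
print('peak unfinished tasks:', peak)
```

Because every unfinished future holds one semaphore slot, the number of pending tasks in this sketch never exceeds max_workers, no matter how many tasks the loop submits.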

Linux (fork start method):

from bounded_pool_executor import BoundedProcessPoolExecutor
from time import sleep
from random import randint

def do_job(num):
    sleep_sec = randint(1, 10)
    print('value: %d, sleep: %d sec.' % (num, sleep_sec))
    sleep(sleep_sec)

with BoundedProcessPoolExecutor(max_workers=5) as worker:
    for num in range(10000):
        print('#%d Worker initialization' % num)
        worker.submit(do_job, num)

Windows (and any platform that uses the spawn start method):

from bounded_pool_executor import BoundedProcessPoolExecutor
from time import sleep
from random import randint

def do_job(num):
    sleep_sec = randint(1, 10)
    print('value: %d, sleep: %d sec.' % (num, sleep_sec))
    sleep(sleep_sec)

if __name__ == '__main__':
    with BoundedProcessPoolExecutor(max_workers=5) as worker:
        for num in range(10000):
            print('#%d Worker initialization' % num)
            worker.submit(do_job, num)

Result:

[Image: BoundedProcessPoolExecutor output]

Classic concurrent.futures.ProcessPoolExecutor

ProcessPoolExecutor accepts every submitted task into its queue immediately. The tasks then run as workers become free, at most max_workers at a time, so the queue can grow without limit.
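The effect can be observed with a small experiment. The sketch below uses ThreadPoolExecutor, whose submit() likewise returns immediately without waiting for a free worker, to avoid the cost of spawning processes. The helper count_pending_after_submit is a hypothetical name introduced for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def count_pending_after_submit(total_tasks, max_workers):
    """Submit total_tasks blocked jobs and count the futures that are not done yet."""
    gate = threading.Event()  # keeps every job blocked until it is set

    def job(n):
        gate.wait()
        return n

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # submit() never blocks here: all tasks are accepted at once.
        futures = [pool.submit(job, n) for n in range(total_tasks)]
        pending = sum(1 for f in futures if not f.done())
        gate.set()  # let the jobs finish so the pool can shut down
    return pending


print(count_pending_after_submit(total_tasks=1000, max_workers=5))  # prints 1000
```

All 1000 tasks are held by the executor at once even though only 5 workers exist. With millions of tasks, each carrying its own arguments, this backlog is what consumes the memory.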

Linux (fork start method):

import concurrent.futures
from time import sleep
from random import randint

def do_job(num):
    sleep_sec = randint(1, 3)
    print('value: %d, sleep: %d sec.' % (num, sleep_sec))
    sleep(sleep_sec)

with concurrent.futures.ProcessPoolExecutor(max_workers=5) as worker:
    for num in range(100000):
        print('#%d Worker initialization' % num)
        worker.submit(do_job, num)

Windows (and any platform that uses the spawn start method):

import concurrent.futures
from time import sleep
from random import randint

def do_job(num):
    sleep_sec = randint(1, 3)
    print('value: %d, sleep: %d sec.' % (num, sleep_sec))
    sleep(sleep_sec)

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as worker:
        for num in range(100000):
            print('#%d Worker initialization' % num)
            worker.submit(do_job, num)

Result:

[Image: concurrent.futures.ProcessPoolExecutor output]
