In [5]:
#export
from fastprogress import *
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import functools, operator

In [2]:
#export
from nbdev.showdoc import *

In [3]:
#export
def parallel(func, items, chunksize=None, max_workers=1):
    '''This function use in parallel processing'''
    
    if isinstance(items, list):
        if chunksize:
            arr = [items[i: i+chunksize] for i in (range(0, len(items), chunksize))]
        else:
            chunksize = int(len(items)/max_workers)
            arr = [items[i: i+chunksize] for i in (range(0, len(items), chunksize))]
            
    elif isinstance(items, np.ndarray):
        if chunksize:
            n_splits = int((items.shape[0]/chunksize)+0.5)
            arr = np.array_split(items, n_splits)
        else:
            arr = np.array_split(items, max_workers)

    if max_workers<2: results = list(progress_bar(map(func, enumerate(arr)), total=len(arr)))
    else:
        with ProcessPoolExecutor(max_workers=max_workers) as ex:
            return list(progress_bar(ex.map(func, enumerate(arr)), total=len(arr)))
    if any([o is not None for o in results]): return results

In [5]:
#show_doc(parallel)

In [20]:
#hide
def squareme(x):
    return np.power(x,2)

items = np.array(list(range(10)))
results = parallel(squareme, items, chunksize=2, max_workers=1)
results

[array([0, array([0, 1])], dtype=object),
 array([1, array([4, 9])], dtype=object),
 array([4, array([16, 25])], dtype=object),
 array([9, array([36, 49])], dtype=object),
 array([16, array([64, 81])], dtype=object)]

In [52]:
#hide
functools.reduce(operator.iconcat,[r[1].tolist() for r in results] , [])

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [None]:
#hide
def squareme(x):
    return np.power(x,1)

items = list(range(10))
results = parallel(squareme, items, chunksize=2, max_workers=4)
results

# Using Multi processing package

### Example 1

In [25]:
#export
import time
import random
from tqdm.notebook import tqdm
from multiprocessing import Pool, freeze_support, RLock
import numpy as np

def func(pid, n):

    tqdm_text = "#" + "{}".format(pid).zfill(3)

    current_sum = 0
    with tqdm(total=n, desc=tqdm_text, position=pid+1) as pbar:
        for i in range(1, n+1):
            current_sum += i
            time.sleep(0.05)
            pbar.update(1)
    
    return current_sum

def runme(func, items, workers=4):

    freeze_support() # For Windows support

    num_processes = workers
    num_jobs = workers
    random_seed = 3107
    random.seed(random_seed) 

    pool = Pool(processes=num_processes, initargs=(RLock(),), initializer=tqdm.set_lock)

    argument_list = items

    jobs = [pool.apply_async(func, args=(n,)) for i, n in enumerate(argument_list)]
    pool.close()
    result_list = [job.get() for job in jobs]
    return result_list

In [26]:
#export
def squareme(x):
    return np.power(x,1)

items = list(range(100))
runme(squareme, items)

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99]

### Example 2

In [27]:
#hide
import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def squareme(x):
    return np.power(x,1)

pool = Pool(2)
'''
for _ in tqdm(pool.imap_unordered(myfunc, range(100)), total=100):
    pass
'''
pbar = tqdm(total=100)
def update(*a):
    pbar.update()
    # tqdm.write(str(a))
for i in range(pbar.total):
    pool.apply_async(squareme, args=(i,), callback=update)
# tqdm.write('scheduled')
pool.close()
pool.join()

100%|██████████| 100/100 [00:23<00:00,  3.40it/s]

In [30]:
#hide
from p_tqdm import p_map

In [41]:
#hide
def squareme(x):
    return np.power(x,1)

items = np.array(list(range(100)))

results = p_map(squareme, items, num_cpus=4)


100%|██████████| 100/100 [00:00<00:00, 282444.71it/s]


In [60]:
from multiprocessing import Pool
from tqdm.notebook import tqdm

def squareme(x):
    return np.power(x,1)

items = list(range(100000))
chunksize = 1000

arr = [items[i: i+chunksize] for i in (range(0, len(items), chunksize))]

pool = Pool(processes=4)
jobs = []
for j in tqdm(pool.map(squareme, arr), total=len(arr)):
    jobs.append(j)

pool.close()

HBox(children=(FloatProgress(value=0.0), HTML(value='')))




In [3]:
import multiprocessing
multiprocessing.set_start_method('fork', force=True)

from atpbar import register_reporter, find_reporter, flush
import random,time
from atpbar import atpbar

def run_with_multiprocessing():
    def task(n, name):
        for i in atpbar(range(n), name=name):
            time.sleep(0.0001)
    def worker(reporter, task, queue):
        register_reporter(reporter)
        while True:
            args = queue.get()
            if args is None:
                queue.task_done()
                break
            task(*args)
            queue.task_done()
    nprocesses = 4
    ntasks = 10
    reporter = find_reporter()
    queue = multiprocessing.JoinableQueue()
    for i in range(nprocesses):
        p = multiprocessing.Process(target=worker, args=(reporter, task, queue))
        p.start()
    for i in range(ntasks):
        name = 'task {}'.format(i)
        n = random.randint(5, 100000)
        queue.put((n, name))
    for i in range(nprocesses):
        queue.put(None)
        queue.join()
    flush()

In [4]:
run_with_multiprocessing()

VBox()

In [None]:
import multiprocessing
multiprocessing.set_start_method('fork', force=True)

from atpbar import register_reporter, find_reporter, flush
import random,time
from atpbar import atpbar

def run_with_multiprocessing():
    def task(n, name):
        for i in atpbar(range(n), name=name):
            time.sleep(0.0001)
    def worker(reporter, task, queue):
        register_reporter(reporter)
        while True:
            args = queue.get()
            if args is None:
                queue.task_done()
                break
            task(*args)
            queue.task_done()
    nprocesses = 4
    ntasks = 10
    reporter = find_reporter()
    queue = multiprocessing.JoinableQueue()
    for i in range(nprocesses):
        p = multiprocessing.Process(target=worker, args=(reporter, task, queue))
        p.start()
    for i in range(ntasks):
        name = 'task {}'.format(i)
        n = random.randint(5, 100000)
        queue.put((n, name))
    for i in range(nprocesses):
        queue.put(None)
        queue.join()
    flush()

## Decorated Concurrency

In [22]:
from deco import concurrent

In [24]:
@concurrent(processes=2)
def sample_fn(items):
    rs = []
    for i in items:
         rs.append(i*i)
    return rs

items = np.array(list(range(100)))
results = sample_fn(items)

In [26]:
results.get()

([0,
  1,
  4,
  9,
  16,
  25,
  36,
  49,
  64,
  81,
  100,
  121,
  144,
  169,
  196,
  225,
  256,
  289,
  324,
  361,
  400,
  441,
  484,
  529,
  576,
  625,
  676,
  729,
  784,
  841,
  900,
  961,
  1024,
  1089,
  1156,
  1225,
  1296,
  1369,
  1444,
  1521,
  1600,
  1681,
  1764,
  1849,
  1936,
  2025,
  2116,
  2209,
  2304,
  2401,
  2500,
  2601,
  2704,
  2809,
  2916,
  3025,
  3136,
  3249,
  3364,
  3481,
  3600,
  3721,
  3844,
  3969,
  4096,
  4225,
  4356,
  4489,
  4624,
  4761,
  4900,
  5041,
  5184,
  5329,
  5476,
  5625,
  5776,
  5929,
  6084,
  6241,
  6400,
  6561,
  6724,
  6889,
  7056,
  7225,
  7396,
  7569,
  7744,
  7921,
  8100,
  8281,
  8464,
  8649,
  8836,
  9025,
  9216,
  9409,
  9604,
  9801],
 [])

## mantichora exploration

In [1]:
import time, random
from atpbar import atpbar
from mantichora import mantichora

In [4]:
def task_loop(name, ret=None):
    n = random.randint(10000, 100000)
    for i in atpbar(range(n), name=name):
        time.sleep(0.0001)
    return ret

In [5]:
with mantichora(nworkers=3) as mcore:
    mcore.run(task_loop, 'task', ret='result1')
    mcore.run(task_loop, 'another task', ret='result2')
    mcore.run(task_loop, 'still another task', ret='result3')
    mcore.run(task_loop, 'yet another task', ret='result4')
    mcore.run(task_loop, 'task again', ret='result5')
    mcore.run(task_loop, 'more task', ret='result6')
    results = mcore.returns()

print(results)

VBox()

['result1', 'result2', 'result3', 'result4', 'result5', 'result6']
