Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion fastcore/_nbdev.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,11 @@
"threaded": "03a_parallel.ipynb",
"startthread": "03a_parallel.ipynb",
"set_num_threads": "03a_parallel.ipynb",
"check_parallel_num": "03a_parallel.ipynb",
"parallelable": "03a_parallel.ipynb",
"ThreadPoolExecutor": "03a_parallel.ipynb",
"ProcessPoolExecutor": "03a_parallel.ipynb",
"parallel": "03a_parallel.ipynb",
"add_one": "03a_parallel.ipynb",
"run_procs": "03a_parallel.ipynb",
"parallel_gen": "03a_parallel.ipynb",
"url_default_headers": "03b_net.ipynb",
Expand Down
29 changes: 20 additions & 9 deletions fastcore/parallel.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/03a_parallel.ipynb (unless otherwise specified).

__all__ = ['threaded', 'startthread', 'set_num_threads', 'check_parallel_num', 'ThreadPoolExecutor',
'ProcessPoolExecutor', 'parallel', 'run_procs', 'parallel_gen']
__all__ = ['threaded', 'startthread', 'set_num_threads', 'parallelable', 'ThreadPoolExecutor', 'ProcessPoolExecutor',
'parallel', 'add_one', 'run_procs', 'parallel_gen']

# Cell
from .imports import *
Expand Down Expand Up @@ -58,12 +58,13 @@ def _call(lock, pause, n, g, item):
return g(item)

# Cell
def check_parallel_num(param_name, num_workers):
if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0:
def parallelable(param_name, num_workers, f=None):
f_in_main = f == None or sys.modules[f.__module__].__name__ == "__main__"
if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0 and f_in_main:
print("Due to IPython and Windows limitation, python multiprocessing isn't available now.")
print(f"So `{param_name}` is changed to 0 to avoid getting stuck")
num_workers = 0
return num_workers
print(f"So `{param_name}` has to be changed to 0 to avoid getting stuck")
return False
return True

# Cell
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
Expand All @@ -88,13 +89,16 @@ class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
"Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
if max_workers is None: max_workers=defaults.cpus
max_workers = check_parallel_num('max_workers', max_workers)
store_attr()
self.not_parallel = max_workers==0
if self.not_parallel: max_workers=1
super().__init__(max_workers, **kwargs)

def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0
self.not_parallel = self.max_workers==0
if self.not_parallel: self.max_workers=1

if self.not_parallel == False: self.lock = Manager().Lock()
g = partial(f, *args, **kwargs)
if self.not_parallel: return map(g, items)
Expand All @@ -118,6 +122,13 @@ def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None
r = progress_bar(r, total=total, leave=False)
return L(r)

# Cell
def add_one(x, a=1):
# this import is necessary for multiprocessing in notebook on windows
import random
time.sleep(random.random()/80)
return x+a

# Cell
def run_procs(f, f_done, args):
"Call `f` for each item in `args` in parallel, yielding `f_done`"
Expand All @@ -135,7 +146,7 @@ def _done_pg(queue, items): return (queue.get() for _ in items)
# Cell
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
"Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
n_workers = check_parallel_num('n_workers', n_workers)
if not parallelable('n_workers', n_workers): n_workers = 0
if n_workers==0:
yield from enumerate(list(cls(**kwargs)(items)))
return
Expand Down
56 changes: 39 additions & 17 deletions nbs/03a_parallel.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,13 @@
"outputs": [],
"source": [
"#export\n",
"def check_parallel_num(param_name, num_workers):\n",
" if sys.platform == \"win32\" and IN_NOTEBOOK and num_workers > 0:\n",
"def parallelable(param_name, num_workers, f=None):\n",
" f_in_main = f == None or sys.modules[f.__module__].__name__ == \"__main__\" \n",
" if sys.platform == \"win32\" and IN_NOTEBOOK and num_workers > 0 and f_in_main:\n",
" print(\"Due to IPython and Windows limitation, python multiprocessing isn't available now.\")\n",
" print(f\"So `{param_name}` is changed to 0 to avoid getting stuck\")\n",
" num_workers = 0\n",
" return num_workers"
" print(f\"So `{param_name}` has to be changed to 0 to avoid getting stuck\")\n",
" return False\n",
" return True"
]
},
{
Expand Down Expand Up @@ -263,13 +264,16 @@
" \"Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution\"\n",
" def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):\n",
" if max_workers is None: max_workers=defaults.cpus\n",
" max_workers = check_parallel_num('max_workers', max_workers)\n",
" store_attr()\n",
" self.not_parallel = max_workers==0\n",
" if self.not_parallel: max_workers=1\n",
" super().__init__(max_workers, **kwargs)\n",
"\n",
" def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):\n",
" if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0\n",
" self.not_parallel = self.max_workers==0\n",
" if self.not_parallel: self.max_workers=1\n",
" \n",
" if self.not_parallel == False: self.lock = Manager().Lock()\n",
" g = partial(f, *args, **kwargs)\n",
" if self.not_parallel: return map(g, items)\n",
Expand Down Expand Up @@ -340,10 +344,20 @@
"metadata": {},
"outputs": [],
"source": [
"def add_one(x, a=1): \n",
"#export\n",
"def add_one(x, a=1):\n",
" # this import is necessary for multiprocessing in notebook on windows\n",
" import random\n",
" time.sleep(random.random()/80)\n",
" return x+a\n",
"\n",
" return x+a"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"inp,exp = range(50),range(1,51)\n",
"\n",
"test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)\n",
Expand Down Expand Up @@ -378,11 +392,11 @@
"name": "stdout",
"output_type": "stream",
"text": [
"0 2021-02-03 09:51:30.561681\n",
"1 2021-02-03 09:51:30.812066\n",
"2 2021-02-03 09:51:31.063662\n",
"3 2021-02-03 09:51:31.313478\n",
"4 2021-02-03 09:51:31.564776\n"
"0 2021-02-23 06:38:58.778425\n",
"1 2021-02-23 06:38:59.028804\n",
"2 2021-02-23 06:38:59.280227\n",
"3 2021-02-23 06:38:59.530889\n",
"4 2021-02-23 06:38:59.781011\n"
]
}
],
Expand Down Expand Up @@ -438,15 +452,15 @@
"#export \n",
"def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):\n",
" \"Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel.\"\n",
" n_workers = check_parallel_num('n_workers', n_workers)\n",
" if not parallelable('n_workers', n_workers): n_workers = 0\n",
" if n_workers==0:\n",
" yield from enumerate(list(cls(**kwargs)(items)))\n",
" return\n",
" batches = L(chunked(items, n_chunks=n_workers))\n",
" idx = L(itertools.accumulate(0 + batches.map(len)))\n",
" queue = Queue()\n",
" if progress_bar: items = progress_bar(items, leave=False)\n",
" f=partial(_f_pg, cls(**kwargs), queue)\n",
" f=partial(_f_pg, cls(**kwargs), queue) \n",
" done=partial(_done_pg, queue, items)\n",
" yield from run_procs(f, done, L(batches,idx).zip())"
]
Expand Down Expand Up @@ -546,7 +560,8 @@
"Converted 05_transform.ipynb.\n",
"Converted 07_meta.ipynb.\n",
"Converted 08_script.ipynb.\n",
"Converted index.ipynb.\n"
"Converted index.ipynb.\n",
"Converted parallel_win.ipynb.\n"
]
}
],
Expand All @@ -569,6 +584,13 @@
"exit_code = process.wait()\n",
"test_eq(exit_code, 0)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down
38 changes: 38 additions & 0 deletions nbs/parallel_win.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "informational-central",
"metadata": {},
"outputs": [],
"source": [
"from fastcore.test import *\n",
"from fastcore.parallel import *\n",
"\n",
"if __name__ == \"__main__\":\n",
" inp,exp = range(50),range(1,51)\n",
"\n",
" test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)\n",
" test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "developing-darwin",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}