Parallel Execution Engine #47
First, thanks for the amazing PyFunctional. Now my 2 cents: I've been playing with the idea of supporting basic parallel execution. As of now, I hope it is at least a start.
Thanks @versae! The second option, as you highlighted, is a good way to accomplish this with less overhead. I spent a while playing around with your POC. I liked a few things you had there (namely using execution strategies), but couldn't seem to get it to work correctly. It looked like things were not getting computed in parallel for some reason:

```python
In [13]: def f(x):
    ...:     print("f({0}) on Core {1}".format(x, os.getpid()))
    ...:     return x

In [50]: seq.range(20).map(f)
Out[50]:
f(0) on Core 44668
f(1) on Core 44668
f(2) on Core 44668
...
f(19) on Core 44668
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
```

Every call ran on the same PID, so nothing was actually computed in parallel. Is the specific pickling code there necessary, rather than letting the standard machinery handle it?

The other thing I was thinking about is that the parallelization is across different function calls, when some of them can be trivially combined (e.g. consecutive maps fused into a single function).

For tests, that is probably the thing that requires the most work. Thanks for the start and ideas!
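On the fusion point: consecutive per-element transformations can be collapsed into one function before parallelizing, so each worker invocation runs all stages at once and the pool is dispatched a single time. A minimal sketch; this `compose` helper is illustrative, not PyFunctional's own:

```python
from functools import reduce

def compose(*functions):
    # Right-to-left composition: compose(f, g)(x) == f(g(x))
    return reduce(lambda f, g: lambda x: f(g(x)), functions)

def inc(x):
    return x + 1

def dbl(x):
    return x * 2

# map(inc).map(dbl) fused into a single map with one composed function
fused = compose(dbl, inc)
print([fused(x) for x in range(3)])  # [2, 4, 6]
```

With fusion, a chain of maps costs one serialization and one pool dispatch instead of one per stage.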
I see the problem now. Timing it without and with the parallel engine:

```python
# not parallel
>>> %timeit -n 1 list(seq.range(100).map(lambda x: sleep(.1) and x))
1 loop, best of 3: 10 s per loop

# parallel
>>> %timeit -n 1 list(seq.range(100).map(lambda x: sleep(.1) and x))
1 loop, best of 3: 1.64 s per loop
```

And now showing the PIDs:

```python
>>> seq.range(20).map(f)
f(16) on Core 15326
f(8) on Core 15325
f(0) on Core 15324
f(17) on Core 15326
f(1) on Core 15324
f(9) on Core 15325
f(2) on Core 15324
f(18) on Core 15326
f(10) on Core 15325
f(19) on Core 15326
f(3) on Core 15324
f(11) on Core 15325
f(4) on Core 15324
f(12) on Core 15325
f(5) on Core 15324
f(13) on Core 15325
f(6) on Core 15324
f(14) on Core 15325
f(7) on Core 15324
f(15) on Core 15325
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
```

The work is now spread across several worker processes, and the results still come back in order. I like the execution-strategies approach.
Few comments/questions I had, but nice work!

**Serialization**

Is it necessary to serialize the collection itself with something else? It seems like the standard machinery should be enough.

**Object model**

How should the parallel execution fit into the object model?
**Serialization**

The thing is that the standard machinery cannot serialize everything the transformations use, which is why the extra serialization code is there.
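For context on why serialization needs care at all (my illustration, not from the thread): the stock `pickle` module serializes named module-level functions by reference but refuses lambdas, which is the kind of case a guard like the `is_serializable` check discussed later has to catch:

```python
import pickle

try:
    pickle.dumps(lambda x: x + 1)  # lambdas have no importable name
    lambda_picklable = True
except Exception:
    lambda_picklable = False

print(lambda_picklable)  # False: stock pickle rejects lambdas
```

This is why process pools, which must ship functions to workers by pickling, choke on lambda-heavy pipelines unless a fallback or an alternative serializer is used.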
I'll take a closer look at the code, but my suggestion would be creating an execution engine class, so that the serial and parallel code paths can share most of the logic.
Actually, on second thought, perhaps there is a simpler way to expose it.
Another option could be to have parallel variants of the operations directly on the API.
Seems like that would add a lot of noise to the API though. I could possibly see an argument for something like a separate `pseq` entry point.
As I see it, implementing the parallel engine as a subclass keeps most of the code shared between the two.
That being said, I completely understand the logical separation of the serial and parallel engines.
Also, referring to this code:

```python
def evaluate(self, sequence):
    result = sequence
    last_cache_index = self.cache_scan()
    staged = []
    for transform in self.transformations[last_cache_index:]:
        strategies = transform.execution_strategies
        if strategies and ExecutionStrategies.PRE_COMPUTE in strategies:
            result = list(result)
        if strategies and ExecutionStrategies.PARALLEL in strategies:
            staged.append(transform.function)
        else:
            if staged:
                result = parallelize(compose(*staged), result)
                staged = []
            result = transform.function(result)
    if staged:
        result = parallelize(compose(*staged), result)
    return iter(result)
```

The two lines just before the `return` force any currently staged transformations to execute at evaluation time, rather than when the first value is asked for. I would imagine having something like the below would solve this:

```python
def lazy_parallel(result, composed_funcs):
    expanded = False
    while True:
        if not expanded:
            expanded = True
            result = iter(parallelize(composed_funcs, result))
        yield next(result)
```

Then catch the empty-iteration error (`StopIteration`). I am also sure there is a cleaner way than the ad-hoc method above.

Saw other posts, reading them now.
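The deferral that `lazy_parallel` is reaching for can be shown without any multiprocessing: wrapping an eager computation in a generator postpones the work until the first value is pulled. A small self-contained illustration (the names here are mine, not PyFunctional's):

```python
def deferred(eager_func, data):
    # eager_func(data) does not run until the generator is first advanced
    def gen():
        for item in eager_func(data):
            yield item
    return gen()

calls = []

def expand_all(xs):
    calls.append("ran")
    return [x * 2 for x in xs]

it = deferred(expand_all, [1, 2, 3])
assert calls == []        # nothing computed yet
assert next(it) == 2      # expand_all ran just now
assert calls == ["ran"]
```

Applying the same trick to the staged-parallel branch would keep `evaluate` cheap and push the pool dispatch to first iteration.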
I'm not sure massive refactoring would be necessary. On the call to evaluate, the engine to use could simply be looked up.
It sounds good. Some more work would be required to change a few remaining things, but it already works:

```python
>>> l = lambda x: print(os.getpid()) or x*2  # assumed definition: doubles x and prints the PID
>>> i = lambda x: x
>>> pseq(range(20)).map(l).filter(i)
7487
7488
7488
7488
7490
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
>>> seq(range(20)).map(l).filter(i)
7411
7411
7411
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
```

And as for the lazy evaluation, I think it is already done:

```python
>>> s = seq(range(20)).map(l).filter(i)
>>> s.to_list()
7411
7411
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
>>> s
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
>>> ps = pseq(range(20)).map(l).filter(i)
>>> ps.to_list()
7517
7515
7516
7517
7518
...
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
>>> ps
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
```
OK, I found a useful feature for having execution engines be classes: passing the number of processes to the parallel engine (versae@ebe41f4).

```python
>>> pseq(range(40), processes=2).map(lambda x: (print(os.getpid()) or x*2))
```
I think that the issue could be solved by making the engines classes, roughly like this:

```python
class ExecutionEngine(object):
    def evaluate(self, sequence, transformations):
        result = sequence
        for transform in transformations:
            strategies = transform.execution_strategies
            if (strategies is not None
                    and ExecutionStrategies.PRE_COMPUTE in strategies):
                result = transform.function(list(result))
            else:
                result = transform.function(result)
        return iter(result)

    def __call__(self, *args):
        return self.sequence(*args)

    def sequence(self, *args):
        if len(args) == 0:
            raise TypeError("seq() takes at least 1 argument ({0} given)".format(len(args)))
        elif len(args) > 1:
            return Sequence(list(args), engine=self)
        elif is_primitive(args[0]):
            return Sequence([args[0]], engine=self)
        else:
            return Sequence(args[0], engine=self)

    def csv(self, csv_file, dialect='excel', **fmt_params):
        if isinstance(csv_file, str):
            input_file = ReusableFile(csv_file, mode='r')
        elif hasattr(csv_file, 'next') or hasattr(csv_file, '__next__'):
            input_file = csv_file
        else:
            raise ValueError('csv_file must be a file path or implement the iterator interface')
        csv_input = csvapi.reader(input_file, dialect=dialect, **fmt_params)
        return self.__call__(csv_input).cache(delete_lineage=True)

    # Other functions would be defined similarly


class ParallelExecutionEngine(ExecutionEngine):
    def __init__(self, processes=None):
        self.processes = processes

    def __call__(self, *args, processes=None):
        self.processes = processes
        return self.sequence(*args)

    def evaluate(self, sequence, transformations):
        processes = self.processes
        result = sequence
        staged = []
        for transform in transformations:
            strategies = transform.execution_strategies
            if strategies and ExecutionStrategies.PRE_COMPUTE in strategies:
                result = list(result)
            if strategies and ExecutionStrategies.PARALLEL in strategies:
                staged.insert(0, transform.function)
            else:
                if staged:
                    result = parallelize(compose(*staged), result, processes)
                    staged = []
                result = transform.function(result)
        if staged:
            result = parallelize(compose(*staged), result, processes)
        return iter(result)


# functional.__init__
seq = ExecutionEngine()
pseq = ParallelExecutionEngine()
```

It's not tested, but should give the general idea. I am still not convinced that this code block doesn't immediately expand though:

```python
if staged:
    result = parallelize(compose(*staged), result, processes)
```
Did some playing around in the terminal about the last bit:

```python
from multiprocessing import Pool

p = Pool()
r = p.map(str, [1, 2, 3, 4, 5, 6])
print(type(r))
# list
```

This would imply that `pool.map` fully computes its results up front, given the current `parallelize`:

```python
def parallelize(func, result, processes=None):
    if not is_serializable(func):
        return func(result)
    if processes is None or processes < 1:
        processes = CPU_COUNT
    else:
        processes = min(processes, CPU_COUNT)
    with Pool(processes=processes) as pool:
        chunks = split_every(processes, iter(result))
        packed_chunks = (pack(func, (chunk, )) for chunk in chunks)
        results = pool.map(unpack, packed_chunks)
    return chain.from_iterable(results)
```

The `chain.from_iterable` at the end only re-wraps a list that has already been computed, so going back to the laziness question, staged parallel work does expand immediately.

Side note: should the function compute the result in serial or throw an error if it can't be serialized? On the one hand, I think the user should know that things are not being computed in parallel when they think they are; on the other hand, it's a decent fallback. Perhaps this could be a configurable parameter somehow set to one of those two modes?
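One way around the eager `pool.map` (my suggestion, not something the thread settled on) is `Pool.imap`, which hands back an iterator over results instead of a fully built list; note that its input is still consumed by a background feeder, so this only defers the output side. The demo below uses the thread-backed `multiprocessing.dummy.Pool`, which shares the `Pool` API, so it stays runnable without spawning processes:

```python
from multiprocessing.dummy import Pool  # thread-backed pool, same API as Pool

with Pool(2) as pool:
    eager = pool.map(str, [1, 2, 3])   # plain list, fully computed before returning
    lazy = pool.imap(str, [1, 2, 3])   # iterator; results stream out in order
    assert isinstance(eager, list)
    assert not isinstance(lazy, list)
    assert list(lazy) == ['1', '2', '3']
```

Swapping `map` for `imap` inside a `parallelize`-style helper would let downstream consumers start pulling results before the whole collection is done, at the cost of keeping the pool alive until iteration finishes.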
Any thoughts @versae? I really like where this is going and think this could very easily become a merged pull request with a little more work!
I tried the code you posted; I think I misunderstood the purpose of part of it at first. Regarding the side note on serialization, either behaviour seems reasonable to me.
Tested the code for the engine classes and it works. One nice thing to have would be that those functions (`csv` and the other entry points) only had to be defined once.

Would keeping `seq` and `pseq` as two separate entry points be OK? To me:

```python
# This seems better
from functional import pseq
pseq(...)

# Than this
from functional import seq
seq(..., processes=4)

# And even when you need both, this is still clean
from functional import seq, pseq
seq(...)
pseq(...)

# And changing core counts
pseq(..., processes=4)
# or maybe even
pseq.processes = 4
pseq(...)
```
Looks great! There are a few minor things here and there, but I could comment on those individually when you open a pull request. Next things are probably getting the tests running under both engines, then the PR itself.
```python
class TestPipeline(unittest.TestCase):
    def setUp(self):
        self.seq = seq
```

And then using `self.seq` throughout the tests, so a parallel variant of the suite can swap in `pseq`.
Also, I just discovered that stuff like the side-effect tests fails under the parallel engine.
The above idea for testing is what I had in mind. I would extend it by making a function decorator to mark tests that should be skipped under the parallel engine. How does that fail?
This demonstrates that capability:

```python
class MyTestCase(unittest.TestCase):

    @unittest.skip("demonstrating skipping")
    def test_nothing(self):
        self.fail("shouldn't happen")

    @unittest.skipIf(mylib.__version__ < (1, 3),
                     "not supported in this library version")
    def test_format(self):
        # Tests that work for only a certain version of the library.
        pass

    @unittest.skipUnless(sys.platform.startswith("win"), "requires Windows")
    def test_windows_support(self):
        # windows specific testing code
        pass
```
Any updates @versae? I am actually fairly excited to get a PR open and merged. Great work so far! If you happen to be busy or don't want to work out the tests, just let me know and we can figure something out.
I fixed the problem I found and now all tests run (I basically imported the missing names). The tests that need to be rethought for the parallel execution are the ones that rely on side effects or on serializing things that can't be serialized.

Once those tests are solved I will do the pull request, and then you can organize the final tests in the best way.
Here's how I solved it:

```python
lambda sequence: (lambda s: [wrap(s[i:])
                             for i in range(len(s) + 1)]
                  )(list(sequence)),
```

The length handling could be made more efficient, but with this last change the only remaining errors are due to serialization and side effects. If you are OK with this I can create the PR :)
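For reference, the suffix logic in that snippet can be exercised on its own; the `wrap` below is a trivial stand-in for PyFunctional's real wrapper:

```python
def wrap(x):
    # stand-in for PyFunctional's wrap(); just materializes the suffix
    return list(x)

tails = lambda sequence: (lambda s: [wrap(s[i:])
                                     for i in range(len(s) + 1)]
                          )(list(sequence))

print(tails([1, 2, 3]))  # [[1, 2, 3], [2, 3], [3], []]
```

Materializing the sequence with `list(sequence)` first is what makes `len(s)` and slicing safe even when the input is a one-shot iterator.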
Little surprised at the issues with that one; it is also similar to the case with the other lazily defined transformations. For these, I would like to understand why they weren't working in the first place, since they should have. Could you post what was failing?

On tests, I'll take a careful look when I get home in a couple hours, but if there is anything that will obviously not work in parallel, like the side-effect tests, then they can be flagged to be skipped. The point of those tests is to check the correctness of the code, which is independent of serial/parallel execution.

On serialization, I would skip this test as well, and if I can fix it later I will.
And you are absolutely right. It was a problem in the logic of the parallel execution engine. It is fixed now.
With that fixed, I think you should be able to put the skip flags on the tests that need them and be good to go for a PR?
I don't understand; there are no tests yet for `pseq`.
Sorry, I didn't take a close enough look at the committed code. Just to verify: currently, running the tests means substituting `seq` with `pseq`?

To get everything running I would first add a constructor like:

```python
class TestPipeline(unittest.TestCase):
    def __init__(self, *args, **kwargs):
        self.seq = seq
        super(TestPipeline, self).__init__(*args, **kwargs)
```

Then change all the references of `seq` to `self.seq` and add:

```python
class TestParallelPipeline(TestPipeline):
    def __init__(self, *args, **kwargs):
        self.seq = pseq
        super(TestParallelPipeline, self).__init__(*args, **kwargs)
```

For all the tests which shouldn't run there because of side effects and whatnot, put this code:

```python
@unittest.skipIf(self.seq is pseq, "seq is pseq, skipping serial test")
def test_side_effect(self):
    pass
```

If things work as I think they would, this should lead to every test running under both engines, with the incompatible ones skipped.
Gotcha now, I wasn't sure that was the approach you wanted.
If you have a better idea, go for it. Testing like the above looked like a reasonable way to go about it.
Unfortunately the decorator has no access to `self`, so the `skipIf` condition above can't be evaluated at class-definition time.
Looks like there is a method `TestCase.skipTest` that can be called from inside the test body at runtime instead.
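Since `skipTest` runs inside the test method, it does see `self`, so the engine check can move into the test body. A runnable sketch; the `is_parallel` flag and the engine names are stand-ins I made up, not PyFunctional's API:

```python
import unittest

def serial_engine(xs):
    return list(xs)

def parallel_engine(xs):
    return list(xs)
parallel_engine.is_parallel = True  # marker used only by this sketch

class PipelineTests(unittest.TestCase):
    engine = staticmethod(serial_engine)

    def test_side_effect(self):
        if getattr(self.engine, "is_parallel", False):
            self.skipTest("not meaningful under the parallel engine")
        self.assertEqual(self.engine([1, 2]), [1, 2])

class ParallelPipelineTests(PipelineTests):
    engine = staticmethod(parallel_engine)

suite = unittest.defaultTestLoader.loadTestsFromTestCase(ParallelPipelineTests)
result = unittest.TextTestRunner(verbosity=0).run(suite)
assert result.wasSuccessful() and len(result.skipped) == 1
```

The subclass inherits every test, and only the ones that check the marker skip themselves, which is exactly the run-everything-twice behaviour discussed above.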
Is there anything else left to do?
There are other kinds of functions that could be run in parallel using a reduce approach, such as the aggregations (`reduce`, `sum`, and the like).
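The reduce-style parallelism suggested here can be sketched as a chunked fold: reduce each chunk independently (those per-chunk folds are what a pool could run), then combine the partials; this only matches a plain reduce when the function is associative. The helper below is illustrative, not PyFunctional code:

```python
from functools import reduce
import operator

def chunked_reduce(func, items, chunk_size=3):
    # Per-chunk folds are independent, so a pool.map could run them in
    # parallel; func must be associative for the final answer to match.
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    partials = [reduce(func, chunk) for chunk in chunks]
    return reduce(func, partials)

print(chunked_reduce(operator.add, list(range(10))))  # 45
```

Non-associative functions (for example subtraction) would give chunk-boundary-dependent answers, so such a feature would need to document that constraint.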
My thought would be to break these different things into smaller PRs rather than make one giant PR. I think that the work so far could be a single PR which focuses on the added parallelization capabilities. Future commits/PRs could add in other parallel methods and better reducing methods. It would also be great to merge it so that I can work on other features on top of it.
That sounds good to me too.
We need to add documentation for the parallel engine as well.
Feature merged in #67
Creating issue to discuss the potential of implementing a parallel execution engine, as part of the 0.5.0 milestone.