diff --git a/csp/csp.py b/csp/csp.py index 7389410..a96b1cf 100644 --- a/csp/csp.py +++ b/csp/csp.py @@ -30,6 +30,8 @@ from __future__ import absolute_import +from contextlib import contextmanager + import os import sys @@ -71,3 +73,93 @@ from .os_process import * except: from .os_thread import * + + + +class CSP(object): + """Context manager to execute Python functions sequentially or in + parallel, similarly to OCCAM syntax: + + csp = CSP() + with csp.seq: + csp.process(myfunc1, arg1, arg2) + with csp.par: + csp.process(myfunc2, arg1, arg2) + csp.process(myfunc3, arg1, arg2) + csp.start() + # myfunc3 and myfunc4 will be executed in parallel. + # myfunc1 and myfunc2 will be executed sequentially, + # and myfunc3 and myfunc4 will be executed after + # myfunc2 has returned. + """ + + def __init__(self): + self.processes = [] + + @contextmanager + def par(self): + """Context manager to execute functions in parallel. + + csp = CSP() + with csp.seq: + csp.process(myfunc1, arg1, arg2) + csp.process(myfunc2, arg1, arg2) + csp.start() + # myfunc1 and myfunc2 will be executed in parallel. + """ + self.processes.append([]) + yield + proc_list = self.processes.pop() + par = Par(*proc_list) + if len(self.processes) > 0: + self.processes[-1].append(par) + else: + self.processes.append(par) + return + + @contextmanager + def seq(self): + """Context manager to execute functions in sequence. + + csp = CSP() + with csp.seq: + csp.process(myfunc1, arg1, arg2) + csp.process(myfunc2, arg1, arg2) + csp.start() + # myfunc1 and myfunc2 will be executed sequentially. + """ + self.processes.append([]) + yield + proc_list = self.processes.pop() + seq = Seq(*proc_list) + if len(self.processes) > 0: + self.processes[-1].append(seq) + else: + self.processes.append(seq) + return + + def process(self, func, *args, **kwargs): + """Add a process to the current list of proceses. + + Likely, this will be called from inside a context manager, e.g.: + + csp = CSP() + with csp.par: + csp.process(myfunc1, arg1, arg2) + csp.process(myfunc2, arg1, arg2) + csp.start() + """ + self.processes[-1].append(CSPProcess(func, *args, **kwargs)) + return + + def start(self): + """Start all processes in self.processes (in parallel) and run + to completion. + """ + if len(self.processes) == 0: + return + elif len(self.processes) == 1: + self.processes[0].start() + else: + Par(*self.processes).start() + return diff --git a/csp/os_process.py b/csp/os_process.py index e295a02..b7f45d3 100644 --- a/csp/os_process.py +++ b/csp/os_process.py @@ -601,7 +601,8 @@ def terminate(self): def join(self): for proc in self.procs: - proc.join() + if proc._popen: + proc.join() def start(self): """Start then synchronize with the execution of parallel processes. diff --git a/csp/os_thread.py b/csp/os_thread.py index 6664558..c947681 100644 --- a/csp/os_thread.py +++ b/csp/os_thread.py @@ -610,7 +610,8 @@ def getPid(self): def join(self): for proc in self.procs: - proc.join() + if proc._Thread__started.is_set(): + proc.join() def start(self): """Start then synchronize with the execution of parallel processes. diff --git a/test/test_contexts.py b/test/test_contexts.py new file mode 100644 index 0000000..30dac65 --- /dev/null +++ b/test/test_contexts.py @@ -0,0 +1,61 @@ +#!/usr/bin/python + +""" +Test the CSP class, found in csp.csp and its context managers. + +TODO: Replace this with proper unit testing. + +Copyright (C) Sarah Mount, 2010. + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + +import sys + +sys.path.insert(0, "..") + +from csp.csp import CSP + + +def printme(*args): + print ' '.join(map(lambda x: str(x), args)) + + +def testme1(): + p = CSP() + with p.par(): + p.process(printme, 1, 2, 3, 4, 5) + p.process(printme, 6, 7, 7, 8, 9) + p.process(printme, 2, 3, 6, 3, 2) + p.start() + + +def testme2(): + p = CSP() + with p.seq(): + p.process(printme, 1, 2, 3) + with p.par(): + p.process(printme, 1) + p.process(printme, 2) + p.process(printme, 3) + p.process(printme, 5, 6, 7) + p.start() + + +if __name__ == '__main__': + print 'Test 1' + testme1() + print 'Test 2' + testme2() +