In [1]:
import multiprocessing as mp
from time import sleep
from queue import Empty
import sys

In [2]:
def job_runner(cores, jobqueue, outputs):
    """
    jobs: [(command, args)]
    outputs: {id: retval}
    """
    import sys
    jobno = 0
    jobs = {}
    done = []
    while True:
        try:
            fun_args = jobqueue.get_nowait()
            if len(fun_args) == 1:
                function = fun_args[0]
                args = None
                depends = None
            elif len(fun_args) == 2:
                function, args = fun_args
                depends = None
            elif len(fun_args) == 3:
                function, args, depends = fun_args
            else:
                continue
            jobno += 1
            jobs[jobno] = {'func': function, 'args': args, 'depends': depends, 'done': False, 'out': None,
                           'started': False}
            outputs.put(jobs.copy())
        except Empty:
            pass
        if jobs:
            for jobno, job_info in jobs.items():
                if job_info['done']:
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if not depend in done:
                            ready = False
                if ready:
                    if job_info['args']:
                        job_info['out'] = job_info['func'](job_info['args'])
                    else:
                        job_info['out'] = job_info['func']()
                    job_info['started'] = True
                    job_info['done'] = True
                    done.append(jobno)
                    if job_info['depends']:
                        outputs.put(jobs.copy())
        sleep(1)

In [3]:
def jon(string='bob'):
    return 'hi ' + string

In [4]:
queue = mp.Queue()
outqueue = mp.Queue()

In [5]:
runner = mp.Process(target=job_runner, args=(3, queue, outqueue))

In [6]:
runner.start()

In [7]:
queue.empty()

True

In [8]:
outqueue.empty()

True

In [9]:
runner.is_alive()

True

In [10]:
queue.put([jon, 'fred', None])

In [11]:
outdict = {}

In [12]:
sleep(2)
try:
    j = outqueue.get_nowait()
except Empty as e:
    print(repr(e))

In [13]:
j

{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True}}

In [14]:
if j:
    outdict.update(j)

In [15]:
outdict

{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True}}

In [16]:
queue.put([jon, 'bob', [4]])

In [17]:
queue.put([jon, 'joe', None])

In [18]:
sleep(1)
j = outqueue.get_nowait()

In [19]:
if j:
    outdict.update(j)

In [20]:
outdict

{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False}}

Note that the get command only gets the first dictionary in the stack, it needs to be run twice if two commands are put.

In [21]:
sleep(1)
j = outqueue.get_nowait()

In [22]:
if j:
    outdict.update(j)

In [23]:
outdict

{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True}}

In [24]:
queue.put([jon, 'done', None])

In [25]:
sleep(2)
j = outqueue.get_nowait()

In [26]:
if j:
    outdict.update(j)

In [27]:
outdict

{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi done',
  'started': True}}

Note that job 2 has not completed.

In [28]:
sleep(2)
j = outqueue.get_nowait()

In [29]:
if j:
    outdict.update(j)

In [30]:
outdict

{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi bob',
  'started': True},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi done',
  'started': True}}

Now it has though, we needed to get it a second time, as one get only fetches one successful loop.

Same exact idea with pool instead of direct running
===================================================

In [31]:
def job_runner(cores, jobqueue, outputs):
    """
    jobs: [(command, args)]
    outputs: {id: retval}
    """
    import sys
    jobno = 0
    jobs = {}
    runners = {}
    done = []
    pool = mp.Pool()
    while True:
        try:
            fun_args = jobqueue.get_nowait()
            if len(fun_args) == 1:
                function = fun_args[0]
                args = None
                depends = None
            elif len(fun_args) == 2:
                function, args = fun_args
                depends = None
            elif len(fun_args) == 3:
                function, args, depends = fun_args
            else:
                continue
            jobno += 1
            jobs[jobno] = {'func': function, 'args': args, 'depends': depends, 'done': False, 'out': None,
                           'started': False}
            outputs.put(jobs.copy())
        except Empty:
            pass
        if jobs:
            for jobno, job_info in jobs.items():
                if job_info['done']:
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if not depend in done:
                            ready = False
                if ready:
                    if job_info['args']:
                        runners[jobno] = pool.apply_async(job_info['func'], (job_info['args'],))
                    else:
                        runners[jobno] = pool.apply_async(job_info['func'])
                    job_info['started'] = True
                if job_info['started'] and runners[jobno].ready():
                    job_info['out'] = runners[jobno].join()
                    job_info['done'] = True
                    done.append(jobno)
                    outputs.put(jobs.copy())
                    #if job_info['depends']:
                    #    outputs.put(jobs.copy())
        sleep(1)

In [32]:
queue = mp.Queue()
outqueue = mp.Queue()

In [33]:
runner = mp.Process(target=job_runner, args=(3, queue, outqueue))

In [34]:
runner.start()

In [35]:
queue.empty()

True

In [36]:
outqueue.empty()

True

In [37]:
runner.is_alive()

True

In [38]:
queue.put([jon, 'fred', None])

In [39]:
outdict = {}

In [40]:
sleep(2)
try:
    j = outqueue.get_nowait()
except Empty as e:
    print(e)

In [41]:
j

{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

In [43]:
if j:
    outdict.update(j)

In [44]:
outdict

{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

Unexpected behavior
-------------------

That should have completed, but it didn't, let's try to get a second time to see if it worked yet.

In [45]:
sleep(2)
try:
    j = outqueue.get_nowait()
except Empty as e:
    print(e)




In [46]:
j

{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

**Nope**
Still isn't completing *why*?

In [47]:
queue.put([jon, 'bob', [4]])

In [48]:
queue.put([jon, 'joe', None])

In [49]:
sleep(1)
j = outqueue.get_nowait()

In [50]:
if j:
    outdict.update(j)

In [51]:
outdict

{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False}}

It worked here once... but it isn't anymore. I am not sure why. Even after though, it was overwritten with the incomplete entry... maybe the dictionary copying is part of the issue?

In [52]:
sleep(2)
j = outqueue.get_nowait()

In [53]:
if j:
    outdict.update(j)

In [54]:
outdict

{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

Unexpected behavior
-------------------
We have *reverted* back to the prior dictionary... the completed info for job 1 is gone.

In [55]:
queue.put([jon, 'done', None])

In [56]:
sleep(2)
j = outqueue.get_nowait()

In [57]:
if j:
    outdict.update(j)

In [58]:
outdict

{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

No jobs have completed now...

In [59]:
sleep(2)
try:
    j = outqueue.get_nowait()
except Empty as e:
    print(repr(e))

Empty()


In [60]:
j

{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

Let's try again, but this time doing a clever dictionary update, no copying
---------------------------------------------------------------------------

In [191]:
def job_runner(cores, jobqueue, outputs):
    """
    jobs: [(command, args)]
    outputs: {id: retval}
    """
    import sys
    from copy import deepcopy
    def output(out):
        """Let's try and explicitly clear the dictionary before sending the output."""
        while not outputs.empty():
            # Clear the output object
            outputs.get()
        outputs.put(deepcopy(out))
    jobno = 0
    jobs = {}
    runners = {}
    done = []
    pool = mp.Pool()
    while True:
        try:
            fun_args = jobqueue.get_nowait()
            if len(fun_args) == 1:
                function = fun_args[0]
                args = None
                depends = None
            elif len(fun_args) == 2:
                function, args = fun_args
                depends = None
            elif len(fun_args) == 3:
                function, args, depends = fun_args
            else:
                continue
            jobno += 1
            jobs[jobno] = {'func': function, 'args': args, 'depends': depends, 'done': False, 'out': None,
                           'started': False}
            output(jobs)
        except Empty:
            pass
        if jobs:
            for jobno, job_info in jobs.items():
                if job_info['done']:
                    continue
                ready = True
                if job_info['depends']:
                    for depend in job_info['depends']:
                        if not depend in done:
                            ready = False
                if ready:
                    if job_info['args']:
                        runners[jobno] = pool.apply_async(job_info['func'], (job_info['args'],))
                    else:
                        runners[jobno] = pool.apply_async(job_info['func'])
                    job_info['started'] = True
                    output(jobs)
                if job_info['started'] and not job_info['done'] and runners[jobno].ready():
                    job_info['out'] = runners[jobno].get()
                    job_info['done'] = True
                    done.append(jobno)
                    output(jobs)
                    #if job_info['depends']:
                    #    outputs.put(jobs.copy())
        sleep(1)

In [192]:
queue = mp.Queue()
outqueue = mp.Queue()

In [193]:
runner = mp.Process(target=job_runner, args=(3, queue, outqueue))

In [194]:
runner.start()

In [195]:
queue.empty()

True

In [196]:
outqueue.empty()

True

In [197]:
runner.is_alive()

True

In [198]:
queue.put([jon, 'fred', None])

In [199]:
j = outqueue.get_nowait()

In [200]:
j

{1: {'args': 'fred',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False}}

In [201]:
count = 1
while not outqueue.empty():
    print(count, outqueue.get_nowait())

1 {1: {'args': 'fred', 'out': None, 'func': <function jon at 0x10c7b6d08>, 'depends': None, 'done': False, 'started': True}}
1 {1: {'args': 'fred', 'out': 'hi fred', 'func': <function jon at 0x10c7b6d08>, 'depends': None, 'done': True, 'started': True}}


It works now!

In [202]:
queue.put([jon, 'bob', [4]])

In [203]:
queue.put([jon, 'joe', None])

In [204]:
sleep(1)
j = outqueue.get_nowait()

In [205]:
j

{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

In [206]:
sleep(1)
j = outqueue.get_nowait()

In [207]:
j

{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': False},
 3: {'args': 'joe',
  'depends': None,
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True}}

In [208]:
count = 1
while not outqueue.empty():
    j = outqueue.get_nowait()
    print(count, j)

1 {1: {'args': 'fred', 'out': 'hi fred', 'func': <function jon at 0x10c7b6d08>, 'depends': None, 'done': True, 'started': True}, 2: {'args': 'bob', 'out': None, 'func': <function jon at 0x10c7b6d08>, 'depends': [4], 'done': False, 'started': False}, 3: {'args': 'joe', 'out': 'hi joe', 'func': <function jon at 0x10c7b6d08>, 'depends': None, 'done': True, 'started': True}}


In [209]:
queue.put([jon, 'done', None])

In [210]:
count = 1
while not outqueue.empty():
    j = outqueue.get_nowait()
    print(count, j)

1 {1: {'args': 'fred', 'out': 'hi fred', 'func': <function jon at 0x10c7b6d08>, 'depends': None, 'done': True, 'started': True}, 2: {'args': 'bob', 'out': None, 'func': <function jon at 0x10c7b6d08>, 'depends': [4], 'done': False, 'started': False}, 3: {'args': 'joe', 'out': 'hi joe', 'func': <function jon at 0x10c7b6d08>, 'depends': None, 'done': True, 'started': True}, 4: {'args': 'done', 'out': None, 'func': <function jon at 0x10c7b6d08>, 'depends': None, 'done': False, 'started': True}}


In [211]:
sleep(2)
j = outqueue.get_nowait()

In [213]:
j

{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': False,
  'func': <function __main__.jon>,
  'out': None,
  'started': True},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi done',
  'started': True}}

Working now, but need to get the dictionary a bunch of times, almost continually.

In [214]:
sleep(2)
j = outqueue.get_nowait()

In [215]:
j

{1: {'args': 'fred',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi fred',
  'started': True},
 2: {'args': 'bob',
  'depends': [4],
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi bob',
  'started': True},
 3: {'args': 'joe',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi joe',
  'started': True},
 4: {'args': 'done',
  'depends': None,
  'done': True,
  'func': <function __main__.jon>,
  'out': 'hi done',
  'started': True}}

OK, now it works, approximately.

The below class is the outline of an idea and is not relevant to this example.

class Queue(object):
       
    def job_runner(pool, jobqueue, outputs):
        """
        jobs: [(command, args)]
        outputs: {id: retval}
        """
        import sys
        jobno = 0
        outqueue = {}
        jobs = {}
        done = []
        while True:
            try:
                fun_args = jobqueue.get_nowait()
                function, args, depends 
                jobs[jobno] = {'func': function, 'args': args, 'depends': depends, 'done': False, 'out': None}
                jobno += 1
            except multiprocessing.Empty:
                pass
            if jobs:
                for jobno, job_info in jobs.items():
                    if job_info['depends']:
                        ready = True
                        for depend in depends:
                            if not depend in done:
                                ready = False
                        if ready:
                            job_info['out'] = job_info['func'](job_info['args'])
                            job_info['done'] = True
            output = function(args)
            outputs.put(output)
            sleep(1)
            
    def submit(self, func, args, depends):
        pass
        
    def __init__(self):
        self.queue = multiprocessing.Queue()
        self.outqueue = multiprocessing.Queue()
        self.pool = multiprocessing.Pool()
        self.jobno = 0
        self.jobs = {}
        self.runner = multiprocessing.Process(target=job_runner, args=(self.pool, self.queue, self.outqueue))
        