We will be using time and logging (logging is more convenient than print statements), so be sure to run this cell before proceeding.

In [1]:
import time, logging   
logging.basicConfig(
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    format='%(asctime)s.%(msecs)03d (%(threadName)s) %(message)s',
)

# Concurrency
## Threads

Basic setup for running a function in another thread. Notice that execution in the main program continues immediately after t.start(). Notice the difference when uncommenting t.join(): it forces the main thread to wait for thread t to finish.

In [4]:
import threading, time
def foo():
    logging.debug('In')
    time.sleep(0.1)
    print('Out')
    
t = threading.Thread(target=foo)

t.start()
logging.debug('Started')
#t.join()
logging.debug('Joined ..')

17:07:13.274 (Thread-7) In
17:07:13.277 (MainThread) Started
17:07:13.279 (MainThread) Joined ..


Out


The next cell shows that threads make the work appear to be done concurrently: the total elapsed time is just slightly more than the resting time of the longest-resting worker.

In [5]:
import time
from random import randint
import threading

def foo(n=0):
    rest = randint(30,100) * 0.01
    time.sleep(rest)
    logging.debug("n={}, rested {:3.2} seconds".format(n, rest))
    return rest
    
t = time.time()
workers = [threading.Thread(name=f"Worker-{i}", target=foo, args=(i,)) for i in range(5)]
for w in workers:
    w.start()
for w in workers:
    w.join()
logging.debug("Lapsed: {:.4f}".format( time.time() - t))

17:07:17.495 (Worker-2) n=2, rested 0.33 seconds
17:07:17.497 (Worker-1) n=1, rested 0.33 seconds
17:07:17.716 (Worker-4) n=4, rested 0.55 seconds
17:07:17.967 (Worker-3) n=3, rested 0.8 seconds
17:07:18.095 (Worker-0) n=0, rested 0.93 seconds
17:07:18.098 (MainThread) Lapsed: 0.9341


How would we collect the various resting times, e.g. to compute their sum?
For this you could use the concurrent.futures module:

In [6]:
import concurrent.futures as cf
pool = cf.ThreadPoolExecutor(5)
futures = (pool.submit(foo, n) for n in range(5))
sum(f.result() for f in cf.as_completed(futures))

17:07:25.775 (ThreadPoolExecutor-0_1) n=1, rested 0.38 seconds
17:07:25.895 (ThreadPoolExecutor-0_0) n=0, rested 0.5 seconds
17:07:25.936 (ThreadPoolExecutor-0_3) n=3, rested 0.54 seconds
17:07:26.166 (ThreadPoolExecutor-0_4) n=4, rested 0.77 seconds
17:07:26.336 (ThreadPoolExecutor-0_2) n=2, rested 0.94 seconds


3.13
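
As a variation on the cell above, the executor can also be used as a context manager together with its `map` method, which returns results in the order of the inputs. The sketch below is only illustrative: `rest_for` is a hypothetical stand-in for `foo` that sleeps for a fixed, non-random time so that the total is predictable.

```python
import concurrent.futures as cf
import time

def rest_for(n):
    """Hypothetical stand-in for foo: sleep a fixed time and return it."""
    rest = 0.01 * (n + 1)
    time.sleep(rest)
    return rest

# Leaving the with block shuts the pool down and waits for pending work
with cf.ThreadPoolExecutor(max_workers=5) as pool:
    rests = list(pool.map(rest_for, range(5)))  # results keep input order

total = sum(rests)
print(f"rested {total:.2f} seconds in total")
```

Compared with `submit` plus `as_completed`, `map` gives up handling results as soon as they are ready in exchange for shorter code and a predictable ordering.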

Using the ThreadPoolExecutor of concurrent.futures is convenient, but as always you have additional options. You could for instance add a mutable "out" argument to the function:

In [7]:
def foo2(n=0, out=None):
    rest = randint(30,100) * 0.01
    time.sleep(rest)
    logging.debug("n={}, rested {:3.2} seconds".format(n, rest))
    out[threading.current_thread()] = rest
    return rest
    
t = time.time()
result = {}
workers = [threading.Thread(name=f"Worker-{i}", target=foo2, args=(i, result)) for i in range(5)]
for w in workers:
    w.start()
for w in workers:
    w.join()
sum(result.values())

17:07:53.220 (Worker-2) n=2, rested 0.44 seconds
17:07:53.380 (Worker-4) n=4, rested 0.6 seconds
17:07:53.382 (Worker-1) n=1, rested 0.6 seconds
17:07:53.389 (Worker-0) n=0, rested 0.61 seconds
17:07:53.394 (Worker-3) n=3, rested 0.61 seconds


2.86

You could write a little decorator that generalizes this solution:

In [9]:
from functools import wraps
def save(out):
    def wrapper(fun):
        @wraps(fun)
        def wrapped(*args, **kwargs):
            out.append(fun(*args, **kwargs))
        return wrapped
    return wrapper

result = []
@save(result)
def foo(n=0):
    rest = randint(30,100) * 0.01
    time.sleep(rest)
    logging.debug("n={}, rested {:3.2} seconds".format(n, rest))
    return rest
workers = [threading.Thread(name=f"Worker-{i}", target=foo, args=(i,)) for i in range(5)]
for w in workers:
    w.start()
for w in workers:
    w.join()
sum(result)

17:08:21.384 (Worker-2) n=2, rested 0.38 seconds
17:08:21.596 (Worker-4) n=4, rested 0.59 seconds
17:08:21.705 (Worker-0) n=0, rested 0.7 seconds
17:08:21.884 (Worker-1) n=1, rested 0.88 seconds
17:08:21.995 (Worker-3) n=3, rested 0.99 seconds


3.54

This works because list.append is **threadsafe**: an append in one thread cannot be interrupted by actions on the list in other threads. This is not true for all actions on lists, or on other containers. As we will see, a better solution is to use threadsafe *containers* such as a Queue.

Python (more precisely, CPython) can guarantee list.append to be threadsafe because it uses a lock, the Global Interpreter Lock (GIL), to ensure that calls such as append cannot be interrupted by other threads. The GIL, however, works at a very fine level: it protects individual bytecode operations, not whole statements. This means that even basic statements such as augmented assignment (which actually consists of the application of some operation followed by an assignment) are NOT threadsafe:

In [10]:
import random, time
repeats = 10**6  # try different numbers
record = {'total':0}
def foo(n=0):
    #logging.debug('Entering foo' , n)
    for i in range(repeats):
        record['total'] += 1
    
t = time.time()
workers = [threading.Thread(name=f"Worker-{i}", target=foo, args=(i,)) 
                for i  in range(5,0,-1)]
for w in workers:
    #time.sleep(0.1) # try different delays
    w.start()
for w in workers:
    w.join()
logging.debug("Total: {} [should be: {}]- time: {:.4f}".format(record['total'], repeats * len(workers), time.time() - t))

17:08:25.796 (MainThread) Total: 2442714 [should be: 5000000]- time: 0.5901


As you can see, the total is way off (and the print statements may also be interrupted ...)
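
One way to see why the augmented assignment can be interrupted is to look at the bytecode it compiles to. The sketch below uses the standard `dis` module on a hypothetical helper function `increment`; the exact instruction names differ between CPython versions, so no particular listing is shown here.

```python
import dis

def increment(record):
    # Same statement as in the racing threads above
    record['total'] += 1

# Collect the names of the bytecode instructions behind that one statement
opnames = [ins.opname for ins in dis.get_instructions(increment)]
print(opnames)
```

The statement expands into separate instructions for loading the old value, adding to it and storing the result, and a thread switch may happen between them.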

#### Locks
To deal with this, we need to use our own lock to avoid interruption of the += statements:

In [11]:
#%%timeit 
import random
lock = threading.RLock()
record = {'total':0}
def foo(n=0):
    #logging.debug('Entering foo' , n)  
    with lock:      # a lock needs to be acquired before the thread can proceed; it has to be released 
                    # to give other threads a chance; the with lock ... statement handles this in its entry and exit
        for i in range(10**6):
            record['total'] += 1
t = time.time()
workers = [threading.Thread(name=f"Worker-{i}", target=foo, args=(i,)) 
                for i  in range(5,0,-1)]
for w in workers:
    w.start()
for w in workers:
    w.join()
logging.debug("Total: {} - time: {:.4f}".format(record['total'], time.time() - t))

17:08:35.372 (MainThread) Total: 5000000 - time: 0.5950


By putting the lock outside the loop, we are basically forcing the threads to run one after the other. We could also put the lock on the individual assignment. That will dramatically increase the amount of thread switching and give us an idea of the cost of that switching. You can also compare this to the single threaded solution:

In [12]:
%%timeit
record = {'total':0}
for _ in range(5): 
    for _ in range(10**6): 
        record['total'] += 1

511 ms ± 32.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
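
The variant with the lock around the individual assignment, mentioned above, could look like the following sketch. The number of repeats is deliberately reduced (an assumption made to keep the demo short), and the function name `add_many` is made up for this example.

```python
import threading
import time

lock = threading.Lock()
record = {'total': 0}
repeats = 10**4  # smaller than above so the demo finishes quickly

def add_many():
    for _ in range(repeats):
        with lock:  # acquire and release around every single increment
            record['total'] += 1

start = time.perf_counter()
workers = [threading.Thread(target=add_many) for _ in range(5)]
for w in workers:
    w.start()
for w in workers:
    w.join()
elapsed = time.perf_counter() - start
print(f"Total: {record['total']} - time: {elapsed:.4f}")
```

The total is correct again, but every increment now pays for acquiring and releasing the lock, so timing this against the two versions above gives an impression of that overhead.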


#### Conditions
Conditions are locks with some additional methods. Once it has acquired the condition, a thread can decide to wait, i.e. release the lock and block until it gets notified by some other thread (or its timeout expires):

In [14]:
from threading import Thread, Condition, current_thread
cond = Condition()
bucket = set()

def consumer(bucket):
    catches = 0
    while catches < 10:
        with cond:
            cond.wait()
            if bucket:
                catches += 1
                bucket.pop()
                time.sleep(0.1)
                logging.debug('got {}'.format(catches))
            cond.notify()

def producer(bucket):
    n = 0
    while True:
        with cond:
            time.sleep(0.03)
            n += 1
            bucket.add(object())
            cond.notify_all()
            cond.wait(1)
            if bucket:
                logging.debug(f"halted at {n}")
                break

c1 = Thread(name='c1', target=consumer, args=(bucket,))
c2 = Thread(name='c2', target=consumer, args=(bucket,))
p = Thread(name='p', target=producer, args=(bucket,))
c2.start()
c1.start()
p.start()
p.join()

17:09:34.276 (c2) got 1
17:09:34.410 (c2) got 2
17:09:34.543 (c2) got 3
17:09:34.676 (c2) got 4
17:09:34.809 (c2) got 5
17:09:34.941 (c2) got 6
17:09:35.073 (c2) got 7
17:09:35.206 (c2) got 8
17:09:35.339 (c2) got 9
17:09:35.472 (c2) got 10
17:09:35.605 (c1) got 1
17:09:35.740 (c1) got 2
17:09:35.872 (c1) got 3
17:09:36.005 (c1) got 4
17:09:36.139 (c1) got 5
17:09:36.271 (c1) got 6
17:09:36.404 (c1) got 7
17:09:36.537 (c1) got 8
17:09:36.670 (c1) got 9
17:09:36.803 (c1) got 10
17:09:37.836 (p) halted at 21
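
A common refinement of this pattern is `Condition.wait_for(predicate)`, which re-checks the predicate each time the thread wakes up and returns immediately if it already holds. The following is a minimal sketch with one producer and one consumer; the names `items` and `received` are made up for this example.

```python
import threading
from collections import deque

cond = threading.Condition()
items = deque()
received = []

def consumer(n_items):
    for _ in range(n_items):
        with cond:
            # Releases the lock while waiting; returns once items is non-empty
            cond.wait_for(lambda: len(items) > 0)
            received.append(items.popleft())

def producer(n_items):
    for i in range(n_items):
        with cond:
            items.append(i)
            cond.notify()  # wake up one waiting thread

c = threading.Thread(target=consumer, args=(5,))
p = threading.Thread(target=producer, args=(5,))
c.start()
p.start()
p.join()
c.join()
print(received)  # [0, 1, 2, 3, 4]
```

Because the predicate is checked before blocking, it does not matter whether the producer or the consumer gets the lock first.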


There are a number of ways to improve the above code, e.g. by passing the condition as an argument to the various threads, instead of using a global one. In general the threadsafe Queue offered in the queue module is a preferred way for dealing with coordination and data sharing: 

In [15]:
from threading import Thread
from queue import Queue, Full

queue = Queue()
def consumer(queue, t):
    catches = 0
    while True:
        queue.get()
        catches += 1
        time.sleep(t)
        queue.task_done()
        logging.debug('caught: {}'.format(catches))

def producer(queue, n):
    while n > 0:
        time.sleep(0.1)
        n -= 1
        try:
            queue.put(object(), timeout=1)
        except Full: 
            logging.debug(f"halted at {n}")
            break
    else:
        logging.debug(f"Finished production ..")
        queue.join()

workers = [Thread(name='c'+str(i), target=consumer, args=(queue, i* 0.6)) for i in range(1,3)]
p = Thread(name='p', target=producer, args=(queue, 20))
p.start()
for w in workers:
    #w.daemon = True
    w.start()
p.join()
logging.debug('Ready')

17:09:57.862 (c1) caught: 1
17:09:58.465 (c1) caught: 2
17:09:58.563 (c2) caught: 1
17:09:59.067 (c1) caught: 3
17:09:59.180 (p) Finished production ..
17:09:59.668 (c1) caught: 4
17:09:59.767 (c2) caught: 2
17:10:00.269 (c1) caught: 5
17:10:00.870 (c1) caught: 6
17:10:00.969 (c2) caught: 3
17:10:01.472 (c1) caught: 7
17:10:02.074 (c1) caught: 8
17:10:02.170 (c2) caught: 4
17:10:02.675 (c1) caught: 9
17:10:03.277 (c1) caught: 10
17:10:03.371 (c2) caught: 5
17:10:03.879 (c1) caught: 11
17:10:04.481 (c1) caught: 12
17:10:04.573 (c2) caught: 6
17:10:05.082 (c1) caught: 13
17:10:05.774 (c2) caught: 7
17:10:05.774 (MainThread) Ready
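
Note that the consumers above loop forever, so their threads never finish. One common way to stop them, sketched below under the assumption that a special sentinel object can be reserved for this purpose (here called `STOP`), is to put one sentinel on the queue per consumer.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel telling a consumer to quit

def consumer(q, out):
    while True:
        item = q.get()
        try:
            if item is STOP:
                break
            out.append(item * 2)  # list.append is threadsafe
        finally:
            q.task_done()  # also runs for the sentinel

q = queue.Queue()
results = []
workers = [threading.Thread(target=consumer, args=(q, results)) for _ in range(2)]
for w in workers:
    w.start()
for i in range(10):
    q.put(i)
for _ in workers:
    q.put(STOP)  # one sentinel per consumer
q.join()         # blocks until every item has been marked done
for w in workers:
    w.join()
print(sorted(results))
```

The multiprocessing example later in this notebook uses the same idea, with `None` as the "poison pill".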


### Other synchronizers
#### Event
Conditions allow a thread to wait for some condition and then acquire exclusive access. Events are used just to wait for some event, when there is no need for exclusive use of resources:

In [16]:
import threading

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('processing event')

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')

e = threading.Event()
threading.Thread(name='block', target=wait_for_event, args=(e,)).start()
threading.Thread(name='nonblock', target=wait_for_event_timeout, args=(e, 0.3)).start()

logging.debug('Pausing before calling Event.set()')
time.sleep(1.0)
e.set()
logging.debug('Event is set')

17:10:17.037 (block) wait_for_event starting
17:10:17.047 (nonblock) wait_for_event_timeout starting
17:10:17.048 (MainThread) Pausing before calling Event.set()
17:10:17.360 (nonblock) event set: False
17:10:17.361 (nonblock) doing other work
17:10:17.361 (nonblock) wait_for_event_timeout starting
17:10:17.672 (nonblock) event set: False
17:10:17.672 (nonblock) doing other work
17:10:17.673 (nonblock) wait_for_event_timeout starting
17:10:17.987 (nonblock) event set: False
17:10:17.988 (nonblock) doing other work
17:10:17.988 (nonblock) wait_for_event_timeout starting
17:10:18.053 (MainThread) Event is set
17:10:18.053 (block) processing event
17:10:18.053 (nonblock) event set: True
17:10:18.056 (nonblock) processing event


#### Barrier
A barrier allows threads to wait until enough others have reported themselves to be ready (by also waiting on that same barrier).

In [17]:
import threading

def worker(barrier):
    logging.debug(f'waiting for barrier with {barrier.n_waiting} others')
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        logging.debug('aborting')
    else:
        logging.debug(f'after barrier {worker_id}')

barrier = threading.Barrier(3)

threads = [threading.Thread(name='worker-%s' % i, target=worker, args=(barrier,))
    for i in range(3)]

for t in threads:
    logging.debug(f'{t.name} starting')
    t.start()
    time.sleep(0.1)

n = len([t for t in threading.enumerate() if t is not threading.main_thread()])

if n < barrier.parties:
    barrier.abort()

for t in threads:
    t.join()

17:10:22.034 (MainThread) worker-0 starting
17:10:22.036 (worker-0) waiting for barrier with 0 others
17:10:22.138 (MainThread) worker-1 starting
17:10:22.139 (worker-1) waiting for barrier with 1 others
17:10:22.242 (MainThread) worker-2 starting
17:10:22.243 (worker-2) waiting for barrier with 2 others
17:10:22.246 (worker-2) after barrier 2
17:10:22.246 (worker-0) after barrier 0
17:10:22.246 (worker-1) after barrier 1


#### Semaphore
A semaphore manages a limited set of locks: a waiting thread is unblocked whenever a lock becomes available:

In [18]:
import random
from threading import Thread

def worker(s):
    logging.debug('Waiting to join the pool')
    with s:
        name = threading.current_thread().name
        logging.debug(f'Starting: {name}')
        time.sleep(0.1)
        logging.debug(f'Stopping: {name}')

s = threading.Semaphore(2)
workers = [Thread(name=f'Thread-{i}', target=worker, args=(s,)) for i in range(1,5)]
for w in workers: w.start()
for w in workers: w.join()

17:10:27.035 (Thread-1) Waiting to join the pool
17:10:27.038 (Thread-2) Waiting to join the pool
17:10:27.039 (Thread-3) Waiting to join the pool
17:10:27.039 (Thread-4) Waiting to join the pool
17:10:27.039 (Thread-1) Starting: Thread-1
17:10:27.043 (Thread-2) Starting: Thread-2
17:10:27.152 (Thread-1) Stopping: Thread-1
17:10:27.153 (Thread-2) Stopping: Thread-2
17:10:27.153 (Thread-3) Starting: Thread-3
17:10:27.154 (Thread-4) Starting: Thread-4
17:10:27.256 (Thread-4) Stopping: Thread-4
17:10:27.256 (Thread-3) Stopping: Thread-3
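
To check that the semaphore really caps the number of threads inside the guarded block, one could count them. This is only a sketch: the counters `active` and `peak` and the extra `count_lock` are assumptions added for the measurement, not part of the semaphore API.

```python
import threading
import time

sem = threading.Semaphore(2)
count_lock = threading.Lock()  # protects the two counters below
active = 0
peak = 0

def worker():
    global active, peak
    with sem:  # at most two threads get past this point at once
        with count_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)
        with count_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"peak number of concurrent workers: {peak}")
```

The peak never exceeds the value the semaphore was created with.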


### Threadsafe storage
The threading.local class provides one way to reuse global names in a thread without affecting other threads. This is mainly useful if you do not want to adapt single-threaded code. 

In [20]:
import threading
loc = threading.local()
loc.foo = 42
def targ():
    loc.foo = 88
    logging.debug(loc.foo)
t = threading.Thread(target=targ)
t.start()
t.join()
logging.debug(loc.foo)
targ()
logging.debug(loc.foo)


17:11:38.463 (Thread-9) 88
17:11:38.467 (MainThread) 42
17:11:38.467 (MainThread) 88
17:11:38.468 (MainThread) 88


### contextvars
A more general solution for asynchronous processes, including coroutines running in a single thread, is the use of context variables: code run in a copied context (see copy_context in the cell below) can rebind variables without affecting the original context. However, note that the copy is shallow, so changes to mutable objects are not restricted to the copied version.

In [22]:
from contextvars import ContextVar, copy_context
x = ContextVar('x')
y = ContextVar('y')
x.set({'x':'spam'})
y.set(4)
def main(): 
    logging.debug(f"{x.get()}, {y.get()}")
    x.get()['x'] = 'ham' 
    y.set(9)
ctx = copy_context() 
ctx.run(main)
x.get(), y.get()

17:12:48.642 (MainThread) {'x': 'spam'}, 4


({'x': 'ham'}, 4)
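
The same isolation applies to asyncio tasks, since each task runs in its own copy of the current context. The sketch below assumes it is run as a script (inside a notebook with a running loop one would `await main()` instead of calling `asyncio.run`); the variable `request_id` and the coroutine `handle` are made up for this example.

```python
import asyncio
from contextvars import ContextVar

request_id = ContextVar('request_id', default=None)

async def handle(n):
    request_id.set(n)          # only visible within this task's context
    await asyncio.sleep(0.01)  # give the other tasks a chance to run
    return request_id.get()

async def main():
    # gather wraps each coroutine in a task with its own context copy
    return await asyncio.gather(*(handle(n) for n in range(3)))

results = asyncio.run(main())
print(results)           # [0, 1, 2]
print(request_id.get())  # None: the outer context was never changed
```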

## Multiprocessing
The following is an example of a simple producer-consumer pattern using multiprocessing. You cannot run this directly from the cell: save it as a Python module, say `multiprog.py`, and run it with `python -m multiprog`.

In [None]:
import multiprocessing
import time, logging   
logging.basicConfig(
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    format='%(asctime)s.%(msecs)03d %(message)s')

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            logging.debug(f'{proc_name} awaiting next task ...')
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                logging.debug('Exiting {}'.format(proc_name))
                self.task_queue.task_done()
                break
            logging.debug('New task for {}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)

class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    logging.debug('Creating {} consumers'.format(num_consumers))
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    
    for w in consumers:
        w.start()
    
    # Enqueue jobs
    num_jobs = 20
    for i in range(num_jobs):
        tasks.put(Task(i, i))
    
    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)
    
    # Wait for all of the tasks to finish
    tasks.join()
    
    # Start printing results
    for _ in range(num_jobs):
        result = results.get()
        logging.debug(f'Result: {result}')

17:24:19.619 (MainThread) Creating 16 consumers
17:24:19.625 (MainThread) <Consumer(Consumer-1, initial)>
17:24:19.632 (MainThread) <Consumer(Consumer-2, initial)>
17:24:19.636 (MainThread) <Consumer(Consumer-3, initial)>
17:24:19.641 (MainThread) <Consumer(Consumer-4, initial)>
17:24:19.645 (MainThread) <Consumer(Consumer-5, initial)>
17:24:19.650 (MainThread) <Consumer(Consumer-6, initial)>
17:24:19.656 (MainThread) <Consumer(Consumer-7, initial)>
17:24:19.662 (MainThread) <Consumer(Consumer-8, initial)>
17:24:19.666 (MainThread) <Consumer(Consumer-9, initial)>
17:24:19.670 (MainThread) <Consumer(Consumer-10, initial)>
17:24:19.674 (MainThread) <Consumer(Consumer-11, initial)>
17:24:19.679 (MainThread) <Consumer(Consumer-12, initial)>
17:24:19.684 (MainThread) <Consumer(Consumer-13, initial)>
17:24:19.689 (MainThread) <Consumer(Consumer-14, initial)>
17:24:19.694 (MainThread) <Consumer(Consumer-15, initial)>
17:24:19.699 (MainThread) <Consumer(Consumer-16, initial)>


Using `concurrent.futures.ProcessPoolExecutor` is easier:

In [None]:
import concurrent.futures as cf
import multiprocessing as mp
import time, logging   
logging.basicConfig(
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    format='%(asctime)s.%(msecs)03d %(message)s',
)

def consumer(task):
    name = mp.current_process().name
    logging.debug('New task for {}: {}'.format(name, task))
    return task()

class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)

if __name__ == '__main__':
    # The executor manages the worker processes and the queues between them
    with cf.ProcessPoolExecutor() as executor:
        # _max_workers is a private attribute, read here only for logging
        logging.debug('Creating {} consumers'.format(executor._max_workers))
        num_jobs = 20
        futures = [executor.submit(consumer, Task(i, i)) for i in range(num_jobs)]

        # Handle each result as soon as its task finishes
        for f in cf.as_completed(futures):
            logging.debug(f'Result: {f.result()}')

        # or wait for all futures to be ready; wait() returns (done, not_done)
        done, not_done = cf.wait(futures)
        print(*sorted(f.result() for f in done), sep='\n')

## Asynchronous processing using coroutines

Input can be 'injected' into generators: a `yield` is actually an expression that evaluates to whatever value `val` was passed to the generator's `send(val)` method (which also advances the generator to the next `yield`):

In [8]:
def gen():
    yield 1
    val = yield 2
    print(val)
    yield val
g = gen()
print(
    g.send(None), # returns 1, blocked at the 'yield 1' expression 
    g.send(None), # sets the 'yield 1' expression to None, moves to next yield and returns 2
    g.send(9)    # sets the 'yield 2' expression to 9, binds val to its value, moves to next yield and returns val
)

9
1 2 9


In [1]:
def countdown(n):
    while n > 0:
        val = yield n
        if val: n = val
        n -= 1
c = countdown(5)
next(c), c.send(3)

(5, 2)

Two other generator methods complete the groundwork for a coroutine: the `close` method and the `throw` method:

In [15]:
def gen():
    yield 1
    val = yield 2
    print(val)
    yield val
g = gen()
print(next(g))
g.throw(StopIteration)

1


RuntimeError: generator raised StopIteration

In [17]:
def gen():
    yield 1
    val = yield 2
    print(val)
    yield val
g = gen()
print(next(g))
g.close()
next(g)

1


StopIteration: 

In [19]:
def gen():
    try:
        yield 1
        val = yield 2
        print(val)
        yield val
    except GeneratorExit:
        print("Generator exit")
g = gen()
print(next(g))
g.close()

1
Generator exit


In [21]:
def gen():
    try:
        yield 1
        val = yield 2
        print(val)
        yield val
    except AttributeError:
        print("Attribute error")
g = gen()
print(next(g))
g.throw(AttributeError())

1
Attribute error


StopIteration: 

This small taxi simulator shows the use of generators:

In [33]:
import random
import collections
import queue

DEFAULT_NUMBER_OF_TAXIS = 3
DEFAULT_END_TIME = 180
SEARCH_DURATION = 5
TRIP_DURATION = 20
DEPARTURE_INTERVAL = 5

Event = collections.namedtuple('Event', 'time proc action')

def taxi_process(ident, trips, start_time=0):  
    """Yield to simulator issuing event at each state change"""
    time = yield Event(start_time, ident, 'leave garage')  
    for i in range(trips):  
        time = yield Event(time, ident, 'pick up passenger')  
        time = yield Event(time, ident, 'drop off passenger')  
    yield Event(time, ident, 'going home')  

class Simulator:
    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()
        self.procs = dict(procs_map)

    def run(self, end_time):
        """Schedule and display events until time is up"""
        # schedule the first event for each cab
        for _, proc in sorted(self.procs.items()):  
            first_event = next(proc) 
            self.events.put(first_event)  

        # main loop of the simulation
        sim_time = 0
        while sim_time < end_time:
            if self.events.empty():
                print('*** end of events ***')
                break

            current_event = self.events.get()  
            sim_time, proc_id, previous_action = current_event  
            print('taxi:', proc_id, proc_id * '   ', current_event)  
            active_proc = self.procs[proc_id]  
            next_time = sim_time + compute_duration(previous_action) 
            try:
                next_event = active_proc.send(next_time)  
            except StopIteration:
                del self.procs[proc_id]  
            else:
                self.events.put(next_event)
        else:
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))

def compute_duration(previous_action):
    """Compute action duration using exponential distribution"""
    if previous_action in ['leave garage', 'drop off passenger']:
        # new state is prowling
        interval = SEARCH_DURATION
    elif previous_action == 'pick up passenger':
        # new state is trip
        interval = TRIP_DURATION
    elif previous_action == 'going home':
        interval = 1
    else:
        raise ValueError('Unknown previous_action: %s' % previous_action)
    return int(random.expovariate(1/interval)) + 1

def main(end_time=DEFAULT_END_TIME, num_taxis=DEFAULT_NUMBER_OF_TAXIS,
         seed=None):
    """Initialize random generator, build procs and run simulation"""
    if seed is not None:
        random.seed(seed)  # get reproducible results

    taxis = {i: taxi_process(i, (i+1)*2, i*DEPARTURE_INTERVAL)
             for i in range(num_taxis)}
    sim = Simulator(taxis)
    sim.run(end_time)

main()

taxi: 0  Event(time=0, proc=0, action='leave garage')
taxi: 0  Event(time=2, proc=0, action='pick up passenger')
taxi: 1     Event(time=5, proc=1, action='leave garage')
taxi: 1     Event(time=10, proc=1, action='pick up passenger')
taxi: 2        Event(time=10, proc=2, action='leave garage')
taxi: 2        Event(time=12, proc=2, action='pick up passenger')
taxi: 1     Event(time=15, proc=1, action='drop off passenger')
taxi: 2        Event(time=18, proc=2, action='drop off passenger')
taxi: 2        Event(time=21, proc=2, action='pick up passenger')
taxi: 1     Event(time=22, proc=1, action='pick up passenger')
taxi: 2        Event(time=35, proc=2, action='drop off passenger')
taxi: 2        Event(time=39, proc=2, action='pick up passenger')
taxi: 1     Event(time=46, proc=1, action='drop off passenger')
taxi: 1     Event(time=55, proc=1, action='pick up passenger')
taxi: 0  Event(time=63, proc=0, action='drop off passenger')
taxi: 1     Event(time=75, proc=1, action='drop off passeng

One other new piece of syntax: `yield from`

In [59]:
def chain(*iterables):
    for it in iterables:
        for i in it:
            yield i
list(chain(range(5), range(9,13)))

[0, 1, 2, 3, 4, 9, 10, 11, 12]

More convenient:

In [37]:
def chain(*iterables):
    for it in iterables:
        yield from it
list(chain(range(5), range(9,13)))

[0, 1, 2, 3, 4, 9, 10, 11, 12]
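
Besides shortening loops, `yield from` also forwards values passed to `send` into the subgenerator and evaluates to the subgenerator's return value. The sketch below illustrates this with two made-up generators, `averager` and `delegator`.

```python
def averager():
    total = 0.0
    count = 0
    while True:
        value = yield      # receives whatever is sent to the delegator
        if value is None:  # None signals the end of the input
            break
        total += value
        count += 1
    return total / count if count else None

def delegator(results):
    # The value of the yield from expression is averager's return value
    avg = yield from averager()
    results.append(avg)

results = []
d = delegator(results)
next(d)                # advance to the first yield inside averager
for v in (10, 20, 30):
    d.send(v)
try:
    d.send(None)       # ends averager, after which delegator ends too
except StopIteration:
    pass
print(results)  # [20.0]
```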

In [None]:
def treegen(n):
    while n > 0:
        

Two new keywords to specify coroutine functions: `async` and `await`:

In [40]:
import asyncio 
async def coro(name='world'): 
    print('Hello ...') 
    await asyncio.sleep(0.5) 
    print(f'... {name}!')  
# asyncio.run(coro()) if no loop is running yet; in notebook there is one running already, so use create_task
futures = [asyncio.create_task(coro(name)) for name in "Jan Piet Klaas Mien Klarien".split()]

Hello ...
Hello ...
Hello ...
Hello ...
Hello ...
... Jan!
... Klaas!
... Klarien!
... Piet!
... Mien!


Note: 1) total time remains almost constant when adding names, 2) the order in which tasks finish is undetermined.
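
The first point can be checked by timing the tasks. The sketch below assumes it is run as a script (in a notebook, `await main(names)` would take the place of `asyncio.run`) and uses `asyncio.gather`, which, unlike the completion order, returns the results in the order of its arguments. The coroutine `coro_timed` is a made-up variant of `coro` that returns the name instead of printing it.

```python
import asyncio
import time

async def coro_timed(name, delay=0.1):
    await asyncio.sleep(delay)
    return name

async def main(names):
    start = time.perf_counter()
    results = await asyncio.gather(*(coro_timed(n) for n in names))
    return results, time.perf_counter() - start

names = "Jan Piet Klaas Mien Klarien".split()
results, elapsed = asyncio.run(main(names))
print(results, f"{elapsed:.2f}s")
```

With five names the elapsed time stays close to a single delay rather than five times that.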

Asynchronous processing is particularly useful in dealing with IO. To benefit from coroutines it is important to use libraries that have themselves been set up as coroutines, otherwise you will still run into blocking IO calls: any such routine has to yield when it runs into a potentially blocking call. The `aiohttp` module is such a "coroutine-ready" module:

In [57]:
import aiohttp

async def print_preview(url):
    # connect to the server
    async with aiohttp.ClientSession() as session:
        # create get request
        async with session.get(url) as response:
            # wait for response
            response = await response.text()
            # print first 3 not empty lines
            lines = list(filter(lambda x: len(x) > 0, response.split('\n')))
            print('-'*80)
            for line in lines[:3]:
                print(line)
            print()

def print_pages():
    pages = [
        'http://textfiles.com/adventure/amforever.txt',
        'http://textfiles.com/adventure/ballyhoo.txt',
        'http://textfiles.com/adventure/bardstale.txt',
    ]
    # asyncio.create_task needs a running event loop; the notebook already provides one
    for page in pages:
        asyncio.create_task(print_preview(page))

print_pages()
print("Doing other stuff while waiting for pages to come in ...")

Doing other stuff while waiting for pages to come in ...
--------------------------------------------------------------------------------
              A   M I N D   F O R E V E R   V O Y A G I N G 
                                 (Infocom)


--------------------------------------------------------------------------------
from: the Asimov collection
                                 BALLYHOO
                                 (Infocom)

--------------------------------------------------------------------------------
from: the Asimov collection
                    T H E   B A R D ' S   T A L E   I 
                          (Tales Of The Unknown)
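
When no coroutine-ready library is available, a blocking call can still be kept from stalling the event loop by handing it to a worker thread. The sketch below uses `asyncio.to_thread` (available from Python 3.9) and assumes it is run as a script; `blocking_io` is a hypothetical stand-in for a blocking library call.

```python
import asyncio
import time

def blocking_io(n):
    time.sleep(0.1)  # stands in for a blocking IO call
    return n * n

async def main():
    start = time.perf_counter()
    # Each call runs in a thread of the default executor
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_io, n) for n in range(3))
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

The three calls overlap, so the elapsed time stays near one sleep instead of three. This combines coroutines with the thread pool machinery from the first part of this notebook.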



Below is another example, running two tasks and cancelling one when the other is done. Note the `async for` loop in `print_prime`: you need an async version of the for loop when the generator is not a regular generator but an asynchronous one like `prime_generator`.

In [58]:
import random, asyncio

async def is_prime(n):
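    if n < 2:
        # Note: this treats 0 and 1 as prime (1 shows up in the output below);
        # return False here for a mathematically correct test
        return True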
    for i in range(2, n):
        # allow event_loop to run other coroutine
        await asyncio.sleep(0.01)
        if n % i == 0:
            return False
    return True

async def prime_generator(n_prime):
    counter = 0
    n = 0
    while counter < n_prime:
        n += 1
        # wait for is_prime to finish
        prime = await is_prime(n)
        if prime:
            yield n
            counter += 1

async def check_email():
    while True:
        try:
            n = random.randrange(1,18)
            print(f'{n} new email')
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            print("Cancelling email checks ..")
            break

async def print_prime(n):
    async for prime in prime_generator(n):
        print('new prime number found:', prime)

async def main():
    email = asyncio.create_task(check_email())
    prime = asyncio.create_task(print_prime(15))
    await asyncio.wait({prime})
    print ('Prime generator finished ...')
    email.cancel()

asyncio.create_task(main())

<Task pending coro=<main() running at <ipython-input-58-010b8608ef9f>:38>>

13 new email
new prime number found: 1
new prime number found: 2
new prime number found: 3
new prime number found: 5
new prime number found: 7
new prime number found: 11
new prime number found: 13
7 new email
new prime number found: 17
new prime number found: 19
8 new email
new prime number found: 23
new prime number found: 29
17 new email
new prime number found: 31
17 new email
new prime number found: 37
17 new email
new prime number found: 41
14 new email
new prime number found: 43
Prime generator finished ...
Cancelling email checks ..
