# An introduction to CSP-style concurrency with _aiochan_

## Setting the stage

### What is concurrency

*Aiochan* is a library for CSP-style concurrency in python. So the first questions to ask are: what is concurrency? Why do we need concurrency? Aren't we doing just fine writing non-concurrent python?

By far the most important reason for wanting concurrency is that *concurrency enables your program to deal with multiple (potentially different) things all at once*. For example, suppose you are writing a webserver. Let's say that your code goes like this:
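A minimal sketch of such a loop is given here. The names `wait_for_request`, `handle` and `send_response` are hypothetical stand-ins for real network code, and the clients are simulated with a plain list so that the example runs on its own.

```python
# Hypothetical stand-ins for real network code: pending requests are
# simulated with a list so that the sketch runs on its own.
pending = ['request from alice', 'request from bob', 'request from carol']
responses = []

def wait_for_request():
    # Would block until a client connects; here we just pop from the list.
    return pending.pop(0)

def handle(request):
    # Does whatever work the request asks for; this may take a long time.
    return 'response to ' + request

def send_response(response):
    responses.append(response)

# The outermost loop: one request is served from start to finish
# before the next one is even looked at.
while pending:
    request = wait_for_request()
    response = handle(request)
    send_response(response)

print(responses)
```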

This is all very good, but if this is the *outermost loop* of your server, then it is easy to see that at most one client can be served at any one instant. If a user is doing an operation that takes, say, ten minutes to complete, then it is not too far-fetched to assume that other users will not be too happy.


Yes, I hear you say, "I have been writing python webservers using non-concurrent code not too different from the above for a long time, and it is definitely *not* true that only one client is served at any one instant". Well, most likely you are using a web framework, and it is the framework that controls the outermost loop. In other words, your framework is managing all your concurrency whilst presenting a non-concurrent façade to you.

There is one big caveat that we need to clarify. Above we have defined concurrency as the ability to "deal with multiple things at once". Let's say that you have all the things *you* need to do written on a to-do list. Concurrency only means that you do not need to take the first item off the list, do it *and wait for the result*, and only then start on the second item. Your first item might well be "watch the news at 9am, then watch the news at 9pm, and find out what new things have happened since 9am". In this case non-concurrent behaviour means sitting there doing nothing after watching the 9am news until 9pm. As long as you *switch context* and start doing something else after the morning news, you are in the realm of concurrent behaviour. Note that *you need neither a group of cronies to whom you can delegate your news watching, nor the rather unusual ability to watch two programs at once and understand both perfectly, to enable concurrent behaviour*. Having a group of cronies doing stuff for you is a form of *parallelism*. Concurrency *enables* parallelism (it is useless to have many cronies if you need to wait for any one of them to complete their work before assigning work to the next one), but parallelism is not *necessary* for concurrency. Parallelism usually (but not always) makes things go faster. Concurrency can also make things go faster *even without parallelism*. In the case of computers, you *do not* need to have multiple processors to benefit from concurrency (in the case of python, this point is actually quite acute -- see later when we discuss the GIL).

Now back to our webserver program. How do you write a concurrent outermost loop then? By analogy with the to-do list example, you are probably thinking of something along these lines:
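A rough sketch of that line of thought, with hypothetical names throughout: each simulated request is a dict recording how many steps of work it still needs, and the loop does one step for a request, then switches to the next one instead of finishing the first.

```python
# Each simulated request needs a different number of work steps.
in_progress = [
    {'client': 'alice', 'steps_left': 3},
    {'client': 'bob', 'steps_left': 1},
    {'client': 'carol', 'steps_left': 2},
]
finished = []

while in_progress:
    # Take the next request off the front of the to-do list ...
    job = in_progress.pop(0)
    # ... do one small step of work on it ...
    job['steps_left'] -= 1
    if job['steps_left'] == 0:
        # ... and either respond, if it is done,
        finished.append(job['client'])
    else:
        # or switch context: put it back and move on to the next one.
        in_progress.append(job)

print(finished)  # ['bob', 'carol', 'alice']
```

The short request from bob no longer waits behind the long one from alice, but all the bookkeeping (what counts as a "step", where each request left off) has become our own problem.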

It is not impossible to complete a program like this, but you are right to feel as if you are juggling ten balls at once -- it is difficult, the solution is brittle, and you probably don't want to do this every day (unless you are a professional juggler).

This is where concurrency libraries, frameworks, or language constructs come in: as long as you follow certain rules, they enable you to have the benefit of concurrency without the need for professional juggling training. *Aiochan* is such a library.

To recap:

* Concurrency enables you to deal with multiple things at once.
* Concurrency has the potential to decrease latency and increase throughput.
* Concurrency is not parallelism but enables it.
* Concurrency frameworks, libraries and language constructs enable *you* to take advantage of concurrency without writing complicated and brittle code.

### What is CSP (or Communicating Sequential Processes)?

We have said that *aiochan* is a CSP-style concurrency library. It turns out we can go a long way towards understanding how to use *aiochan* by just understanding what *Communicating Sequential Processes* means. (The name CSP first appeared in a paper by C.A.R. Hoare in 1978, and some ideas of CSP existed even before that. So CSP is actually quite an old idea.)

Let's begin with the last word: *processes*. It seems we have immediately hit upon opportunities for great confusion: python has many packages that have "process" in their names, your operating system has something called "processes" (or maybe not, depending on what OS you are using), etc. These are not what we want to talk about now. *Process* is, unfortunately, one of those words in computing that is over-used in many subtly different ways. Here we shall be intentionally vague: for us, a *process* is just a piece of code that executes fairly independently from the outside world, a piece of code that you can mentally think of as a whole entity. The quintessential example is that of a function: a function as a process starts when it takes in arguments from the caller, and ends when it returns to the caller.

What is a *sequential* process, then? If you read the word literally, it means that statements or expressions in the process are executed or evaluated in strict order, from top to bottom. In python this is fairly accurate, if special provision is made for control statements such as `while`, `for` and the like. A better word might be *deterministic*: even in the presence of control statements, if you know all your variables, it is possible to predict precisely what the next statement or expression to be executed or evaluated is. An example is

In [1]:
x = 10
x += 10
x *= 2
x -= 7
print(x)

33


The above calculates `((10 + 10) * 2) - 7 = 33` and is sequential. If your programming language instead calculates `((10 * 2) + 10) - 7 = 23` then your programming environment has some serious issues. So sequential programs are good: they are what we as humans expect.

However, it is actually very easy to have non-sequential programs. Let's first refactor the above program:

In [2]:
x = 10

def f():
    global x
    print('we start with the value', x)
    x += 10
    x *= 2
    x -= 7
    return x

print(f())

we start with the value 10
33


So far so good. But suppose you have two instances of the `f` process executing concurrently. Then you will have trouble. It is actually possible to illustrate this with python's `threading` module, but for simplicity we will just give an illustration in pseudo code:
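The interleaving can also be simulated in runnable form. In this sketch (plain python, nothing from `aiochan`), each copy of `f` is written as a generator that pauses after every statement, and a hypothetical `schedule` list decides which copy gets to execute its next statement.

```python
x = 10
results = {}

def f(name):
    # Same statements as before, but pausing (yield) after each one so
    # that the other copy of f can run in between.
    global x
    print(name, 'starts with the value', x)
    yield
    x += 10
    yield
    x *= 2
    yield
    x -= 7
    results[name] = x

# Hypothetical schedule: which copy executes its next statement.
schedule = ['f1', 'f1', 'f2', 'f1', 'f2', 'f1', 'f2', 'f2']
running = {'f1': f('f1'), 'f2': f('f2')}

for name in schedule:
    # next() runs the chosen copy up to its next pause; the default
    # argument avoids a StopIteration when the copy finishes.
    next(running[name], None)

print(results)  # {'f1': 43, 'f2': 79}: neither copy gets 33
```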

In the above snippet, the order of execution of the statements within each of the two functions is as listed: from top to bottom. Note that though within each function itself the sequence is the same as before, when the two functions are taken together the statements are interleaved.

We will get the results:

Now if you are only in control of `f1` you will be very much baffled. Worse, as you can try for yourself, by tweaking the order further you can get other results. The interleaving makes the execution, and hence the result, non-deterministic. Such processes are *not* considered sequential and are not allowed (or at least not encouraged) in CSP-style programming.

As you probably can see, the fix is actually not that hard: *don't modify global variables*. Any modifications you do must be local to your process (or in our case, function). You should try and see for yourself that if you follow this advice, then whatever the order of interleaving of the two functions, the results are not affected: they are deterministic and are thus considered *sequential*.
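One way to check this claim is to try every possible interleaving. The sketch below is again a simulation with generators (the helper `run` and the way schedules are enumerated are choices made for this illustration): `f` now only touches a local `x`, and all 70 ways of interleaving the two copies are executed.

```python
from itertools import combinations

def f():
    # All modifications are local to this process.
    x = 10
    yield
    x += 10
    yield
    x *= 2
    yield
    x -= 7
    return x

def run(schedule):
    running = {'f1': f(), 'f2': f()}
    results = {}
    for name in schedule:
        try:
            next(running[name])
        except StopIteration as finished:
            # A generator's return value travels on StopIteration.
            results[name] = finished.value
    return results

# Each copy needs 4 turns to finish; choose which 4 of the 8 turns go to f1.
outcomes = []
for f1_turns in combinations(range(8), 4):
    schedule = ['f1' if turn in f1_turns else 'f2' for turn in range(8)]
    outcomes.append(run(schedule))

print(len(outcomes))  # 70
print(all(outcome == {'f1': 33, 'f2': 33} for outcome in outcomes))  # True
```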

In functional languages, it is sometimes enforced that you cannot make any modifications at all --- any computation you do just returns new values without stepping on the old values. However, in python, this is both unnecessary and unnatural. We only need to disallow operations that can interfere with other processes.

Now, you ask, what disturbed minds would write something like our `f`? Well, be assured that the people who wrote `f` harbour no ill intentions. The reason that they reach for global variables is most often that they want to do some form of input/output (i.e., IO; note that the concept of IO is much broader than file or network access): we need to get stuff into our function to compute, and we need to notify others who are also computing what the results of our computations are.

If you think about it, IO is the whole point of computation: *we*, at our keyboards (or touch screens, or whatever your newest VR/AR interaction devices are), input something for the computer to compute, then the computer returns the results to *us*. So programs without IO are pretty useless. And using global variables in this case is rather convenient: we just take something (input) that is put inside predetermined boxes (global variables), and when we are done, we just put the results back. Others, by inspecting the boxes, will know what we have done. By the way, at the lowest level, this is what our current computer architecture dictates. A "pure" function that "returns" something without reference to a box location *is* an illusion. But unfortunately, as we have seen, this crude arrangement results in people stepping on each other, and in chaos if there are no rules.

In the older mainstream languages, the solution is that we put stickers on the boxes when we want to operate on them: "in use --- don't touch until I'm done!" --- locks, semaphores, etc. This solves the problem, but using locks and similar *concurrency primitives* turns out to be rather delicate and error-prone. A classical example is that you and your friend both want to operate on two boxes A and B. You go forward and put your sticker on A, meanwhile your friend has already put his sticker on B. Now both of you are stuck: unless one of you backs off, no one can go forward. Preventing such *deadlocks* means having a whole lot of discipline and guidelines to follow --- more training to become professional jugglers!
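The two-box situation can be reproduced with `threading.Lock`. This is only a sketch: the two barriers are an artificial device to force the unlucky timing every time, and the `timeout` makes each side give up after a moment, where a plain `acquire()` would wait forever.

```python
import threading

box_a = threading.Lock()
box_b = threading.Lock()
both_hold_first = threading.Barrier(2)
both_have_tried = threading.Barrier(2)
got_second_box = {}

def worker(name, first, second):
    with first:                     # put a sticker on the first box
        both_hold_first.wait()      # wait until the other side has done so too
        # Try the second box, but give up after a short while instead of
        # waiting forever as a plain acquire() would.
        acquired = second.acquire(timeout=0.2)
        got_second_box[name] = acquired
        if acquired:
            second.release()
        both_have_tried.wait()      # keep our sticker on until both have tried

you = threading.Thread(target=worker, args=('you', box_a, box_b))
friend = threading.Thread(target=worker, args=('friend', box_b, box_a))
you.start()
friend.start()
you.join()
friend.join()

print(got_second_box)  # both entries are False: nobody got their second box
```

Agreeing that everyone always starts with box A would avoid the stand-off, which is exactly the kind of discipline the text is talking about.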

Is there a way out of this? Is there a way to avoid arduous juggler training while still doing concurrency? Yes, and this is what the *Communicating* part of CSP says.

The basic idea is this: when doing computations that must involve read-write IO, we do away with boxes. Instead, we specify *meeting points*, or *points of rendezvous*, at which IO is done. For example, you and your friend both want a book. Instead of putting the book in a box so that both of you can do whatever you want with it whenever you want (and risking the book being stolen), you just take the book away and do your processing with it. After you are satisfied, you and your friend *meet together* and you *hand off* the book to your friend. Once your friend has the book, she can do anything she wants with it, while you can no longer do anything with it at all. There is no longer any stepping on each other. If you *really* want your book again, you must arrange with your friend for another hand-off.

Such an arrangement is at least familiar (it is how private property works). Note that it is fundamentally different from putting stickers on boxes: you just take the book and go off! And the amazing thing is, this arrangement actually solves the majority of concurrency problems. To each *sequential process*, things look as if we are in a non-concurrent program: the only concurrent parts are when we want to do IO, for which we arrange rendezvous points. No stickers. No locks. Far fewer opportunities for deadlocks.

Another very important benefit of communicating over rendezvous points is the respect of privacy, or abstraction barriers. Consider the box-book example again. If we want to use stickers to solve that problem successfully, you and your friend both have to agree on a strategy (for example, always start with box A). In other words, you are both opening yourselves up to each other, letting the other know things about how *you* operate, which you may be reluctant to do for various reasons. By contrast, we will find that when using rendezvous points to communicate, often the only thing the other parties need to know is the existence of the rendezvous points. The abstraction barrier is respected!

The rest of this tutorial will go into much more detail on how to go about setting up and honouring rendezvous appointments. In the context of *aiochan*, a point of rendezvous is called a *channel*, or `Chan`. But first, some environment setup has to be done, which we will deal with in the next section.

To recap, in the context of CSP (*Communicating Sequential Processes*):

* Processes are groups of code that can each be considered an independent entity.
* Sequential processes are processes that operate in deterministic order producing deterministic results, without danger of stepping on each other.
* Communicating sequential processes are sequential processes that do their (necessary) IO by using rendezvous points only.
* CSP-style concurrency enables natural program logic resembling non-concurrent code and respects abstraction barriers, while at the same time eliminating most dangers of deadlocks and of processes stepping on each other.

## The basics

### Facing the sad reality of python concurrency

Programming languages begin life in different ways, with different goals in mind when their inventors started building them. Unfortunately, concurrency wasn't one of the goals when python started (unlike, say, Erlang and Elixir). As a result, concurrency does feel a bit foreign, cumbersome and unnatural in python. This is just something we have to live with.

In particular, when you start your python interactive interpreter, or when your python script starts to run, your code is in a single-threaded process where statements are executed one by one. What do "process" and "thread" mean here?

"Process" here means something different from the "process" in CSP: here it refers to an instance of your program that is executed for you by the operating system. A process has its own memory space and file descriptors provided by the operating system, and these are by default isolated from the other processes on the same system. A process may have one or more *threads of execution* running at the same time, with each thread executing its part of the code in sequence. Different threads can share memory (and potentially step on each other, as we have previously seen), and due to interleaving of execution, the end result is often non-deterministic (or what we have previously referred to as "non-sequential").

Python supports threads through the `threading` module (built on the low-level `_thread` module), together with various locking primitives for concurrency control. However, even with all the locking machinery in place, threads in python have greatly diminished value compared to other languages. The reason is that the default python implementation (CPython) has something called the global interpreter lock, or GIL, which roughly says that within each python process, at most one thread can be executing python bytecode at any one time. This immediately excludes any possibility of utilising multiple processor cores *within python itself* (as we will see later, if you step out of python there are ways around it). Locking also has overheads. And python schedules thread execution in a somewhat unintuitive manner, which often results in it favouring slow (or CPU-intensive) operations over fast ones, the opposite of what most operating systems do. The end result is that python code utilizing threads often runs *slower* than code not using threads. This picture isn't very encouraging.

Python also supports spawning processes from within python itself using the `multiprocessing` module. However, this module isn't that useful for our purposes either: remember that by default processes don't share memory or resources, so inter-process communication is restricted and cumbersome. And the overhead of using processes is even greater than that of threads, even though the GIL restriction no longer applies because now we are in different processes.

### Coroutines and event loops

So multi-threading in python doesn't really have much advantage over single-threaded code due to the GIL, and multi-processing is restricted. But we still need to do concurrency *control* in python. Considering the situation we are in, it seems the best way forward is to have something that is single-threaded (lower overhead, hopefully) that can imitate multiple threads of execution. And there is something doing just that, available in the standard library since python 3.4: it is called `asyncio`.

Compared to plain python, `asyncio` utilizes two further keywords (since python 3.5, at least): `async` and `await`. `async` is applied to functions (and methods). An example:

In [1]:
async def silly(x):
    print('executing silly with', x)
    return x+1

It seems normal enough. But when you call it:

In [2]:
silly(2)

<coroutine object silly at 0x7f1172facaf0>

What? It seems that the function doesn't execute but instead returns something called a coroutine. That is right. Calling an async function is a two-step process: first you call it normally and obtain a coroutine, then the coroutine needs to be given to some scheduler, or *event loop*, for execution. The function `aiochan.run` will do the scheduling and executing part for you:

In [3]:
import aiochan as ac
ac.run(silly(2))

executing silly with 2


3

Every call to `ac.run` creates a new event loop, which runs until the passed-in coroutine finishes executing. All this ceremony sets the stage for using the `await` keyword:

In [19]:
import asyncio

async def count(tag, n_max=3):
    i = 0
    while i < n_max:
        await asyncio.sleep(0.5)
        i += 1
        print(tag, i)

ac.run(count('counter:'))

counter: 1
counter: 2
counter: 3


Whatever follows the `await` keyword must be an *awaitable*, which roughly says "this computation here will eventually produce something, so notify me when that something is ready". When the interpreter encounters an `await` expression, execution of the current function is suspended and control returns to the event loop, which may also schedule the execution of the coroutine producing the awaitable. When the awaitable is ready, the suspended function resumes with the value of the awaitable, if any. (If you think that this is somewhat similar to generators, you are right.)

Let's see what happens when we run two counters (remember that the function `count`, when called, produces a coroutine, which is an awaitable):

In [15]:
async def main():
    await count('counter a:')
    await count('counter b:')
    
ac.run(main())

counter a: 1
counter a: 2
counter a: 3
counter b: 1
counter b: 2
counter b: 3


Hmm ... this doesn't look very concurrent: the second counter starts counting only after the first counter finishes. But this is what we asked for: we awaited the completion of the first counter!

To make the two counters execute together, we use the `aiochan.go` function, which takes a coroutine and schedules it for execution but does not wait for the result:

In [16]:
async def main():
    ac.go(count('counter a:'))
    await count('counter b:')
    
ac.run(main())

counter b: 1
counter a: 1
counter b: 2
counter a: 2
counter b: 3
counter a: 3


It looks much better now. Note that you *must* pass the coroutine to `aiochan.go` for execution: calling the function itself has no effect (other than a possible warning):

In [21]:
async def main():
    count('counter a:')
    await count('counter b:')
    
ac.run(main())

  


counter b: 1
counter b: 2
counter b: 3


What happens if we replace both counter calls with `aiochan.go`?

In [22]:
async def main():
    ac.go(count('counter a:'))
    ac.go(count('counter b:'))
    
ac.run(main())

Nothing happens! Remember that `ac.run` returns when the coroutine passed in returns, and our `main` returns after having two counters scheduled for execution, without actually executing them!

To make this clearer, note that if we also sleep in the `main` function, the two counters will be executed (while `main` is sleeping):

In [23]:
async def main():
    ac.go(count('counter a:'))
    ac.go(count('counter b:'))
    await asyncio.sleep(3)
    
ac.run(main())

counter a: 1
counter b: 1
counter a: 2
counter b: 2
counter a: 3
counter b: 3


If you have done thread-based programming before, you may think now that asyncio is no different from threading. This is not true. To illustrate, consider:

In [25]:
async def main():
    async def work():
        print('do work')
    print('before')
    ac.go(work())
    print('after')

ac.run(main())

before
after
do work


What you get is *always* `before`, `after`, and `do work`, in that order. In some languages, using threads, it is even possible to get garbled text, since the various calls to `print` (or whatever it is called) can step on each other. By contrast, asyncio event loops use only a single thread, and it is guaranteed that unless you have a call to `await`, nothing else will get in your way when you are executing. (If you read the documentation for asyncio, you will find that even things like locks and semaphores are marked "not thread-safe" --- they are only safe with respect to the non-interruption guarantees provided by asyncio!)
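The same behaviour can be observed with the standard library alone. This sketch uses `asyncio.create_task` (python 3.7+) where the example above uses `aiochan.go`, and records the order of events in a list instead of printing.

```python
import asyncio

log = []

async def work():
    log.append('do work')

async def main():
    log.append('before')
    task = asyncio.create_task(work())  # scheduled, but not started yet
    # There is no await between here and the next line, so nothing else
    # can run in between.
    log.append('after')
    await task                          # only now does work() get a turn

asyncio.run(main())
print(log)  # ['before', 'after', 'do work']
```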

So asyncio guarantees "no break unless await". What if you want to manually return control to the event loop? You can await `aiochan.nop`:

In [31]:
async def main():
    async def work():
        print('do work')
    print('before')
    ac.go(work())
    await ac.nop()
    print('after')

ac.run(main())

before
do work
after


Note the order. Also note that in this case, the order *isn't* guaranteed --- that you always get this order back should be considered an implementation detail.
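With the standard library alone, a common way to hand control back to the event loop is `await asyncio.sleep(0)`. Whether `aiochan.nop` works the same way internally is not something this sketch assumes; it only shows the idea of an explicit yield point.

```python
import asyncio

log = []

async def work():
    log.append('do work')

async def main():
    log.append('before')
    task = asyncio.create_task(work())
    await asyncio.sleep(0)   # yield to the event loop once
    log.append('after')
    await task               # make sure work() has finished before returning

asyncio.run(main())
print(log)
```

As in the text, `before` always comes first, while the relative order of the other two entries is best treated as an implementation detail.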

Now you know how to make coroutines and run them. That roughly corresponds to the "sequential processes" that we talked about before (and remember not to touch global states). In the next section, we will learn about the "communicating" part.

To recap:

* Python was not designed for concurrency, so things are a bit clumsy.
* There are a number of ways to do concurrency in python: processes, threads, and asyncio event-loops.
* Asyncio event loops are single-threaded schedulers responsible for executing coroutines.
* Coroutine functions are made with the `async` and `await` keywords. No interleaving of execution can occur unless an `await` keyword is encountered during execution.

Useful functions:

* `aiochan.run`
* `aiochan.go`
* `aiochan.nop`
* `asyncio.sleep`

There is also `aiochan.run_in_thread`, which is recommended for scripts. `aiochan.run` is recommended when programming interactively.

#### Appendix: running async functions without aiochan

In our exposition we used the function `aiochan.run` to run all our async functions. How do we do it with native python libraries? We use asyncio event loops to do the execution:
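A minimal sketch of that approach, assuming it is run as a script or in a plain interactive interpreter (that is, with no event loop already running): create a loop, run the coroutine to completion, then clean up.

```python
import asyncio

async def silly(x):
    print('executing silly with', x)
    return x + 1

# Create an event loop explicitly, run the coroutine to completion on it,
# and close the loop afterwards.
loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(silly(2))
finally:
    loop.close()

# Prints "executing silly with 2" followed by "the result is 3".
print('the result is', result)
```

On python 3.7 and above, `asyncio.run(silly(2))` wraps the same three steps in a single call.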

You can try running the above code. What you get depends on how you run it: if you run it in a script or in an interactive interpreter, then you will see printed:

However, if you run it in jupyter notebooks or jupyterlab, there is a possibility that you will get an exception thrown at your face (or maybe not, it all depends):

So we already have a loop running? Ok, it is still possible to proceed in this case:

In [5]:
import asyncio

loop = asyncio.get_event_loop()
result = loop.create_task(silly(2))
print('the result is', result)

the result is <Task pending coro=<silly() running at <ipython-input-3-709c439f84a4>:1>>
executing silly with 2


So apparently our async function is executed now, but now we only get a task back, not the result itself! To get the result:

In [6]:
result.result()

3

... which seems to be fine, but that is only because you are executing it interactively. If you put this line directly below `print`, you most certainly will get:

which tells you that you are calling the function too soon! You will need to wait a little bit (but if you do it wrong it will deadlock), or create some future and set the result using a callback. If you really want to figure it out you can read the python documentation.
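Inside a coroutine, the "waiting a little bit" is done with `await`. The sketch below runs as a script (it starts its own loop with `asyncio.run`, python 3.7+); it first asks for the result too soon, which raises `asyncio.InvalidStateError`, and then awaits the task properly.

```python
import asyncio

async def silly(x):
    print('executing silly with', x)
    return x + 1

async def main():
    task = asyncio.get_running_loop().create_task(silly(2))
    try:
        task.result()            # too soon: the task has not run yet
    except asyncio.InvalidStateError as error:
        print('too soon:', type(error).__name__)
        too_soon = True
    else:
        too_soon = False
    result = await task          # suspends main until the task is done
    return too_soon, result

outcome = asyncio.run(main())
print(outcome)  # (True, 3)
```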

So now you believe me when I say that doing concurrency in python feels foreign, cumbersome and unnatural. Running everything in a script is a solution, but one of the appeals of python is its interactivity.

You can also replace calls to `aiochan.go` with `asyncio.get_running_loop().create_task`, but ... what a mouthful! `asyncio.ensure_future` is also a possibility, but in addition to its questionable name, since python 3.7 its use for spawning tasks is discouraged in favour of `asyncio.create_task`. However, `asyncio.create_task` doesn't exist prior to python 3.7. So ... if you intend to use `aiochan` at all, we urge you to stay with `aiochan.go`.
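For comparison, here is a sketch of the two-counter example using only the standard library (python 3.7+). The counter is shortened and logs to a list instead of printing; `asyncio.create_task` plays the role that `aiochan.go` plays in the main text.

```python
import asyncio

log = []

async def count(tag, n_max=3):
    for i in range(1, n_max + 1):
        await asyncio.sleep(0.01)
        log.append((tag, i))

async def main():
    # Schedule counter a without waiting for it ...
    task_a = asyncio.create_task(count('counter a:'))
    # ... so that it runs while we are awaiting counter b.
    await count('counter b:')
    await task_a   # make sure counter a has finished before main returns

asyncio.run(main())
print(log)
```

The entries of the two counters come out interleaved, although which counter reports first at each step should not be relied upon.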

### Channels

Now we know how to make coroutines and schedule them for execution. As we said before, for the coroutines to do IO safely and in a principled way, we will use points of rendezvous, which in `aiochan` are called `Chan`, for "channel". Constructing a channel is easy:

In [1]:
import aiochan as ac
import asyncio

c = ac.Chan()
c

Chan<_unk_0139643078943416>

Now we try to use it. Suppose we have a producer that can be tasked with producing items, and a consumer that needs to consume items. The IO in this case is obvious: the producer outputs, the consumer inputs, and this output and input are linked at a point of rendezvous. In code:

In [2]:
async def producer(c):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        await c.put('product ' + str(i))
        
async def consumer(c):
    while True:
        product = await c.get()
        print('obtained:', product)
        
async def main():
    c = ac.Chan()
    ac.go(producer(c))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    
ac.run(main())

obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.


We see that `Chan` has two methods, `put` and `get`. `put` is used to put stuff into the channel, and `get` is for getting stuff out: obvious enough. Both of them return awaitables, signaling that doing IO with channels involves potential waiting, since two parties need to come together for either of them to proceed. Awaiting a `get` produces the value that was `put` into the channel.

In `aiochan`, you cannot `put` something into a channel that turns out to be `None` (other falsy values such as `0`, `0.0`, `[]`, `{}`, `False` are ok). The reason is that a channel can be closed, and we need to signal somehow to the users of the channel that it is closed, and we use `None` for the signal. If we don't do this, then the next possibility is throwing exceptions, but we have found that throwing exceptions in async code can be *very* confusing. So, following Clojure's core.async, we don't allow `None` values in channels.
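The convention is easier to see in a toy model. The `ToyChan` class below is a hypothetical stand-in built on `asyncio.Queue`, not `aiochan`'s implementation (for one thing, it buffers without limit instead of making the two parties meet). It only illustrates why `None` has to be reserved once it doubles as the "closed" signal.

```python
import asyncio

class ToyChan:
    """Toy channel: an unbounded queue in which None means 'closed'."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._closed = False

    async def put(self, item):
        if item is None:
            raise TypeError('None is reserved as the "closed" signal')
        if self._closed:
            return False              # nobody will take it: channel is closed
        await self._queue.put(item)
        return True

    async def get(self):
        item = await self._queue.get()
        if item is None:
            # Leave the signal in place for any other (or later) getters.
            self._queue.put_nowait(None)
        return item

    def close(self):
        if not self._closed:
            self._closed = True
            self._queue.put_nowait(None)

async def main():
    c = ToyChan()
    first = await c.put(0)            # falsy, but not None: allowed
    c.close()
    second = await c.put(1)           # channel already closed
    return first, second, await c.get(), await c.get(), await c.get()

outcome = asyncio.run(main())
print(outcome)  # (True, False, 0, None, None)
```

If `None` were a legal value, a consumer receiving it could not tell "somebody sent me nothing" apart from "the channel is closed".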

Speaking of closing channels, note that in our previous example, `main` just walks away when it is determined that everyone should go home. But `producer` and `consumer` are just left there dangling, which is very rude of `main`. Closing the channel is a polite way of notifying them both:

In [3]:
async def producer(c):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        still_open = await c.put('product ' + str(i))
        if not still_open:
            print('producer goes home')
            break
            
        
async def consumer(c):
    while True:
        product = await c.get()
        if product is not None:
            print('obtained:', product)
        else:
            print('consumer goes home')
            break
        
async def main():
    c = ac.Chan()
    ac.go(producer(c))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer
    
ac.run(main())

obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.
consumer goes home
producer goes home


We see that after the channel is closed with `c.close()`, awaiting a `get` will produce a `None`, whereas awaiting a `put` will produce `False` (before closing it will return `True`).

By the way, on python 3.6 and above, we can simplify our consumer a bit: here we are just iterating over the values in the channel one by one, which is exactly what an asynchronous iterator does. So we can write

In [4]:
async def producer(c):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        still_open = await c.put('product ' + str(i))
        if not still_open:
            print('producer goes home')
            break
            
        
async def consumer(c):
    async for product in c:
        print('obtained:', product)
    print('consumer goes home')
        
async def main():
    c = ac.Chan()
    ac.go(producer(c))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer
    
ac.run(main())

obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.
consumer goes home
producer goes home


It is also no longer necessary to test whether `product` is None: the iteration stops automatically when the channel is closed.
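How might such iteration work under the hood? One possible way, sketched here with an `asyncio.Queue` and an async generator (python 3.6+), is to keep getting values and stop at the `None` marker. The helper `items` is hypothetical and is not how `aiochan` is implemented.

```python
import asyncio

async def items(queue):
    # Async generator: yields values until the None "closed" signal arrives.
    while True:
        item = await queue.get()
        if item is None:
            return
        yield item

async def main():
    queue = asyncio.Queue()
    for value in ('product 1', 'product 2', None):
        queue.put_nowait(value)
    received = []
    async for product in items(queue):
        received.append(product)
    return received

received = asyncio.run(main())
print(received)  # ['product 1', 'product 2']
```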

Note that in `aiochan`, a channel is just an object --- in some circles, this is called a "first-class construct". This means that it can be passed as an argument to functions (which we just did), returned from functions, or stored in a data structure for later use (unlike, say, in Erlang). For example, we can have our producer produce the channel instead:

In [5]:
async def producer():
    c = ac.Chan()
    
    async def work():
        i = 0
        while True:
            await asyncio.sleep(0.1) # producing stuff takes time
            i += 1
            still_open = await c.put('product ' + str(i))
            if not still_open:
                print('producer goes home')
                break
                
    ac.go(work())
    return c
            
        
async def consumer(c):
    async for product in c:
        print('obtained:', product)
    print('consumer goes home')
        
async def main():
    c = await producer()
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer
    
ac.run(main())

obtained: product 1
obtained: product 2
obtained: product 3
obtained: product 4
obtained: product 5
It is late, let us call it a day.
consumer goes home
producer goes home


But in this case, *not* letting the producer produce its own channel actually has a benefit: we can easily have several producers working concurrently:

In [6]:
async def producer(c, tag):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        still_open = await c.put('product %s from %s' % (i, tag))
        if not still_open:
            print('producer %s goes home' % tag)
            break
            
        
async def consumer(c):
    async for product in c:
        print('obtained:', product)
    print('consumer goes home')
        
async def main():
    c = ac.Chan()
    for i in range(3):
        ac.go(producer(c, 'p%s' % i))
    ac.go(consumer(c))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer
    
ac.run(main())

obtained: product 1 from p0
obtained: product 1 from p1
obtained: product 1 from p2
obtained: product 2 from p0
obtained: product 2 from p1
obtained: product 2 from p2
obtained: product 3 from p0
obtained: product 3 from p1
obtained: product 3 from p2
obtained: product 4 from p0
obtained: product 4 from p1
obtained: product 4 from p2
obtained: product 5 from p0
obtained: product 5 from p1
obtained: product 5 from p2
It is late, let us call it a day.
consumer goes home
producer p0 goes home
producer p1 goes home
producer p2 goes home


This is called *fan-in*: different producers fanning their products into the same channel. We can also have *fan-out*:

In [7]:
async def producer(c, tag):
    i = 0
    while True:
        await asyncio.sleep(0.1) # producing stuff takes time
        i += 1
        still_open = await c.put('product %s from %s' % (i, tag))
        if not still_open:
            print('producer %s goes home' % tag)
            break
            
        
async def consumer(c, tag):
    async for product in c:
        print('%s obtained: %s' % (tag, product))
    print('consumer %s goes home' % tag)
        
async def main():
    c = ac.Chan()
    for i in range(3):
        ac.go(producer(c, 'p%s' % i))
    for i in range(3):
        ac.go(consumer(c, 'c%s' % i))
    await asyncio.sleep(0.6)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer
    
ac.run(main())

c0 obtained: product 1 from p0
c1 obtained: product 1 from p1
c2 obtained: product 1 from p2
c0 obtained: product 2 from p0
c1 obtained: product 2 from p1
c2 obtained: product 2 from p2
c0 obtained: product 3 from p0
c1 obtained: product 3 from p1
c2 obtained: product 3 from p2
c0 obtained: product 4 from p0
c1 obtained: product 4 from p1
c2 obtained: product 4 from p2
c0 obtained: product 5 from p0
c1 obtained: product 5 from p1
c2 obtained: product 5 from p2
It is late, let us call it a day.
consumer c0 goes home
consumer c1 goes home
consumer c2 goes home
producer p0 goes home
producer p1 goes home
producer p2 goes home


We see that work is automatically divided evenly between producers and consumers. Even if producers produce things at different rates, this fan-in, fan-out pattern will automatically do the right thing:

In [10]:
async def producer(c, tag, interval):
    i = 0
    while True:
        await asyncio.sleep(interval) # producing stuff takes time
        i += 1
        still_open = await c.put('product %s from %s' % (i, tag))
        if not still_open:
            print('producer %s goes home' % tag)
            break
            
        
async def consumer(c, tag):
    async for product in c:
        print('%s obtained: %s' % (tag, product))
    print('consumer %s goes home' % tag)
        
async def main():
    c = ac.Chan()
    for i in range(3):
        ac.go(producer(c, ('p%s' % i), interval=(i+1)*0.1))
    for i in range(3):
        ac.go(consumer(c, 'c%s' % i))
    await asyncio.sleep(1)
    print('It is late, let us call it a day.')
    c.close()
    await asyncio.sleep(0.2) # necessary to wait for producer
    
ac.run(main())

c0 obtained: product 1 from p0
c1 obtained: product 1 from p1
c2 obtained: product 2 from p0
c0 obtained: product 1 from p2
c1 obtained: product 3 from p0
c2 obtained: product 2 from p1
c0 obtained: product 4 from p0
c1 obtained: product 5 from p0
c2 obtained: product 2 from p2
c0 obtained: product 3 from p1
c1 obtained: product 6 from p0
c2 obtained: product 7 from p0
c0 obtained: product 4 from p1
c1 obtained: product 8 from p0
c2 obtained: product 3 from p2
c0 obtained: product 9 from p0
It is late, let us call it a day.
consumer c1 goes home
consumer c2 goes home
consumer c0 goes home
producer p1 goes home
producer p0 goes home
producer p2 goes home


We see that jobs are still divided evenly between consumers, but more jobs come from faster producers.

To recap:

* The construct for inter-coroutine communication is the channel.
* Getting and putting to channels complete the rendezvous between coroutines.
* Channels are first-class constructs: we can pass them around, return them, or store them.
* Channels can be closed.
* `None` values are not allowed on channels.
* Strategically closing channels can be used for execution control.
* Fan-in and fan-out can be used for distributing work among different coroutines.

Useful constructs:

* `aiochan.Chan`
* `aiochan.Chan.put`
* `aiochan.Chan.get`
* `aiochan.Chan.close`

There are also `aiochan.Chan.put_nowait` and `aiochan.Chan.get_nowait`, but their use is best reserved for advanced and rather low-level work. In most cases the waiting variants suffice. (In particular, in golang there are no non-waiting operations on channels: if you want non-waiting behaviour you must use `select`.)

### `select`: the paramount operation

We have seen that with channels and put and get operations, we can already build rather complicated systems: the system already has great expressive power. Now we introduce the operation `select`, which hugely increases the expressive power further.

Suppose we have channels `c1`, `c2` and `c3`. If we write

    result, chan = await ac.select(c1, c2, c3)

then `result` will hold the result of one and only one `get` operation on `c1`, `c2` and `c3`. And this is the important point: *only one operation will be attempted*! This means that if we have several operations that can be completed at the same time, only one will complete, and the non-completing ones *will not run at all*. This is in contrast with, say, `asyncio.wait`, where every awaitable you hand over is started and may complete, even if you only look at the first result.
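
To see the contrast concretely, here is a small sketch that uses only the standard library, with `asyncio.Queue` objects standing in for channels. That substitution is an assumption made purely for illustration: this is not `aiochan` code, and the name `demo_wait` is made up. It starts one `get` per queue and asks `asyncio.wait` to return as soon as the first one completes:

```python
import asyncio

async def demo_wait():
    # Three pre-filled queues stand in for the channels c1, c2 and c3.
    queues = []
    for items in ([1, 2, 3], ['a', 'b', 'c'], ['x', 'y', 'z']):
        q = asyncio.Queue()
        for item in items:
            q.put_nowait(item)
        queues.append(q)

    # Start one get per queue, then wait until the first of them finishes.
    tasks = [asyncio.ensure_future(q.get()) for q in queues]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Cancel anything still pending, then report how many items each queue has left.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return [q.qsize() for q in queues]

remaining = asyncio.run(demo_wait())
print(remaining)
```

On CPython's default event loop all three gets have already taken a value by the time `asyncio.wait` returns, so this prints `[2, 2, 2]`: every queue has lost an item, not just one of them. With `select`, only one channel would have been touched.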

Let's have some examples:

In [1]:
import asyncio
import aiochan as ac

async def main():
    c1 = ac.Chan(name='c1').add(1, 2, 3).close()
    c2 = ac.Chan(name='c2').add('a', 'b', 'c').close()
    c3 = ac.Chan(name='c3').add('x', 'y', 'z').close()
    
    result, chan = await ac.select(c1, c2, c3)
    print('the result is', result)
    print('the result is from', chan)
    
    async for v in c1:
        print('c1 still has value:', v)
    
    async for v in c2:
        print('c2 still has value:', v)
        
    async for v in c3:
        print('c3 still has value:', v)

ac.run(main())

the result is a
the result is from Chan<c2 140435052033424>
c1 still has value: 1
c1 still has value: 2
c1 still has value: 3
c2 still has value: b
c2 still has value: c
c3 still has value: x
c3 still has value: y
c3 still has value: z


Here we have also used some new operations on channels:

* We can give names to channels by `Chan(name='some name')`,
* `ch.add(...)` adds elements to the channel in the background, as soon as it is possible to do so,
* `close` closes the channel immediately, but all pending puts (here those by `add`) will still have an opportunity to complete,
* `add` and `close` can be chained as both these methods return the channel itself.

And for our `select`:

* it returns a tuple: the value together with the channel that is involved,
* if several operations can all be completed, which one is completed is non-deterministic (try running the above script several times to see).

Actually, it is not only get operations that can be `select`ed:

In [50]:
async def receive(c):
    r = await c.get()
    print('received', r, 'on', c)

async def main():
    c1 = ac.Chan(name='c1')
    c2 = ac.Chan(name='c2')
        
    ac.go(receive(c1))
    ac.go(receive(c2))
    
    await ac.nop()

    result, chan = await ac.select((c1, 'A'), (c2, 'B'))
    print('select completes on', chan)

ac.run(main())

select completes on Chan<c2 140377518153552>
received B on Chan<c2 140377518153552>


We see that if we give an argument like `(chan, value)`, it is interpreted as a put operation akin to `chan.put(value)`. Again, one and only one operation will complete. You can also mix get operations with put operations.

Also, if you are careful, you will have noticed that we have inserted a `nop` above. If it is not there, the `select` will always complete on `c1`. You may want to think about why.

The more non-trivial the application is, the more uses of `select` you can find. One of its simplest uses is stopping many workers at once:

In [7]:
async def worker(out, stop, tag):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.1)
        result, c = await ac.select(stop, (out, '%s-%s' % (tag, i)), priority=True)
        if c is stop:
            print('%s stopped' % tag)
            break
            
async def consumer(c, stop):
    while True:
        # use a separate name so that the channel argument `c` is not rebound
        result, chan = await ac.select(stop, c, priority=True)
        if chan is stop:
            print('consumer stopped')
            break
        else:
            print('received', result)

async def main():
    c = ac.Chan()
    stop = ac.Chan()
    for i in range(3):
        ac.go(worker(c, stop, 'worker%s' % i))
    ac.go(consumer(c, stop))
    await asyncio.sleep(0.6)
    stop.close()
    await asyncio.sleep(0.2)

ac.run(main())

received worker0-1
received worker1-1
received worker2-1
received worker0-2
received worker1-2
received worker2-2
received worker0-3
received worker1-3
received worker2-3
received worker0-4
received worker1-4
received worker2-4
received worker0-5
received worker1-5
received worker2-5
consumer stopped
worker0 stopped
worker1 stopped
worker2 stopped


In this simple situation the signal to stop can actually be sent by closing the fan-in-fan-out channel directly, but in more complicated situations (for example, closing down in response to *any* of a range of conditions) `select` is essential.

In this example we have also seen that `select` takes an argument `priority`, which defaults to `False`. Here we set it to `True`, which means that when several operations become completable at the same time, it is guaranteed that the leftmost completable operation will complete. We want the stop signal to win at the earliest opportunity, so `stop` comes first.

There is also a `default` argument to `select`, which says that when none of the operations can be completed immediately, the value given to the argument `default` will be returned together with `None` in the place where you usually find the completed channel. Thus the following snippet completes the put only if it can be done immediately:

In [10]:
async def main():
    ch = ac.Chan()
    result, c = await ac.select((ch, 'value'), default='giveup')
    if c is None:
        print(result)
        print('put cannot complete immediately and was given up')
        
ac.run(main())

giveup
put cannot complete immediately and was given up


By now you should know how to use `select`. It certainly seems a simple operation, or at least one that is simple to understand. However, `select` is non-trivial. What we mean by that is that, using only channels and put and get operations on channels (and their "nowait" counterparts), it is not possible to write a `select` operation that follows the correct semantics. There are three key points to the semantics of `select`:

* at least one operation is completed;
* at most one operation is completed;
* an operation is completed at the earliest possible time (no unnecessary waiting).

With only `Chan`, `get` and `put`, writing an operation satisfying any two of the above is easy. But `select` satisfies all three. To satisfy all three, you will need to submit your operations to the involved channels at the time of calling, and at the time of completion of any operation, you will need to notify all other operations to cancel themselves. Thus the semantics of `select` is woven into the inner structure of channels.
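
As an illustration of satisfying only two of the three points, here is a sketch in plain `asyncio`, again with `asyncio.Queue` as a stand-in for channels. `polling_select` is a hypothetical helper and not part of `aiochan`. It completes at least one and at most one get, but only by polling, so it gives up the third point:

```python
import asyncio

async def polling_select(*queues, interval=0.05):
    """Take exactly one item from whichever queue has one, by polling."""
    while True:
        for q in queues:
            try:
                # get_nowait either takes one item or raises; nothing stays pending
                return q.get_nowait(), q
            except asyncio.QueueEmpty:
                continue
        # Nothing was ready: sleep before looking again.
        # This sleep is exactly the "unnecessary waiting".
        await asyncio.sleep(interval)

async def demo_polling():
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    loop = asyncio.get_running_loop()
    # A value becomes available on q2 after 10 ms ...
    loop.call_later(0.01, q2.put_nowait, 'late value')
    start = loop.time()
    value, source = await polling_select(q1, q2)
    # ... but it is only noticed at the next poll, about 50 ms in.
    waited = loop.time() - start
    return value, source is q2, q1.qsize(), waited

value, from_q2, q1_size, waited = asyncio.run(demo_polling())
print(value, from_q2, q1_size, round(waited, 2))
```

Shrinking `interval` reduces the delay but burns more CPU, and the delay never reaches zero. Getting rid of it altogether needs cooperation from the channels themselves, as described above.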

If you think about it carefully, `select` is actually the whole point of `aiochan`: `asyncio` does provide us with futures, locks and things, which are superficially somewhat like our channels. But `select` is conspicuously missing. Channels are made to make `select` possible. Rob Pike, one of the creators of golang, mentions `select` as the reason why channels in golang are provided by the language itself instead of as a library.

Another way of putting this is: in the hierarchy of concurrency operations, `select` is on the highest level of abstraction. Consider the following:

* unlike python, Java was designed with concurrency (with threads) in mind, so thread primitives exist from the beginning;
* but as working with the primitives was too low-level, `java.util.concurrent` was later added as a library;
* Clojure runs on the JVM, so it can use all the Java concurrency libraries from day one. Clojure also adds its own flavour of concurrency-friendly constructs in the form of refs (atoms, agents, and even STM);
* BUT Clojure still needs `core.async` as a library, since writing a `select` that works well on all the previous stuff is not possible! (By the way, `select` is called `alt!`, `alts!`, `alt!!` and `alts!!` in core.async. Yes there are four of them.)

By the way, python has a built-in library called `select` and a higher-level one called `selectors`. But these selects only work with files or sockets, not plain python objects, and the availability of the various operations in these libraries depends on the system. That is because the library just offloads its work to operating system calls. Usually we think of system calls as pretty low level. How many times have you encountered some abstraction that is provided by the lower-level operating system but not by the higher-level programming language?
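
For comparison, here is a minimal sketch of what the standard-library `selectors` module does select on: sockets, not channels or other python objects. The helper name `ready_names` and the use of `socket.socketpair` to get connected sockets are choices made for this illustration only:

```python
import selectors
import socket

def ready_names(named_socks, timeout=1.0):
    """Return the names of the sockets that have data waiting to be read."""
    sel = selectors.DefaultSelector()
    try:
        for name, sock in named_socks.items():
            sel.register(sock, selectors.EVENT_READ, data=name)
        # The operating system tells us which registered sockets are readable.
        return [key.data for key, _events in sel.select(timeout)]
    finally:
        sel.close()

# Two connected socket pairs; only the second one is sent any data.
a_recv, a_send = socket.socketpair()
b_recv, b_send = socket.socketpair()
b_send.sendall(b'hello')

ready = ready_names({'a': a_recv, 'b': b_recv})
print(ready)

for s in (a_recv, a_send, b_recv, b_send):
    s.close()
```

Only `'b'` is reported as ready. Note that this kind of select merely reports readiness: it does not itself complete a read, let alone guarantee that exactly one operation completes.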

To recap:

* The `select` operator completes exactly one operation from the given operations,
* `select` can be used as a control structure,
* `select` is non-trivial.

Useful constructs:

* `select`
* `aiochan.Chan.add`
* Channel operations can be chained (more to come)

### Channel buffering

We have said from the beginning that channels are used as points of rendezvous: a pair of put/get operations can only be completed when *both* involved parties are present at the same time. However, in practice, it is sometimes necessary to relax this a little bit so that puts can complete immediately even when no one is there to get, and if previous puts are available, a get can complete without a put sitting there waiting. This behaviour where we further decouple put and get operations in time is called buffering.

In principle, buffering can be done without any further support from the library: we can have a pair of channels, `ch_in` and `ch_out`, acting as one, and a coroutine busy working in the background, promptly getting values from `ch_in` whenever they are available and storing them in some data structure (a `deque` is a good choice), and at the same time feeding values to getters of `ch_out` whenever they come by. You can do this as an exercise (hint: use `select`).

However, to reduce clutter, and to improve performance, `Chan` has built-in support for buffers. In `aiochan`, buffering is always bounded: you have to decide at the outset how much pending stuff stored in your buffer you can tolerate. Some languages like Erlang nominally support unbounded buffering as the default, but the limit imposed by the operating system is always there.

Let's have an example:

In [11]:
import asyncio
import aiochan as ac

async def main():
    c = ac.Chan(1)
    
    await c.put('a')
    result = await c.get()
    print('result', result)
    
ac.run(main())

result a


As we can see, a buffered channel is created by having a positive number as the argument to the channel constructor. Note that in this example, if there were no buffer, the example would deadlock: the first `await` would never complete because no-one is there to get the value.

The positive number passed to the constructor is the size of the buffer. In the above example, the size is one, so a second put in a row, with no get in between, would block.

This is an example of *fixed length buffers*. The constructor call `Chan(1)` is actually a shorthand for `Chan('f', 1)`, `'f'` for fixed length. These buffers block on put when they are full. 

Fixed length buffers are often used to implement back-pressure:

In [23]:
async def worker(c):
    i = 0
    while True:
        i += 1
        await asyncio.sleep(0.05)
        print('producing', i)
        await c.put(i)

async def consumer(c):
    while True:
        await asyncio.sleep(0.2)
        result = await c.get()
        print('consuming', result)

async def main():
    c = ac.Chan(3)
    ac.go(worker(c))
    ac.go(consumer(c))
    await asyncio.sleep(1)
    
ac.run(main())

producing 1
producing 2
producing 3
consuming 1
producing 4
producing 5
consuming 2
producing 6
consuming 3
producing 7
consuming 4
producing 8


In this example, producers and consumers are working at different rates. We want to ensure the consumer always has something to work with, so producers have to work ahead of consumers, but we also want to ensure that producers don't get so far ahead as to make the program *diverge*, that is, pile up an ever-growing backlog. A buffer solves the problem well. Note that the buffering solution still works even if the time taken to produce or consume items is somewhat random: within bounds, appropriate buffering can ensure minimal waiting while preventing divergence.

In situations where getters just can't keep up with putters *and* you definitely cannot tolerate blocking for producers (maybe because you don't control the producers), you have to compromise and use some other kind of buffer which will *discard* some elements in exchange for non-blocking puts. We have built-in support for two of them. *Dropping buffers* will just silently drop any further incoming puts once they are full:

In [12]:
async def main():
    c = ac.Chan('d', 2) # 'd' for 'dropping'
    await c.put(1)
    await c.put(2)
    await c.put(3)
    c.close()
    async for v in c:
        print(v)

ac.run(main())

1
2


Look: the last value is missing. We also have *sliding buffers*, which when full, will drop the *earliest* pending value:

In [13]:
async def main():
    c = ac.Chan('s', 2) # 's' for 'sliding'
    await c.put(1)
    await c.put(2)
    await c.put(3)
    c.close()
    async for v in c:
        print(v)

ac.run(main())

2
3


As expected, the first value is now missing.
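
The difference between the two discarding policies can be modelled in a few lines of plain python. The classes below are hypothetical models written for this explanation, built on `collections.deque`. They are not how `aiochan` implements its buffers:

```python
from collections import deque

class DroppingBuffer:
    """Hypothetical model: when full, a new item is silently discarded."""
    def __init__(self, size):
        self.size = size
        self.items = deque()

    def add(self, item):
        if len(self.items) < self.size:
            self.items.append(item)
        # otherwise the incoming item is dropped on the floor

class SlidingBuffer:
    """Hypothetical model: when full, the oldest item makes room for the new one."""
    def __init__(self, size):
        # a deque with maxlen discards from the opposite end on append
        self.items = deque(maxlen=size)

    def add(self, item):
        self.items.append(item)

dropping, sliding = DroppingBuffer(2), SlidingBuffer(2)
for v in (1, 2, 3):
    dropping.add(v)
    sliding.add(v)

print(list(dropping.items))  # [1, 2]
print(list(sliding.items))   # [2, 3]
```

The outputs match the two channel examples above: the dropping policy keeps the oldest values, the sliding policy keeps the newest.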

At the beginning we said that channels are used to circumvent the use of locks and semaphores so that our programs are easier to develop and easier to reason about. Well, sometimes locks and semaphores are the most natural solutions to a problem. And in such situations, *buffered channels can be used as locks and semaphores*.

An example:

In [38]:
async def worker(lock, tag):
    while True:
        await lock.get()
        print('%s is now working' % tag)
        await asyncio.sleep(0.1)
        await lock.put(True)

async def main():
    lock = ac.Chan(1).add(True)
    for i in range(10):
        ac.go(worker(lock, i))
    await asyncio.sleep(1)
    
ac.run(main())

0 is now working
1 is now working
2 is now working
3 is now working
4 is now working
5 is now working
6 is now working
7 is now working
8 is now working
9 is now working


You can see the lock at work: in 1 second only 10 operations complete, even though we have 10 workers who could potentially complete 100 operations in that time. In the presence of the lock, work becomes serial.

Using a buffer size greater than 1 gives you a semaphore, which in our case increases the throughput:

In [40]:
async def worker(lock, tag):
    while True:
        await lock.get()
        print('%s is now working' % tag)
        await asyncio.sleep(0.1)
        await lock.put(True)

async def main():
    lock = ac.Chan(2).add(True, True)
    for i in range(10):
        ac.go(worker(lock, i))
    await asyncio.sleep(1)
    
ac.run(main())

0 is now working
1 is now working
2 is now working
3 is now working
4 is now working
5 is now working
6 is now working
7 is now working
8 is now working
9 is now working
0 is now working
1 is now working
2 is now working
3 is now working
4 is now working
5 is now working
6 is now working
7 is now working
8 is now working
9 is now working
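
The token-passing idea behind this semaphore is not specific to `aiochan`. As a rough sketch under that assumption, the same pattern can be written with a bounded `asyncio.Queue` from the standard library (the names `demo_tokens`, `active` and `peak` are made up for this illustration), which also lets us measure how many workers are ever inside the protected region at once:

```python
import asyncio

async def demo_tokens(n_tokens=2, n_workers=6):
    # A bounded queue pre-filled with tokens plays the role of the buffered channel.
    tokens = asyncio.Queue(maxsize=n_tokens)
    for _ in range(n_tokens):
        tokens.put_nowait(True)

    active = 0  # workers currently holding a token
    peak = 0    # highest value of `active` ever observed

    async def worker():
        nonlocal active, peak
        token = await tokens.get()   # acquire: wait until a token is available
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)    # pretend to work
        active -= 1
        await tokens.put(token)      # release: hand the token back

    await asyncio.gather(*(worker() for _ in range(n_workers)))
    return peak

peak = asyncio.run(demo_tokens())
print(peak)  # 2
```

With two tokens, at most two workers are ever active at the same time, whatever the number of workers.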


But why would you want to use channels *as* locks when you can use the builtin locks from `asyncio`? Consistency and flexibility. Remember `select`? Now we can `select` on locks! You can do all kinds of funky stuff with `select` and locks:

In [51]:
import random

async def worker(locks, tag):
    while True:
        _, lock = await ac.select(*locks)
        print('%s working' % tag)
        await asyncio.sleep(0.1)
        await lock.put(True)
                
async def main():
    locks = [ac.Chan(1, name='lock%s' % i).add(True) for i in range(3)]
    for i in range(3):
        ac.go(worker(locks, 'worker-%s' % i))
    await asyncio.sleep(0.5)
    
ac.run(main())

worker-0 working
worker-1 working
worker-2 working
worker-0 working
worker-1 working
worker-2 working
worker-0 working
worker-1 working
worker-2 working
worker-0 working
worker-1 working
worker-2 working
worker-0 working
worker-1 working
worker-2 working


With 3 locks we got 15 units of work done in half a second. You can change to 2 locks, in which case only 10 units of work would be done.

To recap:

* Channels support buffering.
* Fixed length buffering blocks on put when full, whereas dropping and sliding buffering never block but may throw away items when full.
* Buffering can be used to implement back-pressure.
* Buffered channels can be used as locks and semaphores, and you can `select` on them.

*Congratulations!* Now you know *almost* everything you need to write non-trivial concurrent applications with `aiochan`. You are only limited by your imagination! Still, there are various *patterns* of concurrent programs that occur so often that we have implemented them as additional functions and methods that you can readily use. None of them is essential, but using the provided *convenience* functions makes your code easier to read, especially for others. Let's continue onto the next section!

## Writing concurrent programs

### Time operators

### Functional operators

### Channel combinations

### Parallelism

### Back to normal code

* primitives, combinations, abstractions
    * primitives: channels
    * combinations: `put`, `get`, `select`
    * abstractions: "megachannels", something in, something out

* creating channels
    

* combining for dataflow
    

* Timing, backpressure, reactive programming

## Parallelism, non-concurrent programs

* The various forms of pipe

* How to get data in and out, from non-concurrent programs