(cont:sync:ordering)=
# Ordering Thread Events

(cont:sync:ordering:sleep_wakeup)=

As we saw in {numref}`cont:sync:locks:sleep_wakeup`, the `wait_queue` data structure provides a way for threads to avoid wasting CPU time while they are waiting for something to happen. The mechanics of atomically putting a thread to sleep on a `wait_queue` must be handled carefully and require some interaction with the CPU scheduler. Different operating systems have different ways of doing this, and we will revisit this issue when we discuss locking in the Linux kernel. For now, we will simply use these abstractions to explore other synchronization problems and build some higher-level synchronization primitives.

As a reminder, here are the operations on a `wait_queue`:
- `sleep(wait_queue)`: blocks the calling thread; thread is added to `wait_queue` and another thread is selected to run.
- `wakeup(wait_queue)`: removes a thread from `wait_queue` and adds it to the scheduler Ready queue.
- `wakeup_all(wait_queue)`: moves *all* threads from `wait_queue` to the scheduler Ready queue.
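The `wait_queue` operations are kernel abstractions, but their behavior can be approximated at user level for experimentation. The sketch below is one such approximation, assuming POSIX threads. The names `struct wait_queue`, `wq_init()`, `wq_sleep()`, `wq_wakeup()`, `wq_wakeup_all()`, `wq_sleepers()`, and `wq_demo()` are hypothetical (prefixed to avoid clashing with the C library `sleep()`), and a ticket counter stands in for the queue of sleeping threads. As with the operations listed above, a wakeup issued when no thread is sleeping does nothing.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stddef.h>

/* Hypothetical user-level model of a wait queue (not a real kernel API).
 * Sleepers take tickets in FIFO order; tickets below `head` have been woken. */
struct wait_queue {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    unsigned long tail;   /* next ticket to hand out */
    unsigned long head;   /* tickets < head have been released */
};

void wq_init(struct wait_queue *wq) {
    pthread_mutex_init(&wq->lock, NULL);
    pthread_cond_init(&wq->cond, NULL);
    wq->head = wq->tail = 0;
}

void wq_destroy(struct wait_queue *wq) {
    pthread_cond_destroy(&wq->cond);
    pthread_mutex_destroy(&wq->lock);
}

/* sleep(): enqueue the caller and block until a wakeup reaches its ticket. */
void wq_sleep(struct wait_queue *wq) {
    pthread_mutex_lock(&wq->lock);
    unsigned long ticket = wq->tail++;
    while (ticket >= wq->head)               /* loop also absorbs spurious wakeups */
        pthread_cond_wait(&wq->cond, &wq->lock);
    pthread_mutex_unlock(&wq->lock);
}

/* wakeup(): release the oldest sleeper; with no sleepers it has no effect. */
void wq_wakeup(struct wait_queue *wq) {
    pthread_mutex_lock(&wq->lock);
    assert(wq->head <= wq->tail);
    if (wq->head < wq->tail) {
        wq->head++;
        pthread_cond_broadcast(&wq->cond);   /* only the released ticket proceeds */
    }
    pthread_mutex_unlock(&wq->lock);
}

/* wakeup_all(): release every thread currently sleeping. */
void wq_wakeup_all(struct wait_queue *wq) {
    pthread_mutex_lock(&wq->lock);
    wq->head = wq->tail;
    pthread_cond_broadcast(&wq->cond);
    pthread_mutex_unlock(&wq->lock);
}

/* Number of threads that have gone to sleep and not yet been released. */
unsigned long wq_sleepers(struct wait_queue *wq) {
    pthread_mutex_lock(&wq->lock);
    unsigned long n = wq->tail - wq->head;
    pthread_mutex_unlock(&wq->lock);
    return n;
}

static void *sleeper(void *arg) {
    wq_sleep(arg);
    return NULL;
}

/* Demo: records the sleeper count after each step in counts[0..2]. */
void wq_demo(unsigned long counts[3]) {
    struct wait_queue wq;
    pthread_t t[3];
    wq_init(&wq);

    wq_wakeup(&wq);                          /* nobody asleep: the wakeup is lost */
    counts[0] = wq_sleepers(&wq);

    for (int i = 0; i < 3; i++)
        pthread_create(&t[i], NULL, sleeper, &wq);
    while (wq_sleepers(&wq) < 3)             /* wait until all three are asleep */
        sched_yield();

    wq_wakeup(&wq);                          /* releases exactly one thread */
    counts[1] = wq_sleepers(&wq);

    wq_wakeup_all(&wq);                      /* releases the other two */
    for (int i = 0; i < 3; i++)
        pthread_join(t[i], NULL);
    counts[2] = wq_sleepers(&wq);
    wq_destroy(&wq);
}
```

Here `wq_demo()` issues a wakeup while nobody is asleep, puts three threads to sleep, releases one with `wq_wakeup()`, and releases the other two with `wq_wakeup_all()`. The internal mutex only makes each operation atomic on its own; it does not make a caller's check of some *other* condition atomic with the `wq_sleep()` that follows it.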

Waiting for a lock to be released is just one possible reason that a thread might need to wait. We saw another example in the code of {numref}`listing:sync:check_spinlock_fairness`, where each child thread spins waiting for the parent to finish creating all the other children. The `wakeup_all()` operation is handy in a case like this where we want to allow all the waiting threads to resume activity.

There are many examples where we need to control the order in which threads can execute. In the remainder of this chapter, we will consider some of these problems. 




## The Bounded Buffer Problem

Let's look at one classic example, known as the Bounded Buffer problem (also called the Producer/Consumer problem). In this problem, a set of threads communicate through a shared circular buffer that can hold N items. We maintain a count of the number of items currently in the buffer. Producer threads generate new items and add them to the buffer; consumer threads remove items from the buffer.  The items could be an arbitrary structure, or just bytes of data. We will begin by considering the special case of a single producer and a single consumer. This problem is essentially a simplified version of the [pipe abstraction](cont:gs:abstractions:pipes).


```{figure} ../images/sync/bbuf_setup.drawio.png
---
width: 75%
name: fig:sync:ordering:bbuf_setup
---
The bounded buffer problem. Producers continually add items to the buffer and consumers continually remove items.
```

We illustrate this problem setup in {numref}`fig:sync:ordering:bbuf_setup`. We have declared a `struct bounded_buffer` data type to encapsulate the properties of the bounded buffer. The `count` member is declared as an `atomic_int` so that simple increment and decrement operations on `count` are performed atomically; we use ordinary `int` types for the `in` and `out` members, since `in` is only used by the producer and `out` is only used by the consumer.

In the code snippets, however, there is no coordination (or *synchronization*) between the producer thread and the consumer thread. The producer blindly stuffs items into the buffer, possibly overwriting previous items that the consumer has not had a chance to remove yet. Similarly, the consumer blindly grabs items out of the buffer, without regard for whether the producer has actually filled those buffer slots. Clearly, this will not lead to correct results, even if the count is correct! We must introduce some synchronization constraints: the producer must wait if the buffer is full (i.e., if `count == N`), and the consumer must wait if the buffer is empty (i.e., if `count == 0`). In addition, the producer must wake up a waiting consumer when the first item is added to the buffer (i.e., when the buffer becomes non-empty), and the consumer must wake up a waiting producer when an item is removed from a full buffer (i.e., when the buffer becomes non-full).
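To make the circular index arithmetic and the two waiting conditions concrete, here is a single-threaded sketch of the buffer. It uses a plain `int` count and the hypothetical helpers `bb_full()`, `bb_empty()`, `bb_put()`, and `bb_get()`. It performs no synchronization at all; it only identifies the states in which a producer or consumer would have to wait.

```c
#include <assert.h>
#include <stdbool.h>

#define N 2

/* Single-threaded model of the bounded buffer; no synchronization. */
struct bounded_buffer {
    char data[N];
    int in;      /* next slot the producer fills */
    int out;     /* next slot the consumer empties */
    int count;   /* items currently in the buffer */
};

/* In this state the producer would have to wait. */
bool bb_full(const struct bounded_buffer *bb)  { return bb->count == N; }

/* In this state the consumer would have to wait. */
bool bb_empty(const struct bounded_buffer *bb) { return bb->count == 0; }

/* Insert an item; the caller must have checked !bb_full(). */
void bb_put(struct bounded_buffer *bb, char item) {
    assert(!bb_full(bb));
    bb->data[bb->in] = item;
    bb->in = (bb->in + 1) % N;   /* wrap around to slot 0 after slot N-1 */
    bb->count++;
}

/* Remove the oldest item; the caller must have checked !bb_empty(). */
char bb_get(struct bounded_buffer *bb) {
    assert(!bb_empty(bb));
    char item = bb->data[bb->out];
    bb->out = (bb->out + 1) % N;
    bb->count--;
    return item;
}
```

With N = 2, putting `'a'` and `'b'` fills the buffer, and after one `bb_get()` returns `'a'`, the next `bb_put()` reuses slot 0. Items always come out in the order they went in.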

Let's try using the `sleep()` and `wakeup()` operations to synchronize our threads, as shown in {numref}`listing:sync:ordering:bbuf_wq`. (Note that this code example is incomplete, and is provided for illustration purposes only. The `sleep()` function here is our assumed wait_queue sleep operation, and not the C library `sleep()`.) 

```{literalinclude} /src/sync/bounded_buffer_wq.c
:name: listing:sync:ordering:bbuf_wq
:caption: Bounded buffer with sleep() and wakeup()
```

```c
/* For illustration purposes only.
 * This code is not complete and cannot be compiled into an executable.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include "waitqueue.h"

extern char produce_item(void);
extern void consume_item(char item);

#define N 2

struct bounded_buffer {
    char data[N];            /* 'items' in this example are just bytes */
    int in;                  /* index where producer inserts items, initially 0 */
    int out;                 /* index where consumer removes items, initially 0 */
    atomic_int count;        /* number of items currently in buffer, initially 0 */
    struct wait_queue waitq; /* keep track of waiting threads */
};

struct bounded_buffer bb;

void *producer(void *arg) {
    while (true) {
        char item = produce_item();
        if (bb.count == N)
            sleep(&bb.waitq);
        bb.data[bb.in] = item;
        bb.in = (bb.in + 1) % N;
        bb.count++;
        if (bb.count == 1)
            wakeup(&bb.waitq);
    }
}

void *consumer(void *arg) {
    while (true) {
        if (bb.count == 0)
            sleep(&bb.waitq);
        char item = bb.data[bb.out];
        bb.out = (bb.out + 1) % N;
        bb.count--;
        if (bb.count == N-1)
            wakeup(&bb.waitq);
        consume_item(item);
    }
}
```

Does this solve the problem? Alas, you may have noticed that both the producer and the consumer make decisions based on the value of `count`, and even if the `count` variable is updated atomically, there is still a race condition. A thread, T1, can be interrupted after it has tested the value of `count` and decided that it needs to call `sleep()`, but before the `sleep()` has actually executed. If the other thread, T2, calls `wakeup()` at this point, it will have no effect, since there is no waiting thread to wake up. When T1 runs again, it will proceed with the `sleep()` and can be blocked forever. This is known as the *lost wakeup* problem, and is illustrated in {numref}`fig:sync:ordering:lost_wakeup`. In the figure, we show the interleaved thread execution on the left, with the actual values of buffer variables substituted into the code statements. On the right, we show the buffer variables as they are updated by the threads.

```{figure} ../images/sync/lost_wakeup_short.drawio.png
---
width: 75%
name: fig:sync:ordering:lost_wakeup
---
The lost wakeup problem. 
```
The producer thread has started to add the first item to the buffer, but has not updated `count` before it is preempted. The consumer thread runs, sees that the buffer is empty and decides to sleep, but is preempted before it calls `sleep()`. The producer then updates `count` and calls `wakeup()`, which has no effect. When the consumer runs again, it calls `sleep()` and blocks awaiting the `wakeup()` which has, unfortunately, already happened. Since the consumer is blocked, the producer runs, adds a second item to the buffer, and then generates a third item to add to the buffer. Now the producer finds that the buffer is full, so it also calls `sleep()`, waiting for the consumer to remove something from the buffer and make space for the third item. At this point, we are completely stuck: both threads are sleeping, each waiting for the other to do something and wake it up. This is called *deadlock*. We will examine the deadlock problem in more detail later.
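The interleaving in the figure can also be replayed step by step in a single thread. Each step follows the listing faithfully; the failure comes purely from the order in which the steps run. The sketch below uses the hypothetical names `struct replay_state`, `replay_wakeup()`, and `replay_lost_wakeup()`, models the wait queue as nothing more than a count of sleeping threads, omits the item data, and uses N = 2 as in the listing.

```c
#include <assert.h>
#include <stdbool.h>

#define N 2

/* Hypothetical single-threaded replay of the lost-wakeup interleaving.
 * Only the variables that drive decisions are modeled (no item data). */
struct replay_state {
    int count;             /* items in the buffer */
    int in;                /* producer's insertion index */
    int sleepers;          /* threads sleeping on the shared wait queue */
    bool producer_asleep;
    bool consumer_asleep;
};

/* wakeup(): if nobody is sleeping, nothing happens and the wakeup is lost.
 * In this replay the consumer is the only thread that could be asleep here. */
static void replay_wakeup(struct replay_state *s) {
    if (s->sleepers == 0)
        return;
    s->sleepers--;
    s->consumer_asleep = false;
}

struct replay_state replay_lost_wakeup(void) {
    struct replay_state s = {0};

    /* Producer, item 1: count != N, so it stores the item and advances in,
     * then is preempted before count++. */
    assert(s.count != N);
    s.in = (s.in + 1) % N;

    /* Consumer: sees count == 0, decides to sleep, is preempted before sleep(). */
    bool consumer_decided_to_sleep = (s.count == 0);

    /* Producer: count++ makes count == 1, so it calls wakeup(); nobody is asleep yet. */
    s.count++;
    if (s.count == 1)
        replay_wakeup(&s);

    /* Consumer: performs the sleep() it decided on earlier. */
    if (consumer_decided_to_sleep) {
        s.sleepers++;
        s.consumer_asleep = true;
    }

    /* Producer, item 2: buffer not full, so add it; count becomes 2, so no wakeup. */
    if (s.count != N) {
        s.in = (s.in + 1) % N;
        s.count++;
        if (s.count == 1)
            replay_wakeup(&s);
    }

    /* Producer, item 3: count == N, so the producer sleeps too. */
    if (s.count == N) {
        s.sleepers++;
        s.producer_asleep = true;
    }
    return s;
}
```

At the end of `replay_lost_wakeup()`, `count` is 2, both threads are marked asleep, and no running thread is left to call `wakeup()`.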


Before describing synchronization primitives that are designed to solve this problem, let's consider some non-solutions.

You might be tempted to think that we can avoid the lost wakeup by simply having the producer and consumer call `wakeup()` on *every* iteration after adding or removing an item, rather than only doing so when the buffer becomes non-empty or non-full. That mostly works in this particular instance, where the producer and consumer are both in an infinite loop, and there is enough space in the buffer for the producer to add a second item. It could still leave the consumer waiting longer than it should, while the producer goes about generating another item to add to the buffer. It is also a brittle solution. If we had only a single slot in the buffer (N == 1), then the producer would be stuck after adding the first item, and losing the first `wakeup()` to the consumer would be fatal. Or, if the producer stopped after adding a finite number of items, an unlucky consumer could be stranded forever with an item still left in the buffer.

Now, you might observe that testing the value of `count` and taking some action based on the result is a critical section of code that should execute atomically, and locks were our solution to critical section problems. So, what if we added a lock to our bounded buffer structure, and had threads acquire the lock before checking `count` and either going to sleep or issuing a `wakeup()`? This also doesn't work. If we allow a thread to hold the lock when it calls `sleep()`, then the other thread will be blocked forever trying to obtain the lock so that it can issue a `wakeup()`. On the other hand, if we release the lock prior to calling `sleep()`, then we are right back where we started: a thread could be preempted right after releasing the lock and before calling `sleep()`, once again missing the `wakeup()`.

We can also observe that the *lost wakeup* problem occurs because a `wakeup()` has no effect when no one is waiting yet. Perhaps we could avoid the problem if we kept track of the `wakeup()` and allowed a thread to return immediately from `sleep()` if a `wakeup()` had already been sent. This is the key idea behind the *semaphore* synchronization primitive introduced by Dijkstra {cite}`10.5555/1102034`. A semaphore has private data consisting of a non-negative integer count and a queue of waiting threads, which can only be accessed by two **atomic operations**, `sem_wait()` and `sem_post()`. There is also a `sem_init()` operation that sets the semaphore count to some initial value.

```{note}
We are using the POSIX names for the semaphore operations. The original names of the semaphore operations used by Dijkstra were `P()` and `V()`. In other literature they are variously called `down` and `up`, `wait` and `signal`, `await` and `notify`, and even `acquire` and `release`.
```

The semaphore operations are defined as follows:
- `sem_init(sem_t *sem, int pshared, unsigned int value)` - initializes the semaphore pointed to by `sem` with the initial value `value`. (The `pshared` argument should be 0 for semaphores shared by threads in the same process.)
- `sem_wait(sem_t *sem)` - decrements the internal count of the semaphore pointed to by `sem`. If the semaphore's value is greater than zero, then the decrement proceeds, and the function returns, immediately. If the semaphore currently has the value zero, then the call blocks until it becomes possible to perform the decrement (i.e., the semaphore value rises above zero).
- `sem_post(sem_t *sem)` - increments the internal count of the semaphore pointed to by `sem`. If the semaphore's value consequently becomes greater than zero, then another process or thread blocked in a `sem_wait()` call will be woken up.
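The difference from `wakeup()` shows up even in a single thread. In the sketch below, `remembered_post_demo()` is a hypothetical helper, while `sem_init()`, `sem_post()`, `sem_wait()`, `sem_trywait()`, `sem_getvalue()`, and `sem_destroy()` are standard POSIX calls. A `sem_post()` issued before anyone waits is recorded in the count, so a later `sem_wait()` returns immediately instead of blocking forever. It assumes a platform that supports unnamed semaphores, such as Linux.

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>

/* Hypothetical demo: a post with no waiter is remembered by the count.
 * Stores observed values in vals[0..2]; returns -1 if sem_init() fails. */
int remembered_post_demo(int vals[3]) {
    sem_t sem;
    if (sem_init(&sem, 0, 0) != 0)    /* pshared = 0: shared among this process's threads */
        return -1;

    sem_post(&sem);                   /* nobody is waiting: count goes 0 -> 1 */
    sem_getvalue(&sem, &vals[0]);

    sem_wait(&sem);                   /* count is 1, so it decrements and returns at once */
    sem_getvalue(&sem, &vals[1]);

    /* sem_trywait() is the non-blocking variant: with a count of 0 it
     * returns -1 and sets errno to EAGAIN instead of blocking. */
    vals[2] = sem_trywait(&sem);
    if (vals[2] == -1)
        assert(errno == EAGAIN);

    sem_destroy(&sem);
    return 0;
}
```

Replacing `sem_post()` with a `wakeup()` on an empty wait queue in the first step would leave nothing for the following wait to find, which is exactly the lost wakeup.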

It is important that the `sem_wait()` and `sem_post()` calls are atomic, meaning that once a thread starts one of these operations, it cannot be interleaved with the execution of another operation on the same semaphore. For user-level implementations, this generally requires a system call since we may need to change the state of a thread from running to blocked, or from blocked to runnable. In the operating system, we need to ensure that checking the semaphore count and putting a thread to sleep is atomic, either by disabling interrupts on a uniprocessor, or by using a spinlock on a multiprocessor. Once the thread has been enqueued on the semaphore's wait list, the lock can be released before yielding the CPU to another thread. 
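At user level, one common way to obtain this atomicity is to let a mutex play the role of the lock and a condition variable play the role of the wait list. `pthread_cond_wait()` atomically releases the mutex and blocks the caller, which closes the window that caused the lost wakeup. This is a minimal sketch with hypothetical names (`struct csem`, `csem_init()`, `csem_wait()`, `csem_post()`, `csem_demo()`). It is not a description of how any particular C library implements `sem_t`.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical counting semaphore built from a mutex and a condition variable. */
struct csem {
    pthread_mutex_t lock;     /* protects count, like the kernel's spinlock */
    pthread_cond_t nonzero;   /* where waiters block, like the semaphore's wait list */
    unsigned int count;
};

void csem_init(struct csem *s, unsigned int value) {
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->nonzero, NULL);
    s->count = value;
}

void csem_destroy(struct csem *s) {
    pthread_cond_destroy(&s->nonzero);
    pthread_mutex_destroy(&s->lock);
}

void csem_wait(struct csem *s) {
    pthread_mutex_lock(&s->lock);
    /* The count check and the decision to block happen under one lock, and
     * pthread_cond_wait() releases it only once the caller is waiting, so a
     * csem_post() cannot slip in between and be lost. */
    while (s->count == 0)
        pthread_cond_wait(&s->nonzero, &s->lock);
    assert(s->count > 0);
    s->count--;
    pthread_mutex_unlock(&s->lock);
}

void csem_post(struct csem *s) {
    pthread_mutex_lock(&s->lock);
    s->count++;                          /* remembered even if nobody is waiting */
    pthread_cond_signal(&s->nonzero);
    pthread_mutex_unlock(&s->lock);
}

static void *poster(void *arg) {
    struct csem *s = arg;
    csem_post(s);
    csem_post(s);
    return NULL;
}

/* Demo: another thread posts twice; whether the posts happen before or after
 * the waits, both waits complete. Returns the leftover count (expected 0). */
unsigned int csem_demo(void) {
    struct csem s;
    pthread_t t;
    csem_init(&s, 0);
    pthread_create(&t, NULL, poster, &s);
    csem_wait(&s);
    csem_wait(&s);
    pthread_join(t, NULL);
    unsigned int left = s.count;
    csem_destroy(&s);
    return left;
}
```

The `while` loop (rather than an `if`) re-checks the count after every return from `pthread_cond_wait()`, because POSIX permits spurious wakeups.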

How does the semaphore synchronization primitive help us solve the bounded buffer problem? Instead of a single count of items in the buffer, we start by breaking apart the conditions that producers and consumers must wait upon. The producer thread can proceed as long as there are empty slots in the buffer to put items into. We can represent this using a semaphore called `sem_empty` with initial value N. The consumer can proceed as long as there are filled slots in the buffer to remove items from. We can represent this using a semaphore called `sem_filled` with initial value 0. Prior to adding an item, the producer must perform a `sem_wait(&sem_empty)`, which will decrement the semaphore's count and return immediately as long as there is space in the buffer. Once the semaphore's value is 0, the producer will block awaiting a `sem_post(&sem_empty)` from the consumer to indicate that an item has been removed and an empty slot is now available. Each time an item is added to the buffer, the producer must issue a `sem_post(&sem_filled)` to let the consumer know that a slot has been filled and an item is ready to be removed from the buffer. Symmetrically, the consumer performs a `sem_wait(&sem_filled)` before removing an item, blocking while the buffer is empty, and a `sem_post(&sem_empty)` after removing it. The code for a single producer and consumer using semaphores is shown in {numref}`listing:sync:ordering:bbuf_sem`.

```{literalinclude} /src/sync/bounded_buffer_sem.c
:name: listing:sync:ordering:bbuf_sem
:caption: Bounded buffer with semaphores
```

```c
/* For illustration purposes only.
 * This code is not complete and cannot be compiled into an executable.
 */
#include <semaphore.h>
#include <stdbool.h>

extern char produce_item(void);
extern void consume_item(char item);

#define N 2

struct bounded_buffer {
    char data[N];      /* 'items' in this example are just bytes */
    int in;            /* index where producer inserts items, initially 0 */
    int out;           /* index where consumer removes items, initially 0 */
    sem_t sem_empty;   /* number of empty slots, initially N */
    sem_t sem_filled;  /* number of filled slots, initially 0 */
};

struct bounded_buffer bb;

void *producer(void *arg) {
    while (true) {
        char item = produce_item();
        sem_wait(&bb.sem_empty);
        bb.data[bb.in] = item;
        bb.in = (bb.in + 1) % N;
        sem_post(&bb.sem_filled);
    }
}

void *consumer(void *arg) {
    while (true) {
        sem_wait(&bb.sem_filled);
        char item = bb.data[bb.out];
        bb.out = (bb.out + 1) % N;
        sem_post(&bb.sem_empty);
        consume_item(item);
    }
}

int main(void)
{
    sem_init(&bb.sem_empty, 0, N);   /* all N slots start out empty */
    sem_init(&bb.sem_filled, 0, 0);  /* no slots start out filled */

    /*** Thread creation stuff here ***/

    return 0;
}
```
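The listing above leaves out item production, consumption, and thread creation. One way to fill those in so that the program actually runs is sketched below, under some assumptions. `produce_item()` and `consume_item()` are replaced by hypothetical stand-ins (`make_item()` and a `received` array), and the threads pass a fixed number of items (`NITEMS`, also hypothetical) so that they terminate and we can check that every item arrives exactly once and in order. The entry point `run_bounded_buffer()` is likewise hypothetical. It relies on unnamed semaphores created with `sem_init()`, which some platforms (for example macOS) do not support.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

#define N 2
#define NITEMS 1000   /* hypothetical: number of items passed through the buffer */

struct bounded_buffer {
    char data[N];
    int in;            /* used only by the producer */
    int out;           /* used only by the consumer */
    sem_t sem_empty;   /* counts empty slots, initially N */
    sem_t sem_filled;  /* counts filled slots, initially 0 */
};

static struct bounded_buffer bb;
static char received[NITEMS];   /* what the consumer saw, in arrival order */

/* Stand-in for produce_item(): item i is the byte value i % 128. */
static char make_item(int i) { return (char)(i % 128); }

static void *producer(void *arg) {
    (void)arg;
    for (int i = 0; i < NITEMS; i++) {
        char item = make_item(i);
        sem_wait(&bb.sem_empty);       /* wait for a free slot */
        bb.data[bb.in] = item;
        bb.in = (bb.in + 1) % N;
        sem_post(&bb.sem_filled);      /* announce a filled slot */
    }
    return NULL;
}

static void *consumer(void *arg) {
    (void)arg;
    for (int i = 0; i < NITEMS; i++) {
        sem_wait(&bb.sem_filled);      /* wait for a filled slot */
        char item = bb.data[bb.out];
        bb.out = (bb.out + 1) % N;
        sem_post(&bb.sem_empty);       /* announce a free slot */
        received[i] = item;            /* stand-in for consume_item() */
    }
    return NULL;
}

/* Runs one producer and one consumer; returns true if all items arrived in order. */
bool run_bounded_buffer(void) {
    bb.in = bb.out = 0;
    if (sem_init(&bb.sem_empty, 0, N) != 0)
        return false;
    if (sem_init(&bb.sem_filled, 0, 0) != 0) {
        sem_destroy(&bb.sem_empty);
        return false;
    }

    pthread_t p, c;
    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&c, NULL, consumer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);

    /* Both indices advanced NITEMS times, so they end up equal. */
    assert(bb.in == bb.out);
    sem_destroy(&bb.sem_empty);
    sem_destroy(&bb.sem_filled);

    for (int i = 0; i < NITEMS; i++)
        if (received[i] != make_item(i))
            return false;
    return true;
}
```

Because `in` is touched only by the producer and `out` only by the consumer, this single-producer, single-consumer version needs no lock around the indices. The semaphores alone order each slot's write before its read.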