In [1]:
.I ../stlab/libraries

# Coroutines By Hand

* Implement sequential process as a coroutine

In [2]:
#include <mutex>
#include <string>
#include <unordered_set>
#include <functional>
#include <condition_variable>
#include <deque>
#include <thread>
#include <iostream>

#define STLAB_DISABLE_FUTURE_COROUTINES 1
#include <stlab/concurrency/future.hpp>
#include <stlab/concurrency/immediate_executor.hpp>
#include <stlab/concurrency/default_executor.hpp>
#include <stlab/concurrency/utility.hpp>

In [3]:
using namespace std;

In [4]:
class task {
    struct concept {
        virtual ~concept() {}
        virtual void invoke() = 0;
    };

    template <class F>
    struct model final : concept {
        F _f;
        model(F f) : _f(move(f)) {}
        void invoke() override { _f(); }
    };
    unique_ptr<concept> _self;

public:
    task() = default;

    template <class F> // F model void()
    task(F f) : _self(make_unique<model<F>>(move(f))) {}

    void operator()() { _self->invoke(); }
};

## Recall `sequential_process` implementation

In [5]:
namespace bcc {

class sequential_process {
    mutex _mutex;
    condition_variable _condition;
    deque<task> _queue;
    bool _done = false;

    void run_loop();

    thread _thread{[this] { run_loop(); }};

public:
    ~sequential_process();
    void async(task);
};

} // namespace bcc

- The logical structure of our coroutine will be:

```cpp
class sequential_process {
    awaitable_queue<task> _queue;
    co_task<void> _running;

public:
    sequential_process() {
        _running = [&]() {
            while (true) {
                (co_await _queue.pop())();
            }
        }
    }
    void async(task f) { _queue.push(move(f)); }
};
```

- **Tip: When desinging code, sketch the code in an ideal form**
- Without building the infrustructure for `awaitable_queue<>` and `co_task<>`
    - We can build the same logical structure directly in `seqential_process`
- Build concrete solutions before complex abstractions

- As a coroutine we no longer need:
    - `_thread`
    - `_condition`
    - `_done` flag
    - `run_loop()`
    - `join()` on destuction
- We will need
    - `_running` flag
    - `resume()`

In [6]:
namespace bcc2 {

class sequential_process {
    mutex _mutex;
    bool _running = false;
    deque<task> _queue;

    void resume();

public:
    void async(task);
};

} // namespace bcc2

In [7]:
namespace bcc {
       
sequential_process::~sequential_process() {
    {
        lock_guard<mutex> lock(_mutex);
        _done = true;
    }
    _condition.notify_one();
    _thread.join();
}

void sequential_process::run_loop() {
    while (true) {
        task work;
        {
            unique_lock<mutex> lock(_mutex);

            while (_queue.empty() && !_done) {
                _condition.wait(lock);
            }

            if (_queue.empty()) return;

            work = move(_queue.front());
            _queue.pop_front();
        }
        work();
    }
}
    
void sequential_process::async(task f) {
    {
        lock_guard<mutex> lock(_mutex);
        _queue.push_back(move(f));
    }
    _condition.notify_one();
}
    
} // namespace bcc

- `resume()` is the body of our coroutine:
```cpp
while (true) {
    (co_await _queue.pop())();
}
```

In [8]:
namespace bcc2 {

void sequential_process::resume() {
    task work;
    while (true) {
        {
            unique_lock<mutex> lock(_mutex);

            if (_queue.empty()) {
                _running = false;
                return;
            }
            work = move(_queue.front());
            _queue.pop_front();
        }
        work();
    }
}

} // namespace bcc2

- `async()` does a push and `resume()` if not running

In [9]:
namespace bcc2 {

void sequential_process::async(task f) {
    bool running = true;
    {
        lock_guard<mutex> lock(_mutex);
        _queue.push_back(move(f));
        swap(running, _running);
    }
    if (!running) resume();
}

} // namespace bcc2

In [10]:
using namespace bcc2;

- `async_packaged()`, `shared_pool`, and `interned_string` are unmodified

In [11]:
namespace {

template <class F> // F models R()
auto async_packaged(sequential_process& process, F&& f) {
    using result_t = std::result_of_t<std::decay_t<F>()>;

    auto task_future = stlab::package<result_t()>(stlab::immediate_executor,
                                                  std::forward<F>(f));

    process.async(move(task_future.first));

    return move(task_future.second);
}
    
} // namespace

In [12]:
namespace {
    
struct shared_pool {
    unordered_set<string> _pool;
    sequential_process _process;

    auto insert(string a) -> stlab::future<const string*> {
        return async_packaged(
            _process, [this, _a = move(a)]() mutable {
                return &*_pool.insert(move(_a)).first;
            });
    }
};
    
} // namespace

In [13]:
namespace {

class interned_string {
    static auto pool() -> shared_pool& {
        static shared_pool result;
        return result;
    }

    stlab::future<const string*> _string;

public:
    interned_string(string a) : _string(pool().insert(move(a))) {}

    auto str() const -> stlab::future<reference_wrapper<const string>> {
        return _string.then([](const string* p) { return cref(*p); });
    }
};
    
} // namespace

- And is used exactly the same way

In [14]:
{
    interned_string s("Hello World!"s);

    auto done = s.str().then([](const string& s) { cout << s << '\n'; });

    blocking_get(done);
}

Hello World!


- Advantages to the coroutine implementation
    - No seperate thread overhead
    - No overhead for waiting on condition variable
    - No blocking
    - Possible to implement with lock-free queue
    - `resume()` need not be executed _inline_
        - It could be queued to a _thread pool_
        - Requires some managment of _object liftimes_

- Disadvantages
    - _inline_ execution may _live lock_

```cpp
void sequential_process::async(task f) {
    bool running = true;
    {
        lock_guard<mutex> lock(_mutex);
        _queue.push_back(move(f));
        swap(running, _running);
    }
    if (!running) async([this]{ resume(); }); // WHAAATTT this???
}
```