## 4.1 等待一个事件或其他条件

- 第一个选择...
- 第二个选择是在等待线程在检查间隙，使用 std::this_thread::sleep_for() 进行周期性的间歇

In [1]:
#include <iostream>
#include <thread>
#include <mutex>

namespace n1 {
    bool flag{};
    std::mutex m;

    void wait_for_flag()
    {
        std::unique_lock<std::mutex> lk(m);
        while (!flag)
        {
            lk.unlock(); // 1 解锁互斥量
            std::cout << "wait ...\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
            lk.lock(); // 3 再锁互斥量
        }
    }
}



In [2]:
{
    using namespace n1;

    auto t1 = std::thread([]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));

        std::lock_guard<std::mutex> lk(m);
        flag = true;
    });
    wait_for_flag();
    t1.join();
}

wait ...
wait ...
wait ...
wait ...
wait ...




这个实现就进步很多，
- 因为当线程休眠时，线程没有浪费执行时间，
- 但是很难确定正确的休眠时间。太短的休眠和没有休眠一样，都会浪费执行时间；太长的休眠时间，可能会让任务等待线程醒来。

休眠时间过长是很少见的情况，因为这会直接影响到程序的行为，**当在高节奏游戏中，它意味着丢帧，或在一个实时应用中超越了一个时间片**。

**第三个选择(也是优先的选择)** 是，使用C++标准库提供的工具去等待事件的发生。
- 通过另一线程触发等待事件的机制是最基本的唤醒方式(例如：流水线上存在额外的任务时)，**这种机制就称为“条件变量”。**
- 从概念上来说，一个条件变量会与多个事件或其他条件相关，并且一个或多个线程会等待条件的达成。
- 当某些线程被终止时，为了唤醒等待线程(允许等待线程继续执行)终止的线程将会向等待着的线程广播“条件达成”的信息。


### 4.1.1 等待条件达成

C++标准库对条件变量有两套实现：
- std::condition_variable 和 std::condition_variable_any 。
- 这两个实现都包含在 <condition_variable> 头文件的声明中。
- 两者都需要与一个互斥量一起才能工作(互斥量是为了同步)；
  - 前者仅限于与 std::mutex 一起工作，
  - 而后者可以和任何满足最低标准的互斥量一起工作，从而加上了_any的后缀。
  
因为 std::condition_variable_any 更加通用，这就可能从体积、性能，以及系统资源的使用方面产生额外的开销，**所以 std::condition_variable 一般作为首选的类型**，当对灵活性有硬性要求时，我们才会去考虑 std::condition_variable_any 。


In [3]:
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <queue>

namespace n2 {
    struct data_chunk
    {
        int i;
    };
    std::mutex mut;
    std::queue<data_chunk> data_queue; // 1
    std::condition_variable data_cond;

    static int i = 0;
    bool more_data_to_prepare() {
        return ++i < 5;
    }

    data_chunk prepare_data()
    {
        data_chunk data;
        data.i = i;

        std::cout << "prepare_data: " << data.i << "\n";
        return data;
    }

    void process(const data_chunk& data)
    {
        std::cout << "process: " << data.i << "\n";
    }

    bool is_last_chunk(const data_chunk& data)
    {
        return data.i == 4;
    }

    void data_preparation_thread()
    {
        while (more_data_to_prepare())
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));

            data_chunk const data = prepare_data();
            std::lock_guard<std::mutex> lk(mut);
            data_queue.push(data); // 2
            data_cond.notify_one(); // 3
        }
    }

    void data_processing_thread()
    {
        while (true)
        {
            std::unique_lock<std::mutex> lk(mut); // 4
            data_cond.wait(lk, [] {return !data_queue.empty();}); // 5
            data_chunk data = data_queue.front();
            data_queue.pop();
            lk.unlock(); // 6

            process(data);
            if (is_last_chunk(data))
                break;
        }
    }
}



In [4]:
{
    using namespace n2;

    auto t1 = std::thread(data_preparation_thread);
    auto t2 = std::thread(data_processing_thread);
    t1.join();
    t2.join();
}

prepare_data: 1
process: 1
prepare_data: 2
process: 2
prepare_data: 3
process: 3
prepare_data: 4
process: 4




首先，你拥有一个用来在两个线程之间传递数据的队列①。
- 当数据准备好时，使用 std::lock_guard 对队列上锁，将准备好的数据压入队列中②，之后线程会对队列中的数据上锁。
- 然后调用 std::condition_variable 的notify_one()成员函数，对等待的线程(如果有等待线程)进行通知③。

在另外一侧，你有一个正在处理数据的线程，
- 这个线程首先对互斥量上锁，但在这里 std::unique_lock 要比 std::lock_guard ④更加合适——且听我细细道来。
- 线程之后会调用 std::condition_variable 的成员函数wait()，传递一个锁和一个lambda函数表达式(作为等待的条件⑤)。Lambda函数是 C++11 添加的新特性，它可以让一个匿名函数作为其他表达式的一部分，并且非常合适作为标准函数的谓词，例如wait()函数。
- 在这个例子中，简单的lambda函数 []{return !data_queue.empty();} 会去检查data_queue是否不为空，当data_queue不为空——那就意味着队列中已经准备好数据了。

### 4.1.2 使用条件变量构建线程安全队列

使用队列在多个线程中转移数据(如清单4.1)是很常见的。做得好的话，同步操作可以限制在队列本身，同步问题和条件竞争出现的概率也会降低。鉴于这些好处，现在从清单4.1中提取出一个通用线程安全的队列。

#### 清单4.5 使用条件变量的线程安全队列(完整版)

In [5]:
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

namespace n3 {
    template<typename T>
    class threadsafe_queue
    {
    private:
        mutable std::mutex mut; // 1 互斥量必须是可变的
        std::queue<T> data_queue;
        std::condition_variable data_cond;

    public:
        threadsafe_queue()
        {}

        threadsafe_queue(threadsafe_queue const& other)
        {
            std::lock_guard<std::mutex> lk(other.mut);
            data_queue = other.data_queue;
        }

        void push(T new_value)
        {
            std::lock_guard<std::mutex> lk(mut);
            data_queue.push(new_value);
            data_cond.notify_one();
        }

        void wait_and_pop(T& value)
        {
            std::unique_lock<std::mutex> lk(mut);
            data_cond.wait(lk, [this] {return !data_queue.empty(); });
            value = data_queue.front();
            data_queue.pop();
        }

        std::shared_ptr<T> wait_and_pop()
        {
            std::unique_lock<std::mutex> lk(mut);
            data_cond.wait(lk, [this] {return !data_queue.empty(); });
            std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
            data_queue.pop();
            return res;
        }

        bool try_pop(T& value)
        {
            std::lock_guard<std::mutex> lk(mut);
            if (data_queue.empty())
                return false;
            value = data_queue.front();
            data_queue.pop();
            return true;
        }

        std::shared_ptr<T> try_pop()
        {
            std::lock_guard<std::mutex> lk(mut);
            if (data_queue.empty())
                return std::shared_ptr<T>();
            std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
            data_queue.pop();
            return res;
        }

        bool empty() const
        {
            std::lock_guard<std::mutex> lk(mut);
            return data_queue.empty();
        }
    };
}



- 传入拷贝构造函数的other形参是一个const引用；因为其他线程可能有这个类型的非const引用对象，并调用变种成员函数，所以这里有必要对互斥量上锁。
- 如果锁住互斥量是一个可变操作，那么这个**互斥量对象就会标记为可变的①**，之后他就可以在empty()和拷贝构造函数中上锁了。


In [6]:
#include <iostream>

namespace n3 {
    const int MAX_ID = 4;
    struct data_chunk
    {
        int id;
    };
    threadsafe_queue<data_chunk> data_queue; // 1

    static int seq = 0;
    bool more_data_to_prepare()
    {
        return seq < MAX_ID;
    }

    data_chunk prepare_data()
    {
        data_chunk data;
        data.id = ++seq;
        std::cout << "prepare_data: " << data.id << '\n';
        return data;
    }

    void process(data_chunk const& data)
    {
        std::cout << "thread ID: " << std::this_thread::get_id() << " - process: " << data.id << '\n';
    }

    bool is_last_chunk(data_chunk const& data)
    {
        return data.id == MAX_ID;
    }

    void data_preparation_thread()
    {
        while (more_data_to_prepare())
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));

            data_chunk const data = prepare_data();
            data_queue.push(data); // 2
        }
    }

    void data_processing_thread()
    {
        while (true)
        {
            data_chunk data;
            data_queue.wait_and_pop(data); // 3
            process(data);
            if (is_last_chunk(data))
                break;
        }
    }
}



In [7]:
{
    auto t1 = std::thread(n3::data_preparation_thread);
    auto t2 = std::thread(n3::data_processing_thread);
    t1.join();
    t2.join();
}

prepare_data: 1
thread ID: 140512656873216 - process: 1
prepare_data: 2
thread ID: 140512656873216 - process: 2
prepare_data: 3
thread ID: 140512656873216 - process: 3
prepare_data: 4
thread ID: 140512656873216 - process: 4




**条件变量在多个线程等待同一个事件时，也是很有用的。**当线程用来分解工作负载，并且只有一个线程可以对通知做出反应，与清单4.1中使用的结构完全相同；运行多个数据实例——处理线程(processing thread)。当新的数据准备完成，调用**notify_one()将会触发一个**正在执行wait()的线程，去检查条件和wait()函数的返回状态(因为你仅是向data_queue添加一个数据项)。 这里不保证线程一定会被通知到，即使只有一个等待线程被通知时，所有处线程也有可能都在处理数据。

另一种可能是，很多线程等待同一事件，对于通知他们都需要做出回应。这会发生在共享数据正在初始化的时候，当处理线程可以使用同一数据时，就要等待数据被初始化(有不错的机制可用来应对；可见第3章，3.3.1节)，或等待共享数据的更新，比如，定期重新初始化(periodic reinitialization)。在这些情况下，准备线程准备数据数据时，就会通过条件变量**调用notify_all()成员函数**，而非直接调用notify_one()函数。顾名思义，这就是全部线程在都去执行wait()(检查他们等待的条件是否满足)的原因。

当等待线程只等待一次，当条件为true时，它就不会再等待条件变量了，所以一个条件变量可能并非同步机制的最好选择。尤其是，**条件在等待一组可用的数据块时。在这样的情况下，期望(future)就是一个适合的选择。**