Skip to content

Latest commit

 

History

History
796 lines (597 loc) · 44.7 KB

File metadata and controls

796 lines (597 loc) · 44.7 KB

三、C++ 中的语言级并发和并行

自从 C++ 11 语言标准问世以来,C++ 就对并发编程有了极好的支持。在此之前,线程化是一件由特定平台库处理的事情。微软公司有自己的线程库,其他平台(GNU Linux/macOS X)支持 POSIX 线程模型。作为语言的一部分,线程机制帮助 C++ 程序员编写了可在多个平台上运行的可移植代码。

最初的 C++ 标准发布于 1998 年,语言设计委员会坚信线程、文件系统、GUI 库等最好留给平台特定的库。赫伯·萨特在《多布斯博士杂志》上发表了一篇有影响力的文章,题为《免费午餐结束了》,他在文章中倡导利用当时处理器中可用的多核编程技术。在编写并行代码时,函数式编程模型非常适合这项任务。线程、Lambda 函数和表达式、移动语义和内存保证等特性帮助人们编写并发或并行代码,没有太多麻烦。本章旨在使开发人员能够利用线程库及其最佳实践。

在本章中,我们将涵盖以下主题:

  • 什么是并发?
  • 一个使用多线程的特色 Hello World 程序
  • 如何管理线程的生存期和资源
  • 线程间共享数据
  • 如何编写线程安全的数据结构

什么是并发?

在基本层面上,并发性代表不止一个活动同时发生。我们可以将并发性与现实生活中的许多情况联系起来,比如一边看电影一边吃爆米花,或者同时用两只手执行不同的功能,等等。那么,什么是计算机中的并发?

几十年前,计算机系统能够进行任务切换,多任务操作系统已经存在了很长时间。为什么计算领域突然对并发产生了新的兴趣?微处理器制造商通过向处理器中塞入越来越多的硅来提高计算能力。在这个过程的某个阶段,当他们到达基本的物理极限时,他们无法将更多的东西塞进同一个区域。那些时代的中央处理器一次只有一条执行路径,它们通过切换任务(指令流)运行多条指令路径。在中央处理器级别,只有一个指令流被执行,当事情发生得非常快时(与人类的感知相比),用户感觉动作同时发生。

大约在 2005 年,英特尔宣布了他们的新多核处理器(支持硬件级别的多种执行路径),这是一个游戏规则的改变者。多核处理器不是一个处理器通过在它们之间切换来完成每一项任务,而是作为一种解决方案来并行执行它们。但这给程序员带来了另一个挑战;编写代码来利用硬件级的并发性。此外,与任务切换造成的假象相比,实际硬件并发行为的问题也出现了。在多核处理器问世之前,芯片制造商一直在竞相提高计算能力,预计在 21 世纪第一个十年结束之前,计算能力可能会达到 10 千兆赫。正如赫伯·萨特在免费午餐结束(http://www.gotw.ca/publications/concurrency-ddj.htm)中所说,“如果软件要利用这种增加的计算能力,它必须被设计成同时运行多个任务”。赫伯警告程序员,那些忽视并发性的人在编写程序时也必须考虑到这一点。

现代 C++ 标准库提供了一套支持并发和并行的机制。首先,std::thread与同步对象(如std::mutexstd::lock_guardsstd::unique_lockstd::condition_variables等)一起授权程序员使用标准 C++ 编写并发多线程代码。其次,使用基于任务的并行性(如。NET 和 Java),C++ 引入了类std::futurestd::promise,两者成对工作,分离函数调用,等待结果。

最后,为了避免管理线程的额外开销,C++ 引入了一个名为std::async的类,这将在下一章详细介绍,其中讨论的重点将是编写无锁并发程序(嗯,至少尽可能最小化锁)。

Concurrency is when two or more threads or execution paths can start, run, and complete in overlapping time periods (in some kind of interleaved execution). Parallelism means two tasks can run at the same time (like you see on a multicore CPU). Concurrency is about response time and parallelism is mostly about exploiting available resources.

你好并发世界(使用标准::线程)

现在,让我们开始使用std::thread库的第一个程序。您应该使用 C++ 11 或更高版本来编译我们将在本章中讨论的程序。在进入多线程 Hello World 之前,让我们举一个简单、经典的 Hello World 示例作为参考:

//---- Thanks to Dennis Ritchie and Brian Kernighan, this is a norm for all languages
#include <iostream> 
int main() 
{ 
   std::cout << "Hello World\n"; 
} 

这个程序只是将 Hello World 写入标准输出流(主要是控制台)。现在,让我们看另一个例子,它做同样的事情,但是使用一个后台线程(通常称为工作线程):

#include <iostream> 
#include <thread> 
#include <string> 
//---- The following function will be invoked by the thread library 
void thread_proc(std::string msg) 
{ 
   std::cout << "ThreadProc msg:" << msg; 
}  
int main() 
{ 
   // creates a new thread and execute thread_proc on it. 
   std::thread t(thread_proc, "Hello World\n");  
   // Waiting for the thread_proc to complete its execution 
   // before exiting from the program 
   t.join(); 
} 

与传统代码的第一个区别是包含了<thread>标准头文件。所有多线程支持函数和类都在这个新的头中声明。但是为了实现同步和共享数据保护,支持类在其他头中可用。如果您熟悉 Windows 或 POSIX 系统中的平台级线程,所有线程都需要一个初始函数。标准库也遵循同样的概念。在这个例子中,thread_proc函数是在主函数中声明的线程的初始函数。初始函数(通过函数指针)在std::thread对象t的构造函数中指定,构造开始线程的执行。

最显著的区别是,现在应用将新线程(后台线程)的消息写入标准输出流,这导致在这个应用中有两个线程或一个执行路径。一旦新线程被启动,主线程就继续执行。如果主线程没有等待新启动的线程完成,则main()函数将结束,因此这将是应用的结束——甚至在新线程有机会完成其执行之前。这就是在主线程结束前调用join()的原因,为了等待这里开始的新线程t

管理线程

在运行时,执行从用户入口点main()开始(在启动代码执行之后),并且它将在已经创建的默认线程中执行。因此,每个程序都至少有一个执行线程。在程序执行过程中,可以通过标准库或特定于平台的库创建任意数量的线程。如果中央处理器内核可以执行,这些线程可以并行运行。如果线程的数量多于 CPU 内核的数量,即使存在并行性,我们也不能同时运行所有的线程。所以,线程切换也发生在这里。一个程序可以从主线程启动任意数量的线程,并且这些线程在初始线程上并发运行。我们可以看到,一个程序线程的初始函数是main(),当主线程从执行中返回时,程序结束。这将终止所有并行线程。因此,主线程需要等到所有子线程完成执行。那么,让我们看看线程的启动和连接是如何发生的。

线程启动

在前面的例子中,我们看到初始化函数作为参数传递给std::thread构造函数,线程被启动。这个函数在自己的线程上运行。线程启动发生在线程对象的构造过程中,但是初始化函数也可以有其他选择。函数对象是线程类中另一个可能的参数。C++ 标准库确保std::thread可以与任何可调用类型一起工作。

现代 C++ 标准支持通过以下方式初始化线程:

  • 函数指针(如前一节)
  • 实现调用运算符的对象
  • 希腊字母的第 11 个

任何可调用实体都是初始化线程的候选对象。这使得std::thread能够接受带有重载函数调用运算符的类对象:

class parallel_job 
{ 
public: 
void operator() () 
{ 
    some_implementation(); 
} 
};  
parallel_job job; 
std::thread t(job); 

这里,新创建的线程将对象复制到其存储中,因此必须确保复制行为。在这里,我们也可以使用std::move来避免与复制相关的问题:

std::thread t(std::move(job)); 

如果传递临时(右值)而不是函数对象,语法如下:

std::thread t(parallel_job()); 

该代码可以被编译器解释为接受函数指针并返回std::thread对象的函数声明。但是,我们可以通过使用新的统一初始化语法来避免这种情况,如下所示:

std::thread t{ parallel_job() };

如下面的代码片段所示,一组额外的括号也可以避免将std::thread对象声明解释为函数声明:

std::thread t((parallel_job()));

启动线程的另一个有趣的方法是将 C++ Lambdas 作为参数提供给std::thread构造函数。Lambdas 可以捕获局部变量,从而避免不必要的使用任何参数。Lambdas 在编写匿名函数时非常有用,但这并不意味着它们应该在任何地方都使用。

Lambda 函数可以与线程声明一起使用,如下所示:

std::thread t([]{ 
    some_implementation(); 
}); 

线程连接

在 Hello World 示例中,您可能已经注意到在离开函数之前在main()末尾使用了t.join()。对相关线程实例上的join()的调用确保了启动的函数将等待直到后台线程完成它的执行。在没有连接的情况下,线程将在线程开始之前被终止,直到当前上下文完成(它们的子线程也将被终止)。

join()是直接函数,要么等待线程完成,要么不完成。为了获得对线程的更多控制,我们有其他机制,如互斥、条件变量和未来,它们将在本章和下一章的后面部分讨论。对join()的调用清理了与线程相关联的存储,因此它确保对象不再与启动的线程相关联。这表明join()函数每个线程只能调用一次;对joinable()的呼叫在对join()的呼叫之后将总是返回假。前面有功能对象的例子可以修改如下理解join():

class parallel_job 
{ 
   int& _iterations; 

public: 
    parallel_job(int& input): _iterations(input) 
    {} 

    void operator() () 
    { 
        for (int i = 0; i < _iterations; ++ i) 
        { 
            some_implementation(i); 
        } 
    } 
}; 
void func() 
{ 
    int local_Val = 10000; 
    parallel_job job(local_Val); 
    std::thread t(job); 

    if(t.joinable()) 
        t.join(); 
} 

在这种情况下,在func()函数结束时,验证线程对象以确认线程是否仍在执行。我们调用joinable()来查看它的返回值,然后再进行加入调用。

为了防止等待func(),标准引入了一种机制来继续执行,即使父函数完成了它的执行。这可以使用另一个标准功能detach()来实现:

if(t.joinable()) 
         t.detach(); 

在分离线程之前,我们需要考虑几件事情;当func()退出时,t线程可能仍在运行。根据前面例子中给出的实现,线程正在使用在func()中创建的局部变量的引用,这不是一个好主意,因为旧的堆栈变量可以在大多数架构上随时被覆盖。在代码中使用detach()时,必须始终解决这些情况。处理这种情况最常见的方法是使一个线程成为独立的,并将数据复制到线程中,而不是共享它。

将参数传递给线程

因此,我们已经知道如何启动并等待一个线程。现在,让我们看看如何将参数传递给线程初始化函数。让我们看一个例子来找出一个数的阶乘:

class Factorial 
{ 
private: 
    long long myFact; 

public: 
    Factorial() : myFact(1) 
    { 
    } 

    void operator() (int number) 
    { 
        myFact = 1; 
        for (int i = 1; i <= number; ++ i) 
        { 
            myFact *= i; 
        } 
        std::cout << "Factorial of " << number << " is " << myFact; 
    } 
}; 

int main() 
{ 
    Factorial fact; 

    std::thread t1(fact, 10); 

    t1.join(); 
} 

从这个例子中,很明显,将参数传递到线程函数或线程可调用对象中可以通过将附加参数传递到std::thread()声明中来实现。有一点我们必须牢记在心;传递的参数被复制到线程的内部存储中,以便进一步执行。对于线程的执行来说,拥有自己的参数副本是很重要的,因为我们已经看到了与超出范围的局部变量相关的问题。为了进一步讨论将参数传递到线程中,让我们从本章回到我们的第一个 Hello World 示例:

void thread_proc(std::string msg); 

std::thread t(thread_proc, "Hello World\n"); 

在这种情况下,thread_proc()函数将std::string作为参数,但是我们将一个const char*作为参数传递给线程函数。只有在线程的情况下,参数才会被传递、转换和复制到线程的内部存储中。在这里,const char*将转换为std::string。在牢记这一点的同时,必须选择提供给线程的参数类型。让我们看看如果将指针作为参数提供给线程会发生什么:

void thread_proc(std::string msg); 
void func() 
{ 
   char buf[512]; 
   const char* hello = "Hello World\n"; 
   std::strcpy(buf, hello); 

   std::thread t(thread_proc, buf); 
   t.detach(); 
} 

在前面的代码中,提供给线程的参数是指向局部变量buf的指针。在线程上发生bufstd::string的转换之前,func()功能很可能会退出。这可能会导致未定义的行为。这个问题可以通过在声明本身中将buf变量转换为std::string来解决,如下所示:

std::thread t(thread_proc, std::string(buf)); 

现在,让我们看看您希望引用在线程中更新的情况。在典型的场景中,线程复制提供给线程的值以确保安全执行,但是标准库也提供了一种通过引用线程来传递参数的方法。在许多实际的系统中,您可能已经看到共享数据结构正在线程内部更新。下面的示例显示了如何在线程中实现引用传递:

void update_data(shared_data& data);

void another_func() 
{ 
   shared_data data; 
   std::thread t(update_data, std::ref(data)); 
   t.join(); 
   do_something_else(data); 
} 

在前面的代码中,用std::ref包装传递给std::thread构造函数的参数确保了线程内部提供的变量引用了实际参数。您可能已经注意到线程初始化函数的函数原型正在接受对shared_data对象的引用,但是为什么您仍然需要std::ref()包装来调用线程呢?考虑下面的线程调用代码:

std::thread t(update_data, data);

在这种情况下,update_data()函数期望将shared_data参数视为对实际参数的引用。但是当用作线程初始化函数时,参数只是在内部复制。当调用update_data()时,它将传递一个对参数内部副本的引用,而不是对实际参数的引用。

使用兰姆达斯

现在,让我们看看 Lambda 表达式对于多线程的用处。在下面的代码中,我们将创建五个线程,并将它们放入一个向量容器中。每个线程将使用一个 Lambda 函数作为初始化函数。以下代码中初始化的线程正在按值捕获循环索引:

int main() 
{ 
    std::vector<std::thread> threads; 

    for (int i = 0; i < 5; ++ i) 
    { 
        threads.push_back(std::thread( [i]() { 
            std::cout << "Thread #" << i << std::endl; 
        })); 
    } 

    std::cout << "nMain function"; 

    std::for_each(threads.begin(), threads.end(), [](std::thread &t) { 
        t.join(); 
    }); 
} 

向量容器线程存储在循环内部创建的五个线程。一旦执行结束,它们就在main()函数的末尾连接起来。前面代码的输出可能如下所示:

Thread # Thread # Thread # Thread # Thread #
Main function
0
4
1
3
2

每次运行的程序输出可能不同。这个程序是展示与并发编程相关的非确定性的一个很好的例子。在下一节中,我们将讨论std::thread对象的移动属性。

所有权管理

从本章到目前为止讨论的例子中,您可能已经注意到启动线程的函数必须等待线程使用join()函数完成其执行,否则它将调用detach(),代价是程序失去对线程的控制。在现代 C++ 中,很多标准类型是可移动的,但不能复制;std::thread就是其中之一。这意味着在移动语义的帮助下,线程执行的所有权可以在std::thread实例之间移动。

在许多情况下,我们希望将所有权转移到另一个线程,例如,如果我们希望线程在后台运行,而不在创建线程的函数上等待它。这可以通过将线程所有权传递给调用函数来实现,而不是等待它在创建的函数中完成。在另一个实例中,将所有权传递给某个其他函数,该函数将等待线程完成其执行。这两种情况都可以通过将所有权从一个线程实例传递给另一个线程实例来实现。

为了进一步解释,让我们定义两个函数用作线程函数:

void function1() 
{ 
    std::cout << "function1()n"; 
} 

void function2() 
{ 
    std::cout << "function2()n"; 
} 

让我们看看从先前声明的函数中产生线程的主要函数:

int main() 
{ 
    std::thread t1(function1); 

    // Ownership of t1 is transferred to t2 
    std::thread t2 = std::move(t1);

在前面的代码中,一个新的线程在main()的第一行以t1开始。然后使用std::move()函数将所有权转移到t2,该函数调用与t2关联的std::thread的移动构造函数。现在,t1 实例没有相关的执行线程。初始化功能function1()现在与t2相关联:

    t1 = std::thread(function2); 

然后,使用右值启动一个新线程,该右值调用std::thread的移动分配运算符,该运算符与t1相关联。因为我们使用的是右值,所以不需要明确调用std::move():

    // thread instance Created without any associated thread execution 
    std::thread t3; 

    // Ownership of t2 is transferred to t3 
    t3 = std::move(t2); 

t3在没有任何执行线程的情况下被实例化,这意味着它正在调用默认构造函数。然后,通过明确调用std::move()函数,移动分配操作符将当前与t2关联的所有权转移到t3:

    // No need to join t1, no longer has any associated thread of execution 
    if (t1.joinable())  t1.join(); 
    if (t3.joinable())  t3.join(); 

    return 0; 
} 

最后,在程序退出之前,具有相关执行线程的std::thread实例被连接。这里,t1t3是具有相关执行线程的实例。

现在,让我们假设以下代码出现在前面示例中的线程join()之前:

t1 = std::move(t3); 

这里,实例t1已经与正在运行的函数(function2)相关联。当std::move()试图将function1的所有权转移回t1时,会调用std::terminate()终止程序。这保证了std::thread析构器的一致性。

std::thread中的移动支持有助于将线程的所有权转移出函数。以下示例演示了这样一个场景:

void func() 
{ 
    std::cout << "func()n"; 
} 

std::thread thread_creator() 
{ 
    return std::thread(func); 
} 

void thread_wait_func() 
{ 
    std::thread t = thread_creator(); 

    t.join(); 
} 

这里,thread_creator()函数返回与func()函数相关的std::threadthread_wait_func()函数调用thread_creator(),然后返回线程对象,这是一个赋给std::thread对象的右值。这将线程的所有权转移到std::thread对象t中,对象t正在等待转移函数中线程执行的完成。

线程间共享数据

我们已经看到了如何启动一个线程以及管理它们的不同方法。现在,让我们讨论如何在线程之间共享数据。并发的一个关键特性是它能够在运行的线程之间共享数据。首先,让我们看看与线程访问公共(共享)数据相关的问题是什么。

如果线程之间共享的数据是不可变的(只读),就不会有问题,因为一个线程读取的数据不受其他线程是否读取相同数据的影响。线程开始修改共享数据的那一刻就是问题开始出现的时候。

例如,如果线程正在访问一个公共数据结构,那么如果更新正在发生,那么与该数据结构相关联的不变量就会被破坏。在这种情况下,元素的数量存储在数据结构中,这通常需要修改多个值。考虑自平衡树或双向链表的删除操作。如果您没有做任何特殊的事情来确保,否则,如果一个线程正在读取数据结构,而另一个线程正在移除节点,读取线程很可能会看到部分移除节点的数据结构,因此不变量被破坏。这可能会永久破坏数据结构,并可能导致程序崩溃。

An invariant is a set of assertions that must always be true during the execution of a program or lifetime of an object. Placing proper assertion within the code to see whether invariants have been violated will result in robust code. This is a great way to document software as well as a good mechanism to prevent regression bugs. More can be read about this in the following Wikipedia article: https://en.wikipedia.org/wiki/Invariant_(computer_science).

这通常会导致一种叫做竞争条件的情况,这是并发程序中最常见的 bug 原因。在多线程中,竞争条件意味着线程竞争执行各自的操作。这里,结果取决于在两个或更多线程中执行操作的相对顺序。通常,术语“竞争条件”意味着有问题的竞争条件;正常的比赛状态不会导致任何错误。有问题的竞争条件通常发生在操作完成需要修改两位或更多位数据的情况下,例如删除树数据结构或双链表中的一个节点。因为修改必须访问单独的数据,所以当另一个线程试图访问数据结构时,必须在单独的指令中修改这些数据。当前面的修改完成一半时,就会出现这种情况。

竞争条件通常很难找到,也很难复制,因为它们发生在很短的执行窗口内。对于使用并发的软件,实现的主要复杂性来自于避免有问题的竞争条件。

有许多方法来处理有问题的比赛条件。最常见和最简单的选择是使用同步原语,这是基于锁的保护机制。这通过使用一些锁定机制来包装数据结构,以防止在数据结构执行期间访问其他线程。我们将在本章中详细讨论可用的同步原语及其用途。

另一个选择是改变数据结构及其不变量的设计,这样修改就保证了代码的顺序一致性,甚至跨多个线程。这是一种很难写程序的方式,通常被称为无锁编程。无锁编程和 C++ 内存模型将在第 4 章c++ 异步和无锁编程中介绍。

然后,还有其他机制,例如将数据结构的更新作为事务处理,因为数据库的更新是在事务中完成的。目前,这个话题不在本书的讨论范围内,因此不在讨论范围内。

现在,让我们考虑一下 C++ 标准中保护共享数据的最基本机制,即互斥

互斥体

互斥是并发控制中用来防止竞争条件的一种机制。互斥锁的功能是防止一个执行线程进入其临界区,同时另一个并发线程进入其自己的临界区。它是一个可锁定的对象,用于在代码的关键部分需要独占访问时发出信号,从而限制执行中具有相同保护的其他并发线程以及内存访问。C++ 11 标准在标准库中引入了std::mutex类,以实现跨并发线程的数据保护。

std::mutex类由lock()unlock()函数组成,用于在代码中创建关键部分。在使用成员函数创建关键部分时,需要记住的一点是,永远不要跳过与锁定函数相关联的解锁函数来标记代码中的关键部分。

现在,让我们讨论一下用于讨论带有线程的 Lambdas 的相同代码。在那里,我们观察到程序的输出由于具有公共资源std::coutstd::ostream操作符的竞争条件而被打乱。该代码现在正在使用std::mutex打印线程索引进行重写:

#include <iostream> 
#include <thread> 
#include <mutex> 
#include <vector>  
std::mutex m; 
int main() 
{ 
    std::vector<std::thread> threads; 

    for (int i = 1; i < 10; ++ i) 
    { 
        threads.push_back(std::thread( [i]() { 
            m.lock(); 
            std::cout << "Thread #" << i << std::endl; 
            m.unlock();
        })); 
    }      
    std::for_each(threads.begin(), threads.end(), [](std::thread &t) { 
        t.join(); 
    }); 
} 

前面代码的输出可能如下所示:

Thread #1 
Thread #2 
Thread #3 
Thread #4 
Thread #5 
Thread #6 
Thread #7 
Thread #8 
Thread #9 

在前面的代码中,互斥用于保护共享资源,即std::cout和级联的std::ostream运算符。与前面的例子不同,在代码中添加互斥体避免了混乱的输出,但是它会以随机的顺序出现。在std::mutex类中使用lock()unlock()功能保证了输出不会乱码。但是,不建议直接调用成员函数,因为您需要在函数的每个代码路径上调用 unlock,包括异常情况。相反,C++ 标准引入了一个新的模板类std::lock_guard,它实现了互斥体的资源获取是初始化(RAI)习惯用法。它在构造函数中锁定提供的互斥体,并在析构函数中解锁它。这个模板类的实现可以在<mutex>标准头库中找到。前面的例子可以用std::lock_guard改写如下:

std::mutex m; 
int main() 
{ 
    std::vector<std::thread> threads;  
    for (int i = 1; i < 10; ++ i) 
    { 
        threads.push_back(std::thread( [i]() { 
            std::lock_guard<std::mutex> local_lock(m); 
            std::cout << "Thread #" << i << std::endl; 
        })); 
    }      
    std::for_each(threads.begin(), threads.end(), [](std::thread &t) { 
        t.join(); 
    }); 
}

在前面的代码中,保护关键部分的互斥体在全局范围内,并且每次线程执行时std::lock_guard对象都是 Lambda 的本地对象。这样,一旦对象被构造,互斥体就获得了锁。当 Lambda 执行结束时,它通过调用析构函数来解锁互斥体。

RAII is a C++ idiom where the lifetime of entities such as database/file handles, socket handles, mutexes, dynamically allocated memory on the heap, and so on are bounded to the life cycle of the object holding it. You can read more about RAII at the following Wikipedia page: https://en.wikipedia.org/wiki/Resource_acquisition_is_initialization.

避免死锁

在处理互斥体时,可能出现的最大问题是死锁。要理解什么是死锁,就想象一个 iPod。iPod 要达到目的,既需要 iPod,也需要耳机。如果两个兄弟姐妹共用一个 iPod,就会出现两个人都想同时听音乐的情况。想象一下,一个人拿着 iPod,另一个人拿着耳机,两个人都不愿意分享他们拥有的东西。现在他们被卡住了,除非他们中的一个人试着变好,让另一个人听音乐。

在这里,兄弟姐妹们在为一个 iPod 和一个耳机争吵,但回到我们的情况,线程们在为互斥锁争吵。这里,每个线程都有一个互斥体,并在等待另一个。这里不能进行互斥,因为每个线程都在等待另一个线程释放它的互斥。这个场景叫做死锁

避免死锁有时非常简单,因为不同的互斥体服务于不同的目的,但是有些情况下处理这种情况并不那么明显。我能给你的避免死锁的最好建议是总是以相同的顺序锁定多个互斥锁。这样,你就永远不会陷入僵局。

考虑一个有两个线程的程序的例子;每个线程都打算单独打印奇数和偶数。由于两个线程的意图不同,程序使用两个互斥体来控制每个线程。两个线程共享的资源是std::cout。让我们来看看下面这个有死锁情况的程序:

// Global mutexes 
std::mutex evenMutex; 
std::mutex oddMutex;  
// Function to print even numbers 
void printEven(int max) 
{ 
    for (int i = 0; i <= max; i +=2) 
    { 
        oddMutex.lock(); 
        std::cout << i << ","; 
        evenMutex.lock(); 
        oddMutex.unlock(); 
        evenMutex.unlock(); 
    } 
} 

printEven()功能定义为将小于max值的所有正偶数打印到标准控制台中。同样,让我们定义一个printOdd()函数来打印所有小于max的正奇数,如下所示:

// Function to print odd numbers 
void printOdd(int max) 
{ 
    for (int i = 1; i <= max; i +=2) 
    { 
        evenMutex.lock(); 
        std::cout << i << ","; 
        oddMutex.lock(); 
        evenMutex.unlock(); 
        oddMutex.unlock(); 

    } 
} 

现在,让我们编写main函数,使用之前定义的函数作为每个操作的线程函数,生成两个独立的线程来打印奇数和偶数:

int main() 
{ 
    auto max = 100; 

    std::thread t1(printEven, max); 
    std::thread t2(printOdd, max); 

    if (t1.joinable()) 
        t1.join(); 
    if (t2.joinable()) 
        t2.join(); 
} 

在本例中,std::cout由两个互斥体printEvenprintOdd保护,它们以不同的顺序执行锁定。有了这段代码,我们总是以死锁告终,因为每个线程显然都在等待被另一个线程锁定的互斥体。运行这段代码会导致挂起。如前所述,死锁可以通过以相同的顺序锁定它们来避免,如下所示:

void printEven(int max) 
{ 
    for (int i = 0; i <= max; i +=2) 
    { 
        evenMutex.lock(); 
        std::cout << i << ","; 
        oddMutex.lock(); 
        evenMutex.unlock(); 
        oddMutex.unlock(); 
    } 
}  
void printOdd(int max) 
{ 
    for (int i = 1; i <= max; i +=2) 
    { 
        evenMutex.lock(); 
        std::cout << i << ","; 
        oddMutex.lock(); 
        evenMutex.unlock(); 
        oddMutex.unlock(); 

    } 
} 

但是这个代码显然不干净。您已经知道,将互斥体与 RAII 习惯用法一起使用会使代码更加干净和安全,但是为了确保锁定的顺序,C++ 标准库引入了一个新的函数,std::lock—一个可以一次锁定两个或多个互斥体而没有死锁风险的函数。下面的例子显示了如何在我们之前的奇偶校验程序中使用它:

void printEven(int max) 
{ 
    for (int i = 0; i <= max; i +=2) 
    { 
        std::lock(evenMutex, oddMutex); 
        std::lock_guard<std::mutex> lk_even(evenMutex, std::adopt_lock); 
        std::lock_guard<std::mutex> lk_odd(oddMutex, std::adopt_lock); 
        std::cout << i << ","; 
    } 
}  
void printOdd(int max) 
{ 
    for (int i = 1; i <= max; i +=2) 
    { 
        std::lock(evenMutex, oddMutex); 
        std::lock_guard<std::mutex> lk_even(evenMutex, std::adopt_lock); 
        std::lock_guard<std::mutex> lk_odd(oddMutex, std::adopt_lock); 

        std::cout << i << ","; 

    } 
} 

在这种情况下,线程执行一进入循环,对std::lock的调用就锁定了两个互斥体。为每个互斥体构造两个std::lock_guard实例。除了互斥实例之外,还向std::lock_guard提供了std::adopt_lock参数,以指示互斥体已经被锁定,它们应该只采用互斥体上现有锁的所有权,而不是试图在构造函数中锁定互斥体。这保证了安全解锁,即使在特殊情况下。

但是std::lock可以帮助你避免在程序要求同时锁定两个或多个互斥体的情况下出现死锁;如果它们是单独获得的,也没有帮助。死锁是多线程程序中最难解决的问题之一。它最终依赖于程序员的纪律,不会陷入任何死锁情况。

用 std::unique_lock 锁定

std::lock_guard相比,std::unique_lock在操作上更加灵活一点。std::unique_lock实例并不总是拥有与之相关联的互斥体。首先,您可以将std::adopt_lock作为第二个参数传递给构造函数,以管理类似于std::lock_guard的互斥锁。其次,通过将std::defer_lock作为第二个参数传递给构造函数,互斥体可以在构造过程中保持解锁状态。因此,在代码的后面,可以通过在同一个std::unique_lock对象上调用lock()来获取锁。但是std::unique_lock的灵活性是有代价的;就存储这些额外信息而言,它比lock_guard稍慢,需要更新。因此,建议使用lock_guard,除非std::unique_lock提供的灵活性确实需要。

std::unique_lock的另一个有趣的特点是它可以转让所有权。由于std::unique_lock必须拥有其关联的互斥体,这导致了互斥体的所有权转移。与std::thread类似,std::unique_lock级也是只动型。C++ 标准库中所有的移动语义语言细微差别和右值引用处理也适用于std::unique_lock

类似于std::mutexlock()unlock()等成员函数的可用性,与std::lock_guard相比,增加了其在代码中使用的灵活性。在std::unique_lock实例被破坏之前释放锁的能力,这意味着如果很明显不再需要锁,你可以在代码的任何地方随意释放它。不必要地按住锁会大大降低应用的性能,因为等待锁的线程被阻止执行超过必要的时间。因此,std::unique_lock是 C++ 标准库引入的一个非常方便的特性,它支持 RAII 习惯用法,并且可以有效地最小化适用代码的关键部分的大小:

void retrieve_and_process_data(data_params param) 
{ 
   std::unique_lock<std::mutex> local_lock(global_mutex, std::defer_lock); 
   prepare_data(param); 

   local_lock.lock(); 
   data_class data = get_data_to_process(); 
   local_lock.unlock(); 

   result_class result = process_data(data); 

   local_lock.lock(); 
   strore_result(result); 
} 

在前面的代码中,您可以看到通过利用std::unique_lock的灵活性实现的细粒度锁定。当函数开始执行时,一个std::unique_lock对象被构造为处于解锁状态的global_mutex。数据立即用参数准备,参数不需要独占访问;它是自由执行的。在检索准备好的数据之前,local_lock正在使用std::unique_lock中的锁定成员功能标记关键部分的开始。数据检索一结束,锁就被释放,标志着关键部分的结束。紧接着,对process_data()函数的调用(同样不需要独占访问)被自由执行。最后,在执行store_result()函数之前,互斥锁被锁定以保护写操作,写操作更新处理结果。退出函数时,当std::unique_lock的本地实例被破坏时,锁被释放。

条件变量

我们已经知道互斥体可以用来共享公共资源和同步线程之间的操作。但是如果不小心的话,使用互斥锁的同步有点复杂并且容易出现死锁。在本节中,我们将讨论如何等待带有条件变量的事件,以及如何以更简单的方式使用它们进行同步。

当使用互斥锁进行同步时,如果等待的线程已经获得了对互斥锁的锁定,它就不能被任何其他线程锁定。此外,等待一个线程通过定期检查由互斥体保护的状态标志来完成其执行是对 CPU 资源的浪费。这是因为这些资源可以被系统中的其他线程有效利用,而不必等待更长的时间。

为了解决这些问题,C++ 标准库提供了条件变量的两种实现:std::condition_variablestd::condition_variable_any。两者都在<condition_variable>库头中声明,并且两个实现都需要使用互斥来同步线程。std::condition_variable的实施仅限于与std::mutex合作。另一方面,std::condition_variable_any可以处理任何满足类互斥标准(类互斥语义)的东西,因此suffix _any。由于其通用行为,std::condition_variable_any最终会消耗更多内存并降低性能。除非有真正的、量身定制的需求,否则不建议这样做。

下面的程序是我们在讨论互斥锁时讨论的奇偶线程的实现,现在正在使用条件变量重新实现:

std::mutex numMutex; 
std::condition_variable syncCond; 
auto bEvenReady = false; 
auto bOddReady  = false; 
void printEven(int max) 
{ 
    for (int i = 0; i <= max; i +=2) 
    { 
        std::unique_lock<std::mutex> lk(numMutex); 
        syncCond.wait(lk, []{return bEvenReady;}); 

        std::cout << i << ","; 

        bEvenReady = false; 
        bOddReady  = true; 
        syncCond.notify_one(); 
    } 
}

程序从声明一个互斥体、一个条件变量和两个全局布尔标志开始,这样我们就可以在两个线程之间同步它们。printEven函数在工作线程中执行,只打印从 0 开始的偶数。在这里,当它进入循环时,互斥用std::unique_lock而不是std::lock_guard来保护;我们一会儿就会看到原因。然后线程调用std::condition_variable中的wait()函数,传递锁对象和表示等待条件的 Lambda 谓词函数。这可以用任何返回 bool 的可调用对象来替换。在这个函数中,谓词函数返回bEvenReady标志,这样函数在变为真时继续执行。如果谓词返回 false,wait()函数将解锁互斥体,并等待另一个线程通知它,因此std::unique_lock对象在这里很方便,提供了锁定和解锁的灵活性。

一旦std::cout打印出循环索引,bEvenReady标志被提升为假,bOddReady被提升为真。然后,对与syncCond关联的notify_one()函数的调用向等待的奇数线程发出信号,将奇数写入标准输出流:

void printOdd(int max) 
{ 
    for (int i = 1; i <= max; i +=2) 
    { 
        std::unique_lock<std::mutex> lk(numMutex); 
        syncCond.wait(lk, []{return bOddReady;}); 

        std::cout << i << ","; 

        bEvenReady = true; 
        bOddReady  = false; 
        syncCond.notify_one(); 
    } 
} 

printOdd函数在另一个工作线程中执行,只打印从1开始的奇数。像printEven函数一样,循环迭代并打印由全局声明的条件变量和互斥体保护的索引。与printEven函数不同,条件变量的wait()函数中使用的谓词返回bOddReady,并且bEvenReady标志被提升为true,而bOddReady标志被提升为false。随后,调用与syncCond关联的notify_one()函数向等待的偶数线程发出信号,将偶数写入标准输出流。偶数和奇数的交错打印一直持续到最大值:

int main() 
{ 
    auto max = 10; 
    bEvenReady = true; 

    std::thread t1(printEven, max); 
    std::thread t2(printOdd, max); 

    if (t1.joinable()) 
        t1.join(); 
    if (t2.joinable()) 
        t2.join(); 

} 

主功能启动两个后台线程,t1,关联printEven功能,t2,关联printOdd功能。当线程启动前通过将bEvenReady标志升至 true 来确认奇偶校验时,输出开始。

线程安全的堆栈数据结构

到目前为止,我们已经讨论了如何启动和管理线程,以及如何同步并发线程之间的操作。但是,当涉及到实际系统时,数据以数据结构的形式表示,必须根据情况适当选择数据结构,以保证程序的性能。在本节中,我们将讨论如何使用条件变量和互斥体设计并发堆栈。下面的程序是std::stack的包装器,它在库标题<stack>下声明,堆栈包装器将有不同的重载用于 pop 和 push 功能(这样做是为了保持列表较小,这也演示了我们如何调整顺序数据结构以在并发上下文中工作):

template <typename T> 
class Stack 
{ 
private: 
    std::stack<T> myData; 
    mutable std::mutex myMutex; 
    std::condition_variable myCond; 

public: 
    Stack() = default; 
    ~Stack() = default; 
    Stack& operator=(const Stack&) = delete; 

    Stack(const Stack& that) 
    { 
        std::lock_guard<std::mutex> lock(that.myMutex); 
        myData = that.myData; 
    }

Stack类包含模板类std::stack的对象,以及用于std::mutexstd::condition_variable的成员变量。该类的构造函数和析构函数被标记为默认,让编译器为它们生成一个默认实现,复制赋值运算符被标记为删除,以防止在编译时调用该类的赋值运算符。复制构造函数被定义,它通过调用它自己的复制赋值操作符来复制std::stack成员对象myData,该操作符由右侧对象的互斥体保护:

      void push(T new_value) 
      { 
          std::lock_guard<std::mutex> local_lock(myMutex); 
          myData.push(new_value); 
          myCond.notify_one(); 
      } 

会员功能push()正在包装std::stack containerpush功能。如您所见,互斥成员变量myMutexstd::lock_guard对象锁定,以保护下一行的push操作。随后,使用成员std::condition_variable对象调用notify_one()函数,以引发一个事件,通过这个相同的条件变量通知等待的线程。您将在下面的代码清单中看到pop操作有两个重载,它们会等待这个条件变量发出信号:

    bool try_pop(T& return_value) 
    { 
        std::lock_guard<std::mutex> local_lock(myMutex); 
        if (myData.empty()) return false; 
        return_value = myData.top(); 
        myData.pop(); 
        return true; 
    }

try_pop()函数以模板参数为参考。由于实现从不等待堆栈填充至少一个元素,因此使用std::lock_guard对象来保护线程。如果堆栈为空,函数返回false,否则返回true。这里,通过调用std::stacktop()函数,输出被分配为输入引用参数,该函数返回堆栈中最上面的元素,然后调用pop()函数从堆栈中清除最上面的元素。pop函数的所有重载都会调用top()函数,然后调用std::stackpop()函数:

    std::shared_ptr<T> try_pop() 
    { 
        std::lock_guard<std::mutex> local_lock(myMutex); 
        if (myData.empty()) return std::shared_ptr<T>(); 

        std::shared_ptr<T> return_value(std::make_shared<T>(myData.top())); 
        myData.pop(); 

        return return_value;
    } 

这是try_pop()函数的另一个重载,它返回模板类型的std::shared_ptr(智能指针)的一个实例。正如你已经看到的,try_pop函数重载,从不等待栈填充至少一个元素;因此,本实现使用std::lock_guard。如果内部堆栈为空,该函数返回一个std::shared_ptr的实例,并且不保存堆栈中的任何元素。否则,返回保存堆栈顶部元素的std::shared_ptr实例:

    void wait_n_pop(T& return_value) 
    { 
        std::unique_lock<std::mutex> local_lock(myMutex); 
        myCond.wait(local_lock, [this]{ return !myData.empty(); }); 
        return_value = myData.top(); 
        myData.pop(); 
    }      
    std::shared_ptr<T> wait_n_pop() 
    { 
        std::unique_lock<std::mutex> local_lock(myMutex); 
        myCond.wait(local_lock, [this]{ return !myData.empty(); }); 
        std::shared_ptr<T> return_value(std::make_shared<T>(myData.top())); 
        return return_value; 
    }   
}; 

到目前为止,pop函数的重载并没有等待堆栈填充至少一个空元素。为了实现这一点,增加了pop函数的两个重载,它使用了与std::condition_variable关联的等待函数。第一个实现返回模板值作为输出参数,第二个实现返回std::shared_ptr实例。两个功能都使用std::unique_lock来控制互斥量,以提供std::condition_variablewait()功能。在wait功能中,predicate功能是检查堆栈是否为空。如果堆栈为空,则wait()功能解锁互斥体,并继续等待,直到收到来自push()功能的通知。一调用 push,谓词就返回 true,wait_n_pop继续执行。函数重载接受模板引用,并将顶部元素赋给输入参数,后者实现返回一个std::shared_ptr实例,保存顶部元素。

摘要

在本章中,我们讨论了 C++ 标准库中可用的线程库。我们看到了如何启动和管理线程,并讨论了线程库的不同方面,例如如何将参数传递到线程中、线程对象的所有权管理、线程之间的数据共享等等。C++ 标准线程库可以作为线程执行大多数可调用对象!我们已经看到了与线程相关联的所有可用可调用对象的重要性,例如std::function、Lambdas 和函子。我们讨论了 C++ 标准库中可用的同步原语,从简单的std::mutex开始,使用 RAII 习惯用法来保护互斥体免受未处理的退出情况的影响,以避免显式解锁,并使用类,如std::lock_guardstd::unique_lock。我们还讨论了线程同步上下文中的条件变量(std::condition_variable)。本章为现代 C++ 中引入的并发支持奠定了良好的基础,从而将本书的旅程推向函数式习惯用法。

在下一章中,我们将介绍 C++ 中更多的并发库特性,例如基于任务的并行和无锁编程。