编写一个通用算法,应用给定的一元函数并行转换一个范围的元素。用于转换范围的一元操作不得使范围迭代器无效或修改范围的元素。并行级别,即执行线程的数量和实现方式,是一个实现细节。
实现通用并行算法,分别找出给定范围内的最小值和最大值。并行性应该使用线程来实现,尽管并发线程的数量是一个实现细节。
实现通用并行算法,分别找出给定范围内的最小值和最大值。并行性应该使用异步函数来实现,尽管并发函数的数量是一个实现细节。
按照问题 53 的定义,编写排序算法的并行版本。排序算法,在第 6 章、算法和数据结构中,给定一对随机访问迭代器来定义其上下界,使用快速排序算法对范围的元素进行排序。函数应该使用比较运算符来比较范围的元素。并行级别和实现方式是一个实现细节。
编写一个类,通过同步对标准输出流的访问,使在不同线程中运行的组件能够安全地将日志消息打印到控制台,以保证输出的完整性。这个日志记录组件应该有一个名为log()
的方法,它带有一个字符串参数,表示要打印到控制台的消息。
编写一个程序,模拟在办公室为顾客提供服务的方式。办公室有三张桌子,可以同时接待顾客。顾客可以随时进入办公室。他们从售票机上取一张带有服务号码的票,在其中一个服务台等待下一个服务号码。顾客按照他们进入办公室的顺序,或者更准确地说,按照他们的票给出的顺序得到服务。每次服务台服务完一位顾客,下一位顾客就会得到服务。在特定数量的顾客已经得到票和服务之后,模拟应该停止。
通用函数std::transform()
将给定的函数应用于一个范围,并将结果存储在另一个(或相同的)范围内。这个问题的要求是实现这样一个函数的并行版本。通用迭代器将迭代器作为参数来定义范围的第一个和最后一个元素。因为一元函数以相同的方式应用于范围的所有元素,所以并行化操作相当简单。对于这个任务,我们将使用线程。由于没有规定应该同时运行多少个线程,我们可以使用std::thread::hardware_concurrency()
。此函数返回实现支持的并发线程数的提示。
只有当范围的大小超过特定的阈值时,并行版本的算法才能比顺序实现更好地执行,该阈值可能因编译选项、平台或硬件而异。在下面的实现中,该阈值被设置为 10,000 个元素。作为进一步的练习,您可以试验各种阈值和范围大小,看看执行时间是如何变化的。
以下函数ptransform()
按照要求实现并行变换算法。如果范围大小没有超过定义的阈值,则简单地调用std::transform()
。否则,它会将范围分成几个相等的部分,每个线程一个,并为特定的子范围调用每个线程上的std::transform()
。在这种情况下,函数阻塞调用线程,直到所有工作线程完成执行:
template <typename RandomAccessIterator, typename F>
void ptransform(RandomAccessIterator begin, RandomAccessIterator end,
F&& f)
{
auto size = std::distance(begin, end);
if (size <= 10000)
{
std::transform(begin, end, begin, std::forward<F>(f));
}
else
{
std::vector<std::thread> threads;
int thread_count = std::thread::hardware_concurrency();
auto first = begin;
auto last = first;
size /= thread_count;
for (int i = 0; i < thread_count; ++ i)
{
first = last;
if (i == thread_count - 1) last = end;
else std::advance(last, size);
threads.emplace_back([first, last, &f]() {
std::transform(first, last, first, std::forward<F>(f));
});
}
for (auto & t : threads) t.join();
}
}
如下所示,函数palter()
是一个辅助函数,它将ptransform()
应用到一个std::vector
并返回另一个std::vector
结果:
template <typename T, typename F>
std::vector<T> palter(std::vector<T> data, F&& f)
{
ptransform(std::begin(data), std::end(data),
std::forward<F>(f));
return data;
}
该函数可以如下使用(完整的例子可以在本书附带的源代码中找到):
int main()
{
std::vector<int> data(1000000);
// init data
auto result = palter(data, [](int const e) {return e * e; });
}
In C++ 17, a series of standard general-purpose algorithms, including std::transform()
, have overloads that implement a parallel version of the algorithm that can be executed according to a specified execution policy.
这个问题及其解决方案在大多数方面与前一个问题相似。略有不同的是,在每个线程上并发执行的函数必须返回一个值,该值代表子范围中的最小或最大元素。pprocess()
函数模板如下所示,是一个更高级别的函数,一般以下列方式实现所请求的功能:
- 它的参数是范围的第一个和最后一个迭代器,以及处理我们称之为
f
的范围的函数对象。 - 如果范围的大小小于一个特定的阈值,这里设置为 10,000 个元素,它只执行作为参数接收的函数对象
f
。 - 否则,它会将输入范围分割成多个大小相等的子范围,每个子范围对应一个可以执行的并发线程。每个线程在选定的子范围内运行
f
。 f
的并行执行结果被收集在一个std::vector
中,在所有线程的执行完成后,再次使用f
从中间结果中确定整体结果:
template <typename Iterator, typename F>
auto pprocess(Iterator begin, Iterator end, F&& f)
{
auto size = std::distance(begin, end);
if (size <= 10000)
{
return std::forward<F>(f)(begin, end);
}
else
{
int thread_count = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
std::vector<typename std::
iterator_traits<Iterator>::value_type>
mins(thread_count);
auto first = begin;
auto last = first;
size /= thread_count;
for (int i = 0; i < thread_count; ++ i)
{
first = last;
if (i == thread_count - 1) last = end;
else std::advance(last, size);
threads.emplace_back([first, last, &f, &r=mins[i]]() {
r = std::forward<F>(f)(first, last);
});
}
for (auto & t : threads) t.join();
return std::forward<F>(f)(std::begin(mins), std::end(mins));
}
}
提供了两个称为pmin()
和pmax()
的函数来实现所需的通用最小和最大并行算法。这两个依次调用pprocess()
,为第三个参数传递使用std::min_element()
或std::max_element()
标准算法的λ:
template <typename Iterator>
auto pmin(Iterator begin, Iterator end)
{
return pprocess(begin, end,
[](auto b, auto e){return *std::min_element(b, e);});
}
template <typename Iterator>
auto pmax(Iterator begin, Iterator end)
{
return pprocess(begin, end,
[](auto b, auto e){return *std::max_element(b, e);});
}
这些功能可以如下使用:
int main()
{
std::vector<int> data(count);
// init data
auto rmin = pmin(std::begin(data), std::end(data));
auto rmax = pmin(std::begin(data), std::end(data));
}
You can take it as a further exercise to implement yet another general-purpose algorithm that computes the sum of all the elements of a range in parallel using threads.
这个问题和前一个问题的唯一区别是并行性是如何实现的。对于前面的问题,需要使用线程。对于这个,您必须使用异步函数。一个函数可以与std::async()
异步执行。这个函数创建一个承诺,它是异步执行的函数结果的异步提供者。承诺有一个共享状态(可以存储函数的返回值或函数执行产生的异常)和一个相关的未来对象,该对象提供从不同线程对共享状态的访问。promise-future 对定义了一个能够跨线程传递值的通道。std::async()
返回与其创造的承诺相关的未来。
在pprocess()
的以下实现中,使用来自先前版本的线程已经被对std::async()
的调用所取代。请注意,您必须将std::launch::async
指定为std::async()
的第一个参数,以保证异步执行,而不是惰性计算。与以前的实现相比,变化量非常小,根据以前的实现对算法的描述,遵循代码应该很容易:
template <typename Iterator, typename F>
auto pprocess(Iterator begin, Iterator end, F&& f)
{
auto size = std::distance(begin, end);
if (size <= 10000)
{
return std::forward<F>(f)(begin, end);
}
else
{
int task_count = std::thread::hardware_concurrency();
std::vector<std::future<
typename std::iterator_traits<Iterator>::value_type>> tasks;
auto first = begin;
auto last = first;
size /= task_count;
for (int i = 0; i < task_count; ++ i)
{
first = last;
if (i == task_count - 1) last = end;
else std::advance(last, size);
tasks.emplace_back(std::async(
std::launch::async,
[first, last, &f]() {
return std::forward<F>(f)(first, last);
}));
}
std::vector<typename std::iterator_traits<Iterator>::value_type>
mins;
for (auto & t : tasks)
mins.push_back(t.get());
return std::forward<F>(f)(std::begin(mins), std::end(mins));
}
}
template <typename Iterator>
auto pmin(Iterator begin, Iterator end)
{
return pprocess(begin, end,
[](auto b, auto e){return *std::min_element(b, e);});
}
template <typename Iterator>
auto pmax(Iterator begin, Iterator end)
{
return pprocess(begin, end,
[](auto b, auto e){return *std::max_element(b, e);});
}
下面的代码显示了如何使用这个函数:
int main()
{
std::vector<int> data(count);
// init data
auto rmin = pmin(std::begin(data), std::end(data));
auto rmax = pmax(std::begin(data), std::end(data));
}
You can again take it as a further exercise to implement a general-purpose algorithm that computes the sum of all the elements of a range in parallel using asynchronous functions.
我们之前看到了 quicksort 算法的顺序实现。快速排序是一种分治算法,它依赖于将要排序的范围划分为两部分,一部分只包含比所选元素小的元素,称为透视,另一部分只包含比透视大的元素。然后,它继续递归地对两个分区应用相同的算法,直到这些分区只有一个元素或者没有元素。由于算法的性质,quicksort 可以很容易地并行化,以便在两个分区上同时递归应用算法。
pquicksort()
函数为此使用异步函数。然而,并行化只对较大的范围有效。存在一个阈值,在该阈值下,并行执行的上下文切换开销太大,并且并行执行时间大于顺序执行时间。在下面的实现中,该阈值被设置为 100,000 个元素,但是作为进一步的练习,您可以尝试设置不同的值,并查看并行版本与顺序版本相比的表现:
template <class RandomIt>
RandomIt partition(RandomIt first, RandomIt last)
{
auto pivot = *first;
auto i = first + 1;
auto j = last - 1;
while (i <= j)
{
while (i <= j && *i <= pivot) i++ ;
while (i <= j && *j > pivot) j--;
if (i < j) std::iter_swap(i, j);
}
std::iter_swap(i - 1, first);
return i - 1;
}
template <class RandomIt>
void pquicksort(RandomIt first, RandomIt last)
{
if (first < last)
{
auto p = partition(first, last);
if(last - first <= 100000)
{
pquicksort(first, p);
pquicksort(p + 1, last);
}
else
{
auto f1 = std::async(std::launch::async,
[first, p](){ pquicksort(first, p);});
auto f2 = std::async(std::launch::async,
[last, p]() { pquicksort(p+1, last);});
f1.wait();
f2.wait();
}
}
}
下面的代码显示了如何使用pquicksort()
函数对随机整数的大向量(值在 1 到 1000 之间)进行排序:
int main()
{
std::random_device rd;
std::mt19937 mt;
auto seed_data = std::array<int, std::mt19937::state_size> {};
std::generate(std::begin(seed_data), std::end(seed_data),
std::ref(rd));
std::seed_seq seq(std::begin(seed_data), std::end(seed_data));
mt.seed(seq);
std::uniform_int_distribution<> ud(1, 1000);
const size_t count = 1000000;
std::vector<int> data(count);
std::generate_n(std::begin(data), count,
[&mt, &ud]() {return ud(mt); });
pquicksort(std::begin(data), std::end(data));
}
虽然 C++ 没有控制台的概念,而是使用流对文件等顺序媒体执行输入和输出操作,但是std::cout
和std::wcout
全局对象控制到与 C 输出流stdout
相关联的流缓冲区的输出。无法从不同的线程安全地访问这些全局流对象。如果需要,您必须同步对它们的访问。这正是这个问题所请求的组件的目的。
如下所示的logger
类使用std::mutex
来同步对log()
方法中的std::cout
对象的访问。该类实现为线程安全的单例。静态方法instance()
返回对本地静态对象(具有存储持续时间)的引用。在 C++ 11 中,静态对象的初始化只发生一次,即使几个线程试图同时初始化同一个静态对象。在这种情况下,并发线程被阻塞,直到在第一个调用线程上执行的初始化完成。因此,不需要额外的用户定义同步机制:
class logger
{
protected:
logger() {}
public:
static logger& instance()
{
static logger lg;
return lg;
}
logger(logger const &) = delete;
logger& operator=(logger const &) = delete;
void log(std::string_view message)
{
std::lock_guard<std::mutex> lock(mt);
std::cout << "LOG: " << message << std::endl;
}
private:
std::mutex mt;
};
前面的logger
类可以用来从多个线程中写入控制台消息:
int main()
{
std::vector<std::thread> modules;
for(int id = 1; id <= 5; ++ id)
{
modules.emplace_back([id](){
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<> ud(100, 1000);
logger::instance().log(
"module " + std::to_string(id) + " started");
std::this_thread::sleep_for(std::chrono::milliseconds(ud(mt)));
logger::instance().log(
"module " + std::to_string(id) + " finished");
});
}
for(auto & m : modules) m.join();
}
为了根据需要实现客户服务办公室的模拟,我们可以使用几个助手类。ticketing_machine
是一个类,它模拟了一个非常简单的机器,从一个初始的、用户指定的种子开始,发布递增的票务号码。customer
是代表进入店铺并从售票机领取车票的顾客的类。operator<
为了将顾客存储在优先队列中,按照他们的票号给出的顺序从优先队列中取票,这个类已经超负荷了。此外,前一个问题中的logger
类用于向控制台打印消息:
class ticketing_machine
{
public:
ticketing_machine(int const start) :
last_ticket(start),first_ticket(start)
{}
int next() { return last_ticket++ ; }
int last() const { return last_ticket - 1; }
void reset() { last_ticket = first_ticket; }
private:
int first_ticket;
int last_ticket;
};
class customer
{
public:
customer(int const no) : number(no) {}
int ticket_number() const noexcept { return number; }
private:
int number;
friend bool operator<(customer const & l, customer const & r);
};
bool operator<(customer const & l, customer const & r)
{
return l.number > r.number;
}
办公室的每张桌子都用不同的线建模。进入商店并在拿到票后排队的顾客使用一个单独的线程进行建模。在下面的模拟中,每 200-500 毫秒就有一个新客户进入商店,获得一张票据,并被放在优先队列中。商店线程的执行在 25 个顾客进入商店并被放入队列后结束。std::condition_variable
用于线程之间的通信,以通知新客户已被放入队列或现有客户已从队列中移除(这发生在客户移动到开放的办公桌时)。代表办公桌的线程一直在运行,直到指示办公室已打开的标志被重置,但不是在队列中的所有客户都得到服务之前。在这个模拟中,每个客户在办公桌前花费 2,000 到 3,000 毫秒:
int main()
{
std::priority_queue<customer> customers;
bool office_open = true;
std::mutex mt;
std::condition_variable cv;
std::vector<std::thread> desks;
for (int i = 1; i <= 3; ++ i)
{
desks.emplace_back([i, &office_open, &mt, &cv, &customers]() {
std::random_device rd;
auto seed_data = std::array<int, std::mt19937::state_size> {};
std::generate(std::begin(seed_data), std::end(seed_data),
std::ref(rd));
std::seed_seq seq(std::begin(seed_data), std::end(seed_data));
std::mt19937 eng(seq);
std::uniform_int_distribution<> ud(2000, 3000);
logger::instance().log("desk " + std::to_string(i) + " open");
while (office_open || !customers.empty())
{
std::unique_lock<std::mutex> locker(mt);
cv.wait_for(locker, std::chrono::seconds(1),
[&customers]() {return !customers.empty(); });
if (!customers.empty())
{
auto const c = customers.top();
customers.pop();
logger::instance().log(
"[-] desk " + std::to_string(i) + " handling customer "
+ std::to_string(c.ticket_number()));
logger::instance().log(
"[=] queue size: " + std::to_string(customers.size()));
locker.unlock();
cv.notify_one();
std::this_thread::sleep_for(
std::chrono::milliseconds(ud(eng)));
logger::instance().log(
"[ ] desk " + std::to_string(i) + " done with customer "
+ std::to_string(c.ticket_number()));
}
}
logger::instance().log("desk " + std::to_string(i) + " closed");
});
}
std::thread store([&office_open, &customers, &mt, &cv]() {
ticketing_machine tm(100);
std::random_device rd;
auto seed_data = std::array<int, std::mt19937::state_size> {};
std::generate(std::begin(seed_data), std::end(seed_data),
std::ref(rd));
std::seed_seq seq(std::begin(seed_data), std::end(seed_data));
std::mt19937 eng(seq);
std::uniform_int_distribution<> ud(200, 500);
for (int i = 1; i <= 25; ++ i)
{
customer c(tm.next());
customers.push(c);
logger::instance().log("[+] new customer with ticket " +
std::to_string(c.ticket_number()));
logger::instance().log("[=] queue size: " +
std::to_string(customers.size()));
cv.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(ud(eng)));
}
office_open = false;
});
store.join();
for (auto & desk : desks) desk.join();
}
下面是这个问题的一个执行输出片段:
LOG: desk 1 open
LOG: desk 2 open
LOG: desk 3 open
LOG: [+] new customer with ticket 100
LOG: [-] desk 2 handling customer 100
LOG: [=] queue size: 0
LOG: [=] queue size: 0
LOG: [+] new customer with ticket 101
LOG: [=] queue size: 1
LOG: [-] desk 3 handling customer 101
LOG: [=] queue size: 0
LOG: [+] new customer with ticket 102
LOG: [=] queue size: 1
LOG: [-] desk 1 handling customer 102
LOG: [=] queue size: 0
LOG: [+] new customer with ticket 103
LOG: [=] queue size: 1
...
LOG: [+] new customer with ticket 112
LOG: [=] queue size: 7
LOG: [+] new customer with ticket 113
LOG: [=] queue size: 8
LOG: [ ] desk 2 done with customer 103
LOG: [-] desk 2 handling customer 106
LOG: [=] queue size: 7
...
LOG: [ ] desk 1 done with customer 120
LOG: [-] desk 1 handling customer 123
LOG: [=] queue size: 1
LOG: [ ] desk 2 done with customer 121
LOG: [-] desk 2 handling customer 124
LOG: [=] queue size: 0
LOG: [ ] desk 3 done with customer 122
LOG: desk 3 closed
LOG: [ ] desk 1 done with customer 123
LOG: desk 1 closed
LOG: [ ] desk 2 done with customer 124
LOG: desk 2 closed
作为进一步的练习,您可以尝试更改顾客进入商店的时间间隔,在办公室关闭前允许多少顾客获得一张票,为他们服务需要多长时间,或者办公室打开了多少张桌子。