forked from chenshuo/muduo
/
BlockingQueue_test.cc
106 lines (92 loc) · 2.7 KB
/
BlockingQueue_test.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <muduo/base/BlockingQueue.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/Thread.h>
#include <algorithm>
#include <functional>
#include <vector>
#include <memory>
#include <string>
#include <stdio.h>
#include <unistd.h>
class Test
{
public:
Test(int numThreads)
: latch_(numThreads)
// if you do not remark line 20, the program will core. the vector constructor:
// vector ( size_type n, const T& value= T(), const Allocator& = Allocator() );
// , threads_(numThreads)
{
for (int i = 0; i < numThreads; ++i)
{
char name[32];
snprintf(name, sizeof name, "work thread %d", i);
threads_.push_back(std::unique_ptr<muduo::Thread>(new muduo::Thread(
std::bind(&Test::threadFunc, this), muduo::string(name))));
}
std::for_each(threads_.begin(), threads_.end(), std::bind(&muduo::Thread::start, std::placeholders::_1));
}
void run(int times)
{
printf("waiting for count down latch\n");
latch_.wait();
printf("all threads started\n");
for (int i = 0; i < times; ++i)
{
char buf[32];
snprintf(buf, sizeof buf, "hello %d", i);
queue_.put(buf);
printf("tid=%d, put data = %s, size = %zd\n", muduo::CurrentThread::tid(), buf, queue_.size());
}
}
void joinAll()
{
for (size_t i = 0; i < threads_.size(); ++i)
{
queue_.put("stop");
}
std::for_each(threads_.begin(), threads_.end(), std::bind(&muduo::Thread::join, std::placeholders::_1));
}
private:
void threadFunc()
{
printf("tid=%d, %s started\n",
muduo::CurrentThread::tid(),
muduo::CurrentThread::name());
latch_.countDown();
bool running = true;
while (running)
{
std::string d(queue_.take());
printf("tid=%d, get data = %s, size = %zd\n", muduo::CurrentThread::tid(), d.c_str(), queue_.size());
running = (d != "stop");
}
printf("tid=%d, %s stopped\n",
muduo::CurrentThread::tid(),
muduo::CurrentThread::name());
}
muduo::BlockingQueue<std::string> queue_;
muduo::CountDownLatch latch_;
std::vector<std::unique_ptr<muduo::Thread>> threads_;
};
void testMove()
{
// std::unique_ptr requires gcc 4.4 or later
muduo::BlockingQueue<std::unique_ptr<int>> queue;
queue.put(std::unique_ptr<int>(new int(42)));
std::unique_ptr<int> x = queue.take();
printf("took %d\n", *x);
*x = 123;
queue.put(std::move(x));
std::unique_ptr<int> y = queue.take();
printf("took %d\n", *y);
}
int main()
{
printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid());
Test t(5);
t.run(100);
t.joinAll();
testMove();
printf("number of created threads %d\n", muduo::Thread::numCreated());
}