/
safe_object.cpp
123 lines (103 loc) · 2.76 KB
/
safe_object.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright (c) 2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#include <hpx/hpx.hpp>
#include <hpx/hpx_init.hpp>
#include <hpx/include/parallel_algorithm.hpp>
#include <boost/range/functions.hpp>
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <utility>
#include <vector>
///////////////////////////////////////////////////////////////////////////////
template <typename T>
struct safe_object
{
private:
HPX_MOVABLE_ONLY(safe_object);
public:
safe_object()
: data_(hpx::get_os_thread_count())
{
}
safe_object(safe_object && rhs)
: data_(std::move(rhs.data_))
{}
safe_object& operator=(safe_object && rhs)
{
if (this != &rhs)
data_ = std::move(rhs.data_);
return *this;
}
T& get()
{
std::size_t idx = hpx::get_worker_thread_num();
HPX_ASSERT(idx < hpx::get_os_thread_count());
return data_[idx];
}
T const& get() const
{
std::size_t idx = hpx::get_worker_thread_num();
HPX_ASSERT(idx < hpx::get_os_thread_count());
return data_[idx];
}
template <typename F>
void reduce (F const& f) const
{
for (T const& d : data_)
{
f(d);
}
}
private:
std::vector<T> data_;
};
///////////////////////////////////////////////////////////////////////////////
std::vector<int> random_fill(std::size_t size)
{
std::vector<int> c(size);
std::generate(boost::begin(c), boost::end(c), std::rand);
return c;
}
inline bool satisfies_criteria(int d)
{
return d > 500 && (d % 7) == 0;
}
int hpx_main(int argc, char* argv[])
{
using hpx::parallel::for_each;
using hpx::parallel::execution::par;
// initialize data
std::vector<int> data = random_fill(1000);
// run a parallel loop to demonstrate thread safety of safe-object
safe_object<std::vector<int> > ho;
for_each(par, boost::begin(data), boost::end(data),
[&ho](int d)
{
if (satisfies_criteria(d))
ho.get().push_back(d);
});
// invoke the given reduce operation on the safe-object
std::vector<int> result;
ho.reduce(
[&result](std::vector<int> const& chunk)
{
result.insert(result.end(), chunk.begin(), chunk.end());
});
// make sure all numbers conform to criteria
for (int i : result)
{
if (!satisfies_criteria(i))
{
std::cout << "Number does not satisfy given criteria: " << i << "\n";
}
}
return hpx::finalize();
}
int main(int argc, char* argv[])
{
// Initialize and run HPX
return hpx::init(argc, argv);
}