-
Notifications
You must be signed in to change notification settings - Fork 67
/
ParallelTask.h
148 lines (114 loc) · 3.67 KB
/
ParallelTask.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*++
Module Name:
ParallelTask.h
Abstract:
Simple parallel task manager
Authors:
Ravi Pandya, May 2012
Environment:
User mode service.
Revision History:
--*/
#pragma once
#include "stdafx.h"
#include "Compat.h"
/*++
Simple class to handle parallelized algorithms.
TContext should extend TaskContextBase, and provide the following methods:
void initializeThread()
Called once on main thread after TContext has been assigned from common,
and threadNum set.
void runThread()
Called to run the thread's work until termination.
May use something like RangeSplitter to get work.
void finishThread(TContext* common)
Called once on main thread after all threads have finished,
to write results back to common.
--*/
template <class TContext>
class ParallelTask
{
public:
    // Wraps the caller's context. i_common should already have
    // totalThreads & bindToProcessors filled in.
    ParallelTask(TContext* i_common);

    // Starts every worker thread, blocks until all of them have finished,
    // then lets each per-thread context write its results back to common.
    void run();

    // The context that was handed to the constructor.
    TContext* getCommonContext()
    {
        return common;
    }

private:
    // Entry point passed to StartNewThread; the argument is a TContext*.
    static void threadWorker(void* threadArg);

    // Initial & final context, supplied by the caller.
    TContext* common;

    // Array of per-thread contexts, allocated in run().
    TContext* contexts;
};
/*++
    Fields shared by every context type used as the type parameter of
    ParallelTask. The first group is filled in by the caller; the last
    group is written by ParallelTask itself while a task is running.
--*/
struct TaskContextBase
{
    // ---- inputs: set these before passing to the ParallelTask constructor ----

    // number of worker threads to start
    int                 totalThreads;

    // when true, each worker calls BindThreadToProcessor(threadNum)
    bool                bindToProcessors;

    // ---- output ----

    // time taken to run, in millis
    _int64              time;

    // ---- for internal use ----

    // this thread's number, 0...totalThreads-1
    int                 threadNum;

    // gets notified when the last thread ends
    SingleWaiterObject* doneWaiter;

    // count of workers still running; run() sets it to totalThreads
    volatile int        runningThreads;

    // the counter each worker decrements when it finishes;
    // run() points it at the common context's runningThreads
    volatile int*       pRunningThreads;
};
/*++
    Remembers the caller's common context. No per-thread state is allocated
    here; the contexts array stays NULL until run() is called.
    The thread count in i_common is checked with _ASSERT.
--*/
template <class TContext>
ParallelTask<TContext>::ParallelTask(TContext* i_common)
    : common(i_common),
      contexts(NULL)
{
    _ASSERT(i_common->totalThreads > 0);
}
/*++
    Runs the task to completion on common->totalThreads worker threads.

    For each thread i, a per-thread context is assigned from *common, given
    threadNum = i, initialized with initializeThread() on the calling thread,
    and then handed to a new thread running threadWorker. The calling thread
    blocks until the last worker signals completion, then calls
    finishThread(common) on every per-thread context in thread order and
    stores the elapsed wall time (millis) in common->time.

    Any failure (invalid thread count, waiter creation, thread start, wait)
    prints a message to stderr and terminates the process with exit(1).
--*/
template <class TContext>
    void
ParallelTask<TContext>::run()
{
    _int64 start = timeInMillis();

    // The constructor only checks this with _ASSERT. With zero threads nobody
    // would ever signal the waiter below and this function would block
    // forever; a negative count would be used as an array size. Fail loudly.
    if (common->totalThreads <= 0) {
        fprintf(stderr, "ParallelTask requires at least one thread (totalThreads = %d).\n", common->totalThreads);
        exit(1);
    }

    SingleWaiterObject doneWaiter;
    if (!CreateSingleWaiterObject(&doneWaiter)) {
        fprintf(stderr, "Failed to create single waiter object for thread completion.\n");
        exit(1);
    }

    // Set up the shared completion state BEFORE copying common into the
    // per-thread contexts, so every copy carries the same waiter and the
    // same pointer to the one shared running-thread counter.
    common->doneWaiter = &doneWaiter;
    common->runningThreads = common->totalThreads;
    common->pRunningThreads = &common->runningThreads;

    // NOTE(review): this array is never freed (and is overwritten if run() is
    // called again). Its elements are member-wise copies of *common, so a
    // delete[] here would run TContext destructors on those copies -- confirm
    // TContext's ownership semantics before adding one.
    contexts = new TContext[common->totalThreads];
    for (int i = 0; i < common->totalThreads; i++) {
        contexts[i] = *common;
        contexts[i].threadNum = i;
        contexts[i].initializeThread();

        if (!StartNewThread(ParallelTask<TContext>::threadWorker, &contexts[i])) {
            fprintf(stderr, "Unable to start worker thread.\n");
            exit(1);
        }
    }

    if (!WaitForSingleWaiterObject(&doneWaiter)) {
        fprintf(stderr, "Waiting for all threads to finish failed\n");
        exit(1);
    }
    DestroySingleWaiterObject(&doneWaiter);

    // The waiter is a local of this function; don't leave the caller's
    // context holding a pointer to it once it has been destroyed.
    common->doneWaiter = NULL;

    // Back on the calling thread: let each per-thread context merge its
    // results into common.
    for (int i = 0; i < common->totalThreads; i++) {
        contexts[i].finishThread(common);
    }

    common->time = timeInMillis() - start;
}
/*++
    Thread entry point handed to StartNewThread by run().

    threadArg is the TContext for this thread. Optionally binds the thread to
    the processor numbered by its threadNum, runs the context's runThread(),
    then decrements the shared running-thread counter; the thread that brings
    it to zero signals the done waiter.
--*/
template <class TContext>
void ParallelTask<TContext>::threadWorker(void* threadArg)
{
    TContext* const self = static_cast<TContext*>(threadArg);

    if (self->bindToProcessors) {
        BindThreadToProcessor(self->threadNum);
    }

    self->runThread();

    // Whoever takes the running-thread count down to 0 wakes up the waiter.
    const bool lastOneOut =
        (InterlockedDecrementAndReturnNewValue(self->pRunningThreads) == 0);
    if (lastOneOut) {
        SignalSingleWaiterObject(self->doneWaiter);
    }
}