-
Notifications
You must be signed in to change notification settings - Fork 0
/
WorkStealQueue.h
171 lines (153 loc) · 4.3 KB
/
WorkStealQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#ifndef _WORK_STEAL_QUEUE_H_
#define _WORK_STEAL_QUEUE_H_
#include <atomic>
#include <cstddef>
#include <cstdint>

#include "Allocator.h"
namespace Odin
{
/*
Chase-Lev work stealing deque
*/
// Max size of the queue
#define WORK_QUEUE_SIZE 1024
// Forward declaration
struct Task;
class WorkStealQueue
{
private:
Allocator* mAlloc; // Pointer to the passed allocator
std::atomic<size_t> mTop, mBottom; // The top and bottom indexes
std::atomic<std::atomic<Task*>*> mArray; // Pointer to the array
public:
WorkStealQueue(Allocator* alloc) : mAlloc(alloc), mTop(0), mBottom(0)
{
// ASSERT_ERROR(alloc != nullptr, "No allocator passed to WorkStealQueue");
mArray = ODIN_NEW_ARRAY(std::atomic<Task*>[WORK_QUEUE_SIZE], mAlloc);
}
WorkStealQueue(const WorkStealQueue& other) = delete;
WorkStealQueue& operator = (const WorkStealQueue& other) = delete;
~WorkStealQueue()
{
std::atomic<Task*>* p = mArray.load(std::memory_order_relaxed);
if (p)
ODIN_DELETE_ARRAY(p, mAlloc);
}
void push(Task* task)
{
size_t b = mBottom.load(std::memory_order_relaxed);
size_t t = mTop.load(std::memory_order_acquire);
std::atomic<Task*>* a = mArray.load(std::memory_order_relaxed);
if (b - t <= WORK_QUEUE_SIZE - 1)
{
a[b].store(task, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release);
mBottom.store(b + 1, std::memory_order_relaxed);
}
else
{
// Log error message
}
}
Task* pop()
{
size_t b = mBottom.load(std::memory_order_relaxed) - 1;
std::atomic<Task*>* a = mArray.load(std::memory_order_relaxed);
mBottom.store(b, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_seq_cst);
size_t t = mTop.load(std::memory_order_relaxed);
if (t <= b)
{
Task* x = a[b].load(std::memory_order_relaxed);
if (t == b)
{
if (!mTop.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
x = nullptr;
mBottom.store(b + 1, std::memory_order_relaxed);
}
return x;
}
else
{
mBottom.store(b + 1, std::memory_order_relaxed);
return nullptr;
}
}
Task* steal()
{
size_t t = mTop.load(std::memory_order_acquire);
std::atomic_thread_fence(std::memory_order_seq_cst);
size_t b = mBottom.load(std::memory_order_acquire);
Task* x = nullptr;
if (t < b)
{
std::atomic<Task*>* a = mArray.load(std::memory_order_relaxed);
x = a[t].load(std::memory_order_relaxed);
if (!mTop.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
return nullptr;
}
return x;
}
};
//-----------------------------------------------------------------------------------------
/*
A simple queue representing the global work queue in the scheduler.
Pushing tasks into the back of this queue must be done by ONLY ONE THREAD.
Popping tasks from the front of the queue is thread-safe.
*/
class GlobalWorkQueue
{
private:
Allocator* mAlloc; // Pointer to the passed allocator
std::atomic<size_t> mTop, mBottom; // The top and bottom indexes
std::atomic<Task*>* mArray; // Pointer to the array
GlobalWorkQueue(Allocator* alloc) : mAlloc(alloc), mTop(0), mBottom(0)
{
ASSERT_ERROR(alloc != nullptr, "No allocator passed to GlobalWorkQueue");
mArray = ODIN_NEW_ARRAY(std::atomic<Task*>[WORK_QUEUE_SIZE], mAlloc);
}
public:
GlobalWorkQueue(const GlobalWorkQueue& other) = delete;
GlobalWorkQueue& operator = (const GlobalWorkQueue& other) = delete;
~GlobalWorkQueue()
{
if (mArray)
ODIN_DELETE_ARRAY(mArray, mAlloc);
}
void push(Task* task)
{
if (mArray)
{
// This operation should be performed by a single thread only
if (mBottom < WORK_QUEUE_SIZE)
mArray[mBottom++] = task;
else
{
// TODO: Log error message
}
}
else
{
// TODO: Log error message
}
}
Task* pop()
{
if (mArray)
{
// This operation is thread safe
size_t t = mTop.load(std::memory_order_acquire);
if (t < mBottom)
{
// TODO: Check for race condition
mTop.store(t + 1, std::memory_order_release);
Task* x = mArray[t].load(std::memory_order_relaxed);
mArray[t].store(nullptr, std::memory_order_relaxed);
}
else
return nullptr;
}
else
return nullptr;
}
friend class Scheduler;
};
}
#endif // _WORK_STEAL_QUEUE_H_