/
boundedBuffer.chpl
270 lines (236 loc) · 9.54 KB
/
boundedBuffer.chpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
/* boundedBuffer.chpl - a simple bounded buffer pattern.
*
* Based on a UW CSEP 524 exercise developed by Brandon Myers, February 2013
* Updated by Brad Chamberlain, November 2016
*
*/
use Random, Time; // get a RNG and sleep() to introduce noise
use IO; // allow use of stderr

// All 'config const's below can be overridden on the command line,
// e.g. ./boundedBuffer --numProducers=5 --verbose=false
config const numProducers = 1, // the number of producers to create
             numConsumers = 1, // the number of consumers to create
             bufferSize = 30, // the capacity of the bounded buffer
             numItems = 1000, // the number of items to produce/consume
             verbose = true, // print what's produced/consumed?
             //
             // the following configs control whether or not noise is
             // injected into the computation in the form of sleep()
             // calls for random numbers of seconds (0.0-1.0), scaled
             // by the given factors for the producer and consumer.
             //
             noisy = true, // inject noise into the computation?
             prodNoiseScale = 100,
             consNoiseScale = prodNoiseScale;

//
// STEP 0: Compile and run the code as-is. In the default --verbose
// mode, you'll see a trace of what the single producer and consumer
// are doing and tallies at the end showing how many values were
// produced and consumed.
//

//
// The following check is here to make sure that nobody tries to run
// the baseline framework with multiple producers and consumers, since
// it's not designed to handle that case well.
//
if (numProducers > 1 || numConsumers > 1) then
  halt("This program needs changes to handle multiple producers/consumers");

//
// STEP 1: When you're ready to start modifying the code, remove the
// stopgap test + halt just above. You may also want to change the
// default number of producers and consumers above to avoid having to
// set them on every run (I had good results with 5 of each).
//
// STEP 2: Modify task 1 of the cobegin below to create 'numProducers'
// producers numbered 1..numProducers. Similarly, modify task 2 so
// that it creates 'numConsumers' consumers numbered 1..numConsumers.
// While you can compile and run at this point (and should), note that
// the program is probably not correct (see STEP 3 below).
//
proc main() {
  // a shared bounded buffer with the requested capacity; 'owned'
  // ensures it is deleted automatically when 'main' exits
  var buffer = new owned BoundedBuffer(capacity=bufferSize);

  // per-producer/consumer counts of the number of items they processed
  var prodCounts: [1..numProducers] int,
      consCounts: [1..numConsumers] int;

  // spawn two tasks using a 'cobegin'; the 'ref' task intents allow
  // each task to write into the outer count arrays
  cobegin with (ref prodCounts, ref consCounts) {
    // Task 1: run a single producer and store the number of things
    // it produces in 'prodCounts[1]'. When it's done, write a
    // sentinel value per consumer.  NOTE: the ordering matters --
    // sentinels must only be written after the producer is finished,
    // or consumers could shut down while real items remain.
    {
      prodCounts[1] = producer(buffer, pid=1);
      for i in 1..numConsumers do
        buffer.writeSentinel();
    }

    // Task 2: create a consumer and store the number of things it
    // consumes in 'consCounts[1]'.
    consCounts[1] = consumer(buffer, cid=1);
  }

  // if we're in verbose mode, print out the counts
  if (verbose) {
    writeln("Producer counts: ", prodCounts);
    writeln("Consumer counts: ", consCounts);
  }

  // count the total number of things produced and consumed (not
  // particularly interesting until there are multiple producers
  // and consumers).
  const prodTot = + reduce prodCounts,
        consTot = + reduce consCounts;

  // verify that the counts came out as expected; report mismatches
  // on stderr so they stand out from the verbose trace on stdout
  if (prodTot != numItems) then
    stderr.writef("Producer(s) produced %i items rather than %i\n",
                  prodTot, numItems);
  else if (consTot != numItems) then
    stderr.writef("Consumer(s) consumed %i items rather than %i\n",
                  consTot, numItems);
  else
    stderr.writef("Producers/consumers processed %i items total.\n", numItems);
}
//
// produce 1/numProducers of the requested 'numItems' items using an
// aligned strided range: producer 'pid' gets items pid-1,
// pid-1+numProducers, pid-1+2*numProducers, ..., so all producers
// collectively cover 0..#numItems exactly once with no overlap.
// Return the number of items we produced.
//
proc producer(b: BoundedBuffer, pid: int) {
  // 'const' rather than 'var': the range is computed once and never
  // reassigned afterwards
  const myItems = 0..#numItems by numProducers align pid-1;
  for i in myItems {
    if verbose then writeln("producer ", pid, " producing ", i);
    b.produce(i);  // may block until a buffer slot is empty
  }
  return myItems.size;
}
//
// consume items greedily until a sentinel value is found. Return
// the number of items we successfully consumed (the sentinel itself
// is not counted).
//
proc consumer(b: BoundedBuffer, cid: int) {
  var numConsumed = 0;
  var keepGoing = true;
  while keepGoing {
    // consume() may block until a buffer slot is full; it returns the
    // value plus a flag that is false once the sentinel is seen
    const (data, gotRealItem) = b.consume();
    keepGoing = gotRealItem;
    if gotRealItem {
      if verbose then writeln("consumer ", cid, " consumed ", data);
      numConsumed += 1;
    } else {
      if verbose then writeln("consumer ", cid, " got its sentinel value ");
    }
  }
  return numConsumed;
}
//
// STEP 3: The head and tail member variables below are currently
// unsynchronized, meaning that when multiple producers and consumers
// are running, their reads and writes to them may race. The
// problematic statement is the following one from 'advance()':
//
// pos = (pos + 1) % capacity;
//
// Specifically, if two tasks are executing this statement
// simultaneously, their operations may interleave as follows:
// Task A reads pos -- let's say that it has the value '21'
// Task B reads pos, which is still '21'
// Task A computes the new value of pos as '22' and writes it
// Task B also computes the value of '22' and writes it
// each task returns '21'
//
// This is a race because it means that both tasks will have obtained
// the same position rather than each getting a unique position.
//
// While the chances of this race are extremely slim (and slimmer in
// --verbose=true mode because of the time required for all of the
// printing), if this were your bank account or airline reservation,
// you wouldn't want to take the risk of not hitting it.
//
// A conventional approach might be to put some sort of mutual
// exclusion around the noted statement so that only one task could
// execute it at a time, but in Chapel, we prefer data-centric
// coordination between tasks. To that end, guard against this race
// by changing head and tail to 'sync' variables. Note that in
// addition to changing their definitions, you'll need to change the
// advance() helper function below.
//
// STEP 4: Now try doing step 3 again by making head and tail 'atomic'
// variables (save a copy of your 'sync' version in case you want to
// return to it later).
//
// Hints for the atomic-based solution:
//
// (1) atomic variables can't currently be initialized at their
// declaration point (something we're working on fixing in the
// language), but since they are integers, their initial values
// will be zero, so you probably will not need to initialize them.
// If you do, add an initializer to 'BoundedBuffer' of the form:
//
// proc init() {
// ... put your code to initialize the atomics here ...
// }
//
// (2) read(), write() and compareAndSwap() are going to be the most
// useful methods on atomics for this exercise. If you haven't worked
// with atomics before, refer to the online documentation or ask one
// of the helpers for a hint:
//
// https://chapel-lang.org/docs/builtins/internal/Atomics.html
//
// STEP 5 (optional): Compare the performance of your two versions
// (note: you may want to turn off the --noisy and/or --verbose
// configs when doing timings, and as always for performance runs, be
// sure to compile with the --fast flag).
//
// STEP 6 (optional): What would it take to make this bounded buffer
// code a distributed memory program?
//
// STEP 7 (optional and macho): What would it take to make this
// bounded buffer code into a *scalable* distributed memory program?
//
//
//
// This is a generic bounded buffer class: a fixed-capacity circular
// queue of full/empty ('sync') slots shared by producers and consumers.
//
class BoundedBuffer {
  type eltType = real; // it can store scalar types; the default is 'real'

  const capacity: int, // the capacity of the buffer
        sentinel: eltType = -1.0; // the sentinel value marking "no more data"

  var buff: [0..#capacity] sync eltType, // the sync values, empty by default
      head = 0, // the head's cursor position (next slot to produce into)
      tail = 0; // the tail's cursor position (next slot to consume from)
      // NOTE(review): head/tail are intentionally unsynchronized here;
      // STEP 3/4 above ask the reader to make them 'sync' or 'atomic'.

  // RNG used only to generate noise delays for sleep() when --noisy
  // is set.  NOTE(review): the second argument 'false' presumably
  // disables parallel safety of the stream -- confirm against the
  // Random module's randomStream initializer.
  var rng = new randomStream(real, false);

  // Initialize the buffer's element type, capacity, and sentinel;
  // the remaining fields take their declared default values.
  proc init(type eltType = real, capacity: int, sentinel: eltType = -1.0) {
    this.eltType = eltType;
    this.capacity = capacity;
    this.sentinel = sentinel;
  }

  //
  // Place an item at the head position of the buffer, assuming
  // it's available (empty). If not, the write to 'buff[head]' will
  // block until it is. Then advance the 'head' position.
  //
  proc produce(item: eltType) {
    if noisy then sleep(rng.next() / prodNoiseScale);
    // writeEF: block while the slot is full, write, mark it full
    buff[advance(head)].writeEF(item);
  }

  //
  // Mark that all the producers are done. This should be called once
  // per consumer and only once all producers are done.
  //
  proc writeSentinel() {
    produce(sentinel);
  }

  //
  // Consume() an item from the tail position of the buffer, assuming
  // it's available (full). If not, the read from 'buff[tail]' will
  // block until it is. Then advance the 'tail' position. Return a
  // tuple containing (1) the data value read and (2) 'true' if there
  // is more data to be read (the sentinel value has not been found),
  // 'false' otherwise.
  //
  proc consume(): (eltType, bool) {
    if noisy then sleep(rng.next() / consNoiseScale);
    // readFE: block while the slot is empty, read, mark it empty
    const val = buff[advance(tail)].readFE();
    return (val, val != sentinel);
  }

  //
  // a simple helper function for advancing the head or tail position,
  // wrapping around at 'capacity'.  Returns the pre-advance position.
  // WARNING: the read-modify-write below is deliberately racy when
  // multiple tasks call it concurrently -- fixing that race is the
  // point of STEP 3/4 in the commentary above; do not "fix" it here.
  //
  inline proc advance(ref pos) {
    const prevPos = pos;
    pos = (pos + 1) % capacity;
    return prevPos;
  }
}