Revert "Add an asyncBuf overload for the case where data is being rea…
Browse files Browse the repository at this point in the history
…d into a buffer, and the buffer is to be recycled instead of reallocated. An example of this is by-line or by-chunk file I/O. Also fix two related edge-case bugs in TaskPool.map and the other overload of TaskPool.asyncBuf."

This reverts commit 382380c.

Conflicts:
	std/parallelism.d
dsimcha committed Aug 14, 2011
1 parent 2c354d1 commit 3d5e64a
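
For context, here is a minimal sketch (not part of this commit) contrasting the two asyncBuf usage patterns involved. The first half uses the input-range overload that remains after this revert; the second half uses the buffer-recycling overload being removed, so it only compiles against a std.parallelism that still ships it. The file name "foo.txt" is an assumption for illustration.

---
// Sketch only: contrasts eager per-line duplication with buffer recycling,
// assuming a tab-separated file "foo.txt" exists.
import std.algorithm, std.array, std.conv, std.parallelism, std.stdio;

void main() {
    // Input-range overload (kept by this revert): byLine() recycles its
    // internal buffer, so every line must be eagerly duplicated before it
    // is handed to asyncBuf.
    {
        auto lines = File("foo.txt").byLine();
        auto duped = std.algorithm.map!"a.idup"(lines);

        double[][] matrix;
        foreach(line; taskPool.asyncBuf(duped)) {
            matrix ~= to!(double[])(line.split("\t"));
        }
    }

    // Buffer-recycling overload (removed by this revert): a delegate fills
    // caller-owned buffers that asyncBuf cycles through, so no per-line
    // duplication is needed.
    {
        auto file = File("foo.txt");

        void next(ref char[] buf) {
            file.readln(buf);
            // Strip the trailing newline so the numbers parse cleanly.
            if(std.algorithm.endsWith(buf, '\n')) buf.length -= 1;
        }

        double[][] matrix;
        foreach(line; taskPool.asyncBuf(&next, &file.eof)) {
            if(line.length == 0) continue;
            matrix ~= to!(double[])(line.split("\t"));
        }
    }
}
---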
Showing 1 changed file with 25 additions and 217 deletions.
242 changes: 25 additions & 217 deletions std/parallelism.d
@@ -376,22 +376,6 @@ private template ElementsCompatible(R, A) {
}
}

private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2) {
enum isRoundRobin = true;
}

private template isRoundRobin(T) {
enum isRoundRobin = false;
}

unittest{
static assert(isRoundRobin!(
RoundRobinBuffer!(void delegate(char[]), bool delegate())
));

static assert(!isRoundRobin!uint);
}

/**
$(D Task) represents the fundamental unit of work. A $(D Task) may be
executed in parallel with any other $(D Task). Using this struct directly
@@ -1634,7 +1618,7 @@ public:
}

amapImpl.__ctor(pool, workUnitSize, FromType.init, buf1);
buf1 = fillBuf(buf1);
fillBuf(buf1);
submitBuf2();
}

@@ -1723,8 +1707,7 @@ public:
} else {

bool empty() @property {
// popFront() sets this when range is empty
return buf1.length == 0;
return buf1 is null; // popFront() sets this when range is empty
}
}
}
@@ -1744,24 +1727,17 @@ public:
Examples:
---
import std.conv, std.stdio;
auto lines = File("foo.txt").byLine();
auto duped = std.algorithm.map!"a.idup"(lines);
void main() {
// Fetch lines of a file in a background thread while processing
// prevously fetched lines, dealing with byLine's buffer recycling by
// eagerly duplicating every line.
auto lines = File("foo.txt").byLine();
auto duped = std.algorithm.map!"a.idup"(lines);
// Fetch more lines in the background while we process the lines already
// read into memory into a matrix of doubles.
double[][] matrix;
auto asyncReader = taskPool.asyncBuf(duped);
// Fetch more lines in the background while we process the lines already
// read into memory into a matrix of doubles.
double[][] matrix;
auto asyncReader = taskPool.asyncBuf(duped);
foreach(line; asyncReader) {
auto ls = line.split("\t");
matrix ~= to!(double[])(ls);
}
foreach(line; asyncReader) {
auto ls = line.split("\t");
matrix ~= to!(double[])(ls);
}
---
@@ -1770,7 +1746,7 @@ public:
Any exceptions thrown while iterating over $(D range) are re-thrown on a
call to $(D popFront).
*/
auto asyncBuf(R)(R range, size_t bufSize = 100) if(isInputRange!R) {
auto asyncBuf(R)(R range, size_t bufSize = 100) {
static final class AsyncBuf {
// This is a class because the task and the range both need to be on
// the heap.
@@ -1806,7 +1782,7 @@ public:
_length = range.length;
}

buf1 = fillBuf(buf1);
fillBuf(buf1);
submitBuf2();
}

@@ -1877,85 +1853,13 @@ public:

///
bool empty() @property {
// popFront() sets this when range is empty:
return buf1.length == 0;
return buf1 is null; // popFront() sets this when range is empty
}
}
}
return new AsyncBuf(range, bufSize, this);
}

/**
Given a callable object $(D next) that writes to a user-provided buffer and
a second callable object $(D empty) that determines whether more data is
available to write via $(D next), returns an input range that
asynchronously calls $(D next) with a set of size $(D nBuffers) of buffers
and makes the results available in the order they were obtained via the
input range interface of the returned object. Similarly to the
input range overload of $(D asyncBuf), the first half of the buffers
are made available via the range interface while the second half are
filled and vice-versa.
Params:
next = A callable object that takes a single argument that must be an array
with mutable elements. When called, $(D next) writes data to
the array provided by the caller.
empty = A callable object that takes no arguments and returns a type
implicitly convertible to $(D bool). This is used to signify
that no more data is available to be obtained by calling $(D next).
initialBufSize = The initial size of each buffer. If $(D next) takes its
array by reference, it may resize the buffers.
nBuffers = The number of buffers to cycle through when calling $(D next).
Examples:
---
// Fetch lines of a file in a background thread while processing prevously
// fetched lines, without duplicating any lines.
auto file = File("foo.txt");
void next(ref char[] buf) {
file.readln(buf);
}
// Fetch more lines in the background while we process the lines already
// read into memory into a matrix of doubles.
double[][] matrix;
auto asyncReader = taskPool.asyncBuf(&next, &file.eof);
foreach(line; asyncReader) {
auto ls = line.split("\t");
matrix ~= to!(double[])(ls);
}
---
$(B Exception Handling):
Any exceptions thrown while iterating over $(D range) are re-thrown on a
call to $(D popFront).
Warning:
Using the range returned by this function in a parallel foreach loop
will not work because buffers may be overwritten while the task that
processes them is in queue. This is checked for at compile time
and will result in a static assertion failure.
*/
auto asyncBuf(C1, C2)
(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
if(is(typeof(C2.init()) : bool) &&
ParameterTypeTuple!(C1).length == 1 &&
ParameterTypeTuple!(C2).length == 0 &&
isArray!(ParameterTypeTuple!(C1)[0])
) {
auto roundRobin = RoundRobinBuffer!(C1, C2)
(next, empty, initialBufSize, nBuffers);
return asyncBuf(roundRobin, nBuffers / 2);
}

/**
Parallel reduce on a random access range. Except as otherwise noted, usage
is similar to $(D std.algorithm.reduce) . This function works by splitting
@@ -2280,18 +2184,18 @@ public:
// a set of files, one for each thread. This allows results to be written
// out without any synchronization.
import std.stdio, std.conv, std.range, std.numeric;
import std.stdio, std.conv, std.range, std.numeric, std.parallelism;
void main() {
auto filesHandles = new File[taskPool.size + 1];
auto fileHandles = new File[taskPool.size + 1];
scope(exit) {
foreach(ref handle; fileHandles) {
handle.close();
}
}
foreach(i, ref handle; fileHandles) {
handle = File("workerResults" ~ to!string(i) ~ ".txt");
handle = File("workerResults" ~ to!string(i) ~ ".txt", "wb");
}
foreach(num; parallel(iota(1_000))) {
@@ -2800,12 +2704,15 @@ Calling the setter after the first call to $(D taskPool) does not changes
number of worker threads in the instance returned by $(D taskPool).
*/
@property uint defaultPoolThreads() @trusted {
return atomicLoad(_defaultPoolThreads);
// Kludge around lack of atomic load.
// return atomicLoad(_defaultPoolThreads);
return atomicOp!"+"(_defaultPoolThreads, 0U);
}

/// Ditto
@property void defaultPoolThreads(uint newVal) @trusted {
atomicStore(_defaultPoolThreads, newVal);
// atomicStore(_defaultPoolThreads, newVal);
cas(cast(shared) &_defaultPoolThreads, _defaultPoolThreads, newVal);
}

/**
@@ -3267,6 +3174,7 @@ if(!randLen!Range) {

static if(is(typeof(range.buf1)) && is(typeof(range.bufPos)) &&
is(typeof(range.doBufSwap()))) {
<<<<<<< HEAD
// Make sure we don't have the buffer recycling overload of
// asyncBuf.
static if(is(typeof(range.range)) &&
@@ -3275,6 +3183,8 @@
"the buffer recycling overload of asyncBuf.");
}

=======
>>>>>>> /bad-path/
enum bool bufferTrick = true;
} else {
enum bool bufferTrick = false;
@@ -3424,73 +3334,6 @@ private struct ParallelForeach(R) {
}
}

/*
This struct buffers the output of a callable that outputs data into a
user-supplied buffer into a set of buffers of some fixed size. It allows these
buffers to be accessed with an input range interface. This is used internally
in the buffer-recycling overload of TaskPool.asyncBuf, which creates an
instance and forwards it to the input range overload of asyncBuf.
*/
private struct RoundRobinBuffer(C1, C2) {
// No need for constraints because they're already checked for in asyncBuf.

alias ParameterTypeTuple!(C1.init)[0] Array;
alias typeof(Array.init[0]) T;

T[][] bufs;
size_t index;
C1 nextDel;
C2 emptyDel;
bool _empty;
bool primed;

this(
C1 nextDel,
C2 emptyDel,
size_t initialBufSize,
size_t nBuffers
) {
this.nextDel = nextDel;
this.emptyDel = emptyDel;
bufs.length = nBuffers;

foreach(ref buf; bufs) {
buf.length = initialBufSize;
}
}

void prime()
in {
assert(!empty);
} body {
scope(success) primed = true;
nextDel(bufs[index]);
}


T[] front() @property
in {
assert(!empty);
} body {
if(!primed) prime();
return bufs[index];
}

void popFront() {
if(empty || emptyDel()) {
_empty = true;
return;
}

index = (index + 1) % bufs.length;
primed = false;
}

bool empty() @property const pure nothrow @safe {
return _empty;
}
}

version(unittest) {
// This was the only way I could get nested maps to work.
__gshared TaskPool poolInstance;
@@ -3683,41 +3526,6 @@ unittest {
poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
));

{
auto file = File("tempDelMe.txt", "wb");
auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]];
foreach(row; written) {
file.writeln(join(to!(string[])(row), "\t"));
}

file.close();
scope(exit) {
import std.file;
remove("tempDelMe.txt");
}

file = File("tempDelMe.txt");

void next(ref char[] buf) {
file.readln(buf);
if(std.algorithm.endsWith(buf, '\n')) {
buf.length -= 1;
}
}

double[][] read;
auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

foreach(line; asyncReader) {
if(line.length == 0) continue;
auto ls = line.split("\t");
read ~= to!(double[])(ls);
}

assert(read == written);
file.close();
}

// Test Map/AsyncBuf chaining.

auto abuf = poolInstance.asyncBuf(iota(-1, 3_000_000), 100);
