Skip to content

Fix segfaults on exiting std.parallelism #256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 16, 2011
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions std/parallelism.d
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,6 @@ ReturnType!(F) run(F, Args...)(F fpOrDelegate, ref Args args) {
return fpOrDelegate(args);
}


/**
Creates a $(D Task) on the GC heap that calls an alias. This may be executed
via $(D Task.executeInNewThread) or by submitting to a
Expand Down Expand Up @@ -818,6 +817,38 @@ the operating system.
*/
immutable uint totalCPUs;

/*
This class serves two purposes:

1. It distinguishes std.parallelism threads from other threads so that
the std.parallelism daemon threads can be terminated.

2. It adds a reference to the pool that the thread is a member of,
which is also necessary to allow the daemon threads to be properly
terminated.
*/
private final class ParallelismThread : Thread {
this(void delegate() dg) {
super(dg);
}

TaskPool pool;
}

// Kill daemon threads.
shared static ~this() {
auto allThreads = Thread.getAll();

foreach(thread; allThreads) {
auto pthread = cast(ParallelismThread) thread;
if(pthread is null) continue;
auto pool = pthread.pool;
if(!pool.isDaemon) continue;
pool.stop();
pthread.join();
}
}

/**
This class encapsulates a task queue and a set of worker threads. Its purpose
is to efficiently map a large number of $(D Task)s onto a smaller number of
Expand Down Expand Up @@ -845,7 +876,7 @@ private:
// Task.executeInNewThread().
bool isSingleTask;

Thread[] pool;
ParallelismThread[] pool;
Thread singleTaskThread;

AbstractTask* head;
Expand Down Expand Up @@ -1176,9 +1207,10 @@ public:
workerCondition = new Condition(queueMutex);
waiterCondition = new Condition(waiterMutex);

pool = new Thread[nWorkers];
pool = new ParallelismThread[nWorkers];
foreach(ref poolThread; pool) {
poolThread = new Thread(&workLoop);
poolThread = new ParallelismThread(&workLoop);
poolThread.pool = this;
poolThread.start();
}
}
Expand Down