Skip to content
This repository has been archived by the owner on Oct 12, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1421 from MartinNowak/fix15268
Browse files Browse the repository at this point in the history
fix Issue 15268 - deadlock for Thread.getAll/Thread.opApply
  • Loading branch information
dnadlinger committed Oct 31, 2015
2 parents 1c95b58 + d56a259 commit 713aa05
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/core/exception.d
Expand Up @@ -682,7 +682,7 @@ private T staticError(T, Args...)(auto ref Args args)

// Suppress traceinfo generation when the GC cannot be used. Workaround for
// Bugzilla 14993. We should make stack traces @nogc instead.
private class SuppressTraceInfo : Throwable.TraceInfo
package class SuppressTraceInfo : Throwable.TraceInfo
{
override int opApply(scope int delegate(ref const(char[]))) const { return 0; }
override int opApply(scope int delegate(ref size_t, ref const(char[]))) const { return 0; }
Expand Down
104 changes: 76 additions & 28 deletions src/core/thread.d
Expand Up @@ -271,11 +271,8 @@ else version( Posix )
//Thread.add( obj );
scope( exit )
{
// NOTE: isRunning should be set to false after the thread is
// removed or a double-removal could occur between this
// function and thread_suspendAll.
Thread.remove( obj );
atomicStore!(MemoryOrder.raw)(obj.m_isRunning,false);
atomicStore!(MemoryOrder.raw)(obj.m_isRunning, false);
}
Thread.add( &obj.m_main );

Expand Down Expand Up @@ -1165,6 +1162,8 @@ class Thread

/**
* Provides a list of all threads currently being tracked by the system.
* Note that threads in the returned array might no longer run (see
* $(D Thread.)$(LREF isRunning)).
*
* Returns:
* An array containing references to all threads currently being
Expand All @@ -1173,47 +1172,82 @@ class Thread
*/
static Thread[] getAll()
{
synchronized( slock )
static void resize(ref Thread[] buf, size_t nlen)
{
size_t pos = 0;
Thread[] buf = new Thread[sm_tlen];

foreach( Thread t; Thread )
{
buf[pos++] = t;
}
return buf;
buf.length = nlen;
}
return getAllImpl!resize();
}


/**
* Operates on all threads currently being tracked by the system. The
* result of deleting any Thread object is undefined.
* Note that threads passed to the callback might no longer run (see
* $(D Thread.)$(LREF isRunning)).
*
* Params:
* dg = The supplied code as a delegate.
*
* Returns:
* Zero if all elemented are visited, nonzero if not.
*/
static int opApply( scope int delegate( ref Thread ) dg )
static int opApply(scope int delegate(ref Thread) dg)
{
synchronized( slock )
import core.stdc.stdlib : free, realloc;

static void resize(ref Thread[] buf, size_t nlen)
{
int ret = 0;
buf = (cast(Thread*)realloc(buf.ptr, nlen * Thread.sizeof))[0 .. nlen];
}
auto buf = getAllImpl!resize;
scope(exit) if (buf.ptr) free(buf.ptr);

foreach (t; buf)
{
if (auto res = dg(t))
return res;
}
return 0;
}

unittest
{
auto t1 = new Thread({
foreach (_; 0 .. 20)
Thread.getAll;
}).start;
auto t2 = new Thread({
foreach (_; 0 .. 20)
GC.collect;
}).start;
t1.join();
t2.join();
}

private static Thread[] getAllImpl(alias resize)()
{
import core.atomic;

for( Thread t = sm_tbeg; t; t = t.next )
Thread[] buf;
while (true)
{
immutable len = atomicLoad!(MemoryOrder.raw)(*cast(shared)&sm_tlen);
resize(buf, len);
assert(buf.length == len);
synchronized (slock)
{
ret = dg( t );
if( ret )
break;
if (len == sm_tlen)
{
size_t pos;
for (Thread t = sm_tbeg; t; t = t.next)
buf[pos++] = t;
return buf;
}
}
return ret;
}
}


///////////////////////////////////////////////////////////////////////////
// Static Initalizer
///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1608,6 +1642,9 @@ private:
//
// All use of the global lists should synchronize on this lock.
//
// Careful as the GC acquires this lock after the GC lock to suspend all
// threads any GC usage with slock held can result in a deadlock through
// lock order inversion.
@property static Mutex slock() nothrow
{
return cast(Mutex)_locks[0].ptr;
Expand Down Expand Up @@ -1790,10 +1827,12 @@ private:
in
{
assert( t );
assert( t.next || t.prev );
}
body
{
// Thread was already removed earlier, might happen b/c of thread_detachInstance
if (!t.next && !t.prev)
return;
slock.lock_nothrow();
{
// NOTE: When a thread is removed from the global thread list its
Expand All @@ -1813,6 +1852,7 @@ private:
t.next.prev = t.prev;
if( sm_tbeg is t )
sm_tbeg = t.next;
t.prev = t.next = null;
--sm_tlen;
}
// NOTE: Don't null out t.next or t.prev because opApply currently
Expand Down Expand Up @@ -2012,6 +2052,7 @@ extern (C) void thread_init()
*/
extern (C) void thread_term()
{
assert(Thread.sm_tbeg && Thread.sm_tlen == 1);
Thread.termLocks();

version( OSX )
Expand Down Expand Up @@ -2290,11 +2331,13 @@ shared static ~this()
// NOTE: The functionality related to garbage collection must be minimally
// operable after this dtor completes. Therefore, only minimal
// cleanup may occur.

for( Thread t = Thread.sm_tbeg; t; t = t.next )
auto t = Thread.sm_tbeg;
while (t)
{
if( !t.isRunning )
Thread.remove( t );
auto tn = t.next;
if (!t.isRunning)
Thread.remove(t);
t = tn;
}
}

Expand Down Expand Up @@ -2612,8 +2655,10 @@ extern (C) void thread_suspendAll() nothrow
// abort, and Bad Things to occur.

Thread.criticalRegionLock.lock_nothrow();
for (Thread t = Thread.sm_tbeg; t !is null; t = t.next)
auto t = Thread.sm_tbeg;
while (t)
{
auto tn = t.next;
Duration waittime = dur!"usecs"(10);
Lagain:
if (!t.isRunning)
Expand All @@ -2632,6 +2677,7 @@ extern (C) void thread_suspendAll() nothrow
{
suspend(t);
}
t = tn;
}
Thread.criticalRegionLock.unlock_nothrow();
}
Expand Down Expand Up @@ -2936,6 +2982,8 @@ private void onThreadError(string msg = null, Throwable next = null) nothrow
__gshared ThreadError error = new ThreadError(null);
error.msg = msg;
error.next = next;
import core.exception : SuppressTraceInfo;
error.info = SuppressTraceInfo.instance;
throw error;
}

Expand Down

0 comments on commit 713aa05

Please sign in to comment.