Showing with 100 additions and 29 deletions.
  1. +100 −29 src/core/thread.d
129 changes: 100 additions & 29 deletions src/core/thread.d
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,8 @@ version( Windows )
obj.m_main.tstack = obj.m_main.bstack;
obj.m_tlsgcdata = rt_tlsgc_init();

Thread.setThis(obj);
// Thread may only be suspended after setThis
atomicStore(obj.m_isInCriticalRegion, false);

Thread.setThis( obj );
//Thread.add( obj );
scope( exit )
{
Thread.remove( obj );
Expand Down Expand Up @@ -261,10 +259,8 @@ else version( Posix )
obj.m_tlsgcdata = rt_tlsgc_init();

atomicStore!(MemoryOrder.raw)(obj.m_isRunning, true);
Thread.setThis(obj);
// Thread may only be suspended after setThis
atomicStore(obj.m_isInCriticalRegion, false);

Thread.setThis( obj );
//Thread.add( obj );
scope( exit )
{
// NOTE: isRunning should be set to false after the thread is
Expand Down Expand Up @@ -385,9 +381,14 @@ else version( Posix )
// stack, any other stack data used by this function should
// be gone before the stack cleanup code is called below.
Thread obj = Thread.getThis();
assert(obj !is null);

if( !obj.m_lock )
// NOTE: The thread reference returned by getThis is set within
// the thread startup code, so it is possible that this
// handler may be called before the reference is set. In
// this case it is safe to simply suspend and not worry
// about the stack pointers as the thread will not have
// any references to GC-managed data.
if( obj && !obj.m_lock )
{
obj.m_curr.tstack = getStackTop();
}
Expand All @@ -401,13 +402,13 @@ else version( Posix )
status = sigdelset( &sigres, resumeSignalNumber );
assert( status == 0 );

version (FreeBSD) obj.m_suspendagain = false;
version (FreeBSD) Thread.sm_suspendagain = false;
status = sem_post( &suspendCount );
assert( status == 0 );

sigsuspend( &sigres );

if( !obj.m_lock )
if( obj && !obj.m_lock )
{
obj.m_curr.tstack = obj.m_curr.bstack;
}
Expand All @@ -418,7 +419,7 @@ else version( Posix )
{
if (THR_IN_CRITICAL(pthread_self()))
{
Thread.getThis().m_suspendagain = true;
Thread.sm_suspendagain = true;
if (sem_post(&suspendCount)) assert(0);
return;
}
Expand Down Expand Up @@ -630,9 +631,6 @@ class Thread
onThreadError( "Error creating thread" );
}

// Start thread as non-suspendable, undone in thread_entryPoint
atomicStore(m_isInCriticalRegion, true);

// NOTE: The starting thread must be added to the global thread list
// here rather than within thread_entryPoint to prevent a race
// with the main thread, which could finish and terminat the
Expand Down Expand Up @@ -682,6 +680,16 @@ class Thread
onThreadError( "Error creating thread" );
}

// NOTE: when creating threads from inside a DLL, DllMain(THREAD_ATTACH)
// might be called before ResumeThread returns, but the dll
// helper functions need to know whether the thread is created
// from the runtime itself or from another DLL or the application
// to just attach to it
// as a consequence, the new Thread object is added before actual
// creation of the thread. There should be no problem with the GC
// calling thread_suspendAll, because of the slock synchronization
//
// VERIFY: does this actually also apply to other platforms?
add( this );
return this;
}
Expand Down Expand Up @@ -1111,7 +1119,19 @@ class Thread
// NOTE: This function may not be called until thread_init has
// completed. See thread_suspendAll for more information
// on why this might occur.
return sm_this;
version( OSX )
{
return sm_this;
}
else version( Posix )
{
auto t = cast(Thread) pthread_getspecific( sm_this );
return t;
}
else
{
return sm_this;
}
}


Expand Down Expand Up @@ -1335,7 +1355,22 @@ private:
//
// Local storage
//
static Thread sm_this;
version( OSX )
{
static Thread sm_this;
}
else version( Posix )
{
// On Posix (excluding OSX), pthread_key_t is explicitly used to
// store and access thread reference. This is needed
// to avoid TLS access in signal handlers (malloc deadlock)
// when using shared libraries, see issue 11981.
__gshared pthread_key_t sm_this;
}
else
{
static Thread sm_this;
}


//
Expand All @@ -1346,7 +1381,7 @@ private:
version (FreeBSD)
{
// set when suspend failed and should be retried, see Issue 13416
shared bool m_suspendagain;
static shared bool sm_suspendagain;
}


Expand Down Expand Up @@ -1375,7 +1410,7 @@ private:
shared bool m_isRunning;
}
bool m_isDaemon;
shared bool m_isInCriticalRegion;
bool m_isInCriticalRegion;
Throwable m_unhandled;

version( Solaris )
Expand All @@ -1394,7 +1429,18 @@ private:
//
static void setThis( Thread t )
{
sm_this = t;
version( OSX )
{
sm_this = t;
}
else version( Posix )
{
pthread_setspecific( sm_this, cast(void*) t );
}
else
{
sm_this = t;
}
}


Expand Down Expand Up @@ -1533,7 +1579,12 @@ private:
return cast(Mutex)_locks[0].ptr;
}

__gshared byte[__traits(classInstanceSize, Mutex)][1] _locks;
@property static Mutex criticalRegionLock() nothrow
{
return cast(Mutex)_locks[1].ptr;
}

__gshared byte[__traits(classInstanceSize, Mutex)][2] _locks;

static void initLocks()
{
Expand Down Expand Up @@ -1911,6 +1962,9 @@ extern (C) void thread_init()

status = sem_init( &suspendCount, 0, 0 );
assert( status == 0 );

status = pthread_key_create( &Thread.sm_this, null );
assert( status == 0 );
}
Thread.sm_main = thread_attachThis();
}
Expand All @@ -1923,6 +1977,14 @@ extern (C) void thread_init()
extern (C) void thread_term()
{
Thread.termLocks();

version( OSX )
{
}
else version( Posix )
{
pthread_key_delete( Thread.sm_this );
}
}


Expand Down Expand Up @@ -2457,7 +2519,7 @@ private void suspend( Thread t ) nothrow
version (FreeBSD)
{
// avoid deadlocks, see Issue 13416
if (t.m_suspendagain) goto Lagain;
if (Thread.sm_suspendagain) goto Lagain;
}
}
else if( !t.m_lock )
Expand Down Expand Up @@ -2512,6 +2574,7 @@ extern (C) void thread_suspendAll() nothrow
// cause the second suspend to fail, the garbage collection to
// abort, and Bad Things to occur.

Thread.criticalRegionLock.lock();
for (Thread t = Thread.sm_tbeg; t !is null; t = t.next)
{
Duration waittime = dur!"usecs"(10);
Expand All @@ -2520,17 +2583,20 @@ extern (C) void thread_suspendAll() nothrow
{
Thread.remove(t);
}
else if (atomicLoad(t.m_isInCriticalRegion))
else if (t.m_isInCriticalRegion)
{
Thread.criticalRegionLock.unlock();
Thread.sleep(waittime);
if (waittime < dur!"msecs"(10)) waittime *= 2;
Thread.criticalRegionLock.lock();
goto Lagain;
}
else
{
suspend(t);
}
}
Thread.criticalRegionLock.unlock();
}
}

Expand Down Expand Up @@ -2779,7 +2845,8 @@ in
}
body
{
atomicStore(Thread.getThis().m_isInCriticalRegion, true);
synchronized (Thread.criticalRegionLock)
Thread.getThis().m_isInCriticalRegion = true;
}


Expand All @@ -2797,7 +2864,8 @@ in
}
body
{
atomicStore(Thread.getThis().m_isInCriticalRegion, false);
synchronized (Thread.criticalRegionLock)
Thread.getThis().m_isInCriticalRegion = false;
}


Expand All @@ -2814,7 +2882,8 @@ in
}
body
{
return atomicLoad(Thread.getThis().m_isInCriticalRegion);
synchronized (Thread.criticalRegionLock)
return Thread.getThis().m_isInCriticalRegion;
}


Expand Down Expand Up @@ -2884,11 +2953,13 @@ unittest
thr.start();

sema.wait();
assert(atomicLoad(thr.m_isInCriticalRegion));
synchronized (Thread.criticalRegionLock)
assert(thr.m_isInCriticalRegion);
semb.notify();

sema.wait();
assert(!atomicLoad(thr.m_isInCriticalRegion));
synchronized (Thread.criticalRegionLock)
assert(!thr.m_isInCriticalRegion);
semb.notify();

thr.join();
Expand Down