Skip to content

Commit

Permalink
Merge topic 'threadPoolBugAndCleanup'
Browse files Browse the repository at this point in the history
25a149f BUG: ThreadPool hangs on Windows when ITK is compiled as DLLs
  • Loading branch information
dzenanz authored and kwrobot committed Nov 22, 2017
2 parents 518af76 + 25a149f commit 1e2ec22
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 34 deletions.
6 changes: 2 additions & 4 deletions Modules/Core/Common/include/itkThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ITKCommon_EXPORT ThreadPool : public Object
void AddThreads(ThreadIdType count);

/** The approximate number of idle threads. */
ThreadIdType GetNumberOfCurrentlyIdleThreads() const;
int GetNumberOfCurrentlyIdleThreads() const;

/** This method blocks until the given job has finished executing. */
void WaitForJob(Semaphore& jobSemaphore);
Expand Down Expand Up @@ -125,8 +125,6 @@ class ITKCommon_EXPORT ThreadPool : public Object
private:
ITK_DISALLOW_COPY_AND_ASSIGN(ThreadPool);

ThreadIdType m_ThreadCount;

/** Set if exception occurs */
bool m_ExceptionOccurred;

Expand All @@ -152,7 +150,7 @@ class ITKCommon_EXPORT ThreadPool : public Object
static Pointer m_ThreadPoolInstance;

/** The continuously running thread function */
static void * ThreadExecute(void *param);
static ITK_THREAD_RETURN_TYPE ThreadExecute(void *param);
};

}
Expand Down
6 changes: 3 additions & 3 deletions Modules/Core/Common/src/itkMultiThreader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ ThreadIdType MultiThreader::GetGlobalDefaultNumberOfThreads()
}

MultiThreader::MultiThreader() :
m_ThreadPool(ThreadPool::GetInstance() ),
m_ThreadPool( ThreadPool::GetInstance() ),
m_UseThreadPool( MultiThreader::GetGlobalDefaultUseThreadPool() )
{
for( ThreadIdType i = 0; i < ITK_MAX_THREADS; ++i )
Expand All @@ -188,8 +188,8 @@ MultiThreader::MultiThreader() :
m_SingleData = ITK_NULLPTR;
if (m_UseThreadPool)
{
ThreadIdType idleCount = m_ThreadPool->GetNumberOfCurrentlyIdleThreads();
idleCount = std::max(1u, idleCount);
ThreadIdType idleCount = std::max<ThreadIdType>(1u,
m_ThreadPool->GetNumberOfCurrentlyIdleThreads());
ThreadIdType maxCount = std::max(1u, GetGlobalDefaultNumberOfThreads());
m_NumberOfThreads = std::min(maxCount, idleCount);
}
Expand Down
65 changes: 43 additions & 22 deletions Modules/Core/Common/src/itkThreadPool.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,23 @@ ::GetInstance()
MutexLockHolder<SimpleFastMutexLock> mutexHolder(m_Mutex);
if( m_ThreadPoolInstance.IsNull() )
{
// Try the factory first
m_ThreadPoolInstance = ObjectFactory< Self >::Create();
// if the factory did not provide one, then create it here
if ( m_ThreadPoolInstance.IsNull() )
{
m_ThreadPoolInstance = new ThreadPool();
// Remove extra reference from construction.
m_ThreadPoolInstance->UnRegister();
new ThreadPool(); //constructor sets m_ThreadPoolInstance
}
}
return m_ThreadPoolInstance;
}

ThreadPool
::ThreadPool() :
m_ThreadCount(0),
m_ExceptionOccurred(false)
{
MutexLockHolder<SimpleFastMutexLock> mutexHolder(m_Mutex);
m_ThreadPoolInstance = this; //threads need this
m_ThreadPoolInstance->UnRegister(); // Remove extra reference
PlatformCreate(m_ThreadsSemaphore);
AddThreads(this->GetGlobalDefaultNumberOfThreads());
}

ThreadIdType
Expand Down Expand Up @@ -147,7 +142,6 @@ ::AddThreads(ThreadIdType count)
for( unsigned int i = 0; i < count; ++i )
{
AddThread();
m_ThreadCount++;
}
}

Expand All @@ -165,12 +159,16 @@ ::DeleteThreads()
}
}

ThreadIdType
int
ThreadPool
::GetNumberOfCurrentlyIdleThreads() const
{
MutexLockHolder<SimpleFastMutexLock> mutexHolder(m_Mutex);
return GetGlobalDefaultNumberOfThreads() - m_WorkQueue.size(); // lousy approximation
if ( m_Threads.empty() ) //not yet initialized
{
const_cast<ThreadPool *>(this)->AddThreads(ThreadPool::GetGlobalDefaultNumberOfThreads());
}
return int(m_Threads.size()) - int(m_WorkQueue.size()); // lousy approximation
}

ITK_THREAD_RETURN_TYPE
Expand All @@ -182,20 +180,39 @@ noOperation(void *)
ThreadPool
::~ThreadPool()
{
//add dummy jobs for clean thread exit
std::vector<Semaphore> jobSem(m_Threads.size());
for (ThreadIdType i = 0; i < m_Threads.size(); i++)
bool waitForThreads = true;
#if defined(WIN32) && defined(ITKCommon_EXPORTS)
//this destructor is called during DllMain's DLL_PROCESS_DETACH.
//Because ITKCommon-4.X.dll is usually being detached due to process termination,
//lpvReserved is non-NULL meaning that "all threads in the process
//except the current thread either have exited already or have been
//explicitly terminated by a call to the ExitProcess function".
if ( !m_Threads.empty() ) //thread pool was used
{
ThreadJob dummy;
dummy.m_ThreadFunction = &noOperation;
dummy.m_Semaphore = &jobSem[i];
dummy.m_UserData = ITK_NULLPTR; //makes dummy jobs easier to spot while debugging
AddWork(dummy);
DWORD dwWaitResult = WaitForSingleObject(m_Threads[0], 1);
if (dwWaitResult == WAIT_OBJECT_0) //thread has finished
{
waitForThreads = false;
}
}
#endif

for (ThreadIdType i = 0; i < m_Threads.size(); i++)
if (waitForThreads) //add dummy jobs for clean thread exit
{
WaitForJob(jobSem[i]);
std::vector<Semaphore> jobSem(m_Threads.size());
for (ThreadIdType i = 0; i < m_Threads.size(); i++)
{
ThreadJob dummy;
dummy.m_ThreadFunction = &noOperation;
dummy.m_Semaphore = &jobSem[i];
dummy.m_UserData = ITK_NULLPTR; //makes dummy jobs easier to spot while debugging
AddWork(dummy);
}

for (ThreadIdType i = 0; i < m_Threads.size(); i++)
{
WaitForJob(jobSem[i]);
}
}

DeleteThreads();
Expand All @@ -216,6 +233,10 @@ ::AddWork(const ThreadJob& threadJob)
{
{
MutexLockHolder<SimpleFastMutexLock> mutexHolder(m_Mutex);
if ( m_Threads.empty() ) //first job
{
AddThreads(ThreadPool::GetGlobalDefaultNumberOfThreads());
}
m_WorkQueue.push_back(threadJob);
}

Expand All @@ -224,7 +245,7 @@ ::AddWork(const ThreadJob& threadJob)
}


void *
ITK_THREAD_RETURN_TYPE
ThreadPool
::ThreadExecute(void *)
{
Expand Down Expand Up @@ -262,7 +283,7 @@ ::ThreadExecute(void *)
threadPool->m_ExceptionOccurred = true;
throw;
}
return ITK_NULLPTR;
return ITK_THREAD_RETURN_VALUE;
}

}
13 changes: 8 additions & 5 deletions Modules/Core/Common/src/itkThreadPoolWin32.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*=========================================================================*/
#include "itkThreadPool.h"
#include <utility>
#include <process.h>

namespace itk
{
Expand Down Expand Up @@ -57,7 +58,7 @@ void
ThreadPool
::PlatformCreate(Semaphore &semaphore)
{
semaphore = CreateSemaphore(ITK_NULLPTR, 0, 1000, ITK_NULLPTR);
semaphore = CreateSemaphore(ITK_NULLPTR, 0, 0x7ffffffel, ITK_NULLPTR);
if (semaphore == ITK_NULLPTR)
{
itkGenericExceptionMacro(<< "CreateSemaphore error. " << GetLastErrorAsString());
Expand Down Expand Up @@ -102,19 +103,21 @@ ::PlatformClose(ThreadProcessIdType &threadId)
return CloseHandle(threadId);
}

typedef unsigned int(__cdecl *ThreadExFunction)(void *);

void
ThreadPool
::AddThread()
{
ThreadProcessIdType threadHandle = CreateThread(
ThreadProcessIdType threadHandle = reinterpret_cast<ThreadProcessIdType>(_beginthreadex(
ITK_NULLPTR,
0,
(LPTHREAD_START_ROUTINE) ThreadPool::ThreadExecute,
ThreadExFunction(ThreadPool::ThreadExecute),
ITK_NULLPTR,
0,
ITK_NULLPTR);
ITK_NULLPTR));

if( threadHandle == NULL )
if (threadHandle == ITK_NULLPTR)
{
itkDebugMacro(<< "ERROR adding thread to thread pool");
itkExceptionMacro(<< "Cannot create thread. " << GetLastErrorAsString());
Expand Down

0 comments on commit 1e2ec22

Please sign in to comment.