Skip to content

Commit

Permalink
Warper: Eliminate thread lock timeouts on Windows (#3788) (fixes #3631)
Browse files Browse the repository at this point in the history
  • Loading branch information
abellgithub committed May 4, 2021
1 parent 3899277 commit 15cc546
Showing 1 changed file with 109 additions and 140 deletions.
249 changes: 109 additions & 140 deletions gdal/alg/gdalwarpkernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

#include <algorithm>
#include <limits>
#include <mutex>
#include <new>
#include <vector>

Expand Down Expand Up @@ -200,33 +201,39 @@ static CPLErr GWKBilinearNoMasksOrDstDensityOnlyUShort( GDALWarpKernel * );
/* GWKJobStruct */
/************************************************************************/

typedef struct _GWKJobStruct GWKJobStruct;

struct _GWKJobStruct
struct GWKJobStruct
{
std::mutex& mutex;
std::condition_variable& cv;
int& counter;
bool& stopFlag;
GDALWarpKernel *poWK;
int iYMin;
int iYMax;
volatile int *pnCounter;
volatile int *pbStop;
CPLCond *hCond;
CPLMutex *hCondMutex;
int (*pfnProgress)(GWKJobStruct* psJob);
void *pTransformerArg;

void (*pfnFunc)(void*); // used by GWKRun() to assign the proper pTransformerArg
int iYMin;
int iYMax;
int (*pfnProgress)(GWKJobStruct* psJob);
void *pTransformerArg;
void (*pfnFunc)(void*); // used by GWKRun() to assign the proper pTransformerArg

GWKJobStruct(std::mutex& mutex_, std::condition_variable& cv_,
int& counter_, bool& stopFlag_) :
mutex(mutex_), cv(cv_), counter(counter_), stopFlag(stopFlag_),
poWK(nullptr), iYMin(0), iYMax(0), pfnProgress(nullptr), pTransformerArg(nullptr),
pfnFunc(nullptr)
{}
} ;

struct GWKThreadData
{
std::unique_ptr<CPLJobQueue> poJobQueue{};
GWKJobStruct* pasThreadJob = nullptr;
int nThreads = 0;
CPLCond* hCond = nullptr;
CPLMutex* hCondMutex = nullptr;
bool bTransformerArgInputAssignedToThread = false;
void* pTransformerArgInput = nullptr; // owned by calling layer. Not to be destroyed
std::map<GIntBig, void*> mapThreadToTransformerArg{};
std::unique_ptr<CPLJobQueue> poJobQueue {};
std::unique_ptr<std::vector<GWKJobStruct>> threadJobs {};
int nThreads {0};
int counter {0};
bool stopFlag {false};
std::mutex mutex {};
std::condition_variable cv {};
bool bTransformerArgInputAssignedToThread {false};
void * pTransformerArgInput {nullptr}; // owned by calling layer. Not to be destroyed
std::map<GIntBig, void*> mapThreadToTransformerArg {};
};

/************************************************************************/
Expand All @@ -236,13 +243,15 @@ struct GWKThreadData
// Return TRUE if the computation must be interrupted.
static int GWKProgressThread( GWKJobStruct* psJob )
{
CPLAcquireMutex(psJob->hCondMutex, 1.0);
(*(psJob->pnCounter))++;
CPLCondSignal(psJob->hCond);
int bStop = *(psJob->pbStop);
CPLReleaseMutex(psJob->hCondMutex);
bool stop = false;
{
std::lock_guard<std::mutex> lock(psJob->mutex);
psJob->counter++;
stop = psJob->stopFlag;
}
psJob->cv.notify_one();

return bStop;
return stop;
}

/************************************************************************/
Expand All @@ -253,13 +262,12 @@ static int GWKProgressThread( GWKJobStruct* psJob )
static int GWKProgressMonoThread( GWKJobStruct* psJob )
{
GDALWarpKernel *poWK = psJob->poWK;
int nCounter = ++(*(psJob->pnCounter));
if( !poWK->pfnProgress( poWK->dfProgressBase + poWK->dfProgressScale *
(nCounter / static_cast<double>(psJob->iYMax)),
"", poWK->pProgress ) )
(++psJob->counter / static_cast<double>(psJob->iYMax)),
"", poWK->pProgress ) )
{
CPLError( CE_Failure, CPLE_UserInterrupt, "User terminated" );
*(psJob->pbStop) = TRUE;
psJob->stopFlag = true;
return TRUE;
}
return FALSE;
Expand All @@ -270,25 +278,20 @@ static int GWKProgressMonoThread( GWKJobStruct* psJob )
/************************************************************************/

static CPLErr GWKGenericMonoThread( GDALWarpKernel *poWK,
void (*pfnFunc) (void *pUserData) )
void (*pfnFunc) (void *pUserData) )
{
volatile int bStop = FALSE;
volatile int nCounter = 0;

GWKJobStruct sThreadJob;
sThreadJob.poWK = poWK;
sThreadJob.pnCounter = &nCounter;
sThreadJob.iYMin = 0;
sThreadJob.iYMax = poWK->nDstYSize;
sThreadJob.pbStop = &bStop;
sThreadJob.hCond = nullptr;
sThreadJob.hCondMutex = nullptr;
sThreadJob.pfnProgress = GWKProgressMonoThread;
sThreadJob.pTransformerArg = poWK->pTransformerArg;

pfnFunc(&sThreadJob);

return !bStop ? CE_None : CE_Failure;
GWKThreadData td;

// NOTE: the mutex is not used.
GWKJobStruct job(td.mutex, td.cv, td.counter, td.stopFlag);
job.poWK = poWK;
job.iYMin = 0;
job.iYMax = poWK->nDstYSize;
job.pfnProgress = GWKProgressMonoThread;
job.pTransformerArg = poWK->pTransformerArg;
pfnFunc(&job);

return td.stopFlag ? CE_Failure : CE_None;
}

/************************************************************************/
Expand All @@ -315,41 +318,18 @@ void* GWKThreadsCreate( char** papszWarpOptions,
nThreads = 128;

GWKThreadData* psThreadData = new GWKThreadData();
CPLCond* hCond = nullptr;
if( nThreads )
hCond = CPLCreateCond();
auto poThreadPool = nThreads > 0 ? GDALGetGlobalThreadPool(nThreads) : nullptr;
if( nThreads && hCond && poThreadPool )
auto poThreadPool =
nThreads > 0 ? GDALGetGlobalThreadPool(nThreads) : nullptr;
if( nThreads && poThreadPool )
{
psThreadData->nThreads = nThreads;
psThreadData->hCond = hCond;
psThreadData->pasThreadJob = static_cast<GWKJobStruct *>(
VSI_CALLOC_VERBOSE(sizeof(GWKJobStruct), nThreads));
if( psThreadData->pasThreadJob == nullptr )
{
GWKThreadsEnd(psThreadData);
return nullptr;
}

psThreadData->hCondMutex = CPLCreateMutex();
if( psThreadData->hCondMutex == nullptr )
{
GWKThreadsEnd(psThreadData);
return nullptr;
}
CPLReleaseMutex(psThreadData->hCondMutex);

for( int i = 0; i < nThreads; i++ )
{
psThreadData->pasThreadJob[i].hCond = psThreadData->hCond;
psThreadData->pasThreadJob[i].hCondMutex = psThreadData->hCondMutex;
}
psThreadData->threadJobs.reset(new std::vector<GWKJobStruct>(nThreads,
GWKJobStruct(psThreadData->mutex, psThreadData->cv,
psThreadData->counter, psThreadData->stopFlag)));

psThreadData->poJobQueue = poThreadPool->CreateJobQueue();
psThreadData->pTransformerArgInput = pTransformerArg;
}
else if( hCond )
CPLDestroyCond(hCond);

return psThreadData;
}
Expand All @@ -373,11 +353,6 @@ void GWKThreadsEnd( void* psThreadDataIn )
}
psThreadData->poJobQueue.reset();
}
CPLFree(psThreadData->pasThreadJob);
if( psThreadData->hCond )
CPLDestroyCond(psThreadData->hCond);
if( psThreadData->hCondMutex )
CPLDestroyMutex(psThreadData->hCondMutex);
delete psThreadData;
}

Expand All @@ -394,20 +369,23 @@ static void ThreadFuncAdapter(void* pData)
// Look if we have already a per-thread transformer
void* pTransformerArg = nullptr;
const GIntBig nThreadId = CPLGetPID();
CPLAcquireMutex(psThreadData->hCondMutex, 1.0);
auto oIter = psThreadData->mapThreadToTransformerArg.find(nThreadId);
if (oIter != psThreadData->mapThreadToTransformerArg.end())
{
pTransformerArg = oIter->second;
}
else if( !psThreadData->bTransformerArgInputAssignedToThread )


{
// Borrow the original transformer, as it has not already been done
psThreadData->bTransformerArgInputAssignedToThread = true;
pTransformerArg = psThreadData->pTransformerArgInput;
psThreadData->mapThreadToTransformerArg[nThreadId] = pTransformerArg;
std::lock_guard<std::mutex> lock(psThreadData->mutex);
auto oIter = psThreadData->mapThreadToTransformerArg.find(nThreadId);
if (oIter != psThreadData->mapThreadToTransformerArg.end())
{
pTransformerArg = oIter->second;
}
else if( !psThreadData->bTransformerArgInputAssignedToThread )
{
// Borrow the original transformer, as it has not already been done
psThreadData->bTransformerArgInputAssignedToThread = true;
pTransformerArg = psThreadData->pTransformerArgInput;
psThreadData->mapThreadToTransformerArg[nThreadId] = pTransformerArg;
}
}
CPLReleaseMutex(psThreadData->hCondMutex);

// If no transformer assigned to current thread, instantiate one
if( pTransformerArg == nullptr )
Expand All @@ -416,16 +394,15 @@ static void ThreadFuncAdapter(void* pData)
// which should normally be the case.
pTransformerArg =
GDALCloneTransformer(psThreadData->pTransformerArgInput);

// Lock for the stop flag and the transformer map.
std::lock_guard<std::mutex> lock(psThreadData->mutex);
if( !pTransformerArg )
{
*(psJob->pbStop) = TRUE;
psJob->stopFlag = true;
return;
}

// register in map
CPLAcquireMutex(psThreadData->hCondMutex, 1.0);
psThreadData->mapThreadToTransformerArg[nThreadId] = pTransformerArg;
CPLReleaseMutex(psThreadData->hCondMutex);
}

psJob->pTransformerArg = pTransformerArg;
Expand Down Expand Up @@ -482,64 +459,57 @@ static CPLErr GWKRun( GDALWarpKernel *poWK,

CPLDebug("WARP", "Using %d threads", nThreads);

volatile int bStop = FALSE;
volatile int nCounter = 0;

CPLAcquireMutex(psThreadData->hCondMutex, 1000);

/* -------------------------------------------------------------------- */
/* Submit jobs */
/* -------------------------------------------------------------------- */
for( int i = 0; i < nThreads; i++ )
auto& jobs = *psThreadData->threadJobs;
// Fill-in job structures.
GIntBig i = 0;
for (auto& job : jobs)
{
psThreadData->pasThreadJob[i].poWK = poWK;
psThreadData->pasThreadJob[i].pnCounter = &nCounter;
psThreadData->pasThreadJob[i].iYMin =
static_cast<int>((static_cast<GIntBig>(i)) * nDstYSize / nThreads);
psThreadData->pasThreadJob[i].iYMax =
static_cast<int>((static_cast<GIntBig>(i + 1)) *
nDstYSize / nThreads);
psThreadData->pasThreadJob[i].pbStop = &bStop;
job.poWK = poWK;
job.iYMin = static_cast<int>(i * nDstYSize / nThreads);
job.iYMax = static_cast<int>((i + 1) * nDstYSize / nThreads);
if( poWK->pfnProgress != GDALDummyProgress )
psThreadData->pasThreadJob[i].pfnProgress = GWKProgressThread;
else
psThreadData->pasThreadJob[i].pfnProgress = nullptr;
psThreadData->pasThreadJob[i].pfnFunc = pfnFunc;
psThreadData->poJobQueue->SubmitJob( ThreadFuncAdapter,
static_cast<void*>(&psThreadData->pasThreadJob[i]) );
job.pfnProgress = GWKProgressThread;
job.pfnFunc = pfnFunc;
i++;
}

{
std::unique_lock<std::mutex> lock(psThreadData->mutex);

// Start jobs.
for (auto& job : jobs)
psThreadData->poJobQueue->SubmitJob( ThreadFuncAdapter,
static_cast<void*>(&job) );

/* -------------------------------------------------------------------- */
/* Report progress. */
/* -------------------------------------------------------------------- */
if( poWK->pfnProgress != GDALDummyProgress )
{
while( nCounter < nDstYSize )
if( poWK->pfnProgress != GDALDummyProgress )
{
CPLCondWait(psThreadData->hCond, psThreadData->hCondMutex);

if( !poWK->pfnProgress(
int& counter = psThreadData->counter;
while (counter < nDstYSize)
{
psThreadData->cv.wait(lock);
if( !poWK->pfnProgress(
poWK->dfProgressBase + poWK->dfProgressScale *
(nCounter / static_cast<double>(nDstYSize)),
(counter / static_cast<double>(nDstYSize)),
"", poWK->pProgress ) )
{
CPLError( CE_Failure, CPLE_UserInterrupt, "User terminated" );
bStop = TRUE;
break;
{
CPLError( CE_Failure, CPLE_UserInterrupt,
"User terminated" );
psThreadData->stopFlag = true;
break;
}
}
}
}

/* Release mutex before joining threads, otherwise they will dead-lock */
/* forever in GWKProgressThread() */
CPLReleaseMutex(psThreadData->hCondMutex);

/* -------------------------------------------------------------------- */
/* Wait for all jobs to complete. */
/* -------------------------------------------------------------------- */
psThreadData->poJobQueue->WaitCompletion();

return !bStop ? CE_None : CE_Failure;
return psThreadData->stopFlag ? CE_Failure : CE_None;
}

/************************************************************************/
Expand Down Expand Up @@ -4812,7 +4782,6 @@ static CPL_INLINE bool GWKCheckAndComputeSrcOffsets(
/************************************************************************/

static void GWKGeneralCaseThread( void* pData)

{
GWKJobStruct* psJob = reinterpret_cast<GWKJobStruct *>(pData);
GDALWarpKernel *poWK = psJob->poWK;
Expand Down

0 comments on commit 15cc546

Please sign in to comment.