Skip to content

Commit

Permalink
HPCC-20864 Avoid race condition fetching loop input dataset
Browse files Browse the repository at this point in the history
If a child query activity asked for the entire global loop
input dataset, a race could happen where the result on some
slaves was not prepared at the time the result was requested,
causing an assert to be hit.

The fix is to move the synchronization point until after the
input is prepared. Doing this required moving some of the
code out of a couple of 'execute' helper functions.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
  • Loading branch information
jakesmith committed Oct 30, 2018
1 parent 78142a3 commit 335cf6d
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 50 deletions.
13 changes: 11 additions & 2 deletions thorlcr/activities/loop/thloop.cpp
Expand Up @@ -248,7 +248,8 @@ class CLoopActivityMaster : public CLoopActivityMasterBase
if (sync(loopCounter))
break;

boundGraph->execute(*this, condLoopCounter, ownedResults, (IRowWriterMultiReader *)NULL, 0, extractBuilder.size(), extractBuilder.getbytes());
boundGraph->queryGraph()->executeChild(extractBuilder.size(), extractBuilder.getbytes(), ownedResults, NULL);

++loopCounter;
if (barrier) // barrier passed once all slave graphs have completed
{
Expand Down Expand Up @@ -320,9 +321,17 @@ class CGraphLoopActivityMaster : public CLoopActivityMasterBase
unsigned loopCounter = 1;
for (;;)
{
Owned<IThorGraphResults> results = queryGraph().createThorGraphResults(1);
unsigned condLoopCounter = (helper->getFlags() & IHThorGraphLoopArg::GLFcounter) ? loopCounter : 0;
IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
if (condLoopCounter)
boundGraph->prepareCounterResult(*this, results, condLoopCounter, 0);

if (sync(loopCounter))
break;
queryContainer().queryLoopGraph()->execute(*this, (helper->getFlags() & IHThorGraphLoopArg::GLFcounter)?loopCounter:0, loopResults.get(), extractBuilder.size(), extractBuilder.getbytes());

boundGraph->queryGraph()->executeChild(extractBuilder.size(), extractBuilder.getbytes(), results, loopResults);

++loopCounter;
}
}
Expand Down
14 changes: 11 additions & 3 deletions thorlcr/activities/loop/thloopslave.cpp
Expand Up @@ -376,6 +376,10 @@ class CLoopSlaveActivity : public CLoopSlaveActivityBase
if (loopAgain) // cannot be 0
boundGraph->prepareLoopAgainResult(*this, ownedResults, loopAgain);

loopPending->flush();
Owned<IThorResult> inputResult = ownedResults->getResult(1);
inputResult->setResultStream(loopPending.getClear(), loopPendingCount);

// ensure results prepared before graph begins
if (syncIterations)
{
Expand All @@ -394,8 +398,7 @@ class CLoopSlaveActivity : public CLoopSlaveActivityBase
return NULL;
}

loopPending->flush();
boundGraph->execute(*this, condLoopCounter, ownedResults, loopPending.getClear(), loopPendingCount, extractBuilder.size(), extractBuilder.getbytes());
boundGraph->queryGraph()->executeChild(extractBuilder.size(), extractBuilder.getbytes(), ownedResults, NULL);

Owned<IThorResult> result0 = ownedResults->getResult(0);
curInput.setown(result0->getRowStream());
Expand Down Expand Up @@ -506,10 +509,15 @@ class CGraphLoopSlaveActivity : public CLoopSlaveActivityBase
resultWriter->putRow(row.getClear());
}

IThorBoundLoopGraph *boundGraph = queryContainer().queryLoopGraph();
for (; loopCounter<=maxIterations; loopCounter++)
{
unsigned condLoopCounter = (helper->getFlags() & IHThorGraphLoopArg::GLFcounter) ? loopCounter : 0;
Owned<IThorGraphResults> results = queryGraph().createThorGraphResults(1);
if (condLoopCounter)
boundGraph->prepareCounterResult(*this, results, condLoopCounter, 0);
sendLoopingCount(loopCounter, 0);
queryContainer().queryLoopGraph()->execute(*this, (flags & IHThorGraphLoopArg::GLFcounter)?loopCounter:0, loopResults, extractBuilder.size(), extractBuilder.getbytes());
boundGraph->queryGraph()->executeChild(parentExtractSz, parentExtract, results, loopResults);
}
int iNumResults = loopResults->count();
Owned<IThorResult> finalResult = loopResults->getResult(iNumResults-1); //Get the last result, which isnt necessarily 'maxIterations'
Expand Down
44 changes: 1 addition & 43 deletions thorlcr/graph/thgraph.cpp
Expand Up @@ -244,6 +244,7 @@ class CThorBoundLoopGraph : implements IThorBoundLoopGraph, public CInterface
IThorResult *counterResult = results->createResult(activity, pos, countRowIf, thorgraphresult_nul, SPILL_PRIORITY_DISABLE);
Owned<IRowWriter> counterResultWriter = counterResult->getWriter();
counterResultWriter->putRow(counterRowFinal.getClear());
graph->setLoopCounter(loopCounter);
}
virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos)
{
Expand All @@ -259,49 +260,6 @@ class CThorBoundLoopGraph : implements IThorBoundLoopGraph, public CInterface
IThorResult *loopResult = activity.queryGraph().createResult(activity, 0, results, resultRowIf, resultType); // loop output
IThorResult *inputResult = activity.queryGraph().createResult(activity, 1, results, resultRowIf, resultType); // loop input
}
// Executes the bound loop graph once as a child graph, optionally supplying an input stream.
//   activity        - not referenced by this overload.
//   counter         - if non-zero, set as the graph's loop counter before execution.
//   results         - results container passed to executeChild; its result at index 1 is fetched here.
//   inputStream     - if non-null, installed as the stream of result 1 (together with rowStreamCount).
//   rowStreamCount  - row count passed alongside inputStream to setResultStream.
//   parentExtractSz / parentExtract - forwarded unchanged to executeChild.
virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *inputStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte *parentExtract)
{
// A zero counter leaves the graph's loop counter untouched.
if (counter)
graph->setLoopCounter(counter);
// Result 1 is looked up unconditionally, even when no input stream is supplied.
// NOTE(review): index 1 appears to be the "loop input" result created by prepareLoopResults - confirm.
Owned<IThorResult> inputResult = results->getResult(1);
if (inputStream)
inputResult->setResultStream(inputStream, rowStreamCount);
// The last argument (graph loop results) is NULL for this overload.
graph->executeChild(parentExtractSz, parentExtract, results, NULL);
}
// Executes the bound loop graph once as a child graph, passing graphLoopResults through to executeChild.
//   activity         - used to prepare the counter result and to attribute exceptions.
//   counter          - if non-zero, a counter result is prepared at position 0 and the graph's loop counter is set.
//   graphLoopResults - forwarded to executeChild as its final argument.
//   parentExtractSz / parentExtract - forwarded unchanged to executeChild.
// Any IException raised by executeChild is converted to (or kept as) an IThorException,
// the graph is aborted with it, it is fired on the job channel, and it is then thrown to the caller.
virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *graphLoopResults, size32_t parentExtractSz, const byte *parentExtract)
{
// A new results container is created for every call.
Owned<IThorGraphResults> results = graph->createThorGraphResults(1);
if (counter)
{
prepareCounterResult(activity, results, counter, 0);
graph->setLoopCounter(counter);
}
try
{
graph->executeChild(parentExtractSz, parentExtract, results, graphLoopResults);
}
catch (IException *e)
{
IThorException *te = QUERYINTERFACE(e, IThorException);
if (!te)
{
// Not already a Thor exception: wrap it in an activity exception and release the original.
Owned<IThorException> e2 = MakeActivityException(&activity, e, "Exception running child graphs");
e->Release();
te = e2.getClear();
}
else if (!te->queryActivityId())
// Already a Thor exception but with no activity id: fill in activity info from this activity's container.
setExceptionActivityInfo(activity.queryContainer(), te);
// An exception thrown by abort() is wrapped, released and logged here rather than propagated,
// so the original failure (te) is the one that reaches the caller.
try { graph->abort(te); }
catch (IException *abortE)
{
Owned<IThorException> e2 = MakeActivityException(&activity, abortE, "Exception whilst aborting graph");
abortE->Release();
EXCLOG(e2, NULL);
}
graph->queryJobChannel().fireException(te);
throw te;
}
}
virtual CGraphBase *queryGraph() { return graph; }
};

Expand Down
2 changes: 0 additions & 2 deletions thorlcr/graph/thgraph.hpp
Expand Up @@ -158,8 +158,6 @@ interface IThorBoundLoopGraph : extends IInterface
virtual void prepareLoopResults(CActivityBase &activity, IThorGraphResults *results) = 0;
virtual void prepareCounterResult(CActivityBase &activity, IThorGraphResults *results, unsigned loopCounter, unsigned pos) = 0;
virtual void prepareLoopAgainResult(CActivityBase &activity, IThorGraphResults *results, unsigned pos) = 0;
virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults *results, IRowWriterMultiReader *rowStream, rowcount_t rowStreamCount, size32_t parentExtractSz, const byte * parentExtract) = 0;
virtual void execute(CActivityBase &activity, unsigned counter, IThorGraphResults * graphLoopResults, size32_t parentExtractSz, const byte * parentExtract) = 0;
virtual CGraphBase *queryGraph() = 0;
};

Expand Down

0 comments on commit 335cf6d

Please sign in to comment.