Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-18305 Avoid follow on crash after splitter prepare exception #10441

Merged
merged 1 commit into from Sep 27, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 46 additions & 36 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Expand Up @@ -113,6 +113,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
~CWriter() { stop(); }
virtual void main()
{
// NB: This thread will not get started if there was a failure during prepareInput()
Semaphore writeBlockSem;
while (!stopped && !parent.eofHit)
current = parent.writeahead(current, stopped, writeBlockSem, UINT_MAX);
Expand Down Expand Up @@ -177,6 +178,7 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
stoppedOutputs = 0;
eofHit = false;
inputPrepared = false;
writeAheadException.clear();
recsReady = 0;
writeBlocked = false;
stalledWriters.kill();
Expand All @@ -194,55 +196,63 @@ class NSplitterSlaveActivity : public CSlaveActivity, implements ISharedSmartBuf
if (!inputPrepared)
{
inputPrepared = true;
assertex(((unsigned)-1) != connectedOutputCount);
activeOutputCount = connectedOutputCount;
PARENT::start();
ForEachItemIn(o, outputs)
{
CSplitterOutput *output = (CSplitterOutput *)outputs.item(o);
if (output->isStopped())
--activeOutputCount;
}
assertex(activeOutputCount); // must be >=1, as this output (outIdx) has invoked prepareInput
if (1 == activeOutputCount)
return false;
if (smartBuf)
smartBuf->reset();
else
try
{
if (spill)
assertex(((unsigned)-1) != connectedOutputCount);
activeOutputCount = connectedOutputCount;
PARENT::start();
ForEachItemIn(o, outputs)
{
StringBuffer tempname;
GetTempName(tempname, "nsplit", true); // use alt temp dir
smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), numOutputs, queryRowInterfaces(input), &container.queryJob().queryIDiskUsage()));
ActPrintLog("Using temp spill file: %s", tempname.str());
CSplitterOutput *output = (CSplitterOutput *)outputs.item(o);
if (output->isStopped())
--activeOutputCount;
}
assertex(activeOutputCount); // must be >=1, as this output (outIdx) has invoked prepareInput
if (1 == activeOutputCount)
return false;
if (smartBuf)
smartBuf->reset();
else
{
ActPrintLog("Spill is 'balanced'");
smartBuf.setown(createSharedSmartMemBuffer(this, numOutputs, queryRowInterfaces(input), NSPLITTER_SPILL_BUFFER_SIZE));
}
// mark any outputs already stopped
ForEachItemIn(o, outputs)
{
CSplitterOutput *output = (CSplitterOutput *)outputs.item(o);
if (output->isStopped() || !connectedOutputSet->test(o))
smartBuf->queryOutput(o)->stop();
if (spill)
{
StringBuffer tempname;
GetTempName(tempname, "nsplit", true); // use alt temp dir
smartBuf.setown(createSharedSmartDiskBuffer(this, tempname.str(), numOutputs, queryRowInterfaces(input), &container.queryJob().queryIDiskUsage()));
ActPrintLog("Using temp spill file: %s", tempname.str());
}
else
{
ActPrintLog("Spill is 'balanced'");
smartBuf.setown(createSharedSmartMemBuffer(this, numOutputs, queryRowInterfaces(input), NSPLITTER_SPILL_BUFFER_SIZE));
}
// mark any outputs already stopped
ForEachItemIn(o, outputs)
{
CSplitterOutput *output = (CSplitterOutput *)outputs.item(o);
if (output->isStopped() || !connectedOutputSet->test(o))
smartBuf->queryOutput(o)->stop();
}
}
if (!spill)
writer.start(); // writer keeps writing ahead as much as possible, the readahead impl. will block when has too much
}
catch (IException *e)
{
eofHit = true;
writeAheadException.setown(e);
return false;
}
if (!spill)
writer.start(); // writer keeps writing ahead as much as possible, the readahead impl. will block when has too much
}
return true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jakesmith Should this be returning false if the first instance failed to start? Should it be using a different exception member to distinguish between a problem starting and a problem when reading rows.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be returning false if the first instance failed to start?

The failure in parepareInput triggers eofHit and any subsequent caller to writeahead will exit at start.

Should it be using a different exception member to distinguish between a problem starting and a problem when reading rows.

I considered it, but a failure on start is a failure at 0 == recsReady, i.e. the condition met by the common test in NSplitterSlaveActivity::nextRow, so think it's cleaner like it is.

}
inline const void *nextRow(unsigned outIdx)
inline const void *nextRow(unsigned outIdx, rowcount_t current)
{
if (1 == activeOutputCount) // will be true, if only 1 input connected, or only 1 input was active (others stopped) when it started reading
return inputStream->nextRow();
OwnedConstThorRow row = smartBuf->queryOutput(outIdx)->nextRow(); // will block until available
if (writeAheadException)
if (recsReady == current && writeAheadException.get())
throw LINK(writeAheadException);
return row.getClear();
return smartBuf->queryOutput(outIdx)->nextRow(); // will block until available
}
rowcount_t writeahead(rowcount_t current, const bool &stopped, Semaphore &writeBlockSem, unsigned outIdx)
{
Expand Down Expand Up @@ -472,7 +482,7 @@ const void *CSplitterOutput::nextRow()
max = activity.writeahead(max, activity.queryAbortSoon(), writeBlockSem, outIdx);
// NB: if this is sole input that actually started, writeahead will have returned RCMAX and calls to activity.nextRow will go directly to splitter input
}
const void *row = activity.nextRow(outIdx); // pass ptr to max if need more
const void *row = activity.nextRow(outIdx, rec); // pass ptr to max if need more
++rec;
if (row)
dataLinkIncrement();
Expand Down