Skip to content

Commit

Permalink
Merge pull request hpcc-systems#1705 from ghalliday/stepjoinassert
Browse files Browse the repository at this point in the history
Fix problems with global priority stepped join.

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Reviewed-By: Renato Golin <rengolin@hpccsystems.com>
  • Loading branch information
richardkchapman committed Mar 6, 2012
2 parents 2a8c26d + 988ea29 commit e49891a
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 99 deletions.
166 changes: 82 additions & 84 deletions common/thorhelper/thorstep.cpp
Expand Up @@ -129,12 +129,12 @@ CSteppedInputLookahead::CSteppedInputLookahead(ISteppedInput * _input, IInputSte
: input(_input), compare(_compare)
{
maxFields = compare ? compare->maxFields() : 0;
pending = NULL;
pendingMatches = true;
readAheadRow = NULL;
readAheadRowIsExactMatch = true;
stepFlagsMask = 0;
stepFlagsValue = 0;
paranoid = _paranoid;
previousPending = NULL;
previousReadAheadRow = NULL;
rowAllocator.set(_rowAllocator);
inputStepping = _inputStepping;
numStepableFields = inputStepping ? inputStepping->getNumFields() : 0;
Expand All @@ -145,19 +145,17 @@ CSteppedInputLookahead::CSteppedInputLookahead(ISteppedInput * _input, IInputSte

CSteppedInputLookahead::~CSteppedInputLookahead()
{
if (previousPending)
rowAllocator->releaseRow(previousPending);
if (pending)
rowAllocator->releaseRow(pending);
if (previousReadAheadRow)
rowAllocator->releaseRow(previousReadAheadRow);
if (readAheadRow)
rowAllocator->releaseRow(readAheadRow);
}


const void * CSteppedInputLookahead::nextInputRow()
{
if (readAheadRows.ordinality())
return readAheadRows.dequeue();
if (seekRows.ordinality())
return seekRows.dequeue();
return input->nextInputRow();
}

Expand All @@ -172,32 +170,36 @@ const void * CSteppedInputLookahead::nextInputRowGE(const void * seek, unsigned
return (void *)next.getClear();
}
}
while (seekRows.ordinality())
{
OwnedLCRow next = seekRows.dequeue();
if (compare->docompare(next, seek, numFields) >= 0)
{
assertex(wasCompleteMatch);
return (void *)next.getClear();
}
}
return input->nextInputRowGE(seek, numFields, wasCompleteMatch, stepExtra);
}

void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields, unsigned maxcount)
{
//Transfer any rows with fields before the seek position to a list of pending rows, so we don't waste
//time sending seek rows that can't match..
const void * lastSeekRow = NULL;
//Remove any rows from the seek list that occur before the new seek row
while (seekRows.ordinality())
{
const void * next = seekRows.head();
if (compare->docompare(next, seek, numFields) >= 0)
{
//update the seek pointer to the best value - so that lowestInputProvider can skip its seekRows if necessary
seek = seekRows.tail();
lastSeekRow = seek;
break;
}
readAheadRows.enqueue(seekRows.dequeue());
rowAllocator->releaseRow(seekRows.dequeue());
}

//Could the current readahead row be part of the seek set.
if (readAheadRow && compare->docompare(readAheadRow, seek, numFields) >= 0)
{
//Check not already added - could conceivably happen after rows are read directly beyond the matching seeks.
if (!lastSeekRow || compare->docompare(readAheadRow, lastSeekRow, numFields) > 0)
{
seekRows.enqueue(rowAllocator->linkRow(readAheadRow));
lastSeekRow = readAheadRow;
seek = readAheadRow;
}
}

//Return mismatches is selected because we don't want it to seek exact matches beyond the last seek position
Expand All @@ -214,7 +216,13 @@ void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields,
//but if so the next read request will do another blocked read, so just ignore this one.
if (wasCompleteMatch)
{
seekRows.enqueue(next);
readAheadRows.enqueue(next);
if (!lastSeekRow || compare->docompare(next, lastSeekRow, numFields) > 0)
{
//Only record unique seek positions in the seek rows
seekRows.enqueue(rowAllocator->linkRow(next));
lastSeekRow = next;
}
//update the seek pointer to the best value.
seek = next;
}
Expand All @@ -225,41 +233,31 @@ void CSteppedInputLookahead::ensureFilled(const void * seek, unsigned numFields,

unsigned CSteppedInputLookahead::ordinality() const
{
//pending <= readAheadRows.head(), so if there are any items in readAheadRows, then don't include pending
if ((readAheadRows.ordinality() == 0) && pending)
return seekRows.ordinality() + 1;
return seekRows.ordinality();
}

const void * CSteppedInputLookahead::querySeek(unsigned i) const
{
//pending <= readAheadRows.head(), so if there are any items in readAheadRows, then don't include pending
if ((readAheadRows.ordinality() == 0) && pending)
{
if (i == 0)
return pending;
i--;
}
return seekRows.item(i);
}

const void * CSteppedInputLookahead::consume()
{
if (!pending)
if (!readAheadRow)
fill();
if (!includeInOutput(pending))
if (!includeInOutput(readAheadRow))
return NULL;

if (paranoid && pending)
if (paranoid && readAheadRow)
{
if (previousPending)
rowAllocator->releaseRow(previousPending);
previousPending = rowAllocator->linkRow(pending);
if (previousReadAheadRow)
rowAllocator->releaseRow(previousReadAheadRow);
previousReadAheadRow = rowAllocator->linkRow(readAheadRow);
}

const void * ret = pending;
pending = NULL;
pendingMatches = true;
const void * ret = readAheadRow;
readAheadRow = NULL;
readAheadRowIsExactMatch = true;
return ret;
}

Expand All @@ -275,7 +273,7 @@ void CSteppedInputLookahead::createMultipleSeekWrapper(IMultipleStepSeekInfo * w

void CSteppedInputLookahead::fill()
{
pendingMatches = true;
readAheadRowIsExactMatch = true;
if (restrictValue && numStepableFields)
{
//note - this will either return a valid value to be included in the range,
Expand All @@ -286,66 +284,66 @@ void CSteppedInputLookahead::fill()
//Default to returning mismatches, but could be overidden from outside
unsigned flags = (SSEFreturnMismatches & ~stepFlagsMask) | stepFlagsValue;
SmartStepExtra inputStepExtra(flags, lowestFrequencyInput);
pending = nextInputRowGE(restrictValue, numFields, pendingMatches, inputStepExtra);
readAheadRow = nextInputRowGE(restrictValue, numFields, readAheadRowIsExactMatch, inputStepExtra);

if (paranoid && pending)
if (paranoid && readAheadRow)
{
int c = compare->docompare(pending, restrictValue, numFields);
int c = compare->docompare(readAheadRow, restrictValue, numFields);
if (c < 0)
throw MakeStringException(1001, "Input to stepped join preceeds seek point");
if ((c == 0) && !pendingMatches)
if ((c == 0) && !readAheadRowIsExactMatch)
throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
}
}
else
{
//Unusual. Normally we will step the input but this branch can occur for some unusual joins - e.g. a LEFT ONLY stepped join.
//Likely to cause problems if it occurs on anything other than the lowest frequency input if the index is remote
pending = nextInputRow();
readAheadRow = nextInputRow();
}

if (paranoid && pending && previousPending && compare)
if (paranoid && readAheadRow && previousReadAheadRow && compare)
{
if (compare->docompare(previousPending, pending, maxFields) > 0)
if (compare->docompare(previousReadAheadRow, readAheadRow, maxFields) > 0)
throw MakeStringException(1001, "Input to stepped join isn't sorted as expected");
}
}

const void * CSteppedInputLookahead::next()
{
if (!pendingMatches)
if (!readAheadRowIsExactMatch)
{
if (includeInOutput(pending))
if (includeInOutput(readAheadRow))
skip();
else
return NULL;
}

if (!pending)
if (!readAheadRow)
fill();

if (!includeInOutput(pending))
if (!includeInOutput(readAheadRow))
return NULL;

return pending;
return readAheadRow;
}

const void * CSteppedInputLookahead::nextGE(const void * seek, unsigned numFields, bool & wasCompleteMatch, const SmartStepExtra & stepExtra)
{
if (pending)
if (readAheadRow)
{
int c = compare->docompare(pending, seek, numFields);
int c = compare->docompare(readAheadRow, seek, numFields);
if (c >= 0)
{
if (!includeInOutput(pending))
if (!includeInOutput(readAheadRow))
return NULL;
if (pendingMatches)
return pending;
//pending Row is beyond seek point => ok to return an incomplete match
if (readAheadRowIsExactMatch)
return readAheadRow;
//readAheadRow is beyond seek point => ok to return an incomplete match
if (stepExtra.returnMismatches() && (c != 0))
{
wasCompleteMatch = pendingMatches;
return pending;
wasCompleteMatch = readAheadRowIsExactMatch;
return readAheadRow;
}
}
skip();
Expand All @@ -359,39 +357,39 @@ const void * CSteppedInputLookahead::nextGE(const void * seek, unsigned numField
unsigned stepFields = (numFields <= numStepableFields) ? numFields : numStepableFields;
loop
{
pendingMatches = true;
pending = nextInputRowGE(seek, stepFields, pendingMatches, inputStepExtra);
readAheadRowIsExactMatch = true;
readAheadRow = nextInputRowGE(seek, stepFields, readAheadRowIsExactMatch, inputStepExtra);

if (paranoid && pending)
if (paranoid && readAheadRow)
{
int c = compare->docompare(pending, seek, stepFields);
int c = compare->docompare(readAheadRow, seek, stepFields);
if (c < 0)
throw MakeStringException(1001, "Input to stepped join preceeds seek point");
if ((c == 0) && !pendingMatches)
if ((c == 0) && !readAheadRowIsExactMatch)
throw MakeStringException(1001, "Input to stepped join returned mismatch that matched equality fields");
}

if (!pending || !includeInOutput(pending))
if (!readAheadRow || !includeInOutput(readAheadRow))
return NULL;

if (numFields <= numStepableFields)
{
wasCompleteMatch = pendingMatches;
return pending;
wasCompleteMatch = readAheadRowIsExactMatch;
return readAheadRow;
}

//if !pendingMatches then isCompleteMatch must have been provided => ok to return a mismatch
//if !readAheadRowIsExactMatch then isCompleteMatch must have been provided => ok to return a mismatch
//if mismatch on stepFields, then must have mismatch on numFields (since stepFields <= numFields) => can return now
if (!pendingMatches)
if (!readAheadRowIsExactMatch)
{
wasCompleteMatch = pendingMatches;
return pending;
wasCompleteMatch = readAheadRowIsExactMatch;
return readAheadRow;
}

if (compare->docompare(pending, seek, numFields) >= 0)
if (compare->docompare(readAheadRow, seek, numFields) >= 0)
{
wasCompleteMatch = pendingMatches;
return pending;
wasCompleteMatch = readAheadRowIsExactMatch;
return readAheadRow;
}

skip();
Expand Down Expand Up @@ -448,19 +446,19 @@ void CSteppedInputLookahead::skip()
{
if (paranoid)
{
if (previousPending)
rowAllocator->releaseRow(previousPending);
previousPending = pending;
if (previousReadAheadRow)
rowAllocator->releaseRow(previousReadAheadRow);
previousReadAheadRow = readAheadRow;
}
else
{
if (pending)
rowAllocator->releaseRow(pending);
if (readAheadRow)
rowAllocator->releaseRow(readAheadRow);
}

//NB: Don't read ahead until we have to...
pending = NULL;
pendingMatches = true;
readAheadRow = NULL;
readAheadRowIsExactMatch = true;
}

const void * CSteppedInputLookahead::skipnext()
Expand Down
13 changes: 6 additions & 7 deletions common/thorhelper/thorstep.ipp
Expand Up @@ -215,7 +215,7 @@ public:
virtual const void * querySeek(unsigned i) const;

//inline helper functions
inline void clearPending() { if (pending) skip(); }
inline void clearPending() { if (readAheadRow) skip(); }
inline bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return input->gatherConjunctions(collector); }
inline unsigned getStepFlags() { return inputStepping ? inputStepping->getSteppedFlags() : 0; }
inline double getPriority() { return inputStepping ? inputStepping->getPriority() : 0.0; }
Expand All @@ -240,7 +240,6 @@ public:
}
return next();
}
inline bool hasRecordPending() { return pending != NULL; }
inline void resetInputEOF() { input->resetEOF(); }

private:
Expand All @@ -258,24 +257,24 @@ protected:

private:
Linked<ISteppedInput> input;
LinkedRowQueue seekRows; // rows that have been read from the input to provide as seek pointers for the next term
LinkedRowQueue readAheadRows; // previous seek rows, that have been read past, but still required to return as results.
LinkedRowQueue readAheadRows; // rows that have been read from the input to provide seekRows for other terms
LinkedRowQueue seekRows; // unique read-ahead rows that are >= the last seek position provided to ensureFilled()

protected:
IRangeCompare * compare;
const void * pending;
const void * readAheadRow;
IInputSteppingMeta * inputStepping;
IMultipleStepSeekInfo * lowestFrequencyInput;
Linked<IEngineRowAllocator> rowAllocator;
const void * restrictValue;
const void * previousPending;
const void * previousReadAheadRow;
unsigned maxFields;
unsigned numStepableFields;
unsigned numRestrictFields;
unsigned stepFlagsMask;
unsigned stepFlagsValue;
bool paranoid;
bool pendingMatches;
bool readAheadRowIsExactMatch;
bool isPostFiltered;
};

Expand Down
1 change: 1 addition & 0 deletions ecl/hql/hqlgram.y
Expand Up @@ -8867,6 +8867,7 @@ stepFlag
{
$$.setExpr(createExprAttribute(filteredAtom), $1);
}
| hintAttribute
;


Expand Down

0 comments on commit e49891a

Please sign in to comment.