Skip to content

Commit

Permalink
Added support for semi/anti and outer joins to hash join algorithm. R…
Browse files Browse the repository at this point in the history
…eimplemented support for semi/anti joins inside the nested loop algorithm. Slightly changed implementation of full outer joins. Added transformation of IN/EXISTS subqueries into lateral semi-joins. Basic optimizer support for semi-joins. More efficient optimization for cross joins. Added some debug info (hash table statistics) for hash joins.
  • Loading branch information
dyemanov committed Jul 3, 2023
1 parent 421a73a commit ee56d5a
Show file tree
Hide file tree
Showing 9 changed files with 645 additions and 162 deletions.
171 changes: 167 additions & 4 deletions src/jrd/RecordSourceNodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,124 @@ static void genDeliverUnmapped(CompilerScratch* csb, const BoolExprNodeStack& pa
static ValueExprNode* resolveUsingField(DsqlCompilerScratch* dsqlScratch, const MetaName& name,
ValueListNode* list, const FieldNode* flawedNode, const TEXT* side, dsql_ctx*& ctx);

namespace
{
// Walk a tree of ANDed booleans and pull out every comparison that refers to
// at least one stream not listed in rseStreams (i.e. a stream belonging to
// some other select expression).
//
// Every extracted comparison is pushed onto booleanStack and its slot in the
// tree is cleared. An AND node left with a single operand is replaced in its
// parent slot by that operand; an AND node left with no operands clears its
// parent slot, so *parentBoolean may be nullptr on return.
//
// Returns true if at least one comparison was extracted.

bool findDependentBooleans(CompilerScratch* csb,
	const StreamList& rseStreams,
	BoolExprNode** parentBoolean,
	BoolExprNodeStack& booleanStack)
{
	const auto node = *parentBoolean;

	const auto conjunction = nodeAs<BinaryBoolNode>(node);
	if (conjunction && conjunction->blrOp == blr_and)
	{
		// Both operands are always visited, left one first,
		// so the extraction order stays stable
		const bool leftFound = findDependentBooleans(csb, rseStreams,
			conjunction->arg1.getAddress(), booleanStack);
		const bool rightFound = findDependentBooleans(csb, rseStreams,
			conjunction->arg2.getAddress(), booleanStack);

		// Collapse the AND node if the extraction left it incomplete
		BoolExprNode* const left = conjunction->arg1;
		BoolExprNode* const right = conjunction->arg2;

		if (!left || !right)
			*parentBoolean = left ? left : right;

		return (leftFound || rightFound);
	}

	const auto comparison = nodeAs<ComparativeBoolNode>(node);
	if (!comparison)
		return false;

	SortedStreamList streams;
	comparison->collectStreams(streams);

	// The comparison is dependent as soon as one of its streams is foreign
	bool dependent = false;
	for (const auto stream : streams)
	{
		if (!rseStreams.exist(stream))
		{
			dependent = true;
			break;
		}
	}

	if (!dependent)
		return false;

	booleanStack.push(node);
	*parentBoolean = nullptr;
	return true;
}

// Walk a tree of ANDed booleans looking for correlated EXISTS (blr_any) and
// IN (blr_ansi_any) sub-queries, which are candidates for conversion into
// semi- or anti-joins.
//
// A sub-query qualifies when its own boolean contains comparisons referring to
// streams outside of the sub-query. Those comparisons are removed from the
// sub-query boolean and ANDed together; the sub-query RSE is flagged with
// FLAG_SEMI_JOINED and pushed onto rseStack, the combined boolean is pushed
// onto booleanStack (one entry per RSE), and the sub-query slot in the tree
// is cleared. AND nodes that lose operands are collapsed into the parent slot,
// so *parentBoolean may be nullptr on return.
//
// Returns true if at least one sub-query was extracted.

bool findPossibleJoins(CompilerScratch* csb,
	BoolExprNode** parentBoolean,
	RecordSourceNodeStack& rseStack,
	BoolExprNodeStack& booleanStack)
{
	auto node = *parentBoolean;

	const auto conjunction = nodeAs<BinaryBoolNode>(node);
	if (conjunction && conjunction->blrOp == blr_and)
	{
		// Both operands are always visited, left one first
		const bool leftFound = findPossibleJoins(csb, conjunction->arg1.getAddress(),
			rseStack, booleanStack);
		const bool rightFound = findPossibleJoins(csb, conjunction->arg2.getAddress(),
			rseStack, booleanStack);

		// Collapse the AND node if the extraction left it incomplete
		BoolExprNode* const left = conjunction->arg1;
		BoolExprNode* const right = conjunction->arg2;

		if (!left || !right)
			*parentBoolean = left ? left : right;

		return (leftFound || rightFound);
	}

	// Only EXISTS (blr_any) and IN (blr_ansi_any) sub-queries are handled
	const auto subQuery = nodeAs<RseBoolNode>(node);
	if (!subQuery)
		return false;

	if (subQuery->blrOp != blr_any && subQuery->blrOp != blr_ansi_any)
		return false;

	auto rse = subQuery->rse;
	fb_assert(rse);

	// Without a boolean there is nothing that could correlate the sub-query
	if (!rse->rse_boolean)
		return false;

	StreamList streams;
	rse->computeRseStreams(streams);

	BoolExprNodeStack dependents;
	const bool correlated = findDependentBooleans(csb, streams,
		rse->rse_boolean.getAddress(), dependents);

	if (!correlated)
		return false;

	// Fold all the extracted comparisons into a single AND chain
	fb_assert(dependents.hasData());
	BoolExprNode* joinBoolean = dependents.pop();

	while (dependents.hasData())
	{
		const auto andNode = FB_NEW_POOL(csb->csb_pool)
			BinaryBoolNode(csb->csb_pool, blr_and);
		andNode->arg1 = joinBoolean;
		andNode->arg2 = dependents.pop();
		joinBoolean = andNode;
	}

	rse->flags |= RseNode::FLAG_SEMI_JOINED;
	rseStack.push(rse);
	booleanStack.push(joinBoolean);
	*parentBoolean = nullptr;
	return true;
}
}

//--------------------

Expand Down Expand Up @@ -2783,6 +2901,9 @@ RseNode* RseNode::pass1(thread_db* tdbb, CompilerScratch* csb)
{
SET_TDBB(tdbb);

if (const auto newRse = processPossibleJoins(tdbb, csb))
return newRse->pass1(tdbb, csb);

// for scoping purposes, maintain a stack of RseNode's which are
// currently being parsed; if there are none on the stack as
// yet, mark the RseNode as variant to make sure that statement-
Expand Down Expand Up @@ -2888,6 +3009,12 @@ RseNode* RseNode::pass1(thread_db* tdbb, CompilerScratch* csb)
void RseNode::pass1Source(thread_db* tdbb, CompilerScratch* csb, RseNode* rse,
BoolExprNode** boolean, RecordSourceNodeStack& stack)
{
if (const auto newRse = processPossibleJoins(tdbb, csb))
{
newRse->pass1Source(tdbb, csb, rse, boolean, stack);
return;
}

if (rse_jointype != blr_inner)
{
// Check whether any of the upper level booleans (those belonging to the WHERE clause)
Expand Down Expand Up @@ -2941,15 +3068,15 @@ void RseNode::pass1Source(thread_db* tdbb, CompilerScratch* csb, RseNode* rse,
}
}

// in the case of an RseNode, it is possible that a new RseNode will be generated,
// In the case of an RseNode, it is possible that a new RseNode will be generated,
// so wait to process the source before we push it on the stack (bug 8039)

// The addition of the JOIN syntax for specifying inner joins causes an
// RseNode tree to be generated, which is undesirable in the simplest case
// where we are just trying to inner join more than 2 streams. If possible,
// try to flatten the tree out before we go any further.

if (!isLateral() &&
if (!isLateral() && !isSemiJoined() &&
rse->rse_jointype == blr_inner &&
rse_jointype == blr_inner &&
!rse_sorted && !rse_projection &&
Expand Down Expand Up @@ -3054,11 +3181,11 @@ RecordSource* RseNode::compile(thread_db* tdbb, Optimizer* opt, bool innerSubStr

StreamStateHolder stateHolder(csb, opt->getOuterStreams());

if (opt->isLeftJoin() || isLateral())
if (opt->isLeftJoin() || isLateral() || isSemiJoined())
{
stateHolder.activate();

if (opt->isLeftJoin())
if (opt->isLeftJoin() || isSemiJoined())
{
// Push all conjuncts except "missing" ones (e.g. IS NULL)
for (auto iter = opt->getConjuncts(false, true); iter.hasData(); ++iter)
Expand All @@ -3081,6 +3208,42 @@ RecordSource* RseNode::compile(thread_db* tdbb, Optimizer* opt, bool innerSubStr
return opt->compile(this, &conjunctStack);
}

// Try to convert sub-queries found inside this RSE's boolean into joins.
//
// Only inner-join RSEs that have a boolean are considered. For every joinable
// sub-query reported by findPossibleJoins(), a new inner-join RseNode is created
// with two relations: the RSE built so far (initially this node) and the
// sub-query RSE. The boolean extracted for that sub-query becomes the boolean
// of the new node.
//
// Returns the outermost newly created RseNode,
// or nullptr if nothing was transformed.
RseNode* RseNode::processPossibleJoins(thread_db* tdbb, CompilerScratch* csb)
{
	if (rse_jointype != blr_inner)
		return nullptr;

	if (!rse_boolean)
		return nullptr;

	RecordSourceNodeStack joinableRses;
	BoolExprNodeStack joinBooleans;

	// Find possibly joinable sub-queries

	const bool anyFound =
		findPossibleJoins(csb, rse_boolean.getAddress(), joinableRses, joinBooleans);

	if (!anyFound)
		return nullptr;

	// Both stacks are expected to carry exactly one entry per detected sub-query
	fb_assert(joinableRses.hasData() && joinBooleans.hasData());
	fb_assert(joinableRses.getCount() == joinBooleans.getCount());

	// Create joins between the original node and detected joinable nodes

	auto& pool = *tdbb->getDefaultPool();

	RseNode* result = this;
	while (joinableRses.hasData())
	{
		const auto joinRse = FB_NEW_POOL(pool) RseNode(pool);

		joinRse->rse_jointype = blr_inner;
		joinRse->rse_boolean = joinBooleans.pop();

		// The node built so far goes first, the sub-query second
		joinRse->rse_relations.add(result);
		joinRse->rse_relations.add(joinableRses.pop());

		result = joinRse;
	}

	return result;
}

// Check that all streams in the RseNode have a plan specified for them.
// If they are not, there are streams in the RseNode which were not mentioned in the plan.
void RseNode::planCheck(const CompilerScratch* csb) const
Expand Down
25 changes: 16 additions & 9 deletions src/jrd/RecordSourceNodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -719,15 +719,16 @@ class RseNode final : public TypedNode<RecordSourceNode, RecordSourceNode::TYPE_
public:
enum : USHORT
{
FLAG_VARIANT = 0x01, // variant (not invariant?)
FLAG_SINGULAR = 0x02, // singleton select
FLAG_WRITELOCK = 0x04, // locked for write
FLAG_SCROLLABLE = 0x08, // scrollable cursor
FLAG_DSQL_COMPARATIVE = 0x10, // transformed from DSQL ComparativeBoolNode
FLAG_OPT_FIRST_ROWS = 0x20, // optimize retrieval for first rows
FLAG_LATERAL = 0x40, // lateral derived table
FLAG_SKIP_LOCKED = 0x80, // skip locked
FLAG_SUB_QUERY = 0x100 // sub-query
FLAG_VARIANT = 0x01, // variant (not invariant?)
FLAG_SINGULAR = 0x02, // singleton select
FLAG_WRITELOCK = 0x04, // locked for write
FLAG_SCROLLABLE = 0x08, // scrollable cursor
FLAG_DSQL_COMPARATIVE = 0x10, // transformed from DSQL ComparativeBoolNode
FLAG_OPT_FIRST_ROWS = 0x20, // optimize retrieval for first rows
FLAG_LATERAL = 0x40, // lateral derived table
FLAG_SKIP_LOCKED = 0x80, // skip locked
FLAG_SUB_QUERY = 0x100, // sub-query
FLAG_SEMI_JOINED = 0x200 // participates in semi-join
};

bool isInvariant() const
Expand Down Expand Up @@ -755,6 +756,11 @@ class RseNode final : public TypedNode<RecordSourceNode, RecordSourceNode::TYPE_
return (flags & FLAG_SUB_QUERY) != 0;
}

// Tells whether FLAG_SEMI_JOINED is set in the flags of this node
bool isSemiJoined() const
{
	const auto semiJoinBit = flags & FLAG_SEMI_JOINED;
	return semiJoinBit != 0;
}

bool hasWriteLock() const
{
return (flags & FLAG_WRITELOCK) != 0;
Expand Down Expand Up @@ -875,6 +881,7 @@ class RseNode final : public TypedNode<RecordSourceNode, RecordSourceNode::TYPE_
private:
void planCheck(const CompilerScratch* csb) const;
static void planSet(CompilerScratch* csb, PlanNode* plan);
RseNode* processPossibleJoins(thread_db* tdbb, CompilerScratch* csb);

public:
NestConst<ValueExprNode> dsqlFirst;
Expand Down
27 changes: 26 additions & 1 deletion src/jrd/optimizer/InnerJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ void InnerJoin::calculateStreamInfo()
innerStream->baseIndexes = candidate->indexes;
innerStream->baseUnique = candidate->unique;
innerStream->baseNavigated = candidate->navigated;
innerStream->baseMatches = candidate->matches;
innerStream->baseDependentFromStreams = candidate->dependentFromStreams;

csb->csb_rpt[innerStream->number].deactivate();
}
Expand Down Expand Up @@ -573,13 +575,36 @@ River* InnerJoin::formRiver()

// Create a hash join
rsb = FB_NEW_POOL(getPool())
HashJoin(tdbb, csb, 2, hashJoinRsbs, keys.begin(), stream.selectivity);
HashJoin(tdbb, csb, INNER_JOIN, 2, hashJoinRsbs, keys.begin(), stream.selectivity);

// Clear previously processed rsb's, as they're already incorporated into a hash join
rsbs.clear();
}
else
{
StreamList depStreams;

if (optimizer->isSemiJoined() && rsbs.isEmpty())
{
const auto baseStream = getStreamInfo(stream.number);
for (const auto match : baseStream->baseMatches)
{
if (optimizer->checkEquiJoin(match))
{
for (const auto depStream : baseStream->baseDependentFromStreams)
{
if (match->containsStream(depStream))
depStreams.add(depStream);
}
}
}
}

StreamStateHolder stateHolder(csb, depStreams);
stateHolder.deactivate();

rsb = optimizer->generateRetrieval(stream.number, sortPtr, false, false);
}

rsbs.add(rsb);
streams.add(stream.number);
Expand Down
Loading

0 comments on commit ee56d5a

Please sign in to comment.