Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unnest IN/ANY/EXISTS subqueries and optimize them using semi-join algorithm #8061

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 167 additions & 4 deletions src/jrd/RecordSourceNodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,124 @@ static void genDeliverUnmapped(CompilerScratch* csb, const BoolExprNodeStack& pa
static ValueExprNode* resolveUsingField(DsqlCompilerScratch* dsqlScratch, const MetaName& name,
ValueListNode* list, const FieldNode* flawedNode, const TEXT* side, dsql_ctx*& ctx);

namespace
{
// Walk a tree of ANDed booleans looking for comparisons that reference
// at least one stream which is not listed in rseStreams (i.e. a stream
// belonging to some other select expression).
//
// Every such comparison is detached from the tree (its slot is reset to
// nullptr) and pushed onto booleanStack. An AND node left with a single
// operand is replaced by that operand; an AND node left with no operands
// is removed altogether.
//
// Returns true if at least one comparison has been extracted.

bool findDependentBooleans(CompilerScratch* csb,
	const StreamList& rseStreams,
	BoolExprNode** parentBoolean,
	BoolExprNodeStack& booleanStack)
{
	const auto node = *parentBoolean;

	const auto andNode = nodeAs<BinaryBoolNode>(node);
	if (andNode && andNode->blrOp == blr_and)
	{
		// Both operands must always be visited, hence no short-circuit evaluation
		const bool leftFound = findDependentBooleans(csb, rseStreams,
			andNode->arg1.getAddress(), booleanStack);
		const bool rightFound = findDependentBooleans(csb, rseStreams,
			andNode->arg2.getAddress(), booleanStack);

		// Collapse the AND node if any of its operands has been extracted
		const bool leftGone = !andNode->arg1;
		const bool rightGone = !andNode->arg2;

		if (leftGone && rightGone)
			*parentBoolean = nullptr;
		else if (leftGone)
			*parentBoolean = andNode->arg2;
		else if (rightGone)
			*parentBoolean = andNode->arg1;

		return (leftFound || rightFound);
	}

	const auto comparison = nodeAs<ComparativeBoolNode>(node);
	if (!comparison)
		return false;

	SortedStreamList referenced;
	comparison->collectStreams(referenced);

	// Look for a stream that does not belong to the given select expression
	bool hasForeignStream = false;
	for (const auto stream : referenced)
	{
		if (!rseStreams.exist(stream))
		{
			hasForeignStream = true;
			break;
		}
	}

	if (!hasForeignStream)
		return false;

	// Hand the comparison over to the caller and unlink it from the tree
	booleanStack.push(node);
	*parentBoolean = nullptr;
	return true;
}

// Search through the list of ANDed booleans to find correlated EXISTS/IN sub-queries.
// They are candidates to be converted into semi- or anti-joins.
//
// For every suitable sub-query:
//   - the comparisons of its boolean that refer to streams outside the sub-query
//     are extracted (see findDependentBooleans) and ANDed together;
//   - the sub-query RSE is flagged with FLAG_SEMI_JOINED and pushed onto rseStack;
//   - the combined boolean is pushed onto booleanStack (one entry per pushed RSE);
//   - the EXISTS/IN node itself is unlinked from the parent boolean tree.
//
// AND nodes left with a single operand are replaced by that operand,
// those left with no operands are removed.
//
// Returns true if at least one sub-query has been extracted.

bool findPossibleJoins(CompilerScratch* csb,
	BoolExprNode** parentBoolean,
	RecordSourceNodeStack& rseStack,
	BoolExprNodeStack& booleanStack)
{
	auto boolNode = *parentBoolean;

	const auto binaryNode = nodeAs<BinaryBoolNode>(boolNode);
	if (binaryNode && binaryNode->blrOp == blr_and)
	{
		// Both operands must always be visited, hence no short-circuit evaluation
		const bool found1 = findPossibleJoins(csb, binaryNode->arg1.getAddress(),
			rseStack, booleanStack);
		const bool found2 = findPossibleJoins(csb, binaryNode->arg2.getAddress(),
			rseStack, booleanStack);

		// Collapse the AND node if any of its operands has been extracted
		if (!binaryNode->arg1 && !binaryNode->arg2)
			*parentBoolean = nullptr;
		else if (!binaryNode->arg1)
			*parentBoolean = binaryNode->arg2;
		else if (!binaryNode->arg2)
			*parentBoolean = binaryNode->arg1;

		return (found1 || found2);
	}

	const auto rseNode = nodeAs<RseBoolNode>(boolNode);
	// Both EXISTS (blr_any) and IN (blr_ansi_any) sub-queries are handled
	if (rseNode && (rseNode->blrOp == blr_any || rseNode->blrOp == blr_ansi_any))
	{
		auto rse = rseNode->rse;
		fb_assert(rse);

		// Correlated comparisons may be moved out of the sub-query only if that
		// does not change the set of rows it produces. This is not the case when
		// the sub-query limits its output (FIRST/SKIP), because the filter would
		// then be applied after the limit rather than before it, or when its
		// boolean belongs to a non-inner join.
		const bool unnestable =
			rse->rse_jointype == blr_inner &&
			!rse->rse_first && !rse->rse_skip;

		if (rse->rse_boolean && unnestable)
		{
			StreamList streams;
			rse->computeRseStreams(streams);

			BoolExprNodeStack booleans;
			if (findDependentBooleans(csb, streams,
				rse->rse_boolean.getAddress(),
				booleans))
			{
				// Combine all the extracted comparisons into a single ANDed boolean
				fb_assert(booleans.hasData());
				auto boolean = booleans.pop();
				while (booleans.hasData())
				{
					const auto andNode = FB_NEW_POOL(csb->csb_pool)
						BinaryBoolNode(csb->csb_pool, blr_and);
					andNode->arg1 = boolean;
					andNode->arg2 = booleans.pop();
					boolean = andNode;
				}

				rse->flags |= RseNode::FLAG_SEMI_JOINED;
				rseStack.push(rse);
				booleanStack.push(boolean);
				*parentBoolean = nullptr;
				return true;
			}
		}
	}

	return false;
}
}

//--------------------

Expand Down Expand Up @@ -2994,6 +3112,9 @@ RseNode* RseNode::pass1(thread_db* tdbb, CompilerScratch* csb)
{
SET_TDBB(tdbb);

if (const auto newRse = processPossibleJoins(tdbb, csb))
return newRse->pass1(tdbb, csb);

// for scoping purposes, maintain a stack of RseNode's which are
// currently being parsed; if there are none on the stack as
// yet, mark the RseNode as variant to make sure that statement-
Expand Down Expand Up @@ -3099,6 +3220,12 @@ RseNode* RseNode::pass1(thread_db* tdbb, CompilerScratch* csb)
void RseNode::pass1Source(thread_db* tdbb, CompilerScratch* csb, RseNode* rse,
BoolExprNode** boolean, RecordSourceNodeStack& stack)
{
if (const auto newRse = processPossibleJoins(tdbb, csb))
{
newRse->pass1Source(tdbb, csb, rse, boolean, stack);
return;
}

if (rse_jointype != blr_inner)
{
// Check whether any of the upper level booleans (those belonging to the WHERE clause)
Expand Down Expand Up @@ -3152,15 +3279,15 @@ void RseNode::pass1Source(thread_db* tdbb, CompilerScratch* csb, RseNode* rse,
}
}

// in the case of an RseNode, it is possible that a new RseNode will be generated,
// In the case of an RseNode, it is possible that a new RseNode will be generated,
// so wait to process the source before we push it on the stack (bug 8039)

// The addition of the JOIN syntax for specifying inner joins causes an
// RseNode tree to be generated, which is undesirable in the simplest case
// where we are just trying to inner join more than 2 streams. If possible,
// try to flatten the tree out before we go any further.

if (!isLateral() &&
if (!isLateral() && !isSemiJoined() &&
rse->rse_jointype == blr_inner &&
rse_jointype == blr_inner &&
!rse_sorted && !rse_projection &&
Expand Down Expand Up @@ -3265,11 +3392,11 @@ RecordSource* RseNode::compile(thread_db* tdbb, Optimizer* opt, bool innerSubStr

StreamStateHolder stateHolder(csb, opt->getOuterStreams());

if (opt->isLeftJoin() || isLateral())
if (opt->isLeftJoin() || isLateral() || isSemiJoined())
{
stateHolder.activate();

if (opt->isLeftJoin())
if (opt->isLeftJoin() || isSemiJoined())
{
// Push all conjuncts except "missing" ones (e.g. IS NULL)
for (auto iter = opt->getConjuncts(false, true); iter.hasData(); ++iter)
Expand All @@ -3292,6 +3419,42 @@ RecordSource* RseNode::compile(thread_db* tdbb, Optimizer* opt, bool innerSubStr
return opt->compile(this, &conjunctStack);
}

// Try to convert the joinable sub-queries found in this RSE's boolean into joins.
//
// Only an inner-join RSE having a boolean is examined. If findPossibleJoins()
// extracts nothing, nullptr is returned. Otherwise a chain of new inner-join
// RseNode's is built: each of them joins the result built so far (initially
// this node) with one extracted sub-query RSE, using the corresponding
// extracted boolean as the join condition. The outermost new node is returned.
RseNode* RseNode::processPossibleJoins(thread_db* tdbb, CompilerScratch* csb)
{
	if (rse_jointype != blr_inner)
		return nullptr;

	if (!rse_boolean)
		return nullptr;

	RecordSourceNodeStack joinableRses;
	BoolExprNodeStack joinBooleans;

	// Find possibly joinable sub-queries

	const bool anyFound =
		findPossibleJoins(csb, rse_boolean.getAddress(), joinableRses, joinBooleans);

	if (!anyFound)
		return nullptr;

	fb_assert(joinableRses.hasData() && joinBooleans.hasData());
	fb_assert(joinableRses.getCount() == joinBooleans.getCount());

	// Create joins between the original node and detected joinable nodes

	auto result = this;
	while (joinableRses.hasData())
	{
		auto& pool = *tdbb->getDefaultPool();
		const auto joinRse = FB_NEW_POOL(pool) RseNode(pool);

		// The node built so far goes first, the extracted sub-query goes second
		joinRse->rse_relations.add(result);
		joinRse->rse_relations.add(joinableRses.pop());

		joinRse->rse_boolean = joinBooleans.pop();
		joinRse->rse_jointype = blr_inner;

		result = joinRse;
	}

	return result;
}

// Check that all streams in the RseNode have a plan specified for them.
// If they are not, there are streams in the RseNode which were not mentioned in the plan.
void RseNode::planCheck(const CompilerScratch* csb) const
Expand Down
23 changes: 15 additions & 8 deletions src/jrd/RecordSourceNodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -713,14 +713,15 @@ class RseNode final : public TypedNode<RecordSourceNode, RecordSourceNode::TYPE_
public:
enum : USHORT
{
FLAG_VARIANT = 0x01, // variant (not invariant?)
FLAG_SINGULAR = 0x02, // singleton select
FLAG_WRITELOCK = 0x04, // locked for write
FLAG_SCROLLABLE = 0x08, // scrollable cursor
FLAG_DSQL_COMPARATIVE = 0x10, // transformed from DSQL ComparativeBoolNode
FLAG_LATERAL = 0x20, // lateral derived table
FLAG_SKIP_LOCKED = 0x40, // skip locked
FLAG_SUB_QUERY = 0x80 // sub-query
FLAG_VARIANT = 0x01, // variant (not invariant?)
FLAG_SINGULAR = 0x02, // singleton select
FLAG_WRITELOCK = 0x04, // locked for write
FLAG_SCROLLABLE = 0x08, // scrollable cursor
FLAG_DSQL_COMPARATIVE = 0x10, // transformed from DSQL ComparativeBoolNode
FLAG_LATERAL = 0x20, // lateral derived table
FLAG_SKIP_LOCKED = 0x40, // skip locked
FLAG_SUB_QUERY = 0x80, // sub-query
FLAG_SEMI_JOINED = 0x100 // participates in semi-join
};

bool isInvariant() const
Expand Down Expand Up @@ -748,6 +749,11 @@ class RseNode final : public TypedNode<RecordSourceNode, RecordSourceNode::TYPE_
return (flags & FLAG_SUB_QUERY) != 0;
}

// Tells whether FLAG_SEMI_JOINED is set for this RSE
bool isSemiJoined() const
{
	const auto semiJoinBit = flags & FLAG_SEMI_JOINED;
	return semiJoinBit != 0;
}

bool hasWriteLock() const
{
return (flags & FLAG_WRITELOCK) != 0;
Expand Down Expand Up @@ -852,6 +858,7 @@ class RseNode final : public TypedNode<RecordSourceNode, RecordSourceNode::TYPE_
private:
void planCheck(const CompilerScratch* csb) const;
static void planSet(CompilerScratch* csb, PlanNode* plan);
RseNode* processPossibleJoins(thread_db* tdbb, CompilerScratch* csb);

public:
NestConst<ValueExprNode> dsqlFirst;
Expand Down
29 changes: 28 additions & 1 deletion src/jrd/optimizer/InnerJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ void InnerJoin::calculateStreamInfo()
innerStream->baseIndexes = candidate->indexes;
innerStream->baseUnique = candidate->unique;
innerStream->baseNavigated = candidate->navigated;
innerStream->baseConjuncts = candidate->conjuncts;

csb->csb_rpt[innerStream->number].deactivate();
}
Expand Down Expand Up @@ -573,13 +574,39 @@ River* InnerJoin::formRiver()

// Create a hash join
rsb = FB_NEW_POOL(getPool())
HashJoin(tdbb, csb, 2, hashJoinRsbs, keys.begin(), stream.selectivity);
HashJoin(tdbb, csb, INNER_JOIN, 2, hashJoinRsbs, keys.begin(), stream.selectivity);

// Clear previously processed rsb's, as they're already incorporated into a hash join
rsbs.clear();
}
else
{
StreamList depStreams;

if (optimizer->isSemiJoined() && rsbs.isEmpty())
{
const auto baseStream = getStreamInfo(stream.number);
for (const auto boolean : baseStream->baseConjuncts)
{
if (optimizer->checkEquiJoin(boolean))
{
SortedStreamList nodeStreams;
boolean->collectStreams(nodeStreams);

for (const auto stream : nodeStreams)
{
if (stream != baseStream->number && !depStreams.exist(stream))
depStreams.add(stream);
}
}
}
}

StreamStateHolder stateHolder(csb, depStreams);
stateHolder.deactivate();

rsb = optimizer->generateRetrieval(stream.number, sortPtr, false, false);
}

rsbs.add(rsb);
streams.add(stream.number);
Expand Down
Loading
Loading