Skip to content
Permalink
Browse files

Fix motion hazard between outer and joinqual

A motion hazard is a deadlock between motions.  The classic motion hazard
in a join executor is formed by its inner and outer motions; it can be
prevented by prefetching the inner plan.  Refer to motion_sanity_check()
for details.

A similar motion hazard can be formed by the outer motion and the join
qual motion.  A join executor fetches an outer tuple, filters it with the
join qual, then repeats the process for all the outer tuples.  When there
are motions in both the outer plan and the join qual, the following state
is possible:

0. processes A and B belong to the join slice, process C belongs to the
   outer slice, process D belongs to the JoinQual slice;
1. A has read the first outer tuple and is fetching tuples from D;
2. D is waiting for ACK from B;
3. B is fetching the first outer tuple from C;
4. C is waiting for ACK from A;

So a deadlock is formed A->D->B->C->A.  We can prevent it also by
prefetching the join qual.

Reviewed-by: Jesse Zhang <jzhang@pivotal.io>
Reviewed-by: Gang Xiong <gxiong@pivotal.io>
Reviewed-by: Zhenghua Lyu <zlv@pivotal.io>

(cherry picked from commit fa762b6)
  • Loading branch information...
pivotal-ning-yu committed May 8, 2019
1 parent a9452d5 commit d2ebd40835e481a8724f4e3169c60cade3fed6d8
@@ -1682,6 +1682,87 @@ ExecGetShareNodeEntry(EState* estate, int shareidx, bool fCreate)
return (ShareNodeEntry *) list_nth(*estate->es_sharenode, shareidx);
}

/*
 * ExecPrefetchJoinQual
 *		Evaluate the join qual once, up front, to prevent a motion hazard.
 *
 * A motion hazard is a deadlock between motions.  The classic motion hazard
 * in a join executor is formed by its inner and outer motions and is
 * prevented by prefetching the inner plan; refer to motion_sanity_check()
 * for details.
 *
 * A similar hazard can be formed by the outer motion and a motion inside the
 * join qual.  A join executor fetches an outer tuple, filters it with the
 * join qual, then repeats the process for all the outer tuples.  When both
 * the outer plan and the join qual contain motions, the following state is
 * possible:
 *
 * 0. processes A and B belong to the join slice, process C belongs to the
 *    outer slice, process D belongs to the JoinQual slice;
 * 1. A has read the first outer tuple and is fetching tuples from D;
 * 2. D is waiting for ACK from B;
 * 3. B is fetching the first outer tuple from C;
 * 4. C is waiting for ACK from A;
 *
 * This forms the deadlock cycle A->D->B->C->A.  It is prevented by
 * prefetching the join qual as well.
 *
 * An example is demonstrated and explained in test case
 * src/test/regress/sql/deadlock2.sql.
 *
 * The qual is evaluated against fake inner and outer tuples; its result is
 * discarded.  Afterwards the expression context's inner tuple is restored to
 * its previous value and its outer tuple is reset to NULL.
 *
 * Returns true if the JoinQual was prefetched, false if the node has no
 * join qual.
 */
bool
ExecPrefetchJoinQual(JoinState *node)
{
	ExprContext *econtext = node->ps.ps_ExprContext;
	List	   *quals = node->joinqual;
	EState	   *estate;
	TupleTableSlot *saved_inner;

	/* Without a join qual there is nothing to prefetch */
	if (quals == NULL)
		return false;

	/* The caller must not have fetched any outer tuple yet */
	Assert(econtext->ecxt_outertuple == NULL);

	estate = node->ps.state;

	/* Remember the current inner tuple so it can be put back afterwards */
	saved_inner = econtext->ecxt_innertuple;

	/* Install fake tuples: inner side first, then outer side */
	econtext->ecxt_innertuple =
		ExecInitNullTupleSlot(estate,
							  ExecGetResultType(innerPlanState(node)));
	econtext->ecxt_outertuple =
		ExecInitNullTupleSlot(estate,
							  ExecGetResultType(outerPlanState(node)));

	/* Run the qual for its side effect of fetching the subplan */
	(void) ExecQual(quals, econtext, false);

	/* Put the expression context back the way we found it */
	econtext->ecxt_innertuple = saved_inner;
	econtext->ecxt_outertuple = NULL;

	return true;
}

/*
 * ShouldPrefetchJoinQual
 *		Decide whether the join qual of this join should be prefetched.
 *
 * The joinqual should be prefetched when both the outer side and the
 * joinqual contain motions.  The create_*join_plan() functions set
 * prefetch_joinqual from the motion-hazard state of the join's input paths
 * only; the final decision is made here by additionally consulting
 * findSenderMotion() for the slice currently being executed.
 *
 * See ExecPrefetchJoinQual() for details on the deadlock being avoided.
 *
 * This function should be called in the ExecInit*Join() functions.
 *
 * Returns true if the JoinQual should be prefetched.
 */
bool
ShouldPrefetchJoinQual(EState *estate, Join *join)
{
	/* The planner saw no hazard: skip the sender-motion lookup entirely */
	if (!join->prefetch_joinqual)
		return false;

	if (findSenderMotion(estate->es_plannedstmt,
						 estate->currentSliceIdInPlan))
		return true;

	return false;
}

/* ----------------------------------------------------------------
* CDB Slice Table utilities
* ----------------------------------------------------------------
@@ -234,6 +234,14 @@ ExecHashJoin_guts(HashJoinState *node)
if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
return NULL;

/*
* Prefetch JoinQual to prevent motion hazard.
*
* See ExecPrefetchJoinQual() for details.
*/
if (node->prefetch_joinqual && ExecPrefetchJoinQual(&node->js))
node->prefetch_joinqual = false;

/*
* We just scanned the entire inner side and built the hashtable
* (and its overflow batches). Check here and remember if the inner
@@ -592,6 +600,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
* the fix to MPP-989)
*/
hjstate->prefetch_inner = node->join.prefetch_inner;
hjstate->prefetch_joinqual = ShouldPrefetchJoinQual(estate, &node->join);

/*
* initialize child nodes
@@ -680,6 +680,14 @@ ExecMergeJoin_guts(MergeJoinState *node)
node->prefetch_inner = false;
}

/*
* Prefetch JoinQual to prevent motion hazard.
*
* See ExecPrefetchJoinQual() for details.
*/
if (node->prefetch_joinqual && ExecPrefetchJoinQual(&node->js))
node->prefetch_joinqual = false;

/*
* ok, everything is setup.. let's go to work
*/
@@ -1564,6 +1572,7 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
mergestate->mj_ConstFalseJoin = false;

mergestate->prefetch_inner = node->join.prefetch_inner;
mergestate->prefetch_joinqual = ShouldPrefetchJoinQual(estate, &node->join);
/* Prepare inner operators for rewind after the prefetch */
rewindflag = mergestate->prefetch_inner ? EXEC_FLAG_REWIND : 0;

@@ -142,6 +142,14 @@ ExecNestLoop_guts(NestLoopState *node)
node->reset_inner = false;
}

/*
* Prefetch JoinQual to prevent motion hazard.
*
* See ExecPrefetchJoinQual() for details.
*/
if (node->prefetch_joinqual && ExecPrefetchJoinQual(&node->js))
node->prefetch_joinqual = false;

/*
* Ok, everything is setup for the join so now loop until we return a
* qualifying join tuple.
@@ -382,6 +390,7 @@ ExecInitNestLoop(NestLoop *node, EState *estate, int eflags)
nlstate->shared_outer = node->shared_outer;

nlstate->prefetch_inner = node->join.prefetch_inner;
nlstate->prefetch_joinqual = ShouldPrefetchJoinQual(estate, &node->join);

/*CDB-OLAP*/
nlstate->reset_inner = false;
@@ -871,6 +871,7 @@ CopyJoinFields(const Join *from, Join *newnode)
CopyPlanFields((const Plan *) from, (Plan *) newnode);

COPY_SCALAR_FIELD(prefetch_inner);
COPY_SCALAR_FIELD(prefetch_joinqual);

COPY_SCALAR_FIELD(jointype);
COPY_NODE_FIELD(joinqual);
@@ -406,6 +406,7 @@ _outJoinPlanInfo(StringInfo str, const Join *node)
_outPlanInfo(str, (const Plan *) node);

WRITE_BOOL_FIELD(prefetch_inner);
WRITE_BOOL_FIELD(prefetch_joinqual);

WRITE_ENUM_FIELD(jointype, JoinType);
WRITE_NODE_FIELD(joinqual);
@@ -2427,6 +2427,7 @@ void readJoinInfo(Join *local_node)
readPlanInfo((Plan *) local_node);

READ_BOOL_FIELD(prefetch_inner);
READ_BOOL_FIELD(prefetch_joinqual);

READ_ENUM_FIELD(jointype, JoinType);
READ_NODE_FIELD(joinqual);
@@ -776,6 +776,18 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
if (partition_selector_created)
((Join *) plan)->prefetch_inner = true;

/*
* A motion deadlock can also happen when outer and joinqual both contain
* motions. It is not easy to check for joinqual here, so we set the
* prefetch_joinqual mark only according to outer motion, and check for
* joinqual later in the executor.
*
* See ExecPrefetchJoinQual() for details.
*/
if (best_path->outerjoinpath &&
best_path->outerjoinpath->motionHazard)
((Join *) plan)->prefetch_joinqual = true;

plan->flow = cdbpathtoplan_create_flow(root,
best_path->path.locus,
best_path->path.parent ? best_path->path.parent->relids
@@ -3189,6 +3201,18 @@ create_nestloop_plan(PlannerInfo *root,
if (prefetch)
join_plan->join.prefetch_inner = true;

/*
* A motion deadlock can also happen when outer and joinqual both contain
* motions. It is not easy to check for joinqual here, so we set the
* prefetch_joinqual mark only according to outer motion, and check for
* joinqual later in the executor.
*
* See ExecPrefetchJoinQual() for details.
*/
if (best_path->outerjoinpath &&
best_path->outerjoinpath->motionHazard)
join_plan->join.prefetch_joinqual = true;

return join_plan;
}

@@ -3514,6 +3538,25 @@ create_mergejoin_plan(PlannerInfo *root,

join_plan->join.prefetch_inner = prefetch;

/*
* A motion deadlock can also happen when outer and joinqual both contain
* motions. It is not easy to check for joinqual here, so we set the
* prefetch_joinqual mark only according to outer motion, and check for
* joinqual later in the executor.
*
* See ExecPrefetchJoinQual() for details.
*/
if (best_path->jpath.outerjoinpath &&
best_path->jpath.outerjoinpath->motionHazard)
join_plan->join.prefetch_joinqual = true;
/*
* If inner motion is not under a Material or Sort node then there could
* also be motion deadlock between inner and joinqual in mergejoin.
*/
if (best_path->jpath.innerjoinpath &&
best_path->jpath.innerjoinpath->motionHazard)
join_plan->join.prefetch_joinqual = true;

/* Costs of sort and material steps are included in path cost already */
copy_path_costsize(root, &join_plan->join.plan, &best_path->jpath.path);

@@ -3668,6 +3711,18 @@ create_hashjoin_plan(PlannerInfo *root,
join_plan->join.prefetch_inner = true;
}

/*
* A motion deadlock can also happen when outer and joinqual both contain
* motions. It is not easy to check for joinqual here, so we set the
* prefetch_joinqual mark only according to outer motion, and check for
* joinqual later in the executor.
*
* See ExecPrefetchJoinQual() for details.
*/
if (best_path->jpath.outerjoinpath &&
best_path->jpath.outerjoinpath->motionHazard)
join_plan->join.prefetch_joinqual = true;

copy_path_costsize(root, &join_plan->join.plan, &best_path->jpath.path);

return join_plan;
@@ -470,6 +470,9 @@ extern void UnregisterExprContextCallback(ExprContext *econtext,
/* Share input utilities defined in execUtils.c */
extern ShareNodeEntry * ExecGetShareNodeEntry(EState *estate, int shareid, bool fCreate);

extern bool ExecPrefetchJoinQual(JoinState *node);
extern bool ShouldPrefetchJoinQual(EState *estate, Join *join);

/* ResultRelInfo and Append Only segment assignment */
void ResultRelInfoSetSegno(ResultRelInfo *resultRelInfo, List *mapping);

@@ -2264,6 +2264,7 @@ typedef struct NestLoopState
bool nl_MatchedOuter;
bool shared_outer;
bool prefetch_inner;
bool prefetch_joinqual;
bool reset_inner; /*CDB-OLAP*/
bool require_inner_reset; /*CDB-OLAP*/

@@ -2319,6 +2320,7 @@ typedef struct MergeJoinState
ExprContext *mj_OuterEContext;
ExprContext *mj_InnerEContext;
bool prefetch_inner; /* MPP-3300 */
bool prefetch_joinqual;
} MergeJoinState;

/* ----------------
@@ -2376,6 +2378,7 @@ typedef struct HashJoinState
bool hj_OuterNotEmpty;
bool hj_InnerEmpty; /* set to true if inner side is empty */
bool prefetch_inner;
bool prefetch_joinqual;
bool hj_nonequijoin;

/* set if the operator created workfiles */
@@ -867,6 +867,7 @@ typedef struct Join
List *joinqual; /* JOIN quals (in addition to plan.qual) */

bool prefetch_inner; /* to avoid deadlock in MPP */
bool prefetch_joinqual; /* to avoid deadlock in MPP */
} Join;

/* ----------------
Oops, something went wrong.

0 comments on commit d2ebd40

Please sign in to comment.
You can’t perform that action at this time.