Refactor master query to be planned by postgres' planner (#3326)
DESCRIPTION: Replace the query planner for the coordinator part with the postgres planner

Closes #2761 

Citus had a simple rule-based planner for the query executed on the query coordinator. This planner grew over time with the addition of SQL support until it approached the functionality of the postgres planner. However, the code was brittle and its complexity kept rising, which made it hard to add new SQL support.

Given its resemblance to the postgres planner, it was a long-standing wish to replace our hand-crafted planner with the well-supported postgres planner. This patch replaces our planner with a call to postgres' planner.

Due to the functionality of the postgres planner we needed to support both projections and filters/quals on the citus custom scan node. When a sort operation is planned above the custom scan, it might require fields to be reordered in the custom scan before the tuple is returned (a projection). The postgres planner assumes every custom scan node implements projections. Because we previously controlled the plan that was created, we could prevent reordering in the custom scan and had never implemented it before.

A similar optimisation applies to HAVING clauses that could have been WHERE clauses. Instead of applying the filter as a HAVING clause on the aggregate, the planner pushes it down into the plan, where it can reach a custom scan node as a qual. For example, a HAVING clause that references only GROUP BY columns and contains no aggregates can be evaluated as a filter below the aggregation.

We implement both filters and projections at the point where tuples are read from the tuple store. If no projections or filters are required, the tuple is returned directly from the tuple store. Otherwise, tuples read from the tuple store are run through the filter and projection until a qualifying tuple is found and returned.
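As a rough illustration, the loop below sketches that read path in simplified form; FetchTupleFromStore is a hypothetical stand-in for the tuplestore_gettupleslot call, and the real implementation is ReturnTupleFromTuplestore in multi_executor.c further down.

#include "postgres.h"
#include "miscadmin.h"
#include "executor/executor.h"
#include "distributed/multi_executor.h"

/* hypothetical stand-in for the tuplestore_gettupleslot() call */
extern TupleTableSlot * FetchTupleFromStore(CitusScanState *scanState);

static TupleTableSlot *
ReadFilteredProjectedTuple(CitusScanState *scanState)
{
	PlanState *ps = &scanState->customScanState.ss.ps;
	ExprContext *econtext = ps->ps_ExprContext;

	for (;;)
	{
		CHECK_FOR_INTERRUPTS();
		ResetExprContext(econtext);    /* free per-tuple expression memory */

		TupleTableSlot *slot = FetchTupleFromStore(scanState);
		if (TupIsNull(slot))
		{
			return slot;               /* end of the tuple store */
		}

		/* evaluate the qual (filter) against the scan tuple */
		econtext->ecxt_scantuple = slot;
		if (ps->qual != NULL && !ExecQual(ps->qual, econtext))
		{
			continue;                  /* filtered out, read the next tuple */
		}

		if (ps->ps_ProjInfo != NULL)
		{
			return ExecProject(ps->ps_ProjInfo);    /* reorder/compute columns */
		}

		return slot;                   /* no projection required */
	}
}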

Besides filters being pushed down, a side effect of quals that could have been a WHERE clause is that a call to read an intermediate result can be evaluated before the first tuple is fetched from the custom scan. This used to fail because the intermediate result would only be pulled to the coordinator on the first tuple fetch. To overcome this problem we now run the distributed subplans before we run the postgres executor, which ensures the intermediate result is present on the coordinator in time. We account for total-time instrumentation by removing the instrumentation before handing control to the postgres executor and updating the timings ourselves.
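Condensed, the total-time bookkeeping looks roughly like the sketch below (mirroring CitusExecutorRun in the diff further down); RunDistributedSubPlans is a hypothetical stand-in for the PreExecScan calls that execute the subplans.

#include "postgres.h"
#include "executor/executor.h"
#include "executor/instrument.h"

/* hypothetical stand-in for the PreExecScan calls on the citus scan nodes */
extern void RunDistributedSubPlans(QueryDesc *queryDesc);

static void
SketchedExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
					bool execute_once)
{
	Instrumentation *volatile totalTime = queryDesc->totaltime;
	queryDesc->totaltime = NULL;    /* keep postgres from measuring our work twice */

	PG_TRY();
	{
		if (totalTime)
		{
			InstrStartNode(totalTime);
		}

		/* pull intermediate results to the coordinator before the executor runs */
		RunDistributedSubPlans(queryDesc);

		standard_ExecutorRun(queryDesc, direction, count, execute_once);

		if (totalTime)
		{
			InstrStopNode(totalTime, queryDesc->estate->es_processed);
			queryDesc->totaltime = totalTime;
		}
	}
	PG_CATCH();
	{
		if (totalTime)
		{
			queryDesc->totaltime = totalTime;    /* restore before rethrowing */
		}
		PG_RE_THROW();
	}
	PG_END_TRY();
}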

For future SQL support it is enough to create a valid query structure for the part of the query to be executed on the query coordinating node. As a utility we serialise and print this query at debug level 4 so engineers can inspect what kind of query is being planned on the query coordinator.
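A hedged sketch of what that utility could look like; IsLoggableLevel and pg_get_query_def are assumed here to be Citus' existing logging and ruleutils helpers, masterQuery stands for the coordinator part of the query, and the message wording is illustrative.

/* sketch: print the coordinator query at DEBUG4 (helper names are assumptions) */
if (IsLoggableLevel(DEBUG4))
{
	StringInfo queryString = makeStringInfo();
	pg_get_query_def(masterQuery, queryString);
	elog(DEBUG4, "master query: %s", queryString->data);
}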
thanodnl committed Feb 25, 2020
1 parent 0c4f9e2 commit a77ed9c
Showing 55 changed files with 1,273 additions and 1,031 deletions.
31 changes: 22 additions & 9 deletions src/backend/distributed/executor/adaptive_executor.c
@@ -624,6 +624,28 @@ static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events
eventCount, bool *cancellationReceived);
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);


/*
* AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor
* run. Given that the result of our subplans may be needed before the first call to
* the exec function of our custom scan, we make sure our subplans have been executed
* beforehand.
*/
void
AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
{
DistributedPlan *distributedPlan = scanState->distributedPlan;

/*
* PostgreSQL takes locks on all partitions in the executor. It's not entirely
* clear why this is necessary (instead of locking the parent during DDL), but
* we do the same for consistency.
*/
LockPartitionsForDistributedPlan(distributedPlan);

ExecuteSubPlans(distributedPlan);
}


/*
* AdaptiveExecutor is called via CitusExecScan on the
* first call of CitusExecScan. The function fills the tupleStore
@@ -649,15 +671,6 @@ AdaptiveExecutor(CitusScanState *scanState)
/* we should only call this once before the scan finished */
Assert(!scanState->finishedRemoteScan);

/*
* PostgreSQL takes locks on all partitions in the executor. It's not entirely
* clear why this is necessary (instead of locking the parent during DDL), but
* we do the same for consistency.
*/
LockPartitionsForDistributedPlan(distributedPlan);

ExecuteSubPlans(distributedPlan);

bool hasDependentJobs = HasDependentJobs(job);
if (hasDependentJobs)
{
59 changes: 54 additions & 5 deletions src/backend/distributed/executor/citus_custom_scan.c
@@ -49,6 +49,7 @@ static Node * DelayedErrorCreateScan(CustomScan *scan);
static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginScanWithCoordinatorProcessing(CustomScanState *node, EState *estate,
int eflags);
static void CitusPreExecScan(CitusScanState *scanState);
static void HandleDeferredShardPruningForFastPathQueries(
DistributedPlan *distributedPlan);
static void HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan);
@@ -114,6 +115,29 @@ static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = {
};


/*
* IsCitusCustomState returns whether the given PlanState node is a CitusCustomState node.
*/
bool
IsCitusCustomState(PlanState *planState)
{
if (!IsA(planState, CustomScanState))
{
return false;
}

CustomScanState *css = castNode(CustomScanState, planState);
if (css->methods == &AdaptiveExecutorCustomExecMethods ||
css->methods == &TaskTrackerCustomExecMethods ||
css->methods == &CoordinatorInsertSelectCustomExecMethods)
{
return true;
}

return false;
}


/*
* Let PostgreSQL know about Citus' custom scan nodes.
*/
@@ -141,7 +165,24 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
CitusScanState *scanState = (CitusScanState *) node;

#if PG_VERSION_NUM >= 120000

/*
* Since we are using a tuplestore we cannot use the virtual tuples postgres had
* already set up on the CustomScan. Instead we need to reinitialize the slots as
* minimal tuple slots.
*
* During initialization postgres also created the projection information and the
* quals, but both are 'compiled' to be executed on virtual tuples. Since we replaced
* the slots with minimal tuple slots we also recompile both the projection and the
* quals against these 'new' tuples.
*/
ExecInitResultSlot(&scanState->customScanState.ss.ps, &TTSOpsMinimalTuple);

ExecInitScanTupleSlot(node->ss.ps.state, &node->ss, node->ss.ps.scandesc,
&TTSOpsMinimalTuple);
ExecAssignScanProjectionInfoWithVarno(&node->ss, INDEX_VAR);

node->ss.ps.qual = ExecInitQual(node->ss.ps.plan->qual, (PlanState *) node);
#endif

DistributedPlan *distributedPlan = scanState->distributedPlan;
@@ -158,6 +199,16 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
}


/*
* CitusPreExecScan is called right before postgres' executor starts pulling tuples.
*/
static void
CitusPreExecScan(CitusScanState *scanState)
{
AdaptiveExecutorPreExecutorRun(scanState);
}


/*
* CitusExecScan is called when a tuple is pulled from a custom scan.
* On the first call, it executes the distributed query and writes the
@@ -176,9 +227,7 @@
scanState->finishedRemoteScan = true;
}

TupleTableSlot *resultSlot = ReturnTupleFromTuplestore(scanState);

return resultSlot;
return ReturnTupleFromTuplestore(scanState);
}


@@ -596,6 +645,7 @@ AdaptiveExecutorCreateScan(CustomScan *scan)
scanState->distributedPlan = GetDistributedPlan(scan);

scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods;
scanState->PreExecScan = &CitusPreExecScan;

return (Node *) scanState;
}
@@ -726,8 +776,7 @@ CitusReScan(CustomScanState *node)
TupleDesc
ScanStateGetTupleDescriptor(CitusScanState *scanState)
{
return scanState->customScanState.ss.ps.ps_ResultTupleSlot->
tts_tupleDescriptor;
return scanState->customScanState.ss.ss_ScanTupleSlot->tts_tupleDescriptor;
}


159 changes: 156 additions & 3 deletions src/backend/distributed/executor/multi_executor.c
@@ -21,6 +21,7 @@
#include "distributed/commands/utility_hook.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_master_planner.h"
@@ -34,6 +35,7 @@
#include "distributed/worker_protocol.h"
#include "executor/execdebug.h"
#include "commands/copy.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "parser/parsetree.h"
@@ -69,6 +71,9 @@ int ExecutorLevel = 0;
static Relation StubRelation(TupleDesc tupleDescriptor);
static bool AlterTableConstraintCheck(QueryDesc *queryDesc);
static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan);
static List * FindCitusCustomScanStates(PlanState *planState);
static bool CitusCustomScanStateWalker(PlanState *planState,
List **citusCustomScanStates);

/*
* CitusExecutorStart is the ExecutorStart_hook that gets called when
@@ -123,10 +128,25 @@ CitusExecutorRun(QueryDesc *queryDesc,
{
DestReceiver *dest = queryDesc->dest;

/*
* We now do some potentially time-consuming operations ourselves before we hand off
* control to postgres' executor. To make sure that time spent is accurately measured
* we remove the totaltime instrumentation from the queryDesc. Instead we start and
* stop the instrumentation of the total time ourselves and put it back on the
* queryDesc before returning (or rethrowing) from this function.
*/
Instrumentation *volatile totalTime = queryDesc->totaltime;
queryDesc->totaltime = NULL;

PG_TRY();
{
ExecutorLevel++;

if (totalTime)
{
InstrStartNode(totalTime);
}

if (CitusHasBeenLoaded())
{
if (IsLocalReferenceTableJoinPlan(queryDesc->plannedstmt) &&
@@ -174,13 +194,47 @@
}
else
{
/* switch into per-query memory context before calling PreExecScan */
MemoryContext oldcontext = MemoryContextSwitchTo(
queryDesc->estate->es_query_cxt);

/*
* Call PreExecScan for all citus custom scan nodes prior to starting the
* postgres exec scan. This gives citus scan nodes a chance to initialize state
* that would otherwise be initialized too late, namely when the first tuple is
* about to be returned.
*/
List *citusCustomScanStates = FindCitusCustomScanStates(queryDesc->planstate);
CitusScanState *citusScanState = NULL;
foreach_ptr(citusScanState, citusCustomScanStates)
{
if (citusScanState->PreExecScan)
{
citusScanState->PreExecScan(citusScanState);
}
}

/* postgres will switch here again and will restore back on its own */
MemoryContextSwitchTo(oldcontext);

standard_ExecutorRun(queryDesc, direction, count, execute_once);
}

if (totalTime)
{
InstrStopNode(totalTime, queryDesc->estate->es_processed);
queryDesc->totaltime = totalTime;
}

ExecutorLevel--;
}
PG_CATCH();
{
if (totalTime)
{
queryDesc->totaltime = totalTime;
}

ExecutorLevel--;

PG_RE_THROW();
@@ -189,6 +243,38 @@
}


/*
* FindCitusCustomScanStates returns a list of all citus custom scan states in the
* given planState tree.
*/
static List *
FindCitusCustomScanStates(PlanState *planState)
{
List *citusCustomScanStates = NIL;
CitusCustomScanStateWalker(planState, &citusCustomScanStates);
return citusCustomScanStates;
}


/*
* CitusCustomScanStateWalker walks a planState tree structure and adds all
* CitusCustomState nodes to the list passed by reference as the second argument.
*/
static bool
CitusCustomScanStateWalker(PlanState *planState, List **citusCustomScanStates)
{
if (IsCitusCustomState(planState))
{
CitusScanState *css = (CitusScanState *) planState;
*citusCustomScanStates = lappend(*citusCustomScanStates, css);

/* breaks the walking of this tree */
return true;
}
return planstate_tree_walker(planState, CitusCustomScanStateWalker,
citusCustomScanStates);
}


/*
* ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
* given Citus scan node and returns it. It returns null if all tuples are read
@@ -214,10 +300,77 @@ ReturnTupleFromTuplestore(CitusScanState *scanState)
forwardScanDirection = false;
}

TupleTableSlot *resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot;
tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, resultSlot);
ExprState *qual = scanState->customScanState.ss.ps.qual;
ProjectionInfo *projInfo = scanState->customScanState.ss.ps.ps_ProjInfo;
ExprContext *econtext = scanState->customScanState.ss.ps.ps_ExprContext;

return resultSlot;
if (!qual && !projInfo)
{
/* no quals or projections; return directly from the tuple store */
TupleTableSlot *slot = scanState->customScanState.ss.ss_ScanTupleSlot;
tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, slot);
return slot;
}

for (;;)
{
/*
* If there is a very selective qual on the Citus scan node we might block
* interrupts for a long time if we did not check for interrupts in this loop.
*/
CHECK_FOR_INTERRUPTS();

/*
* Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous tuple cycle.
*/
ResetExprContext(econtext);

TupleTableSlot *slot = scanState->customScanState.ss.ss_ScanTupleSlot;
tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, slot);

if (TupIsNull(slot))
{
/*
* When the tuple is null we have reached the end of the tuplestore. We will
* return a null tuple; however, depending on the existence of a projection we
* need to return either the scan tuple or the projected tuple.
*/
if (projInfo)
{
return ExecClearTuple(projInfo->pi_state.resultslot);
}
else
{
return slot;
}
}

/* place the current tuple into the expr context */
econtext->ecxt_scantuple = slot;

if (!ExecQual(qual, econtext))
{
/* skip tuples that do not satisfy the qual (filter) */
InstrCountFiltered1(scanState, 1);
continue;
}

/* found a satisfactory scan tuple */
if (projInfo)
{
/*
* Form a projection tuple, store it in the result tuple slot, and return it.
* ExecProject works on the ecxt_scantuple stored in the context earlier.
*/
return ExecProject(projInfo);
}
else
{
/* Here, we aren't projecting, so just return scan tuple */
return slot;
}
}
}


