Commit

Merge 0e7ae29 into 1607944
jasonmp85 committed Jan 21, 2015
2 parents 1607944 + 0e7ae29 commit e13f072
Showing 3 changed files with 133 additions and 30 deletions.
13 changes: 13 additions & 0 deletions expected/queries.out
@@ -207,6 +207,19 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles
6 | 50867
(5 rows)

+-- verify pg_shard produces correct remote SQL
+SET pg_shard.log_distributed_statements = on;
+SET client_min_messages = log;
+SELECT count(*) FROM articles WHERE word_count > 10000;
+LOG: distributed statement: SELECT NULL::unknown FROM ONLY articles_10037 WHERE (word_count > 10000)
+LOG: distributed statement: SELECT NULL::unknown FROM ONLY articles_10036 WHERE (word_count > 10000)
+count
+-------
+23
+(1 row)
+
+SET client_min_messages = DEFAULT;
+SET pg_shard.log_distributed_statements = DEFAULT;
-- verify temp tables used by cross-shard queries do not persist
SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'pg_shard_temp_table%' AND
relkind = 'r';
141 changes: 111 additions & 30 deletions pg_shard.c
@@ -82,6 +82,9 @@ bool AllModificationsCommutative = false;
/* informs pg_shard to use the CitusDB planner */
bool UseCitusDBSelectLogic = false;

+/* logs each statement used in a distributed plan */
+bool LogDistributedStatements = false;
+

/* planner functions forward declarations */
static PlannedStmt * PgShardPlanner(Query *parse, int cursorOptions,
@@ -92,9 +95,13 @@ static Oid ExtractFirstDistributedTableId(Query *query);
static bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
static List * DistributedQueryShardList(Query *query);
static bool SelectFromMultipleShards(Query *query, List *queryShardList);
+static Query * BuildLocalQuery(Query *query, List *localRestrictList);
static PlannedStmt * PlanSequentialScan(Query *query, int cursorOptions,
ParamListInfo boundParams);
-static Query * RowAndColumnFilterQuery(Query *query);
+static void ClassifyRestrictions(List *queryRestrictList, List **remoteRestrictList,
+List **localRestrictList);
+static Query * RowAndColumnFilterQuery(Query *query, List *remoteRestrictList,
+List *localRestrictList);
static List * QueryRestrictList(Query *query);
static Const * ExtractPartitionValue(Query *query, Var *partitionColumn);
static bool ExtractFromExpressionWalker(Node *node, List **qualifierList);
@@ -179,6 +186,11 @@ _PG_init(void)
&UseCitusDBSelectLogic, false, PGC_USERSET, 0, NULL,
NULL, NULL);

+DefineCustomBoolVariable("pg_shard.log_distributed_statements",
+"Logs each statement used in a distributed plan", NULL,
+&LogDistributedStatements, false, PGC_USERSET, 0, NULL,
+NULL, NULL);
+
EmitWarningsOnPlaceholders("pg_shard");
}

@@ -246,15 +258,27 @@ PgShardPlanner(Query *query, int cursorOptions, ParamListInfo boundParams)
if (selectFromMultipleShards)
{
Oid distributedTableId = InvalidOid;
+Query *localQuery = NULL;
+List *queryRestrictList = QueryRestrictList(distributedQuery);
+List *remoteRestrictList = NIL;
+List *localRestrictList = NIL;

+/* partition restrictions into remote and local lists */
+ClassifyRestrictions(queryRestrictList, &remoteRestrictList,
+&localRestrictList);
+
+/* build local and distributed query */
+distributedQuery = RowAndColumnFilterQuery(distributedQuery,
+remoteRestrictList,
+localRestrictList);
+localQuery = BuildLocalQuery(query, localRestrictList);
+
/*
* Force a sequential scan as we change the underlying table to
* point to our intermediate temporary table which contains the
* fetched data.
*/
-plannedStatement = PlanSequentialScan(query, cursorOptions, boundParams);
-
-distributedQuery = RowAndColumnFilterQuery(distributedQuery);
+plannedStatement = PlanSequentialScan(localQuery, cursorOptions, boundParams);

/* construct a CreateStmt to clone the existing table */
distributedTableId = ExtractFirstDistributedTableId(distributedQuery);
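To make the decomposition concrete, consider the cross-shard query from the regression test (the per-shard statements come from its expected output; the temporary table name is illustrative, following the pg_shard_temp_table prefix the tests check for):

-- original cross-shard query
SELECT count(*) FROM articles WHERE word_count > 10000;
-- distributed query, deparsed once per shard
SELECT NULL::unknown FROM ONLY articles_10037 WHERE (word_count > 10000)
SELECT NULL::unknown FROM ONLY articles_10036 WHERE (word_count > 10000)
-- local query: the original minus the remote quals, later re-pointed at an
-- intermediate temporary table (e.g. pg_shard_temp_table_12345) and planned
-- with a sequential scan
SELECT count(*) FROM articles;
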
@@ -638,19 +662,38 @@ SelectFromMultipleShards(Query *query, List *queryShardList)
}


+/*
+* BuildLocalQuery returns a copy of query with its quals replaced by those
+* in localRestrictList. Expects queries with a single entry in their FROM
+* list.
+*/
+static Query *
+BuildLocalQuery(Query *query, List *localRestrictList)
+{
+Query *localQuery = copyObject(query);
+FromExpr *joinTree = localQuery->jointree;
+
+Assert(joinTree != NULL);
+Assert(list_length(joinTree->fromlist) == 1);
+joinTree->quals = (Node *) make_ands_explicit((List *) localRestrictList);
+
+return localQuery;
+}
+
+
/*
* PlanSequentialScan attempts to plan the given query using only a sequential
* scan of the underlying table. The function disables index scan types and
* plans the query. If the plan still contains a non-sequential scan plan node,
-* the function errors out.
+* the function errors out. Note this function modifies the query parameter, so
+* make a copy before calling PlanSequentialScan if that is unacceptable.
*/
static PlannedStmt *
PlanSequentialScan(Query *query, int cursorOptions, ParamListInfo boundParams)
{
PlannedStmt *sequentialScanPlan = NULL;
bool indexScanEnabledOldValue = false;
bool bitmapScanEnabledOldValue = false;
-Query *queryCopy = NULL;
List *rangeTableList = NIL;
ListCell *rangeTableCell = NULL;

@@ -678,8 +721,7 @@ PlanSequentialScan(Query *query, int cursorOptions, ParamListInfo boundParams)
enable_indexscan = false;
enable_bitmapscan = false;

-queryCopy = copyObject(query);
-sequentialScanPlan = standard_planner(queryCopy, cursorOptions, boundParams);
+sequentialScanPlan = standard_planner(query, cursorOptions, boundParams);

enable_indexscan = indexScanEnabledOldValue;
enable_bitmapscan = bitmapScanEnabledOldValue;
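The save-and-restore of the planner globals above is the C-level counterpart of this session-level sketch (illustrative only; pg_shard flips the C variables directly rather than issuing SET):

SET enable_indexscan = off;
SET enable_bitmapscan = off;
-- plans produced here strongly prefer sequential scans
RESET enable_indexscan;
RESET enable_bitmapscan;
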
@@ -688,20 +730,54 @@ PlanSequentialScan(Query *query, int cursorOptions, ParamListInfo boundParams)
}


+/*
+* ClassifyRestrictions divides a query's restriction list in two: the subset
+* of restrictions safe for remote evaluation and the subset of restrictions
+* that must be evaluated locally. remoteRestrictList and localRestrictList are
+* output parameters to receive these two subsets.
+*
+* Currently places all restrictions in the remote list and leaves the local
+* one totally empty.
+*/
+static void
+ClassifyRestrictions(List *queryRestrictList, List **remoteRestrictList,
+List **localRestrictList)
+{
+ListCell *restrictCell = NULL;
+
+*remoteRestrictList = NIL;
+*localRestrictList = NIL;
+
+foreach(restrictCell, queryRestrictList)
+{
+Node *restriction = (Node *) lfirst(restrictCell);
+bool restrictionSafeToSend = true;
+
+if (restrictionSafeToSend)
+{
+*remoteRestrictList = lappend(*remoteRestrictList, restriction);
+}
+else
+{
+*localRestrictList = lappend(*localRestrictList, restriction);
+}
+}
+}
+
+
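A hypothetical example of the split this hook is reserved for (not current behavior: as the comment above notes, every restriction is sent to the remote list today):

SELECT count(*) FROM articles WHERE word_count > 10000 AND random() < 0.5;
-- word_count > 10000 is safe to evaluate on each shard, so it would land in
-- remoteRestrictList; random() < 0.5 is volatile, so it would land in
-- localRestrictList and be applied by the local query to the fetched rows.
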
/*
* RowAndColumnFilterQuery builds a query which contains the filter clauses from
* the original query and also only selects columns needed for the original
* query. This new query can then be pushed down to the worker nodes.
*/
static Query *
-RowAndColumnFilterQuery(Query *query)
+RowAndColumnFilterQuery(Query *query, List *remoteRestrictList, List *localRestrictList)
{
Query *filterQuery = NULL;
List *rangeTableList = NIL;
-List *whereClauseList = NIL;
-List *whereClauseColumnList = NIL;
+List *whereColumnList = NIL;
List *projectColumnList = NIL;
-List *columnList = NIL;
+List *requiredColumnList = NIL;
ListCell *columnCell = NULL;
List *uniqueColumnList = NIL;
List *targetList = NIL;
Expand All @@ -712,35 +788,35 @@ RowAndColumnFilterQuery(Query *query)
ExtractRangeTableEntryWalker((Node *) query, &rangeTableList);
Assert(list_length(rangeTableList) == 1);

-/* build the where-clause expression */
-whereClauseList = QueryRestrictList(query);
+/* build the expression to supply FROM/WHERE for the remote query */
fromExpr = makeNode(FromExpr);
-fromExpr->quals = (Node *) make_ands_explicit((List *) whereClauseList);
+fromExpr->quals = (Node *) make_ands_explicit((List *) remoteRestrictList);
fromExpr->fromlist = QueryFromList(rangeTableList);

-/* extract columns from both the where and projection clauses */
-whereClauseColumnList = pull_var_clause((Node *) fromExpr, aggregateBehavior,
-placeHolderBehavior);
+/* must retrieve all columns referenced by local WHERE clauses... */
+whereColumnList = pull_var_clause((Node *) localRestrictList, aggregateBehavior,
+placeHolderBehavior);
+
+/* as well as any used in projections (GROUP BY, etc.) */
projectColumnList = pull_var_clause((Node *) query->targetList, aggregateBehavior,
-placeHolderBehavior);
-columnList = list_union(whereClauseColumnList, projectColumnList);
+placeHolderBehavior);

-/*
-* list_union() filters duplicates, but only between the lists. For example,
-* if we have a where clause like (where order_id = 1 OR order_id = 2), we
-* end up with two order_id columns. We therefore de-dupe the columns here.
-*/
-foreach(columnCell, columnList)
+/* put them together to get list of required columns for query */
+requiredColumnList = list_concat(requiredColumnList, whereColumnList);
+requiredColumnList = list_concat(requiredColumnList, projectColumnList);
+
+/* ensure there are no duplicates in the list */
+foreach(columnCell, requiredColumnList)
{
Var *column = (Var *) lfirst(columnCell);

uniqueColumnList = list_append_unique(uniqueColumnList, column);
}

/*
-* If the query is a simple "SELECT count(*)", add a NULL constant. This
-* constant deparses to "SELECT NULL FROM ...". postgres_fdw generates a
-* similar string when no columns are selected.
+* If we still have no columns, possible in a query like "SELECT count(*)",
+* add a NULL constant. This constant results in "SELECT NULL FROM ...".
+* postgres_fdw generates a similar string when no columns are selected.
*/
if (uniqueColumnList == NIL)
{
Expand All @@ -756,8 +832,8 @@ RowAndColumnFilterQuery(Query *query)
filterQuery = makeNode(Query);
filterQuery->commandType = CMD_SELECT;
filterQuery->rtable = rangeTableList;
-filterQuery->targetList = targetList;
filterQuery->jointree = fromExpr;
+filterQuery->targetList = targetList;

return filterQuery;
}
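A sketch of the resulting rewrite for a query that projects columns (shard name illustrative, patterned on the articles_100xx names in the regression output):

-- original query
SELECT author_id, sum(word_count) FROM articles WHERE word_count > 1000 GROUP BY author_id;
-- per-shard statement: only the referenced columns plus the remote quals
-- survive; the aggregation itself remains in the local query
SELECT author_id, word_count FROM ONLY articles_10037 WHERE (word_count > 1000)
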
@@ -990,6 +1066,11 @@ BuildDistributedPlan(Query *query, List *shardIntervalList)

deparse_shard_query(query, shardId, queryString);

+if (LogDistributedStatements)
+{
+ereport(LOG, (errmsg("distributed statement: %s", queryString->data)));
+}
+
task = (Task *) palloc0(sizeof(Task));
task->queryString = queryString;
task->taskPlacementList = finalizedPlacementList;
9 changes: 9 additions & 0 deletions sql/queries.sql
@@ -137,6 +137,15 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles
ORDER BY sum(word_count) DESC
LIMIT 5;

+-- verify pg_shard produces correct remote SQL
+SET pg_shard.log_distributed_statements = on;
+SET client_min_messages = log;
+
+SELECT count(*) FROM articles WHERE word_count > 10000;
+
+SET client_min_messages = DEFAULT;
+SET pg_shard.log_distributed_statements = DEFAULT;
+
-- verify temp tables used by cross-shard queries do not persist
SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'pg_shard_temp_table%' AND
relkind = 'r';
