Commit

Merge 6241a9f into 1607944

jasonmp85 committed Jan 21, 2015
2 parents 1607944 + 6241a9f commit 9a4abad
Showing 4 changed files with 35 additions and 24 deletions.
7 changes: 7 additions & 0 deletions expected/queries.out
@@ -207,6 +207,13 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles
6 | 50867
(5 rows)

-- test zero-shard query
SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;
count
-------
0
(1 row)

-- verify temp tables used by cross-shard queries do not persist
SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'pg_shard_temp_table%' AND
relkind = 'r';
47 changes: 24 additions & 23 deletions pg_shard.c
@@ -91,7 +91,7 @@ static void ErrorIfQueryNotSupported(Query *queryTree);
static Oid ExtractFirstDistributedTableId(Query *query);
static bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
static List * DistributedQueryShardList(Query *query);
static bool SelectFromMultipleShards(Query *query, List *queryShardList);
static bool NeedsMultiShardSelectLogic(Query *query, List *queryShardList);
static PlannedStmt * PlanSequentialScan(Query *query, int cursorOptions,
ParamListInfo boundParams);
static Query * RowAndColumnFilterQuery(Query *query);
@@ -217,7 +217,7 @@ PgShardPlanner(Query *query, int cursorOptions, ParamListInfo boundParams)
DistributedPlan *distributedPlan = NULL;
Query *distributedQuery = copyObject(query);
List *queryShardList = NIL;
bool selectFromMultipleShards = false;
bool useMultiShardSelectLogic = false;
CreateStmt *createTemporaryTableStmt = NULL;

/* call standard planner first to have Query transformations performed */
@@ -233,7 +233,7 @@ PgShardPlanner(Query *query, int cursorOptions, ParamListInfo boundParams)
queryShardList = DistributedQueryShardList(distributedQuery);

/*
* If a select query touches multiple shards, we don't push down the
* Unless a SELECT involves exactly one shard, we don't push down the
* query as-is, and instead only push down the filter clauses and select
* needed columns. We then copy those results to a local temporary table
* and then modify the original PostgreSQL plan to perform a sequential
@@ -242,8 +242,8 @@ PgShardPlanner(Query *query, int cursorOptions, ParamListInfo boundParams)
* scans. We will revisit this by potentially using another type of scan
* node instead of a sequential scan.
*/
selectFromMultipleShards = SelectFromMultipleShards(query, queryShardList);
if (selectFromMultipleShards)
useMultiShardSelectLogic = NeedsMultiShardSelectLogic(query, queryShardList);
if (useMultiShardSelectLogic)
{
Oid distributedTableId = InvalidOid;

@@ -263,7 +263,7 @@ PgShardPlanner(Query *query, int cursorOptions, ParamListInfo boundParams)

distributedPlan = BuildDistributedPlan(distributedQuery, queryShardList);
distributedPlan->originalPlan = plannedStatement->planTree;
distributedPlan->selectFromMultipleShards = selectFromMultipleShards;
distributedPlan->useMultiShardSelectLogic = useMultiShardSelectLogic;
distributedPlan->createTemporaryTableStmt = createTemporaryTableStmt;

plannedStatement->planTree = (Plan *) distributedPlan;
@@ -586,9 +586,10 @@ ExtractRangeTableEntryWalker(Node *node, List **rangeTableList)

/*
* DistributedQueryShardList prunes the shards for the table in the query based
* on the query's restriction qualifiers, and returns this list. If the function
* cannot find any shards for the distributed table, it errors out. In other sense,
* the function errors out or returns a non-empty list.
* on the query's restriction qualifiers, and returns this list. It is possible
* that all shards will be pruned if a query's restrictions are unsatisfiable.
* In that case, this function can return an empty list; however, if the table
* being queried has no shards created whatsoever, this function errors out.
*/
static List *
DistributedQueryShardList(Query *query)
@@ -616,18 +617,19 @@ DistributedQueryShardList(Query *query)
prunedShardList = PruneShardList(distributedTableId, restrictClauseList,
shardIntervalList);

/* shouldn't be an empty list, but assert in case something's very wrong */
Assert(prunedShardList != NIL);

return prunedShardList;
}


/* Returns true if the query is a select query that reads data from multiple shards. */
/*
* NeedsMultiShardSelectLogic returns true if the query is a SELECT query
* requiring multi-shard SELECT. This is true of any SELECT query involving
* more than one shard, but also applies to queries that involve zero shards.
*/
static bool
SelectFromMultipleShards(Query *query, List *queryShardList)
NeedsMultiShardSelectLogic(Query *query, List *queryShardList)
{
if ((query->commandType == CMD_SELECT) && (list_length(queryShardList) > 1))
if ((query->commandType == CMD_SELECT) && (list_length(queryShardList) != 1))
{
return true;
}
@@ -1020,8 +1022,8 @@ PgShardExecutorStart(QueryDesc *queryDesc, int eflags)
{
DistributedPlan *distributedPlan = (DistributedPlan *) plannedStatement->planTree;

bool selectFromMultipleShards = distributedPlan->selectFromMultipleShards;
if (!selectFromMultipleShards)
bool useMultiShardSelectLogic = distributedPlan->useMultiShardSelectLogic;
if (!useMultiShardSelectLogic)
{
bool topLevel = true;
LOCKMODE lockMode = NoLock;
@@ -1047,8 +1049,8 @@ PgShardExecutorStart(QueryDesc *queryDesc, int eflags)
else
{
/*
* If its a SELECT query over multiple shards, we fetch the relevant
* data from the remote nodes and insert it into a temp table. We then
* When using multi-shard SELECT logic, we fetch all relevant data
* from the remote nodes and insert it into a temp table. We then
* point the existing plan to scan this temp table instead of the
* original one.
*/
@@ -1691,10 +1693,9 @@ ExecuteSingleShardSelect(DistributedPlan *distributedPlan, EState *executorState
TupleTableSlot *tupleTableSlot = NULL;

List *taskList = distributedPlan->taskList;
if (list_length(taskList) != 1)
{
ereport(ERROR, (errmsg("cannot execute select over multiple shards")));
}

/* if we have anything other than a single shard, something's wrong */
Assert(list_length(taskList) == 1);

task = (Task *) linitial(taskList);
tupleStore = tuplestore_begin_heap(false, false, work_mem);
2 changes: 1 addition & 1 deletion pg_shard.h
@@ -62,7 +62,7 @@ typedef struct DistributedPlan
List *taskList; /* list of tasks to run as part of this plan */
List *targetList; /* copy of the target list for remote SELECT queries only */

bool selectFromMultipleShards; /* does the select run across multiple shards? */
bool useMultiShardSelectLogic; /* does the SELECT require multi-shard logic? */
CreateStmt *createTemporaryTableStmt; /* valid for multiple shard selects */
} DistributedPlan;

3 changes: 3 additions & 0 deletions sql/queries.sql
@@ -137,6 +137,9 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles
ORDER BY sum(word_count) DESC
LIMIT 5;

-- test zero-shard query
SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;

-- verify temp tables used by cross-shard queries do not persist
SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'pg_shard_temp_table%' AND
relkind = 'r';
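
For reference, the new regression test above exercises this zero-shard path end to end. A minimal sketch of the scenario, assuming a hash-distributed articles table like the one in the test suite (the setup calls and column list are illustrative, not taken from this commit):

-- illustrative setup: distribute articles on author_id across two shards
CREATE TABLE articles (id bigint, author_id bigint, word_count integer);
SELECT master_create_distributed_table('articles', 'author_id');
SELECT master_create_worker_shards('articles', 2, 1);

-- the two equality predicates on the partition column cannot both hold, so
-- shard pruning eliminates every shard; before this commit such a query fell
-- through to the single-shard executor and errored out, whereas it now takes
-- the multi-shard SELECT path and simply returns zero rows
SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;
 count
-------
     0
(1 row)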
