Skip to content

Commit

Permalink
Merge 4f4af5b into 05d1277
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonmp85 committed Feb 25, 2015
2 parents 05d1277 + 4f4af5b commit 0f2fc49
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 30 deletions.
7 changes: 7 additions & 0 deletions expected/queries.out
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ INSERT INTO articles VALUES (47, 7, 'abeyance', 1772);
INSERT INTO articles VALUES (48, 8, 'alkylic', 18610);
INSERT INTO articles VALUES (49, 9, 'anyone', 2681);
INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);
-- first, test zero-shard query
SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;
count
-------
0
(1 row)

-- single-shard tests
-- test simple select for a single row
SELECT * FROM articles WHERE author_id = 10 AND id = 50;
Expand Down
69 changes: 39 additions & 30 deletions pg_shard.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ static DistributedPlan * BuildDistributedPlan(Query *query, List *shardIntervalL
/* executor functions forward declarations */
static void PgShardExecutorStart(QueryDesc *queryDesc, int eflags);
static bool IsPgShardPlan(PlannedStmt *plannedStmt);
static void NextExecutorStartHook(QueryDesc *queryDesc, int eflags);
static LOCKMODE CommutativityRuleToLockMode(CmdType commandType);
static void AcquireExecutorShardLocks(List *taskList, LOCKMODE lockMode);
static int CompareTasksByShardId(const void *leftElement, const void *rightElement);
Expand Down Expand Up @@ -610,9 +611,10 @@ ExtractRangeTableEntryWalker(Node *node, List **rangeTableList)

/*
* DistributedQueryShardList prunes the shards for the table in the query based
* on the query's restriction qualifiers, and returns this list. If the function
* cannot find any shards for the distributed table, it errors out. In other sense,
* the function errors out or returns a non-empty list.
* on the query's restriction qualifiers, and returns this list. It is possible
* that all shards will be pruned if a query's restrictions are unsatisfiable.
* In that case, this function can return an empty list; however, if the table
* being queried has no shards created whatsoever, this function errors out.
*/
static List *
DistributedQueryShardList(Query *query)
Expand Down Expand Up @@ -640,9 +642,6 @@ DistributedQueryShardList(Query *query)
prunedShardList = PruneShardList(distributedTableId, restrictClauseList,
shardIntervalList);

/* shouldn't be an empty list, but assert in case something's very wrong */
Assert(prunedShardList != NIL);

return prunedShardList;
}

Expand Down Expand Up @@ -1106,9 +1105,18 @@ PgShardExecutorStart(QueryDesc *queryDesc, int eflags)
if (pgShardExecution)
{
DistributedPlan *distributedPlan = (DistributedPlan *) plannedStatement->planTree;

bool selectFromMultipleShards = distributedPlan->selectFromMultipleShards;
if (!selectFromMultipleShards)
bool zeroShardQuery = (list_length(distributedPlan->taskList) == 0);

if (zeroShardQuery)
{
/* if zero shards are involved, let query hit local table */
Plan *originalPlan = distributedPlan->originalPlan;
plannedStatement->planTree = originalPlan;

NextExecutorStartHook(queryDesc, eflags);
}
else if (!selectFromMultipleShards)
{
bool topLevel = true;
LOCKMODE lockMode = NoLock;
Expand Down Expand Up @@ -1173,28 +1181,12 @@ PgShardExecutorStart(QueryDesc *queryDesc, int eflags)
originalPlan = distributedPlan->originalPlan;
plannedStatement->planTree = originalPlan;

/* call into the standard executor start, or hook if set */
if (PreviousExecutorStartHook != NULL)
{
PreviousExecutorStartHook(queryDesc, eflags);
}
else
{
standard_ExecutorStart(queryDesc, eflags);
}
NextExecutorStartHook(queryDesc, eflags);
}
}
else
{
/* call the next hook in the chain or the standard one, if no other hook was set */
if (PreviousExecutorStartHook != NULL)
{
PreviousExecutorStartHook(queryDesc, eflags);
}
else
{
standard_ExecutorStart(queryDesc, eflags);
}
NextExecutorStartHook(queryDesc, eflags);
}
}

Expand All @@ -1214,6 +1206,26 @@ IsPgShardPlan(PlannedStmt *plannedStmt)
}


/*
* NextExecutorStartHook simply encapsulates the common logic of calling the
* next executor start hook in the chain or the standard executor start hook
* if no other hooks are present.
*/
static void
NextExecutorStartHook(QueryDesc *queryDesc, int eflags)
{
/* call into the standard executor start, or hook if set */
if (PreviousExecutorStartHook != NULL)
{
PreviousExecutorStartHook(queryDesc, eflags);
}
else
{
standard_ExecutorStart(queryDesc, eflags);
}
}


/*
* CommutativityRuleToLockMode determines the commutativity rule for the given
* command and returns the appropriate lock mode to enforce that rule. The
Expand Down Expand Up @@ -1778,10 +1790,7 @@ ExecuteSingleShardSelect(DistributedPlan *distributedPlan, EState *executorState
TupleTableSlot *tupleTableSlot = NULL;

List *taskList = distributedPlan->taskList;
if (list_length(taskList) != 1)
{
ereport(ERROR, (errmsg("cannot execute select over multiple shards")));
}
Assert(list_length(taskList) == 1);

task = (Task *) linitial(taskList);
tupleStore = tuplestore_begin_heap(false, false, work_mem);
Expand Down
3 changes: 3 additions & 0 deletions sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ INSERT INTO articles VALUES (48, 8, 'alkylic', 18610);
INSERT INTO articles VALUES (49, 9, 'anyone', 2681);
INSERT INTO articles VALUES (50, 10, 'anjanette', 19519);

-- first, test zero-shard query
SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;

-- single-shard tests

-- test simple select for a single row
Expand Down

0 comments on commit 0f2fc49

Please sign in to comment.