Skip to content

Commit

Permalink
Merge 74effb9 into 7e6103f
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonmp85 committed Oct 28, 2015
2 parents 7e6103f + 74effb9 commit 65386d1
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 41 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
### pg_shard v1.2.3 (October 28, 2015) ###

* Addresses a performance regression by caching metadata plans

### pg_shard v1.2.2 (August 28, 2015) ###

* Changes default planner when running within CitusDB
Expand Down
4 changes: 2 additions & 2 deletions META.json
Expand Up @@ -2,7 +2,7 @@
"name": "pg_shard",
"abstract": "Easy sharding for PostgreSQL",
"description": "Shards and replicates PostgreSQL tables for horizontal scale and high availability. Seamlessly distributes SQL statements, without requiring any application changes.",
"version": "1.2.2",
"version": "1.2.3",
"maintainer": "\"Jason Petersen\" <jason@citusdata.com>",
"license": "lgpl_3_0",
"prereqs": {
Expand All @@ -17,7 +17,7 @@
"abstract": "Easy sharding for PostgreSQL",
"file": "sql/pg_shard--1.2.sql",
"docfile": "README.md",
"version": "1.2.2"
"version": "1.2.3"
}
},
"release_status": "stable",
Expand Down
174 changes: 135 additions & 39 deletions src/distribution_metadata.c
Expand Up @@ -123,6 +123,7 @@ LoadShardIntervalList(Oid distributedTableId)
Datum argValues[] = { ObjectIdGetDatum(distributedTableId) };
const int argCount = sizeof(argValues) / sizeof(argValues[0]);
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
static SPIPlanPtr spiPlan = NULL;

/*
* SPI_connect switches to its own memory context, which is destroyed by
Expand All @@ -135,8 +136,16 @@ LoadShardIntervalList(Oid distributedTableId)

SPI_connect();

spiStatus = SPI_execute_with_args(SHARD_QUERY_PREFIX " WHERE s.relation_id = $1",
argCount, argTypes, argValues, NULL, false, 0);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare(SHARD_QUERY_PREFIX " WHERE s.relation_id = $1", argCount,
argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 0);
Assert(spiStatus == SPI_OK_SELECT);

oldContext = MemoryContextSwitchTo(upperContext);
Expand Down Expand Up @@ -170,6 +179,7 @@ LoadShardInterval(int64 shardId)
Datum argValues[] = { Int64GetDatum(shardId) };
const int argCount = sizeof(argValues) / sizeof(argValues[0]);
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
static SPIPlanPtr spiPlan = NULL;

/*
* SPI_connect switches to an SPI-specific MemoryContext. See the comment
Expand All @@ -178,8 +188,15 @@ LoadShardInterval(int64 shardId)
MemoryContext upperContext = CurrentMemoryContext, oldContext = NULL;
SPI_connect();

spiStatus = SPI_execute_with_args(SHARD_QUERY_PREFIX " WHERE s.id = $1",
argCount, argTypes, argValues, NULL, false, 1);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare(SHARD_QUERY_PREFIX " WHERE s.id = $1", argCount, argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 1);
Assert(spiStatus == SPI_OK_SELECT);

if (SPI_processed != 1)
Expand Down Expand Up @@ -239,6 +256,7 @@ LoadShardPlacementList(int64 shardId)
Datum argValues[] = { Int64GetDatum(shardId) };
const int argCount = sizeof(argValues) / sizeof(argValues[0]);
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
static SPIPlanPtr spiPlan = NULL;

/*
* SPI_connect switches to an SPI-specific MemoryContext. See the comment
Expand All @@ -247,8 +265,15 @@ LoadShardPlacementList(int64 shardId)
MemoryContext upperContext = CurrentMemoryContext, oldContext = NULL;
SPI_connect();

spiStatus = SPI_execute_with_args(SHARD_PLACEMENT_QUERY, argCount, argTypes,
argValues, NULL, false, 0);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare(SHARD_PLACEMENT_QUERY, argCount, argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 0);
Assert(spiStatus == SPI_OK_SELECT);

oldContext = MemoryContextSwitchTo(upperContext);
Expand Down Expand Up @@ -293,6 +318,7 @@ PartitionColumn(Oid distributedTableId)
bool isNull = false;
Datum keyDatum = 0;
char *partitionColumnName = NULL;
static SPIPlanPtr spiPlan = NULL;

/*
* SPI_connect switches to an SPI-specific MemoryContext. See the comment
Expand All @@ -301,10 +327,16 @@ PartitionColumn(Oid distributedTableId)
MemoryContext upperContext = CurrentMemoryContext, oldContext = NULL;
SPI_connect();

spiStatus = SPI_execute_with_args("SELECT key "
"FROM pgs_distribution_metadata.partition "
"WHERE relation_id = $1", argCount, argTypes,
argValues, NULL, false, 1);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare("SELECT key FROM pgs_distribution_metadata.partition "
"WHERE relation_id = $1", argCount, argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 1);
Assert(spiStatus == SPI_OK_SELECT);

if (SPI_processed != 1)
Expand Down Expand Up @@ -345,13 +377,21 @@ PartitionType(Oid distributedTableId)
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
bool isNull = false;
Datum partitionTypeDatum = 0;
static SPIPlanPtr spiPlan = NULL;

SPI_connect();

spiStatus = SPI_execute_with_args("SELECT partition_method "
"FROM pgs_distribution_metadata.partition "
"WHERE relation_id = $1", argCount, argTypes,
argValues, NULL, false, 1);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare("SELECT partition_method "
"FROM pgs_distribution_metadata.partition "
"WHERE relation_id = $1", argCount, argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 1);
Assert(spiStatus == SPI_OK_SELECT);

if (SPI_processed != 1)
Expand Down Expand Up @@ -388,6 +428,7 @@ IsDistributedTable(Oid tableId)
Datum argValues[] = { ObjectIdGetDatum(tableId) };
const int argCount = sizeof(argValues) / sizeof(argValues[0]);
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
static SPIPlanPtr spiPlan = NULL;

/* short-circuit if the input is invalid */
if (tableId == InvalidOid)
Expand All @@ -411,10 +452,16 @@ IsDistributedTable(Oid tableId)

SPI_connect();

spiStatus = SPI_execute_with_args("SELECT NULL "
"FROM pgs_distribution_metadata.partition "
"WHERE relation_id = $1", argCount, argTypes,
argValues, NULL, false, 1);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare("SELECT NULL FROM pgs_distribution_metadata.partition "
"WHERE relation_id = $1", argCount, argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 1);
Assert(spiStatus == SPI_OK_SELECT);

isDistributedTable = (SPI_processed == 1);
Expand All @@ -434,10 +481,20 @@ DistributedTablesExist(void)
{
bool distributedTablesExist = false;
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
static SPIPlanPtr spiPlan = NULL;

SPI_connect();

spiStatus = SPI_exec("SELECT NULL FROM pgs_distribution_metadata.partition", 1);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare("SELECT NULL FROM pgs_distribution_metadata.partition", 0,
NULL);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, NULL, NULL, true, 1);
Assert(spiStatus == SPI_OK_SELECT);

distributedTablesExist = (SPI_processed > 0);
Expand Down Expand Up @@ -634,13 +691,21 @@ InsertPartitionRow(Oid distributedTableId, char partitionType, text *partitionKe
};
const int argCount = sizeof(argValues) / sizeof(argValues[0]);
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
static SPIPlanPtr spiPlan = NULL;

SPI_connect();

spiStatus = SPI_execute_with_args("INSERT INTO pgs_distribution_metadata.partition "
"(relation_id, partition_method, key) "
"VALUES ($1, $2, $3)", argCount, argTypes,
argValues, NULL, false, 0);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare("INSERT INTO pgs_distribution_metadata.partition "
"(relation_id, partition_method, key) VALUES ($1, $2, $3)",
argCount, argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 0);
Assert(spiStatus == SPI_OK_INSERT);

SPI_finish();
Expand Down Expand Up @@ -668,13 +733,21 @@ CreateShardRow(Oid distributedTableId, char shardStorage, text *shardMinValue,
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
bool isNull = false;
Datum shardIdDatum = 0;
static SPIPlanPtr spiPlan = NULL;

SPI_connect();

spiStatus = SPI_execute_with_args("INSERT INTO pgs_distribution_metadata.shard "
"(relation_id, storage, min_value, max_value) "
"VALUES ($1, $2, $3, $4) RETURNING id", argCount,
argTypes, argValues, NULL, false, 1);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare("INSERT INTO pgs_distribution_metadata.shard "
"(relation_id, storage, min_value, max_value) "
"VALUES ($1, $2, $3, $4) RETURNING id", argCount, argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 1);
Assert(spiStatus == SPI_OK_INSERT_RETURNING);

shardIdDatum = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1,
Expand Down Expand Up @@ -707,14 +780,22 @@ CreateShardPlacementRow(int64 shardId, ShardState shardState, char *nodeName,
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
bool isNull = false;
Datum placementIdDatum = 0;
static SPIPlanPtr spiPlan = NULL;

SPI_connect();

spiStatus = SPI_execute_with_args("INSERT INTO "
"pgs_distribution_metadata.shard_placement "
"(shard_id, shard_state, node_name, node_port) "
"VALUES ($1, $2, $3, $4) RETURNING id", argCount,
argTypes, argValues, NULL, false, 1);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare("INSERT INTO "
"pgs_distribution_metadata.shard_placement "
"(shard_id, shard_state, node_name, node_port) "
"VALUES ($1, $2, $3, $4) RETURNING id", argCount, argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 1);
Assert(spiStatus == SPI_OK_INSERT_RETURNING);

placementIdDatum = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1,
Expand All @@ -738,13 +819,20 @@ DeleteShardPlacementRow(int64 shardPlacementId)
Datum argValues[] = { Int64GetDatum(shardPlacementId) };
const int argCount = sizeof(argValues) / sizeof(argValues[0]);
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
static SPIPlanPtr spiPlan = NULL;

SPI_connect();

spiStatus = SPI_execute_with_args("DELETE FROM "
"pgs_distribution_metadata.shard_placement "
"WHERE id = $1", argCount, argTypes, argValues,
NULL, false, 0);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare("DELETE FROM pgs_distribution_metadata.shard_placement "
"WHERE id = $1", argCount, argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 0);
Assert(spiStatus == SPI_OK_DELETE);

if (SPI_processed != 1)
Expand Down Expand Up @@ -773,12 +861,20 @@ UpdateShardPlacementRowState(int64 shardPlacementId, ShardState newState)
};
const int argCount = sizeof(argValues) / sizeof(argValues[0]);
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;
static SPIPlanPtr spiPlan = NULL;

SPI_connect();

spiStatus = SPI_execute_with_args("UPDATE pgs_distribution_metadata.shard_placement "
"SET shard_state = $2 WHERE id = $1",
argCount, argTypes, argValues, NULL, false, 1);
if (spiPlan == NULL)
{
spiPlan = SPI_prepare("UPDATE pgs_distribution_metadata.shard_placement "
"SET shard_state = $2 WHERE id = $1", argCount, argTypes);

spiStatus = SPI_keepplan(spiPlan);
Assert(spiStatus == 0);
}

spiStatus = SPI_execute_plan(spiPlan, argValues, NULL, false, 1);
Assert(spiStatus == SPI_OK_UPDATE);

if (SPI_processed != 1)
Expand Down

0 comments on commit 65386d1

Please sign in to comment.