Skip to content

Commit

Permalink
Merge 182e8be into f8a40e2
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonmp85 committed Jun 8, 2015
2 parents f8a40e2 + 182e8be commit 4a14e11
Show file tree
Hide file tree
Showing 20 changed files with 739 additions and 167 deletions.
79 changes: 79 additions & 0 deletions citus_metadata_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

#include <stddef.h>

#include "access/attnum.h"
#include "nodes/nodes.h"
#include "nodes/primnodes.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/lsyscache.h"


/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(partition_column_to_node_string);
PG_FUNCTION_INFO_V1(column_name_to_column);
PG_FUNCTION_INFO_V1(column_to_column_name);


/*
Expand Down Expand Up @@ -57,3 +61,78 @@ partition_column_to_node_string(PG_FUNCTION_ARGS)

PG_RETURN_TEXT_P(partitionColumnText);
}


/*
 * column_name_to_column is an internal UDF which accepts a relation identifier
 * and a column name and returns the serialized (nodeToString) form of the Var
 * that ColumnNameToColumn builds for that column. The relation does not need
 * to be distributed. Lookup and validation of the name are delegated entirely
 * to ColumnNameToColumn, which is expected to raise an ERROR when the column
 * cannot be found or when the name refers to a system column.
 */
Datum
column_name_to_column(PG_FUNCTION_ARGS)
{
	Oid relationId = PG_GETARG_OID(0);
	char *columnName = text_to_cstring(PG_GETARG_TEXT_P(1));

	/* resolve the name to a Var, then flatten that Var into its node string */
	Var *column = ColumnNameToColumn(relationId, columnName);
	char *columnNodeString = nodeToString(column);

	PG_RETURN_TEXT_P(cstring_to_text(columnNodeString));
}


/*
* column_to_column_name is an internal UDF to obtain the human-readable name
* of a column given a relation identifier and the column's internal textual
* (Var) representation. This function will raise an ERROR if no such column
* can be found or if the provided Var refers to a system column.
*/
Datum
column_to_column_name(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
text *columnNodeText = PG_GETARG_TEXT_P(1);
char *columnNodeString = text_to_cstring(columnNodeText);
Node *columnNode = NULL;
Var *column = NULL;
AttrNumber columnNumber = InvalidAttrNumber;
char *columnName = NULL;
text *columnText = NULL;

columnNode = stringToNode(columnNodeString);

Assert(IsA(columnNode, Var));
column = (Var *) columnNode;

columnNumber = column->varattno;
if (!AttrNumberIsForUserDefinedAttr(columnNumber))
{
char *relationName = get_rel_name(relationId);

ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("attribute %d of relation \"%s\" is a system column",
columnNumber, relationName)));
}

columnName = get_attname(relationId, column->varattno);
if (columnName == NULL)
{
char *relationName = get_rel_name(relationId);

ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("attribute %d of relation \"%s\" does not exist",
columnNumber, relationName)));
}

columnText = cstring_to_text(columnName);

PG_RETURN_TEXT_P(columnText);
}
2 changes: 2 additions & 0 deletions citus_metadata_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

/* function declarations for syncing metadata with CitusDB */
extern Datum partition_column_to_node_string(PG_FUNCTION_ARGS);
extern Datum column_name_to_column(PG_FUNCTION_ARGS);
extern Datum column_to_column_name(PG_FUNCTION_ARGS);


#endif /* PG_SHARD_CITUS_METADATA_SYNC_H */
56 changes: 42 additions & 14 deletions distribution_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include "executor/spi.h"
#pragma GCC diagnostic pop
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
Expand Down Expand Up @@ -372,24 +373,37 @@ PartitionType(Oid distributedTableId)


/*
* IsDistributedTable simply returns whether the specified table is distributed.
* IsDistributedTable returns whether the specified table is distributed. It
* returns false if the input is InvalidOid.
*/
bool
IsDistributedTable(Oid tableId)
{
Oid metadataNamespaceOid = get_namespace_oid("pgs_distribution_metadata", false);
Oid partitionMetadataTableOid = get_relname_relid("partition", metadataNamespaceOid);
Oid tableNamespaceOid = InvalidOid;
Oid partitionMetadataTableOid = InvalidOid;
bool isDistributedTable = false;
Oid argTypes[] = { OIDOID };
Datum argValues[] = { ObjectIdGetDatum(tableId) };
const int argCount = sizeof(argValues) / sizeof(argValues[0]);
int spiStatus PG_USED_FOR_ASSERTS_ONLY = 0;

/* short-circuit if the input is invalid */
if (tableId == InvalidOid)
{
return false;
}

/*
* The query below hits the partition metadata table, so if we don't detect
* that and short-circuit, we'll get infinite recursion in the planner.
*
* Within CitusDB, a view rewrites the query to reference CitusDB catalogs,
* so we also need to catch whether the table exists in a system namespace.
*/
if (tableId == partitionMetadataTableOid)
tableNamespaceOid = get_rel_namespace(tableId);
partitionMetadataTableOid = get_relname_relid("partition", metadataNamespaceOid);
if (IsSystemNamespace(tableNamespaceOid) || tableId == partitionMetadataTableOid)
{
return false;
}
Expand Down Expand Up @@ -522,19 +536,33 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor)
Oid relationId = DatumGetObjectId(relationIdDatum);

char partitionType = DatumGetChar(partitionTypeDatum);
if (partitionType == HASH_PARTITION_TYPE)
switch (partitionType)
{
intervalTypeId = INT4OID;
}
else
{
Datum keyDatum = SPI_getbinval(heapTuple, tupleDescriptor,
TLIST_NUM_SHARD_KEY, &isNull);
char *partitionColumnName = TextDatumGetCString(keyDatum);
case APPEND_PARTITION_TYPE:
case RANGE_PARTITION_TYPE:
{
Datum keyDatum = SPI_getbinval(heapTuple, tupleDescriptor,
TLIST_NUM_SHARD_KEY, &isNull);
char *partitionColumnName = TextDatumGetCString(keyDatum);

Var *partitionColumn = ColumnNameToColumn(relationId, partitionColumnName);
intervalTypeId = partitionColumn->vartype;
intervalTypeMod = partitionColumn->vartypmod;
break;
}

case HASH_PARTITION_TYPE:
{
intervalTypeId = INT4OID;
break;
}

Var *partitionColumn = ColumnNameToColumn(relationId, partitionColumnName);
intervalTypeId = partitionColumn->vartype;
intervalTypeMod = partitionColumn->vartypmod;
default:
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported table partition type: %c",
partitionType)));
}
}

getTypeInputInfo(intervalTypeId, &inputFunctionId, &typeIoParam);
Expand Down
1 change: 1 addition & 0 deletions distribution_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#define TLIST_NUM_SHARD_PLACEMENT_NODE_PORT 5

/* denotes partition type of the distributed table */
#define APPEND_PARTITION_TYPE 'a'
#define HASH_PARTITION_TYPE 'h'
#define RANGE_PARTITION_TYPE 'r'

Expand Down
32 changes: 31 additions & 1 deletion expected/citus_metadata_sync.out
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,33 @@ SELECT partition_column_to_node_string('set_of_ids'::regclass);
{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}
(1 row)

-- should get error for column names that are too long
SELECT column_name_to_column('set_of_ids'::regclass, repeat('a', 1024));
ERROR: column name too long
DETAIL: Column name must be less than 64 characters.
-- should get error for system or non-existent column
SELECT column_name_to_column('set_of_ids'::regclass, 'ctid');
ERROR: column "ctid" of relation "set_of_ids" is a system column
SELECT column_name_to_column('set_of_ids'::regclass, 'non_existent');
ERROR: column "non_existent" of relation "set_of_ids" does not exist
-- should get node representation for valid column
SELECT column_name_to_column('set_of_ids'::regclass, 'id') AS column_var
\gset
SELECT replace(:'column_var', ':varattno 1', ':varattno -1') AS ctid_var,
replace(:'column_var', ':varattno 1', ':varattno 2') AS non_ext_var
\gset
-- should get error for system or non-existent column
SELECT column_to_column_name('set_of_ids'::regclass, :'ctid_var');
ERROR: attribute -1 of relation "set_of_ids" is a system column
SELECT column_to_column_name('set_of_ids'::regclass, :'non_ext_var');
ERROR: attribute 2 of relation "set_of_ids" does not exist
-- should get node representation for valid column
SELECT column_to_column_name('set_of_ids'::regclass, :'column_var');
column_to_column_name
-----------------------
id
(1 row)

-- create subset of CitusDB metadata schema
CREATE TABLE pg_dist_partition (
logicalrelid oid NOT NULL,
Expand All @@ -59,6 +86,7 @@ CREATE TABLE pg_dist_shard_placement (
) WITH OIDS;
-- sync metadata and verify it has transferred
SELECT sync_table_metadata_to_citus('set_of_ids');
WARNING: sync_table_metadata_to_citus is deprecated and will be removed in a future version
sync_table_metadata_to_citus
------------------------------

Expand Down Expand Up @@ -97,6 +125,7 @@ ORDER BY nodename;

-- subsequent sync should have no effect
SELECT sync_table_metadata_to_citus('set_of_ids');
WARNING: sync_table_metadata_to_citus is deprecated and will be removed in a future version
sync_table_metadata_to_citus
------------------------------

Expand Down Expand Up @@ -136,13 +165,14 @@ ORDER BY nodename;
-- mark a placement as unhealthy and add a new one
UPDATE pgs_distribution_metadata.shard_placement
SET shard_state = :inactive
WHERE id = 102;
WHERE node_name = 'cluster-worker-02';
INSERT INTO pgs_distribution_metadata.shard_placement
(id, node_name, node_port, shard_id, shard_state)
VALUES
(105, 'cluster-worker-05', 5436, 1, :finalized);
-- write latest changes to CitusDB tables
SELECT sync_table_metadata_to_citus('set_of_ids');
WARNING: sync_table_metadata_to_citus is deprecated and will be removed in a future version
sync_table_metadata_to_citus
------------------------------

Expand Down
41 changes: 19 additions & 22 deletions expected/distribution_metadata.out
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ CREATE FUNCTION create_healthy_local_shard_placement_row(bigint)
AS 'pg_shard'
LANGUAGE C STRICT;
CREATE FUNCTION delete_shard_placement_row(bigint)
RETURNS void
RETURNS bool
AS 'pg_shard'
LANGUAGE C STRICT;
CREATE FUNCTION update_shard_placement_row_state(bigint, int)
RETURNS void
RETURNS bool
AS 'pg_shard'
LANGUAGE C STRICT;
CREATE FUNCTION acquire_shared_shard_lock(bigint)
Expand Down Expand Up @@ -164,18 +164,6 @@ SELECT partition_column_id('events');
2
(1 row)

BEGIN;
UPDATE pgs_distribution_metadata.partition
SET key = REPEAT('a', 1024)
WHERE relation_id = 'events' :: regclass;
---- should see error that partition column is too long
SELECT Partition_column_id('events');
ERROR: column name too long
DETAIL: Column name must be less than 64 characters.
ROLLBACK;
-- should see error (catalog is not distributed)
SELECT partition_column_id('pg_type');
ERROR: no partition column is defined for relation "pg_type"
-- should see hash partition type and fail for non-distributed tables
SELECT partition_type('events');
partition_type
Expand Down Expand Up @@ -284,17 +272,18 @@ WHERE id = :new_shard_id;
-- add a placement and manually inspect row
SELECT create_healthy_local_shard_placement_row(:new_shard_id) AS new_placement_id
\gset
SELECT * FROM pgs_distribution_metadata.shard_placement WHERE id = :new_placement_id;
id | shard_id | shard_state | node_name | node_port
----+----------+-------------+-----------+-----------
1 | 10000 | 1 | localhost | 5432
SELECT shard_state, node_name, node_port FROM pgs_distribution_metadata.shard_placement
WHERE id = :new_placement_id;
shard_state | node_name | node_port
-------------+-----------+-----------
1 | localhost | 5432
(1 row)

-- mark it as unhealthy and inspect
SELECT update_shard_placement_row_state(:new_placement_id, 3);
update_shard_placement_row_state
----------------------------------

t
(1 row)

SELECT shard_state FROM pgs_distribution_metadata.shard_placement
Expand All @@ -308,7 +297,7 @@ WHERE id = :new_placement_id;
SELECT delete_shard_placement_row(:new_placement_id);
delete_shard_placement_row
----------------------------

t
(1 row)

SELECT COUNT(*) FROM pgs_distribution_metadata.shard_placement
Expand All @@ -320,9 +309,17 @@ WHERE id = :new_placement_id;

-- deleting or updating a non-existent row should fail
SELECT delete_shard_placement_row(:new_placement_id);
ERROR: shard placement with ID 1 does not exist
delete_shard_placement_row
----------------------------
f
(1 row)

SELECT update_shard_placement_row_state(:new_placement_id, 3);
ERROR: shard placement with ID 1 does not exist
update_shard_placement_row_state
----------------------------------
f
(1 row)

-- now we'll even test our lock methods...
-- use transaction to bound how long we hold the lock
BEGIN;
Expand Down
13 changes: 13 additions & 0 deletions expected/init.out
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,16 @@ AS 'pg_shard'
LANGUAGE C STRICT;
CREATE FOREIGN DATA WRAPPER fake_fdw HANDLER fake_fdw_handler;
CREATE SERVER fake_fdw_server FOREIGN DATA WRAPPER fake_fdw;
-- Set pg_shard sequence to start at same number as that used by CitusDB.
-- This makes testing easier, since shard IDs will match.
DO $$
BEGIN
BEGIN
PERFORM setval('pgs_distribution_metadata.shard_id_sequence',
102008, false);
EXCEPTION
WHEN undefined_table THEN
-- do nothing
END;
END;
$$;
7 changes: 5 additions & 2 deletions expected/modifications.out
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,11 @@ WITH limit_order_placements AS (
AND s.relation_id = 'limit_orders'::regclass
)
INSERT INTO pgs_distribution_metadata.shard_placement
SELECT nextval('pgs_distribution_metadata.shard_placement_id_sequence'),
shard_id,
(shard_id,
shard_state,
node_name,
node_port)
SELECT shard_id,
shard_state,
'badhost',
54321
Expand Down

0 comments on commit 4a14e11

Please sign in to comment.