Skip to content

Commit

Permalink
Merge 0c0c3d3 into 304cd8f
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonmp85 committed Jan 20, 2015
2 parents 304cd8f + 0c0c3d3 commit 79306af
Show file tree
Hide file tree
Showing 6 changed files with 312 additions and 3 deletions.
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
#-------------------------------------------------------------------------

MODULE_big = pg_shard
OBJS = connection.o create_shards.o distribution_metadata.o extend_ddl_commands.o \
generate_ddl_commands.o pg_shard.o prune_shard_list.o repair_shards.o ruleutils.o
OBJS = connection.o create_shards.o citus_metadata_sync.o distribution_metadata.o \
extend_ddl_commands.o generate_ddl_commands.o pg_shard.o prune_shard_list.o \
repair_shards.o ruleutils.o

PG_CPPFLAGS = -std=c99 -Wall -Wextra -I$(libpq_srcdir)

Expand Down Expand Up @@ -42,7 +43,7 @@ REGRESS_PREP = sql/connection.sql expected/connection.out sql/create_shards.sql
expected/repair_shards.out expected/modifications.out
REGRESS = init connection distribution_metadata extend_ddl_commands \
generate_ddl_commands create_shards prune_shard_list repair_shards \
modifications queries utilities
modifications queries utilities citus_metadata_sync

# The launcher regression flag lets us specify a special wrapper to handle
# testing rather than psql directly. Our wrapper swaps in a known worker list.
Expand Down
60 changes: 60 additions & 0 deletions citus_metadata_sync.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*-------------------------------------------------------------------------
*
* citus_metadata_sync.c
*
* This file contains functions to sync pg_shard metadata to the CitusDB
* metadata tables.
*
* Copyright (c) 2014, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/

#include "postgres.h"
#include "c.h"
#include "postgres_ext.h"

#include "citus_metadata_sync.h"
#include "distribution_metadata.h"

#include <stddef.h>

#include "nodes/nodes.h"
#include "nodes/primnodes.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"


/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(partition_column_to_node_string);


/*
* partition_column_to_node_string is an internal UDF to obtain the textual
* representation of a partition column node (Var), suitable for use within
* CitusDB's metadata tables. This function expects an Oid identifying a table
* previously distributed using pg_shard and will raise an ERROR if the Oid
* is NULL, or does not identify a pg_shard-distributed table.
*/
Datum
partition_column_to_node_string(PG_FUNCTION_ARGS)
{
Oid distributedTableId = InvalidOid;
Var *partitionColumn = NULL;
char *partitionColumnString = NULL;
text *partitionColumnText = NULL;

if (PG_ARGISNULL(0))
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("table_oid cannot be null")));
}

distributedTableId = PG_GETARG_OID(0);
partitionColumn = PartitionColumn(distributedTableId);
partitionColumnString = nodeToString(partitionColumn);
partitionColumnText = cstring_to_text(partitionColumnString);

PG_RETURN_TEXT_P(partitionColumnText);
}
24 changes: 24 additions & 0 deletions citus_metadata_sync.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*-------------------------------------------------------------------------
*
* citus_metadata_sync.h
*
* Declarations for public functions and types related to syncing metadata with
* CitusDB.
*
* Copyright (c) 2014, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/

#ifndef PG_SHARD_CITUS_METADATA_SYNC_H
#define PG_SHARD_CITUS_METADATA_SYNC_H

#include "postgres.h"
#include "fmgr.h"


/* function declarations for syncing metadata with Citus */
extern Datum partition_column_to_node_string(PG_FUNCTION_ARGS);


#endif /* PG_SHARD_CITUS_METADATA_SYNC_H */
89 changes: 89 additions & 0 deletions expected/citus_metadata_sync.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
-- ===================================================================
-- test metadata sync functionality
-- ===================================================================
-- set up a table and "distribute" it manually
CREATE TABLE set_of_ids ( id bigint );
INSERT INTO pgs_distribution_metadata.shard
(id, relation_id, storage, min_value, max_value)
VALUES
(1, 'set_of_ids'::regclass, 't', '0', '10'),
(2, 'set_of_ids'::regclass, 't', '10', '20');
INSERT INTO pgs_distribution_metadata.shard_placement
(id, node_name, node_port, shard_id, shard_state)
VALUES
(101, 'cluster-worker-01', 5432, 1, 0),
(102, 'cluster-worker-02', 5433, 2, 0);
INSERT INTO pgs_distribution_metadata.partition (relation_id, partition_method, key)
VALUES
('set_of_ids'::regclass, 'h', 'id');
-- should get ERROR for NULL, non-existent, or non-distributed table
SELECT partition_column_to_node_string(NULL);
ERROR: table_oid cannot be null
SELECT partition_column_to_node_string(0);
ERROR: could not find partition for distributed relation 0
SELECT partition_column_to_node_string('pg_class'::regclass);
ERROR: could not find partition for distributed relation 1259
-- should get node representation for distributed table
SELECT partition_column_to_node_string('set_of_ids'::regclass);
partition_column_to_node_string
------------------------------------------------------------------------------------------------------------------------
{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}
(1 row)

-- create subset of Citus metadata schema
CREATE TABLE pg_dist_partition (
logicalrelid oid NOT NULL,
partmethod "char" NOT NULL,
partkey text
);
CREATE TABLE pg_dist_shard (
logicalrelid oid NOT NULL,
shardid bigint NOT NULL,
shardstorage "char" NOT NULL,
shardalias text,
shardminvalue text,
shardmaxvalue text
);
CREATE TABLE pg_dist_shard_placement (
shardid bigint NOT NULL,
shardstate integer NOT NULL,
shardlength bigint NOT NULL,
nodename text,
nodeport integer
) WITH OIDS;
-- sync metadata and verify it has transferred
SELECT sync_table_metadata_to_citus('set_of_ids');
sync_table_metadata_to_citus
------------------------------

(1 row)

SELECT partmethod, partkey
FROM pg_dist_partition
WHERE logicalrelid = 'set_of_ids'::regclass;
partmethod | partkey
------------+------------------------------------------------------------------------------------------------------------------------
h | {VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}
(1 row)

SELECT shardid, shardstorage, shardalias, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'set_of_ids'::regclass
ORDER BY shardid;
shardid | shardstorage | shardalias | shardminvalue | shardmaxvalue
---------+--------------+------------+---------------+---------------
1 | t | | 0 | 10
2 | t | | 10 | 20
(2 rows)

SELECT * FROM pg_dist_shard_placement
WHERE shardid IN (SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'set_of_ids'::regclass)
ORDER BY nodename;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-------------------+----------
1 | 0 | 0 | cluster-worker-01 | 5432
2 | 0 | 0 | cluster-worker-02 | 5433
(2 rows)

63 changes: 63 additions & 0 deletions pg_shard--1.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,66 @@ CREATE FUNCTION worker_copy_shard_placement(table_name text, source_node_name te
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;

CREATE FUNCTION partition_column_to_node_string(table_oid oid)
RETURNS text
AS 'MODULE_PATHNAME'
LANGUAGE C;

COMMENT ON FUNCTION partition_column_to_node_string(oid)
IS 'return textual form of distributed table''s partition column';

CREATE FUNCTION sync_table_metadata_to_citus(table_name text) RETURNS VOID
AS $sync_table_metadata_to_citus$
DECLARE
table_relation_id CONSTANT oid NOT NULL := table_name::regclass::oid;
dummy_shard_length CONSTANT bigint := 0;
BEGIN
-- copy shard placement metadata
INSERT INTO pg_dist_shard_placement
(shardid,
shardstate,
shardlength,
nodename,
nodeport)
SELECT shard_id,
shard_state,
dummy_shard_length,
node_name,
node_port
FROM pgs_distribution_metadata.shard_placement
WHERE shard_id IN (SELECT id
FROM pgs_distribution_metadata.shard
WHERE relation_id = table_relation_id);

-- copy shard metadata
INSERT INTO pg_dist_shard
(shardid,
logicalrelid,
shardstorage,
shardminvalue,
shardmaxvalue)
SELECT id,
relation_id,
storage,
min_value,
max_value
FROM pgs_distribution_metadata.shard
WHERE relation_id = table_relation_id;

-- copy partition metadata, which also converts the partition column to
-- a node string representation as expected by CitusDB
INSERT INTO pg_dist_partition
(logicalrelid,
partmethod,
partkey)
SELECT relation_id,
partition_method,
partition_column_to_node_string(table_relation_id)
FROM pgs_distribution_metadata.partition
WHERE relation_id = table_relation_id;
END;
$sync_table_metadata_to_citus$ LANGUAGE 'plpgsql';

COMMENT ON FUNCTION sync_table_metadata_to_citus(text)
IS 'synchronize a distributed table''s pg_shard metadata to CitusDB';
72 changes: 72 additions & 0 deletions sql/citus_metadata_sync.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
-- ===================================================================
-- test metadata sync functionality
-- ===================================================================

-- set up a table and "distribute" it manually
CREATE TABLE set_of_ids ( id bigint );

INSERT INTO pgs_distribution_metadata.shard
(id, relation_id, storage, min_value, max_value)
VALUES
(1, 'set_of_ids'::regclass, 't', '0', '10'),
(2, 'set_of_ids'::regclass, 't', '10', '20');

INSERT INTO pgs_distribution_metadata.shard_placement
(id, node_name, node_port, shard_id, shard_state)
VALUES
(101, 'cluster-worker-01', 5432, 1, 0),
(102, 'cluster-worker-02', 5433, 2, 0);

INSERT INTO pgs_distribution_metadata.partition (relation_id, partition_method, key)
VALUES
('set_of_ids'::regclass, 'h', 'id');

-- should get ERROR for NULL, non-existent, or non-distributed table
SELECT partition_column_to_node_string(NULL);
SELECT partition_column_to_node_string(0);
SELECT partition_column_to_node_string('pg_class'::regclass);

-- should get node representation for distributed table
SELECT partition_column_to_node_string('set_of_ids'::regclass);

-- create subset of Citus metadata schema
CREATE TABLE pg_dist_partition (
logicalrelid oid NOT NULL,
partmethod "char" NOT NULL,
partkey text
);

CREATE TABLE pg_dist_shard (
logicalrelid oid NOT NULL,
shardid bigint NOT NULL,
shardstorage "char" NOT NULL,
shardalias text,
shardminvalue text,
shardmaxvalue text
);

CREATE TABLE pg_dist_shard_placement (
shardid bigint NOT NULL,
shardstate integer NOT NULL,
shardlength bigint NOT NULL,
nodename text,
nodeport integer
) WITH OIDS;

-- sync metadata and verify it has transferred
SELECT sync_table_metadata_to_citus('set_of_ids');

SELECT partmethod, partkey
FROM pg_dist_partition
WHERE logicalrelid = 'set_of_ids'::regclass;

SELECT shardid, shardstorage, shardalias, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'set_of_ids'::regclass
ORDER BY shardid;

SELECT * FROM pg_dist_shard_placement
WHERE shardid IN (SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'set_of_ids'::regclass)
ORDER BY nodename;

0 comments on commit 79306af

Please sign in to comment.