Add triggers and views to handle CitusDB metadata
Allows upgrades from PostgreSQL + pg_shard to CitusDB to flow smoothly.
jasonmp85 committed May 23, 2015
1 parent faed405 commit 2814e90
Showing 15 changed files with 291 additions and 108 deletions.
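None of the new trigger or view DDL appears in the files shown below, so for orientation: pg_shard records its state in the pgs_distribution_metadata schema (partition, shard, shard_placement), while CitusDB reads the pg_dist_partition, pg_dist_shard, and pg_dist_shard_placement catalogs. A minimal sketch of the kind of trigger-based sync this implies, using hypothetical DDL rather than the commit's actual objects:

-- Hypothetical sketch: mirror newly inserted pg_shard placements into
-- CitusDB's pg_dist_shard_placement catalog so the two layers agree.
CREATE FUNCTION pgs_distribution_metadata.citus_sync_placement()
RETURNS trigger AS $body$
BEGIN
	INSERT INTO pg_dist_shard_placement
		(shardid, shardstate, shardlength, nodename, nodeport)
	VALUES
		-- shardlength is a placeholder value in this sketch
		(NEW.shard_id, NEW.shard_state, 0, NEW.node_name, NEW.node_port);
	RETURN NEW;
END;
$body$ LANGUAGE plpgsql;

CREATE TRIGGER citus_sync_placement
AFTER INSERT ON pgs_distribution_metadata.shard_placement
FOR EACH ROW
EXECUTE PROCEDURE pgs_distribution_metadata.citus_sync_placement();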
4 changes: 3 additions & 1 deletion distribution_metadata.c
@@ -24,6 +24,7 @@
 #include "access/htup.h"
 #include "access/tupdesc.h"
 #include "executor/spi.h"
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_type.h"
 #include "nodes/makefuncs.h"
@@ -375,6 +376,7 @@ PartitionType(Oid distributedTableId)
 bool
 IsDistributedTable(Oid tableId)
 {
+	Oid tableNamespaceOid = get_rel_namespace(tableId);
 	Oid metadataNamespaceOid = get_namespace_oid("pgs_distribution_metadata", false);
 	Oid partitionMetadataTableOid = get_relname_relid("partition", metadataNamespaceOid);
 	bool isDistributedTable = false;
@@ -387,7 +389,7 @@ IsDistributedTable(Oid tableId)
 	 * The query below hits the partition metadata table, so if we don't detect
 	 * that and short-circuit, we'll get infinite recursion in the planner.
 	 */
-	if (tableId == partitionMetadataTableOid)
+	if (IsSystemNamespace(tableNamespaceOid) || tableId == partitionMetadataTableOid)
 	{
 		return false;
 	}
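A note on the new IsSystemNamespace() guard: relations in the system namespace (pg_catalog, which is also where CitusDB keeps its pg_dist_* catalogs) are never distributed tables, so returning false immediately avoids metadata lookups that could re-enter the planner. The affected class of relations is easy to inspect with an ordinary catalog query:

-- Relations living in pg_catalog; IsDistributedTable() now answers
-- false for all of these without touching pg_shard metadata.
SELECT c.oid::regclass AS relation
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'pg_catalog'
ORDER BY 1;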
2 changes: 1 addition & 1 deletion expected/citus_metadata_sync.out
@@ -159,7 +159,7 @@ ORDER BY nodename;
 -- mark a placement as unhealthy and add a new one
 UPDATE pgs_distribution_metadata.shard_placement
 SET shard_state = :inactive
-WHERE id = 102;
+WHERE node_name = 'cluster-worker-02';
 INSERT INTO pgs_distribution_metadata.shard_placement
 	(id, node_name, node_port, shard_id, shard_state)
 VALUES
29 changes: 19 additions & 10 deletions expected/distribution_metadata.out
@@ -46,11 +46,11 @@ CREATE FUNCTION create_healthy_local_shard_placement_row(bigint)
 	AS 'pg_shard'
 	LANGUAGE C STRICT;
 CREATE FUNCTION delete_shard_placement_row(bigint)
-	RETURNS void
+	RETURNS bool
 	AS 'pg_shard'
 	LANGUAGE C STRICT;
 CREATE FUNCTION update_shard_placement_row_state(bigint, int)
-	RETURNS void
+	RETURNS bool
 	AS 'pg_shard'
 	LANGUAGE C STRICT;
 CREATE FUNCTION acquire_shared_shard_lock(bigint)
@@ -275,17 +275,18 @@ WHERE id = :new_shard_id;
 -- add a placement and manually inspect row
 SELECT create_healthy_local_shard_placement_row(:new_shard_id) AS new_placement_id
 \gset
-SELECT * FROM pgs_distribution_metadata.shard_placement WHERE id = :new_placement_id;
- id | shard_id | shard_state | node_name | node_port
-----+----------+-------------+-----------+-----------
-  1 |    10000 |           1 | localhost |      5432
+SELECT shard_state, node_name, node_port FROM pgs_distribution_metadata.shard_placement
+WHERE id = :new_placement_id;
+ shard_state | node_name | node_port
+-------------+-----------+-----------
+           1 | localhost |      5432
 (1 row)
 
 -- mark it as unhealthy and inspect
 SELECT update_shard_placement_row_state(:new_placement_id, 3);
  update_shard_placement_row_state
 ----------------------------------
- 
+ t
 (1 row)
 
 SELECT shard_state FROM pgs_distribution_metadata.shard_placement
@@ -299,7 +300,7 @@ WHERE id = :new_placement_id;
 SELECT delete_shard_placement_row(:new_placement_id);
  delete_shard_placement_row
 ----------------------------
- 
+ t
 (1 row)
 
 SELECT COUNT(*) FROM pgs_distribution_metadata.shard_placement
@@ -311,9 +312,17 @@ WHERE id = :new_placement_id;
 
 -- deleting or updating a non-existent row should fail
 SELECT delete_shard_placement_row(:new_placement_id);
-ERROR: shard placement with ID 1 does not exist
+ delete_shard_placement_row
+----------------------------
+ f
+(1 row)
+
 SELECT update_shard_placement_row_state(:new_placement_id, 3);
-ERROR: shard placement with ID 1 does not exist
+ update_shard_placement_row_state
+----------------------------------
+ f
+(1 row)
+
 -- now we'll even test our lock methods...
 -- use transaction to bound how long we hold the lock
 BEGIN;
13 changes: 13 additions & 0 deletions expected/init.out
@@ -9,3 +9,16 @@ AS 'pg_shard'
 LANGUAGE C STRICT;
 CREATE FOREIGN DATA WRAPPER fake_fdw HANDLER fake_fdw_handler;
 CREATE SERVER fake_fdw_server FOREIGN DATA WRAPPER fake_fdw;
+-- Set pg_shard sequence to start at same number as that used by CitusDB.
+-- This makes testing easier, since shard IDs will match.
+DO $$
+BEGIN
+	BEGIN
+		PERFORM setval('pgs_distribution_metadata.shard_id_sequence',
+					   102008, false);
+	EXCEPTION
+		WHEN undefined_table THEN
+			-- do nothing
+	END;
+END;
+$$;
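Because setval() is called with the third argument false, the next nextval() on the sequence returns 102008 itself rather than 102009; the inner BEGIN/EXCEPTION block simply swallows undefined_table so the script also runs where the sequence is absent. A quick sanity check, assuming a PostgreSQL of this era (before version 10) where a sequence can be read like a table:

-- Expect last_value = 102008 and is_called = f after the DO block:
SELECT last_value, is_called
FROM pgs_distribution_metadata.shard_id_sequence;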
11 changes: 7 additions & 4 deletions expected/modifications.out.tmpl
@@ -63,12 +63,12 @@ ERROR: cannot plan INSERT using row with NULL value in partition column
 INSERT INTO limit_orders VALUES (18811, 'BUD', 14962, '2014-04-05 08:32:16', 'sell',
                                  -5.00);
 WARNING: Bad result from localhost:$PGPORT
-DETAIL: Remote message: new row for relation "limit_orders_10034" violates check constraint "limit_orders_limit_price_check"
+DETAIL: Remote message: new row for relation "limit_orders_102042" violates check constraint "limit_orders_limit_price_check"
 ERROR: could not modify any active placements
 -- INSERT violating primary key constraint
 INSERT INTO limit_orders VALUES (32743, 'LUV', 5994, '2001-04-16 03:37:28', 'buy', 0.58);
 WARNING: Bad result from localhost:$PGPORT
-DETAIL: Remote message: duplicate key value violates unique constraint "limit_orders_pkey_10035"
+DETAIL: Remote message: duplicate key value violates unique constraint "limit_orders_pkey_102043"
 ERROR: could not modify any active placements
 -- commands with non-constant partition values are unsupported
 INSERT INTO limit_orders VALUES (random() * 100, 'ORCL', 152, '2011-08-25 11:50:45',
@@ -189,8 +189,11 @@ WITH limit_order_placements AS (
 		AND s.relation_id = 'limit_orders'::regclass
 )
 INSERT INTO pgs_distribution_metadata.shard_placement
-SELECT nextval('pgs_distribution_metadata.shard_placement_id_sequence'),
-	   shard_id,
+	(shard_id,
+	 shard_state,
+	 node_name,
+	 node_port)
+SELECT shard_id,
 	   shard_state,
 	   'badhost',
 	   54321
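The reshaped INSERT above now names its target columns and no longer computes id with nextval(), which only works if shard_placement.id is populated automatically. One plausible mechanism, sketched as hypothetical DDL since the commit's actual default or trigger is not among the files shown:

-- Hypothetical: default id to the existing sequence so INSERTs may
-- omit the column, as the revised test does.
ALTER TABLE pgs_distribution_metadata.shard_placement
	ALTER COLUMN id SET DEFAULT
		nextval('pgs_distribution_metadata.shard_placement_id_sequence');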
14 changes: 5 additions & 9 deletions expected/queries.out
@@ -89,7 +89,7 @@ SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;
      0
 (1 row)
 
--- zero-shard modifications should be no-ops but not fail
+-- zero-shard modifications should be no-ops in pg_shard, fail in CitusDB
 UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
 DELETE FROM articles WHERE author_id = 1 AND author_id = 2;
 -- single-shard tests
@@ -201,11 +201,6 @@ DETAIL: Joins are not supported in distributed queries.
 SELECT * FROM (articles INNER JOIN authors ON articles.id = authors.id);
 ERROR: cannot perform distributed planning for the given query
 DETAIL: Joins are not supported in distributed queries.
--- test CitusDB code path (this will error out in normal PostgreSQL)
-SELECT sync_table_metadata_to_citus('articles');
-ERROR: relation "pg_dist_shard_placement" does not exist
-CONTEXT: SQL statement "LOCK TABLE pg_dist_shard_placement IN EXCLUSIVE MODE"
-PL/pgSQL function sync_table_metadata_to_citus(text) line 7 at SQL statement
 -- with normal PostgreSQL, expect error about CitusDB being missing
 -- with CitusDB, expect an error about JOINing local table with distributed
 SET pg_shard.use_citusdb_select_logic TO true;
@@ -239,7 +234,8 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles
 -- cross-shard queries on a foreign table should fail
 -- we'll just point the article shards to a foreign table
 BEGIN;
-CREATE FOREIGN TABLE foreign_articles (author_id bigint) SERVER fake_fdw_server;
+CREATE FOREIGN TABLE foreign_articles (id bigint, author_id bigint)
+	SERVER fake_fdw_server;
 UPDATE pgs_distribution_metadata.partition
 SET relation_id='foreign_articles'::regclass
 WHERE relation_id='articles'::regclass;
@@ -254,8 +250,8 @@ ROLLBACK;
 SET pg_shard.log_distributed_statements = on;
 SET client_min_messages = log;
 SELECT count(*) FROM articles WHERE word_count > 10000;
-LOG: distributed statement: SELECT NULL::unknown FROM ONLY articles_10036 WHERE (word_count > 10000)
-LOG: distributed statement: SELECT NULL::unknown FROM ONLY articles_10037 WHERE (word_count > 10000)
+LOG: distributed statement: SELECT NULL::unknown FROM ONLY articles_102044 WHERE (word_count > 10000)
+LOG: distributed statement: SELECT NULL::unknown FROM ONLY articles_102045 WHERE (word_count > 10000)
  count
 -------
     23
18 changes: 7 additions & 11 deletions expected/queries_1.out
@@ -89,9 +89,11 @@ SELECT COUNT(*) FROM articles WHERE author_id = 1 AND author_id = 2;
      0
 (1 row)
 
--- zero-shard modifications should be no-ops but not fail
+-- zero-shard modifications should be no-ops in pg_shard, fail in CitusDB
 UPDATE articles SET title = '' WHERE author_id = 1 AND author_id = 2;
+ERROR: cannot execute UPDATE on a distributed table on master node
 DELETE FROM articles WHERE author_id = 1 AND author_id = 2;
+ERROR: cannot execute DELETE on a distributed table on master node
 -- single-shard tests
 -- test simple select for a single row
 SELECT * FROM articles WHERE author_id = 10 AND id = 50;
@@ -201,13 +203,6 @@ DETAIL: Joins are not supported in distributed queries.
 SELECT * FROM (articles INNER JOIN authors ON articles.id = authors.id);
 ERROR: cannot perform distributed planning for the given query
 DETAIL: Joins are not supported in distributed queries.
--- test CitusDB code path (this will error out in normal PostgreSQL)
-SELECT sync_table_metadata_to_citus('articles');
- sync_table_metadata_to_citus
-------------------------------
- 
-(1 row)
-
 -- with normal PostgreSQL, expect error about CitusDB being missing
 -- with CitusDB, expect an error about JOINing local table with distributed
 SET pg_shard.use_citusdb_select_logic TO true;
@@ -239,7 +234,8 @@ SELECT author_id, sum(word_count) AS corpus_size FROM articles
 -- cross-shard queries on a foreign table should fail
 -- we'll just point the article shards to a foreign table
 BEGIN;
-CREATE FOREIGN TABLE foreign_articles (author_id bigint) SERVER fake_fdw_server;
+CREATE FOREIGN TABLE foreign_articles (id bigint, author_id bigint)
+	SERVER fake_fdw_server;
 UPDATE pgs_distribution_metadata.partition
 SET relation_id='foreign_articles'::regclass
 WHERE relation_id='articles'::regclass;
@@ -254,8 +250,8 @@ ROLLBACK;
 SET pg_shard.log_distributed_statements = on;
 SET client_min_messages = log;
 SELECT count(*) FROM articles WHERE word_count > 10000;
-LOG: distributed statement: SELECT NULL::unknown FROM ONLY articles_10036 WHERE (word_count > 10000)
-LOG: distributed statement: SELECT NULL::unknown FROM ONLY articles_10037 WHERE (word_count > 10000)
+LOG: distributed statement: SELECT NULL::unknown FROM ONLY articles_102044 WHERE (word_count > 10000)
+LOG: distributed statement: SELECT NULL::unknown FROM ONLY articles_102045 WHERE (word_count > 10000)
  count
 -------
     23
