From 6c61c9dd2cb63f77fd4efae489a9956976e96a71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Fri, 14 Oct 2022 11:09:53 +0200 Subject: [PATCH] Add function to alter data nodes Add a new function, `alter_data_node()`, which can be used to change a data node's configuration on the access node, including connection information like host, port, and database. In addition, the function allows marking a data node as "unavailable", which means the node should no longer be used for reads and writes. Only read "failover" is implemented as part of this change, however. To fail over reads, the alter data node function finds all the chunks for which the unavailable data node is the "primary" query target and sets a chunk replica on another data node to use instead. If some chunks do not have a replica to fail over to, a warning will be raised. When a data node is available again, the function can be used to switch back to once again use the data node for queries. Closes #2104 --- sql/ddl_api.sql | 9 + sql/pre_install/tables.sql | 2 + sql/updates/latest-dev.sql | 10 + sql/updates/reverse-dev.sql | 3 + src/cross_module_fn.c | 2 + src/cross_module_fn.h | 1 + src/hypertable.c | 14 +- src/hypertable.h | 3 +- src/ts_catalog/catalog.c | 1 + src/ts_catalog/catalog.h | 14 +- src/ts_catalog/chunk_data_node.c | 18 +- src/ts_catalog/chunk_data_node.h | 7 +- src/ts_catalog/dimension_partition.c | 1 + src/utils.c | 22 ++ src/utils.h | 3 + tsl/src/chunk.c | 134 ++++++++- tsl/src/chunk.h | 2 +- tsl/src/chunk_api.c | 2 +- tsl/src/data_node.c | 303 ++++++++++++++++++- tsl/src/data_node.h | 1 + tsl/src/fdw/option.c | 6 + tsl/src/init.c | 1 + tsl/test/expected/data_node.out | 387 +++++++++++++++++++++++++ tsl/test/shared/expected/extension.out | 11 +- tsl/test/sql/data_node.sql | 170 +++++++++++ 25 files changed, 1087 insertions(+), 40 deletions(-) diff --git a/sql/ddl_api.sql b/sql/ddl_api.sql index c1b469bde86..659063313fe 100644 --- a/sql/ddl_api.sql +++ b/sql/ddl_api.sql 
@@ -216,3 +216,12 @@ CREATE OR REPLACE PROCEDURE @extschema@.refresh_continuous_aggregate( window_start "any", window_end "any" ) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh'; + +CREATE OR REPLACE FUNCTION @extschema@.alter_data_node( + node_name NAME, + host TEXT = NULL, + database NAME = NULL, + port INTEGER = NULL, + options TEXT[] = NULL +) RETURNS TABLE(node_name NAME, host TEXT, port INTEGER, database NAME, options TEXT[]) +AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE; diff --git a/sql/pre_install/tables.sql b/sql/pre_install/tables.sql index 6f8d9097890..7c532544383 100644 --- a/sql/pre_install/tables.sql +++ b/sql/pre_install/tables.sql @@ -251,6 +251,8 @@ CREATE TABLE _timescaledb_catalog.chunk_data_node ( CONSTRAINT chunk_data_node_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ); +CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name); + SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.chunk_data_node', ''); -- Default jobs are given the id space [1,1000). 
User-installed jobs and any jobs created inside tests diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index fb506472d0a..1bb305cdce5 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -297,3 +297,13 @@ CREATE FUNCTION @extschema@.add_reorder_policy( ) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_reorder_add' LANGUAGE C VOLATILE; + +CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name); +CREATE FUNCTION @extschema@.alter_data_node( + node_name NAME, + host TEXT = NULL, + database NAME = NULL, + port INTEGER = NULL, + options TEXT[] = NULL +) RETURNS TABLE(node_name NAME, host TEXT, port INTEGER, database NAME, options TEXT[]) +AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE; diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql index 8ee47e78e9f..1ee647f534b 100644 --- a/sql/updates/reverse-dev.sql +++ b/sql/updates/reverse-dev.sql @@ -224,3 +224,6 @@ ALTER TABLE _timescaledb_internal.bgw_policy_chunk_stats ADD CONSTRAINT bgw_policy_chunk_stats_job_id_fkey FOREIGN KEY(job_id) REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE; + +DROP INDEX IF EXISTS _timescaledb_catalog.chunk_data_node_node_name_idx; +DROP FUNCTION @extschema@.alter_data_node; diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index eec82d93d2a..1c2c8902267 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -102,6 +102,7 @@ CROSSMODULE_WRAPPER(data_node_add); CROSSMODULE_WRAPPER(data_node_delete); CROSSMODULE_WRAPPER(data_node_attach); CROSSMODULE_WRAPPER(data_node_detach); +CROSSMODULE_WRAPPER(data_node_alter); CROSSMODULE_WRAPPER(chunk_drop_replica); CROSSMODULE_WRAPPER(chunk_set_default_data_node); @@ -504,6 +505,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .data_node_attach = error_no_default_fn_pg_community, .data_node_ping = error_no_default_fn_pg_community, .data_node_detach = error_no_default_fn_pg_community, + .data_node_alter = 
error_no_default_fn_pg_community, .data_node_allow_new_chunks = error_no_default_fn_pg_community, .data_node_block_new_chunks = error_no_default_fn_pg_community, .distributed_exec = error_no_default_fn_pg_community, diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 1207a60ab64..4b361ffbcee 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -158,6 +158,7 @@ typedef struct CrossModuleFunctions PGFunction data_node_attach; PGFunction data_node_ping; PGFunction data_node_detach; + PGFunction data_node_alter; PGFunction data_node_allow_new_chunks; PGFunction data_node_block_new_chunks; diff --git a/src/hypertable.c b/src/hypertable.c index 12f836514c6..abd9c3d2062 100644 --- a/src/hypertable.c +++ b/src/hypertable.c @@ -2693,7 +2693,16 @@ ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, const Hypercube *cub ts_hypercube_get_slice_by_dimension_id(cube, space_dim->fd.id); const DimensionPartition *dp = ts_dimension_partition_find(space_dim->dimension_partitions, slice->fd.range_start); - chunk_data_nodes = dp->data_nodes; + ListCell *lc; + + /* Filter out data nodes that aren't available */ + foreach (lc, dp->data_nodes) + { + char *node_name = lfirst(lc); + + if (ts_data_node_is_available(node_name)) + chunk_data_nodes = lappend(chunk_data_nodes, node_name); + } } else { @@ -2740,6 +2749,9 @@ typedef bool (*hypertable_data_node_filter)(const HypertableDataNode *hdn); static bool filter_non_blocked_data_nodes(const HypertableDataNode *node) { + if (!ts_data_node_is_available(NameStr(node->fd.node_name))) + return false; + return !node->fd.block_chunks; } diff --git a/src/hypertable.h b/src/hypertable.h index fbce1b22c82..99ac720cf7c 100644 --- a/src/hypertable.h +++ b/src/hypertable.h @@ -151,7 +151,8 @@ extern TSDLLEXPORT bool ts_hypertable_set_compressed(Hypertable *ht, extern TSDLLEXPORT bool ts_hypertable_unset_compressed(Hypertable *ht); extern TSDLLEXPORT void ts_hypertable_clone_constraints_to_compressed(const Hypertable 
*ht, List *constraint_list); -extern List *ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, const Hypercube *cube); +extern TSDLLEXPORT List *ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, + const Hypercube *cube); extern TSDLLEXPORT List *ts_hypertable_get_data_node_name_list(const Hypertable *ht); extern TSDLLEXPORT List *ts_hypertable_get_data_node_serverids_list(const Hypertable *ht); extern TSDLLEXPORT List *ts_hypertable_get_available_data_nodes(const Hypertable *ht, diff --git a/src/ts_catalog/catalog.c b/src/ts_catalog/catalog.c index a2b6e574afe..89cdf3bcfe1 100644 --- a/src/ts_catalog/catalog.c +++ b/src/ts_catalog/catalog.c @@ -190,6 +190,7 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES] .names = (char *[]) { [CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_chunk_id_node_name_key", [CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_node_chunk_id_node_name_key", + [CHUNK_DATA_NODE_NODE_NAME_IDX] = "chunk_data_node_node_name_idx", } }, [TABLESPACE] = { diff --git a/src/ts_catalog/catalog.h b/src/ts_catalog/catalog.h index 39af6ac3a17..d7a26d07ac6 100644 --- a/src/ts_catalog/catalog.h +++ b/src/ts_catalog/catalog.h @@ -596,6 +596,7 @@ enum { CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX, CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX, + CHUNK_DATA_NODE_NODE_NAME_IDX, _MAX_CHUNK_DATA_NODE_INDEX, }; @@ -625,6 +626,17 @@ struct FormData_chunk_data_node_node_chunk_id_node_name_idx NameData node_name; }; +enum Anum_chunk_data_node_node_name_idx +{ + Anum_chunk_data_node_name_idx_node_name = 1, + _Anum_chunk_data_node_node_name_idx_max, +}; + +struct FormData_chunk_data_node_node_name_idx +{ + NameData node_name; +}; + /************************************ * * Tablespace table definitions @@ -1456,7 +1468,7 @@ extern TSDLLEXPORT void ts_catalog_delete_tid_only(Relation rel, ItemPointer tid extern TSDLLEXPORT void ts_catalog_delete_tid(Relation rel, ItemPointer tid); extern TSDLLEXPORT void 
ts_catalog_delete_only(Relation rel, HeapTuple tuple); extern TSDLLEXPORT void ts_catalog_delete(Relation rel, HeapTuple tuple); -extern void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation); +extern TSDLLEXPORT void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation); bool TSDLLEXPORT ts_catalog_scan_one(CatalogTable table, int indexid, ScanKeyData *scankey, int num_keys, tuple_found_func tuple_found, LOCKMODE lockmode, diff --git a/src/ts_catalog/chunk_data_node.c b/src/ts_catalog/chunk_data_node.c index b28c9637bd0..78e3afd19be 100644 --- a/src/ts_catalog/chunk_data_node.c +++ b/src/ts_catalog/chunk_data_node.c @@ -106,14 +106,15 @@ chunk_data_node_tuple_found(TupleInfo *ti, void *data) bool should_free; HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); Form_chunk_data_node form = (Form_chunk_data_node) GETSTRUCT(tuple); + ForeignServer *server; ChunkDataNode *chunk_data_node; MemoryContext old; + server = GetForeignServerByName(NameStr(form->node_name), false); old = MemoryContextSwitchTo(ti->mctx); chunk_data_node = palloc(sizeof(ChunkDataNode)); memcpy(&chunk_data_node->fd, form, sizeof(FormData_chunk_data_node)); - chunk_data_node->foreign_server_oid = - get_foreign_server_oid(NameStr(form->node_name), /* missing_ok = */ false); + chunk_data_node->foreign_server_oid = server->serverid; *nodes = lappend(*nodes, chunk_data_node); MemoryContextSwitchTo(old); @@ -339,3 +340,16 @@ ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id) F_INT4EQ, Int32GetDatum(chunk_id)); } + +void +ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it, const char *node_name) +{ + it->ctx.index = + catalog_get_index(ts_catalog_get(), CHUNK_DATA_NODE, CHUNK_DATA_NODE_NODE_NAME_IDX); + ts_scan_iterator_scan_key_reset(it); + ts_scan_iterator_scan_key_init(it, + Anum_chunk_data_node_name_idx_node_name, + BTEqualStrategyNumber, + F_NAMEEQ, + CStringGetDatum(node_name)); +} diff --git 
a/src/ts_catalog/chunk_data_node.h b/src/ts_catalog/chunk_data_node.h index 1aef275b99a..83a35c78583 100644 --- a/src/ts_catalog/chunk_data_node.h +++ b/src/ts_catalog/chunk_data_node.h @@ -32,7 +32,10 @@ extern int ts_chunk_data_node_delete_by_node_name(const char *node_name); extern TSDLLEXPORT List * ts_chunk_data_node_scan_by_node_name_and_hypertable_id(const char *node_name, int32 hypertable_id, MemoryContext mctx); -extern ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt); -extern void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id); +extern TSDLLEXPORT ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt); +extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, + int32 chunk_id); +extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it, + const char *node_name); #endif /* TIMESCALEDB_CHUNK_DATA_NODE_H */ diff --git a/src/ts_catalog/dimension_partition.c b/src/ts_catalog/dimension_partition.c index 6d467d3f021..d1040d71862 100644 --- a/src/ts_catalog/dimension_partition.c +++ b/src/ts_catalog/dimension_partition.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include diff --git a/src/utils.c b/src/utils.c index f09017228ff..2e236fa5ce2 100644 --- a/src/utils.c +++ b/src/utils.c @@ -1270,3 +1270,25 @@ ts_copy_relation_acl(const Oid source_relid, const Oid target_relid, const Oid o ReleaseSysCache(source_tuple); table_close(class_rel, RowExclusiveLock); } + +bool +ts_data_node_is_available_by_server(const ForeignServer *server) +{ + ListCell *lc; + + foreach (lc, server->options) + { + DefElem *elem = lfirst(lc); + + if (strcmp(elem->defname, "available") == 0 && !defGetBoolean(elem)) + return false; + } + + return true; +} + +bool +ts_data_node_is_available(const char *name) +{ + return ts_data_node_is_available_by_server(GetForeignServerByName(name, false)); +} diff --git 
a/src/utils.h b/src/utils.h index a94017b03d8..43d0b248656 100644 --- a/src/utils.h +++ b/src/utils.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -200,5 +201,7 @@ extern TSDLLEXPORT void ts_alter_table_with_event_trigger(Oid relid, Node *cmd, bool recurse); extern TSDLLEXPORT void ts_copy_relation_acl(const Oid source_relid, const Oid target_relid, const Oid owner_id); +extern TSDLLEXPORT bool ts_data_node_is_available_by_server(const ForeignServer *server); +extern TSDLLEXPORT bool ts_data_node_is_available(const char *node_name); #endif /* TIMESCALEDB_UTILS_H */ diff --git a/tsl/src/chunk.c b/tsl/src/chunk.c index 783149c78eb..71d24cf7a14 100644 --- a/tsl/src/chunk.c +++ b/tsl/src/chunk.c @@ -4,6 +4,7 @@ * LICENSE-TIMESCALE for a copy of the license. */ +#include "utils.h" #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +38,7 @@ #include #include #include +#include "hypercube.h" #include "chunk.h" #include "chunk_api.h" @@ -66,7 +69,7 @@ chunk_match_data_node_by_server(const Chunk *chunk, const ForeignServer *server) } static bool -chunk_set_foreign_server(Chunk *chunk, ForeignServer *new_server) +chunk_set_foreign_server(const Chunk *chunk, const ForeignServer *new_server) { Relation ftrel; HeapTuple tuple; @@ -134,32 +137,131 @@ chunk_set_foreign_server(Chunk *chunk, ForeignServer *new_server) return true; } -void -chunk_update_foreign_server_if_needed(int32 chunk_id, Oid existing_server_id) +/* + * Change the data node used for querying a chunk. + * + * Either switch away from using the "default" or reset to the "default". + * + * Return true if the chunk's data node was changed or it was already + * configured the way requested. Otherwise return false. 
+ */ +bool +chunk_update_foreign_server_if_needed(const Chunk *chunk, Oid data_node_id, bool reset) { - ListCell *lc; - ChunkDataNode *new_server = NULL; - Chunk *chunk = ts_chunk_get_by_id(chunk_id, true); ForeignTable *foreign_table = NULL; + ForeignServer *server = NULL; + bool new_data_node_found = false; + ListCell *lc; Assert(chunk->relkind == RELKIND_FOREIGN_TABLE); foreign_table = GetForeignTable(chunk->table_id); - /* no need to update since foreign table doesn't reference server we try to remove */ - if (existing_server_id != foreign_table->serverid) - return; + /* Cannot switch to other data node if only one or none assigned */ + if (list_length(chunk->data_nodes) < 2) + return false; - Assert(list_length(chunk->data_nodes) > 1); + /* Nothing to do if the chunk table already has the requested data node set */ + if ((!reset && data_node_id != foreign_table->serverid) || + (reset && data_node_id == foreign_table->serverid)) + return true; - foreach (lc, chunk->data_nodes) + if (reset) { - new_server = lfirst(lc); - if (new_server->foreign_server_oid != existing_server_id) - break; + /* Switch to using the given data node, but only on chunks where the + * given node is the "default" according to partitioning */ + Cache *htcache = ts_hypertable_cache_pin(); + const Hypertable *ht = + ts_hypertable_cache_get_entry(htcache, chunk->hypertable_relid, CACHE_FLAG_NONE); + const Dimension *dim = hyperspace_get_closed_dimension(ht->space, 0); + + if (dim != NULL) + { + /* For space-partitioned tables, use the current partitioning + * configuration in that dimension (dimension partition) as a + * template for picking the query data node */ + const DimensionSlice *slice = + ts_hypercube_get_slice_by_dimension_id(chunk->cube, dim->fd.id); + int i; + + Assert(dim->dimension_partitions); + + for (i = 0; i < dim->dimension_partitions->num_partitions; i++) + { + const DimensionPartition *dp = dim->dimension_partitions->partitions[i]; + + /* Match the chunk with the 
dimension partition. Count as a + * match if the start of chunk is within the range of the + * partition. This captures both the case when the chunk + * aligns perfectly with the partition and when it is bigger + * or smaller (due to a previous partitioning change). */ + if (slice->fd.range_start >= dp->range_start && + slice->fd.range_start <= dp->range_end) + { + ListCell *lc; + + /* Use the data node for queries if it is the first + * available data node in the partition's list (i.e., the + * default choice) */ + foreach (lc, dp->data_nodes) + { + const char *node_name = lfirst(lc); + server = GetForeignServerByName(node_name, false); + + if (ts_data_node_is_available_by_server(server)) + { + new_data_node_found = (server->serverid == data_node_id); + break; + } + } + } + } + } + else + { + /* For hypertables without a space partition, use the data node + * assignment logic to figure out whether to use the data node as + * query data node. The "default" query data node is the first in + * the list. The chunk assign logic only returns available data + * nodes. */ + List *datanodes = ts_hypertable_assign_chunk_data_nodes(ht, chunk->cube); + const char *node_name = linitial(datanodes); + server = GetForeignServerByName(node_name, false); + + if (server->serverid == data_node_id) + new_data_node_found = true; + } + + ts_cache_release(htcache); + } + else + { + /* Switch "away" from using the given data node. 
Pick the first + * "available" data node referenced by the chunk */ + foreach (lc, chunk->data_nodes) + { + const ChunkDataNode *cdn = lfirst(lc); + + if ((!reset && cdn->foreign_server_oid != data_node_id) || + (reset && cdn->foreign_server_oid == data_node_id)) + { + server = GetForeignServer(cdn->foreign_server_oid); + + if (ts_data_node_is_available_by_server(server)) + { + new_data_node_found = true; + break; + } + } + } + } + + if (new_data_node_found) + { + Assert(server != NULL); + chunk_set_foreign_server(chunk, server); } - Assert(new_server != NULL); - chunk_set_foreign_server(chunk, GetForeignServer(new_server->foreign_server_oid)); + return new_data_node_found; } Datum diff --git a/tsl/src/chunk.h b/tsl/src/chunk.h index 0ede76be13e..370a138a9ec 100644 --- a/tsl/src/chunk.h +++ b/tsl/src/chunk.h @@ -10,7 +10,7 @@ #include #include -extern void chunk_update_foreign_server_if_needed(int32 chunk_id, Oid existing_server_id); +extern bool chunk_update_foreign_server_if_needed(const Chunk *chunk, Oid data_node_id, bool reset); extern Datum chunk_set_default_data_node(PG_FUNCTION_ARGS); extern Datum chunk_drop_replica(PG_FUNCTION_ARGS); extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type); diff --git a/tsl/src/chunk_api.c b/tsl/src/chunk_api.c index 05bfc7138ef..30fdcf3091d 100644 --- a/tsl/src/chunk_api.c +++ b/tsl/src/chunk_api.c @@ -1767,6 +1767,6 @@ chunk_api_call_chunk_drop_replica(const Chunk *chunk, const char *node_name, Oid * This chunk might have this data node as primary, change that association * if so. Then delete the chunk_id and node_name association. 
*/ - chunk_update_foreign_server_if_needed(chunk->fd.id, serverid); + chunk_update_foreign_server_if_needed(chunk, serverid, false); ts_chunk_data_node_delete_by_chunk_id_and_node_name(chunk->fd.id, node_name); } diff --git a/tsl/src/data_node.c b/tsl/src/data_node.c index 790f61f54eb..b3843deccdd 100644 --- a/tsl/src/data_node.c +++ b/tsl/src/data_node.c @@ -3,9 +3,7 @@ * Please see the included NOTICE for copyright information and * LICENSE-TIMESCALE for a copy of the license. */ -#include "cache.h" #include - #include #include #include @@ -17,22 +15,30 @@ #include #include #include +#include #include +#include #include #include #include #include #include +#include +#include #include #include +#include #include #include #include +#include #include #include "compat/compat.h" #include "config.h" #include "extension.h" +#include "cache.h" +#include "chunk.h" #include "fdw/fdw.h" #include "remote/async.h" #include "remote/connection.h" @@ -45,7 +51,8 @@ #include "dist_util.h" #include "utils/uuid.h" #include "mb/pg_wchar.h" -#include "chunk.h" +#include "scan_iterator.h" +#include "ts_catalog/catalog.h" #include "ts_catalog/chunk_data_node.h" #include "ts_catalog/dimension_partition.h" #include "ts_catalog/hypertable_data_node.h" @@ -679,6 +686,16 @@ get_server_port() return pg_strtoint32(portstr); } +static void +validate_data_node_port(int port) +{ + if (port < 1 || port > PG_UINT16_MAX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + (errmsg("invalid port number %d", port), + errhint("The port number must be between 1 and %u.", PG_UINT16_MAX)))); +} + /* set_distid may need to be false for some otherwise invalid configurations * that are useful for testing */ static Datum @@ -719,11 +736,7 @@ data_node_add_internal(PG_FUNCTION_ARGS, bool set_distid) (errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errmsg("data node name cannot be NULL")))); - if (port < 1 || port > PG_UINT16_MAX) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - 
(errmsg("invalid port number %d", port), - errhint("The port number must be between 1 and %u.", PG_UINT16_MAX)))); + validate_data_node_port(port); result = get_database_info(MyDatabaseId, &database); Assert(result); @@ -1135,8 +1148,8 @@ data_node_modify_hypertable_data_nodes(const char *node_name, List *hypertable_d foreach (cs_lc, chunk_data_nodes) { ChunkDataNode *cdn = lfirst(cs_lc); - - chunk_update_foreign_server_if_needed(cdn->fd.chunk_id, cdn->foreign_server_oid); + const Chunk *chunk = ts_chunk_get_by_id(cdn->fd.chunk_id, true); + chunk_update_foreign_server_if_needed(chunk, cdn->foreign_server_oid, false); ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id, NameStr(cdn->fd.node_name)); } @@ -1416,6 +1429,276 @@ data_node_detach(PG_FUNCTION_ARGS) PG_RETURN_INT32(removed); } +enum Anum_show_conn +{ + Anum_alter_data_node_node_name = 1, + Anum_alter_data_node_host, + Anum_alter_data_node_port, + Anum_alter_data_node_database, + Anum_alter_data_node_options, + _Anum_alter_data_node_max, +}; + +#define Natts_alter_data_node (_Anum_alter_data_node_max - 1) + +static HeapTuple +create_alter_data_node_tuple(TupleDesc tupdesc, const char *node_name, List *options) +{ + Datum values[Natts_alter_data_node]; + bool nulls[Natts_alter_data_node] = { false }; + ListCell *lc; + ArrayBuildState *arrbuilder = initArrayResult(TEXTOID, CurrentMemoryContext, false); + + MemSet(nulls, false, sizeof(nulls)); + + values[AttrNumberGetAttrOffset(Anum_alter_data_node_node_name)] = CStringGetDatum(node_name); + + foreach (lc, options) + { + DefElem *elem = lfirst(lc); + + if (strcmp("host", elem->defname) == 0) + { + values[AttrNumberGetAttrOffset(Anum_alter_data_node_host)] = + CStringGetTextDatum(defGetString(elem)); + } + else if (strcmp("port", elem->defname) == 0) + { + int port = atoi(defGetString(elem)); + values[AttrNumberGetAttrOffset(Anum_alter_data_node_port)] = Int32GetDatum(port); + } + else if (strcmp("dbname", elem->defname) == 0) + { + 
values[AttrNumberGetAttrOffset(Anum_alter_data_node_database)] = + CStringGetDatum(defGetString(elem)); + } + else + { + StringInfo opt = makeStringInfo(); + appendStringInfo(opt, "%s=%s", elem->defname, defGetString(elem)); + accumArrayResult(arrbuilder, + CStringGetTextDatum(opt->data), + false, + TEXTOID, + CurrentMemoryContext); + } + } + + values[AttrNumberGetAttrOffset(Anum_alter_data_node_options)] = + makeArrayResult(arrbuilder, CurrentMemoryContext); + + return heap_form_tuple(tupdesc, values, nulls); +} + +/* + * Switch data node to use for queries on chunks. + * + * When reset=false it will switch from the given data node to another one, + * but only if the data node is currently used for queries on the chunk. + * + * When reset=true it will switch to the given data node, if it is "primary" + * for the chunk (according to the current partitioning configuration). + */ +static void +switch_data_node_on_chunks(const ForeignServer *datanode, bool reset) +{ + unsigned int failed_update_count = 0; + ScanIterator it = ts_chunk_data_nodes_scan_iterator_create(CurrentMemoryContext); + ts_chunk_data_nodes_scan_iterator_set_node_name(&it, datanode->servername); + + /* Scan for chunks that reference the given data node */ + ts_scanner_foreach(&it) + { + TupleTableSlot *slot = ts_scan_iterator_slot(&it); + bool PG_USED_FOR_ASSERTS_ONLY isnull = false; + Datum chunk_id = slot_getattr(slot, Anum_chunk_data_node_chunk_id, &isnull); + + Assert(!isnull); + + const Chunk *chunk = ts_chunk_get_by_id(DatumGetInt32(chunk_id), true); + if (!chunk_update_foreign_server_if_needed(chunk, datanode->serverid, reset)) + failed_update_count++; + } + + if (!reset && failed_update_count > 0) + elog(WARNING, "could not switch data node on %u chunks", failed_update_count); + + ts_scan_iterator_close(&it); +} + +static List * +append_data_node_option(List *new_options, List **current_options, const char *name, Node *value) +{ + DefElem *elem; + ListCell *lc; + bool option_found = false; 
+#if PG13_LT + ListCell *prev_lc = NULL; +#endif + + foreach (lc, *current_options) + { + elem = lfirst(lc); + + if (strcmp(elem->defname, name) == 0) + { + option_found = true; + /* Remove the option which is replaced so that the remaining + * options can be merged later into an updated list */ +#if PG13_GE + *current_options = list_delete_cell(*current_options, lc); +#else + *current_options = list_delete_cell(*current_options, lc, prev_lc); +#endif + break; + } +#if PG13_LT + prev_lc = lc; +#endif + } + + elem = makeDefElemExtended(NULL, + pstrdup(name), + value, + option_found ? DEFELEM_SET : DEFELEM_ADD, + -1); + return lappend(new_options, elem); +} + +/* + * Alter a data node. + * + * Change the configuration of a data node, including host, port, and + * database. + * + * Can also be used to mark a data node "unavailable", which ensures it is no + * longer used for reads as long as there are replica chunks on other data + * nodes to use for reads instead. If it is not possible to fail over all + * chunks, a warning will be raised. + */ +Datum +data_node_alter(PG_FUNCTION_ARGS) +{ + const char *node_name = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0)); + const char *host = PG_ARGISNULL(1) ? NULL : TextDatumGetCString(PG_GETARG_TEXT_P(1)); + const char *database = PG_ARGISNULL(2) ? NULL : NameStr(*PG_GETARG_NAME(2)); + int port = PG_ARGISNULL(3) ? -1 : PG_GETARG_INT32(3); + ArrayType *arropts = PG_ARGISNULL(4) ? NULL : PG_GETARG_ARRAYTYPE_P(4); + List *options = NIL; + List *current_options = NIL; + bool available = true; + ForeignServer *server = NULL; + TupleDesc tupdesc; + AlterForeignServerStmt alter_server_stmt = { + .type = T_AlterForeignServerStmt, + .servername = node_name ? 
pstrdup(node_name) : NULL, + .has_version = false, + .version = NULL, + .options = NIL, + }; + + TS_PREVENT_FUNC_IF_READ_ONLY(); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + + tupdesc = BlessTupleDesc(tupdesc); + + /* Check if a data node with the given name actually exists, or raise an error. */ + server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, false /* missing_ok */); + + if (host == NULL && database == NULL && port == -1 && arropts == NULL) + PG_RETURN_DATUM( + HeapTupleGetDatum(create_alter_data_node_tuple(tupdesc, node_name, server->options))); + + current_options = list_copy(server->options); + + if (host != NULL) + options = append_data_node_option(options, + ¤t_options, + "host", + (Node *) makeString((char *) host)); + + if (database != NULL) + options = append_data_node_option(options, + ¤t_options, + "dbname", + (Node *) makeString((char *) database)); + + if (port != -1) + { + validate_data_node_port(port); + options = + append_data_node_option(options, ¤t_options, "port", (Node *) makeInteger(port)); + } + + /* Check for non-libpq options as well. These are defined as part of the + * FDW. 
Validation is done in fdw/option.c */ + if (NULL != arropts) + { + ArrayIterator arrit = array_create_iterator(arropts, 0, NULL); + Datum elem = (Datum) NULL; + bool isnull; + + while (array_iterate(arrit, &elem, &isnull)) + { + if (!isnull) + { + const char *opt = TextDatumGetCString(elem); + char *key = pstrdup(opt); + char *value = strstr(key, "="); + Value *v; + + if (value == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("missing value for option \"%s\"", opt), + errhint("Data node options should be of the form key=value"))); + + value[0] = '\0'; + value++; + v = makeString(value); + + /* Check explicitly for "available" since we might need to + * take action */ + if (strcmp("available", key) == 0) + { + DefElem elem = { + .type = T_DefElem, + .defname = key, + .arg = (Node *) v, + }; + + available = defGetBoolean(&elem); + } + + options = append_data_node_option(options, ¤t_options, key, (Node *) v); + } + } + + array_free_iterator(arrit); + } + + alter_server_stmt.options = options; + AlterForeignServer(&alter_server_stmt); + /* Make changes to the data node (foreign server object) visible so that + * the changes are present when we switch "primary" data node on chunks */ + CommandCounterIncrement(); + + /* Update the currently used query data node on all affected chunks to + * reflect the new status of the data node */ + switch_data_node_on_chunks(server, available); + + /* Add updated options last as they will take precedence over old options + * when creating the result tuple. */ + options = list_concat(current_options, options); + + PG_RETURN_DATUM(HeapTupleGetDatum(create_alter_data_node_tuple(tupdesc, node_name, options))); +} + /* * Drop a data node's database. 
* diff --git a/tsl/src/data_node.h b/tsl/src/data_node.h index e46ecebd6e2..4cfdc905462 100644 --- a/tsl/src/data_node.h +++ b/tsl/src/data_node.h @@ -28,6 +28,7 @@ extern Datum data_node_add(PG_FUNCTION_ARGS); extern Datum data_node_delete(PG_FUNCTION_ARGS); extern Datum data_node_attach(PG_FUNCTION_ARGS); extern Datum data_node_detach(PG_FUNCTION_ARGS); +extern Datum data_node_alter(PG_FUNCTION_ARGS); extern Datum data_node_block_new_chunks(PG_FUNCTION_ARGS); extern Datum data_node_allow_new_chunks(PG_FUNCTION_ARGS); extern List *data_node_get_node_name_list_with_aclcheck(AclMode mode, bool fail_on_aclcheck); diff --git a/tsl/src/fdw/option.c b/tsl/src/fdw/option.c index 303d8a3c1ed..5e2c594284d 100644 --- a/tsl/src/fdw/option.c +++ b/tsl/src/fdw/option.c @@ -132,6 +132,11 @@ option_validate(List *options_list, Oid catalog) (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s requires a non-negative integer value", def->defname))); } + else if (strcmp(def->defname, "available") == 0) + { + /* This will throw an error if not a boolean */ + defGetBoolean(def); + } } } @@ -154,6 +159,7 @@ init_ts_fdw_options(void) /* fetch_size is available on both foreign data wrapper and server */ { "fetch_size", ForeignDataWrapperRelationId }, { "fetch_size", ForeignServerRelationId }, + { "available", ForeignServerRelationId }, { NULL, InvalidOid } }; diff --git a/tsl/src/init.c b/tsl/src/init.c index 32314005d44..eb0536d70ff 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -189,6 +189,7 @@ CrossModuleFunctions tsl_cm_functions = { .data_node_attach = data_node_attach, .data_node_ping = data_node_ping, .data_node_detach = data_node_detach, + .data_node_alter = data_node_alter, .data_node_allow_new_chunks = data_node_allow_new_chunks, .data_node_block_new_chunks = data_node_block_new_chunks, .chunk_set_default_data_node = chunk_set_default_data_node, diff --git a/tsl/test/expected/data_node.out b/tsl/test/expected/data_node.out index f941793b98b..205cbd06c71 100644 --- 
a/tsl/test/expected/data_node.out +++ b/tsl/test/expected/data_node.out @@ -1579,3 +1579,390 @@ DROP DATABASE :DN_DBNAME_3; DROP DATABASE :DN_DBNAME_4; DROP DATABASE :DN_DBNAME_5; DROP DATABASE :DN_DBNAME_6; +----------------------------------------------- +-- Test alter_data_node() +----------------------------------------------- +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_1', host => 'localhost', database => :'DN_DBNAME_1'); + node_name | database | node_created | database_created | extension_created +-------------+----------------+--------------+------------------+------------------- + data_node_1 | db_data_node_1 | t | t | t +(1 row) + +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_2', host => 'localhost', database => :'DN_DBNAME_2'); + node_name | database | node_created | database_created | extension_created +-------------+----------------+--------------+------------------+------------------- + data_node_2 | db_data_node_2 | t | t | t +(1 row) + +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_3', host => 'localhost', database => :'DN_DBNAME_3'); + node_name | database | node_created | database_created | extension_created +-------------+----------------+--------------+------------------+------------------- + data_node_3 | db_data_node_3 | t | t | t +(1 row) + +GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO :ROLE_1; +SET ROLE :ROLE_1; +CREATE TABLE hyper1 (time timestamptz, location int, temp float); +CREATE TABLE hyper2 (LIKE hyper1); +CREATE TABLE hyper3 (LIKE hyper1); +CREATE TABLE hyper_1dim (LIKE hyper1); +SELECT create_distributed_hypertable('hyper1', 'time', 'location', replication_factor=>1); +NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------- + (10,public,hyper1,t) +(1 row) + 
+SELECT create_distributed_hypertable('hyper2', 'time', 'location', replication_factor=>2); +NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------- + (11,public,hyper2,t) +(1 row) + +SELECT create_distributed_hypertable('hyper3', 'time', 'location', replication_factor=>3); +NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------- + (12,public,hyper3,t) +(1 row) + +SELECT create_distributed_hypertable('hyper_1dim', 'time', chunk_time_interval=>interval '2 days', replication_factor=>3); +NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------- + (13,public,hyper_1dim,t) +(1 row) + +SELECT setseed(1); + setseed +--------- + +(1 row) + +INSERT INTO hyper1 +SELECT t, (abs(timestamp_hash(t::timestamp)) % 3) + 1, random() * 30 +FROM generate_series('2022-01-01 00:00:00'::timestamptz, '2022-01-05 00:00:00', '1 h') t; +INSERT INTO hyper2 SELECT * FROM hyper1; +INSERT INTO hyper3 SELECT * FROM hyper1; +INSERT INTO hyper_1dim SELECT * FROM hyper1; +-- create view to see the data nodes and default data node of all +-- chunks +CREATE VIEW chunk_query_data_node AS +SELECT ch.hypertable_name, format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass AS chunk, ch.data_nodes, fs.srvname default_data_node + FROM timescaledb_information.chunks ch + INNER JOIN pg_foreign_table ft ON (format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass = ft.ftrelid) + INNER JOIN pg_foreign_server fs ON (ft.ftserver = fs.oid) + ORDER BY 1, 2; +SELECT * FROM chunk_query_data_node; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + hyper1 | _timescaledb_internal._dist_hyper_10_12_chunk | {data_node_1} | data_node_1 + hyper1 | _timescaledb_internal._dist_hyper_10_13_chunk | {data_node_2} 
| data_node_2 + hyper1 | _timescaledb_internal._dist_hyper_10_14_chunk | {data_node_3} | data_node_3 + hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_1 + hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2 + hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 +(12 rows) + +-- query some data from all hypertables +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +-- test "switching over" to other data node when +\set ON_ERROR_STOP 0 +-- must be owner to alter a data node +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=false" }'); +ERROR: must 
be owner of foreign server data_node_1 +SELECT * FROM alter_data_node('data_node_1', port=>8989); +ERROR: must be owner of foreign server data_node_1 +\set ON_ERROR_STOP 1 +RESET ROLE; +-- simulate a node being down by renaming the database for +-- data_node_1, but for that to work we need to reconnect the backend +-- to clear out the connection cache +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +ALTER DATABASE :DN_DBNAME_1 RENAME TO data_node_1_unavailable; +WARNING: you need to manually restart any running background workers after this command +\set ON_ERROR_STOP 0 +SELECT * FROM hyper1 ORDER BY time LIMIT 1; +ERROR: could not connect to "data_node_1" +SELECT * FROM hyper2 ORDER BY time LIMIT 1; +ERROR: could not connect to "data_node_1" +SELECT * FROM hyper3 ORDER BY time LIMIT 1; +ERROR: could not connect to "data_node_1" +SELECT * FROM hyper_1dim ORDER BY time LIMIT 1; +ERROR: could not connect to "data_node_1" +\set ON_ERROR_STOP 1 +-- when altering a data node, related entries in the connection cache +-- should be invalidated +SELECT node_name, host, port, invalidated +FROM _timescaledb_internal.show_connection_cache() +WHERE node_name = 'data_node_1'; + node_name | host | port | invalidated +-----------+------+------+------------- +(0 rows) + +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=false" }'); +WARNING: could not switch data node on 1 chunks + node_name | host | port | database | options +-------------+-----------+-------+----------------+------------------- + data_node_1 | localhost | 55432 | db_data_node_1 | {available=false} +(1 row) + +SELECT node_name, host, port, invalidated +FROM _timescaledb_internal.show_connection_cache() +WHERE node_name = 'data_node_1'; + node_name | host | port | invalidated +-----------+------+------+------------- +(0 rows) + +-- the node that is not available for reads should no longer be +-- query data node for chunks, except for those that have no +-- alternative (i.e., the chunk only has one data 
node). +SELECT * FROM chunk_query_data_node; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + hyper1 | _timescaledb_internal._dist_hyper_10_12_chunk | {data_node_1} | data_node_1 + hyper1 | _timescaledb_internal._dist_hyper_10_13_chunk | {data_node_2} | data_node_2 + hyper1 | _timescaledb_internal._dist_hyper_10_14_chunk | {data_node_3} | data_node_3 + hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_2 + hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2 + hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 +(12 rows) + +-- queries should work again, except onb hyper1 which has no replica +-- chunks +\set ON_ERROR_STOP 0 +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; +ERROR: could not connect to "data_node_1" +\set ON_ERROR_STOP 1 +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 
01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +-- inserts should fail if going to chunks that exist on the +-- unavailable data node +\set ON_ERROR_STOP 0 +INSERT INTO hyper3 VALUES ('2022-01-03 00:00:00', 1, 1); +ERROR: could not connect to "data_node_1" +INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:00', 1, 1); +ERROR: could not connect to "data_node_1" +\set ON_ERROR_STOP 1 +-- inserts should work if going to a new chunk +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); +WARNING: insufficient number of data nodes +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); +WARNING: insufficient number of data nodes +SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks +WHERE hypertable_name IN ('hyper3', 'hyper_1dim') +AND range_start::timestamptz <= '2022-01-10 00:00:00' +AND range_end::timestamptz > '2022-01-10 00:00:00' +ORDER BY 1, 2; + hypertable_name | chunk_name | data_nodes +-----------------+-------------------------+--------------------------- + hyper3 | _dist_hyper_12_24_chunk | {data_node_2,data_node_3} + hyper_1dim | _dist_hyper_13_25_chunk | {data_node_2,data_node_3} +(2 rows) + +-- re-enable the data node and the chunks should "switch back" to +-- using the data node. 
However, only the chunks for which the node is +-- "primary" should switch to using the data node for queries +ALTER DATABASE data_node_1_unavailable RENAME TO :DN_DBNAME_1; +WARNING: you need to manually restart any running background workers after this command +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=true" }'); + node_name | host | port | database | options +-------------+-----------+-------+----------------+------------------ + data_node_1 | localhost | 55432 | db_data_node_1 | {available=true} +(1 row) + +SELECT * FROM chunk_query_data_node; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + hyper1 | _timescaledb_internal._dist_hyper_10_12_chunk | {data_node_1} | data_node_1 + hyper1 | _timescaledb_internal._dist_hyper_10_13_chunk | {data_node_2} | data_node_2 + hyper1 | _timescaledb_internal._dist_hyper_10_14_chunk | {data_node_3} | data_node_3 + hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_1 + hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2 + hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_24_chunk | {data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim 
| _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + hyper_1dim | _timescaledb_internal._dist_hyper_13_25_chunk | {data_node_2,data_node_3} | data_node_2 +(14 rows) + +--queries should work again on all tables +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +-- save old port so that we can restore connectivity after we test +-- changing the connection information for the data node +WITH options AS ( + SELECT unnest(options) option + FROM timescaledb_information.data_nodes + WHERE node_name = 'data_node_1' +) +SELECT split_part(option, '=', 2) AS old_port +FROM options WHERE option LIKE 'port%' \gset +-- also test altering host, port and database +SELECT node_name, options FROM timescaledb_information.data_nodes; + node_name | options +-------------+------------------------------------------------------------------ + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} + data_node_1 | {host=localhost,port=55432,dbname=db_data_node_1,available=true} +(3 rows) + +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=true" }', host=>'foo.bar', port=>8989, database=>'new_db'); + node_name | host | port | database | options +-------------+---------+------+----------+------------------ + data_node_1 | foo.bar | 8989 | new_db | 
{available=true} +(1 row) + +SELECT node_name, options FROM timescaledb_information.data_nodes; + node_name | options +-------------+------------------------------------------------------- + data_node_1 | {host=foo.bar,port=8989,dbname=new_db,available=true} + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} +(3 rows) + +-- just show current options: +SELECT * FROM alter_data_node('data_node_1'); + node_name | host | port | database | options +-------------+---------+------+----------+------------------ + data_node_1 | foo.bar | 8989 | new_db | {available=true} +(1 row) + +\set ON_ERROR_STOP 0 +-- test some failure cases +SELECT * FROM alter_data_node(NULL); +ERROR: data node name cannot be NULL +SELECT * FROM alter_data_node('does_not_exist'); +ERROR: server "does_not_exist" does not exist +SELECT * FROM alter_data_node('data_node_1', port=>89000); +ERROR: invalid port number 89000 +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=" }'); +ERROR: available requires a Boolean value +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available" }'); +ERROR: missing value for option "available" +SELECT * FROM alter_data_node('data_node_1', options=>'{ "foo=bar" }'); +ERROR: invalid option "foo" +SELECT * FROM alter_data_node('data_node_1', options=>'{ "foo" }'); +ERROR: missing value for option "foo" +-- cannot delete data node with "drop_database" since configuration is wrong +SELECT delete_data_node('data_node_1', drop_database=>true); +ERROR: could not connect to data node "data_node_1" +\set ON_ERROR_STOP 1 +-- restore configuration for data_node_1 +SELECT * FROM alter_data_node('data_node_1', host=>'localhost', port=>:old_port, database=>:'DN_DBNAME_1'); + node_name | host | port | database | options +-------------+-----------+-------+----------------+------------------ + data_node_1 | localhost | 55432 | db_data_node_1 | {available=true} +(1 row) + +SELECT 
node_name, options FROM timescaledb_information.data_nodes; + node_name | options +-------------+------------------------------------------------------------------ + data_node_1 | {host=localhost,port=55432,dbname=db_data_node_1,available=true} + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} +(3 rows) + +DROP TABLE hyper1; +DROP TABLE hyper2; +DROP TABLE hyper3; +DROP TABLE hyper_1dim; +DROP VIEW chunk_query_data_node; +-- create new session to clear out connection cache +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +SELECT delete_data_node('data_node_1', drop_database=>true); + delete_data_node +------------------ + t +(1 row) + +SELECT delete_data_node('data_node_2', drop_database=>true); + delete_data_node +------------------ + t +(1 row) + +SELECT delete_data_node('data_node_3', drop_database=>true); + delete_data_node +------------------ + t +(1 row) + diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out index e1d4874a665..281b567e2fd 100644 --- a/tsl/test/shared/expected/extension.out +++ b/tsl/test/shared/expected/extension.out @@ -147,13 +147,14 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text _timescaledb_internal.unfreeze_chunk(regclass) _timescaledb_internal.validate_as_data_node() _timescaledb_internal.wait_subscription_sync(name,name,integer,numeric) - add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text) - add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text) + add_compression_policy(regclass,"any",boolean,interval) + add_continuous_aggregate_policy(regclass,"any","any",interval,boolean) add_data_node(name,text,name,integer,boolean,boolean,text) add_dimension(regclass,name,integer,anyelement,regproc,boolean) - add_job(regproc,interval,jsonb,timestamp with time zone,boolean,regproc,boolean,text) - 
add_reorder_policy(regclass,name,boolean,timestamp with time zone,text) - add_retention_policy(regclass,"any",boolean,interval,timestamp with time zone,text) + add_job(regproc,interval,jsonb,timestamp with time zone,boolean,regproc) + add_reorder_policy(regclass,name,boolean) + add_retention_policy(regclass,"any",boolean,interval) + alter_data_node(name,text,name,integer,text[]) alter_job(integer,interval,interval,integer,interval,boolean,jsonb,timestamp with time zone,boolean,regproc) approximate_row_count(regclass) attach_data_node(name,regclass,boolean,boolean) diff --git a/tsl/test/sql/data_node.sql b/tsl/test/sql/data_node.sql index 97a9703c970..60441e19392 100644 --- a/tsl/test/sql/data_node.sql +++ b/tsl/test/sql/data_node.sql @@ -767,3 +767,173 @@ DROP DATABASE :DN_DBNAME_3; DROP DATABASE :DN_DBNAME_4; DROP DATABASE :DN_DBNAME_5; DROP DATABASE :DN_DBNAME_6; + + +----------------------------------------------- +-- Test alter_data_node() +----------------------------------------------- +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_1', host => 'localhost', database => :'DN_DBNAME_1'); +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_2', host => 'localhost', database => :'DN_DBNAME_2'); +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_3', host => 'localhost', database => :'DN_DBNAME_3'); + +GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO :ROLE_1; +SET ROLE :ROLE_1; + +CREATE TABLE hyper1 (time timestamptz, location int, temp float); +CREATE TABLE hyper2 (LIKE hyper1); +CREATE TABLE hyper3 (LIKE hyper1); +CREATE TABLE hyper_1dim (LIKE hyper1); + +SELECT create_distributed_hypertable('hyper1', 'time', 'location', replication_factor=>1); +SELECT create_distributed_hypertable('hyper2', 'time', 'location', replication_factor=>2); +SELECT 
create_distributed_hypertable('hyper3', 'time', 'location', replication_factor=>3); +SELECT create_distributed_hypertable('hyper_1dim', 'time', chunk_time_interval=>interval '2 days', replication_factor=>3); + +SELECT setseed(1); +INSERT INTO hyper1 +SELECT t, (abs(timestamp_hash(t::timestamp)) % 3) + 1, random() * 30 +FROM generate_series('2022-01-01 00:00:00'::timestamptz, '2022-01-05 00:00:00', '1 h') t; + +INSERT INTO hyper2 SELECT * FROM hyper1; +INSERT INTO hyper3 SELECT * FROM hyper1; +INSERT INTO hyper_1dim SELECT * FROM hyper1; + +-- create view to see the data nodes and default data node of all +-- chunks +CREATE VIEW chunk_query_data_node AS +SELECT ch.hypertable_name, format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass AS chunk, ch.data_nodes, fs.srvname default_data_node + FROM timescaledb_information.chunks ch + INNER JOIN pg_foreign_table ft ON (format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass = ft.ftrelid) + INNER JOIN pg_foreign_server fs ON (ft.ftserver = fs.oid) + ORDER BY 1, 2; + +SELECT * FROM chunk_query_data_node; + +-- query some data from all hypertables +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + +-- test "switching over" to other data node when +\set ON_ERROR_STOP 0 +-- must be owner to alter a data node +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=false" }'); +SELECT * FROM alter_data_node('data_node_1', port=>8989); +\set ON_ERROR_STOP 1 + +RESET ROLE; + +-- simulate a node being down by renaming the database for +-- data_node_1, but for that to work we need to reconnect the backend +-- to clear out the connection cache +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +ALTER DATABASE :DN_DBNAME_1 RENAME TO data_node_1_unavailable; +\set ON_ERROR_STOP 0 +SELECT * FROM hyper1 ORDER BY time LIMIT 1; +SELECT * FROM 
hyper2 ORDER BY time LIMIT 1; +SELECT * FROM hyper3 ORDER BY time LIMIT 1; +SELECT * FROM hyper_1dim ORDER BY time LIMIT 1; +\set ON_ERROR_STOP 1 + +-- when altering a data node, related entries in the connection cache +-- should be invalidated +SELECT node_name, host, port, invalidated +FROM _timescaledb_internal.show_connection_cache() +WHERE node_name = 'data_node_1'; +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=false" }'); +SELECT node_name, host, port, invalidated +FROM _timescaledb_internal.show_connection_cache() +WHERE node_name = 'data_node_1'; + +-- the node that is not available for reads should no longer be +-- query data node for chunks, except for those that have no +-- alternative (i.e., the chunk only has one data node). +SELECT * FROM chunk_query_data_node; + +-- queries should work again, except onb hyper1 which has no replica +-- chunks +\set ON_ERROR_STOP 0 +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; +\set ON_ERROR_STOP 1 +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + + +-- inserts should fail if going to chunks that exist on the +-- unavailable data node +\set ON_ERROR_STOP 0 +INSERT INTO hyper3 VALUES ('2022-01-03 00:00:00', 1, 1); +INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:00', 1, 1); +\set ON_ERROR_STOP 1 + +-- inserts should work if going to a new chunk +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); + +SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks +WHERE hypertable_name IN ('hyper3', 'hyper_1dim') +AND range_start::timestamptz <= '2022-01-10 00:00:00' +AND range_end::timestamptz > '2022-01-10 00:00:00' +ORDER BY 1, 2; + +-- re-enable the data node and the chunks should "switch back" to +-- using the data node. 
However, only the chunks for which the node is +-- "primary" should switch to using the data node for queries +ALTER DATABASE data_node_1_unavailable RENAME TO :DN_DBNAME_1; +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=true" }'); +SELECT * FROM chunk_query_data_node; + +--queries should work again on all tables +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + +-- save old port so that we can restore connectivity after we test +-- changing the connection information for the data node +WITH options AS ( + SELECT unnest(options) option + FROM timescaledb_information.data_nodes + WHERE node_name = 'data_node_1' +) +SELECT split_part(option, '=', 2) AS old_port +FROM options WHERE option LIKE 'port%' \gset + +-- also test altering host, port and database +SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=true" }', host=>'foo.bar', port=>8989, database=>'new_db'); + +SELECT node_name, options FROM timescaledb_information.data_nodes; +-- just show current options: +SELECT * FROM alter_data_node('data_node_1'); + +\set ON_ERROR_STOP 0 +-- test some failure cases +SELECT * FROM alter_data_node(NULL); +SELECT * FROM alter_data_node('does_not_exist'); +SELECT * FROM alter_data_node('data_node_1', port=>89000); +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=" }'); +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available" }'); +SELECT * FROM alter_data_node('data_node_1', options=>'{ "foo=bar" }'); +SELECT * FROM alter_data_node('data_node_1', options=>'{ "foo" }'); +-- cannot delete data node with "drop_database" since configuration is wrong +SELECT delete_data_node('data_node_1', drop_database=>true); +\set ON_ERROR_STOP 1 + +-- restore 
configuration for data_node_1 +SELECT * FROM alter_data_node('data_node_1', host=>'localhost', port=>:old_port, database=>:'DN_DBNAME_1'); +SELECT node_name, options FROM timescaledb_information.data_nodes; + +DROP TABLE hyper1; +DROP TABLE hyper2; +DROP TABLE hyper3; +DROP TABLE hyper_1dim; +DROP VIEW chunk_query_data_node; + +-- create new session to clear out connection cache +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +SELECT delete_data_node('data_node_1', drop_database=>true); +SELECT delete_data_node('data_node_2', drop_database=>true); +SELECT delete_data_node('data_node_3', drop_database=>true);