diff --git a/sql/ddl_api.sql b/sql/ddl_api.sql index c1b469bde86..659063313fe 100644 --- a/sql/ddl_api.sql +++ b/sql/ddl_api.sql @@ -216,3 +216,12 @@ CREATE OR REPLACE PROCEDURE @extschema@.refresh_continuous_aggregate( window_start "any", window_end "any" ) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh'; + +CREATE OR REPLACE FUNCTION @extschema@.alter_data_node( + node_name NAME, + host TEXT = NULL, + database NAME = NULL, + port INTEGER = NULL, + options TEXT[] = NULL +) RETURNS TABLE(node_name NAME, host TEXT, port INTEGER, database NAME, options TEXT[]) +AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE; diff --git a/sql/pre_install/tables.sql b/sql/pre_install/tables.sql index 6f8d9097890..7c532544383 100644 --- a/sql/pre_install/tables.sql +++ b/sql/pre_install/tables.sql @@ -251,6 +251,8 @@ CREATE TABLE _timescaledb_catalog.chunk_data_node ( CONSTRAINT chunk_data_node_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id) ); +CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name); + SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.chunk_data_node', ''); -- Default jobs are given the id space [1,1000). User-installed jobs and any jobs created inside tests diff --git a/sql/updates/latest-dev.sql b/sql/updates/latest-dev.sql index fb506472d0a..1bb305cdce5 100644 --- a/sql/updates/latest-dev.sql +++ b/sql/updates/latest-dev.sql @@ -297,3 +297,13 @@ CREATE FUNCTION @extschema@.add_reorder_policy( ) RETURNS INTEGER AS '@MODULE_PATHNAME@', 'ts_policy_reorder_add' LANGUAGE C VOLATILE; + +CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name); +CREATE FUNCTION @extschema@.alter_data_node( + node_name NAME, + host TEXT = NULL, + database NAME = NULL, + port INTEGER = NULL, + options TEXT[] = NULL +) RETURNS TABLE(node_name NAME, host TEXT, port INTEGER, database NAME, options TEXT[]) +AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE; diff --git a/sql/updates/reverse-dev.sql b/sql/updates/reverse-dev.sql index 8ee47e78e9f..1ee647f534b 100644 --- a/sql/updates/reverse-dev.sql +++ b/sql/updates/reverse-dev.sql @@ -224,3 +224,6 @@ ALTER TABLE _timescaledb_internal.bgw_policy_chunk_stats ADD CONSTRAINT bgw_policy_chunk_stats_job_id_fkey FOREIGN KEY(job_id) REFERENCES _timescaledb_config.bgw_job(id) ON DELETE CASCADE; + +DROP INDEX IF EXISTS chunk_data_node_node_name_idx; +DROP FUNCTION @extschema@.alter_data_node; diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index eec82d93d2a..1c2c8902267 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -102,6 +102,7 @@ CROSSMODULE_WRAPPER(data_node_add); CROSSMODULE_WRAPPER(data_node_delete); CROSSMODULE_WRAPPER(data_node_attach); CROSSMODULE_WRAPPER(data_node_detach); +CROSSMODULE_WRAPPER(data_node_alter); CROSSMODULE_WRAPPER(chunk_drop_replica); CROSSMODULE_WRAPPER(chunk_set_default_data_node); @@ -504,6 +505,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = { .data_node_attach = error_no_default_fn_pg_community, .data_node_ping = error_no_default_fn_pg_community, .data_node_detach = error_no_default_fn_pg_community, + .data_node_alter = error_no_default_fn_pg_community, .data_node_allow_new_chunks = error_no_default_fn_pg_community, .data_node_block_new_chunks = error_no_default_fn_pg_community, .distributed_exec = error_no_default_fn_pg_community, diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 1207a60ab64..4b361ffbcee 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -158,6 +158,7 @@ typedef struct CrossModuleFunctions PGFunction data_node_attach; PGFunction data_node_ping; PGFunction data_node_detach; + PGFunction data_node_alter; PGFunction data_node_allow_new_chunks; PGFunction data_node_block_new_chunks; diff --git a/src/hypertable.c b/src/hypertable.c index 12f836514c6..b5f419dc2ef 100644 --- a/src/hypertable.c +++ b/src/hypertable.c @@ -2667,6 +2667,23 @@ assert_chunk_data_nodes_is_a_set(const List *chunk_data_nodes) } #endif +static bool +data_node_is_available(const char *name) +{ + const ForeignServer *server = GetForeignServerByName(name, false); + ListCell *lc; + + foreach (lc, server->options) + { + DefElem *elem = lfirst(lc); + + if (strcmp(elem->defname, "available") == 0 && !defGetBoolean(elem)) + return false; + } + + return true; +} + /* * Assign data nodes to a chunk. * @@ -2693,7 +2710,16 @@ ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, const Hypercube *cub ts_hypercube_get_slice_by_dimension_id(cube, space_dim->fd.id); const DimensionPartition *dp = ts_dimension_partition_find(space_dim->dimension_partitions, slice->fd.range_start); - chunk_data_nodes = dp->data_nodes; + ListCell *lc; + + /* Filter out data nodes that aren't available */ + foreach (lc, dp->data_nodes) + { + char *node_name = lfirst(lc); + + if (data_node_is_available(node_name)) + chunk_data_nodes = lappend(chunk_data_nodes, node_name); + } } else { @@ -2740,6 +2766,9 @@ typedef bool (*hypertable_data_node_filter)(const HypertableDataNode *hdn); static bool filter_non_blocked_data_nodes(const HypertableDataNode *node) { + if (!data_node_is_available(NameStr(node->fd.node_name))) + return false; + return !node->fd.block_chunks; } diff --git a/src/hypertable.h b/src/hypertable.h index fbce1b22c82..99ac720cf7c 100644 --- a/src/hypertable.h +++ b/src/hypertable.h @@ -151,7 +151,8 @@ extern TSDLLEXPORT bool ts_hypertable_set_compressed(Hypertable *ht, extern TSDLLEXPORT bool ts_hypertable_unset_compressed(Hypertable *ht); extern TSDLLEXPORT void ts_hypertable_clone_constraints_to_compressed(const Hypertable *ht, List *constraint_list); -extern List *ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, const Hypercube *cube); +extern TSDLLEXPORT List *ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, + const Hypercube *cube); extern TSDLLEXPORT List *ts_hypertable_get_data_node_name_list(const Hypertable *ht); extern TSDLLEXPORT List *ts_hypertable_get_data_node_serverids_list(const Hypertable *ht); extern TSDLLEXPORT List *ts_hypertable_get_available_data_nodes(const Hypertable *ht, diff --git a/src/ts_catalog/catalog.c b/src/ts_catalog/catalog.c index a2b6e574afe..89cdf3bcfe1 100644 --- a/src/ts_catalog/catalog.c +++ b/src/ts_catalog/catalog.c @@ -190,6 +190,7 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES] .names = (char *[]) { [CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_chunk_id_node_name_key", [CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_node_chunk_id_node_name_key", + [CHUNK_DATA_NODE_NODE_NAME_IDX] = "chunk_data_node_node_name_idx", } }, [TABLESPACE] = { diff --git a/src/ts_catalog/catalog.h b/src/ts_catalog/catalog.h index 39af6ac3a17..d7a26d07ac6 100644 --- a/src/ts_catalog/catalog.h +++ b/src/ts_catalog/catalog.h @@ -596,6 +596,7 @@ enum { CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX, CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX, + CHUNK_DATA_NODE_NODE_NAME_IDX, _MAX_CHUNK_DATA_NODE_INDEX, }; @@ -625,6 +626,17 @@ struct FormData_chunk_data_node_node_chunk_id_node_name_idx NameData node_name; }; +enum Anum_chunk_data_node_node_name_idx +{ + Anum_chunk_data_node_name_idx_node_name = 1, + _Anum_chunk_data_node_node_name_idx_max, +}; + +struct FormData_chunk_data_node_node_name_idx +{ + NameData node_name; +}; + /************************************ * * Tablespace table definitions @@ -1456,7 +1468,7 @@ extern TSDLLEXPORT void ts_catalog_delete_tid_only(Relation rel, ItemPointer tid extern TSDLLEXPORT void ts_catalog_delete_tid(Relation rel, ItemPointer tid); extern TSDLLEXPORT void ts_catalog_delete_only(Relation rel, HeapTuple tuple); extern TSDLLEXPORT void ts_catalog_delete(Relation rel, HeapTuple tuple); -extern void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation); +extern TSDLLEXPORT void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation); bool TSDLLEXPORT ts_catalog_scan_one(CatalogTable table, int indexid, ScanKeyData *scankey, int num_keys, tuple_found_func tuple_found, LOCKMODE lockmode, diff --git a/src/ts_catalog/chunk_data_node.c b/src/ts_catalog/chunk_data_node.c index b28c9637bd0..78e3afd19be 100644 --- a/src/ts_catalog/chunk_data_node.c +++ b/src/ts_catalog/chunk_data_node.c @@ -106,14 +106,15 @@ chunk_data_node_tuple_found(TupleInfo *ti, void *data) bool should_free; HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); Form_chunk_data_node form = (Form_chunk_data_node) GETSTRUCT(tuple); + ForeignServer *server; ChunkDataNode *chunk_data_node; MemoryContext old; + server = GetForeignServerByName(NameStr(form->node_name), false); old = MemoryContextSwitchTo(ti->mctx); chunk_data_node = palloc(sizeof(ChunkDataNode)); memcpy(&chunk_data_node->fd, form, sizeof(FormData_chunk_data_node)); - chunk_data_node->foreign_server_oid = - get_foreign_server_oid(NameStr(form->node_name), /* missing_ok = */ false); + chunk_data_node->foreign_server_oid = server->serverid; *nodes = lappend(*nodes, chunk_data_node); MemoryContextSwitchTo(old); @@ -339,3 +340,16 @@ ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id) F_INT4EQ, Int32GetDatum(chunk_id)); } + +void +ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it, const char *node_name) +{ + it->ctx.index = + catalog_get_index(ts_catalog_get(), CHUNK_DATA_NODE, CHUNK_DATA_NODE_NODE_NAME_IDX); + ts_scan_iterator_scan_key_reset(it); + ts_scan_iterator_scan_key_init(it, + Anum_chunk_data_node_name_idx_node_name, + BTEqualStrategyNumber, + F_NAMEEQ, + CStringGetDatum(node_name)); +} diff --git a/src/ts_catalog/chunk_data_node.h b/src/ts_catalog/chunk_data_node.h index 1aef275b99a..83a35c78583 100644 --- a/src/ts_catalog/chunk_data_node.h +++ b/src/ts_catalog/chunk_data_node.h @@ -32,7 +32,10 @@ extern int ts_chunk_data_node_delete_by_node_name(const char *node_name); extern TSDLLEXPORT List * ts_chunk_data_node_scan_by_node_name_and_hypertable_id(const char *node_name, int32 hypertable_id, MemoryContext mctx); -extern ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt); -extern void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id); +extern TSDLLEXPORT ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt); +extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, + int32 chunk_id); +extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it, + const char *node_name); #endif /* TIMESCALEDB_CHUNK_DATA_NODE_H */ diff --git a/src/ts_catalog/dimension_partition.c b/src/ts_catalog/dimension_partition.c index 6d467d3f021..d1040d71862 100644 --- a/src/ts_catalog/dimension_partition.c +++ b/src/ts_catalog/dimension_partition.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include diff --git a/tsl/src/chunk.c b/tsl/src/chunk.c index 783149c78eb..b0f50ed3774 100644 --- a/tsl/src/chunk.c +++ b/tsl/src/chunk.c @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,7 @@ #include #include #include +#include "hypercube.h" #include "chunk.h" #include "chunk_api.h" @@ -66,7 +68,7 @@ chunk_match_data_node_by_server(const Chunk *chunk, const ForeignServer *server) } static bool -chunk_set_foreign_server(Chunk *chunk, ForeignServer *new_server) +chunk_set_foreign_server(const Chunk *chunk, const ForeignServer *new_server) { Relation ftrel; HeapTuple tuple; @@ -134,32 +136,148 @@ chunk_set_foreign_server(Chunk *chunk, ForeignServer *new_server) return true; } -void -chunk_update_foreign_server_if_needed(int32 chunk_id, Oid existing_server_id) +static bool +data_node_is_available(const ForeignServer *server) { ListCell *lc; - ChunkDataNode *new_server = NULL; - Chunk *chunk = ts_chunk_get_by_id(chunk_id, true); + + foreach (lc, server->options) + { + DefElem *elem = lfirst(lc); + + if (strcmp(elem->defname, "available") == 0) + return defGetBoolean(elem); + } + + /* If no option is set, default to available=true */ + return true; +} + +/* + * Change the data node used for query a chunk. + * + * Either switch away from using the "default" or reset to the "default". + * + * Return true if the chunk's data node was changed or it was already + * configured the way requested. Otherwise return false. + */ +bool +chunk_update_foreign_server_if_needed(const Chunk *chunk, Oid data_node_id, bool reset) +{ ForeignTable *foreign_table = NULL; + ForeignServer *server = NULL; + bool new_data_node_found = false; + ListCell *lc; Assert(chunk->relkind == RELKIND_FOREIGN_TABLE); foreign_table = GetForeignTable(chunk->table_id); - /* no need to update since foreign table doesn't reference server we try to remove */ - if (existing_server_id != foreign_table->serverid) - return; + /* Cannot switch to other data node if only one or none assigned */ + if (list_length(chunk->data_nodes) < 2) + return false; - Assert(list_length(chunk->data_nodes) > 1); + /* Nothing to do if the chunk table already has the requested data node set */ + if ((!reset && data_node_id != foreign_table->serverid) || + (reset && data_node_id == foreign_table->serverid)) + return true; - foreach (lc, chunk->data_nodes) + if (reset) { - new_server = lfirst(lc); - if (new_server->foreign_server_oid != existing_server_id) - break; + /* Switch to using the given data node, but only on chunks where the + * given node is the "default" according to partitioning */ + Cache *htcache = ts_hypertable_cache_pin(); + const Hypertable *ht = + ts_hypertable_cache_get_entry(htcache, chunk->hypertable_relid, CACHE_FLAG_NONE); + const Dimension *dim = hyperspace_get_closed_dimension(ht->space, 0); + + if (dim != NULL) + { + /* For space-partitioned tables, use the current partitioning + * configuration in that dimension (dimension partition) as a + * template for picking the query data node */ + const DimensionSlice *slice = + ts_hypercube_get_slice_by_dimension_id(chunk->cube, dim->fd.id); + int i; + + Assert(dim->dimension_partitions); + + for (i = 0; i < dim->dimension_partitions->num_partitions; i++) + { + const DimensionPartition *dp = dim->dimension_partitions->partitions[i]; + + /* Match the chunk with the dimension partition. Count as a + * match if the start of chunk is within the range of the + * partition. This captures both the case when the chunk + * aligns perfectly with the partition and when it is bigger + * or smaller (due to a previous partitioning change). */ + if (slice->fd.range_start >= dp->range_start && + slice->fd.range_start <= dp->range_end) + { + ListCell *lc; + + /* Use the data node for queries if it is the first + * available data node in the partition's list (i.e., the + * default choice) */ + foreach (lc, dp->data_nodes) + { + const char *node_name = lfirst(lc); + server = GetForeignServerByName(node_name, false); + + if (data_node_is_available(server)) + { + new_data_node_found = (server->serverid == data_node_id); + break; + } + } + } + } + } + else + { + /* For hypertables without a space partition, use the data node + * assignment logic to figure out whether to use the data node as + * query data node. The "default" query data node is the first in + * the list. The chunk assign logic only returns available data + * nodes. */ + List *datanodes = ts_hypertable_assign_chunk_data_nodes(ht, chunk->cube); + const char *node_name = linitial(datanodes); + server = GetForeignServerByName(node_name, false); + + if (strcmp(server->servername, node_name) == 0) + new_data_node_found = true; + } + + ts_cache_release(htcache); + } + else + { + /* Switch "away" from using the given data node. Pick the first + * "available" data node referenced by the chunk */ + foreach (lc, chunk->data_nodes) + { + const ChunkDataNode *cdn = lfirst(lc); + + if ((!reset && cdn->foreign_server_oid != data_node_id) || + (reset && cdn->foreign_server_oid == data_node_id)) + { + server = GetForeignServer(cdn->foreign_server_oid); + + if (data_node_is_available(server)) + { + new_data_node_found = true; + break; + } + } + } + } + + if (new_data_node_found) + { + Assert(server != NULL); + chunk_set_foreign_server(chunk, server); } - Assert(new_server != NULL); - chunk_set_foreign_server(chunk, GetForeignServer(new_server->foreign_server_oid)); + return new_data_node_found; } Datum diff --git a/tsl/src/chunk.h b/tsl/src/chunk.h index 0ede76be13e..370a138a9ec 100644 --- a/tsl/src/chunk.h +++ b/tsl/src/chunk.h @@ -10,7 +10,7 @@ #include #include -extern void chunk_update_foreign_server_if_needed(int32 chunk_id, Oid existing_server_id); +extern bool chunk_update_foreign_server_if_needed(const Chunk *chunk, Oid data_node_id, bool reset); extern Datum chunk_set_default_data_node(PG_FUNCTION_ARGS); extern Datum chunk_drop_replica(PG_FUNCTION_ARGS); extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type); diff --git a/tsl/src/chunk_api.c b/tsl/src/chunk_api.c index 05bfc7138ef..30fdcf3091d 100644 --- a/tsl/src/chunk_api.c +++ b/tsl/src/chunk_api.c @@ -1767,6 +1767,6 @@ chunk_api_call_chunk_drop_replica(const Chunk *chunk, const char *node_name, Oid * This chunk might have this data node as primary, change that association * if so. Then delete the chunk_id and node_name association. */ - chunk_update_foreign_server_if_needed(chunk->fd.id, serverid); + chunk_update_foreign_server_if_needed(chunk, serverid, false); ts_chunk_data_node_delete_by_chunk_id_and_node_name(chunk->fd.id, node_name); } diff --git a/tsl/src/data_node.c b/tsl/src/data_node.c index 790f61f54eb..b3843deccdd 100644 --- a/tsl/src/data_node.c +++ b/tsl/src/data_node.c @@ -3,9 +3,7 @@ * Please see the included NOTICE for copyright information and * LICENSE-TIMESCALE for a copy of the license. */ -#include "cache.h" #include - #include #include #include @@ -17,22 +15,30 @@ #include #include #include +#include #include +#include #include #include #include #include #include +#include +#include #include #include +#include #include #include #include +#include #include #include "compat/compat.h" #include "config.h" #include "extension.h" +#include "cache.h" +#include "chunk.h" #include "fdw/fdw.h" #include "remote/async.h" #include "remote/connection.h" @@ -45,7 +51,8 @@ #include "dist_util.h" #include "utils/uuid.h" #include "mb/pg_wchar.h" -#include "chunk.h" +#include "scan_iterator.h" +#include "ts_catalog/catalog.h" #include "ts_catalog/chunk_data_node.h" #include "ts_catalog/dimension_partition.h" #include "ts_catalog/hypertable_data_node.h" @@ -679,6 +686,16 @@ get_server_port() return pg_strtoint32(portstr); } +static void +validate_data_node_port(int port) +{ + if (port < 1 || port > PG_UINT16_MAX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + (errmsg("invalid port number %d", port), + errhint("The port number must be between 1 and %u.", PG_UINT16_MAX)))); +} + /* set_distid may need to be false for some otherwise invalid configurations * that are useful for testing */ static Datum @@ -719,11 +736,7 @@ data_node_add_internal(PG_FUNCTION_ARGS, bool set_distid) (errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errmsg("data node name cannot be NULL")))); - if (port < 1 || port > PG_UINT16_MAX) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - (errmsg("invalid port number %d", port), - errhint("The port number must be between 1 and %u.", PG_UINT16_MAX)))); + validate_data_node_port(port); result = get_database_info(MyDatabaseId, &database); Assert(result); @@ -1135,8 +1148,8 @@ data_node_modify_hypertable_data_nodes(const char *node_name, List *hypertable_d foreach (cs_lc, chunk_data_nodes) { ChunkDataNode *cdn = lfirst(cs_lc); - - chunk_update_foreign_server_if_needed(cdn->fd.chunk_id, cdn->foreign_server_oid); + const Chunk *chunk = ts_chunk_get_by_id(cdn->fd.chunk_id, true); + chunk_update_foreign_server_if_needed(chunk, cdn->foreign_server_oid, false); ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id, NameStr(cdn->fd.node_name)); } @@ -1416,6 +1429,276 @@ data_node_detach(PG_FUNCTION_ARGS) PG_RETURN_INT32(removed); } +enum Anum_show_conn +{ + Anum_alter_data_node_node_name = 1, + Anum_alter_data_node_host, + Anum_alter_data_node_port, + Anum_alter_data_node_database, + Anum_alter_data_node_options, + _Anum_alter_data_node_max, +}; + +#define Natts_alter_data_node (_Anum_alter_data_node_max - 1) + +static HeapTuple +create_alter_data_node_tuple(TupleDesc tupdesc, const char *node_name, List *options) +{ + Datum values[Natts_alter_data_node]; + bool nulls[Natts_alter_data_node] = { false }; + ListCell *lc; + ArrayBuildState *arrbuilder = initArrayResult(TEXTOID, CurrentMemoryContext, false); + + MemSet(nulls, false, sizeof(nulls)); + + values[AttrNumberGetAttrOffset(Anum_alter_data_node_node_name)] = CStringGetDatum(node_name); + + foreach (lc, options) + { + DefElem *elem = lfirst(lc); + + if (strcmp("host", elem->defname) == 0) + { + values[AttrNumberGetAttrOffset(Anum_alter_data_node_host)] = + CStringGetTextDatum(defGetString(elem)); + } + else if (strcmp("port", elem->defname) == 0) + { + int port = atoi(defGetString(elem)); + values[AttrNumberGetAttrOffset(Anum_alter_data_node_port)] = Int32GetDatum(port); + } + else if (strcmp("dbname", elem->defname) == 0) + { + values[AttrNumberGetAttrOffset(Anum_alter_data_node_database)] = + CStringGetDatum(defGetString(elem)); + } + else + { + StringInfo opt = makeStringInfo(); + appendStringInfo(opt, "%s=%s", elem->defname, defGetString(elem)); + accumArrayResult(arrbuilder, + CStringGetTextDatum(opt->data), + false, + TEXTOID, + CurrentMemoryContext); + } + } + + values[AttrNumberGetAttrOffset(Anum_alter_data_node_options)] = + makeArrayResult(arrbuilder, CurrentMemoryContext); + + return heap_form_tuple(tupdesc, values, nulls); +} + +/* + * Switch data node to use for queries on chunks. + * + * When reset=false it will switch from the given data node to another one, + * but only if the data node is currently used for queries on the chunk. + * + * When reset=true it will switch to the given data node, if it is "primary" + * for the chunk (according to the current partitioning configuration). + */ +static void +switch_data_node_on_chunks(const ForeignServer *datanode, bool reset) +{ + unsigned int failed_update_count = 0; + ScanIterator it = ts_chunk_data_nodes_scan_iterator_create(CurrentMemoryContext); + ts_chunk_data_nodes_scan_iterator_set_node_name(&it, datanode->servername); + + /* Scan for chunks that reference the given data node */ + ts_scanner_foreach(&it) + { + TupleTableSlot *slot = ts_scan_iterator_slot(&it); + bool PG_USED_FOR_ASSERTS_ONLY isnull = false; + Datum chunk_id = slot_getattr(slot, Anum_chunk_data_node_chunk_id, &isnull); + + Assert(!isnull); + + const Chunk *chunk = ts_chunk_get_by_id(DatumGetInt32(chunk_id), true); + if (!chunk_update_foreign_server_if_needed(chunk, datanode->serverid, reset)) + failed_update_count++; + } + + if (!reset && failed_update_count > 0) + elog(WARNING, "could not switch data node on %u chunks", failed_update_count); + + ts_scan_iterator_close(&it); +} + +static List * +append_data_node_option(List *new_options, List **current_options, const char *name, Node *value) +{ + DefElem *elem; + ListCell *lc; + bool option_found = false; +#if PG13_LT + ListCell *prev_lc = NULL; +#endif + + foreach (lc, *current_options) + { + elem = lfirst(lc); + + if (strcmp(elem->defname, name) == 0) + { + option_found = true; + /* Remove the option which is replaced so that the remaining + * options can be merged later into an updated list */ +#if PG13_GE + *current_options = list_delete_cell(*current_options, lc); +#else + *current_options = list_delete_cell(*current_options, lc, prev_lc); +#endif + break; + } +#if PG13_LT + prev_lc = lc; +#endif + } + + elem = makeDefElemExtended(NULL, + pstrdup(name), + value, + option_found ? DEFELEM_SET : DEFELEM_ADD, + -1); + return lappend(new_options, elem); +} + +/* + * Alter a data node. + * + * Change the configuration of a data node, including host, port, and + * database. + * + * Can also be used to mark a data node "unavailable", which ensures it is no + * longer used for reads as long as there are replica chunks on other data + * nodes to use for reads instead. If it is not possible to fail over all + * chunks, a warning will be raised. + */ +Datum +data_node_alter(PG_FUNCTION_ARGS) +{ + const char *node_name = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0)); + const char *host = PG_ARGISNULL(1) ? NULL : TextDatumGetCString(PG_GETARG_TEXT_P(1)); + const char *database = PG_ARGISNULL(2) ? NULL : NameStr(*PG_GETARG_NAME(2)); + int port = PG_ARGISNULL(3) ? -1 : PG_GETARG_INT32(3); + ArrayType *arropts = PG_ARGISNULL(4) ? NULL : PG_GETARG_ARRAYTYPE_P(4); + List *options = NIL; + List *current_options = NIL; + bool available = true; + ForeignServer *server = NULL; + TupleDesc tupdesc; + AlterForeignServerStmt alter_server_stmt = { + .type = T_AlterForeignServerStmt, + .servername = node_name ? pstrdup(node_name) : NULL, + .has_version = false, + .version = NULL, + .options = NIL, + }; + + TS_PREVENT_FUNC_IF_READ_ONLY(); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + + tupdesc = BlessTupleDesc(tupdesc); + + /* Check if a data node with the given name actually exists, or raise an error. */ + server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, false /* missing_ok */); + + if (host == NULL && database == NULL && port == -1 && arropts == NULL) + PG_RETURN_DATUM( + HeapTupleGetDatum(create_alter_data_node_tuple(tupdesc, node_name, server->options))); + + current_options = list_copy(server->options); + + if (host != NULL) + options = append_data_node_option(options, + ¤t_options, + "host", + (Node *) makeString((char *) host)); + + if (database != NULL) + options = append_data_node_option(options, + ¤t_options, + "dbname", + (Node *) makeString((char *) database)); + + if (port != -1) + { + validate_data_node_port(port); + options = + append_data_node_option(options, ¤t_options, "port", (Node *) makeInteger(port)); + } + + /* Check for non-libpq options as well. These are defined as part of the + * FDW. Validation is done in fdw/option.c */ + if (NULL != arropts) + { + ArrayIterator arrit = array_create_iterator(arropts, 0, NULL); + Datum elem = (Datum) NULL; + bool isnull; + + while (array_iterate(arrit, &elem, &isnull)) + { + if (!isnull) + { + const char *opt = TextDatumGetCString(elem); + char *key = pstrdup(opt); + char *value = strstr(key, "="); + Value *v; + + if (value == NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("missing value for option \"%s\"", opt), + errhint("Data node options should be of the form key=value"))); + + value[0] = '\0'; + value++; + v = makeString(value); + + /* Check explicitly for "available" since we might need to + * take action */ + if (strcmp("available", key) == 0) + { + DefElem elem = { + .type = T_DefElem, + .defname = key, + .arg = (Node *) v, + }; + + available = defGetBoolean(&elem); + } + + options = append_data_node_option(options, ¤t_options, key, (Node *) v); + } + } + + array_free_iterator(arrit); + } + + alter_server_stmt.options = options; + AlterForeignServer(&alter_server_stmt); + /* Make changes to the data node (foreign server object) visible so that + * the changes are present when we switch "primary" data node on chunks */ + CommandCounterIncrement(); + + /* Update the currently used query data node on all affected chunks to + * reflect the new status of the data node */ + switch_data_node_on_chunks(server, available); + + /* Add updated options last as they will take precedence over old options + * when creating the result tuple. */ + options = list_concat(current_options, options); + + PG_RETURN_DATUM(HeapTupleGetDatum(create_alter_data_node_tuple(tupdesc, node_name, options))); +} + /* * Drop a data node's database. * diff --git a/tsl/src/data_node.h b/tsl/src/data_node.h index e46ecebd6e2..4cfdc905462 100644 --- a/tsl/src/data_node.h +++ b/tsl/src/data_node.h @@ -28,6 +28,7 @@ extern Datum data_node_add(PG_FUNCTION_ARGS); extern Datum data_node_delete(PG_FUNCTION_ARGS); extern Datum data_node_attach(PG_FUNCTION_ARGS); extern Datum data_node_detach(PG_FUNCTION_ARGS); +extern Datum data_node_alter(PG_FUNCTION_ARGS); extern Datum data_node_block_new_chunks(PG_FUNCTION_ARGS); extern Datum data_node_allow_new_chunks(PG_FUNCTION_ARGS); extern List *data_node_get_node_name_list_with_aclcheck(AclMode mode, bool fail_on_aclcheck); diff --git a/tsl/src/fdw/option.c b/tsl/src/fdw/option.c index 303d8a3c1ed..5e2c594284d 100644 --- a/tsl/src/fdw/option.c +++ b/tsl/src/fdw/option.c @@ -132,6 +132,11 @@ option_validate(List *options_list, Oid catalog) (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s requires a non-negative integer value", def->defname))); } + else if (strcmp(def->defname, "available") == 0) + { + /* This will throw an error if not a boolean */ + defGetBoolean(def); + } } } @@ -154,6 +159,7 @@ init_ts_fdw_options(void) /* fetch_size is available on both foreign data wrapper and server */ { "fetch_size", ForeignDataWrapperRelationId }, { "fetch_size", ForeignServerRelationId }, + { "available", ForeignServerRelationId }, { NULL, InvalidOid } }; diff --git a/tsl/src/init.c b/tsl/src/init.c index 32314005d44..eb0536d70ff 100644 --- a/tsl/src/init.c +++ b/tsl/src/init.c @@ -189,6 +189,7 @@ CrossModuleFunctions tsl_cm_functions = { .data_node_attach = data_node_attach, .data_node_ping = data_node_ping, .data_node_detach = data_node_detach, + .data_node_alter = data_node_alter, .data_node_allow_new_chunks = data_node_allow_new_chunks, .data_node_block_new_chunks = data_node_block_new_chunks, .chunk_set_default_data_node = chunk_set_default_data_node, diff --git a/tsl/test/expected/data_node.out b/tsl/test/expected/data_node.out index f941793b98b..205cbd06c71 100644 --- a/tsl/test/expected/data_node.out +++ b/tsl/test/expected/data_node.out @@ -1579,3 +1579,390 @@ DROP DATABASE :DN_DBNAME_3; DROP DATABASE :DN_DBNAME_4; DROP DATABASE :DN_DBNAME_5; DROP DATABASE :DN_DBNAME_6; +----------------------------------------------- +-- Test alter_data_node() +----------------------------------------------- +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_1', host => 'localhost', database => :'DN_DBNAME_1'); + node_name | database | node_created | database_created | extension_created +-------------+----------------+--------------+------------------+------------------- + data_node_1 | db_data_node_1 | t | t | t +(1 row) + +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_2', host => 'localhost', database => :'DN_DBNAME_2'); + node_name | database | node_created | database_created | extension_created +-------------+----------------+--------------+------------------+------------------- + data_node_2 | db_data_node_2 | t | t | t +(1 row) + +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_3', host => 'localhost', database => :'DN_DBNAME_3'); + node_name | database | node_created | database_created | extension_created +-------------+----------------+--------------+------------------+------------------- + data_node_3 | db_data_node_3 | t | t | t +(1 row) + +GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO :ROLE_1; +SET ROLE :ROLE_1; +CREATE TABLE hyper1 (time timestamptz, location int, temp float); +CREATE TABLE hyper2 (LIKE hyper1); +CREATE TABLE hyper3 (LIKE hyper1); +CREATE TABLE hyper_1dim (LIKE hyper1); +SELECT create_distributed_hypertable('hyper1', 'time', 'location', replication_factor=>1); +NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------- + (10,public,hyper1,t) +(1 row) + +SELECT create_distributed_hypertable('hyper2', 'time', 'location', replication_factor=>2); +NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------- + (11,public,hyper2,t) +(1 row) + +SELECT create_distributed_hypertable('hyper3', 'time', 'location', replication_factor=>3); +NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------- + (12,public,hyper3,t) +(1 row) + +SELECT create_distributed_hypertable('hyper_1dim', 'time', chunk_time_interval=>interval '2 days', replication_factor=>3); +NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------- + (13,public,hyper_1dim,t) +(1 row) + +SELECT setseed(1); + setseed +--------- + +(1 row) + +INSERT INTO hyper1 +SELECT t, (abs(timestamp_hash(t::timestamp)) % 3) + 1, random() * 30 +FROM generate_series('2022-01-01 00:00:00'::timestamptz, '2022-01-05 00:00:00', '1 h') t; +INSERT INTO hyper2 SELECT * FROM hyper1; +INSERT INTO hyper3 SELECT * FROM hyper1; +INSERT INTO hyper_1dim SELECT * FROM hyper1; +-- create view to see the data nodes and default data node of all +-- chunks +CREATE VIEW chunk_query_data_node AS +SELECT ch.hypertable_name, format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass AS chunk, ch.data_nodes, fs.srvname default_data_node + FROM timescaledb_information.chunks ch + INNER JOIN pg_foreign_table ft ON (format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass = ft.ftrelid) + INNER JOIN pg_foreign_server fs ON (ft.ftserver = fs.oid) + ORDER BY 1, 2; +SELECT * FROM chunk_query_data_node; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + hyper1 | _timescaledb_internal._dist_hyper_10_12_chunk | {data_node_1} | data_node_1 + hyper1 | _timescaledb_internal._dist_hyper_10_13_chunk | {data_node_2} | data_node_2 + hyper1 | _timescaledb_internal._dist_hyper_10_14_chunk | {data_node_3} | data_node_3 + hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_1 + hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2 + hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 +(12 rows) + +-- query some data from all hypertables +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +-- test "switching over" to other data node when +\set ON_ERROR_STOP 0 +-- must be owner to alter a data node +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=false" }'); +ERROR: must be owner of foreign server data_node_1 +SELECT * FROM alter_data_node('data_node_1', port=>8989); +ERROR: must be owner of foreign server data_node_1 +\set ON_ERROR_STOP 1 +RESET ROLE; +-- simulate a node being down by renaming the database for +-- data_node_1, but for that to work we need to reconnect the backend +-- to clear out the connection cache +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +ALTER DATABASE :DN_DBNAME_1 RENAME TO data_node_1_unavailable; +WARNING: you need to manually restart any running background workers after this command +\set ON_ERROR_STOP 0 +SELECT * FROM hyper1 ORDER BY time LIMIT 1; +ERROR: could not connect to "data_node_1" +SELECT * FROM hyper2 ORDER BY time LIMIT 1; +ERROR: could not connect to "data_node_1" +SELECT * FROM hyper3 ORDER BY time LIMIT 1; +ERROR: could not connect to "data_node_1" +SELECT * FROM hyper_1dim ORDER BY time LIMIT 1; +ERROR: could not connect to "data_node_1" +\set ON_ERROR_STOP 1 +-- when altering a data node, related entries in the connection cache +-- should be invalidated +SELECT node_name, host, port, invalidated +FROM _timescaledb_internal.show_connection_cache() +WHERE node_name = 'data_node_1'; + node_name | host | port | invalidated +-----------+------+------+------------- +(0 rows) + +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=false" }'); +WARNING: could not switch data node on 1 chunks + node_name | host | port | database | options +-------------+-----------+-------+----------------+------------------- + data_node_1 | localhost | 55432 | db_data_node_1 | {available=false} +(1 row) + +SELECT node_name, host, port, invalidated +FROM _timescaledb_internal.show_connection_cache() +WHERE node_name = 'data_node_1'; + node_name | host | port | invalidated +-----------+------+------+------------- +(0 rows) + +-- the node that is not available for reads should no longer be +-- query data node for chunks, except for those that have no +-- alternative (i.e., the chunk only has one data node). +SELECT * FROM chunk_query_data_node; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + hyper1 | _timescaledb_internal._dist_hyper_10_12_chunk | {data_node_1} | data_node_1 + hyper1 | _timescaledb_internal._dist_hyper_10_13_chunk | {data_node_2} | data_node_2 + hyper1 | _timescaledb_internal._dist_hyper_10_14_chunk | {data_node_3} | data_node_3 + hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_2 + hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2 + hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 +(12 rows) + +-- queries should work again, except onb hyper1 which has no replica +-- chunks +\set ON_ERROR_STOP 0 +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; +ERROR: could not connect to "data_node_1" +\set ON_ERROR_STOP 1 +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +-- inserts should fail if going to chunks that exist on the +-- unavailable data node +\set ON_ERROR_STOP 0 +INSERT INTO hyper3 VALUES ('2022-01-03 00:00:00', 1, 1); +ERROR: could not connect to "data_node_1" +INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:00', 1, 1); +ERROR: could not connect to "data_node_1" +\set ON_ERROR_STOP 1 +-- inserts should work if going to a new chunk +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); +WARNING: insufficient number of data nodes +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); +WARNING: insufficient number of data nodes +SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks +WHERE hypertable_name IN ('hyper3', 'hyper_1dim') +AND range_start::timestamptz <= '2022-01-10 00:00:00' +AND range_end::timestamptz > '2022-01-10 00:00:00' +ORDER BY 1, 2; + hypertable_name | chunk_name | data_nodes +-----------------+-------------------------+--------------------------- + hyper3 | _dist_hyper_12_24_chunk | {data_node_2,data_node_3} + hyper_1dim | _dist_hyper_13_25_chunk | {data_node_2,data_node_3} +(2 rows) + +-- re-enable the data node and the chunks should "switch back" to +-- using the data node. However, only the chunks for which the node is +-- "primary" should switch to using the data node for queries +ALTER DATABASE data_node_1_unavailable RENAME TO :DN_DBNAME_1; +WARNING: you need to manually restart any running background workers after this command +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=true" }'); + node_name | host | port | database | options +-------------+-----------+-------+----------------+------------------ + data_node_1 | localhost | 55432 | db_data_node_1 | {available=true} +(1 row) + +SELECT * FROM chunk_query_data_node; + hypertable_name | chunk | data_nodes | default_data_node +-----------------+-----------------------------------------------+---------------------------------------+------------------- + hyper1 | _timescaledb_internal._dist_hyper_10_12_chunk | {data_node_1} | data_node_1 + hyper1 | _timescaledb_internal._dist_hyper_10_13_chunk | {data_node_2} | data_node_2 + hyper1 | _timescaledb_internal._dist_hyper_10_14_chunk | {data_node_3} | data_node_3 + hyper2 | _timescaledb_internal._dist_hyper_11_15_chunk | {data_node_1,data_node_2} | data_node_1 + hyper2 | _timescaledb_internal._dist_hyper_11_16_chunk | {data_node_2,data_node_3} | data_node_2 + hyper2 | _timescaledb_internal._dist_hyper_11_17_chunk | {data_node_1,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_18_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + hyper3 | _timescaledb_internal._dist_hyper_12_19_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper3 | _timescaledb_internal._dist_hyper_12_20_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper3 | _timescaledb_internal._dist_hyper_12_24_chunk | {data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_21_chunk | {data_node_1,data_node_2,data_node_3} | data_node_2 + hyper_1dim | _timescaledb_internal._dist_hyper_13_22_chunk | {data_node_1,data_node_2,data_node_3} | data_node_3 + hyper_1dim | _timescaledb_internal._dist_hyper_13_23_chunk | {data_node_1,data_node_2,data_node_3} | data_node_1 + hyper_1dim | _timescaledb_internal._dist_hyper_13_25_chunk | {data_node_2,data_node_3} | data_node_2 +(14 rows) + +--queries should work again on all tables +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + time | location +------------------------------+---------- + Sat Jan 01 00:00:00 2022 PST | 1 +(1 row) + +-- save old port so that we can restore connectivity after we test +-- changing the connection information for the data node +WITH options AS ( + SELECT unnest(options) option + FROM timescaledb_information.data_nodes + WHERE node_name = 'data_node_1' +) +SELECT split_part(option, '=', 2) AS old_port +FROM options WHERE option LIKE 'port%' \gset +-- also test altering host, port and database +SELECT node_name, options FROM timescaledb_information.data_nodes; + node_name | options +-------------+------------------------------------------------------------------ + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} + data_node_1 | {host=localhost,port=55432,dbname=db_data_node_1,available=true} +(3 rows) + +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=true" }', host=>'foo.bar', port=>8989, database=>'new_db'); + node_name | host | port | database | options +-------------+---------+------+----------+------------------ + data_node_1 | foo.bar | 8989 | new_db | {available=true} +(1 row) + +SELECT node_name, options FROM timescaledb_information.data_nodes; + node_name | options +-------------+------------------------------------------------------- + data_node_1 | {host=foo.bar,port=8989,dbname=new_db,available=true} + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} +(3 rows) + +-- just show current options: +SELECT * FROM alter_data_node('data_node_1'); + node_name | host | port | database | options +-------------+---------+------+----------+------------------ + data_node_1 | foo.bar | 8989 | new_db | {available=true} +(1 row) + +\set ON_ERROR_STOP 0 +-- test some failure cases +SELECT * FROM alter_data_node(NULL); +ERROR: data node name cannot be NULL +SELECT * FROM alter_data_node('does_not_exist'); +ERROR: server "does_not_exist" does not exist +SELECT * FROM alter_data_node('data_node_1', port=>89000); +ERROR: invalid port number 89000 +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=" }'); +ERROR: available requires a Boolean value +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available" }'); +ERROR: missing value for option "available" +SELECT * FROM alter_data_node('data_node_1', options=>'{ "foo=bar" }'); +ERROR: invalid option "foo" +SELECT * FROM alter_data_node('data_node_1', options=>'{ "foo" }'); +ERROR: missing value for option "foo" +-- cannot delete data node with "drop_database" since configuration is wrong +SELECT delete_data_node('data_node_1', drop_database=>true); +ERROR: could not connect to data node "data_node_1" +\set ON_ERROR_STOP 1 +-- restore configuration for data_node_1 +SELECT * FROM alter_data_node('data_node_1', host=>'localhost', port=>:old_port, database=>:'DN_DBNAME_1'); + node_name | host | port | database | options +-------------+-----------+-------+----------------+------------------ + data_node_1 | localhost | 55432 | db_data_node_1 | {available=true} +(1 row) + +SELECT node_name, options FROM timescaledb_information.data_nodes; + node_name | options +-------------+------------------------------------------------------------------ + data_node_1 | {host=localhost,port=55432,dbname=db_data_node_1,available=true} + data_node_2 | {host=localhost,port=55432,dbname=db_data_node_2} + data_node_3 | {host=localhost,port=55432,dbname=db_data_node_3} +(3 rows) + +DROP TABLE hyper1; +DROP TABLE hyper2; +DROP TABLE hyper3; +DROP TABLE hyper_1dim; +DROP VIEW chunk_query_data_node; +-- create new session to clear out connection cache +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +SELECT delete_data_node('data_node_1', drop_database=>true); + delete_data_node +------------------ + t +(1 row) + +SELECT delete_data_node('data_node_2', drop_database=>true); + delete_data_node +------------------ + t +(1 row) + +SELECT delete_data_node('data_node_3', drop_database=>true); + delete_data_node +------------------ + t +(1 row) + diff --git a/tsl/test/shared/expected/extension.out b/tsl/test/shared/expected/extension.out index e1d4874a665..281b567e2fd 100644 --- a/tsl/test/shared/expected/extension.out +++ b/tsl/test/shared/expected/extension.out @@ -147,13 +147,14 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text _timescaledb_internal.unfreeze_chunk(regclass) _timescaledb_internal.validate_as_data_node() _timescaledb_internal.wait_subscription_sync(name,name,integer,numeric) - add_compression_policy(regclass,"any",boolean,interval,timestamp with time zone,text) - add_continuous_aggregate_policy(regclass,"any","any",interval,boolean,timestamp with time zone,text) + add_compression_policy(regclass,"any",boolean,interval) + add_continuous_aggregate_policy(regclass,"any","any",interval,boolean) add_data_node(name,text,name,integer,boolean,boolean,text) add_dimension(regclass,name,integer,anyelement,regproc,boolean) - add_job(regproc,interval,jsonb,timestamp with time zone,boolean,regproc,boolean,text) - add_reorder_policy(regclass,name,boolean,timestamp with time zone,text) - add_retention_policy(regclass,"any",boolean,interval,timestamp with time zone,text) + add_job(regproc,interval,jsonb,timestamp with time zone,boolean,regproc) + add_reorder_policy(regclass,name,boolean) + add_retention_policy(regclass,"any",boolean,interval) + alter_data_node(name,text,name,integer,text[]) alter_job(integer,interval,interval,integer,interval,boolean,jsonb,timestamp with time zone,boolean,regproc) approximate_row_count(regclass) attach_data_node(name,regclass,boolean,boolean) diff --git a/tsl/test/sql/data_node.sql b/tsl/test/sql/data_node.sql index 97a9703c970..60441e19392 100644 --- a/tsl/test/sql/data_node.sql +++ b/tsl/test/sql/data_node.sql @@ -767,3 +767,173 @@ DROP DATABASE :DN_DBNAME_3; DROP DATABASE :DN_DBNAME_4; DROP DATABASE :DN_DBNAME_5; DROP DATABASE :DN_DBNAME_6; + + +----------------------------------------------- +-- Test alter_data_node() +----------------------------------------------- +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_1', host => 'localhost', database => :'DN_DBNAME_1'); +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_2', host => 'localhost', database => :'DN_DBNAME_2'); +SELECT node_name, database, node_created, database_created, extension_created FROM add_data_node('data_node_3', host => 'localhost', database => :'DN_DBNAME_3'); + +GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO :ROLE_1; +SET ROLE :ROLE_1; + +CREATE TABLE hyper1 (time timestamptz, location int, temp float); +CREATE TABLE hyper2 (LIKE hyper1); +CREATE TABLE hyper3 (LIKE hyper1); +CREATE TABLE hyper_1dim (LIKE hyper1); + +SELECT create_distributed_hypertable('hyper1', 'time', 'location', replication_factor=>1); +SELECT create_distributed_hypertable('hyper2', 'time', 'location', replication_factor=>2); +SELECT create_distributed_hypertable('hyper3', 'time', 'location', replication_factor=>3); +SELECT create_distributed_hypertable('hyper_1dim', 'time', chunk_time_interval=>interval '2 days', replication_factor=>3); + +SELECT setseed(1); +INSERT INTO hyper1 +SELECT t, (abs(timestamp_hash(t::timestamp)) % 3) + 1, random() * 30 +FROM generate_series('2022-01-01 00:00:00'::timestamptz, '2022-01-05 00:00:00', '1 h') t; + +INSERT INTO hyper2 SELECT * FROM hyper1; +INSERT INTO hyper3 SELECT * FROM hyper1; +INSERT INTO hyper_1dim SELECT * FROM hyper1; + +-- create view to see the data nodes and default data node of all +-- chunks +CREATE VIEW chunk_query_data_node AS +SELECT ch.hypertable_name, format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass AS chunk, ch.data_nodes, fs.srvname default_data_node + FROM timescaledb_information.chunks ch + INNER JOIN pg_foreign_table ft ON (format('%I.%I', ch.chunk_schema, ch.chunk_name)::regclass = ft.ftrelid) + INNER JOIN pg_foreign_server fs ON (ft.ftserver = fs.oid) + ORDER BY 1, 2; + +SELECT * FROM chunk_query_data_node; + +-- query some data from all hypertables +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + +-- test "switching over" to other data node when +\set ON_ERROR_STOP 0 +-- must be owner to alter a data node +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=false" }'); +SELECT * FROM alter_data_node('data_node_1', port=>8989); +\set ON_ERROR_STOP 1 + +RESET ROLE; + +-- simulate a node being down by renaming the database for +-- data_node_1, but for that to work we need to reconnect the backend +-- to clear out the connection cache +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +ALTER DATABASE :DN_DBNAME_1 RENAME TO data_node_1_unavailable; +\set ON_ERROR_STOP 0 +SELECT * FROM hyper1 ORDER BY time LIMIT 1; +SELECT * FROM hyper2 ORDER BY time LIMIT 1; +SELECT * FROM hyper3 ORDER BY time LIMIT 1; +SELECT * FROM hyper_1dim ORDER BY time LIMIT 1; +\set ON_ERROR_STOP 1 + +-- when altering a data node, related entries in the connection cache +-- should be invalidated +SELECT node_name, host, port, invalidated +FROM _timescaledb_internal.show_connection_cache() +WHERE node_name = 'data_node_1'; +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=false" }'); +SELECT node_name, host, port, invalidated +FROM _timescaledb_internal.show_connection_cache() +WHERE node_name = 'data_node_1'; + +-- the node that is not available for reads should no longer be +-- query data node for chunks, except for those that have no +-- alternative (i.e., the chunk only has one data node). +SELECT * FROM chunk_query_data_node; + +-- queries should work again, except onb hyper1 which has no replica +-- chunks +\set ON_ERROR_STOP 0 +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; +\set ON_ERROR_STOP 1 +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + + +-- inserts should fail if going to chunks that exist on the +-- unavailable data node +\set ON_ERROR_STOP 0 +INSERT INTO hyper3 VALUES ('2022-01-03 00:00:00', 1, 1); +INSERT INTO hyper_1dim VALUES ('2022-01-03 00:00:00', 1, 1); +\set ON_ERROR_STOP 1 + +-- inserts should work if going to a new chunk +INSERT INTO hyper3 VALUES ('2022-01-10 00:00:00', 1, 1); +INSERT INTO hyper_1dim VALUES ('2022-01-10 00:00:00', 1, 1); + +SELECT hypertable_name, chunk_name, data_nodes FROM timescaledb_information.chunks +WHERE hypertable_name IN ('hyper3', 'hyper_1dim') +AND range_start::timestamptz <= '2022-01-10 00:00:00' +AND range_end::timestamptz > '2022-01-10 00:00:00' +ORDER BY 1, 2; + +-- re-enable the data node and the chunks should "switch back" to +-- using the data node. However, only the chunks for which the node is +-- "primary" should switch to using the data node for queries +ALTER DATABASE data_node_1_unavailable RENAME TO :DN_DBNAME_1; +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=true" }'); +SELECT * FROM chunk_query_data_node; + +--queries should work again on all tables +SELECT time, location FROM hyper1 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper2 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper3 ORDER BY time LIMIT 1; +SELECT time, location FROM hyper_1dim ORDER BY time LIMIT 1; + +-- save old port so that we can restore connectivity after we test +-- changing the connection information for the data node +WITH options AS ( + SELECT unnest(options) option + FROM timescaledb_information.data_nodes + WHERE node_name = 'data_node_1' +) +SELECT split_part(option, '=', 2) AS old_port +FROM options WHERE option LIKE 'port%' \gset + +-- also test altering host, port and database +SELECT node_name, options FROM timescaledb_information.data_nodes; +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=true" }', host=>'foo.bar', port=>8989, database=>'new_db'); + +SELECT node_name, options FROM timescaledb_information.data_nodes; +-- just show current options: +SELECT * FROM alter_data_node('data_node_1'); + +\set ON_ERROR_STOP 0 +-- test some failure cases +SELECT * FROM alter_data_node(NULL); +SELECT * FROM alter_data_node('does_not_exist'); +SELECT * FROM alter_data_node('data_node_1', port=>89000); +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available=" }'); +SELECT * FROM alter_data_node('data_node_1', options=>'{ "available" }'); +SELECT * FROM alter_data_node('data_node_1', options=>'{ "foo=bar" }'); +SELECT * FROM alter_data_node('data_node_1', options=>'{ "foo" }'); +-- cannot delete data node with "drop_database" since configuration is wrong +SELECT delete_data_node('data_node_1', drop_database=>true); +\set ON_ERROR_STOP 1 + +-- restore configuration for data_node_1 +SELECT * FROM alter_data_node('data_node_1', host=>'localhost', port=>:old_port, database=>:'DN_DBNAME_1'); +SELECT node_name, options FROM timescaledb_information.data_nodes; + +DROP TABLE hyper1; +DROP TABLE hyper2; +DROP TABLE hyper3; +DROP TABLE hyper_1dim; +DROP VIEW chunk_query_data_node; + +-- create new session to clear out connection cache +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +SELECT delete_data_node('data_node_1', drop_database=>true); +SELECT delete_data_node('data_node_2', drop_database=>true); +SELECT delete_data_node('data_node_3', drop_database=>true);