Skip to content

Commit

Permalink
Add function to alter data nodes
Browse files Browse the repository at this point in the history
Add a new function, `alter_data_node()`, which can be used to change
the data node's configuration originally set up via `add_data_node()`
on the access node.

The new functions introduces a new option "available" that allows
configuring the availability of the data node. Setting
`available=>false` means that the node should no longer be used for
reads and writes. Only read "failover" is implemented as part of this
change, however.

To fail over reads, the alter data node function finds all the chunks
for which the unavailable data node is the "primary" query target and
"fails over" to a chunk replica on another data node instead. If some
chunks do not have a replica to fail over to, a warning will be
raised.

When a data node is available again, the function can be used to
switch back to using the data node for queries.

Closes timescale#2104
  • Loading branch information
erimatnor committed Nov 9, 2022
1 parent 2a64450 commit 03d6260
Show file tree
Hide file tree
Showing 25 changed files with 1,049 additions and 42 deletions.
9 changes: 9 additions & 0 deletions sql/ddl_api.sql
Expand Up @@ -216,3 +216,12 @@ CREATE OR REPLACE PROCEDURE @extschema@.refresh_continuous_aggregate(
window_start "any",
window_end "any"
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';

CREATE OR REPLACE FUNCTION @extschema@.alter_data_node(
node_name NAME,
host TEXT = NULL,
database NAME = NULL,
port INTEGER = NULL,
available BOOLEAN = NULL
) RETURNS TABLE(node_name NAME, host TEXT, port INTEGER, database NAME, available BOOLEAN)
AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE;
2 changes: 2 additions & 0 deletions sql/pre_install/tables.sql
Expand Up @@ -255,6 +255,8 @@ CREATE TABLE _timescaledb_catalog.chunk_data_node (
CONSTRAINT chunk_data_node_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id)
);

CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name);

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.chunk_data_node', '');

-- Default jobs are given the id space [1,1000). User-installed jobs and any jobs created inside tests
Expand Down
11 changes: 11 additions & 0 deletions sql/updates/latest-dev.sql
Expand Up @@ -379,3 +379,14 @@ GRANT SELECT ON _timescaledb_catalog.dimension_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.dimension TO PUBLIC;

-- end recreate _timescaledb_catalog.dimension table --

-- changes related to alter_data_node():
CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name);
CREATE FUNCTION @extschema@.alter_data_node(
node_name NAME,
host TEXT = NULL,
database NAME = NULL,
port INTEGER = NULL,
available BOOLEAN = NULL
) RETURNS TABLE(node_name NAME, host TEXT, port INTEGER, database NAME, available BOOLEAN)
AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE;
4 changes: 4 additions & 0 deletions sql/updates/reverse-dev.sql
Expand Up @@ -305,3 +305,7 @@ GRANT SELECT ON _timescaledb_catalog.dimension_id_seq TO PUBLIC;
GRANT SELECT ON _timescaledb_catalog.dimension TO PUBLIC;

-- end recreate _timescaledb_catalog.dimension table --

-- changes related to alter_data_node()
DROP INDEX _timescaledb_catalog.chunk_data_node_node_name_idx;
DROP FUNCTION @extschema@.alter_data_node;
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Expand Up @@ -102,6 +102,7 @@ CROSSMODULE_WRAPPER(data_node_add);
CROSSMODULE_WRAPPER(data_node_delete);
CROSSMODULE_WRAPPER(data_node_attach);
CROSSMODULE_WRAPPER(data_node_detach);
CROSSMODULE_WRAPPER(data_node_alter);
CROSSMODULE_WRAPPER(chunk_drop_replica);

CROSSMODULE_WRAPPER(chunk_set_default_data_node);
Expand Down Expand Up @@ -505,6 +506,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.data_node_attach = error_no_default_fn_pg_community,
.data_node_ping = error_no_default_fn_pg_community,
.data_node_detach = error_no_default_fn_pg_community,
.data_node_alter = error_no_default_fn_pg_community,
.data_node_allow_new_chunks = error_no_default_fn_pg_community,
.data_node_block_new_chunks = error_no_default_fn_pg_community,
.distributed_exec = error_no_default_fn_pg_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Expand Up @@ -158,6 +158,7 @@ typedef struct CrossModuleFunctions
PGFunction data_node_attach;
PGFunction data_node_ping;
PGFunction data_node_detach;
PGFunction data_node_alter;
PGFunction data_node_allow_new_chunks;
PGFunction data_node_block_new_chunks;

Expand Down
14 changes: 13 additions & 1 deletion src/hypertable.c
Expand Up @@ -2711,7 +2711,16 @@ ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, const Hypercube *cub
ts_hypercube_get_slice_by_dimension_id(cube, space_dim->fd.id);
const DimensionPartition *dp =
ts_dimension_partition_find(space_dim->dimension_partitions, slice->fd.range_start);
chunk_data_nodes = dp->data_nodes;
ListCell *lc;

/* Filter out data nodes that aren't available */
foreach (lc, dp->data_nodes)
{
char *node_name = lfirst(lc);

if (ts_data_node_is_available(node_name))
chunk_data_nodes = lappend(chunk_data_nodes, node_name);
}
}
else
{
Expand Down Expand Up @@ -2758,6 +2767,9 @@ typedef bool (*hypertable_data_node_filter)(const HypertableDataNode *hdn);
static bool
filter_non_blocked_data_nodes(const HypertableDataNode *node)
{
if (!ts_data_node_is_available(NameStr(node->fd.node_name)))
return false;

return !node->fd.block_chunks;
}

Expand Down
3 changes: 2 additions & 1 deletion src/hypertable.h
Expand Up @@ -153,7 +153,8 @@ extern TSDLLEXPORT bool ts_hypertable_set_compress_interval(Hypertable *ht,
int64 compress_interval);
extern TSDLLEXPORT void ts_hypertable_clone_constraints_to_compressed(const Hypertable *ht,
List *constraint_list);
extern List *ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, const Hypercube *cube);
extern TSDLLEXPORT List *ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht,
const Hypercube *cube);
extern TSDLLEXPORT List *ts_hypertable_get_data_node_name_list(const Hypertable *ht);
extern TSDLLEXPORT List *ts_hypertable_get_data_node_serverids_list(const Hypertable *ht);
extern TSDLLEXPORT List *ts_hypertable_get_available_data_nodes(const Hypertable *ht,
Expand Down
32 changes: 26 additions & 6 deletions src/process_utility.c
Expand Up @@ -4,6 +4,7 @@
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include <foreign/foreign.h>
#include <nodes/parsenodes.h>
#include <nodes/nodes.h>
#include <nodes/makefuncs.h>
Expand Down Expand Up @@ -43,6 +44,7 @@

#include "annotations.h"
#include "export.h"
#include "extension_constants.h"
#include "process_utility.h"
#include "ts_catalog/catalog.h"
#include "chunk.h"
Expand Down Expand Up @@ -495,14 +497,32 @@ static DDLResult
process_alter_foreign_server(ProcessUtilityArgs *args)
{
AlterForeignServerStmt *stmt = (AlterForeignServerStmt *) args->parsetree;
ForeignServer *server = GetForeignServerByName(stmt->servername, true);
Oid fdwid = get_foreign_data_wrapper_oid(EXTENSION_FDW_NAME, false);
ListCell *lc;

if (stmt->has_version)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("operation not supported"),
errdetail("It is not possible to set a version on the data node configuration.")));
if (server != NULL && server->fdwid == fdwid)
{
if (stmt->has_version)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("version not supported"),
errdetail(
"It is not possible to set a version on the data node configuration.")));

/* Other options are validated by the FDW */
/* Options are validated by the FDW, but we need to block available option
* since that must be handled via alter_data_node(). */
foreach (lc, stmt->options)
{
DefElem *elem = lfirst(lc);

if (strcmp(elem->defname, "available") == 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot set \"available\" using ALTER SERVER"),
errhint("Use alter_data_node() to set \"available\".")));
}
}

return DDL_CONTINUE;
}
Expand Down
1 change: 1 addition & 0 deletions src/ts_catalog/catalog.c
Expand Up @@ -190,6 +190,7 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
.names = (char *[]) {
[CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_chunk_id_node_name_key",
[CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_node_chunk_id_node_name_key",
[CHUNK_DATA_NODE_NODE_NAME_IDX] = "chunk_data_node_node_name_idx",
}
},
[TABLESPACE] = {
Expand Down
14 changes: 13 additions & 1 deletion src/ts_catalog/catalog.h
Expand Up @@ -598,6 +598,7 @@ enum
{
CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX,
CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX,
CHUNK_DATA_NODE_NODE_NAME_IDX,
_MAX_CHUNK_DATA_NODE_INDEX,
};

Expand Down Expand Up @@ -627,6 +628,17 @@ struct FormData_chunk_data_node_node_chunk_id_node_name_idx
NameData node_name;
};

enum Anum_chunk_data_node_node_name_idx
{
Anum_chunk_data_node_name_idx_node_name = 1,
_Anum_chunk_data_node_node_name_idx_max,
};

struct FormData_chunk_data_node_node_name_idx
{
NameData node_name;
};

/************************************
*
* Tablespace table definitions
Expand Down Expand Up @@ -1460,7 +1472,7 @@ extern TSDLLEXPORT void ts_catalog_delete_tid_only(Relation rel, ItemPointer tid
extern TSDLLEXPORT void ts_catalog_delete_tid(Relation rel, ItemPointer tid);
extern TSDLLEXPORT void ts_catalog_delete_only(Relation rel, HeapTuple tuple);
extern TSDLLEXPORT void ts_catalog_delete(Relation rel, HeapTuple tuple);
extern void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation);
extern TSDLLEXPORT void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation);

bool TSDLLEXPORT ts_catalog_scan_one(CatalogTable table, int indexid, ScanKeyData *scankey,
int num_keys, tuple_found_func tuple_found, LOCKMODE lockmode,
Expand Down
18 changes: 16 additions & 2 deletions src/ts_catalog/chunk_data_node.c
Expand Up @@ -106,14 +106,15 @@ chunk_data_node_tuple_found(TupleInfo *ti, void *data)
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
Form_chunk_data_node form = (Form_chunk_data_node) GETSTRUCT(tuple);
ForeignServer *server;
ChunkDataNode *chunk_data_node;
MemoryContext old;

server = GetForeignServerByName(NameStr(form->node_name), false);
old = MemoryContextSwitchTo(ti->mctx);
chunk_data_node = palloc(sizeof(ChunkDataNode));
memcpy(&chunk_data_node->fd, form, sizeof(FormData_chunk_data_node));
chunk_data_node->foreign_server_oid =
get_foreign_server_oid(NameStr(form->node_name), /* missing_ok = */ false);
chunk_data_node->foreign_server_oid = server->serverid;
*nodes = lappend(*nodes, chunk_data_node);
MemoryContextSwitchTo(old);

Expand Down Expand Up @@ -339,3 +340,16 @@ ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id)
F_INT4EQ,
Int32GetDatum(chunk_id));
}

void
ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it, const char *node_name)
{
it->ctx.index =
catalog_get_index(ts_catalog_get(), CHUNK_DATA_NODE, CHUNK_DATA_NODE_NODE_NAME_IDX);
ts_scan_iterator_scan_key_reset(it);
ts_scan_iterator_scan_key_init(it,
Anum_chunk_data_node_name_idx_node_name,
BTEqualStrategyNumber,
F_NAMEEQ,
CStringGetDatum(node_name));
}
7 changes: 5 additions & 2 deletions src/ts_catalog/chunk_data_node.h
Expand Up @@ -32,7 +32,10 @@ extern int ts_chunk_data_node_delete_by_node_name(const char *node_name);
extern TSDLLEXPORT List *
ts_chunk_data_node_scan_by_node_name_and_hypertable_id(const char *node_name, int32 hypertable_id,
MemoryContext mctx);
extern ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt);
extern void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id);
extern TSDLLEXPORT ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt);
extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it,
int32 chunk_id);
extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it,
const char *node_name);

#endif /* TIMESCALEDB_CHUNK_DATA_NODE_H */
23 changes: 23 additions & 0 deletions src/utils.c
Expand Up @@ -1302,3 +1302,26 @@ ts_copy_relation_acl(const Oid source_relid, const Oid target_relid, const Oid o
ReleaseSysCache(source_tuple);
table_close(class_rel, RowExclusiveLock);
}

bool
ts_data_node_is_available_by_server(const ForeignServer *server)
{
ListCell *lc;

foreach (lc, server->options)
{
DefElem *elem = lfirst(lc);

if (strcmp(elem->defname, "available") == 0)
return defGetBoolean(elem);
}

/* Default to available if option is not yet added */
return true;
}

bool
ts_data_node_is_available(const char *name)
{
return ts_data_node_is_available_by_server(GetForeignServerByName(name, false));
}
3 changes: 3 additions & 0 deletions src/utils.h
Expand Up @@ -10,6 +10,7 @@
#include <access/htup_details.h>
#include <catalog/pg_proc.h>
#include <common/int.h>
#include <foreign/foreign.h>
#include <nodes/pathnodes.h>
#include <nodes/extensible.h>
#include <utils/datetime.h>
Expand Down Expand Up @@ -201,5 +202,7 @@ extern TSDLLEXPORT void ts_alter_table_with_event_trigger(Oid relid, Node *cmd,
bool recurse);
extern TSDLLEXPORT void ts_copy_relation_acl(const Oid source_relid, const Oid target_relid,
const Oid owner_id);
extern TSDLLEXPORT bool ts_data_node_is_available_by_server(const ForeignServer *server);
extern TSDLLEXPORT bool ts_data_node_is_available(const char *node_name);

#endif /* TIMESCALEDB_UTILS_H */

0 comments on commit 03d6260

Please sign in to comment.