Skip to content

Commit

Permalink
Add function to alter data nodes
Browse files Browse the repository at this point in the history
Add a new function, `alter_data_node()`, which can be used to change a
data node's configuration on the access node, including connection
information like host, port, and database.

In addition, the function allows marking a data node as "unavailable",
which means the node should no longer be used for reads and
writes. Only read "failover" is implemented as part of this change,
however.

To fail over reads, the alter data node function finds all the chunks
for which the unavailable data node is the "primary" query target and
sets a chunk replica on another data node to use instead. If some
chunks do not have a replica to fail over to, a warning will be
raised.

When a data node is available again, the function can be used to
switch back to once again use the data node for queries.

Closes timescale#2104
  • Loading branch information
erimatnor committed Oct 18, 2022
1 parent 54ed0d5 commit 142b70c
Show file tree
Hide file tree
Showing 23 changed files with 1,094 additions and 39 deletions.
9 changes: 9 additions & 0 deletions sql/ddl_api.sql
Expand Up @@ -216,3 +216,12 @@ CREATE OR REPLACE PROCEDURE @extschema@.refresh_continuous_aggregate(
window_start "any",
window_end "any"
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';

CREATE OR REPLACE FUNCTION @extschema@.alter_data_node(
node_name NAME,
host TEXT = NULL,
database NAME = NULL,
port INTEGER = NULL,
options TEXT[] = NULL
) RETURNS TABLE(node_name NAME, host TEXT, port INTEGER, database NAME, options TEXT[])
AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE;
2 changes: 2 additions & 0 deletions sql/pre_install/tables.sql
Expand Up @@ -251,6 +251,8 @@ CREATE TABLE _timescaledb_catalog.chunk_data_node (
CONSTRAINT chunk_data_node_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id)
);

CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name);

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.chunk_data_node', '');

-- Default jobs are given the id space [1,1000). User-installed jobs and any jobs created inside tests
Expand Down
10 changes: 10 additions & 0 deletions sql/updates/latest-dev.sql
Expand Up @@ -297,3 +297,13 @@ CREATE FUNCTION @extschema@.add_reorder_policy(
) RETURNS INTEGER
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_add'
LANGUAGE C VOLATILE;

CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name);
CREATE FUNCTION @extschema@.alter_data_node(
node_name NAME,
host TEXT = NULL,
database NAME = NULL,
port INTEGER = NULL,
options TEXT[] = NULL
) RETURNS TABLE(node_name NAME, host TEXT, port INTEGER, database NAME, options TEXT[])
AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE;
3 changes: 3 additions & 0 deletions sql/updates/reverse-dev.sql
Expand Up @@ -224,3 +224,6 @@ ALTER TABLE _timescaledb_internal.bgw_policy_chunk_stats
ADD CONSTRAINT bgw_policy_chunk_stats_job_id_fkey
FOREIGN KEY(job_id) REFERENCES _timescaledb_config.bgw_job(id)
ON DELETE CASCADE;

DROP INDEX IF EXISTS chunk_data_node_node_name_idx;
DROP FUNCTION @extschema@.alter_data_node;
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Expand Up @@ -102,6 +102,7 @@ CROSSMODULE_WRAPPER(data_node_add);
CROSSMODULE_WRAPPER(data_node_delete);
CROSSMODULE_WRAPPER(data_node_attach);
CROSSMODULE_WRAPPER(data_node_detach);
CROSSMODULE_WRAPPER(data_node_alter);
CROSSMODULE_WRAPPER(chunk_drop_replica);

CROSSMODULE_WRAPPER(chunk_set_default_data_node);
Expand Down Expand Up @@ -504,6 +505,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.data_node_attach = error_no_default_fn_pg_community,
.data_node_ping = error_no_default_fn_pg_community,
.data_node_detach = error_no_default_fn_pg_community,
.data_node_alter = error_no_default_fn_pg_community,
.data_node_allow_new_chunks = error_no_default_fn_pg_community,
.data_node_block_new_chunks = error_no_default_fn_pg_community,
.distributed_exec = error_no_default_fn_pg_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Expand Up @@ -158,6 +158,7 @@ typedef struct CrossModuleFunctions
PGFunction data_node_attach;
PGFunction data_node_ping;
PGFunction data_node_detach;
PGFunction data_node_alter;
PGFunction data_node_allow_new_chunks;
PGFunction data_node_block_new_chunks;

Expand Down
31 changes: 30 additions & 1 deletion src/hypertable.c
Expand Up @@ -2667,6 +2667,23 @@ assert_chunk_data_nodes_is_a_set(const List *chunk_data_nodes)
}
#endif

static bool
data_node_is_available(const char *name)
{
const ForeignServer *server = GetForeignServerByName(name, false);
ListCell *lc;

foreach (lc, server->options)
{
DefElem *elem = lfirst(lc);

if (strcmp(elem->defname, "available") == 0 && !defGetBoolean(elem))
return false;
}

return true;
}

/*
* Assign data nodes to a chunk.
*
Expand All @@ -2693,7 +2710,16 @@ ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, const Hypercube *cub
ts_hypercube_get_slice_by_dimension_id(cube, space_dim->fd.id);
const DimensionPartition *dp =
ts_dimension_partition_find(space_dim->dimension_partitions, slice->fd.range_start);
chunk_data_nodes = dp->data_nodes;
ListCell *lc;

/* Filter out data nodes that aren't available */
foreach (lc, dp->data_nodes)
{
char *node_name = lfirst(lc);

if (data_node_is_available(node_name))
chunk_data_nodes = lappend(chunk_data_nodes, node_name);
}
}
else
{
Expand Down Expand Up @@ -2740,6 +2766,9 @@ typedef bool (*hypertable_data_node_filter)(const HypertableDataNode *hdn);
static bool
filter_non_blocked_data_nodes(const HypertableDataNode *node)
{
if (!data_node_is_available(NameStr(node->fd.node_name)))
return false;

return !node->fd.block_chunks;
}

Expand Down
3 changes: 2 additions & 1 deletion src/hypertable.h
Expand Up @@ -151,7 +151,8 @@ extern TSDLLEXPORT bool ts_hypertable_set_compressed(Hypertable *ht,
extern TSDLLEXPORT bool ts_hypertable_unset_compressed(Hypertable *ht);
extern TSDLLEXPORT void ts_hypertable_clone_constraints_to_compressed(const Hypertable *ht,
List *constraint_list);
extern List *ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht, const Hypercube *cube);
extern TSDLLEXPORT List *ts_hypertable_assign_chunk_data_nodes(const Hypertable *ht,
const Hypercube *cube);
extern TSDLLEXPORT List *ts_hypertable_get_data_node_name_list(const Hypertable *ht);
extern TSDLLEXPORT List *ts_hypertable_get_data_node_serverids_list(const Hypertable *ht);
extern TSDLLEXPORT List *ts_hypertable_get_available_data_nodes(const Hypertable *ht,
Expand Down
1 change: 1 addition & 0 deletions src/ts_catalog/catalog.c
Expand Up @@ -190,6 +190,7 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
.names = (char *[]) {
[CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_chunk_id_node_name_key",
[CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_node_chunk_id_node_name_key",
[CHUNK_DATA_NODE_NODE_NAME_IDX] = "chunk_data_node_node_name_idx",
}
},
[TABLESPACE] = {
Expand Down
14 changes: 13 additions & 1 deletion src/ts_catalog/catalog.h
Expand Up @@ -596,6 +596,7 @@ enum
{
CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX,
CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX,
CHUNK_DATA_NODE_NODE_NAME_IDX,
_MAX_CHUNK_DATA_NODE_INDEX,
};

Expand Down Expand Up @@ -625,6 +626,17 @@ struct FormData_chunk_data_node_node_chunk_id_node_name_idx
NameData node_name;
};

enum Anum_chunk_data_node_node_name_idx
{
Anum_chunk_data_node_name_idx_node_name = 1,
_Anum_chunk_data_node_node_name_idx_max,
};

struct FormData_chunk_data_node_node_name_idx
{
NameData node_name;
};

/************************************
*
* Tablespace table definitions
Expand Down Expand Up @@ -1456,7 +1468,7 @@ extern TSDLLEXPORT void ts_catalog_delete_tid_only(Relation rel, ItemPointer tid
extern TSDLLEXPORT void ts_catalog_delete_tid(Relation rel, ItemPointer tid);
extern TSDLLEXPORT void ts_catalog_delete_only(Relation rel, HeapTuple tuple);
extern TSDLLEXPORT void ts_catalog_delete(Relation rel, HeapTuple tuple);
extern void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation);
extern TSDLLEXPORT void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation);

bool TSDLLEXPORT ts_catalog_scan_one(CatalogTable table, int indexid, ScanKeyData *scankey,
int num_keys, tuple_found_func tuple_found, LOCKMODE lockmode,
Expand Down
18 changes: 16 additions & 2 deletions src/ts_catalog/chunk_data_node.c
Expand Up @@ -106,14 +106,15 @@ chunk_data_node_tuple_found(TupleInfo *ti, void *data)
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
Form_chunk_data_node form = (Form_chunk_data_node) GETSTRUCT(tuple);
ForeignServer *server;
ChunkDataNode *chunk_data_node;
MemoryContext old;

server = GetForeignServerByName(NameStr(form->node_name), false);
old = MemoryContextSwitchTo(ti->mctx);
chunk_data_node = palloc(sizeof(ChunkDataNode));
memcpy(&chunk_data_node->fd, form, sizeof(FormData_chunk_data_node));
chunk_data_node->foreign_server_oid =
get_foreign_server_oid(NameStr(form->node_name), /* missing_ok = */ false);
chunk_data_node->foreign_server_oid = server->serverid;
*nodes = lappend(*nodes, chunk_data_node);
MemoryContextSwitchTo(old);

Expand Down Expand Up @@ -339,3 +340,16 @@ ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id)
F_INT4EQ,
Int32GetDatum(chunk_id));
}

void
ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it, const char *node_name)
{
it->ctx.index =
catalog_get_index(ts_catalog_get(), CHUNK_DATA_NODE, CHUNK_DATA_NODE_NODE_NAME_IDX);
ts_scan_iterator_scan_key_reset(it);
ts_scan_iterator_scan_key_init(it,
Anum_chunk_data_node_name_idx_node_name,
BTEqualStrategyNumber,
F_NAMEEQ,
CStringGetDatum(node_name));
}
7 changes: 5 additions & 2 deletions src/ts_catalog/chunk_data_node.h
Expand Up @@ -32,7 +32,10 @@ extern int ts_chunk_data_node_delete_by_node_name(const char *node_name);
extern TSDLLEXPORT List *
ts_chunk_data_node_scan_by_node_name_and_hypertable_id(const char *node_name, int32 hypertable_id,
MemoryContext mctx);
extern ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt);
extern void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id);
extern TSDLLEXPORT ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt);
extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it,
int32 chunk_id);
extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it,
const char *node_name);

#endif /* TIMESCALEDB_CHUNK_DATA_NODE_H */
1 change: 1 addition & 0 deletions src/ts_catalog/dimension_partition.c
Expand Up @@ -8,6 +8,7 @@
#include <access/xact.h>
#include <catalog/catalog.h>
#include <commands/tablecmds.h>
#include <foreign/foreign.h>
#include <nodes/parsenodes.h>
#include <utils/array.h>
#include <utils/palloc.h>
Expand Down

0 comments on commit 142b70c

Please sign in to comment.