Skip to content

Commit

Permalink
Add function to alter data nodes
Browse files Browse the repository at this point in the history
Add a new function, `alter_data_node()`, which can be used to change a
data node's configuration on the access node, including connection
information like host, port, and database.

In addition, the function allows marking a node as not available,
which means other nodes will be used for, e.g., reads instead (a
different "default" data node is set for each chunk). For this to
work, all the chunks on the unavailable data node must have a replica
on other data nodes. If some chunks are not replicated, a warning will
be raised.

Closes timescale#2104
  • Loading branch information
erimatnor committed Oct 15, 2022
1 parent 066bcbe commit 4283de2
Show file tree
Hide file tree
Showing 20 changed files with 636 additions and 21 deletions.
9 changes: 9 additions & 0 deletions sql/ddl_api.sql
Expand Up @@ -216,3 +216,12 @@ CREATE OR REPLACE PROCEDURE @extschema@.refresh_continuous_aggregate(
window_start "any",
window_end "any"
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh';

CREATE OR REPLACE FUNCTION @extschema@.alter_data_node(
node_name NAME,
host TEXT = NULL,
database NAME = NULL,
port INTEGER = NULL,
available BOOLEAN = NULL
) RETURNS TABLE(node_name NAME, host TEXT, database NAME, port INTEGER, available BOOLEAN)
AS '@MODULE_PATHNAME@', 'ts_data_node_alter' LANGUAGE C VOLATILE;
2 changes: 2 additions & 0 deletions sql/pre_install/tables.sql
Expand Up @@ -251,6 +251,8 @@ CREATE TABLE _timescaledb_catalog.chunk_data_node (
CONSTRAINT chunk_data_node_chunk_id_fkey FOREIGN KEY (chunk_id) REFERENCES _timescaledb_catalog.chunk (id)
);

CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name);

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.chunk_data_node', '');

-- Default jobs are given the id space [1,1000). User-installed jobs and any jobs created inside tests
Expand Down
3 changes: 3 additions & 0 deletions sql/updates/latest-dev.sql
Expand Up @@ -175,3 +175,6 @@ $BODY$
UNION ALL
SELECT * FROM _chunk_sizes) AS sizes;
$BODY$ SET search_path TO pg_catalog, pg_temp;


CREATE INDEX chunk_data_node_node_name_idx ON _timescaledb_catalog.chunk_data_node (node_name);
2 changes: 2 additions & 0 deletions sql/updates/reverse-dev.sql
Expand Up @@ -115,3 +115,5 @@ $BODY$
SELECT * FROM _chunk_sizes) AS sizes;
$BODY$ SET search_path TO pg_catalog, pg_temp;


DROP INDEX IF EXISTS chunk_data_node_node_name_idx;
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Expand Up @@ -102,6 +102,7 @@ CROSSMODULE_WRAPPER(data_node_add);
CROSSMODULE_WRAPPER(data_node_delete);
CROSSMODULE_WRAPPER(data_node_attach);
CROSSMODULE_WRAPPER(data_node_detach);
CROSSMODULE_WRAPPER(data_node_alter);
CROSSMODULE_WRAPPER(chunk_drop_replica);

CROSSMODULE_WRAPPER(chunk_set_default_data_node);
Expand Down Expand Up @@ -504,6 +505,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.data_node_attach = error_no_default_fn_pg_community,
.data_node_ping = error_no_default_fn_pg_community,
.data_node_detach = error_no_default_fn_pg_community,
.data_node_alter = error_no_default_fn_pg_community,
.data_node_allow_new_chunks = error_no_default_fn_pg_community,
.data_node_block_new_chunks = error_no_default_fn_pg_community,
.distributed_exec = error_no_default_fn_pg_community,
Expand Down
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Expand Up @@ -158,6 +158,7 @@ typedef struct CrossModuleFunctions
PGFunction data_node_attach;
PGFunction data_node_ping;
PGFunction data_node_detach;
PGFunction data_node_alter;
PGFunction data_node_allow_new_chunks;
PGFunction data_node_block_new_chunks;

Expand Down
1 change: 1 addition & 0 deletions src/ts_catalog/catalog.c
Expand Up @@ -190,6 +190,7 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
.names = (char *[]) {
[CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_chunk_id_node_name_key",
[CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX] = "chunk_data_node_node_chunk_id_node_name_key",
[CHUNK_DATA_NODE_NODE_NAME_IDX] = "chunk_data_node_node_name_idx",
}
},
[TABLESPACE] = {
Expand Down
12 changes: 12 additions & 0 deletions src/ts_catalog/catalog.h
Expand Up @@ -596,6 +596,7 @@ enum
{
CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX,
CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX,
CHUNK_DATA_NODE_NODE_NAME_IDX,
_MAX_CHUNK_DATA_NODE_INDEX,
};

Expand Down Expand Up @@ -625,6 +626,17 @@ struct FormData_chunk_data_node_node_chunk_id_node_name_idx
NameData node_name;
};

enum Anum_chunk_data_node_node_name_idx
{
Anum_chunk_data_node_name_idx_node_name = 1,
_Anum_chunk_data_node_node_name_idx_max,
};

struct FormData_chunk_data_node_node_name_idx
{
NameData node_name;
};

/************************************
*
* Tablespace table definitions
Expand Down
13 changes: 13 additions & 0 deletions src/ts_catalog/chunk_data_node.c
Expand Up @@ -339,3 +339,16 @@ ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id)
F_INT4EQ,
Int32GetDatum(chunk_id));
}

void
ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it, const char *node_name)
{
it->ctx.index =
catalog_get_index(ts_catalog_get(), CHUNK_DATA_NODE, CHUNK_DATA_NODE_NODE_NAME_IDX);
ts_scan_iterator_scan_key_reset(it);
ts_scan_iterator_scan_key_init(it,
Anum_chunk_data_node_name_idx_node_name,
BTEqualStrategyNumber,
F_NAMEEQ,
CStringGetDatum(node_name));
}
7 changes: 5 additions & 2 deletions src/ts_catalog/chunk_data_node.h
Expand Up @@ -32,7 +32,10 @@ extern int ts_chunk_data_node_delete_by_node_name(const char *node_name);
extern TSDLLEXPORT List *
ts_chunk_data_node_scan_by_node_name_and_hypertable_id(const char *node_name, int32 hypertable_id,
MemoryContext mctx);
extern ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt);
extern void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id);
extern TSDLLEXPORT ScanIterator ts_chunk_data_nodes_scan_iterator_create(MemoryContext result_mcxt);
extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_chunk_id(ScanIterator *it,
int32 chunk_id);
extern TSDLLEXPORT void ts_chunk_data_nodes_scan_iterator_set_node_name(ScanIterator *it,
const char *node_name);

#endif /* TIMESCALEDB_CHUNK_DATA_NODE_H */
61 changes: 51 additions & 10 deletions tsl/src/chunk.c
Expand Up @@ -13,6 +13,7 @@
#include <access/htup_details.h>
#include <access/xact.h>
#include <nodes/makefuncs.h>
#include <nodes/parsenodes.h>
#include <utils/acl.h>
#include <utils/builtins.h>
#include <utils/syscache.h>
Expand Down Expand Up @@ -66,7 +67,7 @@ chunk_match_data_node_by_server(const Chunk *chunk, const ForeignServer *server)
}

static bool
chunk_set_foreign_server(Chunk *chunk, ForeignServer *new_server)
chunk_set_foreign_server(const Chunk *chunk, const ForeignServer *new_server)
{
Relation ftrel;
HeapTuple tuple;
Expand Down Expand Up @@ -134,32 +135,72 @@ chunk_set_foreign_server(Chunk *chunk, ForeignServer *new_server)
return true;
}

void
chunk_update_foreign_server_if_needed(int32 chunk_id, Oid existing_server_id)
static bool
data_node_is_available(const ForeignServer *server)
{
ListCell *lc;

foreach (lc, server->options)
{
DefElem *elem = lfirst(lc);

if (strcmp(elem->defname, "available") == 0)
{
bool available = defGetBoolean(elem);

return available;
}
}

/* If no option is set, default to available=true */
return true;
}

bool
chunk_update_foreign_server_if_needed(const Chunk *chunk, Oid existing_server_id)
{
ListCell *lc;
ChunkDataNode *new_server = NULL;
Chunk *chunk = ts_chunk_get_by_id(chunk_id, true);
ForeignTable *foreign_table = NULL;
ForeignServer *server = NULL;
bool new_data_node_found = false;

Assert(chunk->relkind == RELKIND_FOREIGN_TABLE);
foreign_table = GetForeignTable(chunk->table_id);

/* Cannot switch to other data node if only one or none assigned */
if (list_length(chunk->data_nodes) < 2)
return false;

/* no need to update since foreign table doesn't reference server we try to remove */
if (existing_server_id != foreign_table->serverid)
return;

Assert(list_length(chunk->data_nodes) > 1);
return false;

/* Pick the first "available" data node referenced by the chunk, which is
* not the existing data node. */
foreach (lc, chunk->data_nodes)
{
new_server = lfirst(lc);

if (new_server->foreign_server_oid != existing_server_id)
break;
{
server = GetForeignServer(new_server->foreign_server_oid);

if (data_node_is_available(server))
{
new_data_node_found = true;
break;
}
}
}

if (new_data_node_found)
{
Assert(server != NULL);
chunk_set_foreign_server(chunk, server);
}
Assert(new_server != NULL);

chunk_set_foreign_server(chunk, GetForeignServer(new_server->foreign_server_oid));
return new_data_node_found;
}

Datum
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/chunk.h
Expand Up @@ -10,7 +10,7 @@
#include <fmgr.h>
#include <chunk.h>

extern void chunk_update_foreign_server_if_needed(int32 chunk_id, Oid existing_server_id);
extern bool chunk_update_foreign_server_if_needed(const Chunk *chunk, Oid existing_server_id);
extern Datum chunk_set_default_data_node(PG_FUNCTION_ARGS);
extern Datum chunk_drop_replica(PG_FUNCTION_ARGS);
extern int chunk_invoke_drop_chunks(Oid relid, Datum older_than, Datum older_than_type);
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/chunk_api.c
Expand Up @@ -1767,6 +1767,6 @@ chunk_api_call_chunk_drop_replica(const Chunk *chunk, const char *node_name, Oid
* This chunk might have this data node as primary, change that association
* if so. Then delete the chunk_id and node_name association.
*/
chunk_update_foreign_server_if_needed(chunk->fd.id, serverid);
chunk_update_foreign_server_if_needed(chunk, serverid);
ts_chunk_data_node_delete_by_chunk_id_and_node_name(chunk->fd.id, node_name);
}

0 comments on commit 4283de2

Please sign in to comment.