Skip to content

Commit

Permalink
Do not drop sequences when dropping metadata (#5584)
Browse files Browse the repository at this point in the history
Dropping sequences means we need to recreate
and hence losing the sequence.

With this commit, we keep the existing sequences
such that resyncing wouldn't drop the sequence.

We do that by breaking the dependency of the sequence
from the table.
  • Loading branch information
onderkalaci committed Jan 6, 2022
1 parent 8007add commit 5305aa4
Show file tree
Hide file tree
Showing 15 changed files with 674 additions and 17 deletions.
15 changes: 15 additions & 0 deletions src/backend/distributed/commands/alter_table.c
Expand Up @@ -1372,6 +1372,21 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
schemaName, targetName);
SendCommandToWorkersWithMetadata(workerChangeSequenceDependencyCommand);
}
else if (ShouldSyncTableMetadata(sourceId))
{
/*
* We are converting a citus local table to a distributed/reference table,
* so we should prevent dropping the sequence on the table. Otherwise, we'd
* lose track of the previous changes in the sequence.
*/
StringInfo command = makeStringInfo();

appendStringInfo(command,
"SELECT pg_catalog.worker_drop_sequence_dependency('%s');",
quote_qualified_identifier(schemaName, sourceName));

SendCommandToWorkersWithMetadata(command->data);
}
}

char *justBeforeDropCommand = NULL;
Expand Down
10 changes: 9 additions & 1 deletion src/backend/distributed/metadata/metadata_sync.c
Expand Up @@ -898,8 +898,16 @@ MetadataDropCommands(void)
dropSnapshotCommandList = list_concat(dropSnapshotCommandList,
detachPartitionCommandList);

/*
* We are re-creating the metadata, so not lose track of the
* sequences by preventing them dropped via DROP TABLE.
*/
dropSnapshotCommandList =
lappend(dropSnapshotCommandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);

dropSnapshotCommandList = lappend(dropSnapshotCommandList,
REMOVE_ALL_CLUSTERED_TABLES_COMMAND);
REMOVE_ALL_CITUS_TABLES_COMMAND);

dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES);
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
Expand Down
2 changes: 2 additions & 0 deletions src/backend/distributed/sql/citus--10.2-4--11.0-1.sql
Expand Up @@ -8,6 +8,8 @@

#include "udfs/citus_internal_add_object_metadata/11.0-1.sql"
#include "udfs/citus_run_local_command/11.0-1.sql"
#include "udfs/worker_drop_sequence_dependency/11.0-1.sql"


DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text);
DROP FUNCTION pg_catalog.master_get_table_metadata(text);
Expand Down
Expand Up @@ -45,3 +45,4 @@ DROP FUNCTION pg_catalog.citus_check_cluster_node_health ();

DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer);
DROP FUNCTION pg_catalog.citus_run_local_command(text);
DROP FUNCTION pg_catalog.worker_drop_sequence_dependency(text);

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

@@ -0,0 +1,8 @@
DROP FUNCTION IF EXISTS pg_catalog.worker_drop_sequence_dependency(table_name text);

CREATE OR REPLACE FUNCTION pg_catalog.worker_drop_sequence_dependency(table_name text)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_drop_sequence_dependency$$;
COMMENT ON FUNCTION pg_catalog.worker_drop_sequence_dependency(table_name text)
IS 'drop the Citus tables sequence dependency';
116 changes: 115 additions & 1 deletion src/backend/distributed/worker/worker_drop_protocol.c
Expand Up @@ -17,6 +17,10 @@
#include "access/heapam.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/pg_depend.h"
#if PG_VERSION_NUM < PG_VERSION_13
#include "catalog/pg_depend_d.h"
#endif
#include "catalog/pg_foreign_server.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/distribution_column.h"
Expand All @@ -29,9 +33,14 @@
#include "utils/builtins.h"
#include "utils/fmgroids.h"


PG_FUNCTION_INFO_V1(worker_drop_distributed_table);
PG_FUNCTION_INFO_V1(worker_drop_sequence_dependency);


#if PG_VERSION_NUM < PG_VERSION_13
static long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype,
Oid refclassId, Oid refobjectId);
#endif

/*
* worker_drop_distributed_table drops the distributed table with the given oid,
Expand Down Expand Up @@ -153,3 +162,108 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)

PG_RETURN_VOID();
}


/*
* worker_drop_sequence_dependency is a UDF that removes the dependency
* of all the sequences for the given table.
*
* The main purpose of this UDF is to prevent dropping the sequences while
* re-creating the same table such as changing the shard count, converting
* a citus local table to a distributed table or re-syncing the metadata.
*/
Datum
worker_drop_sequence_dependency(PG_FUNCTION_ARGS)
{
text *relationName = PG_GETARG_TEXT_P(0);
Oid relationId = ResolveRelationId(relationName, true);

if (!OidIsValid(relationId))
{
ereport(NOTICE, (errmsg("relation %s does not exist, skipping",
text_to_cstring(relationName))));
PG_RETURN_VOID();
}

EnsureTableOwner(relationId);

/* break the dependent sequences from the table */
#if PG_VERSION_NUM >= PG_VERSION_13
List *ownedSequences = getOwnedSequences(relationId);
#else
List *ownedSequences = getOwnedSequences(relationId, InvalidAttrNumber);
#endif

Oid ownedSequenceOid = InvalidOid;
foreach_oid(ownedSequenceOid, ownedSequences)
{
/* the caller doesn't want to drop the sequence, so break the dependency */
deleteDependencyRecordsForSpecific(RelationRelationId, ownedSequenceOid,
DEPENDENCY_AUTO, RelationRelationId,
relationId);
}

if (list_length(ownedSequences) > 0)
{
/* if we delete at least one dependency, let next commands know */
CommandCounterIncrement();
}

PG_RETURN_VOID();
}


/* *INDENT-OFF* */
#if PG_VERSION_NUM < PG_VERSION_13

/*
* This function is already available on PG 13+.
* deleteDependencyRecordsForSpecific -- delete all records with given depender
* classId/objectId, dependee classId/objectId, of the given deptype.
* Returns the number of records deleted.
*/
static long
deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype,
Oid refclassId, Oid refobjectId)
{
long count = 0;
Relation depRel;
ScanKeyData key[2];
HeapTuple tup;

depRel = table_open(DependRelationId, RowExclusiveLock);

ScanKeyInit(&key[0],
Anum_pg_depend_classid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(classId));
ScanKeyInit(&key[1],
Anum_pg_depend_objid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(objectId));

SysScanDesc scan =
systable_beginscan(depRel, DependDependerIndexId, true,
NULL, 2, key);

while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend depform = (Form_pg_depend) GETSTRUCT(tup);

if (depform->refclassid == refclassId &&
depform->refobjid == refobjectId &&
depform->deptype == deptype)
{
CatalogTupleDelete(depRel, &tup->t_self);
count++;
}
}

systable_endscan(scan);

table_close(depRel, RowExclusiveLock);

return count;
}
#endif
/* *INDENT-ON* */
5 changes: 4 additions & 1 deletion src/include/distributed/metadata_sync.h
Expand Up @@ -67,8 +67,11 @@ extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);

#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
#define DELETE_ALL_DISTRIBUTED_OBJECTS "TRUNCATE citus.pg_dist_object"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
#define REMOVE_ALL_CITUS_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
"SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition"

#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
#define ENABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'on'"
#define DISABLE_OBJECT_PROPAGATION "SET citus.enable_object_propagation TO 'off'"
Expand Down
3 changes: 2 additions & 1 deletion src/test/regress/expected/multi_extension.out
Expand Up @@ -1005,7 +1005,8 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_disable_node(text,integer,boolean) void
| function citus_internal_add_object_metadata(text,text[],text[],integer,integer) void
| function citus_run_local_command(text) void
(9 rows)
| function worker_drop_sequence_dependency(text) void
(10 rows)

DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version
Expand Down

0 comments on commit 5305aa4

Please sign in to comment.