Skip to content

Commit

Permalink
implement trigger support
Browse files Browse the repository at this point in the history
rename triggers on the shard
re-create the trigger on shell table
  • Loading branch information
onurctirtir committed May 20, 2020
1 parent 455850d commit 8eb5401
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ CreateCitusLocalTable(Oid relationOid)
List *foreignConstraintCommands =
GetForeignConstraintCommandsTableInvolved(relationOid);

List *triggerCommands = GetTableTriggerCommands(relationOid);

/* include DEFAULT clauses for columns getting their default values from a sequence */
bool includeSequenceDefaults = true;

Expand All @@ -572,6 +574,7 @@ CreateCitusLocalTable(Oid relationOid)
List *tableDDLEvents = GetTableDDLEvents(relationOid, includeSequenceDefaults,
includeForeignTableDependencies);
tableDDLEvents = list_concat(tableDDLEvents, foreignConstraintCommands);
tableDDLEvents = list_concat(tableDDLEvents, triggerCommands);

uint64 shardId = GetNextShardId();

Expand Down
103 changes: 103 additions & 0 deletions src/backend/distributed/commands/trigger.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*-------------------------------------------------------------------------
* trigger.c
*
* This file contains functions to create and process triggers objects on
* citus tables.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/pg_version_constants.h"

#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "access/genam.h"
#if PG_VERSION_NUM >= PG_VERSION_12
#include "access/table.h"
#else
#include "access/heapam.h"
#include "access/htup_details.h"
#endif
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_trigger.h"
#include "utils/fmgroids.h"

/*
* GetTableTriggerCommands returns the list of DDL commands to (re)create
* triggers for the given table.
*/
List *
GetTableTriggerCommands(Oid relationOid)
{
List *triggerDDLEventList = NIL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;

/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
*/
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);

bool useIndex = true;

Relation pgTrigger = heap_open(TriggerRelationId, AccessShareLock);

ScanKeyInit(&scanKey[0], Anum_pg_trigger_tgrelid,
BTEqualStrategyNumber, F_OIDEQ, relationOid);

SysScanDesc scanDescriptor = systable_beginscan(pgTrigger, TriggerRelidNameIndexId,
useIndex, NULL, scanKeyCount,
scanKey);

HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Oid triggerId = get_relation_trigger_oid_compat(heapTuple);
char *statementDef = pg_get_triggerdef_command(triggerId);

triggerDDLEventList = lappend(triggerDDLEventList, statementDef);

heapTuple = systable_getnext(scanDescriptor);
}

systable_endscan(scanDescriptor);
heap_close(pgTrigger, NoLock);

/* revert back to original search_path */
PopOverrideSearchPath();

return triggerDDLEventList;
}


/*
* get_relation_trigger_oid_compat returns OID of the trigger represented
* by the constraintForm, which is passed as an heapTuple. OID of the
* trigger is already stored in the triggerForm struct if major PostgreSQL
* version is 12. However, in the older versions, we should utilize
* HeapTupleGetOid to deduce that OID with no cost.
*/
Oid
get_relation_trigger_oid_compat(HeapTuple heapTuple)
{
Assert(heapTuple != NULL);

Oid triggerOid = InvalidOid;

#if PG_VERSION_NUM >= PG_VERSION_12
Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(heapTuple);
triggerOid = triggerForm->oid;
#else
triggerOid = HeapTupleGetOid(heapTuple);
#endif

return triggerOid;
}
8 changes: 8 additions & 0 deletions src/backend/distributed/deparser/ruleutils_11.c
Original file line number Diff line number Diff line change
Expand Up @@ -7485,6 +7485,14 @@ get_tablesample_def(TableSampleClause *tablesample, deparse_context *context)
}
}

char *
pg_get_triggerdef_command(Oid triggerId)
{
Assert(OidIsValid(triggerId));

return pg_get_triggerdef_worker(triggerId, false);
}

static char *
pg_get_triggerdef_worker(Oid trigid, bool pretty)
{
Expand Down
8 changes: 8 additions & 0 deletions src/backend/distributed/deparser/ruleutils_12.c
Original file line number Diff line number Diff line change
Expand Up @@ -7485,6 +7485,14 @@ get_tablesample_def(TableSampleClause *tablesample, deparse_context *context)
}
}

char *
pg_get_triggerdef_command(Oid triggerId)
{
Assert(OidIsValid(triggerId));

return pg_get_triggerdef_worker(triggerId, false);
}

static char *
pg_get_triggerdef_worker(Oid trigid, bool pretty)
{
Expand Down
90 changes: 90 additions & 0 deletions src/backend/distributed/master/master_create_shards.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_constraint_d.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_trigger_d.h"
#include "commands/tablecmds.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
Expand Down Expand Up @@ -75,6 +77,8 @@ static List * GetRelationConstraintNames(Oid relationOid);
static void RenameForeignConstraintsReferencingToShard(Oid relationOid, uint64 shardId);
static void RenameShardRelationIndexes(Oid relationOid, uint64 shardId);
static List * GetShardRelationIndexNames(Oid relationOid);
static void RenameShardRelationTriggers(Oid relationOid, uint64 shardId);
static List * GetShardRelationTriggerNames(Oid relationOid);

/*
* master_create_worker_shards is a user facing function to create worker shards
Expand Down Expand Up @@ -419,6 +423,7 @@ CreateCitusLocalTableShard(Oid relationOid, uint64 shardId)
RenameShardRelationConstraints(relationOid, shardId);
RenameForeignConstraintsReferencingToShard(relationOid, shardId);
RenameShardRelationIndexes(relationOid, shardId);
RenameShardRelationTriggers(relationOid, shardId);
}


Expand Down Expand Up @@ -689,6 +694,91 @@ GetShardRelationIndexNames(Oid relationOid)
}


/*
* RenameShardRelationTriggers appends given shardId to the end of the names
* of shard relation triggers.
*/
static void
RenameShardRelationTriggers(Oid relationOid, uint64 shardId)
{
List *triggerNameList = GetShardRelationTriggerNames(relationOid);

char *triggerName = NULL;
foreach_ptr(triggerName, triggerNameList)
{
char *shardRelationName = get_rel_name(relationOid);

char *shardTriggerName = pstrdup(triggerName);
AppendShardIdToName(&shardTriggerName, shardId);

StringInfo renameCommand = makeStringInfo();
appendStringInfo(renameCommand, "ALTER TRIGGER %s ON %s RENAME TO %s;",
triggerName, shardRelationName, shardTriggerName);

const char *commandString = renameCommand->data;

Node *parseTree = ParseTreeNode(commandString);

CitusProcessUtility(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
}
}


/*
* GetShardRelationTriggerNames returns a list of trigger names defined on the
* relation with relationOid.
*/
static List *
GetShardRelationTriggerNames(Oid relationOid)
{
List *triggerNames = NIL;

ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool useIndex = true;

/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
*/
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);

Relation pgTrigger = heap_open(TriggerRelationId, AccessShareLock);

ScanKeyInit(&scanKey[0], Anum_pg_trigger_tgrelid,
BTEqualStrategyNumber, F_OIDEQ, relationOid);

SysScanDesc scanDescriptor = systable_beginscan(pgTrigger, TriggerRelidNameIndexId,
useIndex, NULL, scanKeyCount,
scanKey);

HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Form_pg_trigger triggerForm = (Form_pg_trigger) GETSTRUCT(heapTuple);

char *triggerName = NameStr(triggerForm->tgname);

triggerNames = lappend(triggerNames, pstrdup(triggerName));

heapTuple = systable_getnext(scanDescriptor);
}

systable_endscan(scanDescriptor);
heap_close(pgTrigger, AccessShareLock);

/* revert back to original search_path */
PopOverrideSearchPath();

return triggerNames;
}


/*
* CheckHashPartitionedTable looks up the partition information for the given
* tableId and checks if the table is hash partitioned. If not, the function
Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/citus_ruleutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ extern void pg_get_query_def(Query *query, StringInfo buffer);
char * pg_get_rule_expr(Node *expression);
extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid,
StringInfo buffer);
extern char * pg_get_triggerdef_command(Oid triggerId);
extern char * generate_relation_name(Oid relid, List *namespaces);
extern char * generate_qualified_relation_name(Oid relid);
extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2);
Expand Down
4 changes: 4 additions & 0 deletions src/include/distributed/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ extern ObjectWithArgs * ObjectWithArgsFromOid(Oid funcOid);
/* vacuum.c - forward declarations */
extern void PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);

/* trigger.c - forward declarations */
extern List * GetTableTriggerCommands(Oid relationOid);
extern Oid get_relation_trigger_oid_compat(HeapTuple heapTuple);

extern bool ShouldPropagateSetCommand(VariableSetStmt *setStmt);
extern void PostprocessVariableSetStmt(VariableSetStmt *setStmt, const char *setCommand);

Expand Down

0 comments on commit 8eb5401

Please sign in to comment.