Skip to content

Commit

Permalink
Add a UDF to use a distributed snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoslot committed Nov 11, 2022
1 parent 7358b82 commit 21b0500
Show file tree
Hide file tree
Showing 17 changed files with 460 additions and 28 deletions.
14 changes: 14 additions & 0 deletions src/backend/distributed/connection/connection_management.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,20 @@ CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
}


/*
* CloseConnectionList closes all connections in the given list.
*/
void
CloseConnectionList(List *connectionList)
{
MultiConnection *connection = NULL;
foreach_ptr(connection, connectionList)
{
CloseConnection(connection);
}
}


/*
* Close a previously established connection.
*/
Expand Down
25 changes: 18 additions & 7 deletions src/backend/distributed/connection/remote_commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,24 +425,35 @@ ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command)


/*
* ExecuteRemoteCommandInConnectionList executes a remote command, on all connections
* given in the list, that is critical to the transaction. If the command fails then
* the transaction aborts.
* SendCommandToConnectionList sends a given command over all connections
* in the list in parallel.
*/
void
ExecuteRemoteCommandInConnectionList(List *nodeConnectionList, const char *command)
SendRemoteCommandToConnectionList(List *connectionList, const char *command)
{
MultiConnection *connection = NULL;

foreach_ptr(connection, nodeConnectionList)
foreach_ptr(connection, connectionList)
{
int querySent = SendRemoteCommand(connection, command);

if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
}


/*
* ExecuteRemoteCommandInConnectionList executes a remote command, on all connections
* given in the list, that is critical to the transaction. If the command fails then
* the transaction aborts.
*/
void
ExecuteRemoteCommandInConnectionList(List *nodeConnectionList, const char *command)
{
MultiConnection *connection = NULL;

SendRemoteCommandToConnectionList(nodeConnectionList, command);

/* Process the result */
foreach_ptr(connection, nodeConnectionList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ citus_create_restore_point(PG_FUNCTION_ARGS)
* OpenConnectionsToAllNodes opens connections to all nodes and returns the list
* of connections.
*/
static List *
List *
OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode)
{
List *connectionList = NIL;
Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/sql/citus--11.1-1--11.2-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
#include "udfs/citus_get_transaction_clock/11.2-1.sql"
#include "udfs/citus_is_clock_after/11.2-1.sql"
#include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql"
#include "udfs/citus_use_snapshot/11.1-1.sql"
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-- citus--11.2-1--11.1-1
#include "../udfs/get_rebalance_progress/11.1-1.sql"
#include "../udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
DROP FUNCTION pg_catalog.citus_use_snapshot();
DROP FUNCTION pg_catalog.citus_get_node_clock();
DROP FUNCTION pg_catalog.citus_get_transaction_clock();
DROP FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(cluster_clock);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_use_snapshot()
RETURNS void
LANGUAGE c
STRICT
AS '$libdir/citus', $function$citus_use_snapshot$function$;
COMMENT ON FUNCTION pg_catalog.citus_use_snapshot()
IS 'use a consistent a consistent distributed snapshot for the remainder of the transaction';

GRANT EXECUTE ON FUNCTION pg_catalog.citus_use_snapshot() TO PUBLIC;
19 changes: 14 additions & 5 deletions src/backend/distributed/transaction/remote_transaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/transaction/snapshot.h"
#include "distributed/transaction_identifier.h"
#include "distributed/transaction_management.h"
#include "distributed/transaction_recovery.h"
Expand Down Expand Up @@ -85,14 +86,22 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)

StringInfo beginAndSetDistributedTransactionId = makeStringInfo();

/*
* Explicitly specify READ COMMITTED, the default on the remote
* side might have been changed, and that would cause problematic
* behaviour.
*/
/* append the BEGIN command */
appendStringInfoString(beginAndSetDistributedTransactionId,
BeginTransactionCommand());

/* when using a distributed snapshot, append SET TRANSACTION SNAPSHOT .. */
char *exportedSnapshotName = GetSnapshotNameForNode(connection->hostname,
connection->port,
connection->user,
connection->database);
if (exportedSnapshotName != NULL)
{
appendStringInfo(beginAndSetDistributedTransactionId,
"SET TRANSACTION SNAPSHOT %s;",
quote_literal_cstr(exportedSnapshotName));
}

/* append context for in-progress SAVEPOINTs for this transaction */
List *activeSubXacts = ActiveSubXactContexts();
transaction->lastSuccessfulSubXact = TopSubTransactionId;
Expand Down
Loading

0 comments on commit 21b0500

Please sign in to comment.