Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a citus_use_snapshot function for basic distributed snapshot isolation #6489

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/backend/distributed/connection/connection_management.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,20 @@ CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
}


/*
* CloseConnectionList closes all connections in the given list.
*/
void
CloseConnectionList(List *connectionList)
{
MultiConnection *connection = NULL;
foreach_ptr(connection, connectionList)
{
CloseConnection(connection);
}
}


/*
* Close a previously established connection.
*/
Expand Down
25 changes: 18 additions & 7 deletions src/backend/distributed/connection/remote_commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,24 +425,35 @@ ExecuteCriticalRemoteCommand(MultiConnection *connection, const char *command)


/*
* ExecuteRemoteCommandInConnectionList executes a remote command, on all connections
* given in the list, that is critical to the transaction. If the command fails then
* the transaction aborts.
* SendCommandToConnectionList sends a given command over all connections
* in the list in parallel.
*/
void
ExecuteRemoteCommandInConnectionList(List *nodeConnectionList, const char *command)
SendRemoteCommandToConnectionList(List *connectionList, const char *command)
{
MultiConnection *connection = NULL;

foreach_ptr(connection, nodeConnectionList)
foreach_ptr(connection, connectionList)
{
int querySent = SendRemoteCommand(connection, command);

if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
}


/*
* ExecuteRemoteCommandInConnectionList executes a remote command, on all connections
* given in the list, that is critical to the transaction. If the command fails then
* the transaction aborts.
*/
void
ExecuteRemoteCommandInConnectionList(List *nodeConnectionList, const char *command)
{
MultiConnection *connection = NULL;

SendRemoteCommandToConnectionList(nodeConnectionList, command);

/* Process the result */
foreach_ptr(connection, nodeConnectionList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ citus_create_restore_point(PG_FUNCTION_ARGS)
* OpenConnectionsToAllNodes opens connections to all nodes and returns the list
* of connections.
*/
static List *
List *
OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode)
{
List *connectionList = NIL;
Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/sql/citus--11.1-1--11.2-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ DROP FUNCTION pg_catalog.worker_append_table_to_shard(text, text, text, integer)
#include "udfs/citus_get_transaction_clock/11.2-1.sql"
#include "udfs/citus_is_clock_after/11.2-1.sql"
#include "udfs/citus_internal_adjust_local_clock_to_remote/11.2-1.sql"
#include "udfs/citus_use_snapshot/11.1-1.sql"
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-- citus--11.2-1--11.1-1
#include "../udfs/get_rebalance_progress/11.1-1.sql"
#include "../udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
DROP FUNCTION pg_catalog.citus_use_snapshot();
DROP FUNCTION pg_catalog.citus_get_node_clock();
DROP FUNCTION pg_catalog.citus_get_transaction_clock();
DROP FUNCTION pg_catalog.citus_internal_adjust_local_clock_to_remote(cluster_clock);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_use_snapshot()
RETURNS void
LANGUAGE c
STRICT
AS '$libdir/citus', $function$citus_use_snapshot$function$;
COMMENT ON FUNCTION pg_catalog.citus_use_snapshot()
IS 'use a consistent a consistent distributed snapshot for the remainder of the transaction';

GRANT EXECUTE ON FUNCTION pg_catalog.citus_use_snapshot() TO PUBLIC;
19 changes: 14 additions & 5 deletions src/backend/distributed/transaction/remote_transaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/transaction/snapshot.h"
#include "distributed/transaction_identifier.h"
#include "distributed/transaction_management.h"
#include "distributed/transaction_recovery.h"
Expand Down Expand Up @@ -85,14 +86,22 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)

StringInfo beginAndSetDistributedTransactionId = makeStringInfo();

/*
* Explicitly specify READ COMMITTED, the default on the remote
* side might have been changed, and that would cause problematic
* behaviour.
*/
/* append the BEGIN command */
appendStringInfoString(beginAndSetDistributedTransactionId,
BeginTransactionCommand());

/* when using a distributed snapshot, append SET TRANSACTION SNAPSHOT .. */
char *exportedSnapshotName = GetSnapshotNameForNode(connection->hostname,
connection->port,
connection->user,
connection->database);
if (exportedSnapshotName != NULL)
{
appendStringInfo(beginAndSetDistributedTransactionId,
"SET TRANSACTION SNAPSHOT %s;",
quote_literal_cstr(exportedSnapshotName));
}

/* append context for in-progress SAVEPOINTs for this transaction */
List *activeSubXacts = ActiveSubXactContexts();
transaction->lastSuccessfulSubXact = TopSubTransactionId;
Expand Down
Loading