Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[6X] Add distributed snapshot support to pg_export_snapshot #13470

Merged
merged 3 commits into from May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/backend/cdb/cdbdistributedsnapshot.c
Expand Up @@ -21,6 +21,13 @@
#include "cdb/cdbvars.h"
#include "utils/guc.h"
#include "utils/snapmgr.h"
#include "storage/procarray.h"

int
GetMaxSnapshotDistributedXidCount()
{
return GetMaxSnapshotXidCount();
}

/*
* DistributedSnapshotWithLocalMapping_CommittedTest
Expand Down
15 changes: 4 additions & 11 deletions src/backend/cdb/cdbtm.c
Expand Up @@ -133,19 +133,12 @@ static void sendWaitGxidsToQD(List *waitGxids);

extern void CheckForResetSession(void);

/**
* All assignments of the global DistributedTransactionContext should go through this function
* (so we can add logging here to see all assignments)
*
* @param context the new value for DistributedTransactionContext
*/
static void
void
setDistributedTransactionContext(DtxContext context)
{
/*
* elog(INFO, "Setting DistributedTransactionContext to '%s'",
* DtxContextToString(context));
*/
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"Setting DistributedTransactionContext to '%s'",
DtxContextToString(context));
DistributedTransactionContext = context;
}

Expand Down
4 changes: 2 additions & 2 deletions src/backend/storage/ipc/sinval.c
Expand Up @@ -360,12 +360,12 @@ ProcessCatchupEvent(void)
* Save distributed transaction context first.
*/
saveDistributedTransactionContext = DistributedTransactionContext;
DistributedTransactionContext = DTX_CONTEXT_LOCAL_ONLY;
setDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY);

StartTransactionCommand();
CommitTransactionCommand();

DistributedTransactionContext = saveDistributedTransactionContext;
setDistributedTransactionContext(saveDistributedTransactionContext);
}

if (notify_enabled)
Expand Down
75 changes: 74 additions & 1 deletion src/backend/utils/time/snapmgr.c
Expand Up @@ -62,6 +62,8 @@
#include "utils/syscache.h"
#include "utils/tqual.h"

#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"
#include "utils/guc.h"
Expand Down Expand Up @@ -449,7 +451,13 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
* two variables in exported snapshot files, but it seems better to have
* snapshot importers compute reasonably up-to-date values for them.)
*/
CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData, DistributedTransactionContext);

/*
* Pass in DTX_CONTEXT_LOCAL_ONLY to prevent a distributed snapshot from
* getting created here, as we will be using the one we parsed in
* ImportSnapshot.
*/
CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData, DTX_CONTEXT_LOCAL_ONLY);

/*
* Now copy appropriate fields from the source snapshot.
Expand All @@ -466,6 +474,16 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
sourcesnap->subxcnt * sizeof(TransactionId));
CurrentSnapshot->suboverflowed = sourcesnap->suboverflowed;
CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;

/*
* GPDB: Copy over distributed snapshot if present.
*/
if (sourcesnap->haveDistribSnapshot)
{
CurrentSnapshot->haveDistribSnapshot = true;
DistributedSnapshot_Copy(&CurrentSnapshot->distribSnapshotWithLocalMapping.ds,
&sourcesnap->distribSnapshotWithLocalMapping.ds);
}
/* NB: curcid should NOT be copied, it's a local matter */

/*
Expand Down Expand Up @@ -1014,6 +1032,7 @@ ExportSnapshot(Snapshot snapshot)
char path[MAXPGPATH];
char pathtmp[MAXPGPATH];

DistributedSnapshot *distributed_snapshot;
/*
* It's tempting to call RequireTransactionChain here, since it's not very
* useful to export a snapshot that will disappear immediately afterwards.
Expand Down Expand Up @@ -1117,6 +1136,21 @@ ExportSnapshot(Snapshot snapshot)
}
appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);

/*
* GPDB: Serialize distributed snapshot if present.
*/
if (snapshot->haveDistribSnapshot)
{
distributed_snapshot = &snapshot->distribSnapshotWithLocalMapping.ds;
appendStringInfo(&buf, "dsxminall:%u\n", distributed_snapshot->xminAllDistributedSnapshots);
appendStringInfo(&buf, "dsid:%d\n", distributed_snapshot->distribSnapshotId);
appendStringInfo(&buf, "dsxmin:%u\n", distributed_snapshot->xmin);
appendStringInfo(&buf, "dsxmax:%u\n", distributed_snapshot->xmax);
appendStringInfo(&buf, "dsxcnt:%d\n", distributed_snapshot->count);
for (i = 0; i < distributed_snapshot->count; i++)
appendStringInfo(&buf, "dsxip:%u\n", distributed_snapshot->inProgressXidArray[i]);
}

/*
* Now write the text representation into a file. We first write to a
* ".tmp" filename, and rename to final filename if no error. This
Expand Down Expand Up @@ -1245,12 +1279,14 @@ ImportSnapshot(const char *idstr)
struct stat stat_buf;
char *filebuf;
int xcnt;
int dxcnt;
int i;
TransactionId src_xid;
Oid src_dbid;
int src_isolevel;
bool src_readonly;
SnapshotData snapshot;
DistributedSnapshot *distributed_snapshot;

/*
* Must be at top level of a fresh transaction. Note in particular that
Expand Down Expand Up @@ -1355,6 +1391,43 @@ ImportSnapshot(const char *idstr)

snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);

/*
* GPDB: Extract distributed snapshot
* Importing a distributed snapshot in utility mode is not allowed because
* functionality to dispatch pg_export_snapshot to all segments and create
* a snapshot with ds fields in each segments datadir is not implemented.
* Since there is no reliable way to export a utility mode distributed snapshot,
* we have no way to judge its provenance.
*/
if(*filebuf != '\0')
{
if (Gp_role == GP_ROLE_UTILITY) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("cannot import distributed snapshot in utility mode"),
errhint("export the snapshot in utility mode")));
}
distributed_snapshot = &snapshot.distribSnapshotWithLocalMapping.ds;
distributed_snapshot->xminAllDistributedSnapshots = parseXidFromText("dsxminall:", &filebuf, path);
distributed_snapshot->distribSnapshotId = parseIntFromText("dsid:", &filebuf, path);
distributed_snapshot->xmin = parseXidFromText("dsxmin:", &filebuf, path);
distributed_snapshot->xmax = parseXidFromText("dsxmax:", &filebuf, path);
distributed_snapshot->count = dxcnt = parseIntFromText("dsxcnt:", &filebuf, path);

/* sanity-check dsxmin and the xid count before palloc */
if (distributed_snapshot->xmin < distributed_snapshot->xminAllDistributedSnapshots ||
(dxcnt < 0 || dxcnt > GetMaxSnapshotDistributedXidCount())
)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid snapshot data in file \"%s\"", path)));

distributed_snapshot->inProgressXidArray = (DistributedTransactionId *) palloc(dxcnt * sizeof(DistributedTransactionId));
for (i = 0; i < dxcnt; i++)
distributed_snapshot->inProgressXidArray[i] = parseXidFromText("dsxip:", &filebuf, path);
snapshot.haveDistribSnapshot = true;
}

/*
* Do some additional sanity checking, just to protect ourselves. We
* don't trouble to check the array elements, just the most critical
Expand Down
2 changes: 2 additions & 0 deletions src/include/cdb/cdbdistributedsnapshot.h
Expand Up @@ -74,6 +74,8 @@ typedef enum
DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE
} DistributedSnapshotCommitted;

extern int GetMaxSnapshotDistributedXidCount(void);

extern DistributedSnapshotCommitted DistributedSnapshotWithLocalMapping_CommittedTest(
DistributedSnapshotWithLocalMapping *dslm,
TransactionId localXid,
Expand Down
2 changes: 2 additions & 0 deletions src/include/cdb/cdbtm.h
Expand Up @@ -285,6 +285,8 @@ extern DistributedTransactionTimeStamp getDistributedTransactionTimestamp(void);
extern bool getDistributedTransactionIdentifier(char *id);

extern void resetGxact(void);
extern void setDistributedTransactionContext(DtxContext context);

extern void prepareDtxTransaction(void);
extern bool isPreparedDtxTransaction(void);
extern bool notifyCommittedDtxTransactionIsNeeded(void);
Expand Down