Skip to content

Commit 1e4948a

Browse files
committed
checkpoint two-phase
1 parent d37e2cd commit 1e4948a

File tree

1 file changed

+72
-147
lines changed

1 file changed

+72
-147
lines changed

src/backend/access/transam/twophase.c

Lines changed: 72 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
170170
const TwoPhaseCallback callbacks[]);
171171
static void RemoveGXact(GlobalTransaction gxact);
172172

173-
174-
175-
static char twophase_buf[10*1024];
176-
static int twophase_pos = 0;
177-
size_t bogus_write(int fd, const void *buf, size_t nbytes);
178-
static char *XlogReadTwoPhaseData(XLogRecPtr lsn);
173+
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
179174

180175
/*
181176
* Initialization of shared memory
@@ -1033,14 +1028,8 @@ StartPrepare(GlobalTransaction gxact)
10331028
void
10341029
EndPrepare(GlobalTransaction gxact)
10351030
{
1036-
// PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1037-
// TransactionId xid = pgxact->xid;
10381031
TwoPhaseFileHeader *hdr;
1039-
char path[MAXPGPATH];
10401032
StateFileChunk *record;
1041-
pg_crc32c statefile_crc;
1042-
// pg_crc32c bogus_crc;
1043-
int fd;
10441033

10451034
/* Add the end sentinel to the list of 2PC records */
10461035
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1061,72 +1050,7 @@ EndPrepare(GlobalTransaction gxact)
10611050
errmsg("two-phase state file maximum length exceeded")));
10621051

10631052
/*
1064-
* Create the 2PC state file.
1065-
*/
1066-
// TwoPhaseFilePath(path, xid);
1067-
1068-
// fd = OpenTransientFile(path,
1069-
// O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
1070-
// S_IRUSR | S_IWUSR);
1071-
fd = 1;
1072-
1073-
if (fd < 0)
1074-
ereport(ERROR,
1075-
(errcode_for_file_access(),
1076-
errmsg("could not create two-phase state file \"%s\": %m",
1077-
path)));
1078-
1079-
/* Write data to file, and calculate CRC as we pass over it */
1080-
INIT_CRC32C(statefile_crc);
1081-
1082-
for (record = records.head; record != NULL; record = record->next)
1083-
{
1084-
COMP_CRC32C(statefile_crc, record->data, record->len);
1085-
if ((bogus_write(fd, record->data, record->len)) != record->len)
1086-
{
1087-
CloseTransientFile(fd);
1088-
ereport(ERROR,
1089-
(errcode_for_file_access(),
1090-
errmsg("could not write two-phase state file: %m")));
1091-
}
1092-
}
1093-
1094-
FIN_CRC32C(statefile_crc);
1095-
1096-
// /*
1097-
// * Write a deliberately bogus CRC to the state file; this is just paranoia
1098-
// * to catch the case where four more bytes will run us out of disk space.
1099-
// */
1100-
// bogus_crc = ~statefile_crc;
1101-
1102-
// if ((bogus_write(fd, &bogus_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
1103-
// {
1104-
// CloseTransientFile(fd);
1105-
// ereport(ERROR,
1106-
// (errcode_for_file_access(),
1107-
// errmsg("could not write two-phase state file: %m")));
1108-
// }
1109-
1110-
// /* Back up to prepare for rewriting the CRC */
1111-
// if (lseek(fd, -((off_t) sizeof(pg_crc32c)), SEEK_CUR) < 0)
1112-
// {
1113-
// CloseTransientFile(fd);
1114-
// ereport(ERROR,
1115-
// (errcode_for_file_access(),
1116-
// errmsg("could not seek in two-phase state file: %m")));
1117-
// }
1118-
1119-
/*
1120-
* The state file isn't valid yet, because we haven't written the correct
1121-
* CRC yet. Before we do that, insert entry in WAL and flush it to disk.
1122-
*
1123-
* Between the time we have written the WAL entry and the time we write
1124-
* out the correct state file CRC, we have an inconsistency: the xact is
1125-
* prepared according to WAL but not according to our on-disk state. We
1126-
* use a critical section to force a PANIC if we are unable to complete
1127-
* the write --- then, WAL replay should repair the inconsistency. The
1128-
* odds of a PANIC actually occurring should be very tiny given that we
1129-
* were able to write the bogus CRC above.
1053+
* Now writing 2PC state data to WAL.
11301054
*
11311055
* We have to set delayChkpt here, too; otherwise a checkpoint starting
11321056
* immediately after the WAL record is inserted could complete without
@@ -1148,25 +1072,11 @@ EndPrepare(GlobalTransaction gxact)
11481072
XLogRegisterData(record->data, record->len);
11491073
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
11501074
XLogFlush(gxact->prepare_lsn);
1151-
gxact->prepare_xlogptr = ProcLastRecPtr;
1152-
1153-
// fprintf(stderr, "EndPrepare: %s={xlogptr:%X,lsn:%X, delta: %X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
11541075

11551076
/* If we crash now, we have prepared: WAL replay will fix things */
11561077

1157-
/* write correct CRC and close file */
1158-
if ((bogus_write(fd, &statefile_crc, sizeof(pg_crc32c))) != sizeof(pg_crc32c))
1159-
{
1160-
CloseTransientFile(fd);
1161-
ereport(ERROR,
1162-
(errcode_for_file_access(),
1163-
errmsg("could not write two-phase state file: %m")));
1164-
}
1165-
1166-
// if (CloseTransientFile(fd) != 0)
1167-
// ereport(ERROR,
1168-
// (errcode_for_file_access(),
1169-
// errmsg("could not close two-phase state file: %m")));
1078+
/* Store record's start location to read that later on Commit */
1079+
gxact->prepare_xlogptr = ProcLastRecPtr;
11701080

11711081
/*
11721082
* Mark the prepared transaction as valid. As soon as xact.c marks
@@ -1198,6 +1108,11 @@ EndPrepare(GlobalTransaction gxact)
11981108

11991109
END_CRIT_SECTION();
12001110

1111+
1112+
fprintf(stderr, "EndPrepare: %s=(%d,%d,%d,%d,%d)\n", gxact->gid, hdr->xid, hdr->nsubxacts, hdr->ncommitrels, hdr->nabortrels, hdr->ninvalmsgs);
1113+
fprintf(stderr, "EndPrepare: %s={xlogptr:%lX,lsn:%lX,delta:%lX}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
1114+
1115+
12011116
/*
12021117
* Wait for synchronous replication, if required.
12031118
*
@@ -1254,7 +1169,7 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
12541169
if (give_warnings)
12551170
ereport(WARNING,
12561171
(errcode_for_file_access(),
1257-
errmsg("could not open two-phase state file \"%s\": %m",
1172+
errmsg("ReadTwoPhaseFile: could not open two-phase state file \"%s\": %m",
12581173
path)));
12591174
return NULL;
12601175
}
@@ -1395,9 +1310,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
13951310
/*
13961311
* Read and validate the state file
13971312
*/
1398-
// buf = ReadTwoPhaseFile(xid, true);
1399-
// buf = twophase_buf;
1400-
buf = XlogReadTwoPhaseData(gxact->prepare_xlogptr);
1313+
if (gxact->prepare_lsn <= GetRedoRecPtr())
1314+
buf = ReadTwoPhaseFile(xid, true);
1315+
else
1316+
XlogReadTwoPhaseData(gxact->prepare_xlogptr, &buf, NULL);
14011317

14021318
/*
14031319
* Disassemble the header area
@@ -1417,9 +1333,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
14171333
/* compute latestXid among all children */
14181334
latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
14191335

1420-
1421-
// fprintf(stderr, "FinishPrepared: %s=(%d,%d,%d,%d)\n", gxact->gid, hdr->nsubxacts, hdr->ncommitrels, hdr->nabortrels, hdr->ninvalmsgs);
1422-
// fprintf(stderr, "FinishPrepared: %s={xlogptr:%X,lsn:%X,delta:%X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
1336+
fprintf(stderr, "FinishPrepared: %s=(%d,%d,%d,%d,%d)\n", gxact->gid, hdr->xid, hdr->nsubxacts, hdr->ncommitrels, hdr->nabortrels, hdr->ninvalmsgs);
1337+
fprintf(stderr, "FinishPrepared: %s={xlogptr:%lX,lsn:%lX,delta:%lX}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
14231338

14241339
Assert(hdr->nsubxacts == 0);
14251340
Assert(hdr->ncommitrels == 0);
@@ -1508,7 +1423,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
15081423
/*
15091424
* And now we can clean up our mess.
15101425
*/
1511-
RemoveTwoPhaseFile(xid, true);
1426+
// RemoveTwoPhaseFile(xid, true);
15121427

15131428
RemoveGXact(gxact);
15141429
MyLockedGxact = NULL;
@@ -1551,16 +1466,15 @@ ProcessRecords(char *bufptr, TransactionId xid,
15511466
void
15521467
RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
15531468
{
1554-
// char path[MAXPGPATH];
1555-
1556-
// TwoPhaseFilePath(path, xid);
1557-
// if (unlink(path))
1558-
// if (errno != ENOENT || giveWarning)
1559-
// ereport(WARNING,
1560-
// (errcode_for_file_access(),
1561-
// errmsg("could not remove two-phase state file \"%s\": %m",
1562-
// path)));
1563-
twophase_pos = 0;
1469+
char path[MAXPGPATH];
1470+
1471+
TwoPhaseFilePath(path, xid);
1472+
if (unlink(path))
1473+
if (errno != ENOENT || giveWarning)
1474+
ereport(WARNING,
1475+
(errcode_for_file_access(),
1476+
errmsg("could not remove two-phase state file \"%s\": %m",
1477+
path)));
15641478
}
15651479

15661480
/*
@@ -1575,6 +1489,8 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
15751489
pg_crc32c statefile_crc;
15761490
int fd;
15771491

1492+
fprintf(stderr, "RecreateTwoPhaseFile called xid=%d, len=%d\n", xid, len);
1493+
15781494
/* Recompute CRC */
15791495
INIT_CRC32C(statefile_crc);
15801496
COMP_CRC32C(statefile_crc, content, len);
@@ -1646,6 +1562,7 @@ void
16461562
CheckPointTwoPhase(XLogRecPtr redo_horizon)
16471563
{
16481564
TransactionId *xids;
1565+
XLogRecPtr *xlogptrs;
16491566
int nxids;
16501567
char path[MAXPGPATH];
16511568
int i;
@@ -1667,6 +1584,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
16671584
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
16681585

16691586
xids = (TransactionId *) palloc(max_prepared_xacts * sizeof(TransactionId));
1587+
xlogptrs = (XLogRecPtr *) palloc(max_prepared_xacts * sizeof(XLogRecPtr));
16701588
nxids = 0;
16711589

16721590
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
@@ -1675,10 +1593,14 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
16751593
{
16761594
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
16771595
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1596+
int j;
16781597

1679-
if (gxact->valid &&
1680-
gxact->prepare_lsn <= redo_horizon)
1681-
xids[nxids++] = pgxact->xid;
1598+
if (gxact->valid && gxact->prepare_lsn <= redo_horizon){
1599+
j = nxids++;
1600+
xids[j] = pgxact->xid;
1601+
xlogptrs[j] = gxact->prepare_xlogptr;
1602+
}
1603+
16821604
}
16831605

16841606
LWLockRelease(TwoPhaseStateLock);
@@ -1687,32 +1609,39 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
16871609
{
16881610
TransactionId xid = xids[i];
16891611
int fd;
1612+
int len;
1613+
char *buf;
16901614

16911615
TwoPhaseFilePath(path, xid);
16921616

1617+
fprintf(stderr, "CheckPointTwoPhase: %lX\n", xlogptrs[i]);
1618+
16931619
fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1694-
if (fd < 0)
1620+
1621+
if (fd < 0 && errno == ENOENT)
16951622
{
1696-
if (errno == ENOENT)
1697-
{
1698-
/* OK if gxact is no longer valid */
1699-
if (!TransactionIdIsPrepared(xid))
1700-
continue;
1701-
/* Restore errno in case it was changed */
1702-
errno = ENOENT;
1703-
}
1704-
ereport(ERROR,
1705-
(errcode_for_file_access(),
1706-
errmsg("could not open two-phase state file \"%s\": %m",
1707-
path)));
1708-
}
1623+
fprintf(stderr, "CheckPointTwoPhase: %d <-> %d \n", errno, ENOENT);
17091624

1710-
if (pg_fsync(fd) != 0)
1625+
/* OK if gxact is no longer valid */
1626+
if (!TransactionIdIsPrepared(xid))
1627+
continue;
1628+
1629+
/* Re-create file */
1630+
XlogReadTwoPhaseData(xlogptrs[i], &buf, &len);
1631+
RecreateTwoPhaseFile(xid, buf, len);
1632+
fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);
1633+
1634+
if (fd < 0)
1635+
ereport(ERROR,
1636+
(errcode_for_file_access(),
1637+
errmsg("CheckPointTwoPhase: could not open two-phase state file after re-creating \"%s\": %m",
1638+
path)));
1639+
}
1640+
else if (fd < 0)
17111641
{
1712-
CloseTransientFile(fd);
17131642
ereport(ERROR,
17141643
(errcode_for_file_access(),
1715-
errmsg("could not fsync two-phase state file \"%s\": %m",
1644+
errmsg("CheckPointTwoPhase: could not open two-phase state file \"%s\": %m",
17161645
path)));
17171646
}
17181647

@@ -2239,18 +2168,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
22392168

22402169
/**********************************************************************************/
22412170

2242-
2243-
size_t
2244-
bogus_write(int fd, const void *buf, size_t nbytes)
2245-
{
2246-
memcpy(twophase_buf + twophase_pos, buf, nbytes);
2247-
twophase_pos += nbytes;
2248-
return nbytes;
2249-
}
2250-
2251-
2252-
static char *
2253-
XlogReadTwoPhaseData(XLogRecPtr lsn)
2171+
void
2172+
XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
22542173
{
22552174
XLogRecord *record;
22562175
XLogReaderState *xlogreader;
@@ -2261,10 +2180,16 @@ XlogReadTwoPhaseData(XLogRecPtr lsn)
22612180
fprintf(stderr, "xlogreader == NULL\n");
22622181

22632182
record = XLogReadRecord(xlogreader, lsn, &errormsg);
2183+
22642184
if (record == NULL)
2265-
{
22662185
fprintf(stderr, "XLogReadRecord error\n");
2267-
}
22682186

2269-
return XLogRecGetData(xlogreader);
2187+
if (len != NULL)
2188+
*len = XLogRecGetDataLen(xlogreader);
2189+
*buf = XLogRecGetData(xlogreader);
22702190
}
2191+
2192+
2193+
2194+
2195+

0 commit comments

Comments
 (0)