Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer transaction statements to disk to prevent replay deadlock #505

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/bin/pgcopydb/copydb.c
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ copydb_prepare_filepaths(CopyFilePaths *cfPaths,
"%s/lsn.json",
cfPaths->cdc.dir);

sformat(cfPaths->cdc.txnlatestfile, MAXPGPATH,
"%s/txn.latest.sql",
cfPaths->cdc.dir);

/*
* Now prepare the "compare" files we need to compare schema and data
* between the source and target instance.
Expand Down
1 change: 1 addition & 0 deletions src/bin/pgcopydb/copydb.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
{ "extra_float_digits", "3" }, \
{ "statement_timeout", "0" }, \
{ "default_transaction_read_only", "off" }

/*
* These parameters are added to the connection strings, unless the user has
* added them, allowing user-defined values to be taken into account.
Expand Down
1 change: 1 addition & 0 deletions src/bin/pgcopydb/copydb_paths.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct CDCPaths
char tlifile[MAXPGPATH]; /* /tmp/pgcopydb/cdc/tli */
char tlihistfile[MAXPGPATH]; /* /tmp/pgcopydb/cdc/tli.history */
char lsntrackingfile[MAXPGPATH]; /* /tmp/pgcopydb/cdc/lsn.json */
char txnlatestfile[MAXPGPATH]; /* /tmp/pgcopydb/cdc/txn.latest.sql */
} CDCPaths;


Expand Down
309 changes: 307 additions & 2 deletions src/bin/pgcopydb/ld_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,17 @@
/*
 * ReplayStreamCtx is the private context passed to the read_from_stream
 * callbacks for "live replay": it wraps the StreamApplyContext and tracks
 * the state of buffering the current transaction's statements to disk.
 */
typedef struct ReplayStreamCtx
{
	StreamApplyContext applyContext;

	/* true while the current transaction is being buffered to disk */
	bool isTxnBuffering;

	/*
	 * true when the current BEGIN carried a valid commit LSN, in which case
	 * statements are streamed directly via stream_replay_line, bypassing
	 * the on-disk buffer
	 */
	bool skipTxnBuffering;

	/* write handle on the current transaction buffer file (NULL when idle) */
	FILE *txnBuffer;

	/* path of the current transaction buffer file: <cdc dir>/<xid>.sql */
	char currentTxnFileName[MAXPGPATH];
} ReplayStreamCtx;

/* buffers incoming lines to disk until a whole transaction is available */
static bool streamReplayLineIntoBuffer(void *ctx, const char *line, bool *stop);

/* replays a completed transaction buffer file into the target database */
static bool streamBufferedXID(ReplayStreamCtx *replayCtx);

/*
* stream_apply_replay implements "live replay" of the changes from the source
Expand Down Expand Up @@ -84,7 +93,7 @@ stream_apply_replay(StreamSpecs *specs)
* Setup our PIPE reading callback function and read from the PIPE.
*/
ReadFromStreamContext readerContext = {
.callback = stream_replay_line,
.callback = streamReplayLineIntoBuffer,
.ctx = &ctx
};

Expand Down Expand Up @@ -174,7 +183,6 @@ stream_replay_line(void *ctx, const char *line, bool *stop)
StreamApplyContext *context = &(replayCtx->applyContext);

LogicalMessageMetadata metadata = { 0 };

if (!parseSQLAction((char *) line, &metadata))
{
/* errors have already been logged */
Expand Down Expand Up @@ -249,3 +257,300 @@ stream_replay_line(void *ctx, const char *line, bool *stop)

return true;
}


/*
 * streamBufferedXID reads a transaction buffer file and streams it into the
 * target database, re-using the stream_replay_line callback to apply each
 * buffered statement.
 *
 * Returns false when the buffer file can't be opened or read; errors have
 * been logged already in that case.
 */
static bool
streamBufferedXID(ReplayStreamCtx *replayCtx)
{
	/* Open replaybuffer in read-only mode to use it as input stream */
	FILE *txnBufferForRead = fopen_read_only(replayCtx->currentTxnFileName);

	if (txnBufferForRead == NULL)
	{
		log_error("Failed to open transaction buffer file \"%s\" in "
				  "readonly mode: %m",
				  replayCtx->currentTxnFileName);
		return false;
	}

	ReadFromStreamContext readerContext = {
		.callback = stream_replay_line,
		.ctx = replayCtx
	};

	if (!read_from_stream(txnBufferForRead, &readerContext))
	{
		log_error("Failed to read from replay buffer, "
				  "see above for details");

		/* fix: don't leak the open stream on the error path */
		fclose(txnBufferForRead);

		return false;
	}

	fclose(txnBufferForRead);

	return true;
}


/* streamReplayLineIntoBuffer is a callback function for the ReadFromStreamContext
* and read_from_stream infrastructure. It's called on each line read from a
* stream such as a unix pipe and buffers the line into a file when we encounter
* a COMMIT without a valid txnCommitLSN.
*
* This ensures that the apply process doesn't block the generation of the
* transaction metadata file created by the transform process on arrival of
* a COMMIT message.
*/
static bool
streamReplayLineIntoBuffer(void *ctx, const char *line, bool *stop)
{
ReplayStreamCtx *replayCtx = (ReplayStreamCtx *) ctx;
StreamApplyContext *context = &(replayCtx->applyContext);

LogicalMessageMetadata metadata = { 0 };

if (!parseSQLAction((char *) line, &metadata))
{
/* errors have already been logged */
return false;
}

/* We don't use this metadata in this function */
free(metadata.jsonBuffer);
dimitri marked this conversation as resolved.
Show resolved Hide resolved


if (metadata.action == STREAM_ACTION_BEGIN)
{
/*
* Enable transaction buffering only when the BEGIN doesn't have
* valid commit LSN, otherwise we can skip buffering and stream
* the transaction directly.
*/
replayCtx->skipTxnBuffering = metadata.txnCommitLSN != InvalidXLogRecPtr;
if (replayCtx->skipTxnBuffering)
{
return stream_replay_line(ctx, line, stop);
}

if (replayCtx->isTxnBuffering)
{
/*
* When the follow switches from prefetch to replay mode, we
* call stream_transform_stream which might stream the partially
* written transaction created during the prefetch mode.
*
* For example, lets consider the partially written C.sql file,
*
* BEGIN -- {"xid": 1000, "commitLSN": "0/1234"};
* INSERT INTO C VALUES (1);
* INSERT INTO C VALUES (2);
* KEEPALIVE;
*
* After switching to the replay mode, logical decoding will resume
* from consistent point which again starts with a valid transcation
* block.
Comment on lines +352 to +354
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have the LSN value for that consistent point available in the context here? If yes, I believe we should be using it to make sure we are skipping transactions as intended. If no, then we should add to the comment why not, so that another PR later can go and fix the situation if we feel like it should be fixed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimitri This is a great point ! I think the current ignore logic implemented in this PR might cause inconsistency where it might ignore the partial transaction, but applies the next transaction because it has valid BEGIN/COMMIT block.

I need to have a another deep look now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimitri I was wrong in my previous comment.

While switching to replay mode, follow ensures that the pending LSNs in the transform queue have been processed and applied to the target. This ensures everything except the last partial transaction has been successfully committed into the target. After switching to replay mode, stream_transform_resume again streams the last partial file (which is already applied by the previous step) to the replay process, which leads to this scenario. I think streaming the last partial .sql from stream_transform_resume is redundant and should be removed, and this shouldn't cause any issue. WDYT?

Copy link
Contributor Author

@arajkumar arajkumar Oct 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering are these PRs #348, #277 trying to do the same in different ways?

I no longer see a partial txn when I remove the part of the code from stream_transform_resume which streams the partial sql, and I didn't see any data loss either.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC the transform parts need to parse/load the latest file to build up the context in case we stopped in the middle of a transaction (we then need to have the start of the transaction in-memory); but I agree we don't need to send that to the replay process, which is only going to skip again.

That said the skip logic is sound and I wonder why in your PR we can't benefit from it?

*
* Lets assume the following contents streamed from the transform
* to catchup process in UNIX PIPE after switching to replay mode,
*
* BEGIN -- {"xid": 999, "commitLSN": "0/1230"};
* INSERT INTO A VALUES (1);
* INSERT INTO A VALUES (2);
* COMMIT -- {"xid": 999, "lsn": "0/1230"};
* BEGIN -- {"xid": 1000, "commitLSN": "0/1234"};
* INSERT INTO C VALUES (1);
* INSERT INTO C VALUES (2);
* COMMIT -- {"xid": 1000, "lsn": "0/1234"};
*
* Contents of C.sql will be streamed by stream_transform_stream
* and followed by the contents from UNIX PIPE. Since both of the
* contents are streamed one after the other, the second block
* contains the full content of previously written
* transaction. So, we can skip previous transaction buffer and
* start buffering from the current transaction.
*/
log_debug("Received %s when transaction is already "
"in buffering mode, ignoring.", line);
Comment on lines +375 to +376
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rewrite the message so that it ends with ": %s", line. This makes it easier to process the logs later.

fclose(replayCtx->txnBuffer);

/* Remove symlink to the replay buffer */
(void) unlink_file(context->paths.txnlatestfile);
}

sformat(replayCtx->currentTxnFileName, MAXPGPATH, "%s/%d.sql",
context->paths.dir, metadata.xid);

/*
* Open current transaction file in w+ mode to truncate if the file
* already exists.
*/
replayCtx->txnBuffer = fopen_with_umask(replayCtx->currentTxnFileName,
"w+",
O_RDWR | O_TRUNC | O_CREAT,
0644);
if (replayCtx->txnBuffer == NULL)
{
log_error("Failed to open transaction buffer file \"%s\": %m",
replayCtx->currentTxnFileName);
return false;
}

/* Create as a symlink to the current transcation file */
if (!create_symbolic_link(replayCtx->currentTxnFileName,
context->paths.txnlatestfile))
{
/* errors have already been logged */
return false;
}

fformat(replayCtx->txnBuffer, "%s\n", line);

replayCtx->isTxnBuffering = true;
}
else if (metadata.action == STREAM_ACTION_COMMIT)
{
if (replayCtx->skipTxnBuffering)
{
return stream_replay_line(ctx, line, stop);
}

if (!replayCtx->isTxnBuffering)
{
/*
* When the follow switches from prefetch to replay mode, we
* call stream_transform_stream which might stream the partially
* written transaction created during the prefetch mode.
*
* For example, lets consider the partially written C.sql file,
*
* COMMIT -- {"xid": 999, "lsn": "0/1230"};
* BEGIN -- {"xid": 1000, "commitLSN": "0/1234"};
* INSERT INTO C VALUES (1);
* INSERT INTO C VALUES (2);
* KEEPALIVE;
*
* After switching to the replay mode, logical decoding will resume
* from consistent point which again starts with a valid transcation
* block.
*
* Lets assume the following contents streamed from the transform
* to catchup process in UNIX PIPE after switching to replay mode,
*
* BEGIN -- {"xid": 999, "commitLSN": "0/1230"};
* INSERT INTO A VALUES (1);
* INSERT INTO A VALUES (2);
* COMMIT -- {"xid": 999, "lsn": "0/1230"};
* BEGIN -- {"xid": 1000, "commitLSN": "0/1234"};
* INSERT INTO C VALUES (1);
* INSERT INTO C VALUES (2);
* COMMIT -- {"xid": 1000, "lsn": "0/1234"};
*
* Contents of C.sql will be streamed by stream_transform_stream
* and followed by the contents from UNIX PIPE. Since both of the
* contents are streamed one after the other, the second block
* contains the full content of previously written
* transaction. So, we can skip previous transaction buffer and
* start buffering from the current transaction.
*/
log_debug("Received %s when transaction is not "
"in buffering mode, ignoring.", line);
Comment on lines +458 to +459
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rewrite the message so that it ends with ": %s", line. This makes it easier to process the logs later.

return true;
}

fformat(replayCtx->txnBuffer, "%s\n", line);

/* Close replaybuffer to mark it as complete */
fclose(replayCtx->txnBuffer);

/* Stream the transaction buffer into the target database */
if (!streamBufferedXID(replayCtx))
{
/* errors have already been logged */
return false;
}

replayCtx->isTxnBuffering = false;

/* Remove the symlink to the replay buffer */
(void) unlink_file(context->paths.txnlatestfile);

/* Early exit if we reached the end position */
*stop = context->reachedEndPos;

replayCtx->txnBuffer = NULL;
}
else
{
if (replayCtx->skipTxnBuffering)
{
return stream_replay_line(ctx, line, stop);
}

if (replayCtx->isTxnBuffering)
{
/*
* We are in a transaction block, buffer all the messages
* including KEEPALIVE, SWITCHWAL and ENPOS.
*/
fformat(replayCtx->txnBuffer, "%s\n", line);
}
else if (metadata.action == STREAM_ACTION_KEEPALIVE ||
metadata.action == STREAM_ACTION_SWITCH ||
metadata.action == STREAM_ACTION_ENDPOS)
{
/*
* We allow KEEPALIVE, SWITCHWAL and ENPOS messages in a
* non-transactional context. In that case, call stream_replay_line
* directly without buffering the message.
*/
return stream_replay_line(ctx, line, stop);
}
else
{
/*
* When the follow switches from prefetch to replay mode, we
* call stream_transform_stream which might stream the partially
* written transaction created during the prefetch mode.
*
* For example, lets consider the partially written C.sql file,
*
* INSERT INTO A VALUES (2);
* COMMIT -- {"xid": 999, "lsn": "0/1230"};
* BEGIN -- {"xid": 1000, "commitLSN": "0/1234"};
* INSERT INTO C VALUES (1);
* INSERT INTO C VALUES (2);
* KEEPALIVE;
*
* After switching to the replay mode, logical decoding will resume
* from consistent point which again starts with a valid transcation
* block.
*
* Lets assume the following contents streamed from the transform
* to catchup process in UNIX PIPE after switching to replay mode,
*
* BEGIN -- {"xid": 999, "commitLSN": "0/1230"};
* INSERT INTO A VALUES (1);
* INSERT INTO A VALUES (2);
* COMMIT -- {"xid": 999, "lsn": "0/1230"};
* BEGIN -- {"xid": 1000, "commitLSN": "0/1234"};
* INSERT INTO C VALUES (1);
* INSERT INTO C VALUES (2);
* COMMIT -- {"xid": 1000, "lsn": "0/1234"};
*
* Contents of C.sql will be streamed by stream_transform_stream
* and followed by the contents from UNIX PIPE. Since both of the
* contents are streamed one after the other, the second block
* contains the full content of previously written
* transaction. So, we can skip previous transaction buffer and
* start buffering from the current transaction.
*/
log_debug("Received %s when transaction is not "
"in buffering mode", line);
Comment on lines +550 to +551
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rewrite the message so that it ends with ": %s", line. This makes it easier to process the logs later.

}
}

return true;
}
Loading
Loading