WIP: supports distributed acceleration of COPY [skip ci] #282

Draft
wants to merge 13 commits into
base: POLARDB_11_DEV
from
90 changes: 86 additions & 4 deletions src/backend/commands/copy.c
@@ -200,6 +200,68 @@ static void SendCopyFromForwardedTuple(CopyState cstate,
Datum *values,
bool *nulls);

/* ==========================================================================
* The following macros aid in major refactoring of data processing code (in
* CopyFrom(+Dispatch)). We use macros because in some cases the code must be in
* line in order to work (for example elog_dismiss() in PG_CATCH) while in
* other cases we'd like to inline the code for performance reasons.
*
* NOTE that an almost identical set of macros exists in fileam.c. If you make
* changes here you may want to consider taking a look there as well.
* ==========================================================================
*/

#define RESET_LINEBUF \
cstate->line_buf.len = 0; \
cstate->line_buf.data[0] = '\0'; \
cstate->line_buf.cursor = 0;

#define RESET_ATTRBUF \
cstate->attribute_buf.len = 0; \
cstate->attribute_buf.data[0] = '\0'; \
cstate->attribute_buf.cursor = 0;

#define RESET_LINEBUF_WITH_LINENO \
line_buf_with_lineno.len = 0; \
line_buf_with_lineno.data[0] = '\0'; \
line_buf_with_lineno.cursor = 0;

/*
* When doing a COPY FROM through the dispatcher, the QD reads the input from
* the input file (or stdin or program), and forwards the data to the QE nodes,
* where they will actually be inserted.
*
* Ideally, the QD would just pass through each line to the QE as is, and let
* the QEs do all the processing, because the more processing the QD has
* to do, the more likely it is to become a bottleneck.
*
* However, the QD needs to figure out which QE to send each row to. For that,
* it needs to at least parse the distribution key. The distribution key might
* also be a DEFAULTed column, in which case the DEFAULT value needs to be
* evaluated in the QD. In that case, the QD must send the computed value
* to the QE - we cannot assume that the QE can re-evaluate the expression and
* arrive at the same value, at least not if the DEFAULT expression is volatile.
*
* Therefore, we need a flexible format between the QD and QE, where the QD
* processes just enough of each input line to figure out where to send it.
* It must send the values it had to parse and evaluate to the QE, as well
* as the rest of the original input line, so that the QE can parse the rest
* of it.
*
* The 'copy_from_dispatch_*' structs are used in the QD->QE stream. For each
* input line, the QD constructs a 'copy_from_dispatch_row' struct, and sends
* it to the QE. Before any rows, a QDtoQESignature is sent first, followed by
* a 'copy_from_dispatch_header'. When QD encounters a recoverable error that
* needs to be logged in the error log (LOG ERRORS SEGMENT REJECT LIMIT), it
* sends the erroneous row to a QE, in a 'copy_from_dispatch_error' struct.
*
*
* COPY TO is simpler: The QEs form the output rows in the final form, and the QD
* just collects and forwards them to the client. The QD doesn't need to parse
* the rows at all.
*/
static const char QDtoQESignature[] = "PGCOPY-QD-TO-QE\n\377\r\n";

typedef struct
{
/*
@@ -3234,7 +3296,7 @@ if (cstate->dispatch_mode == COPY_DISPATCH)

pxCopyEnd(pxCopy, NULL, NULL);

PxDispatchCopyEnd(pxCopy);
// PxDispatchCopyEnd(pxCopy);
Author

Apologies for the delay in progress. I've been working on resolving the bug in my spare time, but unfortunately, I'm stuck and unsure of how to proceed. Could anyone lend me a hand and provide me with some hints or guidance?

When I tested COPY TO, I hit the following error, apparently because of the commented-out line:

server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.

But I have no idea how to modify PxDispatchCopyEnd to fit the PolarDB architecture.
Thank you in advance. 🙇‍♂️

Member

Are there any *.core files in the ~/tmp_master_xxx directory? If so, use gdb --core to see the call stack at the point where the error happens.


/*
* In the old protocol, tell pqcomm that we can process normal protocol
@@ -3403,8 +3465,8 @@ BeginCopyFrom(ParseState *pstate,
{
if (px_role == PX_ROLE_QC && cstate->rel)
cstate->dispatch_mode = COPY_DISPATCH;
// else if (px_role == PX_ROLE_QC)
// cstate->dispatch_mode = COPY_EXECUTOR;
else if (px_role != PX_ROLE_QC)
cstate->dispatch_mode = COPY_EXECUTOR;
}
else
cstate->dispatch_mode = COPY_DIRECT;
@@ -3570,7 +3632,27 @@ BeginCopyFrom(ParseState *pstate,
/* must rely on user to tell us... */
cstate->file_has_oids = cstate->oids;
}
else
else if (cstate->dispatch_mode == COPY_EXECUTOR && cstate->copy_dest != COPY_CALLBACK)
{
/* Read special header from QD */
static const size_t sigsize = sizeof(QDtoQESignature);
char readSig[sizeof(QDtoQESignature)];
copy_from_dispatch_header header_frame;

if (CopyGetData(cstate, &readSig, sigsize, sigsize) != sigsize ||
memcmp(readSig, QDtoQESignature, sigsize) != 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("QC->PC COPY communication signature not recognized")));

if (CopyGetData(cstate, &header_frame, sizeof(header_frame), sizeof(header_frame)) != sizeof(header_frame))
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid QC->QC COPY communication header")));

cstate->first_qe_processed_field = header_frame.first_qe_processed_field;
}
else if (!cstate->binary)
{
/* Read and verify binary header */
char readSig[11];
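
For anyone reviewing the executor branch added to BeginCopyFrom above, the signature-then-header handshake can be tried in isolation. The following is only a minimal sketch under stated assumptions: `demo_header` is a hypothetical stand-in for `copy_from_dispatch_header` (its real layout is not shown in this diff), and `demo_stream` / `demo_read` are hypothetical replacements for `CopyGetData` that read from an in-memory buffer instead of the dispatcher connection. Only the signature bytes are taken from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Same bytes as QDtoQESignature in the diff; sizeof() includes the trailing NUL. */
static const char demo_signature[] = "PGCOPY-QD-TO-QE\n\377\r\n";

/* Hypothetical stand-in for copy_from_dispatch_header (real layout assumed). */
typedef struct
{
	int16_t		first_qe_processed_field;
} demo_header;

/* Hypothetical in-memory stream that takes the place of CopyGetData(). */
typedef struct
{
	const char *data;
	size_t		len;
	size_t		pos;
} demo_stream;

/* Copy up to n bytes; returns the number actually read (short at end of stream). */
size_t
demo_read(demo_stream *s, void *dst, size_t n)
{
	size_t		avail = s->len - s->pos;

	if (n > avail)
		n = avail;
	memcpy(dst, s->data + s->pos, n);
	s->pos += n;
	return n;
}

/* Sender side: write the signature, then the header. Returns bytes written. */
size_t
demo_write_preamble(char *buf, size_t cap, const demo_header *hdr)
{
	size_t		need = sizeof(demo_signature) + sizeof(*hdr);

	assert(cap >= need);		/* caller must supply a large enough buffer */
	memcpy(buf, demo_signature, sizeof(demo_signature));
	memcpy(buf + sizeof(demo_signature), hdr, sizeof(*hdr));
	return need;
}

/*
 * Receiver side, mirroring the two checks in the patch.
 * Returns 0 on success, -1 on a missing or wrong signature,
 * -2 on a truncated header.
 */
int
demo_read_preamble(demo_stream *s, demo_header *out)
{
	char		sig[sizeof(demo_signature)];

	if (demo_read(s, sig, sizeof(sig)) != sizeof(sig) ||
		memcmp(sig, demo_signature, sizeof(sig)) != 0)
		return -1;

	if (demo_read(s, out, sizeof(*out)) != sizeof(*out))
		return -2;

	return 0;
}
```

Writing a preamble with `demo_write_preamble` and feeding the buffer to `demo_read_preamble` should return 0 and hand back the same `first_qe_processed_field`; corrupting the first byte or truncating the buffer exercises the two error paths that the patch reports through `ereport(ERROR, ...)`.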