Skip to content

Commit

Permalink
[pgmoneta#187] Make WAL streaming follow timeline change
Browse files Browse the repository at this point in the history
  • Loading branch information
Jubilee101 committed Feb 19, 2024
1 parent 316d7b6 commit 0bfd5ff
Show file tree
Hide file tree
Showing 4 changed files with 402 additions and 194 deletions.
31 changes: 20 additions & 11 deletions src/libpgmoneta/message.c
Expand Up @@ -165,7 +165,7 @@ pgmoneta_log_copyfail_message(struct message* msg)
void
pgmoneta_log_error_response_message(struct message* msg)
{
size_t offset = 0;
size_t offset = 1 + 4;
signed char field_type = 0;
char* error = NULL;
char* error_code = NULL;
Expand Down Expand Up @@ -206,7 +206,7 @@ pgmoneta_log_error_response_message(struct message* msg)
void
pgmoneta_log_notice_response_message(struct message* msg)
{
size_t offset = 0;
size_t offset = 1 + 4;
signed char field_type = 0;
char* error = NULL;
char* error_code = NULL;
Expand Down Expand Up @@ -767,7 +767,7 @@ pgmoneta_create_start_replication_message(char* xlogpos, int timeline, char* slo
{
if (xlogpos != NULL && strlen(xlogpos) > 0)
{
snprintf(&cmd[0], sizeof(cmd), "START_REPLICATION SLOT %s PHYSICAL %s;", slot, xlogpos);
snprintf(&cmd[0], sizeof(cmd), "START_REPLICATION SLOT %s PHYSICAL %s TIMELINE %d;", slot, xlogpos, timeline);
}
else
{
Expand All @@ -778,7 +778,7 @@ pgmoneta_create_start_replication_message(char* xlogpos, int timeline, char* slo
{
if (xlogpos != NULL && strlen(xlogpos) > 0)
{
snprintf(&cmd[0], sizeof(cmd), "START_REPLICATION PHYSICAL %s;", xlogpos);
snprintf(&cmd[0], sizeof(cmd), "START_REPLICATION PHYSICAL %s TIMELINE %d;", xlogpos, timeline);
}
else
{
Expand Down Expand Up @@ -1729,6 +1729,9 @@ pgmoneta_read_copy_stream(SSL* ssl, int socket, struct stream_buffer* buffer)
int numbytes = 0;
bool keep_read = false;
int err;
struct configuration* config;

config = (struct configuration*)shmem;

/*
* if buffer is still too full,
Expand Down Expand Up @@ -1835,7 +1838,7 @@ pgmoneta_read_copy_stream(SSL* ssl, int socket, struct stream_buffer* buffer)
}
}
}
while (keep_read);
while (keep_read && config->running);

error:
return MESSAGE_STATUS_ERROR;
Expand Down Expand Up @@ -1903,7 +1906,7 @@ pgmoneta_consume_copy_stream(SSL* ssl, int socket, struct stream_buffer* buffer,
continue;
}

if (m->kind != 'D' && m->kind != 'T')
if (m->kind != 'D' && m->kind != 'T' && m->kind != 'E')
{
m->data = (void*) malloc(length - 4 + 1);
m->length = length - 4;
Expand All @@ -1913,8 +1916,8 @@ pgmoneta_consume_copy_stream(SSL* ssl, int socket, struct stream_buffer* buffer,
else
{
/** include all the data in message's data buffer, i.e. include type and length info,
* if it's a DataRow or RowDescription message
* This is to accommodate our existing message parsing APIs for these two types of messages
* if it's a DataRow, RowDescription or ErrorResponse message
* This is to accommodate our existing message parsing APIs for these types of messages
*/
m->data = (void*) malloc(length + 1);
m->length = length + 1;
Expand Down Expand Up @@ -1944,9 +1947,12 @@ pgmoneta_consume_copy_stream_start(SSL* ssl, int socket, struct stream_buffer* b
bool keep_read = false;
int status;
int length;
struct configuration* config;

config = (struct configuration*)shmem;
do
{
while (buffer->cursor >= buffer->end)
while (config->running && buffer->cursor >= buffer->end)
{
status = pgmoneta_read_copy_stream(ssl, socket, buffer);
if (status == MESSAGE_STATUS_ZERO)
Expand Down Expand Up @@ -2014,7 +2020,7 @@ pgmoneta_consume_copy_stream_start(SSL* ssl, int socket, struct stream_buffer* b
keep_read = false;

}
while (keep_read);
while (keep_read && config->running);

return MESSAGE_STATUS_OK;

Expand Down Expand Up @@ -2057,11 +2063,14 @@ pgmoneta_consume_data_row_messages(SSL* ssl, int socket, struct stream_buffer* b
struct message* msg = (struct message*)malloc(sizeof (struct message));
struct tuple* current = NULL;
struct query_response* r = NULL;
struct configuration* config;

config = (struct configuration*)shmem;

memset(msg, 0, sizeof (struct message));

// consume DataRow messages from stream buffer until CommandComplete
while (msg == NULL || msg->kind != 'C')
while (config->running && (msg == NULL || msg->kind != 'C'))
{
status = pgmoneta_consume_copy_stream_start(ssl, socket, buffer, msg);

Expand Down
2 changes: 1 addition & 1 deletion src/libpgmoneta/utils.c
Expand Up @@ -235,7 +235,7 @@ pgmoneta_extract_message(char type, struct message* msg, struct message** extrac
int
pgmoneta_extract_error_fields(char type, struct message* msg, char** extracted)
{
size_t offset = 0;
size_t offset = 1 + 4;
char* result = NULL;
*extracted = NULL;

Expand Down

0 comments on commit 0bfd5ff

Please sign in to comment.