Skip to content

Commit

Permalink
[pgmoneta#164] Reuse message buffer to hold consumed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Jubilee101 committed Jan 2, 2024
1 parent 4b1ea71 commit ef0aa19
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 36 deletions.
20 changes: 20 additions & 0 deletions src/include/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,26 @@ pgmoneta_read_copy_stream(int socket, struct stream_buffer* buffer);
int
pgmoneta_consume_copy_stream(int socket, struct stream_buffer* buffer, struct message** message);

/**
* Consume the data in copy stream buffer similar to pgmoneta_consume_copy_stream.
* Instead of creating a new message each time, reuse the same message buffer each time
* Must be used with pgmoneta_consume_copy_stream_end
* @param socket The socket
* @param buffer The stream buffer
* @param message The message buffer
* @return 1 upon success, 0 if no data to consume, otherwise 2
*/
int
pgmoneta_consume_copy_stream_start(int socket, struct stream_buffer* buffer, struct message* message);

/**
* Finish consuming the buffer, prepare for the next message to be consumed
* @param buffer The stream buffer
* @param message The message buffer
*/
void
pgmoneta_consume_copy_stream_end(struct stream_buffer* buffer, struct message* message);

/**
* Receive and parse the DataRow messages into tuples
* @param socket The socket
Expand Down
1 change: 1 addition & 0 deletions src/libpgmoneta/backup.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

/* system */
#include <stdatomic.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>

Expand Down
168 changes: 136 additions & 32 deletions src/libpgmoneta/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -1640,21 +1640,6 @@ pgmoneta_read_copy_stream(int socket, struct stream_buffer* buffer)
{
int numbytes = 0;
bool keep_read = false;
// left shift unconsumed data to reuse space
if (buffer->start < buffer->end)
{
if (buffer->start > 0)
{
memmove(buffer->buffer, buffer->buffer + buffer->start, buffer->end - buffer->start);
buffer->end -= buffer->start;
buffer->cursor -= buffer->start;
buffer->start = 0;
}
}
else
{
buffer->start = buffer->end = buffer->cursor = 0;
}
/*
* if buffer is still too full,
* try enlarging it to be at least big enough for one TCP packet (I'm using 1500B here)
Expand Down Expand Up @@ -1812,32 +1797,138 @@ pgmoneta_consume_copy_stream(int socket, struct stream_buffer* buffer, struct me
return status;
}

int
pgmoneta_consume_copy_stream_start(int socket, struct stream_buffer* buffer, struct message* message)
{
bool keep_read = false;
int status;
int length;
do
{
while (buffer->cursor >= buffer->end)
{
status = pgmoneta_read_copy_stream(socket, buffer);
if (status == MESSAGE_STATUS_ZERO)
{
SLEEP(1000000L);
}
else if (status != MESSAGE_STATUS_OK)
{
goto error;
}
}
message->kind = buffer->buffer[buffer->cursor];
// try to get message length
while (buffer->cursor + 1 + 4 >= buffer->end)
{
status = pgmoneta_read_copy_stream(socket, buffer);
if (status == MESSAGE_STATUS_ZERO)
{
SLEEP(1000000L);
}
else if (status != MESSAGE_STATUS_OK)
{
goto error;
}
}
length = pgmoneta_read_int32(buffer->buffer + buffer->cursor + 1);
// receive the whole message even if we are going to skip it
while (buffer->cursor + 1 + length >= buffer->end)
{
status = pgmoneta_read_copy_stream(socket, buffer);
if (status == MESSAGE_STATUS_ZERO)
{
SLEEP(1000000L);
}
else if (status != MESSAGE_STATUS_OK)
{
goto error;
}
}
if (message->kind != 'D' && message->kind != 'H' && message->kind != 'W' && message->kind != 'T' &&
message->kind != 'c' && message->kind != 'f' && message->kind != 'E' && message->kind != 'd' && message->kind != 'C')
{
// skip this message
keep_read = true;
buffer->cursor += (length + 1);
buffer->start = buffer->cursor;
continue;
}

if (message->kind != 'D' && message->kind != 'T')
{
message->data = buffer->buffer + (buffer->cursor + 1 + 4);
message->length = length - 4;
}
else
{
/** include all the data in message's data buffer, i.e. include type and length info,
* if it's a DataRow or RowDescription message
* This is to accommodate our existing message parsing APIs for these two types of messages
*/
message->data = buffer->buffer + buffer->cursor;
message->length = length + 1;
}

keep_read = false;

}
while (keep_read);

return MESSAGE_STATUS_OK;

error:
memset(message, 0, sizeof(struct message));
return status;
}

void
pgmoneta_consume_copy_stream_end(struct stream_buffer* buffer, struct message* message)
{
int length = pgmoneta_read_int32(buffer->buffer + buffer->cursor + 1);
buffer->cursor += (1 + length);
buffer->start = buffer->cursor;
// left shift unconsumed data to reuse space
if (buffer->start < buffer->end)
{
if (buffer->start > 0)
{
memmove(buffer->buffer, buffer->buffer + buffer->start, buffer->end - buffer->start);
buffer->end -= buffer->start;
buffer->cursor -= buffer->start;
buffer->start = 0;
}
}
else
{
buffer->start = buffer->end = buffer->cursor = 0;
}
message->data = NULL;
message->length = 0;
}

int
pgmoneta_consume_data_row_messages(int socket, struct stream_buffer* buffer, struct query_response** response)
{
int cols;
int status;
char* name = NULL;
struct message* msg = NULL;
struct message* msg = (struct message*)malloc(sizeof (struct message));
struct tuple* current = NULL;
struct query_response* r = NULL;

memset(msg, 0, sizeof (struct message));

// consume DataRow messages from stream buffer until CommandComplete
while (msg == NULL || msg->kind != 'C')
{
status = pgmoneta_consume_copy_stream(socket, buffer, &msg);
status = pgmoneta_consume_copy_stream_start(socket, buffer, msg);

if (status != MESSAGE_STATUS_OK)
{
goto error;
}

if (msg == NULL)
{
pgmoneta_log_error("wal: received NULL message");
goto error;
}

if (msg->kind == 'E' || msg->kind == 'f')
{
pgmoneta_log_copyfail_message(msg);
Expand Down Expand Up @@ -1887,6 +1978,8 @@ pgmoneta_consume_data_row_messages(int socket, struct stream_buffer* buffer, str

current = dtuple;
}

pgmoneta_consume_copy_stream_end(buffer, msg);
}
*response = r;
pgmoneta_free_copy_message(msg);
Expand All @@ -1904,11 +1997,13 @@ int
pgmoneta_receive_archive_files(int socket, struct stream_buffer* buffer, char* basedir, struct tablespace* tablespaces, int version)
{
struct query_response* response = NULL;
struct message* msg = NULL;
struct message* msg = (struct message*)malloc(sizeof (struct message));
struct tuple* tup = NULL;
char null_buffer[2 * 512]; // 2 tar block size of terminator null bytes
FILE* file = NULL;

memset(msg, 0, sizeof (struct message));

// Receive the second result set
if (pgmoneta_consume_data_row_messages(socket, buffer, &response))
{
Expand Down Expand Up @@ -1969,18 +2064,19 @@ pgmoneta_receive_archive_files(int socket, struct stream_buffer* buffer, char* b
// get the copy out response
while (msg == NULL || msg->kind != 'H')
{
pgmoneta_consume_copy_stream(socket, buffer, &msg);
pgmoneta_consume_copy_stream_start(socket, buffer, msg);
if (msg->kind == 'E' || msg->kind == 'f')
{
pgmoneta_log_copyfail_message(msg);
pgmoneta_log_error_response_message(msg);
fclose(file);
goto error;
}
pgmoneta_consume_copy_stream_end(buffer, msg);
}
while (msg->kind != 'c')
{
pgmoneta_consume_copy_stream(socket, buffer, &msg);
pgmoneta_consume_copy_stream_start(socket, buffer, msg);
if (msg->kind == 'E' || msg->kind == 'f')
{
pgmoneta_log_copyfail_message(msg);
Expand All @@ -1999,6 +2095,7 @@ pgmoneta_receive_archive_files(int socket, struct stream_buffer* buffer, char* b
goto error;
}
}
pgmoneta_consume_copy_stream_end(buffer, msg);
}
//append two blocks of null bytes to the end of the tar file
memset(null_buffer, 0, 2 * 512);
Expand Down Expand Up @@ -2089,7 +2186,7 @@ int
pgmoneta_receive_archive_stream(int socket, struct stream_buffer* buffer, char* basedir, struct tablespace* tablespaces)
{
struct query_response* response = NULL;
struct message* msg = NULL;
struct message* msg = (struct message*)malloc(sizeof (struct message));
struct tuple* tup = NULL;
struct tablespace* tblspc = NULL;
char null_buffer[2 * 512];
Expand All @@ -2107,25 +2204,28 @@ pgmoneta_receive_archive_stream(int socket, struct stream_buffer* buffer, char*
char type;
FILE* file = NULL;

memset(msg, 0, sizeof(struct message));

// Receive the second result set
if (pgmoneta_consume_data_row_messages(socket, buffer, &response))
{
goto error;
}
while (msg == NULL || msg->kind != 'H')
{
pgmoneta_consume_copy_stream(socket, buffer, &msg);
pgmoneta_consume_copy_stream_start(socket, buffer, msg);
if (msg->kind == 'E' || msg->kind == 'f')
{
pgmoneta_log_copyfail_message(msg);
pgmoneta_log_error_response_message(msg);
goto error;
}
pgmoneta_consume_copy_stream_end(buffer, msg);
}

while (msg->kind != 'c')
{
pgmoneta_consume_copy_stream(socket, buffer, &msg);
pgmoneta_consume_copy_stream_start(socket, buffer, msg);
if (msg->kind == 'E' || msg->kind == 'f')
{
pgmoneta_log_copyfail_message(msg);
Expand Down Expand Up @@ -2274,6 +2374,7 @@ pgmoneta_receive_archive_stream(int socket, struct stream_buffer* buffer, char*
}
}
}
pgmoneta_consume_copy_stream_end(buffer, msg);
}

if (file != NULL)
Expand Down Expand Up @@ -2350,7 +2451,8 @@ pgmoneta_receive_manifest_file(int socket, struct stream_buffer* buffer, char* b
char tmp_file_path[MAX_PATH];
char file_path[MAX_PATH];
FILE* file = NULL;
struct message* msg = NULL;
struct message* msg = (struct message*)malloc(sizeof (struct message));
memset(msg, 0, sizeof (struct message));

memset(tmp_file_path, 0, sizeof(tmp_file_path));
memset(file_path, 0, sizeof(file_path));
Expand All @@ -2369,17 +2471,18 @@ pgmoneta_receive_manifest_file(int socket, struct stream_buffer* buffer, char* b
// get the copy out response
while (msg == NULL || msg->kind != 'H')
{
pgmoneta_consume_copy_stream(socket, buffer, &msg);
pgmoneta_consume_copy_stream_start(socket, buffer, msg);
if (msg->kind == 'E' || msg->kind == 'f')
{
pgmoneta_log_copyfail_message(msg);
pgmoneta_log_error_response_message(msg);
goto error;
}
pgmoneta_consume_copy_stream_end(buffer, msg);
}
while (msg->kind != 'c')
{
pgmoneta_consume_copy_stream(socket, buffer, &msg);
pgmoneta_consume_copy_stream_start(socket, buffer, msg);
if (msg->kind == 'E' || msg->kind == 'f')
{
pgmoneta_log_copyfail_message(msg);
Expand All @@ -2395,6 +2498,7 @@ pgmoneta_receive_manifest_file(int socket, struct stream_buffer* buffer, char* b
goto error;
}
}
pgmoneta_consume_copy_stream_end(buffer, msg);
}
// finish, remove the .tmp suffix
if (rename(tmp_file_path, file_path) != 0)
Expand Down

0 comments on commit ef0aa19

Please sign in to comment.