Skip to content

Commit

Permalink
[pgmoneta#123] Base backup functionality integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Jubilee101 committed Jul 12, 2023
1 parent be5781e commit 83cbad2
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 140 deletions.
12 changes: 12 additions & 0 deletions src/include/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,18 @@ pgmoneta_create_start_replication_message(char* xlogpos, int timeline, char* slo
int
pgmoneta_create_standby_status_update_message(int64_t received, int64_t flushed, int64_t applied, struct message** msg);

/**
* Create a base backup message
* @param server_version The version of the PostgreSQL server to backup
* @param label The label of the backup
* @param include_wal The indication of whether to also include WAL
* @param checksum_algorithm The checksum algorithm to be applied to backup manifest (only work for server version > 12)
* @param msg The resulting message
* @return 0 upon success, otherwise 1
*/
int
pgmoneta_create_base_backup_message(int server_version, char* label, bool include_wal, char* checksum_algorithm, struct message** msg);

/**
* Create a query message for a simple query
* @param query The query to be executed on server
Expand Down
98 changes: 96 additions & 2 deletions src/libpgmoneta/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,96 @@ pgmoneta_create_standby_status_update_message(int64_t received, int64_t flushed,
return MESSAGE_STATUS_OK;
}

int
pgmoneta_create_base_backup_message(int server_version, char* label, bool include_wal, char* checksum_algorithm, struct message** msg)
{
bool use_new_format = server_version >= 15;
char cmd[1024];
char* options = NULL;
struct message* m = NULL;
size_t size;

memset(&cmd[0], 0, sizeof(cmd));
// other options are
if (use_new_format)
{
options = pgmoneta_append(options, "LABEL '");
options = pgmoneta_append(options, label);
options = pgmoneta_append(options, "', ");

if (include_wal)
{
options = pgmoneta_append(options, "WAL true, ");

options = pgmoneta_append(options, "WAIT false, ");
}
else
{
options = pgmoneta_append(options, "WAL false, ");
}

options = pgmoneta_append(options, "CHECKPOINT 'fast', ");

options = pgmoneta_append(options, "MANIFEST 'yes', ");

options = pgmoneta_append(options, "MANIFEST_CHECKSUMS '");
options = pgmoneta_append(options, checksum_algorithm);
options = pgmoneta_append(options, "'");

snprintf(cmd, sizeof(cmd), "BASE_BACKUP (%s)", options);
}
else
{
options = pgmoneta_append(options, "LABEL '");
options = pgmoneta_append(options, label);
options = pgmoneta_append(options, "' ");

options = pgmoneta_append(options, "FAST ");

if (include_wal)
{
options = pgmoneta_append(options, "WAL ");

options = pgmoneta_append(options, "NOWAIT ");
}

if (server_version > 12)
{
options = pgmoneta_append(options, "MANIFEST 'yes' ");

options = pgmoneta_append(options, "MANIFEST_CHECKSUMS '");
options = pgmoneta_append(options, checksum_algorithm);
options = pgmoneta_append(options, "' ");
}

snprintf(cmd, sizeof(cmd), "BASE_BACKUP %s;", options);
}

if (options != NULL)
{
free(options);
options = NULL;
}

size = 1 + 4 + strlen(cmd) + 1;

m = (struct message*)malloc(sizeof(struct message));
m->data = malloc(size);

memset(m->data, 0, size);

m->kind = 'Q';
m->length = size;

pgmoneta_write_byte(m->data, 'Q');
pgmoneta_write_int32(m->data + 1, size - 1);
memcpy(m->data + 5, &cmd[0], strlen(cmd));

*msg = m;

return MESSAGE_STATUS_OK;
}

int
pgmoneta_create_query_message(char* query, struct message** msg)
{
Expand Down Expand Up @@ -1743,7 +1833,7 @@ pgmoneta_receive_archive_files(int socket, struct stream_buffer* buffer, char* b
goto error;
}

if (msg->kind == 'd')
if (msg->kind == 'd' && msg->length > 0)
{
// copy data
if (fwrite(msg->data, msg->length, 1, file) != 1)
Expand Down Expand Up @@ -1982,6 +2072,10 @@ pgmoneta_receive_archive_stream(int socket, struct stream_buffer* buffer, char*
case 'd':
{
// real data
if (msg->length <= 1)
{
break;
}
if (fwrite(msg->data + 1, msg->length - 1, 1, file) != 1)
{
pgmoneta_log_error("could not write to file %s", file_path);
Expand Down Expand Up @@ -2095,7 +2189,7 @@ pgmoneta_receive_manifest_file(int socket, struct stream_buffer* buffer, char* b
pgmoneta_log_message(msg);
goto error;
}
if (msg->kind == 'd')
if (msg->kind == 'd' && msg->length > 0)
{
// copy data
if (fwrite(msg->data, msg->length, 1, file) != 1)
Expand Down
Loading

0 comments on commit 83cbad2

Please sign in to comment.