Skip to content

Commit

Permalink
[pgmoneta#123]Common functionalities for native backup
Browse files Browse the repository at this point in the history
  • Loading branch information
Jubilee101 committed May 28, 2023
1 parent f15030e commit 593b89f
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 0 deletions.
21 changes: 21 additions & 0 deletions src/include/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,27 @@ pgmoneta_create_start_replication_message(char* xlogpos, int timeline, char* slo
int
pgmoneta_create_standby_status_update_message(int64_t received, int64_t flushed, int64_t applied, struct message** msg);

/**
* Create a base backup message
* @param server_version The version of the PostgreSQL server to backup
* @param label The label of the backup
* @param include_wal The indication of whether to also include WAL
* @param checksum_algorithm The checksum algorithm to be applied to backup manifest (only work for server version > 12)
* @param msg The resulting message
* @return 0 upon success, otherwise 1
*/
int
pgmoneta_create_base_backup_message(int server_version, char* label, bool include_wal, char* checksum_algorithm, struct message** msg);

/**
* Create a query message for a simple query
* @param query The query to be executed on server
* @param msg The resulting message
* @return 0 upon success, otherwise 1
*/
int
pgmoneta_create_query_message(char* query, struct message** msg);

/**
* Has a message
* @param type The message type to be extracted
Expand Down
1 change: 1 addition & 0 deletions src/include/pgmoneta.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ struct server
int wal_size; /**< The size of the WAL files */
bool wal_streaming; /**< Is WAL streaming active */
bool valid; /**< Is the server valid */
int version; /**< The major version of the server*/
} __attribute__ ((aligned (64)));

/** @struct
Expand Down
9 changes: 9 additions & 0 deletions src/include/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ pgmoneta_server_info(int srv);
int
pgmoneta_server_get_wal_size(int socket, int* ws);

/**
* Get the version of a server
* @param socket The socket
* @param server The server index
* @return 0 upon success, otherwise 1
*/
int
pgmoneta_server_get_version(int socket, int server);

#ifdef __cplusplus
}
#endif
Expand Down
107 changes: 107 additions & 0 deletions src/libpgmoneta/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,113 @@ pgmoneta_create_standby_status_update_message(int64_t received, int64_t flushed,
return MESSAGE_STATUS_OK;
}

int
pgmoneta_create_base_backup_message(int server_version, char* label, bool include_wal, char* checksum_algorithm, struct message** msg)
{
bool use_new_format = server_version >= 15;
char cmd[1024];
char* options = NULL;
struct message* m = NULL;
size_t size;

memset(&cmd[0], 0, sizeof(cmd));
// other options are
if (use_new_format)
{
options = pgmoneta_append(options, "LABEL '");
options = pgmoneta_append(options, label);
options = pgmoneta_append(options, "', ");

if (include_wal)
{
options = pgmoneta_append(options, "WAL true, ");
}
else
{
options = pgmoneta_append(options, "WAL false, ");
}

options = pgmoneta_append(options, "MANIFEST 'yes', ");

options = pgmoneta_append(options, "MANIFEST_CHECKSUMS ");
options = pgmoneta_append(options, checksum_algorithm);

snprintf(cmd, sizeof(cmd), "BASE_BACKUP (%s)", options);
}
else
{
options = pgmoneta_append(options, "LABEL '");
options = pgmoneta_append(options, label);
options = pgmoneta_append(options, "' ");

if (include_wal)
{
options = pgmoneta_append(options, "WAL ");
}

if (server_version > 12)
{
options = pgmoneta_append(options, "MANIFEST 'yes', ");

options = pgmoneta_append(options, "MANIFEST_CHECKSUMS ");
options = pgmoneta_append(options, checksum_algorithm);
}

options = pgmoneta_append(options, "MANIFEST 'yes' ");
snprintf(cmd, sizeof(cmd), "BASE_BACKUP %s", options);
}

if (options != NULL)
{
free(options);
options = NULL;
}

size = 1 + 4 + strlen(cmd) + 1;

m = (struct message*)malloc(sizeof(struct message));
m->data = malloc(size);

memset(m->data, 0, size);

m->kind = 'Q';
m->length = size;

pgmoneta_write_byte(m->data, 'Q');
pgmoneta_write_int32(m->data + 1, size - 1);
memcpy(m->data + 5, &cmd[0], strlen(cmd));

*msg = m;

return MESSAGE_STATUS_OK;
}

int
pgmoneta_create_query_message(char* query, struct message** msg)
{
struct message* m = NULL;
size_t size;
char cmd[1024];

strcpy(cmd, query);
size = 1 + 4 + strlen(cmd) + 1;
m = (struct message*)malloc(sizeof(struct message));
m->data = malloc(size);

memset(m->data, 0, size);

m->kind = 'Q';
m->length = size;

pgmoneta_write_byte(m->data, 'Q');
pgmoneta_write_int32(m->data + 1, size - 1);
memcpy(m->data + 5, &cmd[0], strlen(cmd));

*msg = m;

return MESSAGE_STATUS_OK;
}

int
pgmoneta_query_execute(int socket, struct message* msg, struct query_response** response)
{
Expand Down
32 changes: 32 additions & 0 deletions src/libpgmoneta/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,35 @@ get_wal_level(int socket, bool* replica)

return 1;
}

int
pgmoneta_server_get_version(int socket, int server)
{
int ret;
struct message* query_msg = NULL;
struct query_response* response = NULL;
struct configuration* config;

config = (struct configuration*)shmem;

ret = pgmoneta_create_query_message("SELECT split_part(split_part(version(), ' ', 2), '.', 1);", &query_msg);
if (ret != MESSAGE_STATUS_OK)
{
goto error;
}

if (pgmoneta_query_execute(socket, query_msg, &response) || response == NULL)
{
goto error;
}

config->servers[server].version = atoi(response->tuples->data[0]);

pgmoneta_free_query_response(response);
pgmoneta_free_message(query_msg);

return 0;
error:
pgmoneta_free_query_response(response);
return 1;
}

0 comments on commit 593b89f

Please sign in to comment.