Skip to content

Commit

Permalink
The main modification of pgreplay supports the following functions:
Browse files Browse the repository at this point in the history
1.Readapted the polardb log format.
2.Support extension protocol of pg execution.
3.Supports normal execution when prepare is not present.
  • Loading branch information
qinjingyuan committed Oct 29, 2023
1 parent 81564a2 commit 309b20f
Show file tree
Hide file tree
Showing 9 changed files with 1,265 additions and 292 deletions.
2 changes: 1 addition & 1 deletion pgreplay/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
pgreplay
pgreplay.exe
Makefile
config.h
# config.h
config.log
config.status

Expand Down
7 changes: 4 additions & 3 deletions pgreplay/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,14 @@ log_min_messages = error
log_min_error_statement = log
log_connections = on
log_disconnections = on
log_line_prefix = '%t|%u|%d|%c|%N|%L|'
log_line_prefix = '%t|%u|%d|%c|'
log_statement = 'all'
# lc_messages must be set to English (the encoding does not matter)
bytea_output = escape
# (from version 9.0 on, only if you want to replay the log on 8.4 or earlier)
# polar_auditlog_max_query_length = 49152
polar_auditlog_max_query_length_limit = false

polar_enable_log_search_path = true
polar_enable_log_parameter_type = true
```
- run read audit log from $PGDATA/log/replay_xxx.log and replay sql to databse, reporting monitor info every 3 seconds.
```sh
Expand Down
78 changes: 65 additions & 13 deletions pgreplay/database.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "pgreplay.h"
#include "uthash.h"

#include <stdio.h>
#include <stdlib.h>
Expand Down Expand Up @@ -71,6 +72,13 @@ typedef enum {
closed
} connstatus;

/* prepare hash table */
typedef struct prepare_item{
char *name; /* key */
int id;
UT_hash_handle hh; /* makes this structure hashable */
}prepare_item;

/* linked list element for list of open connections */
struct dbconn {
uint64_t session_id;
Expand All @@ -81,6 +89,7 @@ struct dbconn {
struct timeval stmt_start;
char *errmsg;
char *search_path;
prepare_item *prepare_hash;
struct dbconn *next;
};
typedef struct dbconn dbconn;
Expand Down Expand Up @@ -198,9 +207,9 @@ set search_path of the connection
*/
static int set_search_path(const char * search_path, PGconn * conn){
char * set_path = malloc(strlen(search_path) + strlen("set search_path = ;") + 1);
PGresult* res = PQexec(conn, set_path);

PGresult* res;
sprintf(set_path,"set search_path = %s;",search_path);
res = PQexec(conn, set_path);
if(PQresultStatus(res) != PGRES_COMMAND_OK){
fprintf(stderr, "set_search_path: Query execution failed search: %s\n", PQerrorMessage(conn));
}
Expand All @@ -209,6 +218,16 @@ static int set_search_path(const char * search_path, PGconn * conn){
return 0;
}

/* add prepare cmd */
static int set_prepare_cmd(const char * prepare_cmd, PGconn * conn){
PGresult* res = PQexec(conn, prepare_cmd);
if(PQresultStatus(res) != PGRES_COMMAND_OK){
fprintf(stderr, "set_prepare_cmd: Query execution failed search: %s\n", PQerrorMessage(conn));
}
PQclear(res);
return 0;
}

static void print_replay_statistics(int dry_run) {
int hours, minutes;
double seconds, runtime, session_time, busy_time;
Expand Down Expand Up @@ -444,6 +463,7 @@ int database_consumer(replay_item *item) {
PGcancel *cancel_request;
PGresult *result;
ExecStatusType result_status;
// const char* search_path,*params_typename,*source_text;

debug(3, "Entering database_consumer%s\n", "");

Expand Down Expand Up @@ -872,6 +892,7 @@ int database_consumer(replay_item *item) {
found_conn->errmsg = NULL;
found_conn->next = connections;
found_conn->search_path = malloc(POLARDBlEN);
found_conn->prepare_hash = NULL;
connections = found_conn;

/* do not display notices */
Expand Down Expand Up @@ -927,10 +948,49 @@ int database_consumer(replay_item *item) {
break;
case pg_execute:
debug(2, "Sending simple statement on session 0x" UINT64_FORMAT "\n", replay_get_session_id(item));
if(strcmp(item->search_path, found_conn->search_path) != 0){
set_search_path(item->search_path, found_conn->db_conn);
strcpy(found_conn->search_path, item->search_path);
if(polardb_audit){
/* set search before execute sql every times */
char * search_path;
search_path = replay_get_search_path(item);
if(!search_path) {
fprintf(stderr, "Error not have search_path statement: %s\n", search_path);
}else{
if(strcmp(search_path, found_conn->search_path) != 0){
set_search_path(search_path, found_conn->db_conn);
strcpy(found_conn->search_path, search_path);
}
}
debug(1,"search_path is %s\n",search_path);
if(search_path) free(search_path);

/* check out whether prepare exist,create it if not exist */
char * params_typename;
params_typename = replay_get_prepare_params_typename(item);
if(params_typename){
prepare_item * s;
char * tmp = strchr(params_typename,',');
*tmp = '\0';
debug(1,"params_typename is %s\n",params_typename);
HASH_FIND_STR(found_conn->prepare_hash, params_typename, s);
if(!s){
char* source_text;
source_text = replay_get_prepare_source_text(item);
if(!source_text){
fprintf(stderr, "prepare cmd is err in statement: %s\n", source_text);
}
debug(1,"source_text is %s\n",source_text);
set_prepare_cmd(source_text, found_conn->db_conn);
s = (prepare_item*)malloc(sizeof(prepare_item));
s->name = (char*)malloc(strlen(params_typename)+1);
// s->id = 1;
strcpy(s->name, params_typename);
HASH_ADD_KEYPTR(hh, found_conn->prepare_hash, s->name, strlen(s->name), s);
if(source_text) free(source_text);
}
}
if(params_typename) free(params_typename);
}
debug(1,"replay_get_statement(item) is %s\n",replay_get_statement(item));

if (! PQsendQuery(found_conn->db_conn, replay_get_statement(item))) {
fprintf(stderr, "Error sending simple statement: %s\n", PQerrorMessage(found_conn->db_conn));
Expand All @@ -941,10 +1001,6 @@ int database_consumer(replay_item *item) {
break;
case pg_prepare:
debug(2, "Sending prepare request on session 0x" UINT64_FORMAT "\n", replay_get_session_id(item));
if(strcmp(item->search_path, found_conn->search_path) != 0){
set_search_path(item->search_path, found_conn->db_conn);
strcpy(found_conn->search_path, item->search_path);
}

/* count preparations for statistics */
++stat_prep;
Expand All @@ -963,10 +1019,6 @@ int database_consumer(replay_item *item) {
break;
case pg_exec_prepared:
debug(2, "Sending prepared statement execution on session 0x" UINT64_FORMAT "\n", replay_get_session_id(item));
if(strcmp(item->search_path, found_conn->search_path) != 0){
set_search_path(item->search_path, found_conn->db_conn);
strcpy(found_conn->search_path, item->search_path);
}

if (! PQsendQueryPrepared(
found_conn->db_conn,
Expand Down
2 changes: 1 addition & 1 deletion pgreplay/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ static void help(FILE *f) {

int main(int argc, char **argv) {
int arg, parse_only = 0, replay_only = 0, port = -1, csv = 0,
parse_opt = 0, replay_opt = 0, rc = 0, dry_run = 0, monitor_gap = 0, polardb_audit = 0;
parse_opt = 0, replay_opt = 0, rc = 0, dry_run = 0, monitor_gap = 0 ;
double factor = 1.0;
char *host = NULL, *encoding = NULL, *endptr, *passwd = NULL,
*outfilename = NULL, *infilename = NULL,
Expand Down
Loading

0 comments on commit 309b20f

Please sign in to comment.