Skip to content

Commit

Permalink
Merge 1d3e93b into 9412922
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberDem0n authored Jan 10, 2018
2 parents 9412922 + 1d3e93b commit f03b97a
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 78 deletions.
22 changes: 16 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@ sudo: true
dist: trusty
language: c
compiler: gcc
addons:
postgresql: "9.6"
matrix:
include:
- addons:
postgresql: "9.3"
- addons:
postgresql: "9.4"
- addons:
postgresql: "9.5"
- addons:
postgresql: "9.6"
before_install:
- sudo apt-get update
- sudo apt-get install -y postgresql-server-dev-9.6 libevent-dev
- version=$(postgres -V | egrep -o '[1-9][0-9]*(\.[0-9]{1,}){1,2}')
- version=${version%.*}
- sudo apt-get install -y postgresql-server-dev-$version libevent-dev pv
- sudo pip install --upgrade cpp-coveralls
script:
- |
Expand All @@ -15,15 +25,15 @@ script:
initdb test_cluster
echo "host replication all 127.0.0.1/32 trust" >> test_cluster/pg_hba.conf
echo "unix_socket_directories = '.'" >> test_cluster/postgresql.conf
echo "wal_level = 'replica'" >> test_cluster/postgresql.conf
echo "wal_level = 'hot_standby'" >> test_cluster/postgresql.conf
echo "max_wal_senders = 5" >> test_cluster/postgresql.conf
echo "cluster_name = ' bgworker: test cluster'" >> test_cluster/postgresql.conf
if [ $version != "9.3" ] && [ $version != "9.4" ]; then echo "cluster_name = ' bgworker: test cluster'" >> test_cluster/postgresql.conf; fi
echo "shared_preload_libraries = ' bg_mon'" >> test_cluster/postgresql.conf
pg_ctl -w -D test_cluster start -o "--port=5440"
psql -h localhost -p 5440 -d postgres -c "select pg_advisory_lock(1), pg_sleep(30)" &
sleep 1
psql -h localhost -p 5440 -d postgres -c "select pg_advisory_lock(1), pg_sleep(5)" &
( time pg_basebackup -D test_cluster_backup -c fast -X stream -h localhost -p 5440 -r 2M )&
( time pg_basebackup -D test_cluster_backup -c fast -h localhost -p 5440 -F t -D - | pv -qL 2M > /dev/null )&
sleep 1
echo "bg_mon.port = 8081" >> test_cluster/postgresql.conf
pg_ctl -D test_cluster reload
Expand Down
11 changes: 4 additions & 7 deletions bg_mon.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ static void prepare_statistics_output(struct evbuffer *evb)
if ((!s.is_backend && s.ps.cmdline != NULL) || s.query != NULL) {
proc_stat ps = s.ps;
proc_io io = ps.io;
char *type = s.is_backend ? "backend" : ps.cmdline;
char *type = s.is_backend ? "\"backend\"" : ps.cmdline;
if (is_first) is_first = false;
else evbuffer_add_printf(evb, ", ");
evbuffer_add_printf(evb, "{\"pid\": %d, \"type\": \"%s\", \"state\": \"%c\", ", s.pid, type, ps.state);
evbuffer_add_printf(evb, "{\"pid\": %d, \"type\": %s, \"state\": \"%c\", ", s.pid, type, ps.state);
evbuffer_add_printf(evb, "\"cpu\": {\"user\": %2.1f, \"system\": %2.1f, ", ps.utime_diff, ps.stime_diff);
evbuffer_add_printf(evb, "\"guest\": %2.1f}, \"io\": {\"read\": %lu, ", ps.gtime_diff, io.read_diff);
evbuffer_add_printf(evb, "\"write\": %lu}, \"uss\": %llu", io.write_diff, ps.uss);
Expand All @@ -189,11 +189,8 @@ static void prepare_statistics_output(struct evbuffer *evb)
evbuffer_add_printf(evb, ", \"database\": %s", s.datname == NULL ? "null" : s.datname);
evbuffer_add_printf(evb, ", \"username\": %s", s.usename == NULL ? "null" : s.usename);
}
if (s.query != NULL) {
evbuffer_add_printf(evb, ", \"query\": ");
if (s.is_backend) evbuffer_add_printf(evb, "%s", s.query);
else evbuffer_add_printf(evb, "\"%s\"", s.query);
}
if (s.query != NULL)
evbuffer_add_printf(evb, ", \"query\": %s", s.query);
evbuffer_add_printf(evb, "}");
}
}
Expand Down
161 changes: 97 additions & 64 deletions postgres_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,58 +24,55 @@ static size_t postmaster_pid_len;
static char postmaster_pid[32];
static unsigned long long mem_page_size;
static SPIPlanPtr pg_stat_activity_query_plan;
static const char * const pg_stat_activity_query =
"WITH activity AS ("
"SELECT to_json(datname)::text AS datname,"
"a.pid as pid,"
"to_json(usename)::text AS usename,"
"client_addr,"
"client_port,"
"round(extract(epoch from (now() - xact_start)))::int as age,"
static const char * const pg_stat_activity_query =
"WITH locked_processes AS ("
"SELECT this.pid as pid, "
#if PG_VERSION_NUM >= 90600
"wait_event IS NOT NULL as "
#endif
"waiting,"
"to_json(CASE WHEN state = 'idle in transaction' THEN "
"CASE WHEN xact_start != state_change THEN "
"'idle in transaction ' || CAST("
"abs(round(extract(epoch from (now() - state_change)))) AS text) "
"ELSE state "
"END "
"WHEN state = 'active' THEN query "
"ELSE state "
"END)::text AS query,"
#if PG_VERSION_NUM >= 90600
"ARRAY(SELECT unnest(pg_blocking_pids(a.pid)) ORDER BY 1) as locked_by "
"ARRAY(SELECT unnest(pg_blocking_pids(this.pid)) ORDER BY 1) AS locked_by"
#else
"array_agg(distinct other.pid ORDER BY other.pid) as locked_by "
"array_agg(DISTINCT other.pid ORDER BY other.pid) AS locked_by"
#endif
"FROM pg_stat_activity a "
" FROM pg_locks this"
#if PG_VERSION_NUM < 90600
"LEFT JOIN pg_locks this ON this.pid = a.pid and this.granted = 'f' "
"LEFT JOIN pg_locks other ON this.locktype = other.locktype "
"AND this.database IS NOT DISTINCT FROM other.database "
"AND this.relation IS NOT DISTINCT FROM other.relation "
"AND this.page IS NOT DISTINCT FROM other.page "
"AND this.tuple IS NOT DISTINCT FROM other.tuple "
"AND this.virtualxid IS NOT DISTINCT FROM other.virtualxid "
"AND this.transactionid IS NOT DISTINCT FROM other.transactionid "
"AND this.classid IS NOT DISTINCT FROM other.classid "
"AND this.objid IS NOT DISTINCT FROM other.objid "
"AND this.objsubid IS NOT DISTINCT FROM other.objsubid "
"AND this.pid != other.pid "
"AND other.granted = 't' "
" JOIN pg_locks other ON this.locktype = other.locktype "
"AND this.database IS NOT DISTINCT FROM other.database "
"AND this.relation IS NOT DISTINCT FROM other.relation "
"AND this.page IS NOT DISTINCT FROM other.page "
"AND this.tuple IS NOT DISTINCT FROM other.tuple "
"AND this.virtualxid IS NOT DISTINCT FROM other.virtualxid "
"AND this.transactionid IS NOT DISTINCT FROM other.transactionid "
"AND this.classid IS NOT DISTINCT FROM other.classid "
"AND this.objid IS NOT DISTINCT FROM other.objid "
"AND this.objsubid IS NOT DISTINCT FROM other.objsubid "
"AND this.pid != other.pid "
"AND other.granted"
#endif
" WHERE NOT this.granted"
#if PG_VERSION_NUM < 90600
" GROUP BY 1"
#endif
"WHERE a.pid != pg_backend_pid() "
"AND datname IS NOT NULL "
"GROUP BY 1,2,3,4,5,6,7,8"
"), lockers AS ("
"SELECT DISTINCT(locked_by[1])"
" FROM activity "
"WHERE locked_by IS NOT NULL"
") SELECT datname, pid, usename, client_addr, client_port, age, waiting,"
" NULLIF(array_to_string(locked_by, ','), ''), query, pid IN (SELECT * FROM lockers)"
" FROM activity";
"SELECT DISTINCT unnest(locked_by)"
" FROM locked_processes"
") SELECT pid,"
" datname::text,"
" usename::text,"
" round(extract(epoch from (now() - COALESCE(xact_start, CASE WHEN state = 'active'"
" THEN query_start"
" ELSE NULL END))))::int AS age,"
" NULLIF(array_to_string(locked_by, ','), ''),"
" CASE WHEN state = 'idle in transaction' THEN"
" CASE WHEN xact_start != state_change THEN"
" 'idle in transaction ' || CAST("
" abs(round(extract(epoch from (now() - state_change)))) AS text)"
" ELSE state END"
" WHEN state = 'active' THEN query"
" ELSE state END::text AS query,"
" pid IN (SELECT * FROM lockers)"
" FROM pg_stat_activity a"
" LEFT JOIN locked_processes USING (pid) "
"WHERE pid != pg_backend_pid()"
" AND datname IS NOT NULL";

typedef struct {
proc_stat *values;
Expand Down Expand Up @@ -127,10 +124,10 @@ static bool pg_stat_list_add(pg_stat_list *list, pg_stat ps)
}


static size_t json_escaped_size(const char *s)
static size_t json_escaped_size(const char *s, size_t len)
{
size_t ret = 0;
while (true) {
while (len-- > 0) {
switch (*s) {
case 0x00:
return ret;
Expand Down Expand Up @@ -175,16 +172,19 @@ static size_t json_escaped_size(const char *s)
++s;
++ret;
}
return ret;
}

static char *json_escape_string(const char *s)
static char *json_escape_string_len(const char *s, size_t len)
{
char *ret = palloc(json_escaped_size(s) + 1);
char *ret = palloc(json_escaped_size(s, len) + 3);
char *r = ret;
*(r++) = '"';

while (true) {
while (len-- > 0) {
switch (*s) {
case 0x00:
*(r++) = '"';
*r = '\0';
return ret;
case '"':
Expand Down Expand Up @@ -264,6 +264,14 @@ static char *json_escape_string(const char *s)
++r;
++s;
}
*(r++) = '"';
*r = '\0';
return ret;
}

static char *json_escape_string(const char *s)
{
return json_escape_string_len(s, -1);
}

static unsigned long long get_memory_usage(const char *proc_file)
Expand Down Expand Up @@ -476,7 +484,7 @@ static void read_proc_cmdline(pg_stat *stat)
// cluster name can contain ' bgworker: ' string, so we need to skip all but last
// buf - 2 + 11 points to the first space character after 'postgres:' string
for (p = buf - 2; (p = strstr(p + 11, " bgworker: ")) != NULL; lp = p);
if (lp == NULL) stat->ps.cmdline = pstrdup("unknown");
if (lp == NULL) stat->ps.cmdline = pstrdup("\"unknown\"");
else {
for (p = lp + 11; *p != '\0'; ++p);
while (*(--p) == ' ') *p = '\0'; // trim whitespaces
Expand Down Expand Up @@ -569,26 +577,51 @@ static void get_pg_stat_activity(pg_stat_list *pg_stats)

if (SPI_processed > 0)
{
int a;
bool isnull, is_locker;
bool isnull, is_locker, is_idle;
int a, len;
Datum data;
text *value;
char *text_value;

MemoryContext oldcxt = MemoryContextSwitchTo(uppercxt);
for (a = 0; a < SPI_processed; a++) {
pg_stat ps = {0, };

ps.pid = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 2, &isnull));
is_locker = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 10, &isnull));
ps.query = SPI_getvalue(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 9);
is_idle = true;

ps.pid = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 1, &isnull));
is_locker = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 7, &isnull));
data = SPI_getbinval(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 6, &isnull);

if (is_locker || (ps.query != NULL && strncmp(ps.query, "\"idle\"", 7))) {
ps.datname = SPI_getvalue(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 1);
ps.usename = SPI_getvalue(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 3);
ps.age = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 6, &isnull));
if (!isnull) {
value = PG_DETOAST_DATUM_PACKED(data);
text_value = VARDATA_ANY(value);
len = VARSIZE_ANY_EXHDR(value);
is_idle = len == 4 && !strncmp(text_value, "idle", 4);
}

if (is_locker || !is_idle) {
ps.query = isnull ? NULL : json_escape_string_len(text_value, len);

data = SPI_getbinval(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 2, &isnull);
if (isnull) ps.datname = NULL;
else {
value = PG_DETOAST_DATUM_PACKED(data);
ps.datname = json_escape_string_len(VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value));
}

data = SPI_getbinval(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 3, &isnull);
if (isnull) ps.usename = NULL;
else {
value = PG_DETOAST_DATUM_PACKED(data);
ps.usename = json_escape_string_len(VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value));
}

ps.age = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 4, &isnull));
if (isnull) ps.age = -1;
ps.is_waiting = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 7, &isnull));
ps.locked_by = SPI_getvalue(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 8);
ps.locked_by = SPI_getvalue(SPI_tuptable->vals[a], SPI_tuptable->tupdesc, 5);
pg_stats->active_connections++;
} else FREE(ps.query);
}

ps.is_backend = true;
pg_stat_list_add(pg_stats, ps);
Expand Down
1 change: 0 additions & 1 deletion postgres_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ typedef struct {
char *datname;
char *usename;
int32 age;
bool is_waiting;
bool is_backend;
char *locked_by;
char *query;
Expand Down

0 comments on commit f03b97a

Please sign in to comment.