Skip to content

Commit

Permalink
Use telegraf for collecting (some) of the SQL stats, thanks to a protip from @erasche
Browse files Browse the repository at this point in the history
  • Loading branch information
natefoo committed Oct 4, 2018
1 parent 6bf8ecf commit d0d15fa
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 78 deletions.
19 changes: 19 additions & 0 deletions env/tacc/group_vars/galaxydbservers.yml
@@ -0,0 +1,19 @@
---

dbservers_group_templates:
- src: templates/stats/galaxy_db_slurp.sh.j2
dest: /usr/local/bin/galaxy_db_slurp.sh
owner: root
group: root
mode: '0755'

telegraf_plugins_extra:
galaxy_db_slurp:
plugin: "exec"
config:
- commands = ["/usr/local/bin/galaxy_db_slurp.sh"]
- timeout = "120s"
- data_format = "influx"
- interval = "1m"
- '[inputs.exec.tags]'
- ' influxdb_database = "{{ galaxy_instance_codename }}_sql"'
7 changes: 7 additions & 0 deletions env/tacc/group_vars/galaxyservers.yml
Expand Up @@ -106,6 +106,13 @@ telegraf_agent_output:
- 'tagexclude = ["influxdb_database"]'
- '[outputs.influxdb.tagpass]'
- ' influxdb_database = ["telegraf"]'
- type: influxdb
config:
- urls = ["http://stats.galaxyproject.org:8086"]
- database = "{{ galaxy_instance_codename }}_sql"
- 'tagexclude = ["influxdb_database"]'
- '[outputs.influxdb.tagpass]'
- ' influxdb_database = ["{{ galaxy_instance_codename }}_sql"]'

telegraf_plugins_default:
- plugin: statsd
Expand Down
28 changes: 28 additions & 0 deletions env/tacc/group_vars/maingalaxydbservers.yml
@@ -0,0 +1,28 @@
---

galaxy_instance_codename: main

stats_slurp:
db_name: galaxy_main
galaxy_handlers:
- main_w3_handler0
- main_w3_handler1
- main_w3_handler2
- main_w4_handler0
- main_w4_handler1
- main_w4_handler2

telegraf_agent_output:
- type: influxdb
config:
- urls = ["http://stats.galaxyproject.org:8086"]
- database = "system"
- '[outputs.influxdb.tagdrop]'
- ' influxdb_database = ["*"]'
- type: influxdb
config:
- urls = ["http://stats.galaxyproject.org:8086"]
- database = "{{ galaxy_instance_codename }}_sql"
- 'tagexclude = ["influxdb_database"]'
- '[outputs.influxdb.tagpass]'
- ' influxdb_database = ["{{ galaxy_instance_codename }}_sql"]'
13 changes: 13 additions & 0 deletions env/tacc/group_vars/testgalaxydbservers.yml
@@ -0,0 +1,13 @@
---

galaxy_instance_codename: test

stats_slurp:
db_name: galaxy_test
galaxy_handlers:
- test_handler0
- test_handler1
- test_handler2
- test_handler3

# test's telegraf output config is in galaxyservers.yml
11 changes: 11 additions & 0 deletions env/tacc/inventory
Expand Up @@ -3,9 +3,17 @@
testgalaxyservers
maingalaxyservers

# Parent group for all Galaxy database servers, composed of the per-instance
# DB server groups.  (The bare "[galaxydbservers]" header that also appeared
# here was redundant with the :children form and has been removed.)
[galaxydbservers:children]
testgalaxydbservers
maingalaxydbservers

[testgalaxyservers]
galaxy07.tacc.utexas.edu

# The test instance's database runs on the same host as its Galaxy server.
[testgalaxydbservers]
galaxy07.tacc.utexas.edu

[maingalaxyservers]
[maingalaxyservers:children]
maingalaxywebservers
Expand All @@ -19,6 +27,9 @@ galaxy-web-06.tacc.utexas.edu
galaxy-web-03.tacc.utexas.edu
galaxy-web-04.tacc.utexas.edu

# Main's dedicated database host.
[maingalaxydbservers]
galaxy-db-02.tacc.utexas.edu

[galaxynodes]
roundup[49:64].tacc.utexas.edu

Expand Down
93 changes: 93 additions & 0 deletions env/tacc/templates/stats/galaxy_db_slurp.sh.j2
@@ -0,0 +1,93 @@
#!/bin/bash
##
## This file is maintained by Ansible - CHANGES WILL BE OVERWRITTEN
##
#set -xv
set -e

# Serialize a bash associative array (passed by NAME in $1, mapping output
# key -> CSV column number) into an awk print-expression fragment, e.g.
#   count="$2
# NOTE(review): the trailing  ",  pair is deliberately dropped by
# ${s:0:-2}; the embedded double quotes act as awk string delimiters when
# query() splices this fragment into its awk program, and the surrounding
# program supplies the opening/closing quotes.  Requires bash 4.2+ for the
# negative substring offset.
function arr2awk() {
    # Re-declare the caller's array locally under the name "t" via its
    # `declare -p` representation.
    t=$(declare -p $1)
    eval "declare -A t="${t#*=}
    s=
    for k in "${!t[@]}"; do
        # key="$<column>"  -- the $ is escaped so awk, not the shell,
        # resolves the field reference.
        s+="$k=\"\$${t[$k]}\","
    done
    echo -n ${s:0:-2}
}

# Run a SQL query against the Galaxy database and print each result row in
# InfluxDB line protocol.
#   $1 = SQL text
#   $2 = measurement name (callers pass ${FUNCNAME[0]})
#   $3 = NAME of an assoc array mapping field keys -> CSV column numbers
#   $4 = optional NAME of an assoc array mapping tag keys -> CSV columns
# The arr2awk fragments end with an unbalanced double quote on purpose: it
# pairs up with the \" literals written here, closing the awk string so the
# CSV fields are concatenated in unquoted.
function query() {
    if [ -n "$4" ]; then
        # With tags:  measurement,tag=value field=value
        psql {{ stats_slurp.db_name }} -c "COPY ($1) TO STDOUT WITH CSV" | awk -F, "{print \"${2},$(arr2awk $4)\" $(arr2awk $3)}"
    else
        # Fields only:  measurement field=value
        psql {{ stats_slurp.db_name }} -c "COPY ($1) TO STDOUT WITH CSV" | awk -F, "{print \"${2} $(arr2awk $3)}"
    fi
}


# Measurement: per-handler count of queued jobs not yet dispatched to the
# cluster (job_runner_external_id IS NULL).  unnest() over the configured
# handler list LEFT OUTER JOINed to the live counts guarantees a row
# (coalesced to 0) for every handler, even when nothing is queued.
# The {{ ... }} expression is expanded by Ansible when templating this file.
function jobs_queued_internal_by_handler() {
    sql="
    SELECT
        t.handler AS handler,
        coalesce(j.ct, 0) AS ct
    FROM
        (
            SELECT
                unnest(
                    ARRAY['{{ stats_slurp.galaxy_handlers | join("', '") }}']
                ) AS handler
        ) AS t
        LEFT OUTER JOIN
        (
            SELECT
                handler AS handler,
                count(handler) AS ct
            FROM
                job
            WHERE
                state = 'queued'
                AND job_runner_external_id IS null
            GROUP BY
                handler
        ) AS j
        ON
            t.handler = j.handler
    "
    # CSV column 1 = handler name (tag), column 2 = count (field)
    declare -A fields=( [count]=2 )
    declare -A tags=( [handler]=1 )
    query "$sql" "${FUNCNAME[0]}" fields tags
}

# Measurement: counts of queued jobs split by whether they have been handed
# to the job runner (external) or are still waiting on a Galaxy handler
# (internal).
function jobs_queued() {
    sql="
    SELECT
        sum(CASE WHEN job_runner_external_id IS NOT null THEN 1 ELSE 0 END),
        sum(CASE WHEN job_runner_external_id IS null THEN 1 ELSE 0 END)
    FROM
        job
    WHERE
        state = 'queued'
    "
    # CSV column 1 = external count, column 2 = internal count
    declare -A fields=( [internal]=2 [external]=1 )
    query "$sql" "${FUNCNAME[0]}" fields
}

# Measurement: total size of all non-purged datasets.
function disk_usage_sum() {
    sql="
    SELECT
        sum(total_size)
    FROM
        dataset
    WHERE
        NOT purged
    "
    declare -A fields=( [value]=1 )
    query "$sql" "${FUNCNAME[0]}" fields
}

# Run the requested collectors, or all of them when invoked with no
# arguments (how telegraf's exec plugin calls this script).  Replaces the
# commented-out dispatch loop that previously sat here.
if [ $# -gt 0 ]; then
    for f in "$@"; do
        $f
    done
else
    jobs_queued_internal_by_handler
    jobs_queued
    disk_usage_sum
fi
78 changes: 0 additions & 78 deletions roles/stats/templates/galaxy_slurp.py.j2
Expand Up @@ -12,23 +12,6 @@ PGCONNS = {
'test': 'host=galaxy07.tacc.utexas.edu user=grafana password={{ galaxy_test_grafana_db_password }} dbname=galaxy_test',
'main': 'host=galaxy-db-02.tacc.utexas.edu user=grafana password={{ galaxy_main_grafana_db_password }} dbname=galaxy_main',
}
# Galaxy job handler IDs per instance; every listed handler is reported
# even when it currently has no queued jobs (see get_internally_queued_jobs).
HANDLERS = {
    'test': tuple('test_handler%d' % n for n in range(5)),
    'main': tuple(
        'main_w%d_handler%d' % (worker, n)
        for worker in (3, 4)
        for n in range(3)
    ),
}
time = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')


Expand Down Expand Up @@ -102,44 +85,6 @@ def sql_avg_usage(min_usage=0, days=0):
)


def sql_sum_usage():
    """Return SQL totaling the size of all non-purged datasets."""
    return """
SELECT
    sum(total_size)
FROM
    dataset
WHERE
    NOT purged
"""


def sql_count_internally_queued_jobs():
    """Return SQL counting queued jobs not yet dispatched to the cluster
    (job_runner_external_id IS NULL), grouped by handler.

    Handlers with zero queued jobs produce no row; callers must fill in
    the zeros themselves (see get_internally_queued_jobs).
    """
    return """
SELECT
    handler,
    count(handler)
FROM
    job
WHERE
    state = 'queued'
    AND job_runner_external_id IS null
GROUP BY
    handler
"""


def sql_count_total_queued_jobs():
    """Return SQL producing one row: (externally queued, internally queued).

    "External" means dispatched to the job runner (job_runner_external_id
    set); "internal" means still waiting on a Galaxy handler.
    """
    return """
SELECT
    sum(CASE WHEN job_runner_external_id IS NOT null THEN 1 ELSE 0 END),
    sum(CASE WHEN job_runner_external_id IS null THEN 1 ELSE 0 END)
FROM
    job
WHERE
    state = 'queued'
"""


def make_measurement(measurement, value, tags=None):
m = {
'measurement': measurement,
Expand All @@ -162,19 +107,6 @@ def pg_execute(pconn_str, sql):
pconn.close()


def get_internally_queued_jobs(instance):
    """Return per-handler measurements of internally queued jobs.

    Every handler configured for *instance* is reported, with a count of 0
    when the query returns no row for it, so the series never goes missing.
    """
    queued = dict.fromkeys(HANDLERS[instance], 0)
    for handler, count in pg_execute(PGCONNS[instance], sql_count_internally_queued_jobs()):
        queued[handler] = int(count)
    return [
        make_measurement('jobs_queued_internal_by_handler', count, tags={'handler': handler})
        for handler, count in queued.items()
    ]


def collect(instance):
measurements = []
pconn_str = PGCONNS[instance]
Expand All @@ -196,16 +128,6 @@ def collect(instance):
}
if row[0]:
measurements.append(make_measurement('disk_usage_top', float(row[2]), tags=tags))
measurements.append(
make_measurement(
'disk_usage_sum',
int(next(pg_execute(pconn_str, sql_sum_usage()))[0]),
)
)
queued_external, queued_internal = [int(i) for i in next(pg_execute(pconn_str, sql_count_total_queued_jobs()))]
measurements.append(make_measurement('jobs_queued_external', queued_external))
measurements.append(make_measurement('jobs_queued_internal', queued_internal))
measurements.extend(get_internally_queued_jobs(instance))
return measurements


Expand Down

0 comments on commit d0d15fa

Please sign in to comment.