/
setup.go
118 lines (106 loc) · 2.84 KB
/
setup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Package setup helps initialize the database and all queries.
package setup
import (
"database/sql"
"errors"
"fmt"
"sync"
"time"
"github.com/Shyp/go-simple-metrics"
"github.com/Shyp/rickover/models"
"github.com/Shyp/rickover/models/archived_jobs"
"github.com/Shyp/rickover/models/db"
"github.com/Shyp/rickover/models/jobs"
"github.com/Shyp/rickover/models/queued_jobs"
)
// mu is held for the whole of DB, so concurrent DB calls run one at a time.
var mu sync.Mutex
// activeQueriesStmt is the prepared statement executed by GetActiveQueries.
// It is assigned by prepare.
//
// TODO: not sure of the best place for this to live.
var activeQueriesStmt *sql.Stmt
// prepare compiles the pg_stat_activity count query and stores the resulting
// statement in activeQueriesStmt.
//
// When db.Connected reports false the statement is left untouched and an
// error is returned. Otherwise the result of db.Conn.Prepare is assigned to
// activeQueriesStmt unconditionally and its error, if any, is returned.
func prepare() (err error) {
	if db.Connected() {
		activeQueriesStmt, err = db.Conn.Prepare(`-- setup.GetActiveQueries
SELECT count(*) FROM pg_stat_activity
WHERE state='active'
`)
		return err
	}
	return errors.New("No DB connection was established, can't query")
}
// GetActiveQueries executes the prepared pg_stat_activity query and returns
// the number of rows whose state is 'active'.
//
// The statement is created by prepare, which runs as part of PrepareAll (and
// therefore DB). If it has not been prepared, an error is returned rather
// than calling into a nil *sql.Stmt, which would panic.
func GetActiveQueries() (count int64, err error) {
	if activeQueriesStmt == nil {
		return 0, errors.New("setup: active queries statement has not been prepared")
	}
	err = activeQueriesStmt.QueryRow().Scan(&count)
	return count, err
}
// TODO all of these should use a different database connection than the server
// or the worker, to avoid contention.

// MeasureActiveQueries calls GetActiveQueries on every tick of interval and
// reports the result as the "active_queries.count" measurement. When the
// query fails, the "active_queries.error" counter is incremented instead.
//
// Each metric call is dispatched on its own goroutine. The loop ranges over
// a time.Tick channel and has no exit, so this function does not return.
func MeasureActiveQueries(interval time.Duration) {
	for range time.Tick(interval) {
		active, err := GetActiveQueries()
		if err != nil {
			go metrics.Increment("active_queries.error")
			continue
		}
		go metrics.Measure("active_queries.count", active)
	}
}
// MeasureQueueDepth calls queued_jobs.CountReadyAndAll on every tick of
// interval. The first returned value is reported as "queue_depth.all" and the
// second as "queue_depth.ready"; a failed call increments "queue_depth.error"
// instead.
//
// Each metric call is dispatched on its own goroutine. The loop ranges over
// a time.Tick channel and has no exit, so this function does not return.
func MeasureQueueDepth(interval time.Duration) {
	for range time.Tick(interval) {
		total, ready, err := queued_jobs.CountReadyAndAll()
		if err != nil {
			go metrics.Increment("queue_depth.error")
			continue
		}
		go metrics.Measure("queue_depth.all", int64(total))
		go metrics.Measure("queue_depth.ready", int64(ready))
	}
}
// MeasureInProgressJobs calls queued_jobs.GetCountsByStatus with
// models.StatusInProgress on every tick of interval. For each entry of the
// returned map it reports "queued_jobs.<key>.in_progress" with that entry's
// value, then reports the sum of all values as "queued_jobs.in_progress".
// A failed call increments "queued_jobs.in_progress.error" instead.
//
// Each metric call is dispatched on its own goroutine. The loop ranges over
// a time.Tick channel and has no exit, so this function does not return.
func MeasureInProgressJobs(interval time.Duration) {
	for range time.Tick(interval) {
		counts, err := queued_jobs.GetCountsByStatus(models.StatusInProgress)
		if err != nil {
			go metrics.Increment("queued_jobs.in_progress.error")
			continue
		}
		var total int64
		for key, n := range counts {
			total += n
			go metrics.Measure(fmt.Sprintf("queued_jobs.%s.in_progress", key), n)
		}
		go metrics.Measure("queued_jobs.in_progress", total)
	}
}
// DB makes sure db.Conn holds a usable database connection and then prepares
// the queries for every model via PrepareAll.
//
// If db.Conn is already set and answers a ping, DB returns nil immediately
// without preparing anything. Otherwise it asks connector for a connection
// sized by dbConns, stores the result in db.Conn, and pings it. A failure of
// either step is returned with the "Could not establish a database
// connection" prefix. The whole call runs under the package mutex.
func DB(connector db.Connector, dbConns int) error {
	mu.Lock()
	defer mu.Unlock()

	// Reuse a connection that is still alive.
	if db.Conn != nil && db.Conn.Ping() == nil {
		return nil
	}

	var connErr error
	// db.Conn is overwritten with whatever Connect returned, even on failure.
	db.Conn, connErr = connector.Connect(dbConns)
	if connErr == nil {
		connErr = db.Conn.Ping()
	}
	if connErr != nil {
		return errors.New("Could not establish a database connection: " + connErr.Error())
	}
	return PrepareAll()
}
// PrepareAll runs the setup step of each model package in order: jobs,
// queued_jobs, archived_jobs, and finally this package's own prepare. It
// stops at the first step that fails and returns that error; nil means every
// step succeeded.
func PrepareAll() error {
	steps := []func() error{
		jobs.Setup,
		queued_jobs.Setup,
		archived_jobs.Setup,
		prepare,
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}