-
Notifications
You must be signed in to change notification settings - Fork 15
/
sql.go
116 lines (103 loc) · 3.04 KB
/
sql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package main
import (
"database/sql"
"fmt"
"runtime"
"strconv"
"strings"
"time"
)
type (
	// queryParams is the list of positional arguments bound to a SQL
	// statement's placeholders when it is passed to Query or QueryRow.
	queryParams []interface{}

	// scanTo is the list of destinations handed to Scan for one result row.
	scanTo []interface{}
)
// SQL statements used with positional ("?") placeholders.
var (
	// insertStatusChange appends one (model_id, status, timestamp) row to
	// status_changes.
	insertStatusChange = "insert into status_changes (model_id, status, timestamp) values (?,?,?)"

	// updateLastStatusChange inserts a row into last_status_changes or, when
	// a row with the same model_id already exists, overwrites its status and
	// timestamp with the new values.
	updateLastStatusChange = `
insert into last_status_changes (model_id, status, timestamp)
values (?,?,?)
on conflict(model_id) do update set status=excluded.status, timestamp=excluded.timestamp`

	// updateModelStatus inserts a row into models or, when a row with the
	// same model_id already exists, overwrites its status.
	updateModelStatus = `
insert into models (model_id, status)
values (?,?)
on conflict(model_id) do update set status=excluded.status`

	// storeNotification appends one row with ten bound values to
	// notification_queue.
	storeNotification = `
insert into notification_queue (endpoint, chat_id, model_id, status, time_diff, image_url, social, priority, sound, kind)
values (?,?,?,?,?,?,?,?,?,?)`
)
// gid returns the numeric id of the calling goroutine.
//
// It captures the beginning of the current goroutine's stack trace into a
// small fixed buffer, strips the leading "goroutine " text and parses the
// first whitespace-delimited field as an integer. A parse failure is reported
// through checkErr.
func gid() int {
	var stack [64]byte
	written := runtime.Stack(stack[:], false)

	// Only the start of the trace is needed, so 64 bytes are sufficient.
	header := strings.TrimPrefix(string(stack[:written]), "goroutine ")
	id, err := strconv.Atoi(strings.Fields(header)[0])
	if err != nil {
		checkErr(fmt.Errorf("cannot get goroutine id: %v", err))
	}
	return id
}
// checkTID asserts that the caller is running on the goroutine whose id is
// stored in w.mainGID. The check is performed only when w.cfg.CheckGID is
// set; a mismatch is reported through checkErr.
func (w *worker) checkTID() {
	if w.cfg.CheckGID {
		if actual := gid(); actual != w.mainGID {
			checkErr(fmt.Errorf("database queries should be run from single thread, expected: %d, actual: %d", w.mainGID, actual))
		}
	}
}
// measure starts timing an operation identified by query and returns a
// function that finishes the measurement.
//
// The start time is taken first, then checkTID is run. Calling the returned
// function folds the elapsed time (in seconds) into the running average kept
// in w.durations[query] and increments that entry's sample count.
func (w *worker) measure(query string) func() {
	started := time.Now()
	w.checkTID()

	finish := func() {
		seconds := time.Since(started).Seconds()
		stats := w.durations[query]
		// Incremental mean: re-expand the old sum, add the new sample,
		// divide by the new sample count.
		stats.avg = (stats.avg*float64(stats.count) + seconds) / float64(stats.count+1)
		stats.count++
		w.durations[query] = stats
	}
	return finish
}
// mustExec prepares query, executes it once with args and closes the prepared
// statement. Every step's error is passed to checkErr. The whole call is timed
// under the key "db: " + query.
func (w *worker) mustExec(query string, args ...interface{}) {
	done := w.measure("db: " + query)
	defer done()

	prepared, err := w.db.Prepare(query)
	checkErr(err)

	_, err = prepared.Exec(args...)
	checkErr(err)

	err = prepared.Close()
	checkErr(err)
}
// mustExecPrepared executes the already prepared statement stmt with args and
// passes the resulting error to checkErr. The goroutine check (checkTID) runs
// before the statement is executed.
//
// The query argument is not used by the body; it is kept so the signature
// stays unchanged for callers.
func (w *worker) mustExecPrepared(query string, stmt *sql.Stmt, args ...interface{}) {
	w.checkTID()

	_, execErr := stmt.Exec(args...)
	checkErr(execErr)
}
// mustInt runs query with args and returns the value scanned from the single
// result row as an int. The scan error (including the no-rows case) is passed
// to checkErr. The call is timed under the key "db: " + query.
func (w *worker) mustInt(query string, args ...interface{}) (result int) {
	done := w.measure("db: " + query)
	defer done()

	scanErr := w.db.QueryRow(query, args...).Scan(&result)
	checkErr(scanErr)
	return result
}
// mustString runs query with args and returns the value scanned from the
// single result row as a string. The scan error (including the no-rows case)
// is passed to checkErr. The call is timed under the key "db: " + query.
func (w *worker) mustString(query string, args ...interface{}) (result string) {
	done := w.measure("db: " + query)
	defer done()

	scanErr := w.db.QueryRow(query, args...).Scan(&result)
	checkErr(scanErr)
	return result
}
// maybeRecord runs query with args and scans the single result row into the
// destinations in record.
//
// It returns false when the scan reports sql.ErrNoRows and true otherwise;
// any other scan error is passed to checkErr first. The call is timed under
// the key "db: " + query.
func (w *worker) maybeRecord(query string, args queryParams, record scanTo) bool {
	done := w.measure("db: " + query)
	defer done()

	switch scanErr := w.db.QueryRow(query, args...).Scan(record...); scanErr {
	case sql.ErrNoRows:
		return false
	default:
		checkErr(scanErr)
		return true
	}
}
func (w *worker) mustStrings(queryString string, args ...interface{}) (result []string) {
var current string
w.mustQuery(queryString, args, scanTo{¤t}, func() { result = append(result, current) })
return
}
// mustQuery runs queryString with args and iterates over all result rows.
//
// For every row the values are scanned into the destinations in record and
// then store is called, so that the caller can copy the freshly scanned
// values out of its destinations. Errors from running the query, scanning a
// row, iterating, or closing the result are passed to checkErr. The call is
// timed under the key "db: " + queryString.
func (w *worker) mustQuery(queryString string, args queryParams, record scanTo, store func()) {
	defer w.measure("db: " + queryString)()

	rows, err := w.db.Query(queryString, args...)
	checkErr(err)

	for rows.Next() {
		checkErr(rows.Scan(record...))
		store()
	}

	// Next returns false both at the end of the data and when iteration
	// fails; only Err distinguishes the two, and Close does not report it.
	checkErr(rows.Err())
	checkErr(rows.Close())
}