-
Notifications
You must be signed in to change notification settings - Fork 0
/
backend_postgres.go
141 lines (109 loc) · 2.85 KB
/
backend_postgres.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package stats
import (
"context"
"database/sql"
_ "embed"
"fmt"
"time"
_ "github.com/lib/pq"
)
// storeRequest is one pending stats entry. Store builds it and sends it over
// dataChan; a worker goroutine receives it and passes it to insert.
type storeRequest struct {
	// domain and questionType are bound as the first and second parameters
	// of the insert statement.
	domain, questionType string
	// metadata is bound (by pointer) as the third parameter of the insert
	// statement.
	metadata Metadata
}
// backendPostgres stores stats entries in a PostgreSQL table. Entries are
// queued by Store and written by a pool of worker goroutines started in Start.
type backendPostgres struct {
// ctx is created in newBackendPostgres and cancelled by Stop; workers exit
// when it is done, and every query context is derived from it.
ctx context.Context
// ctxCancel cancels ctx.
ctxCancel context.CancelFunc
// uri is the data source name handed to sql.Open.
uri string
// workers is both the number of worker goroutines and the buffer size of
// dataChan.
workers int64
// queryTimeout bounds each insert and cleanup query.
queryTimeout time.Duration
// statsPrefix is the prefix from which tableName is derived.
statsPrefix string
// maxEntryAge is formatted into the cleanup query.
maxEntryAge time.Duration
// logger receives insert and cleanup failures.
logger Logger
// db is opened by Start and closed by Stop; it is nil before Start runs.
db *sql.DB
// tableName is statsPrefix + "_stats" and is formatted into every query.
tableName string
// dataChan carries requests from Store to the workers; created in Start.
dataChan chan storeRequest
// maxEntryCleanTicker triggers a cleanup run in whichever worker receives
// from it.
maxEntryCleanTicker <-chan struct{}
}
//go:embed sql/init_postgres.sql
var queryInit string
//go:embed sql/insert_stats_postgres.sql
var queryInsert string
//go:embed sql/cleanup_old_entries_postgres.sql
var queryCleanup string
// newBackendPostgres builds a PostgreSQL stats backend from its configuration.
// It only fills in the struct and creates the cancellable root context; no
// database connection is opened and no goroutine is started until Start is
// called. The table name is derived as statsPrefix + "_stats".
func newBackendPostgres(uri string, workers int64, queryTimeout time.Duration, statsPrefix string, maxEntryAge time.Duration, maxEntryCleanTicker <-chan struct{}, logger Logger) *backendPostgres {
	backend := &backendPostgres{
		uri:                 uri,
		workers:             workers,
		queryTimeout:        queryTimeout,
		statsPrefix:         statsPrefix,
		maxEntryAge:         maxEntryAge,
		tableName:           statsPrefix + "_stats",
		logger:              logger,
		maxEntryCleanTicker: maxEntryCleanTicker,
	}
	// The root context lives until Stop cancels it.
	backend.ctx, backend.ctxCancel = context.WithCancel(context.Background())
	return backend
}
// Start opens the database handle, runs the init query for the stats table
// and launches the worker goroutines that drain dataChan.
//
// It returns an error when the worker count is not positive, when the handle
// cannot be opened, or when the init query fails. On an init failure the
// handle is closed again so its resources are not leaked.
func (b *backendPostgres) Start() error {
	// A negative count would panic in make(chan) below, and zero workers
	// would leave every Store call blocked with nobody to receive.
	if b.workers < 1 {
		return fmt.Errorf("stats backend needs at least one worker, got %d", b.workers)
	}

	db, err := sql.Open("postgres", b.uri)
	if err != nil {
		return fmt.Errorf("error trying to open database: %w", err)
	}
	b.db = db

	query := fmt.Sprintf(queryInit, b.tableName)
	if _, err := b.db.Exec(query); err != nil {
		// Best-effort release of the handle; the init error is the one worth
		// reporting. b.db stays set so a later Stop or Ready still works
		// (closing an already closed *sql.DB is a no-op).
		_ = b.db.Close()
		return fmt.Errorf("error trying to init database: %w", err)
	}

	b.dataChan = make(chan storeRequest, b.workers)
	for i := int64(0); i < b.workers; i++ {
		go b.worker()
	}
	return nil
}
// Store queues one stats entry for insertion by a worker goroutine.
//
// The call blocks while the queue is full. Once the backend has been stopped
// the entry is dropped instead, so callers are not left blocked on a queue
// that no worker will ever drain again.
func (b *backendPostgres) Store(domain string, questionType string, metadata Metadata) {
	req := storeRequest{
		domain:       domain,
		questionType: questionType,
		metadata:     metadata,
	}
	select {
	case b.dataChan <- req:
	case <-b.ctx.Done():
		// Backend stopped: workers have exited, drop the entry.
	}
}
// Stop cancels the backend context, which makes the workers return and aborts
// in-flight queries, and then closes the database handle. It returns the
// error from closing the handle, or nil when no handle was ever opened.
func (b *backendPostgres) Stop() error {
	b.ctxCancel()
	// db is nil when Start was never called or failed before opening it;
	// calling Close on a nil *sql.DB would panic.
	if b.db == nil {
		return nil
	}
	return b.db.Close()
}
// insert writes a single entry to the stats table. The statement runs under a
// context derived from the backend context and limited by queryTimeout. A
// failure is logged and otherwise ignored.
func (b *backendPostgres) insert(data storeRequest) {
	queryCtx, release := context.WithTimeout(b.ctx, b.queryTimeout)
	defer release()

	statement := fmt.Sprintf(queryInsert, b.tableName)
	if _, err := b.db.ExecContext(queryCtx, statement, data.domain, data.questionType, &data.metadata); err != nil {
		b.logger.Error("failed to insert data into database", err)
	}
}
// worker is the body of each goroutine started by Start. It loops until the
// backend context is cancelled, inserting every request it receives from
// dataChan and running cleanup whenever it receives from maxEntryCleanTicker.
func (b *backendPostgres) worker() {
	stopped := b.ctx.Done()
	for {
		select {
		case <-stopped:
			return
		case req := <-b.dataChan:
			b.insert(req)
		case <-b.maxEntryCleanTicker:
			b.cleanup()
		}
	}
}
// cleanup runs the cleanup query, formatted with the table name and
// maxEntryAge, under a context limited by queryTimeout. A failure is logged
// and otherwise ignored.
//
// It uses a pointer receiver like the other methods of backendPostgres, so
// the struct is not copied on every call.
func (b *backendPostgres) cleanup() {
	ctx, cancel := context.WithTimeout(b.ctx, b.queryTimeout)
	defer cancel()

	// NOTE(review): how maxEntryAge is rendered depends on the verb used in
	// the embedded SQL template, which is not visible here.
	query := fmt.Sprintf(queryCleanup, b.tableName, b.maxEntryAge)
	if _, err := b.db.ExecContext(ctx, query); err != nil {
		b.logger.Error("failed to cleanup old entries from database", err)
	}
}
// Ready reports whether the database currently answers a ping. It returns
// false when no database handle has been opened yet, and also once the
// backend context has been cancelled, because the ping runs under it.
//
// It uses a pointer receiver like the other methods of backendPostgres, so
// the struct is not copied on every call.
func (b *backendPostgres) Ready() bool {
	// Pinging a nil *sql.DB would panic; before Start the backend is simply
	// not ready.
	if b.db == nil {
		return false
	}
	return b.db.PingContext(b.ctx) == nil
}