-
Notifications
You must be signed in to change notification settings - Fork 8
/
writer.go
161 lines (140 loc) · 5.28 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright 2022 Block, Inc.
package heartbeat
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/cashapp/blip"
"github.com/cashapp/blip/sqlutil"
"github.com/cashapp/blip/status"
)
// BLIP_TABLE_DDL creates the heartbeat table if it does not already exist.
// Each row is keyed by src_id and records an optional src_role, the
// millisecond-precision timestamp of the last heartbeat (ts), and the
// heartbeat write frequency in milliseconds (freq).
const BLIP_TABLE_DDL = `CREATE TABLE IF NOT EXISTS heartbeat (
src_id varchar(200) NOT NULL PRIMARY KEY,
src_role varchar(200) NULL DEFAULT NULL,
ts timestamp(3) NOT NULL, -- heartbeat
freq smallint unsigned NOT NULL -- milliseconds
) ENGINE=InnoDB`

var (
	// WriteTimeout bounds how long a single heartbeat write may take in MySQL.
	// It should be much greater than the write frequency
	// (config.heartbeat.freq) to tolerate a slow network, a slow MySQL, and
	// so on.
	WriteTimeout = 5 * time.Second

	// InitErrorWait is the delay between retries of the first heartbeat
	// write (the INSERT that initializes this source's row). It is
	// intentionally long.
	InitErrorWait = 10 * time.Second

	// ReadOnlyWait is the delay after a write fails because MySQL is
	// read-only. It is long because Blip may effectively sit in standby on a
	// read-only replica until that replica is promoted to the writable
	// source, which could take a very long time.
	ReadOnlyWait = 20 * time.Second
)
// Writer writes heartbeat rows for one Blip monitor to the heartbeat table.
// Construct it with NewWriter, then run Write.
type Writer struct {
	monitorId      string        // Blip monitor ID; used for status and debug output
	db             *sql.DB       // database handle that heartbeat statements are executed on
	srcId, srcRole string        // src_id and src_role column values for this source's row
	freq           time.Duration // interval between heartbeat writes
	table          string        // heartbeat table name (NewWriter passes it through sqlutil.SanitizeTable)
}
// NewWriter returns a Writer that writes heartbeats for monitorId to db
// according to cfg.
//
// cfg.Freq must be a positive duration string accepted by time.ParseDuration
// (for example "1s"), and cfg.Table must be set. Otherwise NewWriter panics,
// because a misconfigured writer would be a programming or validation error.
// In particular, a zero frequency would make Write spin without sleeping.
// If cfg.SourceId is empty, monitorId is used as the heartbeat source ID.
func NewWriter(monitorId string, db *sql.DB, cfg blip.ConfigHeartbeat) *Writer {
	if cfg.Freq == "" {
		panic("heartbeat.NewWriter called but config.heartbeat.freq not set")
	}
	if cfg.Table == "" {
		panic("heartbeat.NewWriter called but config.heartbeat.table not set")
	}

	// Do not ignore the parse error: a malformed value would silently yield
	// freq=0 and turn the heartbeat loop into a busy loop against MySQL.
	freq, err := time.ParseDuration(cfg.Freq)
	if err != nil {
		panic(fmt.Sprintf("heartbeat.NewWriter: invalid config.heartbeat.freq %q: %s", cfg.Freq, err))
	}
	if freq <= 0 {
		panic(fmt.Sprintf("heartbeat.NewWriter: config.heartbeat.freq must be positive, got %q", cfg.Freq))
	}

	srcId := cfg.SourceId
	if srcId == "" {
		srcId = monitorId
	}

	return &Writer{
		monitorId: monitorId,
		db:        db,
		srcId:     srcId,
		srcRole:   cfg.Role,
		freq:      freq,
		table:     sqlutil.SanitizeTable(cfg.Table, blip.DEFAULT_DATABASE),
	}
}
// Write runs the heartbeat writer until Stop is signaled on stopChan, either by
// closing it or by sending a value.
//
// First it INSERTs (or upserts) this source's row, retrying until it succeeds.
// Then it UPDATEs the row's ts column every w.freq. Write errors are not
// returned. They are reported via status and debug output and retried. When
// MySQL is read-only, the writer backs off for ReadOnlyWait. Every wait is
// interrupted promptly by Stop.
//
// On return, the writer status is set to "stopped" and doneChan is closed.
// Write currently always returns nil.
func (w *Writer) Write(stopChan, doneChan chan struct{}) error {
	defer close(doneChan)
	defer status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "stopped")

	// wait blocks for d or until Stop is signaled, whichever comes first, and
	// reports whether Stop was signaled. Unlike time.Sleep, it lets Write
	// return promptly even in the middle of a long ReadOnlyWait.
	wait := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-stopChan:
			return true
		case <-timer.C:
			return false
		}
	}

	// execPing executes one heartbeat statement, bounded by WriteTimeout.
	execPing := func(query string) error {
		ctx, cancel := context.WithTimeout(context.Background(), WriteTimeout)
		defer cancel()
		_, err := w.db.ExecContext(ctx, query)
		return err
	}

	// First INSERT: either create the row for this source if it doesn't exist,
	// or update the existing row with the current timestamp, frequency, and
	// role. This must succeed first, otherwise the simpler UPDATE statements
	// below (the real heartbeat) would match no row.
	//
	// src_id is always w.srcId, the same key the UPDATE loop uses. Using the
	// monitor ID here when no role is set would leave the UPDATEs matching
	// nothing whenever a custom source ID is configured.
	role := "NULL"
	if w.srcRole != "" {
		role = "'" + w.srcRole + "'"
	}
	freqMs := w.freq.Milliseconds()
	ping := fmt.Sprintf("INSERT INTO %s (src_id, src_role, ts, freq) VALUES ('%s', %s, NOW(3), %d) ON DUPLICATE KEY UPDATE ts=NOW(3), freq=%d, src_role=%s",
		w.table, w.srcId, role, freqMs, freqMs, role)
	blip.Debug("%s: first heartbeat: %s", w.monitorId, ping)

	for {
		status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "first insert")
		err := execPing(ping)
		if err == nil {
			status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "sleep")
			break
		}

		blip.Debug("%s: first heartbeat failed: %s", w.monitorId, err)
		var backoff time.Duration
		if sqlutil.ReadOnly(err) {
			status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "init: MySQL is read-only, sleeping %s", ReadOnlyWait)
			backoff = ReadOnlyWait
		} else {
			status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "init: error: %s (sleeping %s)", err, InitErrorWait)
			backoff = InitErrorWait
		}
		if wait(backoff) {
			return nil
		}
	}

	// ----------------------------------------------------------------------
	// Write heartbeats

	// This is the critical loop, so it uses a query literal, not SQL ? params,
	// to avoid 2 wasted round trips: prepare (waste), exec, close (waste).
	// The table name is sanitized by NewWriter. src_id (and src_role above)
	// come from Blip config and are interpolated without escaping, so a single
	// quote in either would break the statement. The injection risk is
	// minuscule because those values are not external input, and Blip should
	// only have write privileges on its heartbeat table.
	ping = fmt.Sprintf("UPDATE %s SET ts=NOW(3) WHERE src_id='%s'", w.table, w.srcId)
	blip.Debug("%s: heartbeat: %s", w.monitorId, ping)

	for {
		if wait(w.freq) {
			return nil
		}

		status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "write")
		err := execPing(ping)
		if err == nil {
			// Set status after a successful Exec, not before waiting, so it
			// doesn't overwrite status set on Exec error; "sleep" = "write OK".
			status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "sleep")
			continue
		}

		blip.Debug("%s: %s", w.monitorId, err.Error())
		if sqlutil.ReadOnly(err) {
			status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "MySQL is read-only, sleeping %s", ReadOnlyWait)
			if wait(ReadOnlyWait) {
				return nil
			}
			continue
		}
		// No special wait on other errors; keep trying to write at freq.
		status.Monitor(w.monitorId, status.HEARTBEAT_WRITER, "write error: %s", err)
	}
}