forked from leoleovich/grafsy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
209 lines (190 loc) · 6.13 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package main
import (
"log"
"net"
"os"
"time"
)
// Client drains the metric queues (ch and chM) and sends the metrics to the
// Graphite server at graphiteAddr, falling back to the retry-file
// (conf.RetryFile) for anything that cannot be sent. See runClient.
//
// NOTE(review): every method of Client uses a value receiver, so each call
// copies the whole struct, including lg (a log.Logger held by value, which
// holds a mutex internally). `go vet` copylocks may flag this — consider
// *log.Logger and/or pointer receivers; confirm against the constructor.
type Client struct {
conf Config // settings read by the client: RetryFile, ClientSendInterval, ConnectTimeout, Supervisor
lc LocalConfig // limits read by the client: mainBufferSize, fileMetricSize
mon *Monitoring // shared counters updated by the client: saved, sent, got.retry
graphiteAddr net.TCPAddr // Graphite server address dialed by runClient
lg log.Logger // logger used for all client messages
ch chan string // main metrics queue
chM chan string // monitoring metrics queue (always sent; space is reserved for it)
}
// getFileSize returns the size of file in bytes, or 0 if the file cannot be
// stat'ed (for example because it does not exist).
//
// It uses os.Stat instead of opening the file, so no file descriptor is
// created, no read permission is required and a named pipe cannot block it.
func (c Client) getFileSize(file string) int64 {
	stat, err := os.Stat(file)
	if err != nil {
		return 0
	}
	return stat.Size()
}
/*
saveSliceToRetry appends metrics to the retry-file (c.conf.RetryFile), one per
line, and then calls removeOldDataFromRetryFile so the file stays within
c.lc.fileMetricSize lines. Saving a whole batch at once avoids the IO of
writing and checking the file size after every single metric.

removeOldDataFromRetryFile calls back into saveSliceToRetry when the file is
over its limit, so the two functions are mutually recursive.
*/
func (c Client) saveSliceToRetry(metrics []string) {
	c.lg.Printf("Saving %d metrics to the retry-file", len(metrics))
	c.appendMetricsToRetryFile(len(metrics), func(i int) string { return metrics[i] })
}

/*
saveChannelToRetry receives exactly size metrics from ch, appends them to the
retry-file and then trims the file the same way saveSliceToRetry does.

Exactly size metrics are always consumed from ch, even when they cannot be
written, so callers can rely on the channel having been drained by size.
It blocks until size metrics are available on ch.
*/
func (c Client) saveChannelToRetry(ch chan string, size int) {
	c.lg.Printf("Saving %d metrics to the retry-file from channel", size)
	c.appendMetricsToRetryFile(size, func(int) string { return <-ch })
}

// appendMetricsToRetryFile appends n metrics, produced in order by
// next(0) … next(n-1), to the retry-file and then trims the file with
// removeOldDataFromRetryFile. next is called exactly n times even if the file
// cannot be opened or a write fails; such metrics are logged as dropped and
// are NOT counted in c.mon.saved.
func (c Client) appendMetricsToRetryFile(n int, next func(i int) string) {
	f, err := os.OpenFile(c.conf.RetryFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
	if err != nil {
		// f is nil here; keep going so the metrics are still consumed.
		c.lg.Println(err.Error())
	}

	dropped := 0
	var writeErr error
	for i := 0; i < n; i++ {
		metric := next(i)
		if f == nil {
			dropped++
			continue
		}
		if _, werr := f.WriteString(metric + "\n"); werr != nil {
			// Report only the first write error; later ones are usually the same (e.g. disk full).
			if writeErr == nil {
				writeErr = werr
				c.lg.Println(werr.Error())
			}
			dropped++
			continue
		}
		c.mon.saved++
	}

	if f != nil {
		if cerr := f.Close(); cerr != nil {
			c.lg.Println(cerr.Error())
		}
	}
	if dropped > 0 {
		c.lg.Printf("Could not save %d metrics to the retry-file %s, they are dropped", dropped, c.conf.RetryFile)
	}
	c.removeOldDataFromRetryFile()
}
/*
removeOldDataFromRetryFile trims the retry-file when it holds more than
c.lc.fileMetricSize lines: the first c.lc.fileMetricSize metrics read from the
file are saved back (via saveSliceToRetry) and the rest is dropped.

The original author's note says the file has the newest metrics at the
beginning, so keeping the head keeps the newest metrics.
NOTE(review): saveSliceToRetry/saveChannelToRetry open the file with O_APPEND,
which puts new metrics at the end — verify the ordering actually produced by
readMetricsFromFile.
NOTE(review): termination of the saveSliceToRetry <-> removeOldDataFromRetryFile
recursion relies on readMetricsFromFile emptying the file it reads; otherwise
appending the kept lines would keep the file over the limit — confirm.
*/
func (c Client) removeOldDataFromRetryFile() {
	currentLinesInFile := getSizeInLinesFromFile(c.conf.RetryFile)
	if currentLinesInFile <= c.lc.fileMetricSize {
		return
	}
	c.lg.Printf("I can not save to %s more, than %d. I will have to drop the rest (%d)",
		c.conf.RetryFile, c.lc.fileMetricSize, currentLinesInFile-c.lc.fileMetricSize)

	// Keep only the first fileMetricSize metrics. The read may return fewer
	// lines than getSizeInLinesFromFile counted, so clamp instead of slicing
	// blindly (slicing past len would panic).
	limit := int(c.lc.fileMetricSize)
	kept := readMetricsFromFile(c.conf.RetryFile)
	if len(kept) > limit {
		kept = kept[:limit]
	}
	c.saveSliceToRetry(kept)
}
// tryToSendToGraphite writes one metric, terminated by a newline, to conn.
// On success it increments c.mon.sent and returns nil; on failure it logs
// the write error and returns it unchanged.
func (c Client) tryToSendToGraphite(metric string, conn net.Conn) error {
	line := []byte(metric + "\n")
	if _, werr := conn.Write(line); werr != nil {
		c.lg.Println("Write to server failed:", werr.Error())
		return werr
	}
	c.mon.sent++
	return nil
}
/*
runClient is the client's main loop and never returns. Every
c.conf.ClientSendInterval seconds it notifies the supervisor, dials the
Graphite server and sends, in this order:
 1. metrics from the retry-file (c.conf.RetryFile),
 2. metrics from the monitoring queue (c.chM),
 3. metrics from the main queue (c.ch).

Retry-file and main-queue metrics together are capped at c.lc.mainBufferSize
per run; monitoring metrics are always sent. Anything that cannot be sent
(dial failure, write failure or over the cap) is appended to the retry-file.
*/
func (c Client) runClient() {
	sup := Supervisor{c.conf.Supervisor}
	for ; ; time.Sleep(time.Duration(c.conf.ClientSendInterval) * time.Second) {
		// Notify watchdog about aliveness of Client routine
		sup.notify()

		// Try to dial to Graphite server. If ClientSendInterval is 10 seconds - dial should be no longer than 1 second
		conn, err := net.DialTimeout("tcp", c.graphiteAddr.String(), time.Duration(c.conf.ConnectTimeout)*time.Second)
		if err != nil {
			c.lg.Println("Can not connect to graphite server: ", err.Error())
			c.saveChannelToRetry(c.chM, len(c.chM))
			c.saveChannelToRetry(c.ch, len(c.ch))
			c.removeOldDataFromRetryFile()
			continue
		}

		// The write deadline is what is left of the send interval after the
		// dial timeout, minus a 1-second margin.
		connectionFailed := false
		if err = conn.SetWriteDeadline(time.Now().Add(time.Duration(c.conf.ClientSendInterval-c.conf.ConnectTimeout-1) * time.Second)); err != nil {
			c.lg.Println("Can not set deadline for connection: ", err.Error())
			connectionFailed = true
		}

		processedTotal := 0
		// Send the retry-file first: it holds data from earlier runs, which is
		// the data most at risk of being lost.
		if !connectionFailed {
			retryFileMetrics := readMetricsFromFile(c.conf.RetryFile)
			for numOfMetricFromFile, metricFromFile := range retryFileMetrics {
				if numOfMetricFromFile+1 >= c.lc.mainBufferSize {
					c.lg.Printf("Can read only %d metrics from %s. Rest will be kept for the next run", numOfMetricFromFile+1, c.conf.RetryFile)
					c.saveSliceToRetry(retryFileMetrics[numOfMetricFromFile:])
					break
				}
				if err = c.tryToSendToGraphite(metricFromFile, conn); err != nil {
					c.lg.Printf("Error happened in the middle of writing retry metrics. Resaving %d metrics\n", len(retryFileMetrics)-numOfMetricFromFile)
					// If we failed to write a metric to graphite - something is wrong with connection
					c.saveSliceToRetry(retryFileMetrics[numOfMetricFromFile:])
					connectionFailed = true
					break
				}
				c.mon.got.retry++
				processedTotal++
			}
		}

		// Monitoring. We read it always and we reserved space for it.
		bufSize := len(c.chM)
		if connectionFailed {
			c.saveChannelToRetry(c.chM, bufSize)
		} else {
			for i := 0; i < bufSize; i++ {
				metric := <-c.chM
				if err = c.tryToSendToGraphite(metric, conn); err != nil {
					c.lg.Println("Error happened in the middle of writing monitoring metrics. Saving...")
					// The failed metric was already received: save it explicitly and
					// drain only what is left of this run's snapshot (bufSize-i-1),
					// so nothing is lost and we never wait for metrics that were
					// not queued yet.
					c.saveSliceToRetry([]string{metric})
					c.saveChannelToRetry(c.chM, bufSize-i-1)
					connectionFailed = true
					break
				}
			}
		}

		/*
			Main buffer. Only the metrics queued at this moment are handled
			(concurrent producers keep writing). Metrics are sent while the run
			total stays under mainBufferSize; the rest is saved.
		*/
		bufSize = len(c.ch)
		if connectionFailed {
			// Save the main queue (this previously drained c.chM by mistake,
			// which could block and left c.ch unsaved).
			c.saveChannelToRetry(c.ch, bufSize)
		} else {
			for processedMainBuff := 0; processedMainBuff < bufSize; processedMainBuff++ {
				if processedTotal >= c.lc.mainBufferSize {
					c.saveChannelToRetry(c.ch, bufSize-processedMainBuff)
					break
				}
				metric := <-c.ch
				if err = c.tryToSendToGraphite(metric, conn); err != nil {
					c.lg.Printf("Error happened in the middle of writing metrics. Saving %d metrics\n", bufSize-processedMainBuff)
					// Same as for monitoring: keep the failed metric and drain only the rest.
					c.saveSliceToRetry([]string{metric})
					c.saveChannelToRetry(c.ch, bufSize-processedMainBuff-1)
					break
				}
				processedTotal++
			}
		}
		conn.Close()
	}
}