-
Notifications
You must be signed in to change notification settings - Fork 1
/
influxdb.go
executable file
·154 lines (139 loc) · 3.96 KB
/
influxdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// write MultiDataPoint into a file.
// no rotation currently
package backends
import (
"fmt"
"github.com/influxdb/influxdb/client"
"github.com/oliveagle/hickwall/backends/config"
"github.com/oliveagle/hickwall/logging"
"github.com/oliveagle/hickwall/newcore"
"time"
)
var (
_ = time.Now()
_ = fmt.Sprintf("")
)
type influxdbBackend struct {
name string
closing chan chan error // for Close
updates chan newcore.MultiDataPoint // for receive updates
// influxdb backend specific attributes
conf *config.Transport_influxdb
output InfluxdbClient
version string
}
func NewInfluxdbBackend(name string, conf *config.Transport_influxdb) (newcore.Publication, error) {
s := &influxdbBackend{
name: name,
closing: make(chan chan error),
updates: make(chan newcore.MultiDataPoint),
conf: conf,
version: influxdbParseVersionFromString(conf.Version),
}
go s.loop()
return s, nil
}
func (b *influxdbBackend) newInfluxdbClientFromConf() error {
iclient, err := NewInfluxdbClient(map[string]interface{}{
"Host": b.conf.Host,
"URL": b.conf.URL,
"Username": b.conf.Username,
"Password": b.conf.Password,
"UserAgent": "",
"Database": b.conf.Database,
"FlatTemplate": b.conf.FlatTemplate,
}, b.version)
if err != nil && iclient == nil {
logging.Error("failed to create influxdb client: ", err)
return fmt.Errorf("failed to create influxdb client: ", err)
}
b.output = iclient
return nil
}
func (b *influxdbBackend) loop() {
var (
startConsuming <-chan newcore.MultiDataPoint
try_create_client_once chan bool
try_create_client_tick <-chan time.Time
)
startConsuming = b.updates
logging.Debug("influxdb backend loop started ")
for {
if b.output == nil && try_create_client_once == nil && try_create_client_tick == nil {
startConsuming = nil // disable consuming
try_create_client_once = make(chan bool)
// try to create influxdb client the first time async.
go func() {
err := b.newInfluxdbClientFromConf()
if err == nil {
try_create_client_once <- true
} else {
try_create_client_once <- false
}
}()
}
select {
case md := <-startConsuming:
if b.output != nil {
points := []client.Point{}
for _, p := range md {
points = append(points, client.Point{
Name: p.Metric.Clean(),
Timestamp: p.Timestamp,
Fields: map[string]interface{}{
"value": p.Value,
},
Tags: p.Tags, //TODO: Tags
})
}
write := client.BatchPoints{
Database: b.conf.Database,
RetentionPolicy: b.conf.RetentionPolicy,
Points: points,
}
// logging.Debugf("write: count: %d", len(md))
//FIXME: connection timeout?
b.output.Write(write)
}
case opened := <-try_create_client_once:
try_create_client_once = nil // disable this branch
if !opened {
// failed open it the first time,
// then we try to open file with time interval, until opened successfully.
logging.Debug("open the first time failed, try to open with interval of 1s")
try_create_client_tick = time.Tick(time.Second * 1)
} else {
startConsuming = b.updates
}
case <-try_create_client_tick:
// try to open with interval
err := b.newInfluxdbClientFromConf()
if b.output != nil && err == nil {
// finally opened.
try_create_client_tick = nil
startConsuming = b.updates
} else {
logging.Critical("influxdb backend trying to open file but failed: %s", err)
}
case errc := <-b.closing:
// fmt.Println("errc <- b.closing")
logging.Debug("influxdb backend .loop closing")
startConsuming = nil // stop comsuming
errc <- nil
close(b.updates)
logging.Debug("influxdb backend .loop stopped")
return
}
}
}
func (b *influxdbBackend) Updates() chan<- newcore.MultiDataPoint {
return b.updates
}
func (b *influxdbBackend) Close() error {
errc := make(chan error)
b.closing <- errc
return <-errc
}
func (b *influxdbBackend) Name() string {
return b.name
}