-
Notifications
You must be signed in to change notification settings - Fork 0
/
insert.go
65 lines (56 loc) · 1.51 KB
/
insert.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package influxdb
import (
"time"
"github.com/danielstutzman/sync-log-files-to-db/src/log"
clientPkg "github.com/influxdata/influxdb/client/v2"
)
// InsertMaps converts each element of maps into an InfluxDB point for
// conn.measurementName and writes all of them to conn.databaseName as a
// single batch.
//
// Within each map the "timestamp" key supplies the point's time and must hold
// a time.Time. Every other key whose tagsSet entry is true becomes a tag and
// must hold a string; all remaining keys are passed through as fields
// unchanged.
//
// Nothing is returned: every failure (building the batch or a point, a
// missing or mistyped timestamp, a non-string tag value, or the write itself)
// is reported through log.Fatalw.
// NOTE(review): this presumes log.Fatalw stops the program, as the existing
// error handling already did — confirm against the log package.
func (conn *Connection) InsertMaps(tagsSet map[string]bool,
	maps []map[string]interface{}) {
	// Flip to true to log a per-batch summary while debugging.
	const logSummary = false

	// Create a batch
	points, err := clientPkg.NewBatchPoints(clientPkg.BatchPointsConfig{
		Database: conn.databaseName,
	})
	if err != nil {
		log.Fatalw("Error from NewBatchPoints", "err", err)
	}

	for _, mapUnfiltered := range maps {
		// Checked assertion: a missing key yields a nil interface, which
		// would otherwise panic with a bare interface-conversion error.
		timestamp, ok := mapUnfiltered["timestamp"].(time.Time)
		if !ok {
			log.Fatalw("Timestamp is missing or is not a time.Time",
				"timestamp", mapUnfiltered["timestamp"])
		}

		tags := map[string]string{}
		fields := map[string]interface{}{}
		for key, value := range mapUnfiltered {
			switch {
			case key == "timestamp":
				// Already consumed above as the point's time.
			case tagsSet[key]:
				// Tags are stored as strings, so name the offending key
				// instead of panicking on the conversion.
				tagValue, ok := value.(string)
				if !ok {
					log.Fatalw("Tag value is not a string",
						"key", key, "value", value)
				}
				tags[key] = tagValue
			default:
				fields[key] = value
			}
		}

		point, err := clientPkg.NewPoint(conn.measurementName, tags,
			fields, timestamp)
		if err != nil {
			log.Fatalw("Error from NewPoint", "err", err)
		}
		points.AddPoint(point)
	}

	if err := conn.client.Write(points); err != nil {
		log.Fatalw("Error from Write", "err", err)
	}

	// Disabled by default. Kept after the write so that "Inserted" is only
	// ever logged for a batch that was actually sent.
	if logSummary {
		var earliestTime time.Time
		var latestTime time.Time
		for _, m := range maps {
			// Safe to assert directly: every timestamp was validated in the
			// loop above.
			timestamp := m["timestamp"].(time.Time)
			if earliestTime.IsZero() || earliestTime.After(timestamp) {
				earliestTime = timestamp
			}
			if timestamp.After(latestTime) {
				latestTime = timestamp
			}
		}
		log.Infow("Inserted Influx DB points",
			"measurement_name", conn.measurementName,
			"num_points", len(points.Points()),
			"earliest", earliestTime,
			"latest", latestTime)
	}
}