/
discovery.go
135 lines (119 loc) · 3.38 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package elasticache
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/elasticache"
"github.com/coroot/coroot-aws-agent/flags"
"github.com/coroot/coroot-aws-agent/utils"
"github.com/coroot/logger"
"github.com/prometheus/client_golang/prometheus"
"time"
)
type Discoverer struct {
reg prometheus.Registerer
awsSession *session.Session
instances map[string]*Collector
logger logger.Logger
}
// NewDiscoverer returns a Discoverer that registers its collectors with reg
// and talks to AWS through awsSession. The returned Discoverer starts with no
// known instances; call Run to begin discovery.
func NewDiscoverer(reg prometheus.Registerer, awsSession *session.Session) *Discoverer {
	return &Discoverer{
		reg:        reg,
		awsSession: awsSession,
		instances:  make(map[string]*Collector),
		logger:     logger.NewKlog(""),
	}
}
// Run performs an initial refresh and then repeats it on every tick of
// flags.DiscoveryInterval. A failed refresh is logged as a warning and does
// not stop the loop. The method has no stop mechanism, so callers are
// expected to run it in its own goroutine.
func (d *Discoverer) Run() {
	client := elasticache.New(d.awsSession)

	// refreshAndLog runs one discovery pass and reports its error, if any.
	refreshAndLog := func() {
		if err := d.refresh(client); err != nil {
			d.logger.Warning(err)
		}
	}

	// Refresh once up front so discovery does not wait for the first tick.
	refreshAndLog()
	for range time.Tick(*flags.DiscoveryInterval) {
		refreshAndLog()
	}
}
// refresh synchronises the set of collectors with the cache nodes currently
// reported by the ElastiCache API.
//
// It lists the clusters (dropping those excluded by the tag-based filters),
// creates and registers a collector for every node that does not have one
// yet, updates the collector of every listed node, and finally unregisters
// and closes the collectors of nodes that are no longer listed.
//
// An error is returned only when listing the clusters fails; in that case the
// existing collectors are left untouched. Failures that concern a single node
// are logged and that node is skipped until the next refresh.
func (d *Discoverer) refresh(api *elasticache.ElastiCache) error {
	t := time.Now()
	defer func() {
		d.logger.Info("elasticache clusters refreshed in:", time.Since(t))
	}()

	clusters, err := d.listClusters(api)
	if err != nil {
		return err
	}

	actualInstances := map[string]bool{}
	for _, cluster := range clusters {
		for _, node := range cluster.CacheNodes {
			id := aws.StringValue(cluster.CacheClusterId) + "/" + aws.StringValue(node.CacheNodeId)
			actualInstances[id] = true
			collector, ok := d.instances[id]
			if !ok {
				d.logger.Info("new Elasticache instance found:", id)
				collector, err = NewCollector(d.awsSession, cluster, node)
				if err != nil {
					d.logger.Warning("failed to init Elasticache collector:", err)
					continue
				}
				if err := d.wrappedReg(id).Register(collector); err != nil {
					d.logger.Warning(err)
					// The collector is not tracked in d.instances, so nothing
					// else would ever close it; release it here instead of
					// abandoning a new one on every refresh.
					collector.Close()
					continue
				}
				d.instances[id] = collector
			}
			collector.update(cluster, node)
		}
	}

	// Drop the collectors whose node did not show up in this listing.
	for id, collector := range d.instances {
		if !actualInstances[id] {
			d.logger.Info("Elasticache instance no longer exists:", id)
			d.wrappedReg(id).Unregister(collector)
			collector.Close()
			delete(d.instances, id)
		}
	}
	return nil
}

// listClusters pages through DescribeCacheClusters twice, first with
// ShowCacheClustersNotInReplicationGroups set to false and then to true, and
// returns the clusters that pass the tag-based filters. The first API error
// aborts the listing.
func (d *Discoverer) listClusters(api *elasticache.ElastiCache) ([]*elasticache.CacheCluster, error) {
	var clusters []*elasticache.CacheCluster
	for _, notInReplicationGroups := range []bool{false, true} {
		input := &elasticache.DescribeCacheClustersInput{}
		input.ShowCacheNodeInfo = aws.Bool(true)
		input.ShowCacheClustersNotInReplicationGroups = aws.Bool(notInReplicationGroups)
		for {
			output, err := api.DescribeCacheClusters(input)
			if err != nil {
				return nil, err
			}
			for _, cluster := range output.CacheClusters {
				tags := d.clusterTags(api, cluster)
				if utils.Filtered(*flags.ElasticacheFilters, tags) {
					d.logger.Infof(
						"cluster %s (tags: %s) was skipped according to the tag-based filters: %s",
						aws.StringValue(cluster.CacheClusterId),
						tags,
						*flags.ElasticacheFilters,
					)
					continue
				}
				clusters = append(clusters, cluster)
			}
			// A missing or empty marker means there are no more pages; sending
			// an empty marker back would not advance the listing.
			marker := aws.StringValue(output.Marker)
			if marker == "" {
				break
			}
			input.SetMarker(marker)
		}
	}
	return clusters, nil
}

// clusterTags returns the tags of cluster as a key/value map. A failed lookup
// is logged and yields an empty map, so the filters are then evaluated
// against no tags rather than aborting the refresh.
func (d *Discoverer) clusterTags(api *elasticache.ElastiCache, cluster *elasticache.CacheCluster) map[string]string {
	tags := map[string]string{}
	output, err := api.ListTagsForResource(&elasticache.ListTagsForResourceInput{ResourceName: cluster.ARN})
	if err != nil {
		d.logger.Error(err)
		return tags
	}
	for _, tag := range output.TagList {
		tags[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value)
	}
	return tags
}
// wrappedReg returns the base registerer wrapped so that every metric
// registered through it carries an "ec_instance_id" label. The label value is
// built by utils.IdWithRegion from the session's region and instanceId.
func (d *Discoverer) wrappedReg(instanceId string) prometheus.Registerer {
	region := aws.StringValue(d.awsSession.Config.Region)
	labels := prometheus.Labels{
		"ec_instance_id": utils.IdWithRegion(region, instanceId),
	}
	return prometheus.WrapRegistererWith(labels, d.reg)
}