-
Notifications
You must be signed in to change notification settings - Fork 202
/
servicediscovery.go
107 lines (89 loc) · 3.37 KB
/
servicediscovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package ecsservicediscovery
import (
"log"
"sync"
"time"
internalaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ecs"
)
type ServiceDiscovery struct {
Config *ServiceDiscoveryConfig
svcEcs *ecs.ECS
svcEc2 *ec2.EC2
stats ProcessorStats
clusterProcessors []Processor
}
func (sd *ServiceDiscovery) init() {
credentialConfig := &internalaws.CredentialConfig{
Region: sd.Config.TargetClusterRegion,
}
configProvider := credentialConfig.Credentials()
sd.svcEcs = ecs.New(configProvider, aws.NewConfig().WithRegion(sd.Config.TargetClusterRegion).WithMaxRetries(AwsSdkLevelRetryCount))
sd.svcEc2 = ec2.New(configProvider, aws.NewConfig().WithRegion(sd.Config.TargetClusterRegion).WithMaxRetries(AwsSdkLevelRetryCount))
sd.initClusterProcessorPipeline()
}
func (sd *ServiceDiscovery) initClusterProcessorPipeline() {
sd.clusterProcessors = append(sd.clusterProcessors, NewTaskProcessor(sd.svcEcs, &sd.stats))
sd.clusterProcessors = append(sd.clusterProcessors, NewTaskDefinitionProcessor(sd.svcEcs, &sd.stats))
sd.clusterProcessors = append(sd.clusterProcessors, NewServiceEndpointDiscoveryProcessor(sd.svcEcs, sd.Config.ServiceNamesForTasks, &sd.stats))
sd.clusterProcessors = append(sd.clusterProcessors, NewDockerLabelDiscoveryProcessor(sd.Config.DockerLabel))
sd.clusterProcessors = append(sd.clusterProcessors, NewTaskDefinitionDiscoveryProcessor(sd.Config.TaskDefinitions))
sd.clusterProcessors = append(sd.clusterProcessors, NewTaskFilterProcessor())
sd.clusterProcessors = append(sd.clusterProcessors, NewContainerInstanceProcessor(sd.svcEcs, sd.svcEc2, &sd.stats))
sd.clusterProcessors = append(sd.clusterProcessors, NewTargetsExportProcessor(sd.Config, &sd.stats))
}
func StartECSServiceDiscovery(sd *ServiceDiscovery, shutDownChan chan interface{}, wg *sync.WaitGroup) {
defer wg.Done()
if !sd.validateConfig() {
return
}
frequency, _ := time.ParseDuration(sd.Config.Frequency)
sd.init()
t := time.NewTicker(frequency)
defer t.Stop()
for {
select {
case <-shutDownChan:
return
case <-t.C:
sd.work()
}
}
}
func (sd *ServiceDiscovery) work() {
sd.stats.ResetStats()
var err error
var clusterTasks []*DecoratedTask
for _, p := range sd.clusterProcessors {
clusterTasks, err = p.Process(sd.Config.TargetCluster, clusterTasks)
// Ignore partial result to avoid overwriting existing targets
if err != nil {
log.Printf("E! ECS SD processor: %v got error: %v \n", p.ProcessorName(), err.Error())
return
}
}
sd.stats.ShowStats()
}
func (sd *ServiceDiscovery) validateConfig() bool {
if sd.Config == nil {
return false
}
if sd.Config.DockerLabel == nil && len(sd.Config.TaskDefinitions) == 0 && len(sd.Config.ServiceNamesForTasks) == 0 {
log.Printf("E! Neither docker label based discovery, nor task definition based discovery, nor service name based discovery is enabled.\n")
return false
}
if sd.Config.TargetCluster == "" || sd.Config.TargetClusterRegion == "" {
log.Printf("E! Target ECS cluster info is not correct.\n")
return false
}
_, err := time.ParseDuration(sd.Config.Frequency)
if err != nil {
log.Printf("E! Invalid ECS service discovery frequency: %v.\n", sd.Config.Frequency)
return false
}
return true
}