-
Notifications
You must be signed in to change notification settings - Fork 187
/
taskdefinitionprocessor.go
91 lines (76 loc) · 2.33 KB
/
taskdefinitionprocessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package ecsservicediscovery
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/hashicorp/golang-lru/simplelru"
)
// taskDefCacheSize is the capacity of the LRU cache of task definitions.
// The value was chosen with reference to the ECS service quotas:
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/service-quotas.html
const taskDefCacheSize = 2000
// TaskDefinitionProcessor decorates tasks with their ECS task definition.
// Definitions are looked up through the ECS client and kept in an LRU cache
// keyed by task definition ARN.
type TaskDefinitionProcessor struct {
// svcEcs is the ECS client used to call DescribeTaskDefinition.
svcEcs *ecs.ECS
// stats records cache-hit, API-call and cache-size counters.
stats *ProcessorStats
// taskDefCache maps a task definition ARN (string) to its *ecs.TaskDefinition.
taskDefCache *simplelru.LRU
}
// NewTaskDefinitionProcessor returns a TaskDefinitionProcessor that uses the
// given ECS client and stats recorder, together with a fresh LRU cache sized
// by taskDefCacheSize. It panics if the cache cannot be created.
func NewTaskDefinitionProcessor(ecs *ecs.ECS, s *ProcessorStats) *TaskDefinitionProcessor {
	// No eviction callback is registered (nil).
	cache, err := simplelru.NewLRU(taskDefCacheSize, nil)
	if err != nil {
		panic(err)
	}

	return &TaskDefinitionProcessor{
		svcEcs:       ecs,
		stats:        s,
		taskDefCache: cache,
	}
}
// Process attaches the ECS task definition to every task in taskList and
// returns only the tasks for which a definition was resolved.
//
// Each distinct, non-empty task definition ARN is resolved once per call:
// first from the LRU cache and, on a miss, through DescribeTaskDefinition,
// whose result is then added to the cache. Tasks with an empty ARN, or whose
// resolved definition is nil, are dropped from the result. The cluster
// argument is not used by this processor.
//
// If a DescribeTaskDefinition call fails, the original taskList is returned
// together with the error.
func (p *TaskDefinitionProcessor) Process(cluster string, taskList []*DecoratedTask) ([]*DecoratedTask, error) {
	// Use a closure so the cache size is sampled on exit, after any inserts.
	defer func() {
		p.stats.AddStatsCount(LRUCacheSizeTaskDefinition, p.taskDefCache.Len())
	}()

	// Collect the distinct ARNs so each definition is looked up only once.
	arn2Definition := make(map[string]*ecs.TaskDefinition, len(taskList))
	for _, t := range taskList {
		arn2Definition[aws.StringValue(t.Task.TaskDefinitionArn)] = nil
	}

	for arn := range arn2Definition {
		if arn == "" {
			continue
		}

		if cached, ok := p.taskDefCache.Get(arn); ok {
			p.stats.AddStats(LRUCacheGetTaskDefinition)
			arn2Definition[arn] = cached.(*ecs.TaskDefinition)
			continue
		}

		// aws.String copies the ARN, so the request never holds a pointer to
		// the range variable.
		resp, err := p.svcEcs.DescribeTaskDefinition(&ecs.DescribeTaskDefinitionInput{
			TaskDefinition: aws.String(arn),
		})
		// The API call is counted whether or not it succeeded.
		p.stats.AddStats(AWSCLIDescribeTaskDefinition)
		if err != nil {
			return taskList, newServiceDiscoveryError("Failed to describe task definition for "+arn, &err)
		}

		p.taskDefCache.Add(arn, resp.TaskDefinition)
		arn2Definition[arn] = resp.TaskDefinition
	}

	for _, t := range taskList {
		t.TaskDefinition = arn2Definition[aws.StringValue(t.Task.TaskDefinitionArn)]
	}

	return filterNilTaskDefinitionTasks(taskList), nil
}
// filterNilTaskDefinitionTasks returns the tasks in taskList whose
// TaskDefinition is set, in their original order. The result is nil when no
// task qualifies.
func filterNilTaskDefinitionTasks(taskList []*DecoratedTask) []*DecoratedTask {
	var kept []*DecoratedTask
	for i := range taskList {
		task := taskList[i]
		if task.TaskDefinition == nil {
			continue
		}
		kept = append(kept, task)
	}
	return kept
}
// ProcessorName returns the fixed name that identifies this processor.
func (p *TaskDefinitionProcessor) ProcessorName() string {
	const name = "TaskDefinitionProcessor"
	return name
}