This repository has been archived by the owner on Aug 30, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 31
/
service_mapper.go
114 lines (92 loc) · 2.44 KB
/
service_mapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package main
import (
"sync"
"time"
"github.com/DataDog/datadog-trace-agent/internal/pb"
"github.com/DataDog/datadog-trace-agent/internal/watchdog"
log "github.com/cihub/seelog"
)
// serviceApp is the metadata key that names the app a given integration
// belongs to. shouldAdd treats the presence of this key in a service's
// metadata as the sign that the entry came from the service API.
const serviceApp = "app"
// ServiceMapper provides a cache layer over the pb.ServicesMetadata pipeline.
// Used in conjunction with ServiceWriter: in-> ServiceMapper out-> ServiceWriter
type ServiceMapper struct {
// in delivers incoming service metadata; Run receives from it.
in <-chan pb.ServicesMetadata
// out receives the subset of incoming metadata that update decided to keep.
out chan<- pb.ServicesMetadata
// exit is closed by Stop to make Run return.
exit chan bool
// done tracks the goroutine launched by Start so that Stop can wait for it.
done sync.WaitGroup
// cache holds, per service name, the metadata most recently forwarded on out.
cache pb.ServicesMetadata
}
// NewServiceMapper builds a ServiceMapper that reads metadata from in and
// forwards accepted changes to out. The returned mapper starts with an empty
// cache and a fresh, open exit channel; it does nothing until Start or Run is
// called.
func NewServiceMapper(in <-chan pb.ServicesMetadata, out chan<- pb.ServicesMetadata) *ServiceMapper {
	mapper := new(ServiceMapper)
	mapper.in = in
	mapper.out = out
	mapper.exit = make(chan bool)
	mapper.cache = make(pb.ServicesMetadata)
	return mapper
}
// Start runs the event loop in a non-blocking way: it launches Run on a new
// goroutine and registers that goroutine with the done WaitGroup so that Stop
// can wait for it to finish.
func (s *ServiceMapper) Start() {
	s.done.Add(1)
	go func() {
		// Deferred calls run last-in-first-out, so the watchdog hook still
		// runs first and the WaitGroup is released afterwards on every exit
		// path. Without the defer, a panic in Run would skip Done and leave
		// Stop blocked forever in Wait.
		defer s.done.Done()
		defer watchdog.LogOnPanic()
		s.Run()
	}()
}
// Stop gracefully terminates the event loop: it closes the exit channel that
// Run selects on, then blocks until the done WaitGroup (incremented by Start)
// has been released. The exit channel is closed unconditionally, so calling
// Stop a second time panics.
func (s *ServiceMapper) Stop() {
close(s.exit)
s.done.Wait()
}
// Run executes the event loop on the calling goroutine and returns once the
// exit channel has been closed (see Stop). On each iteration it either
// processes one pb.ServicesMetadata batch received from the in channel or,
// once per minute, logs the number of services currently held in the cache.
func (s *ServiceMapper) Run() {
	telemetryTicker := time.NewTicker(1 * time.Minute)
	defer telemetryTicker.Stop()

	// Receive through a local copy so the case can be switched off without
	// touching the struct field: a nil channel is never ready in a select.
	in := s.in

	for {
		select {
		case metadata, ok := <-in:
			if !ok {
				// A closed channel is always ready and yields zero values;
				// leaving it in the select would spin this loop at full
				// speed until exit is closed.
				in = nil
				continue
			}
			s.update(metadata)
		case <-telemetryTicker.C:
			log.Infof("total number of tracked services: %d", len(s.cache))
		case <-s.exit:
			return
		}
	}
}
// update forwards on out the entries of metadata that shouldAdd accepts and
// then records those same entries in the cache. When no entry is accepted
// nothing is sent and the cache is left untouched.
func (s *ServiceMapper) update(metadata pb.ServicesMetadata) {
	var accepted pb.ServicesMetadata

	for name, meta := range metadata {
		if s.shouldAdd(name, metadata) {
			// Allocate lazily: once the cache is warm most calls accept
			// nothing, so the map is usually never needed.
			if accepted == nil {
				accepted = make(pb.ServicesMetadata)
			}
			accepted[name] = meta
		}
	}

	// accepted is only allocated immediately before its first insertion, so
	// an empty map here means that nothing was accepted.
	if len(accepted) == 0 {
		return
	}

	// Send first, cache second: the cache is only updated once the send on
	// out has completed.
	s.out <- accepted
	for name, meta := range accepted {
		s.cache[name] = meta
	}
}
// shouldAdd reports whether the entry for service in metadata should be
// forwarded and cached. A service that is not in the cache is always added.
// A cached service is only replaced when its cached entry lacks the
// serviceApp key (it did not come from the service API) while the incoming
// entry carries it.
func (s *ServiceMapper) shouldAdd(service string, metadata pb.ServicesMetadata) bool {
	cached, known := s.cache[service]
	if !known {
		return true
	}

	_, cachedFromAPI := cached[serviceApp]
	_, incomingFromAPI := metadata[service][serviceApp]

	return !cachedFromAPI && incomingFromAPI
}