-
Notifications
You must be signed in to change notification settings - Fork 24
/
servicegetter.go
108 lines (88 loc) · 2.87 KB
/
servicegetter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package servicegetter
import (
"context"
"net"
"go.uber.org/fx"
"google.golang.org/grpc/peer"
"github.com/fluxninja/aperture/pkg/entitycache"
"github.com/fluxninja/aperture/pkg/log"
)
// ServiceGetter can be used to query services based on client context.
type ServiceGetter interface {
ServicesFromContext(ctx context.Context) []string
}
// FromEntityCache creates a new EntityCache-powered ServiceGetter.
func FromEntityCache(ec *entitycache.EntityCache) ServiceGetter {
return &ecServiceGetter{entityCache: ec}
}
// NewEmpty creates a new ServiceGetter that always returns nil.
func NewEmpty() ServiceGetter { return emptyServiceGetter{} }
type ecServiceGetter struct {
entityCache *entitycache.EntityCache
ecHasDiscovery bool
metrics *Metrics
}
// ServicesFromContext returns list of services associated with IP extracted from context
//
// The returned list of services depends only on state of entityCache.
// However, emitted warnings will depend on whether service discovery is enabled or not.
func (sg *ecServiceGetter) ServicesFromContext(ctx context.Context) []string {
svcs, ok := sg.servicesFromContext(ctx)
sg.metrics.inc(ok)
return svcs
}
func (sg *ecServiceGetter) servicesFromContext(ctx context.Context) (svcs []string, ok bool) {
rpcPeer, peerExists := peer.FromContext(ctx)
if !peerExists {
if sg.ecHasDiscovery {
log.Bug().Msg("cannot get client info from context")
}
return nil, false
}
tcpAddr, isTCPAddr := rpcPeer.Addr.(*net.TCPAddr)
if !isTCPAddr {
if sg.ecHasDiscovery {
log.Bug().Msg("client addr is not TCP")
}
return nil, false
}
clientIP := tcpAddr.IP.String()
entity, err := sg.entityCache.GetByIP(clientIP)
if err != nil {
if sg.ecHasDiscovery {
log.Sample(noEntitySampler).Warn().Err(err).Str("clientIP", clientIP).
Msg("cannot get services")
}
return nil, false
}
return entity.Services, true
}
var noEntitySampler = log.NewRatelimitingSampler()
// FxIn are FX arguments to ProvideFromEntityCache.
type FxIn struct {
fx.In
Lifecycle fx.Lifecycle
EntityCache *entitycache.EntityCache
EntityTrackers *entitycache.EntityTrackers
Metrics *Metrics `optional:"true"`
}
// ProvideFromEntityCache provides an EntityCache-powered ServiceGetter.
func ProvideFromEntityCache(in FxIn) ServiceGetter {
sg := &ecServiceGetter{
entityCache: in.EntityCache,
metrics: in.Metrics,
}
in.Lifecycle.Append(fx.Hook{
OnStart: func(context.Context) error {
// Checking this flag on OnStart so that all registrations done in
// provide/invoke stage would be visible.
sg.ecHasDiscovery = in.EntityTrackers.HasDiscovery()
return nil
},
OnStop: func(context.Context) error { return nil },
})
return sg
}
type emptyServiceGetter struct{}
// ServicesFromContext implements ServiceGetter interface.
func (sg emptyServiceGetter) ServicesFromContext(ctx context.Context) []string { return nil }