-
Notifications
You must be signed in to change notification settings - Fork 0
/
serviceEntries.go
157 lines (123 loc) · 4.09 KB
/
serviceEntries.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package service
import (
"log"
"strings"
"sync"
"github.com/gogo/protobuf/types"
"istio.io/api/mcp/v1alpha1"
"istio.io/api/networking/v1alpha3"
)
// Endpoints is the representation of the endpoints - used to serve EDS and
// ServiceEntries over MCP and XDS.
type Endpoints struct {
// mutex is held by ServiceEntriesUpdate while it modifies seShards.
mutex sync.RWMutex
// seShards stores ServiceEntries keyed first by hostname, then by shard.
seShards map[string]map[string][]*v1alpha3.ServiceEntry
}
// ep is the package-level endpoint store shared by the push handlers and the
// update callbacks in this file. It starts with an empty, non-nil shard map.
var ep = &Endpoints{
	seShards: make(map[string]map[string][]*v1alpha3.ServiceEntry),
}
// ServiceEntriesType is the collection name set on the Resources messages
// built in this file, and one of the keys under which sePush is registered
// in resourceHandler.
const ServiceEntriesType = "istio/networking/v1alpha3/serviceentries"
func init() {
resourceHandler["ServiceEntry"] = sePush
resourceHandler[ServiceEntriesType] = sePush
resourceHandler["type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"] = edsPush
}
// sePush is called to request a push of endpoints in ServiceEntry format.
//
// It builds one MCP resource per hostname currently stored in ep.seShards and
// sends them to con as a single Resources message for the ServiceEntriesType
// collection. The res argument (requested resource names) is only logged;
// every known hostname is always included.
//
// It returns the first conversion error, in which case nothing is sent, or
// otherwise whatever s.Send returns.
func sePush(s *AdsService, con *Connection, rtype string, res []string) error {
	log.Print("SE request ", rtype, res)

	// ServiceEntriesUpdate writes ep.seShards under ep.mutex, so the maps must
	// be read under the read lock. The lock is released before sending so it
	// is not held across the call to s.Send.
	resources, err := func() ([]v1alpha1.Resource, error) {
		ep.mutex.RLock()
		defer ep.mutex.RUnlock()

		var out []v1alpha1.Resource
		for hostname, sh := range ep.seShards {
			resource, err := convertServiceEntriesToResource(hostname, sh)
			if err != nil {
				return nil, err
			}
			out = append(out, *resource)
		}
		return out, nil
	}()
	if err != nil {
		return err
	}

	r := &v1alpha1.Resources{}
	r.Collection = ServiceEntriesType // must match
	r.Resources = resources

	return s.Send(con, rtype, r)
}
// edsPush is called to request a push of ClusterLoadAssignments (EDS) - the
// same information as sePush, but in Envoy format.
//
// Not implemented yet: it ignores all of its arguments and reports success.
func edsPush(s *AdsService, con *Connection, rtype string, res []string) error {
	// TODO.
	return nil
}
// ServiceEntriesUpdate is called when a new endpoint is added to a shard.
//
// It stores entry as the ServiceEntries that shard contributes for hostname
// (replacing any previous value for that shard), rebuilds the MCP resource
// for hostname from all of its shards, and passes it to SendAll. ep.mutex is
// held in write mode for the whole call, including SendAll.
//
// It returns an error only if the resource conversion fails, in which case
// nothing is sent.
func (fx *AdsService) ServiceEntriesUpdate(shard, hostname string, entry []*v1alpha3.ServiceEntry) error {
	ep.mutex.Lock()
	defer ep.mutex.Unlock()

	byShard, known := ep.seShards[hostname]
	if !known {
		byShard = make(map[string][]*v1alpha3.ServiceEntry)
		ep.seShards[hostname] = byShard
	}
	byShard[shard] = entry

	log.Println("SEUpdate ", shard, hostname, entry)

	// Typically this is deployed for a single cluster - but may still group in shards.
	// See sink.go - handleResponse.
	resource, err := convertServiceEntriesToResource(hostname, byShard)
	if err != nil {
		return err
	}

	// The object created by client has resource.Body.TypeUrl, resource.Metadata and Body==Message.
	// TODO: remove the extra caching in coremodel
	update := &v1alpha1.Resources{
		Collection: ServiceEntriesType, // must match
		Resources:  []v1alpha1.Resource{*resource},
	}
	fx.SendAll(update)

	return nil
}
// convertServiceEntriesToResource returns all ServiceEntries for a host, as a
// single MCP resource.
//
// The endpoints of every entry in every shard of sh are concatenated into one
// new ServiceEntry whose only host is hostname; the entries in sh are not
// modified. Shards are visited in map iteration order, so the relative order
// of endpoints coming from different shards is not deterministic.
//
// The resource is named "<namespace>/<name>", where name is the first
// dot-separated label of hostname and namespace is the second label. A
// hostname with a single label is placed in the "consul" namespace.
//
// It returns an error only if the merged ServiceEntry cannot be marshaled.
func convertServiceEntriesToResource(hostname string, sh map[string][]*v1alpha3.ServiceEntry) (*v1alpha1.Resource, error) {
	// See serviceregistry/external/conversion for the opposite side
	// See galley/pkg/runtime/state
	hostParts := strings.Split(hostname, ".")
	name := hostParts[0]
	namespace := "consul"
	if len(hostParts) > 1 {
		namespace = hostParts[1]
	}

	merged := &v1alpha3.ServiceEntry{
		Hosts: []string{hostname},
	}
	for _, shardEntries := range sh {
		for _, entry := range shardEntries {
			if entry == nil {
				continue
			}
			// Append to the merged entry, not to the stored one: the loop
			// variable must not shadow the accumulator.
			merged.Endpoints = append(merged.Endpoints, entry.Endpoints...)
		}
	}

	body, err := types.MarshalAny(merged)
	if err != nil {
		return nil, err
	}

	res := v1alpha1.Resource{
		Body: body,
		Metadata: &v1alpha1.Metadata{
			Annotations: map[string]string{
				"virtual": "1",
			},
			Name:    namespace + "/" + name, // goes to model.Config.Name and Namespace - of course different syntax
			Version: "1",                    // model.Config.ResourceVersion
		},
	}
	// Labels and Annotations - for the top service, not used here

	return &res, nil
}
// WorkloadUpdate is called on pod events. It only logs the workload id
// together with its labels and annotations.
func (fx *AdsService) WorkloadUpdate(id string, labels map[string]string, annotations map[string]string) {
	// update-Running seems to be readiness check ?
	log.Println("PodUpdate ", id, labels, annotations)
}
// ConfigUpdate is a no-op: the bool argument is ignored and nothing is
// logged or pushed.
func (*AdsService) ConfigUpdate(bool) {
	// log.Println("ConfigUpdate")
}
// SvcUpdate is called when a service port mapping definition is updated.
//
// This interface is WIP - labels, annotations and other changes to service may be
// updated to force a EDS and CDS recomputation and incremental push, as it doesn't affect
// LDS/RDS.
//
// The current implementation only logs the update; it does not modify any
// internal data structures.
func (fx *AdsService) SvcUpdate(shard, hostname string, ports map[string]uint32, rports map[uint32]string) {
	// Log under this method's own name (it previously logged "ConfigUpdate")
	// and include the arguments, as the other update handlers do.
	log.Println("SvcUpdate ", shard, hostname, ports, rports)
}