Skip to content

Commit

Permalink
Rework "Proxy labels should be updated when pod/wle labels updated (i…
Browse files Browse the repository at this point in the history
…stio#40036)"

This reverts commit 2f9f3fb.
  • Loading branch information
hzxuzhonghu committed Jul 30, 2022
1 parent 505f49a commit 8da39ca
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 60 deletions.
30 changes: 22 additions & 8 deletions pilot/pkg/model/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,10 +533,15 @@ type NodeMetadata struct {
// Mostly used when istiod requests the upstream.
IstioRevision string `json:"ISTIO_REVISION,omitempty"`

// Labels specifies the set of workload instance (ex: k8s pod) labels associated with this node.
// IstioMetaLabels contains the labels specified by ISTIO_METAJSON_LABELS and platform instance,
// so we can tell the difference between user specified labels and istio labels.
IstioMetaLabels map[string]string `json:"ISTIO_META_LABELS,omitempty"`

// Labels specifies the set of workload instance (ex: k8s pod) labels associated with this node. Labels is a
// superset of IstioMetaLabels.
Labels map[string]string `json:"LABELS,omitempty"`

// Labels specifies the set of workload instance (ex: k8s pod) annotations associated with this node.
// Annotations specifies the set of workload instance (ex: k8s pod) annotations associated with this node.
Annotations map[string]string `json:"ANNOTATIONS,omitempty"`

// InstanceIPs is the set of IPs attached to this proxy
Expand Down Expand Up @@ -849,14 +854,23 @@ func (node *Proxy) SetServiceInstances(serviceDiscovery ServiceDiscovery) {
node.ServiceInstances = instances
}

// SetWorkloadLabels will set the node.Metadata.Labels only when it is nil.
// SetWorkloadLabels will set the node.Metadata.Labels.
// It merges both node meta labels and workload labels and give preference to workload labels.
// Note: must be called after `SetServiceInstances`.
func (node *Proxy) SetWorkloadLabels(env *Environment) {
// First get the workload labels from node meta
if len(node.Metadata.Labels) > 0 {
return
labels := env.GetProxyWorkloadLabels(node)
if len(labels) > 0 {
node.Metadata.Labels = make(map[string]string, len(labels)+len(node.Metadata.IstioMetaLabels))
// we can't just equate proxy workload labels to node meta labels as it may be customized by user
// with `ISTIO_METAJSON_LABELS` env (pkg/bootstrap/config.go extractAttributesMetadata).
// so, we fill the `ISTIO_METAJSON_LABELS` as well.
for k, v := range labels {
node.Metadata.Labels[k] = v
}
for k, v := range node.Metadata.IstioMetaLabels {
node.Metadata.Labels[k] = v
}
}
// Fallback to calling GetProxyWorkloadLabels
node.Metadata.Labels = env.GetProxyWorkloadLabels(node)
}

// DiscoverIPMode discovers the IP Versions supported by Proxy based on its IP addresses.
Expand Down
9 changes: 9 additions & 0 deletions pilot/pkg/model/push_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,15 @@ func (pr *PushRequest) IsRequest() bool {
return len(pr.Reason) == 1 && pr.Reason[0] == ProxyRequest
}

func (pr *PushRequest) IsProxyUpdate() bool {
for _, r := range pr.Reason {
if r == ProxyUpdate {
return true
}
}
return false
}

func (pr *PushRequest) PushReason() string {
if pr.IsRequest() {
return " request"
Expand Down
8 changes: 7 additions & 1 deletion pilot/pkg/serviceregistry/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func NewController(kubeClient kubelib.Client, options Options) *Controller {
})
}
})
c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, nil)
c.registerHandlers(c.pods.informer, "Pods", c.pods.onEvent, c.pods.labelFilter)

c.exports = newServiceExportCache(c)
c.imports = newServiceImportCache(c)
Expand Down Expand Up @@ -1365,6 +1365,12 @@ func (c *Controller) getProxyServiceInstancesByPod(pod *v1.Pod,
func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance {
pod := c.pods.getPodByProxy(proxy)
if pod != nil {
locality := c.getPodLocality(pod)
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
// Add locality labels
pod.Labels[model.LocalityLabel] = locality
return pod.Labels
}
return nil
Expand Down
14 changes: 14 additions & 0 deletions pilot/pkg/serviceregistry/kube/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controller

import (
"fmt"
"reflect"
"sync"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -111,6 +112,19 @@ func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodC
return -1, nil
}

func (pc *PodCache) labelFilter(old, cur interface{}) bool {
oldPod := old.(*v1.Pod)
curPod := cur.(*v1.Pod)

// If labels updated, trigger proxy push
if curPod.Status.PodIP != "" && !reflect.DeepEqual(oldPod.Labels, curPod.Labels) {
pc.proxyUpdates(curPod.Status.PodIP)
}

// always continue calling pc.onEvent
return false
}

// onEvent updates the IP-based index (pc.podsByIP).
func (pc *PodCache) onEvent(curr any, ev model.Event) error {
// When a pod is deleted obj could be an *v1.Pod or a DeletionFinalStateUnknown marker item.
Expand Down
2 changes: 2 additions & 0 deletions pilot/pkg/serviceregistry/serviceentry/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ func (s *Controller) workloadEntryHandler(old, curr config.Config, event model.E
if labels.Instance(oldWle.Labels).Equals(curr.Labels) {
oldSes = currSes
} else {
// labels update should trigger proxy update
s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)
oldSes = getWorkloadServiceEntries(cfgs, oldWle)
}
}
Expand Down
8 changes: 5 additions & 3 deletions pilot/pkg/serviceregistry/serviceentry/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,13 +886,14 @@ func TestServiceDiscoveryWorkloadChangeLabel(t *testing.T) {
instances = []*model.ServiceInstance{}
expectServiceInstances(t, sd, selector, 0, instances)
expectProxyInstances(t, sd, instances, "2.2.2.2")
expectEvents(t, events, Event{kind: "eds", host: "selector.com", namespace: selector.Namespace, endpoints: 0})
expectEvents(t, events, Event{kind: "xds", proxyIP: "2.2.2.2"},
Event{kind: "eds", host: "selector.com", namespace: selector.Namespace, endpoints: 0})
})

t.Run("change label removing one", func(t *testing.T) {
// Add a WLE, we expect this to update
createConfigs([]*config.Config{wle}, store, t)
expectEvents(t, events,
expectEvents(t, events, Event{kind: "xds", proxyIP: "2.2.2.2"},
Event{kind: "eds", host: "selector.com", namespace: selector.Namespace, endpoints: 2},
)
// add a wle, expect this to be an add
Expand Down Expand Up @@ -942,7 +943,8 @@ func TestServiceDiscoveryWorkloadChangeLabel(t *testing.T) {
}
expectServiceInstances(t, sd, selector, 0, instances)
expectProxyInstances(t, sd, instances, "3.3.3.3")
expectEvents(t, events, Event{kind: "eds", host: "selector.com", namespace: selector.Namespace, endpoints: 2})
expectEvents(t, events, Event{kind: "xds", proxyIP: "2.2.2.2"},
Event{kind: "eds", host: "selector.com", namespace: selector.Namespace, endpoints: 2})
})
}

Expand Down
19 changes: 16 additions & 3 deletions pilot/pkg/serviceregistry/serviceregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ func TestWorkloadInstances(t *testing.T) {
expectServiceInstances(t, kc, expectedSvc, 80, instances)
})

t.Run("Kubernetes pod labels update", func(t *testing.T) {
_, _, _, kube, xdsUpdater := setupTest(t)
makeService(t, kube, service)
xdsUpdater.WaitOrFail(t, "svcupdate")
makePod(t, kube, pod)
xdsUpdater.WaitOrFail(t, "proxy update")
newPod := pod.DeepCopy()
newPod.Labels["newlabel"] = "new"
makePod(t, kube, newPod)
xdsUpdater.WaitOrFail(t, "proxy update")
})

t.Run("Kubernetes only: headless service", func(t *testing.T) {
kc, _, _, kube, xdsUpdater := setupTest(t)
makeService(t, kube, headlessService)
Expand Down Expand Up @@ -448,8 +460,8 @@ func TestWorkloadInstances(t *testing.T) {
t.Run("Service selects WorkloadEntry: wle occur earlier", func(t *testing.T) {
kc, _, store, kube, xdsUpdater := setupTest(t)
makeIstioObject(t, store, workloadEntry)

// Wait no event pushed when workload entry created as no service entry
// Other than proxy update, no event pushed when workload entry created as no service entry
xdsUpdater.WaitOrFail(t, "proxy update")
select {
case ev := <-xdsUpdater.Events:
t.Fatalf("Got %s event, expect none", ev.Kind)
Expand Down Expand Up @@ -504,7 +516,8 @@ func TestWorkloadInstances(t *testing.T) {
kc, _, store, kube, xdsUpdater := setupTest(t)
makeIstioObject(t, store, workloadEntry)

// Wait no event pushed when workload entry created as no service entry
// Other than proxy update, no event pushed when workload entry created as no service entry
xdsUpdater.WaitOrFail(t, "proxy update")
select {
case ev := <-xdsUpdater.Events:
t.Fatalf("Got %s event, expect none", ev.Kind)
Expand Down
78 changes: 38 additions & 40 deletions pilot/pkg/xds/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *C
// but proxy's SidecarScope has been updated(s.updateProxy) due to optimizations that skip sidecar scope
// computation.
if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != request.Push.PushVersion {
s.computeProxyState(con.proxy, request)
s.computeProxyState(con.proxy, request, true)
}
return s.pushXds(con, con.Watched(req.TypeUrl), request)
}
Expand Down Expand Up @@ -554,7 +554,7 @@ func (s *DiscoveryServer) initConnection(node *core.Node, con *Connection, ident
defer close(con.initialized)

// Complete full initialization of the proxy
if err := s.initializeProxy(node, con); err != nil {
if err := s.initializeProxy(con); err != nil {
s.closeConnection(con)
return err
}
Expand Down Expand Up @@ -602,38 +602,47 @@ func (s *DiscoveryServer) initProxyMetadata(node *core.Node) (*model.Proxy, erro
return proxy, nil
}

// initializeProxy completes the initialization of a proxy. It is expected to be called only after
// initProxyMetadata.
func (s *DiscoveryServer) initializeProxy(node *core.Node, con *Connection) error {
proxy := con.proxy
// this should be done before we look for service instances, but after we load metadata
// TODO fix check in kubecontroller treat echo VMs like there isn't a pod
if err := s.WorkloadEntryController.RegisterWorkload(proxy, con.connectedAt); err != nil {
return err
}
s.computeProxyState(proxy, nil)

// setTopologyLabels sets locality, cluster, network label
// must be called after `SetWorkloadLabels` and `SetServiceInstances`.
func setTopologyLabels(proxy *model.Proxy) {
var localityStr string
// Get the locality from the proxy's service instances.
// We expect all instances to have the same IP and therefore the same locality.
// So its enough to look at the first instance.
if len(proxy.ServiceInstances) > 0 {
proxy.Locality = util.ConvertLocality(proxy.ServiceInstances[0].Endpoint.Locality.Label)
localityStr = proxy.ServiceInstances[0].Endpoint.Locality.Label
} else {
// If no service instances(this maybe common for a pure client), respect LocalityLabel
localityStr = proxy.Metadata.Labels[model.LocalityLabel]
}

// If there is no locality in the registry then use the one sent as part of the discovery request.
// This is not preferable as only the connected Pilot is aware of this proxies location, but it
// can still help provide some client-side Envoy context when load balancing based on location.
if util.IsLocalityEmpty(proxy.Locality) {
if localityStr != "" {
proxy.Locality = util.ConvertLocality(localityStr)
} else {
// If there is no locality in the registry then use the one sent as part of the discovery request.
// This is not preferable as only the connected Pilot is aware of this proxies location, but it
// can still help provide some client-side Envoy context when load balancing based on location.
proxy.Locality = &core.Locality{
Region: node.Locality.GetRegion(),
Zone: node.Locality.GetZone(),
SubZone: node.Locality.GetSubZone(),
Region: proxy.XdsNode.Locality.GetRegion(),
Zone: proxy.XdsNode.Locality.GetZone(),
SubZone: proxy.XdsNode.Locality.GetSubZone(),
}
}

locality := util.LocalityToString(proxy.Locality)
// add topology labels to proxy metadata labels
proxy.Metadata.Labels = labelutil.AugmentLabels(proxy.Metadata.Labels, proxy.Metadata.ClusterID, locality, proxy.Metadata.Network)
}

// initializeProxy completes the initialization of a proxy. It is expected to be called only after
// initProxyMetadata.
func (s *DiscoveryServer) initializeProxy(con *Connection) error {
proxy := con.proxy
// this should be done before we look for service instances, but after we load metadata
// TODO fix check in kubecontroller treat echo VMs like there isn't a pod
if err := s.WorkloadEntryController.RegisterWorkload(proxy, con.connectedAt); err != nil {
return err
}
s.computeProxyState(proxy, nil, false)
// Discover supported IP Versions of proxy so that appropriate config can be delivered.
proxy.DiscoverIPMode()

Expand All @@ -646,24 +655,12 @@ func (s *DiscoveryServer) initializeProxy(node *core.Node, con *Connection) erro
return nil
}

func (s *DiscoveryServer) updateProxy(proxy *model.Proxy, request *model.PushRequest) {
s.computeProxyState(proxy, request)
if util.IsLocalityEmpty(proxy.Locality) {
// Get the locality from the proxy's service instances.
// We expect all instances to have the same locality.
// So its enough to look at the first instance.
if len(proxy.ServiceInstances) > 0 {
proxy.Locality = util.ConvertLocality(proxy.ServiceInstances[0].Endpoint.Locality.Label)
locality := proxy.ServiceInstances[0].Endpoint.Locality.Label
// add topology labels to proxy metadata labels
proxy.Metadata.Labels = labelutil.AugmentLabels(proxy.Metadata.Labels, proxy.Metadata.ClusterID, locality, proxy.Metadata.Network)
}
}
}

func (s *DiscoveryServer) computeProxyState(proxy *model.Proxy, request *model.PushRequest) {
proxy.SetWorkloadLabels(s.Env)
func (s *DiscoveryServer) computeProxyState(proxy *model.Proxy, request *model.PushRequest, skipLabels bool) {
proxy.SetServiceInstances(s.Env.ServiceDiscovery)
if !skipLabels {
proxy.SetWorkloadLabels(s.Env)
setTopologyLabels(proxy)
}
// Precompute the sidecar scope and merged gateways associated with this proxy.
// Saves compute cycles in networking code. Though this might be redundant sometimes, we still
// have to compute this because as part of a config change, a new Sidecar could become
Expand Down Expand Up @@ -736,8 +733,9 @@ func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
pushRequest := pushEv.pushRequest

if pushRequest.Full {
skipLabel := !pushRequest.IsProxyUpdate()
// Update Proxy with current information.
s.updateProxy(con.proxy, pushRequest)
s.computeProxyState(con.proxy, pushRequest, skipLabel)
}

if !s.ProxyNeedsPush(con.proxy, pushRequest) {
Expand Down
5 changes: 3 additions & 2 deletions pilot/pkg/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ func (s *DiscoveryServer) pushConnectionDelta(con *Connection, pushEv *Event) er
pushRequest := pushEv.pushRequest

if pushRequest.Full {
skipLabel := !pushRequest.IsProxyUpdate()
// Update Proxy with current information.
s.updateProxy(con.proxy, pushRequest)
s.computeProxyState(con.proxy, pushRequest, skipLabel)
}

if !s.ProxyNeedsPush(con.proxy, pushRequest) {
Expand Down Expand Up @@ -301,7 +302,7 @@ func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryReque
// It can happen when `processRequest` comes after push context has been updated(s.initPushContext),
// but before proxy's SidecarScope has been updated(s.updateProxy).
if con.proxy.SidecarScope != nil && con.proxy.SidecarScope.Version != request.Push.PushVersion {
s.computeProxyState(con.proxy, request)
s.computeProxyState(con.proxy, request, true)
}
return s.pushDeltaXds(con, con.Watched(req.TypeUrl), request)
}
Expand Down
1 change: 1 addition & 0 deletions pilot/pkg/xds/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ func (fx *FakeXdsUpdater) ConfigUpdate(req *model.PushRequest) {
}

func (fx *FakeXdsUpdater) ProxyUpdate(c cluster.ID, p string) {
fx.Events <- FakeXdsEvent{Kind: "proxy update"}
if fx.Delegate != nil {
fx.Delegate.ProxyUpdate(c, p)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ func extractAttributesMetadata(envVars []string, plat platform.Environment, meta
case "ISTIO_METAJSON_LABELS":
m := jsonStringToMap(val)
if len(m) > 0 {
meta.IstioMetaLabels = m
meta.Labels = m
}
case "POD_NAME":
Expand Down Expand Up @@ -544,7 +545,6 @@ func GetNodeMetaData(options MetadataOptions) (*model.Node, error) {
if err := json.Unmarshal(j, meta); err != nil {
return nil, err
}
extractAttributesMetadata(options.Envs, options.Platform, meta)

// Support multiple network interfaces, removing duplicates.
meta.InstanceIPs = removeDuplicates(options.InstanceIPs)
Expand All @@ -559,6 +559,7 @@ func GetNodeMetaData(options MetadataOptions) (*model.Node, error) {

meta.ProxyConfig = (*model.NodeMetaProxyConfig)(options.ProxyConfig)

extractAttributesMetadata(options.Envs, options.Platform, meta)
// Add all instance labels with lower precedence than pod labels
extractInstanceLabels(options.Platform, meta)

Expand Down Expand Up @@ -687,6 +688,7 @@ func extractInstanceLabels(plat platform.Environment, meta *model.BootstrapNodeM
meta.Labels = map[string]string{}
}
for k, v := range instanceLabels {
meta.IstioMetaLabels[k] = v
meta.Labels[k] = v
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bootstrap/testdata/running_golden.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
,
"sub_zone": "sub_zoneC"
},
"metadata": {"ANNOTATIONS":{"istio.io/insecurepath":"{\"paths\":[\"/metrics\",\"/live\"]}"},"ENVOY_PROMETHEUS_PORT":15090,"ENVOY_STATUS_PORT":15021,"INSTANCE_IPS":"10.3.3.3,10.4.4.4,10.5.5.5,10.6.6.6","INTERCEPTION_MODE":"REDIRECT","ISTIO_PROXY_SHA":"istio-proxy:sha","ISTIO_VERSION":"release-3.1","LABELS":{"app":"test","istio-locality":"regionA.zoneB.sub_zoneC","version":"v1alpha1"},"NAME":"svc-0-0-0-6944fb884d-4pgx8","NAMESPACE":"test","OUTLIER_LOG_PATH":"/dev/stdout","PILOT_SAN":["spiffe://cluster.local/ns/istio-system/sa/istio-pilot-service-account"],"POD_NAME":"svc-0-0-0-6944fb884d-4pgx8","PROXY_CONFIG":{"binaryPath":"/usr/local/bin/envoy","configPath":"/tmp/bootstrap/running","controlPlaneAuthPolicy":"MUTUAL_TLS","customConfigFile":"envoy_bootstrap.json","discoveryAddress":"mypilot:1001","drainDuration":"5s","parentShutdownDuration":"6s","proxyAdminPort":15005,"serviceCluster":"istio-proxy","statNameLength":200,"statsdUdpAddress":"10.1.1.1:9125","statusPort":15020,"tracing":{"zipkin":{"address":"localhost:6000"}}},"app":"test","istio-locality":"regionA.zoneB.sub_zoneC","istio.io/insecurepath":"{\"paths\":[\"/metrics\",\"/live\"]}","version":"v1alpha1"}
"metadata": {"ANNOTATIONS":{"istio.io/insecurepath":"{\"paths\":[\"/metrics\",\"/live\"]}"},"ENVOY_PROMETHEUS_PORT":15090,"ENVOY_STATUS_PORT":15021,"INSTANCE_IPS":"10.3.3.3,10.4.4.4,10.5.5.5,10.6.6.6","INTERCEPTION_MODE":"REDIRECT","ISTIO_META_LABELS":{"app":"test","istio-locality":"regionA.zoneB.sub_zoneC","version":"v1alpha1"},"ISTIO_PROXY_SHA":"istio-proxy:sha","ISTIO_VERSION":"release-3.1","LABELS":{"app":"test","istio-locality":"regionA.zoneB.sub_zoneC","version":"v1alpha1"},"NAME":"svc-0-0-0-6944fb884d-4pgx8","NAMESPACE":"test","OUTLIER_LOG_PATH":"/dev/stdout","PILOT_SAN":["spiffe://cluster.local/ns/istio-system/sa/istio-pilot-service-account"],"POD_NAME":"svc-0-0-0-6944fb884d-4pgx8","PROXY_CONFIG":{"binaryPath":"/usr/local/bin/envoy","configPath":"/tmp/bootstrap/running","controlPlaneAuthPolicy":"MUTUAL_TLS","customConfigFile":"envoy_bootstrap.json","discoveryAddress":"mypilot:1001","drainDuration":"5s","parentShutdownDuration":"6s","proxyAdminPort":15005,"serviceCluster":"istio-proxy","statNameLength":200,"statsdUdpAddress":"10.1.1.1:9125","statusPort":15020,"tracing":{"zipkin":{"address":"localhost:6000"}}},"app":"test","istio-locality":"regionA.zoneB.sub_zoneC","istio.io/insecurepath":"{\"paths\":[\"/metrics\",\"/live\"]}","version":"v1alpha1"}
},
"layered_runtime": {
"layers": [
Expand Down
Loading

0 comments on commit 8da39ca

Please sign in to comment.