Skip to content

Commit

Permalink
[exporter/loadbalancingexporter] fix k8s service resolver retaining i…
Browse files Browse the repository at this point in the history
…nvalid old endpoints (open-telemetry#25061)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

fix k8s service resolver retaining invalid old endpoints

In the endpoint update event, the step of deleting the old endpoint is
missed, so a connection request will still be made to the old endpoint.

**Link to tracking Issue:** <Issue number if applicable>
open-telemetry#24914 
**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>

Signed-off-by: Yuan Fang <yuanfang@alauda.io>
Co-authored-by: Juraci Paixão Kröhling <juraci@kroehling.de>
  • Loading branch information
fyuan1316 and jpkrohling committed Aug 23, 2023
1 parent 717a842 commit 653ab06
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: fix k8s service resolver retaining invalid old endpoints

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24914]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
15 changes: 12 additions & 3 deletions exporter/loadbalancingexporter/resolver_k8s_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,24 @@ func (h handler) OnAdd(obj interface{}, _ bool) {
}

func (h handler) OnUpdate(oldObj, newObj interface{}) {
switch oldObj.(type) {
switch oldEps := oldObj.(type) {
case *corev1.Endpoints:
epRemove := convertToEndpoints(oldEps)
for _, ep := range epRemove {
h.endpoints.Delete(ep)
}
if len(epRemove) > 0 {
_, _ = h.callback(context.Background())
}

newEps, ok := newObj.(*corev1.Endpoints)
if !ok {
h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", newObj))
_ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1))
return
}
endpoints := convertToEndpoints(newEps)
changed := false
for _, ep := range endpoints {
for _, ep := range convertToEndpoints(newEps) {
if _, loaded := h.endpoints.LoadOrStore(ep, true); !loaded {
changed = true
}
Expand Down
48 changes: 44 additions & 4 deletions exporter/loadbalancingexporter/resolver_k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package loadbalancingexporter

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -45,9 +46,13 @@ func TestK8sResolve(t *testing.T) {
},
},
}
expectInit := []string{
"192.168.10.100:8080",
"192.168.10.100:9090",
var expectInit []string
for _, subset := range endpoint.Subsets {
for _, address := range subset.Addresses {
for _, port := range args.ports {
expectInit = append(expectInit, fmt.Sprintf("%s:%d", address.IP, port))
}
}
}

cl := fake.NewSimpleClientset(endpoint)
Expand All @@ -74,7 +79,7 @@ func TestK8sResolve(t *testing.T) {
verifyFn func(*suiteContext, args) error
}{
{
name: "simulate changes to the backend ip address",
name: "simulate append the backend ip address",
args: args{
logger: zap.NewNop(),
service: "lb",
Expand Down Expand Up @@ -111,6 +116,41 @@ func TestK8sResolve(t *testing.T) {
return nil
},
},
{
name: "simulate change the backend ip address",
args: args{
logger: zap.NewNop(),
service: "lb",
namespace: "default",
ports: []int32{4317},
},
simulateFn: func(suiteCtx *suiteContext, args args) error {
endpoint, exist := suiteCtx.endpoint.DeepCopy(), suiteCtx.endpoint.DeepCopy()
endpoint.Subsets = []corev1.EndpointSubset{
{Addresses: []corev1.EndpointAddress{{IP: "10.10.0.11"}}},
}
patch := client.MergeFrom(exist)
data, err := patch.Data(endpoint)
if err != nil {
return err
}
_, err = suiteCtx.clientset.CoreV1().Endpoints(args.namespace).
Patch(context.TODO(), args.service, types.MergePatchType, data, metav1.PatchOptions{})
return err

},
verifyFn: func(ctx *suiteContext, args args) error {
if _, err := ctx.resolver.resolve(context.Background()); err != nil {
return err
}

assert.Equal(t, []string{
"10.10.0.11:4317",
}, ctx.resolver.Endpoints(), "resolver failed, endpoints not equal")

return nil
},
},
{
name: "simulate deletion of backends",
args: args{
Expand Down

0 comments on commit 653ab06

Please sign in to comment.