Skip to content

Commit

Permalink
Measure and expose DNS programming latency from Kubernetes plugin.
Browse files Browse the repository at this point in the history
For now metric is measure only for headless services. Informer has been slighlty
refactored, so the code can measure latency without storing extra fields on
Endpoint struct.
  • Loading branch information
oxddr committed Aug 22, 2019
1 parent 3f47fc8 commit fe12bd6
Show file tree
Hide file tree
Showing 10 changed files with 316 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ coverage.txt
.classpath
.project
.settings/**
build/
release/
73 changes: 56 additions & 17 deletions plugin/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ type dnsControl struct {

zones []string
endpointNameMode bool

// durationSinceFunc returns the duration elapsed since the given time.
// Added as a member to the struct to allow injection for testing.
durationSinceFunc func(time.Time) time.Duration
}

type dnsControlOpts struct {
Expand All @@ -88,8 +92,9 @@ type dnsControlOpts struct {
namespaceLabelSelector *meta.LabelSelector
namespaceSelector labels.Selector

zones []string
endpointNameMode bool
zones []string
endpointNameMode bool
skipAPIObjectsCleanup bool
}

// newDNSController creates a controller for CoreDNS.
Expand All @@ -101,6 +106,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
stopCh: make(chan struct{}),
zones: opts.zones,
endpointNameMode: opts.endpointNameMode,
durationSinceFunc: time.Since,
}

dns.svcLister, dns.svcController = object.NewIndexerInformer(
Expand All @@ -111,7 +117,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
&api.Service{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc},
object.ToService,
object.DefaultProcessor(object.ToService(opts.skipAPIObjectsCleanup)),
)

if opts.initPodCache {
Expand All @@ -123,7 +129,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
&api.Pod{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{podIPIndex: podIPIndexFunc},
object.ToPod,
object.DefaultProcessor(object.ToPod(opts.skipAPIObjectsCleanup)),
)
}

Expand All @@ -134,9 +140,50 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector),
},
&api.Endpoints{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.ResourceEventHandlerFuncs{},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.ToEndpoints)
func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {

apiEndpoints, obj := object.ToEndpoints(d.Object)

switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
h.OnUpdate(old, obj)
// endpoint updates can come frequently, make sure it's a change we care about
if !endpointsEquivalent(old.(*object.Endpoints), obj) {
dns.updateModifed()
recordDNSProgrammingLatency(dns, apiEndpoints)
}
} else {
if err := clientState.Add(obj); err != nil {
return err
}
h.OnAdd(d.Object)
dns.updateModifed()
recordDNSProgrammingLatency(dns, apiEndpoints)
}
case cache.Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
h.OnDelete(d.Object)
dns.updateModifed()
recordDNSProgrammingLatency(dns, apiEndpoints)
}
if !opts.skipAPIObjectsCleanup {
*apiEndpoints = api.Endpoints{}
}
}
return nil
}
})

}

dns.nsLister, dns.nsController = cache.NewInformer(
Expand Down Expand Up @@ -423,17 +470,6 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
switch ob := obj.(type) {
case *object.Service:
dns.updateModifed()
case *object.Endpoints:
if newObj == nil || oldObj == nil {
dns.updateModifed()
return
}
p := oldObj.(*object.Endpoints)
// endpoint updates can come frequently, make sure it's a change we care about
if endpointsEquivalent(p, ob) {
return
}
dns.updateModifed()
case *object.Pod:
dns.updateModifed()
default:
Expand Down Expand Up @@ -483,6 +519,9 @@ func subsetsEquivalent(sa, sb object.EndpointSubset) bool {
// endpointsEquivalent checks if the update to an endpoint is something
// that matters to us or if they are effectively equivalent.
func endpointsEquivalent(a, b *object.Endpoints) bool {
if a == nil || b == nil {
return false
}

if len(a.Subsets) != len(b.Subsets) {
return false
Expand Down
81 changes: 81 additions & 0 deletions plugin/kubernetes/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package kubernetes

import (
"time"

"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/prometheus/client_golang/prometheus"
api "k8s.io/api/core/v1"
)

const (
subsystem = "kubernetes"
)

var (
// DnsProgrammingLatency is defined as the time it took to program a DNS instance - from the time
// a service or pod has changed to the time the change was propagated and was available to be
// served by a DNS server.
// The definition of this SLI can be found at https://github.com/kubernetes/community/blob/master/sig-scalability/slos/dns_programming_latency.md
// Note that the metrics is partially based on the time exported by the endpoints controller on
// the master machine. The measurement may be inaccurate if there is a clock drift between the
// node and master machine.
// The service_kind label can be one of:
// * cluster_ip
// * headless_with_selector
// * headless_without_selector
DnsProgrammingLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: plugin.Namespace,
Subsystem: subsystem,
Name: "dns_programming_latency_seconds",
// From 1 millisecond to ~17 minutes.
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
Help: "Histogram of the time (in seconds) it took to program a dns instance.",
}, []string{"service_kind"})
)

func recordDNSProgrammingLatency(dns dnsControl, endpoints *api.Endpoints) {
lastChangeTriggerTime := getLastChangeTriggerTime(endpoints)

if endpoints == nil || !isEndpointForHeadlessService(dns, endpoints) || lastChangeTriggerTime.IsZero() {
return
}

// If we're here it means that the Endpoints object is for a headless service and that
// the Endpoints object was created by the endpoints-controller (because the
// LastChangeTriggerTime annotation is set). It means that the corresponding service is a
// "headless service with selector".
DnsProgrammingLatency.WithLabelValues("headless_with_selector").
Observe(dns.durationSinceFunc(lastChangeTriggerTime).Seconds())

}

// getLastChangeTriggerTime returns the time.Time value of the EndpointsLastChangeTriggerTime
// annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set
func getLastChangeTriggerTime(endpoints *api.Endpoints) time.Time {
stringVal, ok := endpoints.Annotations[api.EndpointsLastChangeTriggerTime]
if !ok {
// It's possible that the Endpoints object won't have the EndpointsLastChangeTriggerTime
// annotation set. In that case return the 'zero value', which is ignored in the upstream code.
return time.Time{}
}
val, err := time.Parse(time.RFC3339Nano, stringVal)
if err != nil {
log.Warningf("Error while parsing EndpointsLastChangeTriggerTimeAnnotation: '%s'. Error is %v",
stringVal, err)
// In case of error val = time.Zero, which is ignored in the upstream code.
}
return val
}

// isEndpointForHeadlessService returns true whether the endpoints object belongs to a headless
// service (i.e. clusterIp = None). Note that this method may return false negatives if the service
// informer is lagging, i.e. we may not see a recently created service. Given that the services
// don't change very often (comparing to much more frequent endpoints changes), cases when this method
// will return wrong answer should be relatively rare. Because of that we intentionally accept this
// flaw to keep the solution simple.
func isEndpointForHeadlessService(dns dnsControl, endpoints *api.Endpoints) bool {
svcs := dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace()))
return len(svcs) == 1 && svcs[0].ClusterIP == api.ClusterIPNone
}
131 changes: 131 additions & 0 deletions plugin/kubernetes/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package kubernetes

import (
"strings"
"testing"
"time"

"github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/prometheus/client_golang/prometheus/testutil"
api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)

const (
namespace = "testns"
)

func TestDnsProgrammingLatency(t *testing.T) {
client := fake.NewSimpleClientset()
now := time.Now()
controller := newdnsController(client, dnsControlOpts{
initEndpointsCache: true,
// This is needed as otherwise the fake k8s client doesn't work properly.
skipAPIObjectsCleanup: true,
})
controller.durationSinceFunc = func(t time.Time) time.Duration {
return now.Sub(t)
}
DnsProgrammingLatency.Reset()
go controller.Run()

subset1 := []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
}}

subset2 := []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}},
}}

createService(t, client, controller, "my-service", api.ClusterIPNone)
createEndpoints(t, client, "my-service", now.Add(-2*time.Second), subset1)
updateEndpoints(t, client, "my-service", now.Add(-1*time.Second), subset2)

createEndpoints(t, client, "endpoints-no-service", now.Add(-4*time.Second), nil)

createService(t, client, controller, "clusterIP-service", "10.40.0.12")
createEndpoints(t, client, "clusterIP-service", now.Add(-8*time.Second), nil)

createService(t, client, controller, "headless-no-annotation", api.ClusterIPNone)
createEndpoints(t, client, "headless-no-annotation", nil, nil)

createService(t, client, controller, "headless-wrong-annotation", api.ClusterIPNone)
createEndpoints(t, client, "headless-wrong-annotation", "wrong-value", nil)

controller.Stop()
expected := `
# HELP coredns_kubernetes_dns_programming_latency_seconds Histogram of the time (in seconds) it took to program a dns instance.
# TYPE coredns_kubernetes_dns_programming_latency_seconds histogram
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2
coredns_kubernetes_dns_programming_latency_seconds_sum{service_kind="headless_with_selector"} 3
coredns_kubernetes_dns_programming_latency_seconds_count{service_kind="headless_with_selector"} 2
`
if err := testutil.CollectAndCompare(DnsProgrammingLatency, strings.NewReader(expected)); err != nil {
t.Error(err)
}
}

func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []api.EndpointSubset) *api.Endpoints {
annotations := make(map[string]string)
switch v := lastChangeTriggerTime.(type) {
case string:
annotations[api.EndpointsLastChangeTriggerTime] = v
case time.Time:
annotations[api.EndpointsLastChangeTriggerTime] = v.Format(time.RFC3339Nano)
}
return &api.Endpoints{
ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name, Annotations: annotations},
Subsets: subsets,
}
}

func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) {
_, err := client.CoreV1().Endpoints(namespace).Create(buildEndpoints(name, triggerTime, subsets))
if err != nil {
t.Fatal(err)
}
}

func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) {
_, err := client.CoreV1().Endpoints(namespace).Update(buildEndpoints(name, triggerTime, subsets))
if err != nil {
t.Fatal(err)
}
}

func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) {
if _, err := client.CoreV1().Services(namespace).Create(&api.Service{
ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name},
Spec: api.ServiceSpec{ClusterIP: clusterIp},
}); err != nil {
t.Fatal(err)
}
if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
return len(controller.SvcIndex(object.ServiceKey(name, namespace))) == 1, nil
}); err != nil {
t.Fatal(err)
}
}
8 changes: 3 additions & 5 deletions plugin/kubernetes/object/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type EndpointPort struct {
func EndpointsKey(name, namespace string) string { return name + "." + namespace }

// ToEndpoints converts an api.Endpoints to a *Endpoints.
func ToEndpoints(obj interface{}) interface{} {
func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) {
end, ok := obj.(*api.Endpoints)
if !ok {
return nil
return nil, nil
}

e := &Endpoints{
Expand Down Expand Up @@ -93,9 +93,7 @@ func ToEndpoints(obj interface{}) interface{} {
}
}

*end = api.Endpoints{}

return e
return end, e
}

// CopyWithoutSubsets copies e, without the subsets.
Expand Down
Loading

0 comments on commit fe12bd6

Please sign in to comment.