Skip to content

Commit

Permalink
Measure and expose DNS programming latency from Kubernetes plugin.
Browse files Browse the repository at this point in the history
For now the metric is measured only for headless services. The informer has been
slightly refactored so that the code can measure latency without storing extra
fields on the Endpoint struct.

Signed-off-by: Janek Łukaszewicz <janluk@google.com>

Suggestions from code review

Co-Authored-By: Chris O'Haver <cohaver@infoblox.com>
  • Loading branch information
oxddr and chrisohaver committed Oct 4, 2019
1 parent 0da2c0c commit b9ec733
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 37 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -13,3 +13,5 @@ coverage.txt
.classpath
.project
.settings/**
build/
release/
16 changes: 16 additions & 0 deletions plugin/kubernetes/README.md
Expand Up @@ -229,3 +229,19 @@ plugin is also enabled:
* kubernetes/service: the service name in the query
* kubernetes/client-namespace: the client pod's namespace, if `pods verified` mode is enabled
* kubernetes/client-pod-name: the client pod's name, if `pods verified` mode is enabled

## Metrics

The *kubernetes* plugin exports the following *Prometheus* metrics.
* `coredns_kubernetes_dns_programming_latency_seconds{service_kind}` - exports the
[DNS programming latency SLI](https://github.com/kubernetes/community/blob/master/sig-scalability/slos/dns_programming_latency.md).
The metric has a `service_kind` label that identifies the kind of the
[kubernetes service](https://kubernetes.io/docs/concepts/services-networking/service).
It may take one of the three values:
* `cluster_ip`
* `headless_with_selector`
* `headless_without_selector`

## Bugs

* The DNS programming latency metric is currently recorded only for "headless_with_selector" services; support for the other service kinds still needs to be added.
72 changes: 55 additions & 17 deletions plugin/kubernetes/controller.go
Expand Up @@ -88,8 +88,9 @@ type dnsControlOpts struct {
namespaceLabelSelector *meta.LabelSelector
namespaceSelector labels.Selector

zones []string
endpointNameMode bool
zones []string
endpointNameMode bool
skipAPIObjectsCleanup bool
}

// newdnsController creates a controller for CoreDNS.
Expand All @@ -111,7 +112,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
&api.Service{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc},
object.ToService,
object.DefaultProcessor(object.ToService(opts.skipAPIObjectsCleanup)),
)

if opts.initPodCache {
Expand All @@ -123,7 +124,7 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
&api.Pod{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{podIPIndex: podIPIndexFunc},
object.ToPod,
object.DefaultProcessor(object.ToPod(opts.skipAPIObjectsCleanup)),
)
}

Expand All @@ -134,9 +135,50 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector),
},
&api.Endpoints{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.ResourceEventHandlerFuncs{},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.ToEndpoints)
func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {

apiEndpoints, obj := object.ToEndpoints(d.Object)

switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
h.OnUpdate(old, obj)
// endpoint updates can come frequently, make sure it's a change we care about
if !endpointsEquivalent(old.(*object.Endpoints), obj) {
dns.updateModifed()
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
}
} else {
if err := clientState.Add(obj); err != nil {
return err
}
h.OnAdd(d.Object)
dns.updateModifed()
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
}
case cache.Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
h.OnDelete(d.Object)
dns.updateModifed()
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
}
if !opts.skipAPIObjectsCleanup {
*apiEndpoints = api.Endpoints{}
}
}
return nil
}
})

}

dns.nsLister, dns.nsController = cache.NewInformer(
Expand Down Expand Up @@ -423,24 +465,17 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
switch ob := obj.(type) {
case *object.Service:
dns.updateModifed()
case *object.Endpoints:
if newObj == nil || oldObj == nil {
dns.updateModifed()
return
}
p := oldObj.(*object.Endpoints)
// endpoint updates can come frequently, make sure it's a change we care about
if endpointsEquivalent(p, ob) {
return
}
dns.updateModifed()
case *object.Pod:
dns.updateModifed()
default:
log.Warningf("Updates for %T not supported.", ob)
}
}

// getServices looks up the service index using the key derived from the name
// and namespace of the given endpoints and returns whatever services are
// indexed under that key.
func (dns *dnsControl) getServices(endpoints *object.Endpoints) []*object.Service {
	key := object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace())
	return dns.SvcIndex(key)
}

// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
// I.e. that they have the same ready addresses, host names, ports (including protocol
// and service names for SRV)
Expand Down Expand Up @@ -483,6 +518,9 @@ func subsetsEquivalent(sa, sb object.EndpointSubset) bool {
// endpointsEquivalent checks if the update to an endpoint is something
// that matters to us or if they are effectively equivalent.
func endpointsEquivalent(a, b *object.Endpoints) bool {
if a == nil || b == nil {
return false
}

if len(a.Subsets) != len(b.Subsets) {
return false
Expand Down
76 changes: 76 additions & 0 deletions plugin/kubernetes/metrics.go
@@ -0,0 +1,76 @@
package kubernetes

import (
"time"

"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/prometheus/client_golang/prometheus"
api "k8s.io/api/core/v1"
)

// subsystem is the Prometheus subsystem name used for the metrics of this plugin.
const subsystem = "kubernetes"

var (
// DnsProgrammingLatency is defined as the time it took to program a DNS instance - from the time
// a service or pod has changed to the time the change was propagated and was available to be
// served by a DNS server.
// The definition of this SLI can be found at https://github.com/kubernetes/community/blob/master/sig-scalability/slos/dns_programming_latency.md
// Note that the metric is partially based on the time exported by the endpoints controller on
// the master machine. The measurement may be inaccurate if there is a clock drift between the
// node and master machine.
// The service_kind label can be one of:
// * cluster_ip
// * headless_with_selector
// * headless_without_selector
// NOTE(review): within this file only "headless_with_selector" is ever observed
// (see recordDNSProgrammingLatency); the other two values are reserved label values.
DnsProgrammingLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: plugin.Namespace,
Subsystem: subsystem,
Name: "dns_programming_latency_seconds",
// 20 exponential buckets, doubling from 1 millisecond up to
// 0.001 * 2^19 = 524.288 seconds (~8.7 minutes).
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
Help: "Histogram of the time (in seconds) it took to program a dns instance.",
}, []string{"service_kind"})

// durationSinceFunc returns the duration elapsed since the given time.
// It defaults to time.Since and is kept as a package-level variable so
// tests can replace it with a deterministic clock.
durationSinceFunc = time.Since
)

// recordDNSProgrammingLatency observes the DNS programming latency for the
// given Endpoints object in the DnsProgrammingLatency histogram.
//
// svcs are the services found for the endpoints' name and namespace, and
// endpoints is the API object carrying the EndpointsLastChangeTriggerTime
// annotation. An observation is recorded only when all of the following hold:
//   - endpoints is not nil,
//   - exactly one service was found and its ClusterIP is api.ClusterIPNone,
//   - the annotation is present and parses as an RFC 3339 timestamp that is
//     not the zero time.
//
// In every other case the function returns without recording anything.
func recordDNSProgrammingLatency(svcs []*object.Service, endpoints *api.Endpoints) {
	// Guard first: the annotation lookup below dereferences endpoints, so a
	// nil pointer must be rejected before anything else is done with it.
	if endpoints == nil {
		return
	}

	// lastChangeTriggerTime is the time.Time value of the
	// EndpointsLastChangeTriggerTime annotation stored in the endpoints object,
	// or the zero time if the annotation is missing or malformed.
	var lastChangeTriggerTime time.Time
	if stringVal, ok := endpoints.Annotations[api.EndpointsLastChangeTriggerTime]; ok {
		ts, err := time.Parse(time.RFC3339Nano, stringVal)
		if err != nil {
			log.Warningf("DnsProgrammingLatency cannot be calculated for Endpoints '%s/%s'; invalid %q annotation RFC3339 value of %q",
				endpoints.GetNamespace(), endpoints.GetName(), api.EndpointsLastChangeTriggerTime, stringVal)
			// lastChangeTriggerTime stays at the zero time, which is skipped below.
		} else {
			lastChangeTriggerTime = ts
		}
	}

	// isHeadless indicates whether the endpoints object belongs to a headless
	// service (i.e. clusterIp = None). Note that this can be a false negative if
	// the service informer is lagging, i.e. we may not see a recently created
	// service. Given that services don't change very often (compared to the much
	// more frequent endpoints changes), cases where this gives the wrong answer
	// should be relatively rare. We intentionally accept this flaw to keep the
	// solution simple.
	isHeadless := len(svcs) == 1 && svcs[0].ClusterIP == api.ClusterIPNone

	if !isHeadless || lastChangeTriggerTime.IsZero() {
		return
	}

	// If we're here it means that the Endpoints object is for a headless service
	// and that the Endpoints object was created by the endpoints-controller
	// (because the LastChangeTriggerTime annotation is set). It means that the
	// corresponding service is a "headless service with selector".
	DnsProgrammingLatency.WithLabelValues("headless_with_selector").
		Observe(durationSinceFunc(lastChangeTriggerTime).Seconds())
}
132 changes: 132 additions & 0 deletions plugin/kubernetes/metrics_test.go
@@ -0,0 +1,132 @@
package kubernetes

import (
"strings"
"testing"
"time"

"github.com/coredns/coredns/plugin/kubernetes/object"

"github.com/prometheus/client_golang/prometheus/testutil"
api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
)

// namespace is the Kubernetes namespace in which the test helpers of this file
// create their services and endpoints.
const namespace = "testns"

// TestDnsProgrammingLatency runs a controller against a fake clientset, creates
// several service/endpoints combinations and compares the resulting
// DnsProgrammingLatency histogram with the expected Prometheus text output.
// Only the "my-service" scenario is expected to produce observations.
func TestDnsProgrammingLatency(t *testing.T) {
client := fake.NewSimpleClientset()
now := time.Now()
controller := newdnsController(client, dnsControlOpts{
initEndpointsCache: true,
// This is needed as otherwise the fake k8s client doesn't work properly.
skipAPIObjectsCleanup: true,
})
// Replace the clock so that latencies are measured against the fixed "now"
// captured above, which makes the observed values deterministic.
// NOTE(review): the package-level durationSinceFunc is not restored afterwards.
durationSinceFunc = func(t time.Time) time.Duration {
return now.Sub(t)
}
// Start from an empty histogram so earlier observations cannot leak in.
DnsProgrammingLatency.Reset()
go controller.Run()

subset1 := []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
}}

// Same hostname, different IP: the update below is expected to count as a
// relevant endpoints change.
subset2 := []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}},
}}

// Headless service with a valid trigger-time annotation: the create is
// expected to be observed as 2s and the update as 1s.
createService(t, client, controller, "my-service", api.ClusterIPNone)
createEndpoints(t, client, "my-service", now.Add(-2*time.Second), subset1)
updateEndpoints(t, client, "my-service", now.Add(-1*time.Second), subset2)

// Endpoints without any matching service: not recorded.
createEndpoints(t, client, "endpoints-no-service", now.Add(-4*time.Second), nil)

// Service with a cluster IP (not headless): not recorded.
createService(t, client, controller, "clusterIP-service", "10.40.0.12")
createEndpoints(t, client, "clusterIP-service", now.Add(-8*time.Second), nil)

// Headless service whose endpoints carry no trigger-time annotation: not recorded.
createService(t, client, controller, "headless-no-annotation", api.ClusterIPNone)
createEndpoints(t, client, "headless-no-annotation", nil, nil)

// Headless service whose annotation is not a valid timestamp: not recorded.
createService(t, client, controller, "headless-wrong-annotation", api.ClusterIPNone)
createEndpoints(t, client, "headless-wrong-annotation", "wrong-value", nil)

controller.Stop()
// Two observations in total (1s and 2s): the 1s sample first appears in the
// le="1.024" bucket, the 2s sample in le="2.048"; sum is 3 and count is 2.
expected := `
# HELP coredns_kubernetes_dns_programming_latency_seconds Histogram of the time (in seconds) it took to program a dns instance.
# TYPE coredns_kubernetes_dns_programming_latency_seconds histogram
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2
coredns_kubernetes_dns_programming_latency_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2
coredns_kubernetes_dns_programming_latency_seconds_sum{service_kind="headless_with_selector"} 3
coredns_kubernetes_dns_programming_latency_seconds_count{service_kind="headless_with_selector"} 2
`
if err := testutil.CollectAndCompare(DnsProgrammingLatency, strings.NewReader(expected)); err != nil {
t.Error(err)
}
}

// buildEndpoints assembles an api.Endpoints object in the test namespace with
// the given name and subsets.
//
// lastChangeTriggerTime controls the EndpointsLastChangeTriggerTime annotation:
// a string is stored verbatim, a time.Time is stored formatted as RFC3339Nano,
// and any other value (including nil) leaves the annotation unset.
func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []api.EndpointSubset) *api.Endpoints {
	annotations := map[string]string{}
	if text, isString := lastChangeTriggerTime.(string); isString {
		annotations[api.EndpointsLastChangeTriggerTime] = text
	} else if ts, isTime := lastChangeTriggerTime.(time.Time); isTime {
		annotations[api.EndpointsLastChangeTriggerTime] = ts.Format(time.RFC3339Nano)
	}

	objectMeta := meta.ObjectMeta{
		Namespace:   namespace,
		Name:        name,
		Annotations: annotations,
	}
	return &api.Endpoints{ObjectMeta: objectMeta, Subsets: subsets}
}

// createEndpoints creates, through the given client, the Endpoints object that
// buildEndpoints produces for name, triggerTime and subsets in the test
// namespace. The test is aborted with t.Fatal if the create call fails.
func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) {
	// Mark as a helper so a failure is reported at the calling test line.
	t.Helper()
	if _, err := client.CoreV1().Endpoints(namespace).Create(buildEndpoints(name, triggerTime, subsets)); err != nil {
		t.Fatal(err)
	}
}

// updateEndpoints updates, through the given client, the Endpoints object in
// the test namespace with the one buildEndpoints produces for name,
// triggerTime and subsets. The test is aborted with t.Fatal if the update
// call fails.
func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) {
	// Mark as a helper so a failure is reported at the calling test line.
	t.Helper()
	if _, err := client.CoreV1().Endpoints(namespace).Update(buildEndpoints(name, triggerTime, subsets)); err != nil {
		t.Fatal(err)
	}
}

// createService creates a service with the given name and cluster IP in the
// test namespace and then waits until the controller's service index returns
// exactly one service for it. The test is aborted with t.Fatal if the create
// call fails or the service does not show up in the index in time.
func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) {
	// Mark as a helper so a failure is reported at the calling test line.
	t.Helper()

	svc := &api.Service{
		ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name},
		Spec:       api.ServiceSpec{ClusterIP: clusterIp},
	}
	if _, err := client.CoreV1().Services(namespace).Create(svc); err != nil {
		t.Fatal(err)
	}

	// Poll every 10ms, for at most 10s, until the controller has indexed the
	// service, so endpoints created afterwards can be matched to it.
	indexed := func() (bool, error) {
		return len(controller.SvcIndex(object.ServiceKey(name, namespace))) == 1, nil
	}
	if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, indexed); err != nil {
		t.Fatal(err)
	}
}
8 changes: 3 additions & 5 deletions plugin/kubernetes/object/endpoint.go
Expand Up @@ -44,10 +44,10 @@ type EndpointPort struct {
// EndpointsKey returns the key built by joining name and namespace with a dot,
// in that order.
func EndpointsKey(name, namespace string) string {
	const separator = "."
	return name + separator + namespace
}

// ToEndpoints converts an api.Endpoints to a *Endpoints.
func ToEndpoints(obj interface{}) interface{} {
func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) {
end, ok := obj.(*api.Endpoints)
if !ok {
return nil
return nil, nil
}

e := &Endpoints{
Expand Down Expand Up @@ -93,9 +93,7 @@ func ToEndpoints(obj interface{}) interface{} {
}
}

*end = api.Endpoints{}

return e
return end, e
}

// CopyWithoutSubsets copies e, without the subsets.
Expand Down

0 comments on commit b9ec733

Please sign in to comment.