Skip to content

Commit

Permalink
Merge pull request prometheus#4 from cofyc/kubernetes_shared
Browse files Browse the repository at this point in the history
Kubernetes shared
  • Loading branch information
cofyc committed Mar 26, 2018
2 parents 90de541 + f4611a5 commit 2f63d9c
Show file tree
Hide file tree
Showing 24 changed files with 1,787 additions and 665 deletions.
142 changes: 72 additions & 70 deletions discovery/kubernetes/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/prometheus/prometheus/util/strutil"
apiv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

// Endpoints discovers new endpoint targets.
Expand All @@ -41,14 +42,16 @@ type Endpoints struct {
nodeStore cache.Store
endpointsStore cache.Store
serviceStore cache.Store

queue *workqueue.Type
}

// NewEndpoints returns a new endpoints discovery.
func NewEndpoints(l log.Logger, svc, eps, pod, node cache.SharedInformer) *Endpoints {
if l == nil {
l = log.NewNopLogger()
}
ep := &Endpoints{
e := &Endpoints{
logger: l,
endpointsInf: eps,
endpointsStore: eps.GetStore(),
Expand All @@ -58,25 +61,49 @@ func NewEndpoints(l log.Logger, svc, eps, pod, node cache.SharedInformer) *Endpo
podStore: pod.GetStore(),
nodeInf: node,
nodeStore: node.GetStore(),
queue: workqueue.NewNamed("endpoints"),
}

return ep
e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("endpoints", "add").Inc()
e.enqueue(o)
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("endpoints", "update").Inc()
e.enqueue(o)
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("endpoints", "delete").Inc()
e.enqueue(o)
},
})

return e
}

func (e *Endpoints) enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
}

e.queue.Add(key)
}

// Run implements the Discoverer interface.
func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// Send full initial set of endpoint targets.
var initial []*targetgroup.Group

for _, o := range e.endpointsStore.List() {
tg := e.buildEndpoints(o.(*apiv1.Endpoints))
initial = append(initial, tg)
cacheSyncs := []cache.InformerSynced{
e.endpointsInf.HasSynced,
e.serviceInf.HasSynced,
e.podInf.HasSynced,
e.nodeInf.HasSynced,
}
select {
case <-ctx.Done():
if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) {
level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache")
return
case ch <- initial:
}

// Send target groups for pod updates.
send := func(tg *targetgroup.Group) {
if tg == nil {
Expand All @@ -89,73 +116,44 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
}
}

e.endpointsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("endpoints", "add").Inc()

eps, err := convertToEndpoints(o)
if err != nil {
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return
}
send(e.buildEndpoints(eps))
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("endpoints", "update").Inc()

eps, err := convertToEndpoints(o)
if err != nil {
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return
}
send(e.buildEndpoints(eps))
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("endpoints", "delete").Inc()

eps, err := convertToEndpoints(o)
if err != nil {
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return
}
send(&targetgroup.Group{Source: endpointsSource(eps)})
},
})
workFunc := func() bool {
keyObj, quit := e.queue.Get()
if quit {
return true
}
defer e.queue.Done(keyObj)
key := keyObj.(string)

serviceUpdate := func(o interface{}) {
svc, err := convertToService(o)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
level.Error(e.logger).Log("msg", "converting to Service object failed", "err", err)
return
level.Error(e.logger).Log("msg", "spliting key failed", "key", key)
return false
}

ep := &apiv1.Endpoints{}
ep.Namespace = svc.Namespace
ep.Name = svc.Name
obj, exists, err := e.endpointsStore.Get(ep)
if exists && err != nil {
send(e.buildEndpoints(obj.(*apiv1.Endpoints)))
o, exists, err := e.endpointsStore.GetByKey(key)
if err != nil {
level.Error(e.logger).Log("msg", "getting object from store failed", "key", key)
return false
}
if !exists {
send(&targetgroup.Group{Source: endpointsSourceFromNamespaceAndName(namespace, name)})
return false
}
eps, err := convertToEndpoints(o)
if err != nil {
level.Error(e.logger).Log("msg", "retrieving endpoints failed", "err", err)
level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err)
return false
}
send(e.buildEndpoints(eps))
return false
}

for {
quit := workFunc()
if quit {
return
}
}
e.serviceInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
// TODO(fabxc): potentially remove add and delete event handlers. Those should
// be triggered via the endpoint handlers already.
AddFunc: func(o interface{}) {
eventCount.WithLabelValues("service", "add").Inc()
serviceUpdate(o)
},
UpdateFunc: func(_, o interface{}) {
eventCount.WithLabelValues("service", "update").Inc()
serviceUpdate(o)
},
DeleteFunc: func(o interface{}) {
eventCount.WithLabelValues("service", "delete").Inc()
serviceUpdate(o)
},
})

// Block until the target provider is explicitly canceled.
<-ctx.Done()
Expand All @@ -182,6 +180,10 @@ func endpointsSource(ep *apiv1.Endpoints) string {
return "endpoints/" + ep.ObjectMeta.Namespace + "/" + ep.ObjectMeta.Name
}

func endpointsSourceFromNamespaceAndName(namespace, name string) string {
return "endpoints/" + namespace + "/" + name
}

const (
endpointsNameLabel = metaLabelPrefix + "endpoints_name"
endpointReadyLabel = metaLabelPrefix + "endpoint_ready"
Expand Down
57 changes: 12 additions & 45 deletions discovery/kubernetes/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,8 @@ import (
"k8s.io/client-go/tools/cache"
)

func endpointsStoreKeyFunc(obj interface{}) (string, error) {
return obj.(*v1.Endpoints).ObjectMeta.Name, nil
}

func newFakeEndpointsInformer() *fakeInformer {
return newFakeInformer(endpointsStoreKeyFunc)
return newFakeInformer(cache.DeletionHandlingMetaNamespaceKeyFunc)
}

func makeTestEndpointsDiscovery() (*Endpoints, *fakeInformer, *fakeInformer, *fakeInformer, *fakeInformer) {
Expand Down Expand Up @@ -84,44 +80,6 @@ func makeEndpoints() *v1.Endpoints {
}
}

func TestEndpointsDiscoveryInitial(t *testing.T) {
n, _, eps, _, _ := makeTestEndpointsDiscovery()
eps.GetStore().Add(makeEndpoints())

k8sDiscoveryTest{
discovery: n,
expectedInitial: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
"__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true",
},
{
"__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "false",
},
},
Labels: model.LabelSet{
"__meta_kubernetes_namespace": "default",
"__meta_kubernetes_endpoints_name": "testendpoints",
},
Source: "endpoints/default/testendpoints",
},
},
}.Run(t)
}

func TestEndpointsDiscoveryAdd(t *testing.T) {
n, _, eps, pods, _ := makeTestEndpointsDiscovery()
pods.GetStore().Add(&v1.Pod{
Expand Down Expand Up @@ -326,8 +284,17 @@ func TestEndpointsDiscoveryDeleteUnknownCacheState(t *testing.T) {
eps.GetStore().Add(makeEndpoints())

k8sDiscoveryTest{
discovery: n,
afterStart: func() { go func() { eps.Delete(cache.DeletedFinalStateUnknown{Obj: makeEndpoints()}) }() },
discovery: n,
afterStart: func() {
go func() {
obj := makeEndpoints()
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
t.Errorf("failed to get key for %v: %v", obj, err)
}
eps.Delete(cache.DeletedFinalStateUnknown{Key: key, Obj: obj})
}()
},
expectedRes: []*targetgroup.Group{
{
Source: "endpoints/default/testendpoints",
Expand Down

0 comments on commit 2f63d9c

Please sign in to comment.