Skip to content

Commit

Permalink
change envoy controll
Browse files Browse the repository at this point in the history
  • Loading branch information
barnettZQG committed Jul 4, 2019
1 parent f3574dd commit 5381b22
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 185 deletions.
4 changes: 2 additions & 2 deletions cmd/init-probe/healthy/depends_health_test.go
Expand Up @@ -33,11 +33,11 @@ import (
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
)

var testClusterID = "2bf54c5a0b5a48a890e2dda8635cb507_aff929446a7e48bea94c75447ed40b09_grc9e8e3"
var testClusterID = "2bf54c5a0b5a48a890e2dda8635cb507_2591d9904fc4480c9c012037697f98c6_grc9e8e3"

var testXDSHost = "192.168.195.1:6101"

// var testClusterID = "6ab5725e1ca34cfba7762b7ac10c0dee_9d379258e0bc4fc581331780b0541ac6_grc69d9c"
// var testClusterID = "2bf54c5a0b5a48a890e2dda8635cb507_tcpmeshed6827c0afdda50599b4108105c9e8e3_grc9e8e3"
//var testXDSHost = "127.0.0.1:6101"

func TestClientListener(t *testing.T) {
Expand Down
38 changes: 9 additions & 29 deletions node/kubecache/kube_cache.go
Expand Up @@ -24,8 +24,6 @@ import (
"strings"
"time"

"k8s.io/client-go/tools/cache"

"github.com/pquerna/ffjson/ffjson"

conf "github.com/goodrain/rainbond/cmd/node/option"
Expand Down Expand Up @@ -55,6 +53,7 @@ const (

//KubeClient KubeClient
type KubeClient interface {
GetKubeClient() kubernetes.Interface
UpK8sNode(*client.HostNode) (*v1.Node, error)
DownK8sNode(nodename string) error
GetAllPods() (pods []*v1.Pod, err error)
Expand All @@ -69,7 +68,6 @@ type KubeClient interface {
GetEndpoints(namespace string, selector labels.Selector) ([]*v1.Endpoints, error)
GetServices(namespace string, selector labels.Selector) ([]*v1.Service, error)
GetConfig(namespace string, selector labels.Selector) ([]*v1.ConfigMap, error)
AddEventWatch(eventtype string, handler cache.ResourceEventHandler)
Stop()
}

Expand All @@ -90,31 +88,22 @@ func NewKubeClient(cfg *conf.Conf) (KubeClient, error) {
func(options *metav1.ListOptions) {
//options.LabelSelector = "creater=Rainbond"
})
serviceInformers := sharedInformers.Core().V1().Services().Informer()
go serviceInformers.Run(stop)
endpointInformers := sharedInformers.Core().V1().Endpoints().Informer()
go endpointInformers.Run(stop)
configmapInformers := sharedInformers.Core().V1().ConfigMaps().Informer()
go configmapInformers.Run(stop)
sharedInformers.Core().V1().Services().Informer()
sharedInformers.Core().V1().Endpoints().Informer()
sharedInformers.Core().V1().ConfigMaps().Informer()
sharedInformers.Core().V1().Nodes().Informer()
sharedInformers.Core().V1().Pods().Informer()
sharedInformers.Start(stop)
return &kubeClient{
kubeclient: cli,
stop: stop,
watchInformers: map[string]cache.SharedIndexInformer{
"service": serviceInformers,
"configmap": configmapInformers,
"endpoint": endpointInformers,
},
kubeclient: cli,
stop: stop,
sharedInformers: sharedInformers,
}, nil
}

type kubeClient struct {
kubeclient *kubernetes.Clientset
sharedInformers informers.SharedInformerFactory
watchInformers map[string]cache.SharedIndexInformer
stop chan struct{}
}

Expand All @@ -124,18 +113,9 @@ func (k *kubeClient) Stop() {
}
}

//AddEventWatch add event watch handler
// if eventtype is "" or "all" ,means watch all
func (k *kubeClient) AddEventWatch(eventtype string, handler cache.ResourceEventHandler) {
if eventtype == "all" || eventtype == "" {
for _, watch := range k.watchInformers {
watch.AddEventHandler(handler)
}
} else {
if watch, ok := k.watchInformers[eventtype]; ok && watch != nil {
watch.AddEventHandler(handler)
}
}
//GetKubeClient get kube client
func (k *kubeClient) GetKubeClient() kubernetes.Interface {
return k.kubeclient
}

//GetNodeByName get node
Expand Down
145 changes: 145 additions & 0 deletions node/nodem/envoy/queue.go
@@ -0,0 +1,145 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2019 Goodrain Co., Ltd.

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package envoy

import (
"sync"
"time"

"github.com/Sirupsen/logrus"
)

//Event event
type Event int

const (
// EventAdd is sent when an object is added
EventAdd Event = iota

// EventUpdate is sent when an object is modified
// Captures the modified object
EventUpdate

// EventDelete is sent when an object is deleted
// Captures the object at the last known state
EventDelete
)

// Queue of work tickets processed using a rate-limiting loop
type Queue interface {
// Push a ticket
Push(Task)
// Run the loop until a signal on the channel
Run(<-chan struct{})
}

// Handler specifies a function to apply on an object for a given event type
type Handler func(obj interface{}, event Event) error

// Task object for the event watchers; processes until handler succeeds
type Task struct {
handler Handler
obj interface{}
event Event
}

// NewTask creates a task from a work item
func NewTask(handler Handler, obj interface{}, event Event) Task {
return Task{handler: handler, obj: obj, event: event}
}

type queueImpl struct {
delay time.Duration
queue []Task
cond *sync.Cond
closing bool
}

// NewQueue instantiates a queue with a processing function
func NewQueue(errorDelay time.Duration) Queue {
return &queueImpl{
delay: errorDelay,
queue: make([]Task, 0),
closing: false,
cond: sync.NewCond(&sync.Mutex{}),
}
}

func (q *queueImpl) Push(item Task) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.closing {
q.queue = append(q.queue, item)
}
q.cond.Signal()
}

func (q *queueImpl) Run(stop <-chan struct{}) {
go func() {
<-stop
q.cond.L.Lock()
q.closing = true
q.cond.L.Unlock()
}()

for {
q.cond.L.Lock()
for !q.closing && len(q.queue) == 0 {
q.cond.Wait()
}

if len(q.queue) == 0 {
q.cond.L.Unlock()
// We must be shutting down.
return
}

var item Task
item, q.queue = q.queue[0], q.queue[1:]
q.cond.L.Unlock()

if err := item.handler(item.obj, item.event); err != nil {
logrus.Infof("Work item handle failed (%v), retry after delay %v", err, q.delay)
time.AfterFunc(q.delay, func() {
q.Push(item)
})
}

}
}

// ChainHandler applies handlers in a sequence
type ChainHandler struct {
funcs []Handler
}

// Apply is the handler function
func (ch *ChainHandler) Apply(obj interface{}, event Event) error {
for _, f := range ch.funcs {
if err := f(obj, event); err != nil {
return err
}
}
return nil
}

// Append a handler as the last handler in the chain
func (ch *ChainHandler) Append(h Handler) {
ch.funcs = append(ch.funcs, h)
}

0 comments on commit 5381b22

Please sign in to comment.