Skip to content

Commit

Permalink
Merge pull request #964 from evanlixin/fix-k8s-watch-event
Browse files Browse the repository at this point in the history
fix: bcs-k8s-watch fix informer handler hang problem by using a FIFO queue; …
  • Loading branch information
DeveloperJim committed Aug 19, 2021
2 parents 76fdc53 + cb4a704 commit 31004c0
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 7 deletions.
44 changes: 40 additions & 4 deletions bcs-k8s/bcs-k8s-watch/app/k8s/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"github.com/Tencent/bk-bcs/bcs-k8s/bcs-k8s-watch/app/bcs"
"github.com/Tencent/bk-bcs/bcs-k8s/bcs-k8s-watch/app/output"
"github.com/Tencent/bk-bcs/bcs-k8s/bcs-k8s-watch/app/output/action"
"github.com/Tencent/bk-bcs/bcs-k8s/bcs-k8s-watch/pkg/metrics"

"github.com/parnurzeal/gorequest"
"github.com/sheerun/queue"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -40,6 +42,9 @@ import (
)

const (
// defaultWatcherQueueTime is the interval for collecting watcher queue metrics.
defaultWatcherQueueTime = 3 * time.Second

// defaultSyncInterval is default sync interval.
defaultSyncInterval = 30 * time.Second

Expand All @@ -58,6 +63,7 @@ const (
type Watcher struct {
resourceType string
resourceNamespaced bool
queue *queue.Queue
controller cache.Controller
store cache.Store
writer *output.Writer
Expand All @@ -74,6 +80,7 @@ func NewWatcher(client *rest.Interface, resourceType string, resourceName string
writer: writer,
sharedWatchers: sharedWatchers,
resourceNamespaced: resourceNamespaced,
queue: queue.New(),
}

// build list watch.
Expand All @@ -87,7 +94,7 @@ func NewWatcher(client *rest.Interface, resourceType string, resourceName string
}

// build informer.
store, controller := cache.NewInformer(listWatch, objType, defaultSyncInterval, eventHandler)
store, controller := cache.NewInformer(listWatch, objType, 0, eventHandler)
watcher.store = store
watcher.controller = controller

Expand All @@ -106,17 +113,46 @@ func (w *Watcher) ListKeys() []string {

// Run starts the watcher.
//
// It launches two background goroutines and then runs the informer controller
// on the calling goroutine, so Run returns only when controller.Run returns:
//   - handleQueueData, which consumes the events buffered in w.queue;
//   - a periodic reporter that publishes the current queue length as a metric
//     labelled with the watcher's resource type.
//
// stopCh is handed to all three so they share the same stop signal.
func (w *Watcher) Run(stopCh <-chan struct{}) {
	// Consume queued events in the background.
	go w.handleQueueData(stopCh)

	// Publish the queue length once per second.
	reportQueueLength := func() {
		metrics.ReportK8sWatcherQueueLength(w.resourceType, float64(w.queue.Length()))
	}
	go wait.NonSlidingUntil(reportQueueLength, time.Second, stopCh)

	// Run the controller on the caller's goroutine.
	w.controller.Run(stopCh)
}

// handleQueueData consumes items from w.queue and forwards each one to the
// writer until the stop signal is observed.
//
// Before every pop the stop channel is polled without blocking; once it has
// fired the loop ends and the function returns. Items that are not
// *action.SyncData are logged and skipped.
//
// NOTE(review): if queue.Pop blocks while the queue is empty, the stop signal
// is only noticed after the next item arrives — confirm against the queue
// library's documentation.
func (w *Watcher) handleQueueData(stopCh <-chan struct{}) {
	glog.Infof("watcher %s handleQueueData", w.resourceType)

	// stopped reports, without blocking, whether the stop signal has fired.
	stopped := func() bool {
		select {
		case <-stopCh:
			return true
		default:
			return false
		}
	}

	for !stopped() {
		item, isSyncData := w.queue.Pop().(*action.SyncData)
		if !isSyncData {
			glog.Errorf("queue data trans to *action.SyncData failed")
			continue
		}

		glog.V(4).Infof("queue length[%s:%d] resource[%s:%s:%s]",
			w.resourceType, w.queue.Length(), item.Action, item.Namespace, item.Name)
		w.writer.Sync(item)
	}

	glog.Infof("receive stop signal, quit watcher: %s", w.resourceType)
}

// AddEvent is event handler for add resource event.
// It builds the sync data for obj with action.SyncDataActionAdd through
// genSyncData and returns without further work when that result is nil.
func (w *Watcher) AddEvent(obj interface{}) {
data := w.genSyncData(obj, action.SyncDataActionAdd)
if data == nil {
return
}
// NOTE(review): both a direct writer.Sync call and a queue append are shown
// here. This looks like a diff rendering where the Sync line is the
// pre-change version and the Append line replaces it — confirm that only one
// delivery path is intended, otherwise each add event is synced twice.
w.writer.Sync(data)
w.queue.Append(data)
}

// DeleteEvent is event handler for delete resource event.
Expand All @@ -125,7 +161,7 @@ func (w *Watcher) DeleteEvent(obj interface{}) {
if data == nil {
return
}
w.writer.Sync(data)
w.queue.Append(data)
}

// UpdateEvent is event handler for update resource event.
Expand Down Expand Up @@ -176,7 +212,7 @@ func (w *Watcher) UpdateEvent(oldObj, newObj interface{}) {
if data == nil {
return
}
w.writer.Sync(data)
w.queue.Append(data)
}

// isEventShouldFilter filters k8s system events.
Expand Down
1 change: 1 addition & 0 deletions bcs-k8s/bcs-k8s-watch/app/output/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type StorageRequestBody struct {
Data interface{} `json:"data"`
}

// GetURL get url
func (client *StorageClient) GetURL() (string, string) {
// Event
if client.ResourceType == "Event" {
Expand Down
1 change: 1 addition & 0 deletions bcs-k8s/bcs-k8s-watch/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.9.0
github.com/satori/go.uuid v1.2.0
github.com/sheerun/queue v1.0.1
github.com/spf13/pflag v1.0.5
golang.org/x/net v0.0.0-20201224014010-6772e930b67b
golang.org/x/oauth2 v0.0.0-20210113205817-d3ed898aa8a3 // indirect
Expand Down
24 changes: 24 additions & 0 deletions bcs-k8s/bcs-k8s-watch/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ var (
Buckets: []float64{0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 3.0},
}, []string{"cluster_id", "handler", "namespace", "resource_type", "method", "status"})

// bcs-k8s-watch record watcher queue length
requestsWatcherQueueLength = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "bcs_k8s_watch",
Subsystem: "queue",
Name: "watcher_queue_num",
Help: "The total number of watcher queue",
}, []string{"watcher"})

// bcs-k8s-watch record queueData metrics
requestsTotalHandlerQueue = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: BkBcsK8sWatch,
Expand Down Expand Up @@ -77,6 +85,7 @@ func init() {
// handler queue data
prometheus.MustRegister(requestsTotalHandlerQueue)
prometheus.MustRegister(requestLatencyHandler)
prometheus.MustRegister(requestsWatcherQueueLength)

// handler discard events
prometheus.MustRegister(handlerDiscardEvents)
Expand All @@ -103,6 +112,21 @@ func ReportK8sWatchHandlerQueueLengthDec(clusterID, handler string) {
requestsTotalHandlerQueue.WithLabelValues(clusterID, handler).Dec()
}

// ReportK8sWatcherQueueLength sets the watcher queue length gauge for the
// given watcher label to queueLen.
func ReportK8sWatcherQueueLength(watcher string, queueLen float64) {
	gauge := requestsWatcherQueueLength.WithLabelValues(watcher)
	gauge.Set(queueLen)
}

// ReportK8sWatcherQueueLengthInc increments the watcher queue length gauge
// for the given watcher label by one.
func ReportK8sWatcherQueueLengthInc(watcher string) {
	gauge := requestsWatcherQueueLength.WithLabelValues(watcher)
	gauge.Inc()
}

// ReportK8sWatcherQueueLengthDec decrements the watcher queue length gauge
// for the given watcher label by one.
func ReportK8sWatcherQueueLengthDec(watcher string) {
	gauge := requestsWatcherQueueLength.WithLabelValues(watcher)
	gauge.Dec()
}

// ReportK8sWatchHandlerDiscardEvents report handler discard events num
func ReportK8sWatchHandlerDiscardEvents(clusterID, handler string) {
handlerDiscardEvents.WithLabelValues(clusterID, handler).Inc()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type NetworkPolicyController struct {
dataInfrSynced bool
}

// SetDataInformerSynced sets the controller's dataInfrSynced flag to true.
// The flag is only ever switched on here; this method never clears it.
func (npc *NetworkPolicyController) SetDataInformerSynced() {
npc.dataInfrSynced = true
}
Expand Down
2 changes: 1 addition & 1 deletion bcs-services/bcs-cluster-manager/cidrmigrationtool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func uploadMongo() {
if cidr.Cluster != nil {
newCidr.Cluster = *cidr.Cluster
}
if err := tkeStore.CreateTkeCidr(context.TODO(), newCidr); err != nil {
if err := tkeStore.CreateTkeCidr(context.Background(), newCidr); err != nil {
blog.Fatalf("create tke cidr %+v to mongo failed, err %s", newCidr, err.Error())
}
}
Expand Down
2 changes: 1 addition & 1 deletion bcs-services/bcs-cluster-manager/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (cm *ClusterManager) initLocker() error {
blog.Errorf("init locker failed, err %s", err.Error())
return err
}
blog.Infof("init locker successfullly")
blog.Infof("init locker successful")
cm.locker = locker
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (c *Client) Lock(id string, opts ...lock.LockOption) error {

m := cc.NewMutex(s, lpath)

if err := m.Lock(context.TODO()); err != nil {
if err := m.Lock(context.Background()); err != nil {
return err
}

Expand Down

0 comments on commit 31004c0

Please sign in to comment.