Skip to content

Commit

Permalink
Merge pull request kubeedge#1215 from kadisi/cloudcore_stop_channel
Browse files Browse the repository at this point in the history
bugfix: Solve the problem of outputting a large amount of logs after cloudcore exits
  • Loading branch information
kubeedge-bot committed Nov 4, 2019
2 parents 4a75b7c + 1d27cbb commit c6a5aa7
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 313 deletions.
15 changes: 11 additions & 4 deletions cloud/pkg/cloudhub/channelq/channelq.go
@@ -1,13 +1,14 @@
package channelq

import (
"context"
"fmt"
"strings"
"sync"

"k8s.io/klog"

"github.com/kubeedge/beehive/pkg/core/context"
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/common/model"
)

Expand Down Expand Up @@ -50,21 +51,27 @@ func (s *ChannelEventSet) Get() (*model.Event, error) {

// ChannelEventQueue is the channel implementation of EventQueue
type ChannelEventQueue struct {
ctx *context.Context
ctx *beehiveContext.Context
channelPool sync.Map
}

// NewChannelEventQueue initializes a new ChannelEventQueue
func NewChannelEventQueue(ctx *context.Context) *ChannelEventQueue {
func NewChannelEventQueue(ctx *beehiveContext.Context) *ChannelEventQueue {
q := ChannelEventQueue{ctx: ctx}
return &q
}

// DispatchMessage gets the message from the cloud, extracts the
// node id from it, gets the channel associated with the node
// and pushes the event on the channel
func (q *ChannelEventQueue) DispatchMessage() {
func (q *ChannelEventQueue) DispatchMessage(ctx context.Context) {
for {
select {
case <-ctx.Done():
klog.Warningf("Cloudhub channel eventqueue dispatch message loop stoped")
return
default:
}
msg, err := q.ctx.Receive(model.SrcCloudHub)
if err != nil {
klog.Info("receive not Message format message")
Expand Down
17 changes: 9 additions & 8 deletions cloud/pkg/cloudhub/cloudhub.go
@@ -1,14 +1,15 @@
package cloudhub

import (
"context"
"io/ioutil"
"os"

"k8s.io/klog"

"github.com/kubeedge/beehive/pkg/common/config"
"github.com/kubeedge/beehive/pkg/core"
"github.com/kubeedge/beehive/pkg/core/context"
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/channelq"
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/common/util"
chconfig "github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/config"
Expand All @@ -17,8 +18,8 @@ import (
)

type cloudHub struct {
context *context.Context
stopChan chan bool
context *beehiveContext.Context
cancel context.CancelFunc
}

func Register() {
Expand All @@ -33,16 +34,17 @@ func (a *cloudHub) Group() string {
return "cloudhub"
}

func (a *cloudHub) Start(c *context.Context) {
func (a *cloudHub) Start(c *beehiveContext.Context) {
var ctx context.Context
a.context = c
a.stopChan = make(chan bool)
ctx, a.cancel = context.WithCancel(context.Background())

initHubConfig()

eventq := channelq.NewChannelEventQueue(c)

// start dispatch message from the cloud to edge node
go eventq.DispatchMessage()
go eventq.DispatchMessage(ctx)

// start the cloudhub server
if util.HubConfig.ProtocolWebsocket {
Expand All @@ -59,11 +61,10 @@ func (a *cloudHub) Start(c *context.Context) {
go udsserver.StartServer(util.HubConfig, c)
}

<-a.stopChan
}

func (a *cloudHub) Cleanup() {
a.stopChan <- true
a.cancel()
a.context.Cleanup(a.Name())
}

Expand Down
50 changes: 17 additions & 33 deletions cloud/pkg/devicecontroller/controller/downstream.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"encoding/json"
"reflect"
"strconv"
Expand Down Expand Up @@ -76,22 +77,20 @@ type DownstreamController struct {
kubeClient *kubernetes.Clientset
messageLayer messagelayer.MessageLayer

deviceManager *manager.DeviceManager
deviceStop chan struct{}

deviceManager *manager.DeviceManager
deviceModelManager *manager.DeviceModelManager
deviceModelStop chan struct{}

configMapManager *manager.ConfigMapManager
configMapManager *manager.ConfigMapManager

crdClient *rest.RESTClient
}

// syncDeviceModel is used to get events from informer
func (dc *DownstreamController) syncDeviceModel(stop chan struct{}) {
running := true
for running {
func (dc *DownstreamController) syncDeviceModel(ctx context.Context) {
for {
select {
case <-ctx.Done():
klog.Info("stop syncDeviceModel")
return
case e := <-dc.deviceModelManager.Events():
deviceModel, ok := e.Object.(*v1alpha1.DeviceModel)
if !ok {
Expand All @@ -108,9 +107,6 @@ func (dc *DownstreamController) syncDeviceModel(stop chan struct{}) {
default:
klog.Warningf("deviceModel event type: %s unsupported", e.Type)
}
case <-stop:
klog.Info("stop syncDeviceModel")
running = false
}
}
}
Expand Down Expand Up @@ -157,10 +153,12 @@ func (dc *DownstreamController) deviceModelDeleted(deviceModel *v1alpha1.DeviceM
}

// syncDevice is used to get device events from informer
func (dc *DownstreamController) syncDevice(stop chan struct{}) {
running := true
for running {
func (dc *DownstreamController) syncDevice(ctx context.Context) {
for {
select {
case <-ctx.Done():
klog.Info("Stop syncDevice")
return
case e := <-dc.deviceManager.Events():
device, ok := e.Object.(*v1alpha1.Device)
if !ok {
Expand All @@ -177,9 +175,6 @@ func (dc *DownstreamController) syncDevice(stop chan struct{}) {
default:
klog.Warningf("Device event type: %s unsupported", e.Type)
}
case <-stop:
klog.Info("Stop syncDevice")
running = false
}
}
}
Expand Down Expand Up @@ -810,27 +805,16 @@ func (dc *DownstreamController) deviceDeleted(device *v1alpha1.Device) {
}

// Start DownstreamController
func (dc *DownstreamController) Start() error {
func (dc *DownstreamController) Start(ctx context.Context) error {
klog.Info("Start downstream devicecontroller")

dc.deviceModelStop = make(chan struct{})
go dc.syncDeviceModel(dc.deviceModelStop)
go dc.syncDeviceModel(ctx)

// Wait for adding all device model
// TODO need to think about sync
time.Sleep(1 * time.Second)
dc.deviceStop = make(chan struct{})
go dc.syncDevice(dc.deviceStop)

return nil
}

// Stop DownstreamController
func (dc *DownstreamController) Stop() error {
klog.Info("Stopping downstream devicecontroller")
defer klog.Info("Downstream devicecontroller stopped")
go dc.syncDevice(ctx)

dc.deviceStop <- struct{}{}
dc.deviceModelStop <- struct{}{}
return nil
}

Expand Down
53 changes: 17 additions & 36 deletions cloud/pkg/devicecontroller/controller/upstream.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"context"
"encoding/json"
"strconv"

Expand Down Expand Up @@ -48,11 +49,6 @@ const (
type UpstreamController struct {
crdClient *rest.RESTClient
messageLayer messagelayer.MessageLayer

// stop channel
stopDispatch chan struct{}
stopUpdateDeviceStatus chan struct{}

// message channel
deviceStatusChan chan model.Message

Expand All @@ -61,30 +57,28 @@ type UpstreamController struct {
}

// Start UpstreamController
func (uc *UpstreamController) Start() error {
func (uc *UpstreamController) Start(ctx context.Context) error {
klog.Info("Start upstream devicecontroller")
uc.stopDispatch = make(chan struct{})
uc.stopUpdateDeviceStatus = make(chan struct{})

uc.deviceStatusChan = make(chan model.Message, config.UpdateDeviceStatusBuffer)

go uc.dispatchMessage(uc.stopDispatch)
go uc.dispatchMessage(ctx)

for i := 0; i < config.UpdateDeviceStatusWorkers; i++ {
go uc.updateDeviceStatus(uc.stopUpdateDeviceStatus)
go uc.updateDeviceStatus(ctx)
}

return nil
}

func (uc *UpstreamController) dispatchMessage(stop chan struct{}) {
running := true
go func() {
<-stop
klog.Info("Stop dispatchMessage")
running = false
}()
for running {
func (uc *UpstreamController) dispatchMessage(ctx context.Context) {
for {
select {
case <-ctx.Done():
klog.Info("Stop dispatchMessage")
return
default:
}
msg, err := uc.messageLayer.Receive()
if err != nil {
klog.Warningf("Receive message failed, %s", err)
Expand All @@ -109,10 +103,12 @@ func (uc *UpstreamController) dispatchMessage(stop chan struct{}) {
}
}

func (uc *UpstreamController) updateDeviceStatus(stop chan struct{}) {
running := true
for running {
func (uc *UpstreamController) updateDeviceStatus(ctx context.Context) {
for {
select {
case <-ctx.Done():
klog.Info("Stop updateDeviceStatus")
return
case msg := <-uc.deviceStatusChan:
klog.Infof("Message: %s, operation is: %s, and resource is: %s", msg.GetID(), msg.GetOperation(), msg.GetResource())
msgTwin, err := uc.unmarshalDeviceStatusMessage(msg)
Expand Down Expand Up @@ -169,9 +165,6 @@ func (uc *UpstreamController) updateDeviceStatus(stop chan struct{}) {
continue
}
klog.Infof("Message: %s process successfully", msg.GetID())
case <-stop:
klog.Info("Stop updateDeviceStatus")
running = false
}
}
}
Expand All @@ -195,18 +188,6 @@ func (uc *UpstreamController) unmarshalDeviceStatusMessage(msg model.Message) (*
return twinUpdate, nil
}

// Stop UpstreamController
func (uc *UpstreamController) Stop() error {
klog.Info("Stopping upstream devicecontroller")
defer klog.Info("Upstream devicecontroller stopped")

uc.stopDispatch <- struct{}{}
for i := 0; i < config.UpdateDeviceStatusWorkers; i++ {
uc.stopUpdateDeviceStatus <- struct{}{}
}
return nil
}

// NewUpstreamController create UpstreamController from config
func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
config, err := utils.KubeConfig()
Expand Down
22 changes: 11 additions & 11 deletions cloud/pkg/devicecontroller/module.go
@@ -1,21 +1,22 @@
package devicecontroller

import (
"context"
"os"
"time"

"k8s.io/klog"

"github.com/kubeedge/beehive/pkg/core"
bcontext "github.com/kubeedge/beehive/pkg/core/context"
beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
"github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller/config"
"github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller/constants"
"github.com/kubeedge/kubeedge/cloud/pkg/devicecontroller/controller"
)

// DeviceController use beehive context message layer
type DeviceController struct {
stopChan chan bool
cancel context.CancelFunc
}

func Register() {
Expand All @@ -34,9 +35,11 @@ func (dctl *DeviceController) Group() string {
}

// Start controller
func (dctl *DeviceController) Start(c *bcontext.Context) {
func (dctl *DeviceController) Start(c *beehiveContext.Context) {
var ctx context.Context
config.Context = c
dctl.stopChan = make(chan bool)

ctx, dctl.cancel = context.WithCancel(context.Background())

initConfig()

Expand All @@ -51,19 +54,16 @@ func (dctl *DeviceController) Start(c *bcontext.Context) {
os.Exit(1)
}

downstream.Start()
downstream.Start(ctx)
// wait for downstream controller to start and load deviceModels and devices
// TODO think about sync
time.Sleep(1 * time.Second)
upstream.Start()

<-dctl.stopChan
upstream.Stop()
downstream.Stop()
upstream.Start(ctx)
}

// Cleanup controller
func (dctl *DeviceController) Cleanup() {
dctl.stopChan <- true
dctl.cancel()
config.Context.Cleanup(dctl.Name())
}

Expand Down

0 comments on commit c6a5aa7

Please sign in to comment.