Skip to content

Commit

Permalink
remove event type from cloudhub
Browse files Browse the repository at this point in the history
  • Loading branch information
fisherxu committed Nov 8, 2019
1 parent b4c821d commit 818c954
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 318 deletions.
64 changes: 32 additions & 32 deletions cloud/pkg/cloudhub/channelq/channelq.go
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/klog"

beehiveContext "github.com/kubeedge/beehive/pkg/core/context"
beehiveModel "github.com/kubeedge/beehive/pkg/core/model"
"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub/common/model"
)

Expand All @@ -17,30 +18,30 @@ const (
rChanBufSize = 10
)

// EventSet holds a set of events
type EventSet interface {
// MessageSet holds a set of messages
type MessageSet interface {
Ack() error
Get() (*model.Event, error)
Get() (*beehiveModel.Message, error)
}

// ChannelEventSet is the channel implementation of EventSet
type ChannelEventSet struct {
current model.Event
messages <-chan model.Event
// ChannelMessageSet is the channel implementation of MessageSet
type ChannelMessageSet struct {
current beehiveModel.Message
messages <-chan beehiveModel.Message
}

// NewChannelEventSet initializes a new ChannelEventSet instance
func NewChannelEventSet(messages <-chan model.Event) *ChannelEventSet {
return &ChannelEventSet{messages: messages}
// NewChannelMessageSet initializes a new ChannelMessageSet instance
func NewChannelMessageSet(messages <-chan beehiveModel.Message) *ChannelMessageSet {
return &ChannelMessageSet{messages: messages}
}

// Ack acknowledges once the event is processed
func (s *ChannelEventSet) Ack() error {
func (s *ChannelMessageSet) Ack() error {
return nil
}

// Get obtains one event from the queue
func (s *ChannelEventSet) Get() (*model.Event, error) {
func (s *ChannelMessageSet) Get() (*beehiveModel.Message, error) {
var ok bool
s.current, ok = <-s.messages
if !ok {
Expand All @@ -49,22 +50,22 @@ func (s *ChannelEventSet) Get() (*model.Event, error) {
return &s.current, nil
}

// ChannelEventQueue is the channel implementation of EventQueue
type ChannelEventQueue struct {
// ChannelMessageQueue is the channel implementation of MessageQueue
type ChannelMessageQueue struct {
ctx *beehiveContext.Context
channelPool sync.Map
}

// NewChannelEventQueue initializes a new ChannelEventQueue
func NewChannelEventQueue(ctx *beehiveContext.Context) *ChannelEventQueue {
q := ChannelEventQueue{ctx: ctx}
// NewChannelMessageQueue initializes a new ChannelMessageQueue
func NewChannelMessageQueue(ctx *beehiveContext.Context) *ChannelMessageQueue {
q := ChannelMessageQueue{ctx: ctx}
return &q
}

// DispatchMessage gets the message from the cloud, extracts the
// node id from it, gets the channel associated with the node
// and pushes the event on the channel
func (q *ChannelEventQueue) DispatchMessage(ctx context.Context) {
func (q *ChannelMessageQueue) DispatchMessage(ctx context.Context) {
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -96,28 +97,28 @@ func (q *ChannelEventQueue) DispatchMessage(ctx context.Context) {
klog.Infof("fail to get dispatch channel for %s", nodeID)
continue
}
rChannel <- model.MessageToEvent(&msg)
rChannel <- msg
}
}

func (q *ChannelEventQueue) getRChannel(nodeID string) (chan model.Event, error) {
func (q *ChannelMessageQueue) getRChannel(nodeID string) (chan beehiveModel.Message, error) {
channels, ok := q.channelPool.Load(nodeID)
if !ok {
klog.Errorf("rChannel for edge node %s is removed", nodeID)
return nil, fmt.Errorf("rChannel not found")
}
rChannel := channels.(chan model.Event)
rChannel := channels.(chan beehiveModel.Message)
return rChannel, nil
}

// Connect allocates rChannel for given project and group
func (q *ChannelEventQueue) Connect(info *model.HubInfo) error {
func (q *ChannelMessageQueue) Connect(info *model.HubInfo) error {
_, ok := q.channelPool.Load(info.NodeID)
if ok {
return fmt.Errorf("edge node %s is already connected", info.NodeID)
}
// allocate a new rchannel with default buffer size
rChannel := make(chan model.Event, rChanBufSize)
rChannel := make(chan beehiveModel.Message, rChanBufSize)
_, ok = q.channelPool.LoadOrStore(info.NodeID, rChannel)
if ok {
// rchannel is already allocated
Expand All @@ -127,40 +128,39 @@ func (q *ChannelEventQueue) Connect(info *model.HubInfo) error {
}

// Close closes rChannel for given project and group
func (q *ChannelEventQueue) Close(info *model.HubInfo) error {
func (q *ChannelMessageQueue) Close(info *model.HubInfo) error {
channels, ok := q.channelPool.Load(info.NodeID)
if !ok {
klog.Warningf("rChannel for edge node %s is already removed", info.NodeID)
return nil
}
rChannel := channels.(chan model.Event)
rChannel := channels.(chan beehiveModel.Message)
close(rChannel)
q.channelPool.Delete(info.NodeID)
return nil
}

// Publish sends message via the rchannel to Edge Controller
func (q *ChannelEventQueue) Publish(info *model.HubInfo, event *model.Event) error {
msg := model.EventToMessage(event)
func (q *ChannelMessageQueue) Publish(msg *beehiveModel.Message) error {
switch msg.Router.Source {
case model.ResTwin:
q.ctx.SendToGroup(model.SrcDeviceController, msg)
q.ctx.SendToGroup(model.SrcDeviceController, *msg)
default:
q.ctx.SendToGroup(model.SrcEdgeController, msg)
q.ctx.SendToGroup(model.SrcEdgeController, *msg)
}
return nil
}

// Consume retrieves message from the rChannel for given project and group
func (q *ChannelEventQueue) Consume(info *model.HubInfo) (EventSet, error) {
func (q *ChannelMessageQueue) Consume(info *model.HubInfo) (MessageSet, error) {
rChannel, err := q.getRChannel(info.NodeID)
if err != nil {
return nil, err
}
return NewChannelEventSet((<-chan model.Event)(rChannel)), nil
return NewChannelMessageSet((<-chan beehiveModel.Message)(rChannel)), nil
}

// Workload returns the number of queue channels connected to queue
func (q *ChannelEventQueue) Workload() (float64, error) {
func (q *ChannelMessageQueue) Workload() (float64, error) {
return 1, nil
}
8 changes: 4 additions & 4 deletions cloud/pkg/cloudhub/cloudhub.go
Expand Up @@ -41,18 +41,18 @@ func (a *cloudHub) Start(c *beehiveContext.Context) {

initHubConfig()

eventq := channelq.NewChannelEventQueue(c)
messageq := channelq.NewChannelMessageQueue(c)

// start dispatch message from the cloud to edge node
go eventq.DispatchMessage(ctx)
go messageq.DispatchMessage(ctx)

// start the cloudhub server
if util.HubConfig.ProtocolWebsocket {
go servers.StartCloudHub(servers.ProtocolWebsocket, eventq, c)
go servers.StartCloudHub(servers.ProtocolWebsocket, messageq, c)
}

if util.HubConfig.ProtocolQuic {
go servers.StartCloudHub(servers.ProtocolQuic, eventq, c)
go servers.StartCloudHub(servers.ProtocolQuic, messageq, c)
}

if util.HubConfig.ProtocolUDS {
Expand Down
68 changes: 13 additions & 55 deletions cloud/pkg/cloudhub/common/model/types.go
Expand Up @@ -61,48 +61,6 @@ type HubInfo struct {
NodeID string
}

// UserGroupInfo struct
type UserGroupInfo struct {
Resource string `json:"resource"`
Operation string `json:"operation"`
}

// Event represents message communicated between cloud hub and edge hub
type Event struct {
Group string `json:"msg_group"`
Source string `json:"source"`
UserGroup UserGroupInfo `json:"user_group"`
ID string `json:"msg_id"`
ParentID string `json:"parent_msg_id"`
Timestamp int64 `json:"timestamp"`
Content interface{} `json:"content"`
}

// EventToMessage converts an event to a model message
func EventToMessage(event *Event) model.Message {
var msg model.Message
msg.BuildHeader(event.ID, event.ParentID, event.Timestamp)
msg.BuildRouter(event.Source, event.Group, event.UserGroup.Resource, event.UserGroup.Operation)
msg.FillBody(event.Content)
return msg
}

// MessageToEvent converts a model message to an event
func MessageToEvent(msg *model.Message) Event {
var event Event
event.ID = msg.GetID()
event.ParentID = msg.GetParentID()
event.Timestamp = msg.GetTimestamp()
event.Source = msg.GetSource()
event.Group = msg.GetGroup()
event.Content = msg.GetContent()
event.UserGroup = UserGroupInfo{
Resource: msg.GetResource(),
Operation: msg.GetOperation(),
}
return event
}

// NewResource constructs a resource field using resource type and ID
func NewResource(resType, resID string, info *HubInfo) string {
var prefix string
Expand All @@ -116,20 +74,20 @@ func NewResource(resType, resID string, info *HubInfo) string {
}

// IsNodeStopped indicates if the node is stopped or running
func (event *Event) IsNodeStopped() bool {
tokens := strings.Split(event.UserGroup.Resource, "/")
func IsNodeStopped(msg *model.Message) bool {
tokens := strings.Split(msg.Router.Resource, "/")
if len(tokens) != 2 || tokens[0] != ResNode {
return false
}
if event.UserGroup.Operation == OpDelete {
if msg.Router.Operation == OpDelete {
return true
}
if event.UserGroup.Operation != OpUpdate || event.Content == nil {
if msg.Router.Operation != OpUpdate || msg.Content == nil {
return false
}
body, ok := event.Content.(map[string]interface{})
body, ok := msg.Content.(map[string]interface{})
if !ok {
klog.Errorf("fail to decode node update message: %s, type is %T", event.GetContent(), event.Content)
klog.Errorf("fail to decode node update message: %s, type is %T", msg.GetContent(), msg.Content)
// it can't be determined if the node has stopped
return false
}
Expand All @@ -142,16 +100,16 @@ func (event *Event) IsNodeStopped() bool {
}

// IsFromEdge judges if the event is sent from edge
func (event *Event) IsFromEdge() bool {
func IsFromEdge(msg *model.Message) bool {
return true
}

// IsToEdge judges if the vent should be sent to edge
func (event *Event) IsToEdge() bool {
if event.Source != SrcManager {
func IsToEdge(msg *model.Message) bool {
if msg.Router.Source != SrcManager {
return true
}
resource := event.UserGroup.Resource
resource := msg.Router.Resource
if strings.HasPrefix(resource, ResNode) {
tokens := strings.Split(resource, "/")
if len(tokens) >= 3 {
Expand All @@ -168,7 +126,7 @@ func (event *Event) IsToEdge() bool {
}
for res, ops := range resOpMap {
for _, op := range ops {
if event.UserGroup.Operation == op && strings.Contains(resource, res) {
if msg.Router.Operation == op && strings.Contains(resource, res) {
return false
}
}
Expand All @@ -177,6 +135,6 @@ func (event *Event) IsToEdge() bool {
}

// GetContent dumps the content to string
func (event *Event) GetContent() string {
return fmt.Sprintf("%v", event.Content)
func GetContent(msg *model.Message) string {
return fmt.Sprintf("%v", msg.Content)
}

0 comments on commit 818c954

Please sign in to comment.