Skip to content

Commit

Permalink
amqp: handler: Check if headers are passed
Browse files Browse the repository at this point in the history
  • Loading branch information
João Neto authored and netoax committed Apr 22, 2020
1 parent e4b1a3c commit c397429
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions pkg/server/handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package server

import (
"errors"

"github.com/CESARBR/knot-babeltower/pkg/logging"
"github.com/CESARBR/knot-babeltower/pkg/network"
"github.com/CESARBR/knot-babeltower/pkg/thing/controllers"
Expand Down Expand Up @@ -80,10 +82,16 @@ func (mc *MsgHandler) onMsgReceived(msgChan chan network.InMsg) {
mc.logger.Infof("exchange: %s, routing key: %s", msg.Exchange, msg.RoutingKey)
mc.logger.Infof("message received: %s", string(msg.Body))

token, ok := msg.Headers["Authorization"].(string)
if !ok {
mc.logger.Error(errors.New("authorization token not provided"))
continue
}

if msg.Exchange == exchangeFogIn {
err = mc.handleClientMessages(msg)
err = mc.handleClientMessages(msg, token)
} else if msg.Exchange == exchangeConnOut {
err = mc.handleConnectorMessages(msg)
err = mc.handleConnectorMessages(msg, token)
}

if err != nil {
Expand All @@ -93,35 +101,33 @@ func (mc *MsgHandler) onMsgReceived(msgChan chan network.InMsg) {
}
}

func (mc *MsgHandler) handleClientMessages(msg network.InMsg) error {
authorizationHeader := msg.Headers["Authorization"]
func (mc *MsgHandler) handleClientMessages(msg network.InMsg, token string) error {

switch msg.RoutingKey {
case "device.register":
return mc.thingController.Register(msg.Body, authorizationHeader.(string))
return mc.thingController.Register(msg.Body, token)
case "device.unregister":
return mc.thingController.Unregister(msg.Body, authorizationHeader.(string))
return mc.thingController.Unregister(msg.Body, token)
case "schema.update":
return mc.thingController.UpdateSchema(msg.Body, authorizationHeader.(string))
return mc.thingController.UpdateSchema(msg.Body, token)
case "device.cmd.auth":
return mc.thingController.AuthDevice(msg.Body, authorizationHeader.(string))
return mc.thingController.AuthDevice(msg.Body, token)
case "device.cmd.list":
return mc.thingController.ListDevices(authorizationHeader.(string))
return mc.thingController.ListDevices(token)
case "data.publish":
return mc.thingController.PublishData(msg.Body, authorizationHeader.(string))
return mc.thingController.PublishData(msg.Body, token)
}

return nil
}

func (mc *MsgHandler) handleConnectorMessages(msg network.InMsg) error {
authorizationHeader := msg.Headers["Authorization"]
func (mc *MsgHandler) handleConnectorMessages(msg network.InMsg, token string) error {

switch msg.RoutingKey {
case "data.request":
return mc.thingController.RequestData(msg.Body, authorizationHeader.(string))
return mc.thingController.RequestData(msg.Body, token)
case "data.update":
return mc.thingController.UpdateData(msg.Body, authorizationHeader.(string))
return mc.thingController.UpdateData(msg.Body, token)
case "device.registered":
// Ignore message
}
Expand Down

0 comments on commit c397429

Please sign in to comment.