Skip to content

Commit

Permalink
mqtt access server implemented.
Browse files Browse the repository at this point in the history
  • Loading branch information
ruizeng committed Dec 4, 2015
1 parent 1aee088 commit 49b6bf0
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 40 deletions.
3 changes: 2 additions & 1 deletion build/local/linux/run.sh
Expand Up @@ -4,10 +4,11 @@
echo 'CREATE DATABASE PandoCloud' | mysql -uroot

# start services
$GOPATH/bin/httpaccess -etcd http://localhost:2379 -httphost :80 -loglevel debug -usehttps -keyfile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/key.pem -cafile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/cert.pem &
$GOPATH/bin/httpaccess -etcd http://localhost:2379 -httphost inner:80 -loglevel debug -usehttps -keyfile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/key.pem -cafile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/cert.pem &
$GOPATH/bin/registry -etcd http://localhost:2379 -rpchost localhost:20034 -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP -dbhost localhost -dbname PandoCloud -dbport 3306 -dbuser root -loglevel debug &
$GOPATH/bin/apiprovider -etcd http://localhost:2379 -loglevel debug -httphost localhost:8888 &
$GOPATH/bin/devicemanager -etcd http://localhost:2379 -loglevel debug -rpchost localhost:20033 &
$GOPATH/bin/controller -etcd http://localhost:2379 -loglevel debug -rpchost localhost:20032 &
$GOPATH/bin/mqttaccess -etcd http://localhost:2379 -loglevel debug -rpchost localhost:20030 &

exit 0
4 changes: 3 additions & 1 deletion docs/zh-cn/quick-start/ubuntu.md
Expand Up @@ -124,4 +124,6 @@ sh -x ./build/local/linux/run.sh
```
go install github.com/PandoCloud/pando-cloud/tests/device
$GOPATH/bin/device
```
```

也可以使用任何实现了[Pando嵌入式框架](https://github.com/PandoCloud/pando-embeded-framework)的设备进行测试。(登录地址改为本地http接入服务器地址)
5 changes: 3 additions & 2 deletions pkg/mqtt/broker.go
Expand Up @@ -2,6 +2,7 @@ package mqtt

import (
"net"
"time"
)

type Broker struct {
Expand All @@ -21,12 +22,12 @@ func (b *Broker) Handle(conn net.Conn) {
b.mgr.NewConn(conn)
}

func (b *Broker) SendMessageToDevice(deviceid uint64, msgtype string, message []byte) error {
func (b *Broker) SendMessageToDevice(deviceid uint64, msgtype string, message []byte, timeout time.Duration) error {
msg := &Publish{}
msg.Header.QosLevel = QosAtLeastOnce
msg.TopicName = DeviceIdToClientId(deviceid) + "/" + msgtype
msg.Payload = BytesPayload(message)
return b.mgr.PublishMessage2Device(deviceid, msg)
return b.mgr.PublishMessage2Device(deviceid, msg, timeout)
}

func (b *Broker) GetToken(deviceid uint64) ([]byte, error) {
Expand Down
84 changes: 55 additions & 29 deletions pkg/mqtt/connection.go
Expand Up @@ -2,6 +2,7 @@ package mqtt

import (
"encoding/hex"
"errors"
"github.com/PandoCloud/pando-cloud/pkg/rpcs"
"github.com/PandoCloud/pando-cloud/pkg/server"
"io"
Expand All @@ -22,23 +23,25 @@ type ResponseType struct {
}

type Connection struct {
DeviceId uint64
Conn net.Conn
SendChan chan Message
Mgr *Manager
MessageId uint16
KeepAlive uint16
LastHbTime int64
Token []byte
Mgr *Manager
DeviceId uint64
Conn net.Conn
SendChan chan Message
MessageId uint16
MessageWaitChan map[uint16]chan error
KeepAlive uint16
LastHbTime int64
Token []byte
}

func NewConnection(conn net.Conn, mgr *Manager) *Connection {
sendchan := make(chan Message, SendChanLen)
c := &Connection{
Conn: conn,
SendChan: sendchan,
Mgr: mgr,
KeepAlive: defaultKeepAlive,
Conn: conn,
SendChan: sendchan,
Mgr: mgr,
KeepAlive: defaultKeepAlive,
MessageWaitChan: make(map[uint16]chan error),
}

go c.SendMsgToClient()
Expand All @@ -53,6 +56,44 @@ func (c *Connection) Submit(msg Message) {
}
}

// Publish will publish a message , and return a chan to wait for completion.
func (c *Connection) Publish(msg Message, timeout time.Duration) chan error {
message := msg.(*Publish)
message.MessageId = c.MessageId
c.MessageId++
c.Submit(message)

// we don't wait for confirm.
if timeout == 0 {
return nil
}

ch := make(chan error)
c.MessageWaitChan[message.MessageId] = ch
// wait for timeout and
go func() {
timer := time.NewTimer(timeout)
<-timer.C
waitCh, exist := c.MessageWaitChan[message.MessageId]
if exist {
waitCh <- errors.New("timeout pushlishing message.")
delete(c.MessageWaitChan, message.MessageId)
close(waitCh)
}
}()

return ch
}

func (c *Connection) confirmPublish(messageid uint16) {
waitCh, exist := c.MessageWaitChan[messageid]
if exist {
waitCh <- nil
delete(c.MessageWaitChan, messageid)
close(waitCh)
}
}

func (c *Connection) ValidateToken(token []byte) error {

err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceId, token)
Expand Down Expand Up @@ -169,7 +210,7 @@ func (c *Connection) RcvMsgFromClient() {

case *PubAck:
server.Log.Infof("%s, comes publish ack", host)
// TODO - notify sender
c.confirmPublish(msg.MessageId)

case *PubRec:
server.Log.Infof("%s, comes publish rec", host)
Expand All @@ -183,7 +224,7 @@ func (c *Connection) RcvMsgFromClient() {

case *PubComp:
server.Log.Infof("%s, comes publish comp", host)
// TODO - notify sender
c.confirmPublish(msg.MessageId)

case *PingReq:
server.Log.Infof("%s, ping req comes", host)
Expand Down Expand Up @@ -224,21 +265,6 @@ func (c *Connection) SendMsgToClient() {
return
}

switch msg := msg.(type) {
case *Publish:
server.Log.Infof("publish msg, check for resend")
if msg.QosLevel.IsAtLeastOnce() || msg.QosLevel.IsExactlyOnce() {
msg.MessageId = c.MessageId
c.MessageId++
}
case *PubRel:
server.Log.Infof("pubrel msg, check for resend")
msg.MessageId = c.MessageId
c.MessageId++
default:
server.Log.Infof("normal msg")
}

server.Log.Debug("send msg to %s=======\n%v\n=========", host, msg)
err := msg.Encode(c.Conn)
if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions pkg/mqtt/manager.go
Expand Up @@ -60,17 +60,16 @@ func (m *Manager) GetToken(deviceid uint64) ([]byte, error) {
return con.Token, nil
}

func (m *Manager) PublishMessage2Device(deviceid uint64, msg *Publish) error {
func (m *Manager) PublishMessage2Device(deviceid uint64, msg *Publish, timeout time.Duration) error {
m.CxtMutex.RLock()
con, exist := m.IdToConn[deviceid]
m.CxtMutex.RUnlock()
if !exist {
return errorf("device not exist: %v", deviceid)
}

con.Submit(msg)

return nil
err := <-con.Publish(msg, timeout)
return err
}

func (m *Manager) PublishMessage2Server(deviceid uint64, msg *Publish) error {
Expand Down
42 changes: 39 additions & 3 deletions services/mqttaccess/access.go
@@ -1,13 +1,20 @@
package main

import (
"errors"
"github.com/PandoCloud/pando-cloud/pkg/mqtt"
"github.com/PandoCloud/pando-cloud/pkg/protocol"
"github.com/PandoCloud/pando-cloud/pkg/rpcs"
"github.com/PandoCloud/pando-cloud/pkg/server"
"time"
)

const (
defaultTimeoutSecond = 5

commandGetCurrentStatus = uint16(65528)
)

type Access struct {
MqttBroker *mqtt.Broker
}
Expand All @@ -33,12 +40,41 @@ func (a *Access) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus)
if err != nil {
return err
}
return a.MqttBroker.SendMessageToDevice(args.DeviceId, "d", msg)
return a.MqttBroker.SendMessageToDevice(args.DeviceId, "d", msg, defaultTimeoutSecond*time.Second)
}

func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
server.Log.Infof("Access Get Status: %v", args)
return nil
// first send a get status command
cmdArgs := rpcs.ArgsSendCommand{
DeviceId: args.Id,
SubDevice: 0,
No: commandGetCurrentStatus,
Priority: 99,
WaitTime: 0,
}
cmdReply := rpcs.ReplySendCommand{}
err := a.SendCommand(cmdArgs, &cmdReply)
if err != nil {
return err
}

// then wait for status report
StatusChan[args.Id] = make(chan *protocol.Data)
timer := time.NewTimer(defaultTimeoutSecond * time.Second)
select {
case <-timer.C:
// timeout
close(StatusChan[args.Id])
delete(StatusChan, args.Id)
return errors.New("get status timeout.")
case data := <-StatusChan[args.Id]:
// go it
close(StatusChan[args.Id])
delete(StatusChan, args.Id)
reply.Status = data.SubData
return nil
}
}

func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
Expand All @@ -58,5 +94,5 @@ func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCom
if err != nil {
return err
}
return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg)
return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg, time.Duration(args.WaitTime)*time.Second)
}
8 changes: 8 additions & 0 deletions services/mqttaccess/mqtt_provider.go
Expand Up @@ -51,6 +51,14 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid uint64, msgtype string, message
server.Log.Errorf("unmarshal data error : %v", err)
return
}
// if there is a realtime query
ch, exist := StatusChan[deviceid]
if exist {
ch <- data
return
}

// it's a normal report.
reply := rpcs.ReplyPutData{}
args := rpcs.ArgsPutData{
DeviceId: deviceid,
Expand Down
11 changes: 11 additions & 0 deletions services/mqttaccess/status.go
@@ -0,0 +1,11 @@
package main

import (
"github.com/PandoCloud/pando-cloud/pkg/protocol"
)

var StatusChan map[uint64]chan *protocol.Data

func init() {
StatusChan = make(map[uint64]chan *protocol.Data)
}

0 comments on commit 49b6bf0

Please sign in to comment.