Skip to content

Commit

Permalink
feat: add SUBSCRIBE <-> SUBACK
Browse files Browse the repository at this point in the history
  • Loading branch information
kariyayo committed Mar 12, 2019
1 parent 1d1adca commit ea09680
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 0 deletions.
27 changes: 27 additions & 0 deletions mqtt/handler/subscribe_handler.go
@@ -0,0 +1,27 @@
package handler

import (
"fmt"

"github.com/bati11/oreno-mqtt/mqtt/packet"
)

func HandleSubscribe(reader *packet.MQTTReader) (*packet.Suback, error) {
fmt.Printf(" HandleSubscribe\n")

subscribe, err := reader.ReadSubscribe()
if err != nil {
return nil, err
}

fmt.Printf(" %#v\n", subscribe.VariableHeader)
fmt.Printf(" %+v\n", subscribe.Payload)

var qoss []uint8
for range subscribe.Payload.TopicFilterPairs {
qoss = append(qoss, 0)
}

suback := packet.NewSubackForSuccess(subscribe.VariableHeader.PacketIdentifier, qoss)
return &suback, nil
}
25 changes: 25 additions & 0 deletions mqtt/packet/suback.go
@@ -0,0 +1,25 @@
package packet

type Suback struct {
FixedHeader
SubackVariableHeader
SubackPayload
}

func NewSubackForSuccess(packetIdentifier uint16, qoss []uint8) Suback {
variableHeader := SubackVariableHeader{packetIdentifier}
payload := SubackPayload{qoss}
fixedHeader := FixedHeader{
PacketType: SUBACK,
RemainingLength: variableHeader.Length() + payload.Length(),
}
return Suback{fixedHeader, variableHeader, payload}
}

func (s *Suback) ToBytes() []byte {
var result []byte
result = append(result, s.FixedHeader.ToBytes()...)
result = append(result, s.SubackVariableHeader.ToBytes()...)
result = append(result, s.SubackPayload.ToBytes()...)
return result
}
13 changes: 13 additions & 0 deletions mqtt/packet/suback_payload.go
@@ -0,0 +1,13 @@
package packet

type SubackPayload struct {
ReturnCodes []byte
}

func (s *SubackPayload) Length() uint {
return uint(len(s.ReturnCodes))
}

func (s *SubackPayload) ToBytes() []byte {
return s.ReturnCodes
}
17 changes: 17 additions & 0 deletions mqtt/packet/suback_variable_header.go
@@ -0,0 +1,17 @@
package packet

import "encoding/binary"

type SubackVariableHeader struct {
PacketIdentifier uint16
}

func (s *SubackVariableHeader) Length() uint {
return 2 // uint16 size
}

func (s *SubackVariableHeader) ToBytes() []byte {
result := make([]byte, binary.MaxVarintLen16)
binary.BigEndian.PutUint16(result, s.PacketIdentifier)
return result
}
24 changes: 24 additions & 0 deletions mqtt/packet/subscribe.go
@@ -0,0 +1,24 @@
package packet

type Subscribe struct {
FixedHeader *FixedHeader
VariableHeader *SubscribeVariableHeader
Payload *SubscribePayload
}

func (reader *MQTTReader) ReadSubscribe() (*Subscribe, error) {
fixedHeader, err := reader.readFixedHeader()
if err != nil {
return nil, err
}
variableHeader, err := reader.readSubscribeVariableHeader()
if err != nil {
return nil, err
}
payloadLength := fixedHeader.RemainingLength - variableHeader.Length()
payload, err := reader.readSubscribePayload(payloadLength)
if err != nil {
return nil, err
}
return &Subscribe{fixedHeader, variableHeader, payload}, nil
}
75 changes: 75 additions & 0 deletions mqtt/packet/subscribe_payload.go
@@ -0,0 +1,75 @@
package packet

import (
"bufio"
"encoding/binary"
"io"
)

type TopicFilterPair struct {
Filter string
QoS uint8
}
type SubscribePayload struct {
TopicFilterPairs []*TopicFilterPair
}

func (reader *MQTTReader) readSubscribePayload(payloadLength uint) (*SubscribePayload, error) {
var topicFilterPairs []*TopicFilterPair

remain := payloadLength
for remain > 0 {
length, err := extractLength(reader.r)
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
remain = remain - 2 - uint(length)

if remain < 0 {
break
}

bs := make([]byte, length)
_, err = io.ReadFull(reader.r, bs)
if err != nil {
return nil, err
}
topicFilter := string(bs)

qos, err := extractQoS(reader.r)
if err != nil {
return nil, err
}
remain--

topicFilterPairs = append(topicFilterPairs, &TopicFilterPair{topicFilter, qos})
}
return &SubscribePayload{topicFilterPairs}, nil
}

func extractLength(r *bufio.Reader) (uint16, error) {
lengthMSB, err := r.ReadByte()
if err != nil {
return 0, err
}
lengthLSB, err := r.ReadByte()
if err != nil {
return 0, err
}
length := binary.BigEndian.Uint16([]byte{lengthMSB, lengthLSB})
return length, nil
}

func extractQoS(r *bufio.Reader) (uint8, error) {
b, err := r.ReadByte()
if err != nil {
return 0, err
}
qos1 := refbit(b, 1)
qos2 := refbit(b, 0)
qos := qos2 << qos1
return qos, nil
}
27 changes: 27 additions & 0 deletions mqtt/packet/subscribe_variable_header.go
@@ -0,0 +1,27 @@
package packet

import (
"encoding/binary"
)

type SubscribeVariableHeader struct {
PacketIdentifier uint16
}

func (s *SubscribeVariableHeader) Length() uint {
return 2 // PacketIdentifier (uint16) byte size
}

func (reader *MQTTReader) readSubscribeVariableHeader() (*SubscribeVariableHeader, error) {
packetIdentifierMSB, err := reader.r.ReadByte()
if err != nil {
return nil, err
}
packetIdentifierLSB, err := reader.r.ReadByte()
if err != nil {
return nil, err
}
packetIdentifier := binary.BigEndian.Uint16([]byte{packetIdentifierMSB, packetIdentifierLSB})

return &SubscribeVariableHeader{packetIdentifier}, nil
}
9 changes: 9 additions & 0 deletions mqtt/server.go
Expand Up @@ -59,6 +59,15 @@ func handle(conn net.Conn) error {
if err != nil {
return err
}
case packet.SUBSCRIBE:
suback, err := handler.HandleSubscribe(mqttReader)
if err != nil {
return err
}
_, err = conn.Write(suback.ToBytes())
if err != nil {
return err
}
case packet.DISCONNECT:
return nil
}
Expand Down

0 comments on commit ea09680

Please sign in to comment.