Skip to content

Commit

Permalink
refactor: separates PUBLISH fixed header and the others
Browse files Browse the repository at this point in the history
  • Loading branch information
kariyayo committed Mar 11, 2019
1 parent 177d095 commit 4a5cb80
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 50 deletions.
2 changes: 1 addition & 1 deletion mqtt/handler/publish_handler.go
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/bati11/oreno-mqtt/mqtt/packet"
)

func HandlePublish(fixedHeader packet.FixedHeader, r *bufio.Reader) error {
func HandlePublish(fixedHeader packet.PublishFixedHeader, r *bufio.Reader) error {
fmt.Printf(" HandlePublish\n")
variableHeader, err := packet.ToPublishVariableHeader(fixedHeader, r)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions mqtt/packet/connack.go
Expand Up @@ -17,19 +17,19 @@ func (h ConnackVariableHeader) ToBytes() []byte {
}

type Connack struct {
FixedHeader
PublishFixedHeader
ConnackVariableHeader
}

func (c Connack) ToBytes() []byte {
var result []byte
result = append(result, c.FixedHeader.ToBytes()...)
result = append(result, c.PublishFixedHeader.ToBytes()...)
result = append(result, c.ConnackVariableHeader.ToBytes()...)
return result
}

func newConnack() Connack {
fixedHeader := FixedHeader{
fixedHeader := PublishFixedHeader{
PacketType: 2,
RemainingLength: 2,
}
Expand Down
77 changes: 65 additions & 12 deletions mqtt/packet/fixed_header.go
Expand Up @@ -2,6 +2,7 @@ package packet

import (
"bufio"
"io"
)

const (
Expand All @@ -22,12 +23,30 @@ const (
DISCONNECT
)

type MQTTReader struct {
byte1 *byte
r *bufio.Reader
}

func NewMQTTReader(r io.Reader) *MQTTReader {
bufr := bufio.NewReader(r)
return &MQTTReader{r: bufr}
}

func (d *MQTTReader) ReadPacketType() (uint8, error) {
if d.byte1 == nil {
byte1, err := d.r.ReadByte()
if err != nil {
return 0, err
}
d.byte1 = &byte1
}
return *d.byte1 >> 4, nil
}

type FixedHeader struct {
PacketType byte
Dup byte
QoS1 byte
QoS2 byte
Retain byte
Reserved byte
RemainingLength uint
}

Expand All @@ -40,21 +59,55 @@ func (h FixedHeader) ToBytes() []byte {
return result
}

func ToFixedHeader(r *bufio.Reader) (FixedHeader, error) {
b, err := r.ReadByte()
func ToFixedHeader(reader *MQTTReader) (FixedHeader, error) {
packetType, err := reader.ReadPacketType()
if err != nil {
return FixedHeader{}, err
}
packetType := b >> 4
dup := refbit(b, 3)
qos1 := refbit(b, 2)
qos2 := refbit(b, 1)
retain := refbit(b, 0)
remainingLength, err := decodeRemainingLength(r)
reserved := *reader.byte1 >> 4
remainingLength, err := decodeRemainingLength(reader.r)
if err != nil {
return FixedHeader{}, err
}
return FixedHeader{
PacketType: packetType,
Reserved: reserved,
RemainingLength: remainingLength,
}, nil
}

type PublishFixedHeader struct {
PacketType byte
Dup byte
QoS1 byte
QoS2 byte
Retain byte
RemainingLength uint
}

func (h PublishFixedHeader) ToBytes() []byte {
var result []byte
b := h.PacketType << 4
result = append(result, b)
remainingLength := encodeRemainingLength(h.RemainingLength)
result = append(result, remainingLength...)
return result
}

func ToPublishFixedHeader(reader *MQTTReader) (PublishFixedHeader, error) {
packetType, err := reader.ReadPacketType()
if err != nil {
return PublishFixedHeader{}, err
}
dup := refbit(*reader.byte1, 3)
qos1 := refbit(*reader.byte1, 2)
qos2 := refbit(*reader.byte1, 1)
retain := refbit(*reader.byte1, 0)
remainingLength, err := decodeRemainingLength(reader.r)
if err != nil {
return PublishFixedHeader{}, err
}
return PublishFixedHeader{
PacketType: packetType,
Dup: dup,
QoS1: qos1,
Expand Down
35 changes: 17 additions & 18 deletions mqtt/packet/fixed_header_test.go
@@ -1,7 +1,6 @@
package packet_test

import (
"bufio"
"bytes"
"reflect"
"testing"
Expand All @@ -11,63 +10,63 @@ import (

func TestToFixedHeader(t *testing.T) {
type args struct {
r *bufio.Reader
r *packet.MQTTReader
}
tests := []struct {
name string
args args
want packet.FixedHeader
want packet.PublishFixedHeader
wantErr bool
}{
{
name: "[0x00,0x00]",
args: args{bufio.NewReader(bytes.NewBuffer([]byte{
args: args{packet.NewMQTTReader(bytes.NewBuffer([]byte{
0x00, // 0000 0 00 0
0x00, // 0
}))},
want: packet.FixedHeader{PacketType: 0, Dup: 0, QoS1: 0, QoS2: 0, Retain: 0, RemainingLength: 0},
want: packet.PublishFixedHeader{PacketType: 0, Dup: 0, QoS1: 0, QoS2: 0, Retain: 0, RemainingLength: 0},
wantErr: false,
},
{
name: "[0x1b,0x7F]",
args: args{bufio.NewReader(bytes.NewBuffer([]byte{
args: args{packet.NewMQTTReader(bytes.NewBuffer([]byte{
0x1B, // 0001 1 01 1
0x7F, // 127
}))},
want: packet.FixedHeader{PacketType: 1, Dup: 1, QoS1: 0, QoS2: 1, Retain: 1, RemainingLength: 127},
want: packet.PublishFixedHeader{PacketType: 1, Dup: 1, QoS1: 0, QoS2: 1, Retain: 1, RemainingLength: 127},
wantErr: false,
},
{
name: "[0x24,0x80,0x01]",
args: args{bufio.NewReader(bytes.NewBuffer([]byte{
args: args{packet.NewMQTTReader(bytes.NewBuffer([]byte{
0x24, // 0002 0 10 0
0x80, 0x01, //128
}))},
want: packet.FixedHeader{PacketType: 2, Dup: 0, QoS1: 1, QoS2: 0, Retain: 0, RemainingLength: 128},
want: packet.PublishFixedHeader{PacketType: 2, Dup: 0, QoS1: 1, QoS2: 0, Retain: 0, RemainingLength: 128},
wantErr: false,
},
{
name: "[]",
args: args{bufio.NewReader(bytes.NewBuffer(nil))},
want: packet.FixedHeader{},
args: args{packet.NewMQTTReader(bytes.NewBuffer(nil))},
want: packet.PublishFixedHeader{},
wantErr: true,
},
{
name: "[0x24]",
args: args{bufio.NewReader(bytes.NewBuffer([]byte{0x24}))},
want: packet.FixedHeader{},
args: args{packet.NewMQTTReader(bytes.NewBuffer([]byte{0x24}))},
want: packet.PublishFixedHeader{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := packet.ToFixedHeader(tt.args.r)
got, err := packet.ToPublishFixedHeader(tt.args.r)
if (err != nil) != tt.wantErr {
t.Errorf("ToFixedHeader() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("ToPublishFixedHeader() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ToFixedHeader() = %v, want %v", got, tt.want)
t.Errorf("ToPublishFixedHeader() = %v, want %v", got, tt.want)
}
})
}
Expand Down Expand Up @@ -154,7 +153,7 @@ func TestFixedHeader_ToBytes(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := packet.FixedHeader{
h := packet.PublishFixedHeader{
PacketType: tt.fields.PacketType,
Dup: tt.fields.Dup,
QoS1: tt.fields.QoS1,
Expand All @@ -163,7 +162,7 @@ func TestFixedHeader_ToBytes(t *testing.T) {
RemainingLength: tt.fields.RemainingLength,
}
if got := h.ToBytes(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("FixedHeader.ToBytes() = %v, want %v", got, tt.want)
t.Errorf("PublishFixedHeader.ToBytes() = %v, want %v", got, tt.want)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion mqtt/packet/publish_variable_header.go
Expand Up @@ -21,7 +21,7 @@ func (p *PublishVariableHeader) Length() uint {
return result
}

func ToPublishVariableHeader(fixedHeader FixedHeader, r *bufio.Reader) (PublishVariableHeader, error) {
func ToPublishVariableHeader(fixedHeader PublishFixedHeader, r *bufio.Reader) (PublishVariableHeader, error) {
if fixedHeader.PacketType != 3 {
return PublishVariableHeader{}, fmt.Errorf("packet type is invalid. it got is %v", fixedHeader.PacketType)
}
Expand Down
8 changes: 4 additions & 4 deletions mqtt/packet/publish_variable_header_test.go
Expand Up @@ -11,7 +11,7 @@ import (

func TestToPublishVariableHeader(t *testing.T) {
type args struct {
fixedHeader packet.FixedHeader
fixedHeader packet.PublishFixedHeader
r *bufio.Reader
}
tests := []struct {
Expand All @@ -23,7 +23,7 @@ func TestToPublishVariableHeader(t *testing.T) {
{
name: "a/b",
args: args{
packet.FixedHeader{PacketType: packet.PUBLISH, RemainingLength: 10},
packet.PublishFixedHeader{PacketType: packet.PUBLISH, RemainingLength: 10},
bufio.NewReader(bytes.NewBuffer([]byte{
0x00, // Length LSB
0x03, // Length MSB
Expand All @@ -36,7 +36,7 @@ func TestToPublishVariableHeader(t *testing.T) {
{
name: "256文字",
args: args{
packet.FixedHeader{PacketType: packet.PUBLISH, RemainingLength: 10},
packet.PublishFixedHeader{PacketType: packet.PUBLISH, RemainingLength: 10},
bufio.NewReader(bytes.NewBuffer([]byte{
0x01,
0x00,
Expand All @@ -62,7 +62,7 @@ func TestToPublishVariableHeader(t *testing.T) {
}

func TestPublishVariableHeader_Length(t *testing.T) {
fixedHeader := packet.FixedHeader{PacketType: packet.PUBLISH, RemainingLength: 10}
fixedHeader := packet.PublishFixedHeader{PacketType: packet.PUBLISH, RemainingLength: 10}
variableHeaderBytes := []byte{
0x00, // Length LSB
0x03, // Length MSB
Expand Down
33 changes: 22 additions & 11 deletions mqtt/server.go
Expand Up @@ -35,33 +35,44 @@ func handle(conn net.Conn) error {

for {
r := bufio.NewReader(conn)
fixedHeader, err := packet.ToFixedHeader(r)
mqttReader := packet.NewMQTTReader(r)
packetType, err := mqttReader.ReadPacketType()
if err != nil {
if err == io.EOF {
// クライアント側から既に切断してる場合
return nil
}
return err
}
fmt.Printf("-----\n%+v\n", fixedHeader)

switch fixedHeader.PacketType {
case packet.CONNECT:
connack, err := handler.HandleConnect(fixedHeader, r)
if packetType == packet.PUBLISH {
fixedHeader, err := packet.ToPublishFixedHeader(mqttReader)
if err != nil {
return err
}
_, err = conn.Write(connack.ToBytes())
fmt.Printf("-----\n%+v\n", fixedHeader)
err = handler.HandlePublish(fixedHeader, r)
if err != nil {
return err
}
case packet.PUBLISH:
err := handler.HandlePublish(fixedHeader, r)
} else {
fixedHeader, err := packet.ToFixedHeader(mqttReader)
if err != nil {
return err
}
case packet.DISCONNECT:
return nil
fmt.Printf("-----\n%+v\n", fixedHeader)
switch fixedHeader.PacketType {
case packet.CONNECT:
connack, err := handler.HandleConnect(fixedHeader, r)
if err != nil {
return err
}
_, err = conn.Write(connack.ToBytes())
if err != nil {
return err
}
case packet.DISCONNECT:
return nil
}
}
}
}

0 comments on commit 4a5cb80

Please sign in to comment.