Skip to content

Commit

Permalink
Check protocol version
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Jul 15, 2020
1 parent e389e1d commit fa8091c
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 28 deletions.
2 changes: 1 addition & 1 deletion client/dtopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,4 @@ func TestDTopic_OrderedDelivery(t *testing.T) {
if err != olric.ErrNotImplemented {
t.Errorf("Expected ErrNotImplemented. Got: %v", err)
}
}
}
2 changes: 1 addition & 1 deletion dtopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,4 +380,4 @@ func TestDTopic_OrderedDelivery(t *testing.T) {
if err != ErrNotImplemented {
t.Errorf("Expected ErrNotImplemented. Got: %v", err)
}
}
}
7 changes: 5 additions & 2 deletions internal/protocol/dmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type DMapMessage struct {
func NewDMapMessage(opcode OpCode) *DMapMessage {
return &DMapMessage{
Header: Header{
Magic: MagicDMapReq,
Magic: MagicDMapReq,
Version: Version1,
},
DMapMessageHeader: DMapMessageHeader{
Op: opcode,
Expand All @@ -61,6 +62,7 @@ func NewDMapMessageFromRequest(buf *bytes.Buffer) *DMapMessage {
return &DMapMessage{
Header: Header{
Magic: MagicDMapReq,
Version: Version1,
MessageLength: uint32(buf.Len()),
},
DMapMessageHeader: DMapMessageHeader{},
Expand All @@ -71,7 +73,8 @@ func NewDMapMessageFromRequest(buf *bytes.Buffer) *DMapMessage {
func (d *DMapMessage) Response(buf *bytes.Buffer) EncodeDecoder {
msg := &DMapMessage{
Header: Header{
Magic: MagicDMapRes,
Magic: MagicDMapRes,
Version: Version1,
},
DMapMessageHeader: DMapMessageHeader{
Op: d.Op,
Expand Down
7 changes: 5 additions & 2 deletions internal/protocol/dtopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type DTopicMessage struct {
func NewDTopicMessage(opcode OpCode) *DTopicMessage {
return &DTopicMessage{
Header: Header{
Magic: MagicDTopicReq,
Magic: MagicDTopicReq,
Version: Version1,
},
DTopicMessageHeader: DTopicMessageHeader{
Op: opcode,
Expand All @@ -59,6 +60,7 @@ func NewDTopicMessageFromRequest(buf *bytes.Buffer) *DTopicMessage {
return &DTopicMessage{
Header: Header{
Magic: MagicDTopicReq,
Version: Version1,
MessageLength: uint32(buf.Len()),
},
DTopicMessageHeader: DTopicMessageHeader{},
Expand All @@ -69,7 +71,8 @@ func NewDTopicMessageFromRequest(buf *bytes.Buffer) *DTopicMessage {
func (d *DTopicMessage) Response(buf *bytes.Buffer) EncodeDecoder {
msg := &DTopicMessage{
Header: Header{
Magic: MagicDTopicRes,
Magic: MagicDTopicRes,
Version: Version1,
},
DTopicMessageHeader: DTopicMessageHeader{
Op: d.Op,
Expand Down
7 changes: 5 additions & 2 deletions internal/protocol/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ type PipelineMessage struct {
func NewPipelineMessage(opcode OpCode) *PipelineMessage {
return &PipelineMessage{
Header: Header{
Magic: MagicPipelineReq,
Magic: MagicPipelineReq,
Version: Version1,
},
PipelineMessageHeader: PipelineMessageHeader{
Op: opcode,
Expand All @@ -57,6 +58,7 @@ func NewPipelineMessageFromRequest(buf *bytes.Buffer) *PipelineMessage {
return &PipelineMessage{
Header: Header{
Magic: MagicPipelineReq,
Version: Version1,
MessageLength: uint32(buf.Len()),
},
PipelineMessageHeader: PipelineMessageHeader{},
Expand All @@ -67,7 +69,8 @@ func NewPipelineMessageFromRequest(buf *bytes.Buffer) *PipelineMessage {
func (d *PipelineMessage) Response(buf *bytes.Buffer) EncodeDecoder {
msg := &PipelineMessage{
Header: Header{
Magic: MagicPipelineRes,
Magic: MagicPipelineRes,
Version: Version1,
},
PipelineMessageHeader: PipelineMessageHeader{
Op: d.Op,
Expand Down
26 changes: 17 additions & 9 deletions internal/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,15 @@ import (
"github.com/pkg/errors"
)

// Version1 denotes the first public version of Olric Binary Protocol.
var Version1 uint8 = 1

// pool is good for recycling memory while reading messages from the socket.
var pool = bufpool.New()

// MagicCode defines an unique code to distinguish a request message from a response message in Olric Binary Protocol.
type MagicCode uint8

const (
// MagicReq defines an magic code for REQUEST in Olric Binary Protocol
MagicReq MagicCode = 0xE2

// MagicRes defines an magic code for RESPONSE in Olric Binary Protocol
MagicRes MagicCode = 0xE3
)

type EncodeDecoder interface {
Encode() error

Expand All @@ -66,13 +61,23 @@ type EncodeDecoder interface {
Response(*bytes.Buffer) EncodeDecoder
}

const headerSize int64 = 5
const headerSize int64 = 6

type Header struct {
Magic MagicCode // 1 byte
Version uint8 // 1 byte
MessageLength uint32 // 4 bytes
}

func checkProtocolVersion(header *Header) error {
switch header.Version {
case Version1:
return nil
default:
return fmt.Errorf("unsupported protocol version: %d", header.Version)
}
}

func readHeader(conn io.ReadWriteCloser) (Header, error) {
buf := pool.Get()
defer pool.Put(buf)
Expand All @@ -87,6 +92,9 @@ func readHeader(conn io.ReadWriteCloser) (Header, error) {
if err != nil {
return header, err
}
if err := checkProtocolVersion(&header); err != nil {
return header, err
}
return header, nil
}

Expand Down
7 changes: 5 additions & 2 deletions internal/protocol/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type StreamMessage struct {
func NewStreamMessage(opcode OpCode) *StreamMessage {
return &StreamMessage{
Header: Header{
Magic: MagicStreamReq,
Magic: MagicStreamReq,
Version: Version1,
},
StreamMessageHeader: StreamMessageHeader{
Op: opcode,
Expand All @@ -70,6 +71,7 @@ func NewStreamMessageFromRequest(buf *bytes.Buffer) *StreamMessage {
return &StreamMessage{
Header: Header{
Magic: MagicStreamReq,
Version: Version1,
MessageLength: uint32(buf.Len()),
},
StreamMessageHeader: StreamMessageHeader{},
Expand All @@ -80,7 +82,8 @@ func NewStreamMessageFromRequest(buf *bytes.Buffer) *StreamMessage {
func (d *StreamMessage) Response(buf *bytes.Buffer) EncodeDecoder {
msg := &StreamMessage{
Header: Header{
Magic: MagicStreamRes,
Magic: MagicStreamRes,
Version: Version1,
},
StreamMessageHeader: StreamMessageHeader{
Op: d.Op,
Expand Down
7 changes: 5 additions & 2 deletions internal/protocol/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ type SystemMessage struct {
func NewSystemMessage(opcode OpCode) *SystemMessage {
return &SystemMessage{
Header: Header{
Magic: MagicSystemReq,
Magic: MagicSystemReq,
Version: Version1,
},
SystemMessageHeader: SystemMessageHeader{
Op: opcode,
Expand All @@ -57,6 +58,7 @@ func NewSystemMessageFromRequest(buf *bytes.Buffer) *SystemMessage {
return &SystemMessage{
Header: Header{
Magic: MagicSystemReq,
Version: Version1,
MessageLength: uint32(buf.Len()),
},
SystemMessageHeader: SystemMessageHeader{},
Expand All @@ -67,7 +69,8 @@ func NewSystemMessageFromRequest(buf *bytes.Buffer) *SystemMessage {
func (d *SystemMessage) Response(buf *bytes.Buffer) EncodeDecoder {
msg := &SystemMessage{
Header: Header{
Magic: MagicSystemRes,
Magic: MagicSystemRes,
Version: Version1,
},
SystemMessageHeader: SystemMessageHeader{
Op: d.Op,
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (s *Server) processMessage(conn io.ReadWriteCloser, connStatus *uint32, don
}

// processConn waits for requests and calls request handlers to generate a response. The connections are reusable.
func (s *Server) processConn(conn net.Conn) {
func (s *Server) processConn(conn io.ReadWriteCloser) {
defer s.wg.Done()

// connStatus is useful for closing the server gracefully.
Expand Down
2 changes: 0 additions & 2 deletions rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (
"sync/atomic"

"github.com/buraksezer/olric/config"

"github.com/buraksezer/olric/internal/discovery"

"github.com/buraksezer/olric/internal/protocol"
"github.com/buraksezer/olric/internal/storage"
"github.com/vmihailenco/msgpack"
Expand Down
6 changes: 2 additions & 4 deletions serializer/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@ func (g gobSerializer) Marshal(value interface{}) ([]byte, error) {
v := reflect.New(t).Elem().Interface()
gob.Register(v)
}
// TODO: As an optimization, you may want to a bytes.Buffer pool to reduce
// memory consumption.
var res bytes.Buffer
err := gob.NewEncoder(&res).Encode(&value)
res := new(bytes.Buffer)
err := gob.NewEncoder(res).Encode(&value)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit fa8091c

Please sign in to comment.