Permalink
Browse files

improve message buffer design.

  • Loading branch information...
bg5sbk committed Nov 21, 2014
1 parent a81c5c1 commit 53b14de6734bc9c026fb96993fc46297bbe14f7c
Showing with 571 additions and 257 deletions.
  1. +4 −4 README_CN.md
  2. +4 −4 README_EN.md
  3. +52 −7 base.go
  4. +4 −4 broadcast.go
  5. +150 −69 buffer_in.go
  6. +181 −62 buffer_out.go
  7. +76 −0 byteorder.go
  8. +1 −1 examples/benchmark/main.go
  9. +3 −3 examples/broadcast/main.go
  10. +3 −3 examples/echo_client/main.go
  11. +4 −4 examples/echo_server/main.go
  12. +15 −15 message.go
  13. +25 −33 packetn.go
  14. +11 −4 server.go
  15. +5 −5 server_test.go
  16. +25 −31 session.go
  17. +8 −8 util.go
View
@@ -28,7 +28,7 @@ proto := link.PacketN(2, binary.BigEndian)
在指定的端口启动一个服务器:
```go
server, _ := link.Listen("tcp", "0.0.0.0:8080", proto)
server, _ := link.Listen("tcp", "0.0.0.0:8080", proto, link.LittleEndian)
```
处理新进连接,并为新的Session设置消息处理器:
@@ -37,8 +37,8 @@ server, _ := link.Listen("tcp", "0.0.0.0:8080", proto)
server.AcceptLoop(func(session *link.Session) {
fmt.Println("session start")
session.ReadLoop(func(session *link.Session, msg link.InMessage) {
fmt.Printf("new message: %s\n", msg)
session.ReadLoop(func(session *link.Session, msg link.InBuffer) {
fmt.Printf("new message: %s\n", msg.Get())
})
fmt.Println("session closed")
@@ -50,7 +50,7 @@ server.AcceptLoop(func(session *link.Session) {
```go
proto := link.PacketN(2, binary.BigEndian)
client, _ := link.Dial("tcp", "127.0.0.1:8080", proto)
client, _ := link.Dial("tcp", "127.0.0.1:8080", proto, link.LittleEndian)
```
发送一个消息给服务端:
View
@@ -30,7 +30,7 @@ proto := link.PacketN(2, binary.BigEndian)
Setup a server on port `8080` and set protocol.
```go
server, _ := link.Listen("tcp", "0.0.0.0:8080", proto)
server, _ := link.Listen("tcp", "0.0.0.0:8080", proto, link.LittleEndian)
```
Handle incoming connections. And setup a message handler on the new session.
@@ -39,8 +39,8 @@ Handle incoming connections. And setup a message handler on the new session.
server.AcceptLoop(func(session *link.Session) {
fmt.Println("session start")
session.ReadLoop(func(session *link.Session, msg link.InMessage) {
fmt.Printf("new message: %s\n", msg)
session.ReadLoop(func(session *link.Session, msg link.InBuffer) {
fmt.Printf("new message: %s\n", msg.Get())
})
fmt.Println("session closed")
@@ -52,7 +52,7 @@ Use the same protocol dial to the server.
```go
proto := link.PacketN(2, binary.BigEndian)
client, _ := link.Dial("tcp", "127.0.0.1:8080", proto)
client, _ := link.Dial("tcp", "127.0.0.1:8080", proto, link.LittleEndian)
```
Send a message to server.
View
59 base.go
@@ -13,10 +13,15 @@ var (
NilBufferError = errors.New("Buffer is nil")
)
var (
BigEndian = BufferFactoryBE{}
LittleEndian = BufferFactoryLE{}
)
type Settings interface {
// Set max packet size and returns old size limitation.
// Set 0 means unlimit.
MaxPacketSize(uint) uint
MaxPacketSize(int) int
}
// Packet spliting protocol.
@@ -32,23 +37,63 @@ type PacketProtocol interface {
type PacketWriter interface {
Settings
// Begin a packet writing on the buff.
// If the size large than the buff capacity, the buff will be dropped and a new buffer will be created.
// Begin a packet writing on the buffer.
// If the packet size large than the buffer capacity, a new buffer will be created otherwise the buffer will be reused.
// The size no need to equals really packet size, some time we could not knows a message's packet size before it encoded,
// if the size less than really packet size, the buffer will auto grows when you append data into it.
// This method give the session a way to reuse buffer and avoid invoke Write() twice.
BeginPacket(size uint, buffer *OutMessage)
BeginPacket(size int, buffer OutBuffer)
// Finish a packet writing.
// Give the protocol writer a chance to set packet head data after packet body writed.
EndPacket(packet *OutMessage)
EndPacket(buffer OutBuffer)
// Write a packet to the conn.
WritePacket(conn net.Conn, buffer OutMessage) error
WritePacket(conn net.Conn, buffer OutBuffer) error
}
// Packet reader.
type PacketReader interface {
Settings
// Read a packet from conn.
ReadPacket(conn net.Conn, buffer *InMessage) error
// If the packet size large than the buffer capacity, a new buffer will be created otherwise the buffer will be reused.
ReadPacket(conn net.Conn, buffer InBuffer) error
}
// Message buffer factory.
type BufferFactory interface {
// Create a incoming message buffer.
NewInBuffer() InBuffer
// Create a outgoing message buffer.
NewOutBuffer() OutBuffer
}
// Big endian message buffer factory.
type BufferFactoryBE struct {
}
// Create a big endian incoming message buffer.
func (_ BufferFactoryBE) NewInBuffer() InBuffer {
return new(InBufferBE)
}
// Create a big endian outgoing message buffer.
func (_ BufferFactoryBE) NewOutBuffer() OutBuffer {
return new(OutBufferBE)
}
// Little endian message buffer factory.
type BufferFactoryLE struct {
}
// Create a little endian incoming message buffer.
func (_ BufferFactoryLE) NewInBuffer() InBuffer {
return new(InBufferLE)
}
// Create a little endian outgoing message buffer.
func (_ BufferFactoryLE) NewOutBuffer() OutBuffer {
return new(OutBufferLE)
}
View
@@ -7,7 +7,7 @@ import "sync"
type Broadcaster struct {
mutex sync.Mutex
writer PacketWriter
buffer OutMessage
buffer OutBuffer
}
// Craete a broadcaster.
@@ -19,11 +19,11 @@ func NewBroadcaster(protocol PacketProtocol) *Broadcaster {
func (b *Broadcaster) packet(message Message) error {
size := message.RecommendPacketSize()
b.writer.BeginPacket(size, &b.buffer)
if err := message.AppendToPacket(&b.buffer); err != nil {
b.writer.BeginPacket(size, b.buffer)
if err := message.AppendToPacket(b.buffer); err != nil {
return err
}
b.writer.EndPacket(&b.buffer)
b.writer.EndPacket(b.buffer)
return nil
}
Oops, something went wrong.

0 comments on commit 53b14de

Please sign in to comment.