Skip to content
Permalink
Browse files
Imp: tcp stream handler
  • Loading branch information
AlexStocks committed Sep 7, 2019
1 parent 8e2387f commit e545f6faa6afae4b7952e0daf424b32f9949c024
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 23 deletions.
@@ -8,6 +8,7 @@
package hello

import (
"encoding/binary"
"errors"
)

@@ -18,16 +19,44 @@ import (
type PackageHandler struct{}

func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
s := string(data)
return s, len(s), nil
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}

start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos

pos = start + pkgLen
s := string(data[start:pos])

return s, pos, nil
}

func (h *PackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
s, ok := pkg.(string)
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Infof("illegal pkg:%+v", pkg)
log.Infof("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}

return []byte(s), nil
pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))

// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos

// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])

return pkgStreams[:pos], nil
}
@@ -57,9 +57,21 @@ func main() {
getty.WithConnectionNumber(*connections),
)

client.RunEventLoop(tcp.NewHelloClientSession)
client.RunEventLoop(NewHelloClientSession)

go hello.ClientRequest()

util.WaitCloseSignals(client)
taskPool.Close()
}

func NewHelloClientSession(session getty.Session) (err error) {
tcp.EventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = tcp.InitialSession(session)
if err != nil {
return
}
return
}
@@ -15,7 +15,6 @@ import (

import (
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
)

import (
@@ -24,21 +23,9 @@ import (

var (
pkgHandler = &hello.PackageHandler{}
eventListener = &hello.MessageHandler{}
EventListener = &hello.MessageHandler{}
)

func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (err error) {
eventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = InitialSession(session)
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
}

func InitialSession(session getty.Session) (err error) {
session.SetCompressType(getty.CompressZip)

@@ -65,14 +52,14 @@ func InitialSession(session getty.Session) (err error) {

session.SetName("hello")
session.SetMaxMsgLen(128)
session.SetRQLen(1024)
// session.SetRQLen(1024)
session.SetWQLen(512)
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(hello.CronPeriod / 1e6))
session.SetWaitTime(time.Second)

session.SetPkgHandler(pkgHandler)
session.SetEventListener(eventListener)
session.SetEventListener(EventListener)
return nil
}

0 comments on commit e545f6f

Please sign in to comment.