Skip to content

Commit

Permalink
Merge pull request #26 from divebomb/master
Browse files Browse the repository at this point in the history
Add: writev
  • Loading branch information
wongoo committed Sep 10, 2019
2 parents 34d2a34 + e545f6f commit 2b6c658
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 70 deletions.
16 changes: 8 additions & 8 deletions conn.go → connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-17 11:21
# FILE : conn.go
# FILE : connection.go
******************************************************/

package getty
Expand Down Expand Up @@ -89,7 +89,7 @@ func (c *gettyConn) GetActive() time.Time {
return launchTime.Add(time.Duration(atomic.LoadInt64(&(c.active))))
}

func (c *gettyConn) Write(interface{}) (int, error) {
func (c *gettyConn) send(interface{}) (int, error) {
return 0, nil
}

Expand Down Expand Up @@ -228,7 +228,7 @@ func (t *gettyTCPConn) SetCompressType(c CompressType) {
}

// tcp connection read
func (t *gettyTCPConn) read(p []byte) (int, error) {
func (t *gettyTCPConn) recv(p []byte) (int, error) {
var (
err error
currentTime time.Time
Expand Down Expand Up @@ -258,7 +258,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
}

// tcp connection write
func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
var (
err error
currentTime time.Time
Expand Down Expand Up @@ -379,7 +379,7 @@ func (u *gettyUDPConn) SetCompressType(c CompressType) {
}

// udp connection read
func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
func (u *gettyUDPConn) recv(p []byte) (int, *net.UDPAddr, error) {
var (
err error
currentTime time.Time
Expand Down Expand Up @@ -411,7 +411,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
}

// write udp packet, @ctx should be of type UDPContext
func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) {
var (
err error
currentTime time.Time
Expand Down Expand Up @@ -539,7 +539,7 @@ func (w *gettyWSConn) handlePong(string) error {
}

// websocket connection read
func (w *gettyWSConn) read() ([]byte, error) {
func (w *gettyWSConn) recv() ([]byte, error) {
// Pls do not set read deadline when using ReadMessage. AlexStocks 20180310
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type.
Expand Down Expand Up @@ -578,7 +578,7 @@ func (w *gettyWSConn) updateWriteDeadline() error {
}

// websocket connection write
func (w *gettyWSConn) Write(pkg interface{}) (int, error) {
func (w *gettyWSConn) send(pkg interface{}) (int, error) {
var (
err error
ok bool
Expand Down
44 changes: 37 additions & 7 deletions demo/hello/pkghandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package hello

import (
"encoding/binary"
"errors"
)

Expand All @@ -18,15 +19,44 @@ import (
type PackageHandler struct{}

func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
s := string(data)
return s, len(s), nil
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}

start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos

pos = start + pkgLen
s := string(data[start:pos])

return s, pos, nil
}

func (h *PackageHandler) Write(ss getty.Session, pkg interface{}) error {
s, ok := pkg.(string)
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Infof("illegal pkg:%+v", pkg)
return errors.New("invalid package")
log.Infof("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}
return ss.WriteBytes([]byte(s))

pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))

// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos

// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])

return pkgStreams[:pos], nil
}
29 changes: 20 additions & 9 deletions demo/hello/tcp/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
)

var (
taskPool *gxsync.TaskPool
taskPool *gxsync.TaskPool
)

func main() {
Expand All @@ -44,23 +44,34 @@ func main() {

util.Profiling(*pprofPort)

if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}

client := getty.NewTCPClient(
getty.WithServerAddress(*ip+":8090"),
getty.WithConnectionNumber(*connections),
)

client.RunEventLoop(tcp.NewHelloClientSession)
client.RunEventLoop(NewHelloClientSession)

go hello.ClientRequest()

util.WaitCloseSignals(client)
taskPool.Close()
}

func NewHelloClientSession(session getty.Session) (err error) {
tcp.EventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = tcp.InitialSession(session)
if err != nil {
return
}
return
}
18 changes: 3 additions & 15 deletions demo/hello/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,9 @@ import (

var (
pkgHandler = &hello.PackageHandler{}
eventListener = &hello.MessageHandler{}
EventListener = &hello.MessageHandler{}
)

func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (err error) {
eventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = InitialSession(session)
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
}

func InitialSession(session getty.Session) (err error) {
session.SetCompressType(getty.CompressZip)

Expand All @@ -64,14 +52,14 @@ func InitialSession(session getty.Session) (err error) {

session.SetName("hello")
session.SetMaxMsgLen(128)
session.SetRQLen(1024)
// session.SetRQLen(1024)
session.SetWQLen(512)
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(hello.CronPeriod / 1e6))
session.SetWaitTime(time.Second)

session.SetPkgHandler(pkgHandler)
session.SetEventListener(eventListener)
session.SetEventListener(EventListener)
return nil
}
2 changes: 1 addition & 1 deletion demo/hello/tcp/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func main() {

util.SetLimit()

util.Profiling(*pprofPort)
util.Profiling(*pprofPort)

options := []getty.ServerOption{getty.WithLocalAddress(":8090")}

Expand Down
4 changes: 2 additions & 2 deletions demo/util/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
package util

import (
"fmt"
"fmt"
"net/http"
_ "net/http/pprof"
)

func Profiling(port int) {
go func() {
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}()
}
6 changes: 3 additions & 3 deletions getty.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type Reader interface {
// Writer is used to marshal pkg and write to session
type Writer interface {
// if @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) error
Write(Session, interface{}) ([]byte, error)
}

// tcp package handler interface
// package handler interface
type ReadWriter interface {
Reader
Writer
Expand Down Expand Up @@ -120,7 +120,7 @@ type Connection interface {
writeTimeout() time.Duration
// SetWriteTimeout sets deadline for the future read calls.
SetWriteTimeout(time.Duration)
Write(interface{}) (int, error)
send(interface{}) (int, error)
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
close(int)
Expand Down
Loading

0 comments on commit 2b6c658

Please sign in to comment.