Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add: writev #26

Merged
merged 2 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions conn.go → connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-17 11:21
# FILE : conn.go
# FILE : connection.go
******************************************************/

package getty
Expand Down Expand Up @@ -89,7 +89,7 @@ func (c *gettyConn) GetActive() time.Time {
return launchTime.Add(time.Duration(atomic.LoadInt64(&(c.active))))
}

func (c *gettyConn) Write(interface{}) (int, error) {
func (c *gettyConn) send(interface{}) (int, error) {
return 0, nil
}

Expand Down Expand Up @@ -228,7 +228,7 @@ func (t *gettyTCPConn) SetCompressType(c CompressType) {
}

// tcp connection read
func (t *gettyTCPConn) read(p []byte) (int, error) {
func (t *gettyTCPConn) recv(p []byte) (int, error) {
var (
err error
currentTime time.Time
Expand Down Expand Up @@ -258,7 +258,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
}

// tcp connection write
func (t *gettyTCPConn) Write(pkg interface{}) (int, error) {
func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
var (
err error
currentTime time.Time
Expand Down Expand Up @@ -379,7 +379,7 @@ func (u *gettyUDPConn) SetCompressType(c CompressType) {
}

// udp connection read
func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
func (u *gettyUDPConn) recv(p []byte) (int, *net.UDPAddr, error) {
var (
err error
currentTime time.Time
Expand Down Expand Up @@ -411,7 +411,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
}

// write udp packet, @ctx should be of type UDPContext
func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) {
func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) {
var (
err error
currentTime time.Time
Expand Down Expand Up @@ -539,7 +539,7 @@ func (w *gettyWSConn) handlePong(string) error {
}

// websocket connection read
func (w *gettyWSConn) read() ([]byte, error) {
func (w *gettyWSConn) recv() ([]byte, error) {
// Pls do not set read deadline when using ReadMessage. AlexStocks 20180310
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type.
Expand Down Expand Up @@ -578,7 +578,7 @@ func (w *gettyWSConn) updateWriteDeadline() error {
}

// websocket connection write
func (w *gettyWSConn) Write(pkg interface{}) (int, error) {
func (w *gettyWSConn) send(pkg interface{}) (int, error) {
var (
err error
ok bool
Expand Down
44 changes: 37 additions & 7 deletions demo/hello/pkghandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package hello

import (
"encoding/binary"
"errors"
)

Expand All @@ -18,15 +19,44 @@ import (
type PackageHandler struct{}

func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
s := string(data)
return s, len(s), nil
dataLen := len(data)
if dataLen < 4 {
return nil, 0, nil
}

start := 0
pos := start + 4
pkgLen := int(binary.LittleEndian.Uint32(data[start:pos]))
if dataLen < pos+pkgLen {
return nil, pos + pkgLen, nil
}
start = pos

pos = start + pkgLen
s := string(data[start:pos])

return s, pos, nil
}

func (h *PackageHandler) Write(ss getty.Session, pkg interface{}) error {
s, ok := pkg.(string)
func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) {
pkg, ok := p.(string)
if !ok {
log.Infof("illegal pkg:%+v", pkg)
return errors.New("invalid package")
log.Infof("illegal pkg:%+v", p)
return nil, errors.New("invalid package")
}
return ss.WriteBytes([]byte(s))

pkgLen := int32(len(pkg))
pkgStreams := make([]byte, 0, 4+len(pkg))

// pkg len
start := 0
pos := start + 4
binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen))
start = pos

// pkg
pos = start + int(pkgLen)
copy(pkgStreams[start:pos], pkg[:])

return pkgStreams[:pos], nil
}
29 changes: 20 additions & 9 deletions demo/hello/tcp/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
)

var (
taskPool *gxsync.TaskPool
taskPool *gxsync.TaskPool
)

func main() {
Expand All @@ -44,23 +44,34 @@ func main() {

util.Profiling(*pprofPort)

if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}
if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
}

client := getty.NewTCPClient(
getty.WithServerAddress(*ip+":8090"),
getty.WithConnectionNumber(*connections),
)

client.RunEventLoop(tcp.NewHelloClientSession)
client.RunEventLoop(NewHelloClientSession)

go hello.ClientRequest()

util.WaitCloseSignals(client)
taskPool.Close()
}

func NewHelloClientSession(session getty.Session) (err error) {
tcp.EventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = tcp.InitialSession(session)
if err != nil {
return
}
return
}
18 changes: 3 additions & 15 deletions demo/hello/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,9 @@ import (

var (
pkgHandler = &hello.PackageHandler{}
eventListener = &hello.MessageHandler{}
EventListener = &hello.MessageHandler{}
)

func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (err error) {
eventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session)
}
err = InitialSession(session)
if err != nil {
return
}
session.SetTaskPool(taskPool)
return
}

func InitialSession(session getty.Session) (err error) {
session.SetCompressType(getty.CompressZip)

Expand All @@ -64,14 +52,14 @@ func InitialSession(session getty.Session) (err error) {

session.SetName("hello")
session.SetMaxMsgLen(128)
session.SetRQLen(1024)
// session.SetRQLen(1024)
session.SetWQLen(512)
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(hello.CronPeriod / 1e6))
session.SetWaitTime(time.Second)

session.SetPkgHandler(pkgHandler)
session.SetEventListener(eventListener)
session.SetEventListener(EventListener)
return nil
}
2 changes: 1 addition & 1 deletion demo/hello/tcp/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func main() {

util.SetLimit()

util.Profiling(*pprofPort)
util.Profiling(*pprofPort)

options := []getty.ServerOption{getty.WithLocalAddress(":8090")}

Expand Down
4 changes: 2 additions & 2 deletions demo/util/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
package util

import (
"fmt"
"fmt"
"net/http"
_ "net/http/pprof"
)

func Profiling(port int) {
go func() {
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}()
}
6 changes: 3 additions & 3 deletions getty.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type Reader interface {
// Writer is used to marshal pkg and write to session
type Writer interface {
// if @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) error
Write(Session, interface{}) ([]byte, error)
}

// tcp package handler interface
// package handler interface
type ReadWriter interface {
Reader
Writer
Expand Down Expand Up @@ -120,7 +120,7 @@ type Connection interface {
writeTimeout() time.Duration
// SetWriteTimeout sets deadline for the future read calls.
SetWriteTimeout(time.Duration)
Write(interface{}) (int, error)
send(interface{}) (int, error)
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
close(int)
Expand Down
Loading