From 8e2387f70e415ef24245f185554d8b42e21dacfb Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Sat, 7 Sep 2019 16:18:38 +0800 Subject: [PATCH 1/2] Add: writev --- conn.go => connection.go | 16 ++--- demo/hello/pkghandler.go | 7 +- demo/hello/tcp/client/client.go | 17 +++-- demo/hello/tcp/config.go | 13 ++-- demo/hello/tcp/server/server.go | 2 +- demo/util/pprof.go | 4 +- getty.go | 6 +- session.go | 121 +++++++++++++++++++++++++------- 8 files changed, 129 insertions(+), 57 deletions(-) rename conn.go => connection.go (97%) diff --git a/conn.go b/connection.go similarity index 97% rename from conn.go rename to connection.go index fd836991..7833094c 100644 --- a/conn.go +++ b/connection.go @@ -4,7 +4,7 @@ # LICENCE : Apache License 2.0 # EMAIL : alexstocks@foxmail.com # MOD : 2016-08-17 11:21 -# FILE : conn.go +# FILE : connection.go ******************************************************/ package getty @@ -89,7 +89,7 @@ func (c *gettyConn) GetActive() time.Time { return launchTime.Add(time.Duration(atomic.LoadInt64(&(c.active)))) } -func (c *gettyConn) Write(interface{}) (int, error) { +func (c *gettyConn) send(interface{}) (int, error) { return 0, nil } @@ -228,7 +228,7 @@ func (t *gettyTCPConn) SetCompressType(c CompressType) { } // tcp connection read -func (t *gettyTCPConn) read(p []byte) (int, error) { +func (t *gettyTCPConn) recv(p []byte) (int, error) { var ( err error currentTime time.Time @@ -258,7 +258,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) { } // tcp connection write -func (t *gettyTCPConn) Write(pkg interface{}) (int, error) { +func (t *gettyTCPConn) send(pkg interface{}) (int, error) { var ( err error currentTime time.Time @@ -379,7 +379,7 @@ func (u *gettyUDPConn) SetCompressType(c CompressType) { } // udp connection read -func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) { +func (u *gettyUDPConn) recv(p []byte) (int, *net.UDPAddr, error) { var ( err error currentTime time.Time @@ -411,7 +411,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) { } // write udp packet, @ctx should be of type UDPContext -func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) { +func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) { var ( err error currentTime time.Time @@ -539,7 +539,7 @@ func (w *gettyWSConn) handlePong(string) error { } // websocket connection read -func (w *gettyWSConn) read() ([]byte, error) { +func (w *gettyWSConn) recv() ([]byte, error) { // Pls do not set read deadline when using ReadMessage. AlexStocks 20180310 // gorilla/websocket/conn.go:NextReader will always fail when got a timeout error. _, b, e := w.conn.ReadMessage() // the first return value is message type. @@ -578,7 +578,7 @@ func (w *gettyWSConn) updateWriteDeadline() error { } // websocket connection write -func (w *gettyWSConn) Write(pkg interface{}) (int, error) { +func (w *gettyWSConn) send(pkg interface{}) (int, error) { var ( err error ok bool diff --git a/demo/hello/pkghandler.go b/demo/hello/pkghandler.go index 681dc39e..4dccc67b 100644 --- a/demo/hello/pkghandler.go +++ b/demo/hello/pkghandler.go @@ -22,11 +22,12 @@ func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, return s, len(s), nil } -func (h *PackageHandler) Write(ss getty.Session, pkg interface{}) error { +func (h *PackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { s, ok := pkg.(string) if !ok { log.Infof("illegal pkg:%+v", pkg) - return errors.New("invalid package") + return nil, errors.New("invalid package") } - return ss.WriteBytes([]byte(s)) + + return []byte(s), nil } diff --git a/demo/hello/tcp/client/client.go b/demo/hello/tcp/client/client.go index ec58b14a..21c93ed0 100644 --- a/demo/hello/tcp/client/client.go +++ b/demo/hello/tcp/client/client.go @@ -34,7 +34,7 @@ var ( ) var ( - taskPool *gxsync.TaskPool + taskPool *gxsync.TaskPool ) func main() { @@ -44,13 +44,13 @@ func main() { util.Profiling(*pprofPort) - if *taskPoolMode { - taskPool = gxsync.NewTaskPool( - gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength), - gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber), - gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize), - ) - } + if *taskPoolMode { + taskPool = gxsync.NewTaskPool( + gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength), + gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber), + gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize), + ) + } client := getty.NewTCPClient( getty.WithServerAddress(*ip+":8090"), @@ -63,4 +63,3 @@ func main() { util.WaitCloseSignals(client) } - diff --git a/demo/hello/tcp/config.go b/demo/hello/tcp/config.go index 2c2f59ff..d6accf9d 100644 --- a/demo/hello/tcp/config.go +++ b/demo/hello/tcp/config.go @@ -15,6 +15,7 @@ import ( import ( "github.com/dubbogo/getty" + "github.com/dubbogo/gost/sync" ) import ( @@ -30,12 +31,12 @@ func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (er eventListener.SessionOnOpen = func(session getty.Session) { hello.Sessions = append(hello.Sessions, session) } - err = InitialSession(session) - if err != nil { - return - } - session.SetTaskPool(taskPool) - return + err = InitialSession(session) + if err != nil { + return + } + session.SetTaskPool(taskPool) + return } func InitialSession(session getty.Session) (err error) { diff --git a/demo/hello/tcp/server/server.go b/demo/hello/tcp/server/server.go index 410656ef..13b273bf 100644 --- a/demo/hello/tcp/server/server.go +++ b/demo/hello/tcp/server/server.go @@ -38,7 +38,7 @@ func main() { util.SetLimit() - util.Profiling(*pprofPort) + util.Profiling(*pprofPort) options := []getty.ServerOption{getty.WithLocalAddress(":8090")} diff --git a/demo/util/pprof.go b/demo/util/pprof.go index 7b8b9109..f044d0a4 100644 --- a/demo/util/pprof.go +++ b/demo/util/pprof.go @@ -8,13 +8,13 @@ package util import ( - "fmt" + "fmt" "net/http" _ "net/http/pprof" ) func Profiling(port int) { go func() { - http.ListenAndServe(fmt.Sprintf(":%d", port), nil) + http.ListenAndServe(fmt.Sprintf(":%d", port), nil) }() } diff --git a/getty.go b/getty.go index c67d1201..297669fb 100644 --- a/getty.go +++ b/getty.go @@ -46,10 +46,10 @@ type Reader interface { // Writer is used to marshal pkg and write to session type Writer interface { // if @Session is udpGettySession, the second parameter is UDPContext. - Write(Session, interface{}) error + Write(Session, interface{}) ([]byte, error) } -// tcp package handler interface +// package handler interface type ReadWriter interface { Reader Writer @@ -120,7 +120,7 @@ type Connection interface { writeTimeout() time.Duration // SetWriteTimeout sets deadline for the future read calls. SetWriteTimeout(time.Duration) - Write(interface{}) (int, error) + send(interface{}) (int, error) // don't distinguish between tcp connection and websocket connection. Because // gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close close(int) diff --git a/session.go b/session.go index e6108d72..8ea24917 100644 --- a/session.go +++ b/session.go @@ -33,6 +33,7 @@ const ( period = 60 * 1e9 // 1 minute pendingDuration = 3e9 defaultQLen = 1024 + maxIovecNum = 10 defaultSessionName = "session" defaultTCPSessionName = "tcp-session" defaultUDPSessionName = "udp-session" @@ -108,7 +109,6 @@ func newSession(endPoint EndPoint, conn Connection) *session { wait: pendingDuration, attrs: NewValuesContext(nil), rDone: make(chan struct{}), - grNum: 0, } ss.Connection.setSession(ss) @@ -355,6 +355,9 @@ func (s *session) sessionToken() string { } func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { + if pkg == nil { + return fmt.Errorf("@pkg is nil") + } if s.IsClosed() { return ErrSessionClosed } @@ -368,12 +371,31 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { } }() - var err error if timeout <= 0 { - if err = s.writer.Write(s, pkg); err != nil { + pkgBytes, err := s.writer.Write(s, pkg) + if err != nil { + log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%v", s.Stat(), pkg, err) + return perrors.WithStack(err) + } + var udpCtxPtr *UDPContext + if udpCtx, ok := pkg.(UDPContext); ok { + udpCtxPtr = &udpCtx + } else if udpCtxP, ok := pkg.(*UDPContext); ok { + udpCtxPtr = udpCtxP + } + if udpCtxPtr != nil { + udpCtxPtr.Pkg = pkgBytes + pkg = *udpCtxPtr + } else { + pkg = pkgBytes + } + _, err = s.Connection.send(pkg) + if err != nil { + log.Warn("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err) return perrors.WithStack(err) } s.incWritePkgNum() + return nil } select { case s.wQ <- pkg: @@ -394,7 +416,7 @@ func (s *session) WriteBytes(pkg []byte) error { } // s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout)) - if _, err := s.Connection.Write(pkg); err != nil { + if _, err := s.Connection.send(pkg); err != nil { return perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg)) } @@ -403,7 +425,7 @@ func (s *session) WriteBytes(pkg []byte) error { return nil } -// Write multiple packages at once +// Write multiple packages at once. so we invoke write sys.call just one time. func (s *session) WriteBytesArray(pkgs ...[]byte) error { if s.IsClosed() { return ErrSessionClosed @@ -438,7 +460,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { l += len(pkgs[i]) } - // return s.Connection.Write(arr) if err = s.WriteBytes(arr); err != nil { return perrors.WithStack(err) } @@ -446,7 +467,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { num := len(pkgs) - 1 for i := 0; i < num; i++ { s.incWritePkgNum() - // gxlog.CError("after write, ss:%s", s.Stat()) } return nil @@ -468,7 +488,7 @@ func (s *session) run() { // call session opened s.UpdateActive() if err := s.listener.OnOpen(s); err != nil { - log.Errorf("[OnOpen] error: %#v", err) + log.Errorf("[OnOpen] session %s, error: %#v", s.Stat(), err) s.Close() return } @@ -481,13 +501,17 @@ func (s *session) run() { func (s *session) handleLoop() { var ( - err error - flag bool - wsFlag bool - wsConn *gettyWSConn - // start time.Time - counter gxtime.CountWatch - outPkg interface{} + err error + ok bool + flag bool + wsFlag bool + udpFlag bool + loopFlag bool + wsConn *gettyWSConn + counter gxtime.CountWatch + outPkg interface{} + pkgBytes []byte + iovec [][]byte ) defer func() { @@ -506,6 +530,8 @@ func (s *session) handleLoop() { flag = true // do not do any read/Write/cron operation while got Write error wsConn, wsFlag = s.Connection.(*gettyWSConn) + _, udpFlag = s.Connection.(*gettyUDPConn) + iovec = make([][]byte, 0, maxIovecNum) LOOP: for { // A select blocks until one of its cases is ready to run. @@ -519,21 +545,67 @@ LOOP: break LOOP } counter.Start() - // if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() { if counter.Count() > s.wait.Nanoseconds() { log.Infof("%s, [session.handleLoop] got done signal ", s.Stat()) break LOOP } - case outPkg = <-s.wQ: - if flag { - if err = s.writer.Write(s, outPkg); err != nil { + case outPkg, ok = <-s.wQ: + if !ok { + continue + } + if !flag { + log.Warn("[session.handleLoop] drop write out package %#v", outPkg) + continue + } + + if udpFlag || wsFlag { + err = s.WritePkg(outPkg, 0) + if err != nil { log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), err) s.stop() + // break LOOP flag = false + } + + continue + } + + iovec = iovec[:0] + for idx := 0; idx < maxIovecNum; idx++ { + pkgBytes, err = s.writer.Write(s, outPkg) + if err != nil { + log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), err) + s.stop() // break LOOP + flag = false + break + } + iovec = append(iovec, pkgBytes) + + if idx < maxIovecNum-1 { + loopFlag = true + select { + case outPkg, ok = <-s.wQ: + if !ok { + loopFlag = false + } + + default: + loopFlag = false + break + } + if !loopFlag { + break // break for-idx loop + } } - } else { - log.Infof("[session.handleLoop] drop writeout package{%#v}", outPkg) + } + err = s.WriteBytesArray(iovec[:]...) + if err != nil { + log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v", + s.sessionToken(), len(iovec), err) + s.stop() + // break LOOP + flag = false } case <-wheel.After(s.period): @@ -643,7 +715,7 @@ func (s *session) handleTCPPackage() error { for { // for clause for the network timeout condition check // s.conn.SetReadTimeout(time.Now().Add(s.rTimeout)) - bufLen, err = conn.read(buf) + bufLen, err = conn.recv(buf) if err != nil { if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { break @@ -664,7 +736,6 @@ func (s *session) handleTCPPackage() error { if pktBuf.Len() <= 0 { break } - // pkg, err = s.pkgHandler.Read(s, pktBuf) pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes()) // for case 3/case 4 if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) { @@ -724,7 +795,7 @@ func (s *session) handleUDPPackage() error { break } - bufLen, addr, err = conn.read(buf) + bufLen, addr, err = conn.recv(buf) log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, err) if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { continue @@ -785,7 +856,7 @@ func (s *session) handleWSPackage() error { if s.IsClosed() { break } - pkg, err = conn.read() + pkg, err = conn.recv() if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { continue } From e545f6faa6afae4b7952e0daf424b32f9949c024 Mon Sep 17 00:00:00 2001 From: AlexStocks Date: Sat, 7 Sep 2019 17:44:22 +0800 Subject: [PATCH 2/2] Imp: tcp stream handler --- demo/hello/pkghandler.go | 41 ++++++++++++++++++++++++++++----- demo/hello/tcp/client/client.go | 14 ++++++++++- demo/hello/tcp/config.go | 19 +++------------ 3 files changed, 51 insertions(+), 23 deletions(-) diff --git a/demo/hello/pkghandler.go b/demo/hello/pkghandler.go index 4dccc67b..658e75c5 100644 --- a/demo/hello/pkghandler.go +++ b/demo/hello/pkghandler.go @@ -8,6 +8,7 @@ package hello import ( + "encoding/binary" "errors" ) @@ -18,16 +19,44 @@ import ( type PackageHandler struct{} func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { - s := string(data) - return s, len(s), nil + dataLen := len(data) + if dataLen < 4 { + return nil, 0, nil + } + + start := 0 + pos := start + 4 + pkgLen := int(binary.LittleEndian.Uint32(data[start:pos])) + if dataLen < pos+pkgLen { + return nil, pos + pkgLen, nil + } + start = pos + + pos = start + pkgLen + s := string(data[start:pos]) + + return s, pos, nil } -func (h *PackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { - s, ok := pkg.(string) +func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) { + pkg, ok := p.(string) if !ok { - log.Infof("illegal pkg:%+v", pkg) + log.Infof("illegal pkg:%+v", p) return nil, errors.New("invalid package") } - return []byte(s), nil + pkgLen := int32(len(pkg)) + pkgStreams := make([]byte, 0, 4+len(pkg)) + + // pkg len + start := 0 + pos := start + 4 + binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen)) + start = pos + + // pkg + pos = start + int(pkgLen) + copy(pkgStreams[start:pos], pkg[:]) + + return pkgStreams[:pos], nil } diff --git a/demo/hello/tcp/client/client.go b/demo/hello/tcp/client/client.go index 21c93ed0..77467da0 100644 --- a/demo/hello/tcp/client/client.go +++ b/demo/hello/tcp/client/client.go @@ -57,9 +57,21 @@ func main() { getty.WithConnectionNumber(*connections), ) - client.RunEventLoop(tcp.NewHelloClientSession) + client.RunEventLoop(NewHelloClientSession) go hello.ClientRequest() util.WaitCloseSignals(client) + taskPool.Close() +} + +func NewHelloClientSession(session getty.Session) (err error) { + tcp.EventListener.SessionOnOpen = func(session getty.Session) { + hello.Sessions = append(hello.Sessions, session) + } + err = tcp.InitialSession(session) + if err != nil { + return + } + return } diff --git a/demo/hello/tcp/config.go b/demo/hello/tcp/config.go index d6accf9d..7950ef6e 100644 --- a/demo/hello/tcp/config.go +++ b/demo/hello/tcp/config.go @@ -15,7 +15,6 @@ import ( import ( "github.com/dubbogo/getty" - "github.com/dubbogo/gost/sync" ) import ( @@ -24,21 +23,9 @@ import ( var ( pkgHandler = &hello.PackageHandler{} - eventListener = &hello.MessageHandler{} + EventListener = &hello.MessageHandler{} ) -func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (err error) { - eventListener.SessionOnOpen = func(session getty.Session) { - hello.Sessions = append(hello.Sessions, session) - } - err = InitialSession(session) - if err != nil { - return - } - session.SetTaskPool(taskPool) - return -} - func InitialSession(session getty.Session) (err error) { session.SetCompressType(getty.CompressZip) @@ -65,7 +52,7 @@ func InitialSession(session getty.Session) (err error) { session.SetName("hello") session.SetMaxMsgLen(128) - session.SetRQLen(1024) + // session.SetRQLen(1024) session.SetWQLen(512) session.SetReadTimeout(time.Second) session.SetWriteTimeout(5 * time.Second) @@ -73,6 +60,6 @@ func InitialSession(session getty.Session) (err error) { session.SetWaitTime(time.Second) session.SetPkgHandler(pkgHandler) - session.SetEventListener(eventListener) + session.SetEventListener(EventListener) return nil }