Skip to content
Permalink
Browse files
Merge pull request #82 from takewofly/feature/split
Opt: split large packet to n*16KB packet
  • Loading branch information
AlexStocks committed Nov 17, 2021
2 parents a2461f8 + a3f9370 commit 54a2fadd513868cf9d6f0acb12ed517d50473c8d
Showing 2 changed files with 60 additions and 5 deletions.
@@ -167,6 +167,32 @@ func TestTCPClient(t *testing.T) {
assert.Equal(t, beforeWriteBytes, conn.writeBytes)
assert.True(t, conn.compress == CompressSnappy)

batchSize := 128 * 1023
source := make([]byte, batchSize)
for i := 0; i < batchSize; i++ {
source[i] = 't'
}
l, err = ss.WriteBytes(source)
assert.Nil(t, err)
assert.True(t, l == batchSize)
beforeWriteBytes.Add(uint32(batchSize))
beforeWritePkgNum.Add(uint32(batchSize/16/1024) + 1)
assert.Equal(t, beforeWriteBytes, conn.writeBytes)
assert.Equal(t, beforeWritePkgNum, conn.writePkgNum)

batchSize = 32 * 1024
source = make([]byte, batchSize)
for i := 0; i < batchSize; i++ {
source[i] = 't'
}
l, err = ss.WriteBytes(source)
assert.Nil(t, err)
assert.True(t, l == batchSize)
beforeWriteBytes.Add(uint32(batchSize))
beforeWritePkgNum.Add(2)
assert.Equal(t, beforeWriteBytes, conn.writeBytes)
assert.Equal(t, beforeWritePkgNum, conn.writePkgNum)

clt.Close()
assert.True(t, clt.IsClosed())
}
@@ -47,6 +47,7 @@ const (
pendingDuration = 3e9
// MaxWheelTimeSpan 900s, 15 minute
MaxWheelTimeSpan = 900e9
maxPacketLen = 16 * 1024

defaultSessionName = "session"
defaultTCPSessionName = "tcp-session"
@@ -127,8 +128,9 @@ type session struct {
attrs *gxcontext.ValuesContext

// goroutines sync
grNum uatomic.Int32
lock sync.RWMutex
grNum uatomic.Int32
lock sync.RWMutex
packetLock sync.RWMutex
}

func newSession(endPoint EndPoint, conn Connection) *session {
@@ -400,6 +402,8 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) (int, int, er
} else {
pkg = pkgBytes
}
s.packetLock.RLock()
defer s.packetLock.RUnlock()
if 0 < timeout {
s.Connection.SetWriteTimeout(timeout)
}
@@ -418,11 +422,34 @@ func (s *session) WriteBytes(pkg []byte) (int, error) {
return 0, ErrSessionClosed
}

lg, err := s.Connection.send(pkg)
leftPackageSize, totalSize, writeSize := len(pkg), len(pkg), 0
if leftPackageSize > maxPacketLen {
s.packetLock.Lock()
defer s.packetLock.Unlock()
} else {
s.packetLock.RLock()
defer s.packetLock.RUnlock()
}

for leftPackageSize > maxPacketLen {
_, err := s.Connection.send(pkg[writeSize:(writeSize + maxPacketLen)])
if err != nil {
return writeSize, perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
}
leftPackageSize -= maxPacketLen
writeSize += maxPacketLen
}

if leftPackageSize == 0 {
return writeSize, nil
}

_, err := s.Connection.send(pkg[writeSize:])
if err != nil {
return 0, perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
return writeSize, perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
}
return lg, nil

return totalSize, nil
}

// WriteBytesArray Write multiple packages at once. so we invoke write sys.call just one time.
@@ -436,6 +463,8 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) (int, error) {

// reduce syscall and memcopy for multiple packages
if _, ok := s.Connection.(*gettyTCPConn); ok {
s.packetLock.RLock()
defer s.packetLock.RUnlock()
lg, err := s.Connection.send(pkgs)
if err != nil {
return 0, perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))

0 comments on commit 54a2fad

Please sign in to comment.