Skip to content

Commit

Permalink
feat: add option to enable span cache
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Jun 24, 2024
1 parent 22991e2 commit e541ee2
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 14 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/cloudwego/fastpb v0.0.4
github.com/cloudwego/frugal v0.1.15
github.com/cloudwego/localsession v0.0.2
github.com/cloudwego/netpoll v0.6.1
github.com/cloudwego/netpoll v0.6.2
github.com/cloudwego/runtimex v0.1.0
github.com/cloudwego/thriftgo v0.3.6
github.com/golang/mock v1.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ github.com/cloudwego/localsession v0.0.2 h1:N9/IDtCPj1fCL9bCTP+DbXx3f40YjVYWcwkJ
github.com/cloudwego/localsession v0.0.2/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8=
github.com/cloudwego/netpoll v0.6.1 h1:Cjftvi6bmumsOijmuUFy6HqAUXMxAT3fKK96wsrm3XA=
github.com/cloudwego/netpoll v0.6.1/go.mod h1:kaqvfZ70qd4T2WtIIpCOi5Cxyob8viEpzLhCrTrz3HM=
github.com/cloudwego/netpoll v0.6.2 h1:+KdILv5ATJU+222wNNXpHapYaBeRvvL8qhJyhcxRxrQ=
github.com/cloudwego/netpoll v0.6.2/go.mod h1:kaqvfZ70qd4T2WtIIpCOi5Cxyob8viEpzLhCrTrz3HM=
github.com/cloudwego/runtimex v0.1.0 h1:HG+WxWoj5/CDChDZ7D99ROwvSMkuNXAqt6hnhTTZDiI=
github.com/cloudwego/runtimex v0.1.0/go.mod h1:23vL/HGV0W8nSCHbe084AgEBdDV4rvXenEUMnUNvUd8=
github.com/cloudwego/thriftgo v0.3.6 h1:gHHW8Ag3cAEQ/awP4emTJiRPr5yQjbANhcsmV8/Epbw=
Expand Down
23 changes: 16 additions & 7 deletions pkg/mem/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mem
import (
"fmt"
"sync"
"sync/atomic"
"testing"

"github.com/cloudwego/kitex/internal/test"
Expand Down Expand Up @@ -127,17 +128,20 @@ func BenchmarkSpanCacheCopy(b *testing.B) {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
from := make([]byte, sz)
sc := NewSpanCache(maxSpanObject * 8)
buffers := make([][]byte, 1024)
var bidx int32
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
b.ReportAllocs()
b.ResetTimer()
var buffer []byte
idx := atomic.AddInt32(&bidx, 1)
for pb.Next() {
buffer = sc.Copy(from)
buffer := sc.Copy(from)
buffer[0] = 'a'
buffer[sz-1] = 'z'
buffers[idx] = buffer
}
_ = buffer
})
_ = buffers
})
}
}
Expand All @@ -146,18 +150,23 @@ func BenchmarkMakeAndCopy(b *testing.B) {
for _, sz := range benchStringSizes {
b.Run(fmt.Sprintf("size=%d", sz), func(b *testing.B) {
from := make([]byte, sz)
buffers := make([][]byte, 1024)
var bidx int32
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
b.ReportAllocs()
b.ResetTimer()
idx := atomic.AddInt32(&bidx, 1)
var buffer []byte
for pb.Next() {
buffer = make([]byte, sz)
copy(buffer, from)
buffer[0] = 'a'
buffer[sz-1] = 'z'
buffers[idx] = buffer
}
_ = buffer
})
_ = buffers
})
}
}
27 changes: 21 additions & 6 deletions pkg/protocol/bthrift/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,21 @@ import (

var (
// Binary protocol for bthrift.
Binary binaryProtocol
_ BTProtocol = binaryProtocol{}
spanCache = mem.NewSpanCache(1024 * 1024) // 1MB
Binary binaryProtocol
_ BTProtocol = binaryProtocol{}
spanCache = mem.NewSpanCache(1024 * 1024)
spanCacheEnable bool = false
)

const binaryInplaceThreshold = 4096 // 4k

type binaryProtocol struct{}

// SetSpanCache enable/disable binary protocol bytes/string allocator
func SetSpanCache(enable bool) {
spanCacheEnable = enable
}

func (binaryProtocol) WriteMessageBegin(buf []byte, name string, typeID thrift.TMessageType, seqid int32) int {
offset := 0
version := uint32(thrift.VERSION_1) | uint32(typeID)
Expand Down Expand Up @@ -467,8 +473,12 @@ func (binaryProtocol) ReadString(buf []byte) (value string, length int, err erro
if size < 0 || int(size) > len(buf) {
return value, length, perrors.NewProtocolErrorWithType(thrift.INVALID_DATA, "[ReadString] the string size greater than buf length")
}
data := spanCache.Copy(buf[length : length+int(size)])
value = utils.SliceByteToString(data)
if spanCacheEnable {
data := spanCache.Copy(buf[length : length+int(size)])
value = utils.SliceByteToString(data)
} else {
value = string(buf[length : length+int(size)])
}
length += int(size)
return
}
Expand All @@ -484,7 +494,12 @@ func (binaryProtocol) ReadBinary(buf []byte) (value []byte, length int, err erro
if size < 0 || size > len(buf) {
return value, length, perrors.NewProtocolErrorWithType(thrift.INVALID_DATA, "[ReadBinary] the binary size greater than buf length")
}
value = spanCache.Copy(buf[length : length+size])
if spanCacheEnable {
value = spanCache.Copy(buf[length : length+size])
} else {
value = make([]byte, size)
copy(value, buf[length:length+size])
}
length += size
return
}
Expand Down

0 comments on commit e541ee2

Please sign in to comment.