From 1e869acf0b3623bbd7d6cc4ab8079f506881fac2 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Fri, 10 Jan 2025 15:23:35 -0500 Subject: [PATCH 1/8] try to remove extra copy --- pkg/server.go | 7 +++++-- pkg/storage.go | 14 ++++++-------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/server.go b/pkg/server.go index 306f5ad..88600d2 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -171,14 +171,17 @@ func (cs *CacheService) StartServer(port uint) error { } func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error) { - content, err := cs.cas.Get(req.Hash, req.Offset, req.Length) + dst := bytes.NewBuffer(make([]byte, req.Length)) + resp := &proto.GetContentResponse{Content: dst.Bytes()} + + err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst) if err != nil { Logger.Debugf("Get - [%s] - %v", req.Hash, err) return &proto.GetContentResponse{Content: nil, Ok: false}, nil } Logger.Debugf("Get - [%s] (offset=%d, length=%d)", req.Hash, req.Offset, req.Length) - return &proto.GetContentResponse{Content: content, Ok: true}, nil + return resp, nil } func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourcePath string, sourceOffset int64) (string, error) { diff --git a/pkg/storage.go b/pkg/storage.go index a814955..fb35fc8 100644 --- a/pkg/storage.go +++ b/pkg/storage.go @@ -136,9 +136,7 @@ func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, cont return nil } -func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([]byte, error) { - buffer := bytes.NewBuffer(make([]byte, 0, length)) - +func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst *bytes.Buffer) error { remainingLength := length o := offset @@ -151,7 +149,7 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([] // Check cache for chunk value, found := cas.cache.Get(chunkKey) if !found { - return nil, ErrContentNotFound + return ErrContentNotFound } v := value.(cacheValue) @@ -167,18 +165,18 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([] end := start + readLength if start < 0 || end <= start || end > int64(len(chunkBytes)) { - return nil, fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes)) + return fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes)) } - if _, err := buffer.Write(chunkBytes[start:end]); err != nil { - return nil, fmt.Errorf("failed to write to buffer: %v", err) + if _, err := dst.Write(chunkBytes[start:end]); err != nil { + return fmt.Errorf("failed to write to buffer: %v", err) } remainingLength -= readLength o += readLength } - return buffer.Bytes(), nil + return nil } func (cas *ContentAddressableStorage) onEvict(item *ristretto.Item[interface{}]) { From 2ee98f1ec0545f4452bdee1af397cf77f6b50f97 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Fri, 10 Jan 2025 15:31:18 -0500 Subject: [PATCH 2/8] fix allocation --- pkg/server.go | 6 ++++-- pkg/storage.go | 7 ++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/server.go b/pkg/server.go index 88600d2..1dfb52c 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -171,8 +171,8 @@ func (cs *CacheService) StartServer(port uint) error { } func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error) { - dst := bytes.NewBuffer(make([]byte, req.Length)) - resp := &proto.GetContentResponse{Content: dst.Bytes()} + dst := make([]byte, req.Length) + resp := &proto.GetContentResponse{Content: dst} err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst) if err != nil { @@ -181,6 +181,8 @@ func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentReq } Logger.Debugf("Get - [%s] (offset=%d, length=%d)", req.Hash, req.Offset, req.Length) + + resp.Ok = true return resp, nil } diff --git a/pkg/storage.go b/pkg/storage.go index fb35fc8..068a526 100644 --- a/pkg/storage.go +++ b/pkg/storage.go @@ -1,7 +1,6 @@ package blobcache import ( - "bytes" "context" "errors" "fmt" @@ -136,7 +135,7 @@ func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, cont return nil } -func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst *bytes.Buffer) error { +func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) error { remainingLength := length o := offset @@ -168,9 +167,7 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst return fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes)) } - if _, err := dst.Write(chunkBytes[start:end]); err != nil { - return fmt.Errorf("failed to write to buffer: %v", err) - } + copy(dst[o:o+readLength], chunkBytes[start:end]) remainingLength -= readLength o += readLength From 2af1495524737ae7076accc65597e098cd06952f Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Fri, 10 Jan 2025 15:51:55 -0500 Subject: [PATCH 3/8] wip --- pkg/server.go | 9 +++++++++ pkg/storage.go | 4 +++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/server.go b/pkg/server.go index 1dfb52c..eeff734 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -8,6 +8,8 @@ import ( "fmt" "io" "net" + "net/http" + _ "net/http/pprof" "os" "os/signal" "path/filepath" @@ -68,6 +70,12 @@ func NewCacheService(ctx context.Context, cfg BlobCacheConfig) (*CacheService, e return nil, err } + go func() { + runtime.SetBlockProfileRate(1) + runtime.SetMutexProfileFraction(1) + http.ListenAndServe("localhost:6666", nil) + }() + // Mount cache as a FUSE filesystem if blobfs is enabled if cfg.BlobFs.Enabled { for _, sourceConfig := range cfg.BlobFs.Sources { @@ -149,6 +157,7 @@ func (cs *CacheService) StartServer(port uint) error { s := grpc.NewServer( grpc.MaxRecvMsgSize(maxMessageSize), grpc.MaxSendMsgSize(maxMessageSize), + grpc.WriteBufferSize(128*1024), ) proto.RegisterBlobCacheServer(s, cs) diff --git a/pkg/storage.go b/pkg/storage.go index 068a526..db9362c 100644 --- a/pkg/storage.go +++ b/pkg/storage.go @@ -138,6 +138,7 @@ func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, cont func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) error { remainingLength := length o := offset + dstOffset := int64(0) cas.cache.ResetTTL(hash, time.Duration(cas.config.ObjectTtlS)*time.Second) @@ -167,10 +168,11 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst return fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes)) } - copy(dst[o:o+readLength], chunkBytes[start:end]) + copy(dst[dstOffset:dstOffset+readLength], chunkBytes[start:end]) remainingLength -= readLength o += readLength + dstOffset += readLength } return nil From e4c0393fcffa156ca09bd23f039877c80223b722 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Fri, 10 Jan 2025 16:01:53 -0500 Subject: [PATCH 4/8] wip --- pkg/client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/client.go b/pkg/client.go index 14039b8..f3ec00c 100644 --- a/pkg/client.go +++ b/pkg/client.go @@ -160,6 +160,7 @@ func (c *BlobCacheClient) addHost(host *BlobCacheHost) error { grpc.MaxCallRecvMsgSize(maxMessageSize), grpc.MaxCallSendMsgSize(maxMessageSize), ), + grpc.WithReadBufferSize(128 * 1024), } if c.cfg.Token != "" { From f75db2657841992ad7c2ff2b54e2a767dc3c0c2a Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Fri, 10 Jan 2025 17:05:09 -0500 Subject: [PATCH 5/8] cleanup --- pkg/client.go | 13 ++++++++----- pkg/server.go | 43 ++++++++++++++++++++++++++++++++----------- 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/pkg/client.go b/pkg/client.go index f3ec00c..1c5a785 100644 --- a/pkg/client.go +++ b/pkg/client.go @@ -20,11 +20,14 @@ import ( "tailscale.com/client/tailscale" ) -const getContentRequestTimeout = 30 * time.Second -const storeContentRequestTimeout = 300 * time.Second -const closestHostTimeout = 30 * time.Second -const localClientCacheCleanupInterval = 5 * time.Second -const localClientCacheTTL = 300 * time.Second +const ( + getContentRequestTimeout = 30 * time.Second + storeContentRequestTimeout = 300 * time.Second + closestHostTimeout = 30 * time.Second + localClientCacheCleanupInterval = 5 * time.Second + localClientCacheTTL = 300 * time.Second + readBufferSizeBytes = 128 * 1024 +) func AuthInterceptor(token string) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { diff --git a/pkg/server.go b/pkg/server.go index eeff734..f9f458b 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -8,12 +8,11 @@ import ( "fmt" "io" "net" - "net/http" - _ "net/http/pprof" "os" "os/signal" "path/filepath" "runtime" + "sync" "syscall" "time" @@ -25,6 +24,12 @@ import ( "google.golang.org/grpc/status" ) +const ( + writeBufferSizeBytes = 128 * 1024 + getContentBufferPoolSize = 64 + getContentBufferSize = 128 * 1024 +) + type CacheServiceOpts struct { Addr string } @@ -70,12 +75,6 @@ func NewCacheService(ctx context.Context, cfg BlobCacheConfig) (*CacheService, e return nil, err } - go func() { - runtime.SetBlockProfileRate(1) - runtime.SetMutexProfileFraction(1) - http.ListenAndServe("localhost:6666", nil) - }() - // Mount cache as a FUSE filesystem if blobfs is enabled if cfg.BlobFs.Enabled { for _, sourceConfig := range cfg.BlobFs.Sources { @@ -157,7 +156,7 @@ func (cs *CacheService) StartServer(port uint) error { s := grpc.NewServer( grpc.MaxRecvMsgSize(maxMessageSize), grpc.MaxSendMsgSize(maxMessageSize), - grpc.WriteBufferSize(128*1024), + grpc.WriteBufferSize(writeBufferSizeBytes), ) proto.RegisterBlobCacheServer(s, cs) @@ -179,10 +178,32 @@ func (cs *CacheService) StartServer(port uint) error { return nil } +var getContentBufferPool = sync.Pool{ + New: func() interface{} { + b := make([]byte, getContentBufferSize) + return &b + }, +} + +func init() { + for i := 0; i < getContentBufferPoolSize; i++ { + //lint:ignore SA6002 reason: pre-allocating buffers for performance + getContentBufferPool.Put(make([]byte, getContentBufferSize)) + } +} + func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error) { - dst := make([]byte, req.Length) - resp := &proto.GetContentResponse{Content: dst} + bufPtr := getContentBufferPool.Get().(*[]byte) + defer getContentBufferPool.Put(bufPtr) + dst := *bufPtr + if cap(dst) < int(req.Length) { + dst = make([]byte, req.Length) + *bufPtr = dst + } + dst = dst[:req.Length] + + resp := &proto.GetContentResponse{Content: dst} err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst) if err != nil { Logger.Debugf("Get - [%s] - %v", req.Hash, err) From a78f9076673e3906561c4ecfec4b5afbe986322b Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Fri, 10 Jan 2025 17:05:37 -0500 Subject: [PATCH 6/8] use const --- pkg/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/client.go b/pkg/client.go index 1c5a785..89da6ef 100644 --- a/pkg/client.go +++ b/pkg/client.go @@ -163,7 +163,7 @@ func (c *BlobCacheClient) addHost(host *BlobCacheHost) error { grpc.MaxCallRecvMsgSize(maxMessageSize), grpc.MaxCallSendMsgSize(maxMessageSize), ), - grpc.WithReadBufferSize(128 * 1024), + grpc.WithReadBufferSize(readBufferSizeBytes), } if c.cfg.Token != "" { From 03ec41c86490df853046a40d36d3e832156a4552 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Fri, 10 Jan 2025 17:07:50 -0500 Subject: [PATCH 7/8] add more buffers --- pkg/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server.go b/pkg/server.go index f9f458b..c5baf84 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -26,7 +26,7 @@ import ( const ( writeBufferSizeBytes = 128 * 1024 - getContentBufferPoolSize = 64 + getContentBufferPoolSize = 128 getContentBufferSize = 128 * 1024 ) From 3d98f9d1b1b0dd570e790cabca3cf24b78fbdac7 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Fri, 10 Jan 2025 17:15:40 -0500 Subject: [PATCH 8/8] bigger buffer --- pkg/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server.go b/pkg/server.go index c5baf84..5032285 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -27,7 +27,7 @@ import ( const ( writeBufferSizeBytes = 128 * 1024 getContentBufferPoolSize = 128 - getContentBufferSize = 128 * 1024 + getContentBufferSize = 256 * 1024 ) type CacheServiceOpts struct {