diff --git a/pkg/client.go b/pkg/client.go index 14039b8..89da6ef 100644 --- a/pkg/client.go +++ b/pkg/client.go @@ -20,11 +20,14 @@ import ( "tailscale.com/client/tailscale" ) -const getContentRequestTimeout = 30 * time.Second -const storeContentRequestTimeout = 300 * time.Second -const closestHostTimeout = 30 * time.Second -const localClientCacheCleanupInterval = 5 * time.Second -const localClientCacheTTL = 300 * time.Second +const ( + getContentRequestTimeout = 30 * time.Second + storeContentRequestTimeout = 300 * time.Second + closestHostTimeout = 30 * time.Second + localClientCacheCleanupInterval = 5 * time.Second + localClientCacheTTL = 300 * time.Second + readBufferSizeBytes = 128 * 1024 +) func AuthInterceptor(token string) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { @@ -160,6 +163,7 @@ func (c *BlobCacheClient) addHost(host *BlobCacheHost) error { grpc.MaxCallRecvMsgSize(maxMessageSize), grpc.MaxCallSendMsgSize(maxMessageSize), ), + grpc.WithReadBufferSize(readBufferSizeBytes), } if c.cfg.Token != "" { diff --git a/pkg/server.go b/pkg/server.go index 306f5ad..5032285 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -12,6 +12,7 @@ import ( "os/signal" "path/filepath" "runtime" + "sync" "syscall" "time" @@ -23,6 +24,12 @@ import ( "google.golang.org/grpc/status" ) +const ( + writeBufferSizeBytes = 128 * 1024 + getContentBufferPoolSize = 128 + getContentBufferSize = 256 * 1024 +) + type CacheServiceOpts struct { Addr string } @@ -149,6 +156,7 @@ func (cs *CacheService) StartServer(port uint) error { s := grpc.NewServer( grpc.MaxRecvMsgSize(maxMessageSize), grpc.MaxSendMsgSize(maxMessageSize), + grpc.WriteBufferSize(writeBufferSizeBytes), ) proto.RegisterBlobCacheServer(s, cs) @@ -170,15 +178,42 @@ func (cs *CacheService) StartServer(port uint) error { return nil } +var getContentBufferPool = sync.Pool{ + New: func() interface{} { + b := make([]byte, getContentBufferSize) + return &b + }, +} + +func init() { + for i := 0; i < getContentBufferPoolSize; i++ { + //lint:ignore SA6002 reason: pre-allocating buffers for performance + getContentBufferPool.Put(make([]byte, getContentBufferSize)) + } +} + func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error) { - content, err := cs.cas.Get(req.Hash, req.Offset, req.Length) + bufPtr := getContentBufferPool.Get().(*[]byte) + defer getContentBufferPool.Put(bufPtr) + dst := *bufPtr + + if cap(dst) < int(req.Length) { + dst = make([]byte, req.Length) + *bufPtr = dst + } + dst = dst[:req.Length] + + resp := &proto.GetContentResponse{Content: dst} + err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst) if err != nil { Logger.Debugf("Get - [%s] - %v", req.Hash, err) return &proto.GetContentResponse{Content: nil, Ok: false}, nil } Logger.Debugf("Get - [%s] (offset=%d, length=%d)", req.Hash, req.Offset, req.Length) - return &proto.GetContentResponse{Content: content, Ok: true}, nil + + resp.Ok = true + return resp, nil } func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourcePath string, sourceOffset int64) (string, error) { diff --git a/pkg/storage.go b/pkg/storage.go index a814955..db9362c 100644 --- a/pkg/storage.go +++ b/pkg/storage.go @@ -1,7 +1,6 @@ package blobcache import ( - "bytes" "context" "errors" "fmt" @@ -136,11 +135,10 @@ func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, cont return nil } -func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([]byte, error) { - buffer := bytes.NewBuffer(make([]byte, 0, length)) - +func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) error { remainingLength := length o := offset + dstOffset := int64(0) cas.cache.ResetTTL(hash, time.Duration(cas.config.ObjectTtlS)*time.Second) @@ -151,7 +149,7 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([] // Check cache for chunk value, found := cas.cache.Get(chunkKey) if !found { - return nil, ErrContentNotFound + return ErrContentNotFound } v := value.(cacheValue) @@ -167,18 +165,17 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([] end := start + readLength if start < 0 || end <= start || end > int64(len(chunkBytes)) { - return nil, fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes)) + return fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes)) } - if _, err := buffer.Write(chunkBytes[start:end]); err != nil { - return nil, fmt.Errorf("failed to write to buffer: %v", err) - } + copy(dst[dstOffset:dstOffset+readLength], chunkBytes[start:end]) remainingLength -= readLength o += readLength + dstOffset += readLength } - return buffer.Bytes(), nil + return nil } func (cas *ContentAddressableStorage) onEvict(item *ristretto.Item[interface{}]) {