Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ import (
"tailscale.com/client/tailscale"
)

const getContentRequestTimeout = 30 * time.Second
const storeContentRequestTimeout = 300 * time.Second
const closestHostTimeout = 30 * time.Second
const localClientCacheCleanupInterval = 5 * time.Second
const localClientCacheTTL = 300 * time.Second
const (
getContentRequestTimeout = 30 * time.Second
storeContentRequestTimeout = 300 * time.Second
closestHostTimeout = 30 * time.Second
localClientCacheCleanupInterval = 5 * time.Second
localClientCacheTTL = 300 * time.Second
readBufferSizeBytes = 128 * 1024
)

func AuthInterceptor(token string) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand Down Expand Up @@ -160,6 +163,7 @@ func (c *BlobCacheClient) addHost(host *BlobCacheHost) error {
grpc.MaxCallRecvMsgSize(maxMessageSize),
grpc.MaxCallSendMsgSize(maxMessageSize),
),
grpc.WithReadBufferSize(readBufferSizeBytes),
}

if c.cfg.Token != "" {
Expand Down
39 changes: 37 additions & 2 deletions pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os/signal"
"path/filepath"
"runtime"
"sync"
"syscall"
"time"

Expand All @@ -23,6 +24,12 @@ import (
"google.golang.org/grpc/status"
)

const (
writeBufferSizeBytes = 128 * 1024
getContentBufferPoolSize = 128
getContentBufferSize = 256 * 1024
)

type CacheServiceOpts struct {
Addr string
}
Expand Down Expand Up @@ -149,6 +156,7 @@ func (cs *CacheService) StartServer(port uint) error {
s := grpc.NewServer(
grpc.MaxRecvMsgSize(maxMessageSize),
grpc.MaxSendMsgSize(maxMessageSize),
grpc.WriteBufferSize(writeBufferSizeBytes),
)
proto.RegisterBlobCacheServer(s, cs)

Expand All @@ -170,15 +178,42 @@ func (cs *CacheService) StartServer(port uint) error {
return nil
}

var getContentBufferPool = sync.Pool{
New: func() interface{} {
b := make([]byte, getContentBufferSize)
return &b
},
}

func init() {
for i := 0; i < getContentBufferPoolSize; i++ {
//lint:ignore SA6002 reason: pre-allocating buffers for performance
getContentBufferPool.Put(make([]byte, getContentBufferSize))
}
}

func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error) {
content, err := cs.cas.Get(req.Hash, req.Offset, req.Length)
bufPtr := getContentBufferPool.Get().(*[]byte)
defer getContentBufferPool.Put(bufPtr)
dst := *bufPtr

if cap(dst) < int(req.Length) {
dst = make([]byte, req.Length)
*bufPtr = dst
}
dst = dst[:req.Length]

resp := &proto.GetContentResponse{Content: dst}
err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst)
if err != nil {
Logger.Debugf("Get - [%s] - %v", req.Hash, err)
return &proto.GetContentResponse{Content: nil, Ok: false}, nil
}

Logger.Debugf("Get - [%s] (offset=%d, length=%d)", req.Hash, req.Offset, req.Length)
return &proto.GetContentResponse{Content: content, Ok: true}, nil

resp.Ok = true
return resp, nil
}

func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourcePath string, sourceOffset int64) (string, error) {
Expand Down
17 changes: 7 additions & 10 deletions pkg/storage.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blobcache

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -136,11 +135,10 @@ func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, cont
return nil
}

func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([]byte, error) {
buffer := bytes.NewBuffer(make([]byte, 0, length))

func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) error {
remainingLength := length
o := offset
dstOffset := int64(0)

cas.cache.ResetTTL(hash, time.Duration(cas.config.ObjectTtlS)*time.Second)

Expand All @@ -151,7 +149,7 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([]
// Check cache for chunk
value, found := cas.cache.Get(chunkKey)
if !found {
return nil, ErrContentNotFound
return ErrContentNotFound
}

v := value.(cacheValue)
Expand All @@ -167,18 +165,17 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([]
end := start + readLength

if start < 0 || end <= start || end > int64(len(chunkBytes)) {
return nil, fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes))
return fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes))
}

if _, err := buffer.Write(chunkBytes[start:end]); err != nil {
return nil, fmt.Errorf("failed to write to buffer: %v", err)
}
copy(dst[dstOffset:dstOffset+readLength], chunkBytes[start:end])

remainingLength -= readLength
o += readLength
dstOffset += readLength
}

return buffer.Bytes(), nil
return nil
}

func (cas *ContentAddressableStorage) onEvict(item *ristretto.Item[interface{}]) {
Expand Down