diff --git a/go.mod b/go.mod index ed822e5..0e776c3 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/knadh/koanf/providers/file v0.1.0 github.com/knadh/koanf/providers/rawbytes v0.1.0 github.com/knadh/koanf/v2 v2.0.1 + github.com/moby/sys/mountinfo v0.7.2 github.com/redis/go-redis/v9 v9.5.1 github.com/shirou/gopsutil v2.21.11+incompatible go.uber.org/zap v1.27.0 diff --git a/go.sum b/go.sum index f986f8a..52ca738 100644 --- a/go.sum +++ b/go.sum @@ -180,8 +180,9 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= -github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= +github.com/moby/sys/mountinfo v0.7.2 h1:1shs6aH5s4o5H2zQLn796ADW1wMrIwHsyJ2v9KouLrg= +github.com/moby/sys/mountinfo v0.7.2/go.mod h1:1YOa8w8Ih7uW0wALDUgT1dTTSBrZ+HiBLGws92L2RU4= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= diff --git a/pkg/blobfs.go b/pkg/blobfs.go index b135c55..4b851b3 100644 --- a/pkg/blobfs.go +++ b/pkg/blobfs.go @@ -13,6 +13,7 @@ import ( "github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fuse" + "github.com/moby/sys/mountinfo" ) type BlobFsMetadata struct { @@ -158,6 +159,36 @@ func Mount(ctx context.Context, opts BlobFsSystemOpts) (func() error, <-chan err return startServer, serverError, server, nil } +func updateReadAheadKB(mountPoint string, valueKB int) error { + mounts, err := mountinfo.GetMounts(nil) + if err != nil { + return fmt.Errorf("failed to get mount info: %w", err) + } + + var deviceID string + for _, mount := range mounts { + if mount.Mountpoint == mountPoint { + deviceID = fmt.Sprintf("%d:%d", mount.Major, mount.Minor) + break + } + } + + if deviceID == "" { + return fmt.Errorf("mount point %s not found", mountPoint) + } + + // Construct path to read_ahead_kb + readAheadPath := fmt.Sprintf("/sys/class/bdi/%s/read_ahead_kb", deviceID) + + // Update read_ahead_kb + cmd := exec.Command("sh", "-c", fmt.Sprintf("echo %d > %s", valueKB, readAheadPath)) + if err := cmd.Run(); err != nil { + return fmt.Errorf("failed to update read_ahead_kb: %w", err) + } + + return nil +} + // NewFileSystem initializes a new BlobFs with root metadata. func NewFileSystem(ctx context.Context, opts BlobFsSystemOpts) (*BlobFs, error) { metadata := opts.Metadata diff --git a/pkg/blobfs_prefetch.go b/pkg/blobfs_prefetch.go index b182f5f..e9af9cc 100644 --- a/pkg/blobfs_prefetch.go +++ b/pkg/blobfs_prefetch.go @@ -269,6 +269,9 @@ func (pb *PrefetchBuffer) GetRange(offset uint64, dst []byte) error { } func (pb *PrefetchBuffer) tryGetRange(offset, length uint64) ([]byte, bool, bool) { + pb.mu.Lock() + defer pb.mu.Unlock() + windowIndex := offset / pb.windowSize var w *window @@ -287,10 +290,8 @@ func (pb *PrefetchBuffer) tryGetRange(offset, length uint64) ([]byte, bool, bool return nil, false, false } - if w.fetching { - w.mu.Lock() - defer w.mu.Unlock() - } + w.mu.Lock() + defer w.mu.Unlock() windowOffset := offset - (windowIndex * pb.windowSize) windowHead := (windowIndex * pb.windowSize) + w.readLength diff --git a/pkg/client.go b/pkg/client.go index 6a8a013..f82ac15 100644 --- a/pkg/client.go +++ b/pkg/client.go @@ -28,6 +28,10 @@ const ( closestHostTimeout = 30 * time.Second localClientCacheCleanupInterval = 5 * time.Second localClientCacheTTL = 300 * time.Second + + // NOTE: This value for readAheadKB is separate from the blobfs config since the FUSE library does + // weird stuff with the other read_ahead_kb value internally + readAheadKB = 32768 ) func AuthInterceptor(token string) grpc.UnaryClientInterceptor { @@ -126,6 +130,11 @@ func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheCli return nil, err } + err = updateReadAheadKB(cfg.BlobFs.MountPoint, readAheadKB) + if err != nil { + Logger.Errorf("Failed to update read_ahead_kb: %v", err) + } + bc.blobfsServer = server }