diff --git a/.gitignore b/.gitignore
index 9e8f57c..5bd184c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,10 +1,13 @@
 *.tgz
 .DS_Store
 bin/blobcache
+bin/throughput
+bin/fs
 bin/testclient
 build.sh
 tmp/
 config.yaml
-e2e/testclient/testdata/*.bin
+e2e/throughput/testdata/*.bin
+e2e/fs/testdata/*.bin
 daemonset.yaml
 output.bin
\ No newline at end of file
diff --git a/Makefile b/Makefile
index e956356..7728648 100644
--- a/Makefile
+++ b/Makefile
@@ -24,5 +24,6 @@ publish-chart:
 	helm push beam-blobcache-v2-chart-$(chartVersion).tgz oci://public.ecr.aws/n4e0e1y0
 	rm beam-blobcache-v2-chart-$(chartVersion).tgz

-testclient:
-	GOOS=linux GOARCH=amd64 go build -o bin/testclient e2e/testclient/main.go
+testclients:
+	GOOS=linux GOARCH=amd64 go build -o bin/throughput e2e/throughput/main.go
+	GOOS=linux GOARCH=amd64 go build -o bin/fs e2e/fs/main.go
diff --git a/cmd/main.go b/cmd/main.go
index 8c4427a..07871dd 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -10,7 +10,7 @@ import (
 func main() {
 	configManager, err := blobcache.NewConfigManager[blobcache.BlobCacheConfig]()
 	if err != nil {
-		log.Fatalf("Failed to load config: %v\n", err)
+		log.Fatalf("Failed to load config: %v", err)
 	}

 	ctx := context.Background()
diff --git a/e2e/fs/main.go b/e2e/fs/main.go
new file mode 100644
index 0000000..1ffd31f
--- /dev/null
+++ b/e2e/fs/main.go
@@ -0,0 +1,52 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"log"
+	"os"
+	"os/signal"
+	"syscall"
+
+	blobcache "github.com/beam-cloud/blobcache-v2/pkg"
+)
+
+var (
+	totalIterations int
+	checkContent    bool
+)
+
+type TestResult struct {
+	ElapsedTime        float64
+	ContentCheckPassed bool
+}
+
+func main() {
+	flag.IntVar(&totalIterations, "iterations", 3, "Number of iterations to run the tests")
+	flag.BoolVar(&checkContent, "checkcontent", true, "Check the content hash after receiving data")
+	flag.Parse()
+
+	configManager, err := blobcache.NewConfigManager[blobcache.BlobCacheConfig]()
+	if err != nil {
+		log.Fatalf("Failed to load config: %v", err)
+	}
+
+	cfg := configManager.GetConfig()
+
+	// Initialize logger
+	blobcache.InitLogger(cfg.DebugMode)
+
+	ctx := context.Background()
+
+	_, err = blobcache.NewBlobCacheClient(ctx, cfg)
+	if err != nil {
+		log.Fatalf("Unable to create client: %v", err)
+	}
+
+	// Block until Ctrl+C (SIGINT) or SIGTERM is received
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
+	<-sigChan
+
+	log.Println("Received interrupt or termination signal, exiting.")
+}
diff --git a/e2e/testclient/main.go b/e2e/throughput/main.go
similarity index 96%
rename from e2e/testclient/main.go
rename to e2e/throughput/main.go
index d1d9044..a4d7320 100644
--- a/e2e/testclient/main.go
+++ b/e2e/throughput/main.go
@@ -30,7 +30,7 @@ func main() {

 	configManager, err := blobcache.NewConfigManager[blobcache.BlobCacheConfig]()
 	if err != nil {
-		log.Fatalf("Failed to load config: %v\n", err)
+		log.Fatalf("Failed to load config: %v", err)
 	}

 	cfg := configManager.GetConfig()
@@ -42,10 +42,10 @@ func main() {

 	client, err := blobcache.NewBlobCacheClient(ctx, cfg)
 	if err != nil {
-		log.Fatalf("Unable to create client: %v\n", err)
+		log.Fatalf("Unable to create client: %v", err)
 	}

-	filePath := "e2e/testclient/testdata/test3.bin"
+	filePath := "e2e/throughput/testdata/test3.bin"
 	b, err := os.ReadFile(filePath)
 	if err != nil {
 		log.Fatalf("Unable to read input file: %v\n", err)
@@ -101,7 +101,7 @@ func storeFile(client *blobcache.BlobCacheClient, filePath string) (string, erro

 		n, err := file.Read(buf)
 		if err != nil && err != io.EOF {
-			log.Fatalf("err reading file: %v\n", err)
+			log.Fatalf("err reading file: %v", err)
 		}

 		if n == 0 {
@@ -155,7 +155,7 @@ func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSi

 	// Verify received content's hash
 	if checkContent {
-		log.Printf("Verifying hash for GetContentStream\n")
+		log.Printf("Verifying hash for GetContentStream")
 		hashBytes := sha256.Sum256(contentStream)
 		retrievedHash := hex.EncodeToString(hashBytes[:])

diff --git a/pkg/blobfs.go b/pkg/blobfs.go
index 8f726c5..b135c55 100644
--- a/pkg/blobfs.go
+++ b/pkg/blobfs.go
@@ -6,7 +6,6 @@ import (
 	"encoding/binary"
 	"encoding/hex"
 	"fmt"
-	"log"
 	"os"
 	"os/exec"
 	"strings"
@@ -67,17 +66,18 @@ type BlobFsSystemOpts struct {
 }

 type BlobFs struct {
-	ctx      context.Context
-	root     *FSNode
-	verbose  bool
-	Metadata *BlobCacheMetadata
-	Client   *BlobCacheClient
-	Config   BlobCacheConfig
+	ctx             context.Context
+	root            *FSNode
+	verbose         bool
+	Metadata        *BlobCacheMetadata
+	Client          *BlobCacheClient
+	Config          BlobCacheConfig
+	PrefetchManager *PrefetchManager
 }

 func Mount(ctx context.Context, opts BlobFsSystemOpts) (func() error, <-chan error, *fuse.Server, error) {
 	mountPoint := opts.Config.BlobFs.MountPoint
-	Logger.Infof("Mounting to %s\n", mountPoint)
+	Logger.Infof("Mounting to %s", mountPoint)

 	if _, err := os.Stat(mountPoint); os.IsNotExist(err) {
 		err = os.MkdirAll(mountPoint, 0755)
@@ -170,19 +170,24 @@ func NewFileSystem(ctx context.Context, opts BlobFsSystemOpts) (*BlobFs, error)
 		Metadata: metadata,
 	}

+	if opts.Config.BlobFs.Prefetch.Enabled {
+		bfs.PrefetchManager = NewPrefetchManager(ctx, opts.Config, opts.Client)
+		bfs.PrefetchManager.Start()
+	}
+
 	rootID := GenerateFsID("/")
 	rootPID := "" // Root node has no parent
 	rootPath := "/"

 	dirMeta, err := metadata.GetFsNode(bfs.ctx, rootID)
 	if err != nil || dirMeta == nil {
-		log.Printf("Root node metadata not found, creating it now...\n")
+		Logger.Infof("Root node metadata not found, creating it now...")

 		dirMeta = &BlobFsMetadata{PID: rootPID, ID: rootID, Path: rootPath, Ino: 1, Mode: fuse.S_IFDIR | 0755}
 		err := metadata.SetFsNode(bfs.ctx, rootID, dirMeta)
 		if err != nil {
-			log.Fatalf("Unable to create blobfs root node dir metdata: %+v\n", err)
+			Logger.Fatalf("Unable to create blobfs root node dir metadata: %+v", err)
 		}
 	}

diff --git a/pkg/blobfs_node.go b/pkg/blobfs_node.go
index 663b654..76c969f 100644
--- a/pkg/blobfs_node.go
+++ b/pkg/blobfs_node.go
@@ -12,13 +12,14 @@ import (
 )

 type BlobFsNode struct {
-	Path   string
-	ID     string
-	PID    string
-	Name   string
-	Target string
-	Hash   string
-	Attr   fuse.Attr
+	Path     string
+	ID       string
+	PID      string
+	Name     string
+	Target   string
+	Hash     string
+	Attr     fuse.Attr
+	Prefetch *bool
 }
 type FSNode struct {
 	fs.Inode
@@ -94,13 +95,14 @@ func (n *FSNode) inodeFromFsId(ctx context.Context, fsId string) (*fs.Inode, *fu

 	// Create a new Inode on lookup
 	node := n.NewInode(ctx, &FSNode{filesystem: n.filesystem, bfsNode: &BlobFsNode{
-		Path:   metadata.Path,
-		ID:     metadata.ID,
-		PID:    metadata.PID,
-		Name:   metadata.Name,
-		Hash:   metadata.Hash,
-		Attr:   attr,
-		Target: "",
+		Path:     metadata.Path,
+		ID:       metadata.ID,
+		PID:      metadata.PID,
+		Name:     metadata.Name,
+		Hash:     metadata.Hash,
+		Attr:     attr,
+		Target:   "",
+		Prefetch: nil,
 	}, attr: attr},
 		fs.StableAttr{Mode: metadata.Mode, Ino: metadata.Ino, Gen: metadata.Gen},
 	)
@@ -128,9 +130,6 @@ func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*
 		return nil, syscall.ENOENT
 	}

-	// TODO: stream file to a temp file in the container somewhere
-	// /tmp/cache/path/to/file
-
 	out.Attr = *attr
 	return node, fs.OK
 }

@@ -162,6 +161,30 @@ func (n *FSNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuse
 	return nil, 0, fs.OK
 }

+func (n *FSNode) shouldPrefetch(node *BlobFsNode) bool {
+	if node.Prefetch != nil {
+		return *node.Prefetch
+	}
+
+	if !n.filesystem.Config.BlobFs.Prefetch.Enabled {
+		return false
+	}
+
+	if n.bfsNode.Attr.Size < n.filesystem.Config.BlobFs.Prefetch.MinFileSizeBytes {
+		return false
+	}
+
+	for _, ext := range n.filesystem.Config.BlobFs.Prefetch.IgnoreFileExt {
+		if strings.HasSuffix(node.Name, ext) {
+			return false
+		}
+	}
+
+	prefetch := true
+	node.Prefetch = &prefetch
+	return true
+}
+
 func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) {
 	n.log("Read called with offset: %v", off)

@@ -170,6 +193,17 @@ func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int
 		return fuse.ReadResultData(dest[:0]), fs.OK
 	}

+	// Attempt to prefetch the file
+	if n.shouldPrefetch(n.bfsNode) {
+		buffer := n.filesystem.PrefetchManager.GetPrefetchBuffer(n.bfsNode.Hash, n.bfsNode.Attr.Size)
+		if buffer != nil {
+			data, err := buffer.GetRange(uint64(off), uint64(len(dest)))
+			if err == nil {
+				return fuse.ReadResultData(data), fs.OK
+			}
+		}
+	}
+
 	buffer, err := n.filesystem.Client.GetContent(n.bfsNode.Hash, off, int64(len(dest)))
 	if err != nil {
 		return nil, syscall.EIO
diff --git a/pkg/blobfs_prefetch.go b/pkg/blobfs_prefetch.go
new file mode 100644
index 0000000..030b0eb
--- /dev/null
+++ b/pkg/blobfs_prefetch.go
@@ -0,0 +1,318 @@
+package blobcache
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+)
+
+const (
+	prefetchEvictionInterval      = 5 * time.Second
+	prefetchSegmentIdleTTL        = 5 * time.Second  // remove stale segments if there have been no reads within this TTL
+	preemptiveFetchThresholdBytes = 16 * 1024 * 1024 // if the next segment is within 16MB of where we are reading, start fetching it
+)
+
+type PrefetchManager struct {
+	ctx                      context.Context
+	config                   BlobCacheConfig
+	buffers                  sync.Map
+	client                   *BlobCacheClient
+	currentPrefetchSizeBytes uint64
+}
+
+func NewPrefetchManager(ctx context.Context, config BlobCacheConfig, client *BlobCacheClient) *PrefetchManager {
+	return &PrefetchManager{
+		ctx:                      ctx,
+		config:                   config,
+		buffers:                  sync.Map{},
+		client:                   client,
+		currentPrefetchSizeBytes: 0,
+	}
+}
+
+func (pm *PrefetchManager) Start() {
+	go pm.evictIdleBuffers()
+}
+
+// GetPrefetchBuffer returns the existing prefetch buffer for a hash, or creates a new one
+func (pm *PrefetchManager) GetPrefetchBuffer(hash string, fileSize uint64) *PrefetchBuffer {
+	if val, ok := pm.buffers.Load(hash); ok {
+		return val.(*PrefetchBuffer)
+	}
+
+	ctx, cancel := context.WithCancel(pm.ctx)
+	newBuffer := NewPrefetchBuffer(PrefetchOpts{
+		Ctx:         ctx,
+		CancelFunc:  cancel,
+		Hash:        hash,
+		FileSize:    fileSize,
+		WindowSize:  pm.config.BlobFs.Prefetch.WindowSizeBytes,
+		DataTimeout: time.Second * time.Duration(pm.config.BlobFs.Prefetch.DataTimeoutS),
+		Client:      pm.client,
+		Manager:     pm,
+	})
+
+	pm.buffers.Store(hash, newBuffer)
+	return newBuffer
+}
+
+func (pm *PrefetchManager) evictIdleBuffers() {
+	for {
+		select {
+		case <-pm.ctx.Done():
+			return
+		case <-time.After(prefetchEvictionInterval):
+			pm.buffers.Range(func(key, value any) bool {
+				buffer := value.(*PrefetchBuffer)
+
+				// If no reads have happened in any segments in the buffer
+				// stop any fetch operations and clear the buffer so it can
+				// be garbage collected
+				unused := buffer.evictIdle()
+				if unused {
+					buffer.Clear()
+					pm.buffers.Delete(key)
+				}
+
+				return true
+			})
+		}
+	}
+}
+
+type PrefetchBuffer struct {
+	ctx           context.Context
+	cancelFunc    context.CancelFunc
+	manager       *PrefetchManager
+	hash          string
+	windowSize    uint64
+	lastRead      time.Time
+	fileSize      uint64
+	client        *BlobCacheClient
+	mu            sync.Mutex
+	dataCond      *sync.Cond
+	dataTimeout   time.Duration
+	currentWindow *window
+	nextWindow    *window
+	prevWindow    *window
+}
+
+type window struct {
+	index      uint64
+	data       []byte
+	readLength uint64
+	lastRead   time.Time
+	fetching   bool
+}
+
+type PrefetchOpts struct {
+	Ctx         context.Context
+	CancelFunc  context.CancelFunc
+	Hash        string
+	FileSize    uint64
+	WindowSize  uint64
+	Offset      uint64
+	Client      *BlobCacheClient
+	DataTimeout time.Duration
+	Manager     *PrefetchManager
+}
+
+func NewPrefetchBuffer(opts PrefetchOpts) *PrefetchBuffer {
+	pb := &PrefetchBuffer{
+		ctx:           opts.Ctx,
+		cancelFunc:    opts.CancelFunc,
+		hash:          opts.Hash,
+		manager:       opts.Manager,
+		lastRead:      time.Now(),
+		fileSize:      opts.FileSize,
+		client:        opts.Client,
+		windowSize:    opts.WindowSize,
+		dataTimeout:   opts.DataTimeout,
+		mu:            sync.Mutex{},
+		currentWindow: nil,
+		nextWindow:    nil,
+		prevWindow:    nil,
+	}
+	pb.dataCond = sync.NewCond(&pb.mu)
+	return pb
+}
+
+func (pb *PrefetchBuffer) fetch(offset uint64, bufferSize uint64) {
+	bufferIndex := offset / bufferSize
+
+	pb.mu.Lock()
+	windows := []*window{pb.currentWindow, pb.nextWindow, pb.prevWindow}
+	for _, w := range windows {
+		if w != nil && w.index == bufferIndex {
+			pb.mu.Unlock()
+			return
+		}
+	}
+
+	w := &window{
+		index:      bufferIndex,
+		data:       make([]byte, 0, bufferSize),
+		readLength: 0,
+		lastRead:   time.Now(),
+		fetching:   true,
+	}
+
+	// Slide windows
+	pb.prevWindow = pb.currentWindow
+	pb.currentWindow = pb.nextWindow
+	pb.nextWindow = w
+	pb.mu.Unlock()
+
+	contentChan, err := pb.client.GetContentStream(pb.hash, int64(offset), int64(bufferSize))
+	if err != nil {
+		return
+	}
+
+	for {
+		select {
+		case <-pb.ctx.Done():
+			return
+		case chunk, ok := <-contentChan:
+			if !ok {
+				pb.mu.Lock()
+				w.fetching = false
+				w.lastRead = time.Now()
+				pb.dataCond.Broadcast()
+				pb.mu.Unlock()
+				return
+			}
+
+			pb.mu.Lock()
+			w.data = append(w.data, chunk...)
+			w.readLength += uint64(len(chunk))
+			w.lastRead = time.Now()
+			pb.dataCond.Broadcast()
+			pb.mu.Unlock()
+		}
+	}
+}
+
+func (pb *PrefetchBuffer) evictIdle() bool {
+	unused := true
+
+	pb.mu.Lock()
+	windows := []*window{pb.prevWindow, pb.currentWindow, pb.nextWindow}
+	for _, w := range windows {
+		if w != nil && time.Since(w.lastRead) > prefetchSegmentIdleTTL && !w.fetching {
+			Logger.Debugf("Evicting segment %s-%d", pb.hash, w.index)
+			w.data = nil
+		} else {
+			unused = false
+		}
+	}
+	pb.prevWindow, pb.currentWindow, pb.nextWindow = windows[0], windows[1], windows[2]
+	pb.mu.Unlock()
+
+	return unused
+}
+
+func (pb *PrefetchBuffer) Clear() {
+	pb.cancelFunc() // Stop any fetch operations
+
+	pb.mu.Lock()
+	defer pb.mu.Unlock()
+
+	// Clear all window data
+	windows := []*window{pb.prevWindow, pb.currentWindow, pb.nextWindow}
+	for _, window := range windows {
+		window.data = nil
+	}
+}
+
+func (pb *PrefetchBuffer) GetRange(offset, length uint64) ([]byte, error) {
+	bufferSize := pb.windowSize
+	bufferIndex := offset / bufferSize
+	bufferOffset := offset % bufferSize
+
+	var result []byte
+
+	for length > 0 {
+		data, ready := pb.tryGetRange(bufferIndex, bufferOffset, offset, length)
+		if ready {
+			result = append(result, data...)
+			dataLen := uint64(len(data))
+			length -= dataLen
+			offset += dataLen
+			bufferIndex = offset / bufferSize
+			bufferOffset = offset % bufferSize
+		} else {
+			if err := pb.waitForSignal(); err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	return result, nil
+}
+
+func (pb *PrefetchBuffer) waitForSignal() error {
+	timeoutChan := time.After(pb.dataTimeout)
+
+	for {
+		select {
+		case <-waitForCondition(pb.dataCond):
+			return nil
+		case <-timeoutChan:
+			return fmt.Errorf("timeout occurred waiting for prefetch data")
+		case <-pb.ctx.Done():
+			return pb.ctx.Err()
+		}
+	}
+}
+
+func (pb *PrefetchBuffer) tryGetRange(bufferIndex, bufferOffset, offset, length uint64) ([]byte, bool) {
+	pb.mu.Lock()
+	defer pb.mu.Unlock()
+
+	var w *window
+	var windows []*window = []*window{pb.currentWindow, pb.nextWindow, pb.prevWindow}
+	for _, win := range windows {
+		if win != nil && win.index == bufferIndex {
+			w = win
+			break
+		}
+	}
+
+	if w == nil {
+		go pb.fetch(bufferIndex*pb.windowSize, pb.windowSize)
+		return nil, false
+	}
+
+	if w.readLength > bufferOffset {
+		w.lastRead = time.Now()
+
+		// Calculate the relative offset within the buffer
+		relativeOffset := offset - (bufferIndex * pb.windowSize)
+		availableLength := w.readLength - relativeOffset
+		readLength := min(int64(length), int64(availableLength))
+
+		// Pre-emptively start fetching the next buffer if within the threshold
+		if w.readLength-relativeOffset <= preemptiveFetchThresholdBytes {
+			nextBufferIndex := bufferIndex + 1
+			if pb.nextWindow == nil || pb.nextWindow.index != nextBufferIndex {
+				go pb.fetch(nextBufferIndex*pb.windowSize, pb.windowSize)
+			}
+		}
+
+		return w.data[relativeOffset : int64(relativeOffset)+int64(readLength)], true
+	}
+
+	return nil, false
+}
+
+func waitForCondition(cond *sync.Cond) <-chan struct{} {
+	ch := make(chan struct{})
+	go func() {
+		cond.L.Lock()
+		cond.Wait()
+		cond.L.Unlock()
+		close(ch)
+	}()
+	return ch
+}
diff --git a/pkg/client.go b/pkg/client.go
index 14a885c..46207b7 100644
--- a/pkg/client.go
+++ b/pkg/client.go
@@ -382,7 +382,7 @@ func (c *BlobCacheClient) getGRPCClient(ctx context.Context, request *ClientRequ
 	}
 	defer c.metadata.RemoveClientLock(ctx, c.hostname, request.hash)

-	Logger.Infof("Content not available in any nearby cache - repopulating from: %s\n", entry.SourcePath)
+	Logger.Infof("Content not available in any nearby cache - repopulating from: %s", entry.SourcePath)
 	host, err = c.hostMap.Closest(closestHostTimeout)
 	if err != nil {
 		return nil, nil, err
@@ -402,7 +402,7 @@ func (c *BlobCacheClient) getGRPCClient(ctx context.Context, request *ClientRequ
 	}

 	if resp.Ok {
-		Logger.Infof("Content repopulated from source: %s\n", entry.SourcePath)
+		Logger.Infof("Content repopulated from source: %s", entry.SourcePath)
 		c.mu.Lock()
 		c.localHostCache[request.hash] = &localClientCache{
 			host: host,
@@ -494,7 +494,7 @@ func (c *BlobCacheClient) StoreContent(chunks chan []byte) (string, error) {
 		return "", err
 	}

-	Logger.Debugf("Elapsed time to send content: %v\n", time.Since(start))
+	Logger.Debugf("Elapsed time to send content: %v", time.Since(start))
 	return resp.Hash, nil
 }

diff --git a/pkg/config.default.yaml b/pkg/config.default.yaml
index 98ff4d0..5aacb66 100644
--- a/pkg/config.default.yaml
+++ b/pkg/config.default.yaml
@@ -12,7 +12,14 @@ discoveryMode: metadata
 directIO: false
 options: []
 blobfs:
-  enabled: false
+  prefetch:
+    enabled: false
+    idleTtlS: 60
+    minFileSizeBytes: 1048576 # 1MB
+    windowSizeBytes: 134217728 # 128MB
+    dataTimeoutS: 30
+    ignoreFileExt:
+      - .clip
   mountPoint: /tmp/test
   maxBackgroundTasks: 512
   maxReadAheadKB: 128
diff --git a/pkg/server.go b/pkg/server.go
index 32bf0fd..7cd6170 100644
--- a/pkg/server.go
+++ b/pkg/server.go
@@ -28,7 +28,7 @@ const (
 	writeBufferSizeBytes      int   = 128 * 1024
 	getContentBufferPoolSize  int   = 128
 	getContentBufferSize      int64 = 256 * 1024
-	getContentStreamChunkSize int64 = 32 * 1024 * 1024 // 32MB
+	getContentStreamChunkSize int64 = 16 * 1024 * 1024 // 16MB
 )

 type CacheServiceOpts struct {
@@ -81,11 +81,11 @@ func NewCacheService(ctx context.Context, cfg BlobCacheConfig) (*CacheService, e
 		for _, sourceConfig := range cfg.BlobFs.Sources {
 			_, err := NewSource(sourceConfig)
 			if err != nil {
-				Logger.Errorf("Failed to configure content source: %+v\n", err)
+				Logger.Errorf("Failed to configure content source: %+v", err)
 				continue
 			}

-			Logger.Infof("Configured and mounted source: %+v\n", sourceConfig.FilesystemName)
+			Logger.Infof("Configured and mounted source: %+v", sourceConfig.FilesystemName)
 		}
 	}

@@ -162,7 +162,7 @@ func (cs *CacheService) StartServer(port uint) error {
 	)

 	proto.RegisterBlobCacheServer(s, cs)
-	Logger.Infof("Running @ %s%s, cfg: %+v\n", cs.hostname, addr, cs.cfg)
+	Logger.Infof("Running @ %s%s, cfg: %+v", cs.hostname, addr, cs.cfg)

 	go s.Serve(localListener)
 	go s.Serve(tailscaleListener)
@@ -222,7 +222,7 @@ func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream pr
 	const chunkSize = getContentStreamChunkSize
 	offset := req.Offset
 	remainingLength := req.Length
-	Logger.Infof("GetContentStream[ACK] - [%s] - %d bytes", req.Hash, remainingLength)
+	Logger.Infof("GetContentStream[ACK] - [%s] - offset=%d, length=%d, %d bytes", req.Hash, offset, req.Length, remainingLength)

 	for remainingLength > 0 {
 		currentChunkSize := chunkSize
@@ -237,6 +237,10 @@ func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream pr
 			return status.Errorf(codes.NotFound, "Content not found: %v", err)
 		}

+		if n == 0 {
+			break
+		}
+
 		Logger.Infof("GetContentStream[TX] - [%s] - %d bytes", req.Hash, n)
 		if err := stream.Send(&proto.GetContentResponse{
 			Ok: true,
diff --git a/pkg/source.go b/pkg/source.go
index 34f6ae6..822365c 100644
--- a/pkg/source.go
+++ b/pkg/source.go
@@ -39,13 +39,13 @@ func NewSource(config SourceConfig) (Source, error) {
 		// NOTE: this is a no-op if already formatted
 		err = s.Format(config.FilesystemName)
 		if err != nil {
-			Logger.Fatalf("Unable to format filesystem: %+v\n", err)
+			Logger.Fatalf("Unable to format filesystem: %+v", err)
 		}

 		// Mount filesystem
 		err = s.Mount(config.FilesystemPath)
 		if err != nil {
-			Logger.Fatalf("Unable to mount filesystem: %+v\n", err)
+			Logger.Fatalf("Unable to mount filesystem: %+v", err)
 		}

 		return s, nil
@@ -58,7 +58,7 @@ func NewSource(config SourceConfig) (Source, error) {
 		// Mount filesystem
 		err = s.Mount(config.FilesystemPath)
 		if err != nil {
-			Logger.Fatalf("Unable to mount filesystem: %+v\n", err)
+			Logger.Fatalf("Unable to mount filesystem: %+v", err)
 		}

 		return s, nil
diff --git a/pkg/source_juicefs.go b/pkg/source_juicefs.go
index c5699b7..74452de 100644
--- a/pkg/source_juicefs.go
+++ b/pkg/source_juicefs.go
@@ -21,7 +21,7 @@ func NewJuiceFsSource(config JuiceFSConfig) (Source, error) {
 }

 func (s *JuiceFsSource) Mount(localPath string) error {
-	Logger.Infof("JuiceFS filesystem mounting to: '%s'\n", localPath)
+	Logger.Infof("JuiceFS filesystem mounting to: '%s'", localPath)

 	cacheSize := strconv.FormatInt(s.config.CacheSize, 10)

@@ -126,6 +126,6 @@ func (s *JuiceFsSource) Unmount(localPath string) error {
 		return fmt.Errorf("error executing juicefs umount: %v, output: %s", err, string(output))
output: %s", err, string(output)) } - Logger.Infof("JuiceFS filesystem unmounted from: '%s'\n", localPath) + Logger.Infof("JuiceFS filesystem unmounted from: '%s'", localPath) return nil } diff --git a/pkg/source_mountpoint.go b/pkg/source_mountpoint.go index 36dc287..02428ff 100644 --- a/pkg/source_mountpoint.go +++ b/pkg/source_mountpoint.go @@ -48,7 +48,7 @@ func (s *MountPointSource) Mount(localPath string) error { } }() - Logger.Infof("Mountpoint filesystem is being mounted to: '%s'\n", localPath) + Logger.Infof("Mountpoint filesystem is being mounted to: '%s'", localPath) return nil } diff --git a/pkg/types.go b/pkg/types.go index ab8bb30..0121bac 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -90,15 +90,25 @@ type RedisConfig struct { } type BlobFsConfig struct { - Enabled bool `key:"enabled" json:"enabled"` - MountPoint string `key:"mountPoint" json:"mount_point"` - Sources []SourceConfig `key:"sources" json:"sources"` - MaxBackgroundTasks int `key:"maxBackgroundTasks" json:"max_background_tasks"` - MaxWriteKB int `key:"maxWriteKB" json:"max_write_kb"` - MaxReadAheadKB int `key:"maxReadAheadKB" json:"max_read_ahead_kb"` - DirectMount bool `key:"directMount" json:"direct_mount"` - DirectIO bool `key:"directIO" json:"direct_io"` - Options []string `key:"options" json:"options"` + Enabled bool `key:"enabled" json:"enabled"` + Prefetch BlobFsPrefetchConfig `key:"prefetch" json:"prefetch"` + MountPoint string `key:"mountPoint" json:"mount_point"` + Sources []SourceConfig `key:"sources" json:"sources"` + MaxBackgroundTasks int `key:"maxBackgroundTasks" json:"max_background_tasks"` + MaxWriteKB int `key:"maxWriteKB" json:"max_write_kb"` + MaxReadAheadKB int `key:"maxReadAheadKB" json:"max_read_ahead_kb"` + DirectMount bool `key:"directMount" json:"direct_mount"` + DirectIO bool `key:"directIO" json:"direct_io"` + Options []string `key:"options" json:"options"` +} + +type BlobFsPrefetchConfig struct { + Enabled bool `key:"enabled" json:"enabled"` + MinFileSizeBytes uint64 `key:"minFileSizeBytes" json:"min_file_size_bytes"` + IdleTtlS int `key:"idleTtlS" json:"idle_ttl_s"` + WindowSizeBytes uint64 `key:"windowSizeBytes" json:"window_size_bytes"` + IgnoreFileExt []string `key:"ignoreFileExt" json:"ignore_file_ext"` + DataTimeoutS int `key:"dataTimeoutS" json:"data_timeout_s"` } type SourceConfig struct {