Merged
34 commits
a9c3a18
start laying out prefetch
luke-lombardi Jan 11, 2025
0f57f40
add separate throughput test
luke-lombardi Jan 11, 2025
8767978
clean up logs
luke-lombardi Jan 11, 2025
d01cf8c
refactor config
luke-lombardi Jan 12, 2025
139f49e
wip
luke-lombardi Jan 12, 2025
1bb3616
somewhat working prefetch
luke-lombardi Jan 12, 2025
1bbe917
fetch fetch fetch
luke-lombardi Jan 12, 2025
d6081dd
embed the lock inside the for loop
luke-lombardi Jan 12, 2025
c2f8904
wip
luke-lombardi Jan 12, 2025
5cc303a
fix bugs, verify checksums
luke-lombardi Jan 13, 2025
fe8b65d
rename vars
luke-lombardi Jan 13, 2025
237a302
more cleanup
luke-lombardi Jan 13, 2025
f6a629a
handle error
luke-lombardi Jan 13, 2025
4535bb3
rename configs
luke-lombardi Jan 13, 2025
792deee
more renames
luke-lombardi Jan 13, 2025
366dca5
clear segments
luke-lombardi Jan 13, 2025
25dd64f
lower eviction interval
luke-lombardi Jan 13, 2025
4d7fb60
fix lock
luke-lombardi Jan 13, 2025
45d8e6e
fix race
luke-lombardi Jan 13, 2025
51bf4ae
fix comment
luke-lombardi Jan 13, 2025
b9a30fd
explicitly clear buffer
luke-lombardi Jan 13, 2025
368cf33
more aggressive gc
luke-lombardi Jan 13, 2025
c6cfaa2
force gc on evict
luke-lombardi Jan 13, 2025
90b665e
remove idle
luke-lombardi Jan 13, 2025
c06d3d4
cache prefetch and support ignore file exts
luke-lombardi Jan 13, 2025
ac62cbb
add data timeout
luke-lombardi Jan 13, 2025
4630b9f
add total prefetch size
luke-lombardi Jan 13, 2025
a79c709
wip
luke-lombardi Jan 13, 2025
e106b46
remove logs
luke-lombardi Jan 13, 2025
13312f9
more cleanup
luke-lombardi Jan 13, 2025
075ddd6
remove prefetch limit
luke-lombardi Jan 13, 2025
b6a1d93
remove logs.txt
luke-lombardi Jan 13, 2025
d5a12d0
test sliding windows
luke-lombardi Jan 13, 2025
7b3f08e
remove debug logs
luke-lombardi Jan 13, 2025
5 changes: 4 additions & 1 deletion .gitignore
@@ -1,10 +1,13 @@
*.tgz
.DS_Store
bin/blobcache
bin/throughput
bin/fs
bin/testclient
build.sh
tmp/
config.yaml
e2e/testclient/testdata/*.bin
e2e/throughput/testdata/*.bin
e2e/fs/testdata/*.bin
daemonset.yaml
output.bin
5 changes: 3 additions & 2 deletions Makefile
@@ -24,5 +24,6 @@ publish-chart:
helm push beam-blobcache-v2-chart-$(chartVersion).tgz oci://public.ecr.aws/n4e0e1y0
rm beam-blobcache-v2-chart-$(chartVersion).tgz

testclient:
GOOS=linux GOARCH=amd64 go build -o bin/testclient e2e/testclient/main.go
testclients:
GOOS=linux GOARCH=amd64 go build -o bin/throughput e2e/throughput/main.go
GOOS=linux GOARCH=amd64 go build -o bin/fs e2e/fs/main.go
2 changes: 1 addition & 1 deletion cmd/main.go
@@ -10,7 +10,7 @@ import (
func main() {
configManager, err := blobcache.NewConfigManager[blobcache.BlobCacheConfig]()
if err != nil {
log.Fatalf("Failed to load config: %v\n", err)
log.Fatalf("Failed to load config: %v", err)
}

ctx := context.Background()
52 changes: 52 additions & 0 deletions e2e/fs/main.go
@@ -0,0 +1,52 @@
package main

import (
"context"
"flag"
"log"
"os"
"os/signal"
"syscall"

blobcache "github.com/beam-cloud/blobcache-v2/pkg"
)

var (
totalIterations int
checkContent bool
)

type TestResult struct {
ElapsedTime float64
ContentCheckPassed bool
}

func main() {
flag.IntVar(&totalIterations, "iterations", 3, "Number of iterations to run the tests")
flag.BoolVar(&checkContent, "checkcontent", true, "Check the content hash after receiving data")
flag.Parse()

configManager, err := blobcache.NewConfigManager[blobcache.BlobCacheConfig]()
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}

cfg := configManager.GetConfig()

// Initialize logger
blobcache.InitLogger(cfg.DebugMode)

ctx := context.Background()

_, err = blobcache.NewBlobCacheClient(ctx, cfg)
if err != nil {
log.Fatalf("Unable to create client: %v", err)
}

// Block until Ctrl+C (SIGINT) or SIGTERM is received
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
<-sigChan

log.Println("Received interrupt or termination signal, exiting.")
}
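
The new fs harness only loads the config, creates a client, and blocks until it receives a signal, so reads are driven externally through the BlobFs mount while it runs. As a rough illustration of the kind of manual spot-check that pairs with it, a sketch follows; the mount point, file path, and the entire snippet are assumptions for illustration and not part of this PR:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"log"
	"os"
)

func main() {
	// Hypothetical read through an already-mounted BlobFs; the mount point
	// and file name are illustrative only.
	data, err := os.ReadFile("/cache/models/weights.bin")
	if err != nil {
		log.Fatalf("read through blobfs failed: %v", err)
	}

	// Hash the bytes that came back through the mount so they can be compared
	// against the content hash the file was stored under.
	sum := sha256.Sum256(data)
	log.Printf("read %d bytes, sha256=%s", len(data), hex.EncodeToString(sum[:]))
}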
10 changes: 5 additions & 5 deletions e2e/testclient/main.go → e2e/throughput/main.go
@@ -30,7 +30,7 @@ func main() {

configManager, err := blobcache.NewConfigManager[blobcache.BlobCacheConfig]()
if err != nil {
log.Fatalf("Failed to load config: %v\n", err)
log.Fatalf("Failed to load config: %v", err)
}

cfg := configManager.GetConfig()
@@ -42,10 +42,10 @@

client, err := blobcache.NewBlobCacheClient(ctx, cfg)
if err != nil {
log.Fatalf("Unable to create client: %v\n", err)
log.Fatalf("Unable to create client: %v", err)
}

filePath := "e2e/testclient/testdata/test3.bin"
filePath := "e2e/throughput/testdata/test3.bin"
b, err := os.ReadFile(filePath)
if err != nil {
log.Fatalf("Unable to read input file: %v\n", err)
@@ -101,7 +101,7 @@ func storeFile(client *blobcache.BlobCacheClient, filePath string) (string, erro
n, err := file.Read(buf)

if err != nil && err != io.EOF {
log.Fatalf("err reading file: %v\n", err)
log.Fatalf("err reading file: %v", err)
}

if n == 0 {
@@ -155,7 +155,7 @@ func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSi

// Verify received content's hash
if checkContent {
log.Printf("Verifying hash for GetContentStream\n")
log.Printf("Verifying hash for GetContentStream")

hashBytes := sha256.Sum256(contentStream)
retrievedHash := hex.EncodeToString(hashBytes[:])
25 changes: 15 additions & 10 deletions pkg/blobfs.go
@@ -6,7 +6,6 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"log"
"os"
"os/exec"
"strings"
@@ -67,17 +66,18 @@
}

type BlobFs struct {
ctx context.Context
root *FSNode
verbose bool
Metadata *BlobCacheMetadata
Client *BlobCacheClient
Config BlobCacheConfig
ctx context.Context
root *FSNode
verbose bool
Metadata *BlobCacheMetadata
Client *BlobCacheClient
Config BlobCacheConfig
PrefetchManager *PrefetchManager
}

func Mount(ctx context.Context, opts BlobFsSystemOpts) (func() error, <-chan error, *fuse.Server, error) {
mountPoint := opts.Config.BlobFs.MountPoint
Logger.Infof("Mounting to %s\n", mountPoint)
Logger.Infof("Mounting to %s", mountPoint)

if _, err := os.Stat(mountPoint); os.IsNotExist(err) {
err = os.MkdirAll(mountPoint, 0755)
@@ -170,19 +170,24 @@ func NewFileSystem(ctx context.Context, opts BlobFsSystemOpts) (*BlobFs, error)
Metadata: metadata,
}

if opts.Config.BlobFs.Prefetch.Enabled {
bfs.PrefetchManager = NewPrefetchManager(ctx, opts.Config, opts.Client)
bfs.PrefetchManager.Start()
}

rootID := GenerateFsID("/")
rootPID := "" // Root node has no parent
rootPath := "/"

dirMeta, err := metadata.GetFsNode(bfs.ctx, rootID)
if err != nil || dirMeta == nil {
log.Printf("Root node metadata not found, creating it now...\n")
Logger.Infof("Root node metadata not found, creating it now...")

dirMeta = &BlobFsMetadata{PID: rootPID, ID: rootID, Path: rootPath, Ino: 1, Mode: fuse.S_IFDIR | 0755}

err := metadata.SetFsNode(bfs.ctx, rootID, dirMeta)
if err != nil {
log.Fatalf("Unable to create blobfs root node dir metdata: %+v\n", err)
Logger.Fatalf("Unable to create blobfs root node dir metdata: %+v", err)
}
}

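
NewFileSystem now starts a PrefetchManager whenever opts.Config.BlobFs.Prefetch.Enabled is set. The config type itself is not part of this excerpt; going only by the fields referenced here and in shouldPrefetch further down, a sketch of what that section could look like (package name and field types are assumptions):

package blobcache // assumed package name for pkg/

// Sketch only: field names come from the usages in this PR (Enabled,
// MinFileSizeBytes, IgnoreFileExt); the real declaration in the config
// package may differ in types and struct tags.
type BlobFsPrefetchConfig struct {
	Enabled          bool
	MinFileSizeBytes uint64
	IgnoreFileExt    []string
}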
68 changes: 51 additions & 17 deletions pkg/blobfs_node.go
@@ -12,13 +12,14 @@
)

type BlobFsNode struct {
Path string
ID string
PID string
Name string
Target string
Hash string
Attr fuse.Attr
Path string
ID string
PID string
Name string
Target string
Hash string
Attr fuse.Attr
Prefetch *bool
}
type FSNode struct {
fs.Inode
@@ -94,13 +95,14 @@ func (n *FSNode) inodeFromFsId(ctx context.Context, fsId string) (*fs.Inode, *fu
// Create a new Inode on lookup
node := n.NewInode(ctx,
&FSNode{filesystem: n.filesystem, bfsNode: &BlobFsNode{
Path: metadata.Path,
ID: metadata.ID,
PID: metadata.PID,
Name: metadata.Name,
Hash: metadata.Hash,
Attr: attr,
Target: "",
Path: metadata.Path,
ID: metadata.ID,
PID: metadata.PID,
Name: metadata.Name,
Hash: metadata.Hash,
Attr: attr,
Target: "",
Prefetch: nil,
}, attr: attr},
fs.StableAttr{Mode: metadata.Mode, Ino: metadata.Ino, Gen: metadata.Gen},
)
@@ -128,9 +130,6 @@ func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*
return nil, syscall.ENOENT
}

// TODO: stream file to a temp file in the container somewhere
// /tmp/cache/path/to/file

out.Attr = *attr
return node, fs.OK
}
@@ -162,6 +161,30 @@ func (n *FSNode) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuse
return nil, 0, fs.OK
}

func (n *FSNode) shouldPrefetch(node *BlobFsNode) bool {
if node.Prefetch != nil {
return *node.Prefetch
}

if !n.filesystem.Config.BlobFs.Prefetch.Enabled {
return false
}

if n.bfsNode.Attr.Size < n.filesystem.Config.BlobFs.Prefetch.MinFileSizeBytes {
return false
}

for _, ext := range n.filesystem.Config.BlobFs.Prefetch.IgnoreFileExt {
if strings.HasSuffix(node.Name, ext) {
return false
}
}

prefetch := true
node.Prefetch = &prefetch
return true
}

func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) {
n.log("Read called with offset: %v", off)

@@ -170,6 +193,17 @@ func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int
return fuse.ReadResultData(dest[:0]), fs.OK
}

// Attempt to prefetch the file
if n.shouldPrefetch(n.bfsNode) {
buffer := n.filesystem.PrefetchManager.GetPrefetchBuffer(n.bfsNode.Hash, n.bfsNode.Attr.Size)
if buffer != nil {
data, err := buffer.GetRange(uint64(off), uint64(len(dest)))
if err == nil {
return fuse.ReadResultData(data), fs.OK
}
}
}

buffer, err := n.filesystem.Client.GetContent(n.bfsNode.Hash, off, int64(len(dest)))
if err != nil {
return nil, syscall.EIO
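
Read now tries the prefetch path first and falls back to Client.GetContent when no buffer exists or the requested range is not ready. The PrefetchManager itself is defined in a file that is not part of this excerpt; inferred only from the call sites above, a minimal sketch of the surface it needs to expose (package and type names beyond GetPrefetchBuffer, GetRange, and Start are assumptions):

package blobcache // assumed package name for pkg/

// prefetchBuffer is an inferred view of whatever GetPrefetchBuffer returns.
type prefetchBuffer interface {
	// GetRange returns bytes [offset, offset+length) once the prefetcher has
	// fetched them; any error makes Read fall back to Client.GetContent.
	GetRange(offset uint64, length uint64) ([]byte, error)
}

// prefetchManager is an inferred view of the manager wired up in NewFileSystem.
type prefetchManager interface {
	// GetPrefetchBuffer returns the buffer tracking a content hash, or nil if
	// prefetching has not started for that file yet.
	GetPrefetchBuffer(hash string, fileSize uint64) prefetchBuffer

	// Start launches the background prefetch workers (called from NewFileSystem).
	Start()
}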