Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ build.sh
tmp/
config.yaml
e2e/testclient/testdata/*.bin
daemonset.yaml
daemonset.yaml
output.bin
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ publish-chart:
rm beam-blobcache-v2-chart-$(chartVersion).tgz

testclient:
go build -o bin/testclient e2e/testclient/main.go
GOOS=linux GOARCH=amd64 go build -o bin/testclient e2e/testclient/main.go
232 changes: 166 additions & 66 deletions e2e/testclient/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package main

import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"flag"
"io"
"log"
"os"
Expand All @@ -13,9 +13,21 @@ import (
blobcache "github.com/beam-cloud/blobcache-v2/pkg"
)

var totalIterations int = 1
var (
totalIterations int
checkContent bool
)

// TestResult holds the outcome of one benchmark helper run
// (TestGetContentStream / TestGetContent).
type TestResult struct {
// Total wall-clock seconds spent retrieving the content
// (accumulated across iterations by the caller).
ElapsedTime float64
// Whether the retrieved content's SHA-256 matched the expected hash.
// Only meaningful when content checking is enabled.
ContentCheckPassed bool
}

func main() {
flag.IntVar(&totalIterations, "iterations", 3, "Number of iterations to run the tests")
flag.BoolVar(&checkContent, "checkcontent", true, "Check the content hash after receiving data")
flag.Parse()

configManager, err := blobcache.NewConfigManager[blobcache.BlobCacheConfig]()
if err != nil {
log.Fatalf("Failed to load config: %v\n", err)
Expand All @@ -33,105 +45,193 @@ func main() {
log.Fatalf("Unable to create client: %v\n", err)
}

filePath := "e2e/testclient/testdata/test2.bin"
filePath := "e2e/testclient/testdata/test3.bin"
b, err := os.ReadFile(filePath)
if err != nil {
log.Fatalf("Unable to read input file: %v\n", err)
}
hashBytes := sha256.Sum256(b)
fileHash := hex.EncodeToString(hashBytes[:])

const chunkSize = 1024 * 1024 * 16 // 16MB chunks
var totalTime float64
hash, err := storeFile(client, filePath)
if err != nil {
log.Fatalf("Failed to store file: %v\n", err)
}

var totalStreamResult, totalGetContentResult TestResult
for i := 0; i < totalIterations; i++ {
chunks := make(chan []byte)
log.Printf("Iteration %d\n", i)

// Read file in chunks and dump into channel for StoreContent RPC calls
go func() {
file, err := os.Open(filePath)
if err != nil {
log.Fatalf("err: %v\n", err)
}
defer file.Close()
// Call GetContentStream
log.Printf("TestGetContentStream - %v\n", hash)
streamResult, err := TestGetContentStream(client, hash, len(b), fileHash)
if err != nil {
log.Fatalf("TestGetContentStream failed: %v\n", err)
}
totalStreamResult.ElapsedTime += streamResult.ElapsedTime
totalStreamResult.ContentCheckPassed = totalStreamResult.ContentCheckPassed || streamResult.ContentCheckPassed
log.Printf("TestGetContentStream - %v\n", streamResult)

// Call GetContent
log.Printf("TestGetContent - %v\n", hash)
getContentResult, err := TestGetContent(client, hash, int64(len(b)), fileHash)
if err != nil {
log.Fatalf("TestGetContent failed: %v\n", err)
}
totalGetContentResult.ElapsedTime += getContentResult.ElapsedTime
totalGetContentResult.ContentCheckPassed = totalGetContentResult.ContentCheckPassed || getContentResult.ContentCheckPassed
log.Printf("TestGetContent - %v\n", getContentResult)
}

for {
buf := make([]byte, chunkSize)
n, err := file.Read(buf)
GenerateReport(totalStreamResult, totalGetContentResult, len(b), totalIterations)
}

if err != nil && err != io.EOF {
log.Fatalf("err reading file: %v\n", err)
}
func storeFile(client *blobcache.BlobCacheClient, filePath string) (string, error) {
chunks := make(chan []byte)
go func() {
file, err := os.Open(filePath)
if err != nil {
log.Fatalf("err: %v\n", err)
}
defer file.Close()

if n == 0 {
break
}
const chunkSize = 1024 * 1024 * 16 // 16MB chunks
for {
buf := make([]byte, chunkSize)
n, err := file.Read(buf)

chunks <- buf[:n]
if err != nil && err != io.EOF {
log.Fatalf("err reading file: %v\n", err)
}

close(chunks)
}()
if n == 0 {
break
}

hash, err := client.StoreContent(chunks)
if err != nil {
log.Fatalf("Unable to store content: %v\n", err)
chunks <- buf[:n]
}

startTime := time.Now()
content, err := client.GetContent(hash, 0, int64(len(b)))
if err != nil {
log.Fatalf("Unable to get content: %v\n", err)
}
elapsedTime := time.Since(startTime).Seconds()
totalTime += elapsedTime
close(chunks)
}()

hashBytes := sha256.Sum256(content)
responseHash := hex.EncodeToString(hashBytes[:])
hash, err := client.StoreContent(chunks)
if err != nil {
return "", err
}
return hash, nil
}

log.Printf("Initial file len: %d\n", len(b))
log.Printf("Response content len: %d\n", len(content))
log.Printf("Hash of initial file: %s\n", fileHash)
log.Printf("Hash of stored content: %s\n", hash)
log.Printf("Hash of retrieved content: %s\n", responseHash)
func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSize int, expectedHash string) (TestResult, error) {
contentCheckPassed := false

log.Printf("Iteration %d: content length: %d, file length: %d, elapsed time: %f seconds\n", i+1, len(content), len(b), elapsedTime)
startTime := time.Now()
contentChan, err := client.GetContentStream(hash, 0, int64(fileSize))
if err != nil {
return TestResult{}, err
}

var contentStream []byte
chunkQueue := make(chan []byte, 50) // Buffered channel to queue chunks
done := make(chan struct{}) // Channel to signal completion

if len(content) != len(b) {
log.Fatalf("length mismatch: content len: %d, file len: %d\n", len(content), len(b))
go func() {
for chunk := range chunkQueue {
contentStream = append(contentStream, chunk...)
}
close(done)
}()

// Direct byte comparison loop
mismatchFound := false
for i := range content {
if content[i] != b[i] {
log.Printf("Byte mismatch at position %d: content byte: %x, file byte: %x\n", i, content[i], b[i])
mismatchFound = true
break
}
for {
chunk, ok := <-contentChan
if !ok {
break
}
chunkQueue <- chunk
}
close(chunkQueue) // Close the queue to signal no more chunks
<-done

elapsedTime := time.Since(startTime).Seconds()

// Verify received content's hash
if checkContent {
log.Printf("Verifying hash for GetContentStream\n")

if !mismatchFound {
log.Println("Direct byte comparison found no differences.")
hashBytes := sha256.Sum256(contentStream)
retrievedHash := hex.EncodeToString(hashBytes[:])
if retrievedHash != expectedHash {
contentCheckPassed = false
} else {
log.Println("Direct byte comparison found differences.")
contentCheckPassed = true
}

// Cross-check with bytes.Equal
if bytes.Equal(content, b) {
log.Println("bytes.Equal confirms the slices are equal.")
log.Printf("Calculated hash for GetContentStream: expected %s, got %s", expectedHash, retrievedHash)
}

return TestResult{ElapsedTime: elapsedTime, ContentCheckPassed: contentCheckPassed}, nil
}

// TestGetContent times reading the stored content through repeated 128KB
// ranged GetContent calls and, when checkContent is set, verifies the
// retrieved bytes' SHA-256 against expectedHash.
//
// Returns a TestResult with the elapsed wall-clock seconds and the
// content-check outcome (false when checking is disabled).
func TestGetContent(client *blobcache.BlobCacheClient, hash string, fileSize int64, expectedHash string) (TestResult, error) {
	startTime := time.Now()

	const chunkSize = 1024 * 128 // 128k chunks

	// Final size is known — pre-size to avoid repeated growth copies.
	content := make([]byte, 0, fileSize)

	for offset := int64(0); offset < fileSize; {
		end := offset + chunkSize
		if end > fileSize {
			end = fileSize
		}

		chunk, err := client.GetContent(hash, offset, end-offset)
		if err != nil {
			return TestResult{}, err
		}
		content = append(content, chunk...)
		offset = end
	}

	elapsedTime := time.Since(startTime).Seconds()

	// Verify received content's hash.
	contentCheckPassed := false
	if checkContent {
		log.Printf("Verifying hash for GetContent\n")

		hashBytes := sha256.Sum256(content)
		retrievedHash := hex.EncodeToString(hashBytes[:])
		contentCheckPassed = retrievedHash == expectedHash

		log.Printf("Calculated hash for GetContent: expected %s, got %s", expectedHash, retrievedHash)
	}

	return TestResult{ElapsedTime: elapsedTime, ContentCheckPassed: contentCheckPassed}, nil
}

_, err = client.StoreContentFromSource("images/agent.yaml", 0)
if err != nil {
log.Fatalf("Unable to store content from source: %v\n", err)
// GenerateReport logs aggregate timing and throughput statistics for both
// read paths (GetContentStream and GetContent) and, when content checking
// is enabled, the overall pass/fail verdict.
func GenerateReport(streamResult, contentResult TestResult, fileSize, iterations int) {
	// Guard the averages: zero iterations would yield NaN everywhere.
	if iterations <= 0 {
		log.Println("No iterations were run; nothing to report.")
		return
	}

	averageTimeStream := streamResult.ElapsedTime / float64(iterations)
	averageTimeContent := contentResult.ElapsedTime / float64(iterations)
	totalBytesReadMB := float64(fileSize*iterations) / (1024 * 1024)

	// Guard throughput against division by zero on degenerate (instant) runs.
	mbPerSecondStream := 0.0
	if streamResult.ElapsedTime > 0 {
		mbPerSecondStream = totalBytesReadMB / streamResult.ElapsedTime
	}
	mbPerSecondContent := 0.0
	if contentResult.ElapsedTime > 0 {
		mbPerSecondContent = totalBytesReadMB / contentResult.ElapsedTime
	}

	log.Printf("Total time for GetContentStream: %f seconds\n", streamResult.ElapsedTime)
	log.Printf("Average time per iteration for GetContentStream: %f seconds\n", averageTimeStream)
	log.Printf("Total time for GetContent: %f seconds\n", contentResult.ElapsedTime)
	log.Printf("Average time per iteration for GetContent: %f seconds\n", averageTimeContent)
	log.Printf("Total read: %.2f MB\n", totalBytesReadMB)
	log.Printf("Average MB/s rate of reading (GetContentStream): %f\n", mbPerSecondStream)
	log.Printf("Average MB/s rate of reading (GetContent): %f\n", mbPerSecondContent)

	if checkContent {
		if streamResult.ContentCheckPassed && contentResult.ContentCheckPassed {
			log.Println("Content check passed for all iterations.")
		} else {
			log.Println("Content check failed for some iterations.")
		}
	}
}
6 changes: 4 additions & 2 deletions pkg/blobfs_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*
return nil, syscall.ENOENT
}

// TODO: stream file to a temp file in the container somewhere
// /tmp/cache/path/to/file

out.Attr = *attr
return node, fs.OK
}
Expand Down Expand Up @@ -172,8 +175,7 @@ func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int
return nil, syscall.EIO
}

nRead := copy(dest, buffer)
return fuse.ReadResultData(dest[:nRead]), fs.OK
return fuse.ReadResultData(buffer), fs.OK
}

func (n *FSNode) Readlink(ctx context.Context) ([]byte, syscall.Errno) {
Expand Down
50 changes: 50 additions & 0 deletions pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"io"
"math"
"net"
"sync"
Expand All @@ -22,6 +23,7 @@ import (

const (
getContentRequestTimeout = 30 * time.Second
getContentStreamRequestTimeout = 600 * time.Second
storeContentRequestTimeout = 300 * time.Second
closestHostTimeout = 30 * time.Second
localClientCacheCleanupInterval = 5 * time.Second
Expand Down Expand Up @@ -263,6 +265,54 @@ func (c *BlobCacheClient) GetContent(hash string, offset int64, length int64) ([
return nil, ErrContentNotFound
}

// GetContentStream returns a channel that yields the cached content for
// hash over [offset, offset+length), streamed chunk by chunk.
//
// The channel is closed when the stream ends — on success, on context
// timeout, or after exhausting retries. Failed hosts are evicted from
// metadata and the local host cache, and the read is retried up to 3
// times, resuming after the bytes already delivered so the consumer never
// receives duplicate data.
func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int64) (chan []byte, error) {
	// Use the dedicated streaming timeout: a whole-file stream can
	// legitimately run far longer than a single ranged GetContent call.
	ctx, cancel := context.WithTimeout(c.ctx, getContentStreamRequestTimeout)
	contentChan := make(chan []byte)

	go func() {
		defer close(contentChan)
		defer cancel()

		// Bytes already handed to the consumer; a retry resumes from here
		// instead of re-sending data from the original offset.
		received := int64(0)

		for attempt := 0; attempt < 3; attempt++ {
			client, host, err := c.getGRPCClient(ctx, &ClientRequest{
				rt:   ClientRequestTypeRetrieval,
				hash: hash,
			})
			if err != nil {
				return
			}

			stream, err := client.GetContentStream(ctx, &proto.GetContentRequest{Hash: hash, Offset: offset + received, Length: length - received})
			if err != nil {
				c.metadata.RemoveEntryLocation(ctx, hash, host)
				c.mu.Lock()
				delete(c.localHostCache, hash)
				c.mu.Unlock()
				continue
			}

			for {
				resp, err := stream.Recv()
				if err == io.EOF {
					return
				}

				if err != nil || !resp.Ok {
					// Evict the failed host and retry with another replica.
					c.metadata.RemoveEntryLocation(ctx, hash, host)
					c.mu.Lock()
					delete(c.localHostCache, hash)
					c.mu.Unlock()
					break
				}

				// Respect cancellation so a consumer that stops reading
				// does not leak this goroutine on a blocked send.
				select {
				case contentChan <- resp.Content:
					received += int64(len(resp.Content))
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return contentChan, nil
}

func (c *BlobCacheClient) manageLocalClientCache(ttl time.Duration, interval time.Duration) {
go func() {
ticker := time.NewTicker(interval)
Expand Down
Loading