From a23ecbab39f26c0a1bde26d361ced2161e25ae52 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 08:53:27 -0500 Subject: [PATCH 01/17] more cleanup --- Makefile | 2 +- e2e/testclient/main.go | 99 +++++++++++++++++++++----------------- pkg/client.go | 57 ++++++++++++++++++++++ pkg/server.go | 35 +++++++++++++- proto/blobcache.pb.go | 68 ++++++++++++++------------ proto/blobcache.proto | 1 + proto/blobcache_grpc.pb.go | 66 ++++++++++++++++++++++++- 7 files changed, 252 insertions(+), 76 deletions(-) diff --git a/Makefile b/Makefile index a37a20c..e956356 100644 --- a/Makefile +++ b/Makefile @@ -25,4 +25,4 @@ publish-chart: rm beam-blobcache-v2-chart-$(chartVersion).tgz testclient: - go build -o bin/testclient e2e/testclient/main.go + GOOS=linux GOARCH=amd64 go build -o bin/testclient e2e/testclient/main.go diff --git a/e2e/testclient/main.go b/e2e/testclient/main.go index 72f7985..f700e8e 100644 --- a/e2e/testclient/main.go +++ b/e2e/testclient/main.go @@ -13,7 +13,10 @@ import ( blobcache "github.com/beam-cloud/blobcache-v2/pkg" ) -var totalIterations int = 1 +var ( + totalIterations int = 3 + checkHash bool = false +) func main() { configManager, err := blobcache.NewConfigManager[blobcache.BlobCacheConfig]() @@ -33,7 +36,7 @@ func main() { log.Fatalf("Unable to create client: %v\n", err) } - filePath := "e2e/testclient/testdata/test2.bin" + filePath := "e2e/testclient/testdata/test3.bin" b, err := os.ReadFile(filePath) if err != nil { log.Fatalf("Unable to read input file: %v\n", err) @@ -44,6 +47,7 @@ func main() { const chunkSize = 1024 * 1024 * 16 // 16MB chunks var totalTime float64 + var storedHashed string = "" for i := 0; i < totalIterations; i++ { chunks := make(chan []byte) @@ -73,65 +77,74 @@ func main() { close(chunks) }() - hash, err := client.StoreContent(chunks) - if err != nil { - log.Fatalf("Unable to store content: %v\n", err) + if storedHashed == "" { + hash, err := client.StoreContent(chunks) + if err != nil { + log.Fatalf("Unable to store content: %v\n", err) + } + storedHashed = hash } startTime := time.Now() - content, err := client.GetContent(hash, 0, int64(len(b))) + contentChan, err := client.GetContentStream(storedHashed, 0, int64(len(b))) if err != nil { - log.Fatalf("Unable to get content: %v\n", err) + log.Fatalf("Unable to get content stream: %v\n", err) + } + + var content []byte + for { + chunk, ok := <-contentChan + if !ok { + log.Println("Content channel closed, exiting loop") + break + } + content = append(content, chunk...) } elapsedTime := time.Since(startTime).Seconds() totalTime += elapsedTime - hashBytes := sha256.Sum256(content) - responseHash := hex.EncodeToString(hashBytes[:]) + if checkHash { + hashBytes := sha256.Sum256(content) + responseHash := hex.EncodeToString(hashBytes[:]) - log.Printf("Initial file len: %d\n", len(b)) - log.Printf("Response content len: %d\n", len(content)) - log.Printf("Hash of initial file: %s\n", fileHash) - log.Printf("Hash of stored content: %s\n", hash) - log.Printf("Hash of retrieved content: %s\n", responseHash) + log.Printf("Initial file len: %d\n", len(b)) + log.Printf("Response content len: %d\n", len(content)) + log.Printf("Hash of initial file: %s\n", fileHash) + log.Printf("Hash of stored content: %s\n", storedHashed) + log.Printf("Hash of retrieved content: %s\n", responseHash) + log.Printf("Iteration %d: content length: %d, file length: %d, elapsed time: %f seconds\n", i+1, len(content), len(b), elapsedTime) - log.Printf("Iteration %d: content length: %d, file length: %d, elapsed time: %f seconds\n", i+1, len(content), len(b), elapsedTime) + if len(content) != len(b) { + log.Fatalf("length mismatch: content len: %d, file len: %d\n", len(content), len(b)) + } - if len(content) != len(b) { - log.Fatalf("length mismatch: content len: %d, file len: %d\n", len(content), len(b)) - } + // Direct byte comparison loop + mismatchFound := false + for i := range content { + if content[i] != b[i] { + log.Printf("Byte mismatch at position %d: content byte: %x, file byte: %x\n", i, content[i], b[i]) + mismatchFound = true + break + } + } - // Direct byte comparison loop - mismatchFound := false - for i := range content { - if content[i] != b[i] { - log.Printf("Byte mismatch at position %d: content byte: %x, file byte: %x\n", i, content[i], b[i]) - mismatchFound = true - break + if !mismatchFound { + log.Println("Direct byte comparison found no differences.") + } else { + log.Println("Direct byte comparison found differences.") } - } - if !mismatchFound { - log.Println("Direct byte comparison found no differences.") - } else { - log.Println("Direct byte comparison found differences.") - } + // Cross-check with bytes.Equal + if bytes.Equal(content, b) { + log.Println("bytes.Equal confirms the slices are equal.") + } else { + log.Println("bytes.Equal indicates the slices are not equal.") + } - // Cross-check with bytes.Equal - if bytes.Equal(content, b) { - log.Println("bytes.Equal confirms the slices are equal.") - } else { - log.Println("bytes.Equal indicates the slices are not equal.") } - } - averageTime := totalTime / 10 + averageTime := totalTime / float64(totalIterations) mbPerSecond := (float64(len(b)*totalIterations) / (1024 * 1024)) / averageTime log.Printf("Average MB/s rate of reading (GetContent): %f\n", mbPerSecond) - - _, err = client.StoreContentFromSource("images/agent.yaml", 0) - if err != nil { - log.Fatalf("Unable to store content from source: %v\n", err) - } } diff --git a/pkg/client.go b/pkg/client.go index 89da6ef..a779870 100644 --- a/pkg/client.go +++ b/pkg/client.go @@ -4,6 +4,8 @@ import ( "context" "crypto/tls" "fmt" + "io" + "log" "math" "net" "sync" @@ -22,6 +24,7 @@ import ( const ( getContentRequestTimeout = 30 * time.Second + getContentStreamRequestTimeout = 600 * time.Second storeContentRequestTimeout = 300 * time.Second closestHostTimeout = 30 * time.Second localClientCacheCleanupInterval = 5 * time.Second @@ -512,3 +515,57 @@ func (c *BlobCacheClient) GetState() error { return nil } + +func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int64) (chan []byte, error) { + ctx, cancel := context.WithTimeout(c.ctx, getContentStreamRequestTimeout) + contentChan := make(chan []byte) + + go func() { + defer func() { + close(contentChan) + }() + + defer cancel() + + for attempt := 0; attempt < 3; attempt++ { + client, host, err := c.getGRPCClient(ctx, &ClientRequest{ + rt: ClientRequestTypeRetrieval, + hash: hash, + }) + if err != nil { + log.Printf("Error getting content stream for %s: %v\n", hash, err) + return + } + + stream, err := client.GetContentStream(ctx, &proto.GetContentRequest{Hash: hash, Offset: offset, Length: length}) + if err != nil { + c.metadata.RemoveEntryLocation(ctx, hash, host) + c.mu.Lock() + delete(c.localHostCache, hash) + c.mu.Unlock() + continue + } + + for { + resp, err := stream.Recv() + if err == io.EOF { + return + } + + // If an unexpected error occurs, assume the content is no + // longer available on this host and remove entry from metadata + if err != nil || !resp.Ok { + c.metadata.RemoveEntryLocation(ctx, hash, host) + c.mu.Lock() + delete(c.localHostCache, hash) + c.mu.Unlock() + break + } + + contentChan <- resp.Content + } + } + }() + + return contentChan, nil +} diff --git a/pkg/server.go b/pkg/server.go index 5032285..25e1e4f 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -157,6 +157,7 @@ func (cs *CacheService) StartServer(port uint) error { grpc.MaxRecvMsgSize(maxMessageSize), grpc.MaxSendMsgSize(maxMessageSize), grpc.WriteBufferSize(writeBufferSizeBytes), + grpc.NumStreamWorkers(uint32(runtime.NumCPU())), ) proto.RegisterBlobCacheServer(s, cs) @@ -286,7 +287,6 @@ func (cs *CacheService) usagePct() float64 { } func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error) { - return &proto.GetStateResponse{ Version: BlobCacheVersion, PrivateIpAddr: cs.privateIpAddr, @@ -333,3 +333,36 @@ func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.S return &proto.StoreContentFromSourceResponse{Ok: true, Hash: hash}, nil } + +func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error { + dst := make([]byte, req.Length) + + err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst) + if err != nil { + Logger.Debugf("GetContentStream - [%s] - %v", req.Hash, err) + return status.Errorf(codes.NotFound, "Content not found: %v", err) + } + + Logger.Infof("GetContentStream - [%s] - %d bytes", req.Hash, len(dst)) + + chunkSize := 1024 * 1024 * 32 // 32MB chunks + for start := 0; start < len(dst); start += chunkSize { + end := start + chunkSize + if end > len(dst) { + end = len(dst) + } + + resp := &proto.GetContentResponse{ + Ok: true, + Content: dst[start:end], + } + + if err := stream.Send(resp); err != nil { + Logger.Errorf("GetContentStream - failed to send chunk: %v", err) + return status.Errorf(codes.Internal, "Failed to send content chunk: %v", err) + } + } + + Logger.Debugf("GetContentStream - [%s] completed", req.Hash) + return nil +} diff --git a/proto/blobcache.pb.go b/proto/blobcache.pb.go index 33810cb..85226db 100644 --- a/proto/blobcache.pb.go +++ b/proto/blobcache.pb.go @@ -483,32 +483,38 @@ var file_blobcache_proto_rawDesc = []byte{ 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x32, 0xe5, 0x02, 0x0a, 0x09, 0x42, 0x6c, 0x6f, 0x62, + 0x09, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x32, 0xba, 0x03, 0x0a, 0x09, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x61, 0x63, 0x68, 0x65, 0x12, 0x4b, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x53, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, - 0x6e, 0x74, 0x12, 0x1e, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x53, - 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x53, - 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x12, 0x6f, 0x0a, 0x16, 0x53, 0x74, 0x6f, 0x72, 0x65, - 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x53, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x12, 0x28, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x53, 0x74, - 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x53, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x62, 0x6c, - 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, - 0x74, 0x65, 0x6e, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, - 0x74, 0x61, 0x74, 0x65, 0x12, 0x1a, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, - 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1b, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x47, 0x65, 0x74, - 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, - 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x65, - 0x61, 0x6d, 0x2d, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, - 0x68, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0x00, 0x12, 0x53, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, + 0x68, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, + 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x53, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x72, 0x65, + 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x1e, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, + 0x63, 0x68, 0x65, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, + 0x63, 0x68, 0x65, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x12, 0x6f, 0x0a, 0x16, + 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, 0x72, 0x6f, 0x6d, + 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x28, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, + 0x68, 0x65, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, + 0x72, 0x6f, 0x6d, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x29, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x53, 0x74, 0x6f, + 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x53, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x45, 0x0a, + 0x08, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1a, 0x2e, 0x62, 0x6c, 0x6f, 0x62, + 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, + 0x65, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2d, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x62, 0x6c, + 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -536,15 +542,17 @@ var file_blobcache_proto_goTypes = []interface{}{ } var file_blobcache_proto_depIdxs = []int32{ 0, // 0: blobcache.BlobCache.GetContent:input_type -> blobcache.GetContentRequest - 2, // 1: blobcache.BlobCache.StoreContent:input_type -> blobcache.StoreContentRequest - 6, // 2: blobcache.BlobCache.StoreContentFromSource:input_type -> blobcache.StoreContentFromSourceRequest - 4, // 3: blobcache.BlobCache.GetState:input_type -> blobcache.GetStateRequest - 1, // 4: blobcache.BlobCache.GetContent:output_type -> blobcache.GetContentResponse - 3, // 5: blobcache.BlobCache.StoreContent:output_type -> blobcache.StoreContentResponse - 7, // 6: blobcache.BlobCache.StoreContentFromSource:output_type -> blobcache.StoreContentFromSourceResponse - 5, // 7: blobcache.BlobCache.GetState:output_type -> blobcache.GetStateResponse - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type + 0, // 1: blobcache.BlobCache.GetContentStream:input_type -> blobcache.GetContentRequest + 2, // 2: blobcache.BlobCache.StoreContent:input_type -> blobcache.StoreContentRequest + 6, // 3: blobcache.BlobCache.StoreContentFromSource:input_type -> blobcache.StoreContentFromSourceRequest + 4, // 4: blobcache.BlobCache.GetState:input_type -> blobcache.GetStateRequest + 1, // 5: blobcache.BlobCache.GetContent:output_type -> blobcache.GetContentResponse + 1, // 6: blobcache.BlobCache.GetContentStream:output_type -> blobcache.GetContentResponse + 3, // 7: blobcache.BlobCache.StoreContent:output_type -> blobcache.StoreContentResponse + 7, // 8: blobcache.BlobCache.StoreContentFromSource:output_type -> blobcache.StoreContentFromSourceResponse + 5, // 9: blobcache.BlobCache.GetState:output_type -> blobcache.GetStateResponse + 5, // [5:10] is the sub-list for method output_type + 0, // [0:5] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/proto/blobcache.proto b/proto/blobcache.proto index fc6f366..6ef6c2b 100644 --- a/proto/blobcache.proto +++ b/proto/blobcache.proto @@ -6,6 +6,7 @@ package blobcache; service BlobCache { rpc GetContent(GetContentRequest) returns (GetContentResponse) {} + rpc GetContentStream(GetContentRequest) returns (stream GetContentResponse) {} rpc StoreContent(stream StoreContentRequest) returns (StoreContentResponse) {} rpc StoreContentFromSource(StoreContentFromSourceRequest) returns (StoreContentFromSourceResponse) {} rpc GetState(GetStateRequest) returns (GetStateResponse) {} diff --git a/proto/blobcache_grpc.pb.go b/proto/blobcache_grpc.pb.go index 9204ba4..fa90b75 100644 --- a/proto/blobcache_grpc.pb.go +++ b/proto/blobcache_grpc.pb.go @@ -20,6 +20,7 @@ const _ = grpc.SupportPackageIsVersion7 const ( BlobCache_GetContent_FullMethodName = "/blobcache.BlobCache/GetContent" + BlobCache_GetContentStream_FullMethodName = "/blobcache.BlobCache/GetContentStream" BlobCache_StoreContent_FullMethodName = "/blobcache.BlobCache/StoreContent" BlobCache_StoreContentFromSource_FullMethodName = "/blobcache.BlobCache/StoreContentFromSource" BlobCache_GetState_FullMethodName = "/blobcache.BlobCache/GetState" @@ -30,6 +31,7 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type BlobCacheClient interface { GetContent(ctx context.Context, in *GetContentRequest, opts ...grpc.CallOption) (*GetContentResponse, error) + GetContentStream(ctx context.Context, in *GetContentRequest, opts ...grpc.CallOption) (BlobCache_GetContentStreamClient, error) StoreContent(ctx context.Context, opts ...grpc.CallOption) (BlobCache_StoreContentClient, error) StoreContentFromSource(ctx context.Context, in *StoreContentFromSourceRequest, opts ...grpc.CallOption) (*StoreContentFromSourceResponse, error) GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error) @@ -52,8 +54,40 @@ func (c *blobCacheClient) GetContent(ctx context.Context, in *GetContentRequest, return out, nil } +func (c *blobCacheClient) GetContentStream(ctx context.Context, in *GetContentRequest, opts ...grpc.CallOption) (BlobCache_GetContentStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &BlobCache_ServiceDesc.Streams[0], BlobCache_GetContentStream_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &blobCacheGetContentStreamClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type BlobCache_GetContentStreamClient interface { + Recv() (*GetContentResponse, error) + grpc.ClientStream +} + +type blobCacheGetContentStreamClient struct { + grpc.ClientStream +} + +func (x *blobCacheGetContentStreamClient) Recv() (*GetContentResponse, error) { + m := new(GetContentResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *blobCacheClient) StoreContent(ctx context.Context, opts ...grpc.CallOption) (BlobCache_StoreContentClient, error) { - stream, err := c.cc.NewStream(ctx, &BlobCache_ServiceDesc.Streams[0], BlobCache_StoreContent_FullMethodName, opts...) + stream, err := c.cc.NewStream(ctx, &BlobCache_ServiceDesc.Streams[1], BlobCache_StoreContent_FullMethodName, opts...) if err != nil { return nil, err } @@ -109,6 +143,7 @@ func (c *blobCacheClient) GetState(ctx context.Context, in *GetStateRequest, opt // for forward compatibility type BlobCacheServer interface { GetContent(context.Context, *GetContentRequest) (*GetContentResponse, error) + GetContentStream(*GetContentRequest, BlobCache_GetContentStreamServer) error StoreContent(BlobCache_StoreContentServer) error StoreContentFromSource(context.Context, *StoreContentFromSourceRequest) (*StoreContentFromSourceResponse, error) GetState(context.Context, *GetStateRequest) (*GetStateResponse, error) @@ -122,6 +157,9 @@ type UnimplementedBlobCacheServer struct { func (UnimplementedBlobCacheServer) GetContent(context.Context, *GetContentRequest) (*GetContentResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetContent not implemented") } +func (UnimplementedBlobCacheServer) GetContentStream(*GetContentRequest, BlobCache_GetContentStreamServer) error { + return status.Errorf(codes.Unimplemented, "method GetContentStream not implemented") +} func (UnimplementedBlobCacheServer) StoreContent(BlobCache_StoreContentServer) error { return status.Errorf(codes.Unimplemented, "method StoreContent not implemented") } @@ -162,6 +200,27 @@ func _BlobCache_GetContent_Handler(srv interface{}, ctx context.Context, dec fun return interceptor(ctx, in, info, handler) } +func _BlobCache_GetContentStream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetContentRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(BlobCacheServer).GetContentStream(m, &blobCacheGetContentStreamServer{stream}) +} + +type BlobCache_GetContentStreamServer interface { + Send(*GetContentResponse) error + grpc.ServerStream +} + +type blobCacheGetContentStreamServer struct { + grpc.ServerStream +} + +func (x *blobCacheGetContentStreamServer) Send(m *GetContentResponse) error { + return x.ServerStream.SendMsg(m) +} + func _BlobCache_StoreContent_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(BlobCacheServer).StoreContent(&blobCacheStoreContentServer{stream}) } @@ -245,6 +304,11 @@ var BlobCache_ServiceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{ + { + StreamName: "GetContentStream", + Handler: _BlobCache_GetContentStream_Handler, + ServerStreams: true, + }, { StreamName: "StoreContent", Handler: _BlobCache_StoreContent_Handler, From 2d8e92c2018ac5d063f44056d443d3b70013f90e Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Tue, 17 Dec 2024 17:16:18 -0500 Subject: [PATCH 02/17] update to not read all content --- pkg/blobfs_node.go | 3 +++ pkg/server.go | 31 ++++++++++++++++--------------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/pkg/blobfs_node.go b/pkg/blobfs_node.go index a4c081c..a6fa4e2 100644 --- a/pkg/blobfs_node.go +++ b/pkg/blobfs_node.go @@ -128,6 +128,9 @@ func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (* return nil, syscall.ENOENT } + // TODO: stream file to a temp file in the container somewhere + // /tmp/cache/path/to/file + out.Attr = *attr return node, fs.OK } diff --git a/pkg/server.go b/pkg/server.go index 25e1e4f..9035c73 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -335,32 +335,33 @@ func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.S } func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error { - dst := make([]byte, req.Length) + chunkSize := int64(1024 * 1024 * 64) // 64MB chunks + offset := req.Offset - err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst) - if err != nil { - Logger.Debugf("GetContentStream - [%s] - %v", req.Hash, err) - return status.Errorf(codes.NotFound, "Content not found: %v", err) - } - - Logger.Infof("GetContentStream - [%s] - %d bytes", req.Hash, len(dst)) - - chunkSize := 1024 * 1024 * 32 // 32MB chunks - for start := 0; start < len(dst); start += chunkSize { - end := start + chunkSize - if end > len(dst) { - end = len(dst) + for { + content, err := cs.cas.Get(req.Hash, offset, chunkSize) + if err != nil { + Logger.Debugf("GetContentStream - [%s] - %v", req.Hash, err) + return status.Errorf(codes.NotFound, "Content not found: %v", err) } + Logger.Infof("GetContentStream - [%s] - %d bytes", req.Hash, len(content)) + resp := &proto.GetContentResponse{ Ok: true, - Content: dst[start:end], + Content: content, } if err := stream.Send(resp); err != nil { Logger.Errorf("GetContentStream - failed to send chunk: %v", err) return status.Errorf(codes.Internal, "Failed to send content chunk: %v", err) } + + if int64(len(content)) < chunkSize { + break + } + + offset += int64(len(content)) } Logger.Debugf("GetContentStream - [%s] completed", req.Hash) From bd5b3120d98de5fdf3101d4f7ea1ea39275fa814 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Tue, 17 Dec 2024 18:05:07 -0500 Subject: [PATCH 03/17] wip --- e2e/testclient/main.go | 33 ++++++++++-- pkg/client.go | 111 +++++++++++++++++++++-------------------- pkg/server.go | 41 ++++++++++++--- pkg/storage.go | 5 +- 4 files changed, 124 insertions(+), 66 deletions(-) diff --git a/e2e/testclient/main.go b/e2e/testclient/main.go index f700e8e..bff1d8f 100644 --- a/e2e/testclient/main.go +++ b/e2e/testclient/main.go @@ -92,14 +92,36 @@ func main() { } var content []byte + chunkQueue := make(chan []byte, 10) // Buffered channel to queue chunks + done := make(chan struct{}) // Channel to signal completion + + // Goroutine to write chunks to file + go func() { + file, err := os.Create("output.bin") + if err != nil { + log.Fatalf("Unable to create output file: %v\n", err) + } + defer file.Close() + + for chunk := range chunkQueue { + _, err := file.Write(chunk) + if err != nil { + log.Fatalf("Error writing chunk to file: %v\n", err) + } + } + close(done) + }() + for { chunk, ok := <-contentChan if !ok { - log.Println("Content channel closed, exiting loop") break } - content = append(content, chunk...) + chunkQueue <- chunk } + close(chunkQueue) // Close the queue to signal no more chunks + + <-done // Wait for the file writing to complete elapsedTime := time.Since(startTime).Seconds() totalTime += elapsedTime @@ -145,6 +167,11 @@ func main() { } averageTime := totalTime / float64(totalIterations) - mbPerSecond := (float64(len(b)*totalIterations) / (1024 * 1024)) / averageTime + totalBytesReadMB := float64(len(b)*totalIterations) / (1024 * 1024) + mbPerSecond := totalBytesReadMB / totalTime + + log.Printf("Total time: %f seconds\n", totalTime) + log.Printf("Average time per iteration: %f seconds\n", averageTime) + log.Printf("Total read: %.2f MB\n", totalBytesReadMB) log.Printf("Average MB/s rate of reading (GetContent): %f\n", mbPerSecond) } diff --git a/pkg/client.go b/pkg/client.go index a779870..9ee07ce 100644 --- a/pkg/client.go +++ b/pkg/client.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "fmt" "io" - "log" "math" "net" "sync" @@ -266,6 +265,62 @@ func (c *BlobCacheClient) GetContent(hash string, offset int64, length int64) ([ return nil, ErrContentNotFound } +func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int64) (chan []byte, error) { + ctx, cancel := context.WithTimeout(c.ctx, getContentRequestTimeout) + contentChan := make(chan []byte) + + go func() { + defer close(contentChan) + defer cancel() + + for attempt := 0; attempt < 3; attempt++ { + client, host, err := c.getGRPCClient(ctx, &ClientRequest{ + rt: ClientRequestTypeRetrieval, + hash: hash, + }) + if err != nil { + return + } + + stream, err := client.GetContentStream(ctx, &proto.GetContentRequest{Hash: hash, Offset: offset, Length: length}) + if err != nil { + c.metadata.RemoveEntryLocation(ctx, hash, host) + c.mu.Lock() + delete(c.localHostCache, hash) + c.mu.Unlock() + continue + } + + for { + resp, err := stream.Recv() + if err != nil { + if err == io.EOF { + return + } + + c.metadata.RemoveEntryLocation(ctx, hash, host) + c.mu.Lock() + delete(c.localHostCache, hash) + c.mu.Unlock() + break + } + + if !resp.Ok { + c.metadata.RemoveEntryLocation(ctx, hash, host) + c.mu.Lock() + delete(c.localHostCache, hash) + c.mu.Unlock() + break + } + + contentChan <- resp.Content + } + } + }() + + return contentChan, nil +} + func (c *BlobCacheClient) manageLocalClientCache(ttl time.Duration, interval time.Duration) { go func() { ticker := time.NewTicker(interval) @@ -515,57 +570,3 @@ func (c *BlobCacheClient) GetState() error { return nil } - -func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int64) (chan []byte, error) { - ctx, cancel := context.WithTimeout(c.ctx, getContentStreamRequestTimeout) - contentChan := make(chan []byte) - - go func() { - defer func() { - close(contentChan) - }() - - defer cancel() - - for attempt := 0; attempt < 3; attempt++ { - client, host, err := c.getGRPCClient(ctx, &ClientRequest{ - rt: ClientRequestTypeRetrieval, - hash: hash, - }) - if err != nil { - log.Printf("Error getting content stream for %s: %v\n", hash, err) - return - } - - stream, err := client.GetContentStream(ctx, &proto.GetContentRequest{Hash: hash, Offset: offset, Length: length}) - if err != nil { - c.metadata.RemoveEntryLocation(ctx, hash, host) - c.mu.Lock() - delete(c.localHostCache, hash) - c.mu.Unlock() - continue - } - - for { - resp, err := stream.Recv() - if err == io.EOF { - return - } - - // If an unexpected error occurs, assume the content is no - // longer available on this host and remove entry from metadata - if err != nil || !resp.Ok { - c.metadata.RemoveEntryLocation(ctx, hash, host) - c.mu.Lock() - delete(c.localHostCache, hash) - c.mu.Unlock() - break - } - - contentChan <- resp.Content - } - } - }() - - return contentChan, nil -} diff --git a/pkg/server.go b/pkg/server.go index 9035c73..2efb409 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" ) @@ -158,6 +159,10 @@ func (cs *CacheService) StartServer(port uint) error { grpc.MaxSendMsgSize(maxMessageSize), grpc.WriteBufferSize(writeBufferSizeBytes), grpc.NumStreamWorkers(uint32(runtime.NumCPU())), + grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: 10 * time.Second, + Timeout: 20 * time.Second, + }), ) proto.RegisterBlobCacheServer(s, cs) @@ -335,28 +340,42 @@ func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.S } func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error { - chunkSize := int64(1024 * 1024 * 64) // 64MB chunks + const chunkSize = int64(1024 * 1024 * 64) // 64MB chunks offset := req.Offset + // Create a buffered channel to queue chunks + chunkChan := make(chan *proto.GetContentResponse, 10) + errChan := make(chan error, 1) + + // Goroutine to send chunks in order + go func() { + for resp := range chunkChan { + if err := stream.Send(resp); err != nil { + errChan <- status.Errorf(codes.Internal, "Failed to send content chunk: %v", err) + return + } + } + errChan <- nil + }() + for { + // Fetch content in chunks content, err := cs.cas.Get(req.Hash, offset, chunkSize) if err != nil { Logger.Debugf("GetContentStream - [%s] - %v", req.Hash, err) + close(chunkChan) return status.Errorf(codes.NotFound, "Content not found: %v", err) } Logger.Infof("GetContentStream - [%s] - %d bytes", req.Hash, len(content)) - resp := &proto.GetContentResponse{ + // Queue the chunk for sending + chunkChan <- &proto.GetContentResponse{ Ok: true, Content: content, } - if err := stream.Send(resp); err != nil { - Logger.Errorf("GetContentStream - failed to send chunk: %v", err) - return status.Errorf(codes.Internal, "Failed to send content chunk: %v", err) - } - + // Break if this is the last chunk if int64(len(content)) < chunkSize { break } @@ -364,6 +383,14 @@ func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream pr offset += int64(len(content)) } + // Close the channel to signal completion + close(chunkChan) + + // Wait for the sending goroutine to finish + if err := <-errChan; err != nil { + return err + } + Logger.Debugf("GetContentStream - [%s] completed", req.Hash) return nil } diff --git a/pkg/storage.go b/pkg/storage.go index db9362c..266be84 100644 --- a/pkg/storage.go +++ b/pkg/storage.go @@ -152,7 +152,10 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst return ErrContentNotFound } - v := value.(cacheValue) + v, ok := value.(cacheValue) + if !ok { + return nil, fmt.Errorf("unexpected cache value type") + } chunkBytes := v.Content start := o % cas.config.PageSizeBytes From c643eb268c0e7f837108d8cbd43531d0780eb713 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Tue, 17 Dec 2024 18:08:52 -0500 Subject: [PATCH 04/17] fix comparison --- e2e/testclient/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/e2e/testclient/main.go b/e2e/testclient/main.go index bff1d8f..e73a773 100644 --- a/e2e/testclient/main.go +++ b/e2e/testclient/main.go @@ -95,7 +95,7 @@ func main() { chunkQueue := make(chan []byte, 10) // Buffered channel to queue chunks done := make(chan struct{}) // Channel to signal completion - // Goroutine to write chunks to file + // Goroutine to write chunks to file and accumulate content go func() { file, err := os.Create("output.bin") if err != nil { @@ -108,6 +108,7 @@ func main() { if err != nil { log.Fatalf("Error writing chunk to file: %v\n", err) } + content = append(content, chunk...) // Accumulate chunks into content } close(done) }() From 64e29b4b721ca36726d44b27d765d301e83a9b35 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Tue, 17 Dec 2024 18:24:50 -0500 Subject: [PATCH 05/17] wip --- e2e/testclient/main.go | 2 +- pkg/server.go | 31 +++---------------------------- 2 files changed, 4 insertions(+), 29 deletions(-) diff --git a/e2e/testclient/main.go b/e2e/testclient/main.go index e73a773..bc7a8a2 100644 --- a/e2e/testclient/main.go +++ b/e2e/testclient/main.go @@ -108,7 +108,7 @@ func main() { if err != nil { log.Fatalf("Error writing chunk to file: %v\n", err) } - content = append(content, chunk...) // Accumulate chunks into content + // content = append(content, chunk...) // Accumulate chunks into content } close(done) }() diff --git a/pkg/server.go b/pkg/server.go index 2efb409..49ae013 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -343,36 +343,20 @@ func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream pr const chunkSize = int64(1024 * 1024 * 64) // 64MB chunks offset := req.Offset - // Create a buffered channel to queue chunks - chunkChan := make(chan *proto.GetContentResponse, 10) - errChan := make(chan error, 1) - - // Goroutine to send chunks in order - go func() { - for resp := range chunkChan { - if err := stream.Send(resp); err != nil { - errChan <- status.Errorf(codes.Internal, "Failed to send content chunk: %v", err) - return - } - } - errChan <- nil - }() - for { - // Fetch content in chunks content, err := cs.cas.Get(req.Hash, offset, chunkSize) if err != nil { Logger.Debugf("GetContentStream - [%s] - %v", req.Hash, err) - close(chunkChan) return status.Errorf(codes.NotFound, "Content not found: %v", err) } Logger.Infof("GetContentStream - [%s] - %d bytes", req.Hash, len(content)) - // Queue the chunk for sending - chunkChan <- &proto.GetContentResponse{ + if err := stream.Send(&proto.GetContentResponse{ Ok: true, Content: content, + }); err != nil { + return status.Errorf(codes.Internal, "Failed to send content chunk: %v", err) } // Break if this is the last chunk @@ -383,14 +367,5 @@ func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream pr offset += int64(len(content)) } - // Close the channel to signal completion - close(chunkChan) - - // Wait for the sending goroutine to finish - if err := <-errChan; err != nil { - return err - } - - Logger.Debugf("GetContentStream - [%s] completed", req.Hash) return nil } From 46af505d03dcf0272c19d4e3a56d52f09bebcda0 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Tue, 17 Dec 2024 18:26:53 -0500 Subject: [PATCH 06/17] improve buffer latency --- pkg/blobfs_node.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/blobfs_node.go b/pkg/blobfs_node.go index a6fa4e2..663b654 100644 --- a/pkg/blobfs_node.go +++ b/pkg/blobfs_node.go @@ -175,8 +175,7 @@ func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int return nil, syscall.EIO } - nRead := copy(dest, buffer) - return fuse.ReadResultData(dest[:nRead]), fs.OK + return fuse.ReadResultData(buffer), fs.OK } func (n *FSNode) Readlink(ctx context.Context) ([]byte, syscall.Errno) { From 0eb970b5a661ed1e8a0becaaf170dfc21c76ad87 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Tue, 17 Dec 2024 20:54:49 -0500 Subject: [PATCH 07/17] update golang version --- Dockerfile | 2 +- go.mod | 19 +++++++++---------- go.sum | 50 ++++++++++++++++++++++++++------------------------ 3 files changed, 36 insertions(+), 35 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1856df6..aab0905 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1.6 -FROM golang:1.22-bullseye AS build +FROM golang:1.23.4-bullseye AS build WORKDIR /workspace diff --git a/go.mod b/go.mod index ed822e5..7238841 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/beam-cloud/blobcache-v2 -go 1.22.0 +go 1.23.1 -toolchain go1.22.4 +toolchain go1.23.4 require ( github.com/beam-cloud/ristretto v0.0.0-20241013204426-d1403e359aa2 @@ -21,7 +21,7 @@ require ( go.uber.org/zap v1.27.0 google.golang.org/grpc v1.62.0 google.golang.org/protobuf v1.33.0 - tailscale.com v1.72.1 + tailscale.com v1.78.3 ) require ( @@ -65,12 +65,12 @@ require ( github.com/gorilla/csrf v1.7.2 // indirect github.com/gorilla/securecookie v1.1.2 // indirect github.com/hdevalence/ed25519consensus v0.2.0 // indirect - github.com/illarion/gonotify v1.0.1 // indirect + github.com/illarion/gonotify/v2 v2.0.3 // indirect github.com/insomniacslk/dhcp v0.0.0-20231206064809-8c70d406f6d2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 // indirect github.com/jsimonetti/rtnetlink v1.4.0 // indirect - github.com/klauspost/compress v1.17.7 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/kortschak/wol v0.0.0-20200729010619-da482cc4850a // indirect github.com/mdlayher/genetlink v1.3.2 // indirect @@ -91,13 +91,12 @@ require ( github.com/tailscale/golang-x-crypto v0.0.0-20240604161659-3fde5e568aa4 // indirect github.com/tailscale/goupnp v1.0.1-0.20210804011211-c64d0f06ea05 // indirect github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a // indirect - github.com/tailscale/netlink v1.1.1-0.20211101221916-cabfb018fe85 // indirect + github.com/tailscale/netlink v1.1.1-0.20240822203006-4d49adab4de7 // indirect github.com/tailscale/peercred v0.0.0-20240214030740-b535050b2aa4 // indirect github.com/tailscale/web-client-prebuilt v0.0.0-20240226180453-5db17b287bf1 // indirect - github.com/tailscale/wireguard-go v0.0.0-20240731203015-71393c576b98 // indirect + github.com/tailscale/wireguard-go v0.0.0-20241113014420-4e883d38c8d3 // indirect github.com/tcnksm/go-httpstat v0.2.0 // indirect github.com/u-root/uio v0.0.0-20240118234441-a3c409a6018e // indirect - github.com/vishvananda/netlink v1.2.1-beta.2 // indirect github.com/vishvananda/netns v0.0.4 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect @@ -108,8 +107,8 @@ require ( golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect golang.org/x/mod v0.19.0 // indirect golang.org/x/net v0.27.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.25.0 // indirect + golang.org/x/sync v0.9.0 // indirect + golang.org/x/sys v0.27.0 // indirect golang.org/x/term v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/go.sum b/go.sum index f986f8a..8d7d4b6 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= filippo.io/mkcert v1.4.4 h1:8eVbbwfVlaqUM7OwuftKc2nuYOoTDQWqsoXmzoXZdbc= filippo.io/mkcert v1.4.4/go.mod h1:VyvOchVuAye3BoUsPUOOofKygVwLV2KQMVFJNRq+1dA= -github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= -github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c h1:pxW6RcqyfI9/kWtOwnv/G+AzdKuy2ZrqINhenH4HyNs= +github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/akutz/memconn v0.1.0 h1:NawI0TORU4hcOMsMr11g7vwlCdkYeLKXBcxWu2W/P8A= github.com/akutz/memconn v0.1.0/go.mod h1:Jo8rI7m0NieZyLI5e2CDlRdRqRRB4S7Xp77ukDjH+Fw= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI= @@ -120,8 +120,8 @@ github.com/hanwen/go-fuse/v2 v2.5.1 h1:OQBE8zVemSocRxA4OaFJbjJ5hlpCmIWbGr7r0M4uo github.com/hanwen/go-fuse/v2 v2.5.1/go.mod h1:xKwi1cF7nXAOBCXujD5ie0ZKsxc8GGSA1rlMJc+8IJs= github.com/hdevalence/ed25519consensus v0.2.0 h1:37ICyZqdyj0lAZ8P4D1d1id3HqbbG1N3iBb1Tb4rdcU= github.com/hdevalence/ed25519consensus v0.2.0/go.mod h1:w3BHWjwJbFU29IRHL1Iqkw3sus+7FctEyM4RqDxYNzo= -github.com/illarion/gonotify v1.0.1 h1:F1d+0Fgbq/sDWjj/r66ekjDG+IDeecQKUFH4wNwsoio= -github.com/illarion/gonotify v1.0.1/go.mod h1:zt5pmDofZpU1f8aqlK0+95eQhoEAn/d4G4B/FjVW4jE= +github.com/illarion/gonotify/v2 v2.0.3 h1:B6+SKPo/0Sw8cRJh1aLzNEeNVFfzE3c6N+o+vyxM+9A= +github.com/illarion/gonotify/v2 v2.0.3/go.mod h1:38oIJTgFqupkEydkkClkbL6i5lXV/bxdH9do5TALPEE= github.com/insomniacslk/dhcp v0.0.0-20231206064809-8c70d406f6d2 h1:9K06NfxkBh25x56yVhWWlKFE8YpicaSfHwoV8SFbueA= github.com/insomniacslk/dhcp v0.0.0-20231206064809-8c70d406f6d2/go.mod h1:3A9PQ1cunSDF/1rbTq99Ts4pVnycWg+vlPkfeD2NLFI= github.com/jellydator/ttlcache/v3 v3.1.0 h1:0gPFG0IHHP6xyUyXq+JaD8fwkDCqgqwohXNJBcYE71g= @@ -135,8 +135,8 @@ github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 h1:elKwZS1OcdQ0 github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86/go.mod h1:aFAMtuldEgx/4q7iSGazk22+IcgvtiC+HIimFO9XlS8= github.com/jsimonetti/rtnetlink v1.4.0 h1:Z1BF0fRgcETPEa0Kt0MRk3yV5+kF1FWTni6KUFKrq2I= github.com/jsimonetti/rtnetlink v1.4.0/go.mod h1:5W1jDvWdnthFJ7fxYX1GMK07BUpI4oskfOqvPteYS6E= -github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= -github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/parsers/json v0.1.0 h1:dzSZl5pf5bBcW0Acnu20Djleto19T0CfHcvZ14NJ6fU= @@ -196,10 +196,14 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus-community/pro-bing v0.4.0 h1:YMbv+i08gQz97OZZBwLyvmmQEEzyfyrrjEaAchdy3R4= github.com/prometheus-community/pro-bing v0.4.0/go.mod h1:b7wRYZtCcPmt4Sz319BykUU241rWLe1VFXyiyWK/dH4= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= -github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/safchain/ethtool v0.3.0 h1:gimQJpsI6sc1yIqP/y8GYgiXn/NjgvpM0RNoWLVVmP0= github.com/safchain/ethtool v0.3.0/go.mod h1:SA9BwrgyAqNo7M+uaL6IYbxpm5wk3L7Mm6ocLW+CJUs= github.com/shirou/gopsutil v2.21.11+incompatible h1:lOGOyCG67a5dv2hq5Z1BLDUqqKp3HkbjPcz5j6XMS0U= @@ -217,16 +221,16 @@ github.com/tailscale/goupnp v1.0.1-0.20210804011211-c64d0f06ea05 h1:4chzWmimtJPx github.com/tailscale/goupnp v1.0.1-0.20210804011211-c64d0f06ea05/go.mod h1:PdCqy9JzfWMJf1H5UJW2ip33/d4YkoKN0r67yKH1mG8= github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a h1:SJy1Pu0eH1C29XwJucQo73FrleVK6t4kYz4NVhp34Yw= github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a/go.mod h1:DFSS3NAGHthKo1gTlmEcSBiZrRJXi28rLNd/1udP1c8= -github.com/tailscale/netlink v1.1.1-0.20211101221916-cabfb018fe85 h1:zrsUcqrG2uQSPhaUPjUQwozcRdDdSxxqhNgNZ3drZFk= -github.com/tailscale/netlink v1.1.1-0.20211101221916-cabfb018fe85/go.mod h1:NzVQi3Mleb+qzq8VmcWpSkcSYxXIg0DkI6XDzpVkhJ0= +github.com/tailscale/netlink v1.1.1-0.20240822203006-4d49adab4de7 h1:uFsXVBE9Qr4ZoF094vE6iYTLDl0qCiKzYXlL6UeWObU= +github.com/tailscale/netlink v1.1.1-0.20240822203006-4d49adab4de7/go.mod h1:NzVQi3Mleb+qzq8VmcWpSkcSYxXIg0DkI6XDzpVkhJ0= github.com/tailscale/peercred v0.0.0-20240214030740-b535050b2aa4 h1:Gz0rz40FvFVLTBk/K8UNAenb36EbDSnh+q7Z9ldcC8w= github.com/tailscale/peercred v0.0.0-20240214030740-b535050b2aa4/go.mod h1:phI29ccmHQBc+wvroosENp1IF9195449VDnFDhJ4rJU= github.com/tailscale/web-client-prebuilt v0.0.0-20240226180453-5db17b287bf1 h1:tdUdyPqJ0C97SJfjB9tW6EylTtreyee9C44de+UBG0g= github.com/tailscale/web-client-prebuilt v0.0.0-20240226180453-5db17b287bf1/go.mod h1:agQPE6y6ldqCOui2gkIh7ZMztTkIQKH049tv8siLuNQ= github.com/tailscale/wf v0.0.0-20240214030419-6fbb0a674ee6 h1:l10Gi6w9jxvinoiq15g8OToDdASBni4CyJOdHY1Hr8M= github.com/tailscale/wf v0.0.0-20240214030419-6fbb0a674ee6/go.mod h1:ZXRML051h7o4OcI0d3AaILDIad/Xw0IkXaHM17dic1Y= -github.com/tailscale/wireguard-go v0.0.0-20240731203015-71393c576b98 h1:RNpJrXfI5u6e+uzyIzvmnXbhmhdRkVf//90sMBH3lso= -github.com/tailscale/wireguard-go v0.0.0-20240731203015-71393c576b98/go.mod h1:BOm5fXUBFM+m9woLNBoxI9TaBXXhGNP50LX/TGIvGb4= +github.com/tailscale/wireguard-go v0.0.0-20241113014420-4e883d38c8d3 h1:dmoPb3dG27tZgMtrvqfD/LW4w7gA6BSWl8prCPNmkCQ= +github.com/tailscale/wireguard-go v0.0.0-20241113014420-4e883d38c8d3/go.mod h1:BOm5fXUBFM+m9woLNBoxI9TaBXXhGNP50LX/TGIvGb4= github.com/tailscale/xnet v0.0.0-20240729143630-8497ac4dab2e h1:zOGKqN5D5hHhiYUp091JqK7DPCqSARyUfduhGUY8Bek= github.com/tailscale/xnet v0.0.0-20240729143630-8497ac4dab2e/go.mod h1:orPd6JZXXRyuDusYilywte7k094d7dycXXU5YnWsrwg= github.com/tc-hib/winres v0.2.1 h1:YDE0FiP0VmtRaDn7+aaChp1KiF4owBiJa5l964l5ujA= @@ -237,8 +241,6 @@ github.com/u-root/u-root v0.12.0 h1:K0AuBFriwr0w/PGS3HawiAw89e3+MU7ks80GpghAsNs= github.com/u-root/u-root v0.12.0/go.mod h1:FYjTOh4IkIZHhjsd17lb8nYW6udgXdJhG1c0r6u0arI= github.com/u-root/uio v0.0.0-20240118234441-a3c409a6018e h1:BA9O3BmlTmpjbvajAwzWx4Wo2TRVdpPXZEeemGQcajw= github.com/u-root/uio v0.0.0-20240118234441-a3c409a6018e/go.mod h1:eLL9Nub3yfAho7qB0MzZizFhTU2QkLeoVsWdHtDW264= -github.com/vishvananda/netlink v1.2.1-beta.2 h1:Llsql0lnQEbHj0I1OuKyp8otXp0r3q0mPkuhwHfStVs= -github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= @@ -260,8 +262,8 @@ golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA= golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= -golang.org/x/exp/typeparams v0.0.0-20240119083558-1b970713d09a h1:8qmSSA8Gz/1kTrCe0nqR0R3Gb/NDhykzWw2q2mWZydM= -golang.org/x/exp/typeparams v0.0.0-20240119083558-1b970713d09a/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= +golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f h1:phY1HzDcf18Aq9A8KkmRtY9WvOFIxN8wgfvy6Zm1DV8= +golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/image v0.18.0 h1:jGzIakQa/ZXI1I0Fxvaa9W7yP25TqT6cHIHn+6CqvSQ= golang.org/x/image v0.18.0/go.mod h1:4yyo5vMFQjVjUcVk4jEQcU9MGy/rulF5WvUILseCM2E= golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8= @@ -270,8 +272,8 @@ golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200217220822-9197077df867/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -281,8 +283,8 @@ golang.org/x/sys v0.0.0-20220817070843-5a390386f1f2/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.1-0.20230131160137-e7d7f63158de/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= @@ -311,11 +313,11 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gvisor.dev/gvisor v0.0.0-20240722211153-64c016c92987 h1:TU8z2Lh3Bbq77w0t1eG8yRlLcNHzZu3x6mhoH2Mk0c8= gvisor.dev/gvisor v0.0.0-20240722211153-64c016c92987/go.mod h1:sxc3Uvk/vHcd3tj7/DHVBoR5wvWT/MmRq2pj7HRJnwU= -honnef.co/go/tools v0.4.6 h1:oFEHCKeID7to/3autwsWfnuv69j3NsfcXbvJKuIcep8= -honnef.co/go/tools v0.4.6/go.mod h1:+rnGS1THNh8zMwnd2oVOTL9QF6vmfyG6ZXBULae2uc0= +honnef.co/go/tools v0.5.1 h1:4bH5o3b5ZULQ4UrBmP+63W9r7qIkqJClEA9ko5YKx+I= +honnef.co/go/tools v0.5.1/go.mod h1:e9irvo83WDG9/irijV44wr3tbhcFeRnfpVlRqVwpzMs= howett.net/plist v1.0.0 h1:7CrbWYbPPO/PyNy38b2EB/+gYbjCe2DXBxgtOOZbSQM= howett.net/plist v1.0.0/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g= software.sslmate.com/src/go-pkcs12 v0.4.0 h1:H2g08FrTvSFKUj+D309j1DPfk5APnIdAQAB8aEykJ5k= software.sslmate.com/src/go-pkcs12 v0.4.0/go.mod h1:Qiz0EyvDRJjjxGyUQa2cCNZn/wMyzrRJ/qcDXOQazLI= -tailscale.com v1.72.1 h1:hk82jek36ph2S3Tfsh57NVWKEm/pZ9nfUonvlowpfaA= -tailscale.com v1.72.1/go.mod h1:v7OHtg0KLAnhOVf81Z8WrjNefj238QbFhgkWJQoKxbs= +tailscale.com v1.78.3 h1:2BJepIEYA0ba0ZXn2rOuZjYzIV4Az+X9RS5XJF007Ug= +tailscale.com v1.78.3/go.mod h1:gT7ALbLFCr2YIu0kgc9Q3tBVaTlod65D2N6jMLH11Bk= From 9380cdba25c9a390475476d14c5be2d70a7d1a1a Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 08:57:32 -0500 Subject: [PATCH 08/17] Revert "update golang version" This reverts commit 0eb970b5a661ed1e8a0becaaf170dfc21c76ad87. --- Dockerfile | 2 +- go.mod | 19 ++++++++++--------- go.sum | 50 ++++++++++++++++++++++++-------------------------- 3 files changed, 35 insertions(+), 36 deletions(-) diff --git a/Dockerfile b/Dockerfile index aab0905..1856df6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1.6 -FROM golang:1.23.4-bullseye AS build +FROM golang:1.22-bullseye AS build WORKDIR /workspace diff --git a/go.mod b/go.mod index 7238841..ed822e5 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/beam-cloud/blobcache-v2 -go 1.23.1 +go 1.22.0 -toolchain go1.23.4 +toolchain go1.22.4 require ( github.com/beam-cloud/ristretto v0.0.0-20241013204426-d1403e359aa2 @@ -21,7 +21,7 @@ require ( go.uber.org/zap v1.27.0 google.golang.org/grpc v1.62.0 google.golang.org/protobuf v1.33.0 - tailscale.com v1.78.3 + tailscale.com v1.72.1 ) require ( @@ -65,12 +65,12 @@ require ( github.com/gorilla/csrf v1.7.2 // indirect github.com/gorilla/securecookie v1.1.2 // indirect github.com/hdevalence/ed25519consensus v0.2.0 // indirect - github.com/illarion/gonotify/v2 v2.0.3 // indirect + github.com/illarion/gonotify v1.0.1 // indirect github.com/insomniacslk/dhcp v0.0.0-20231206064809-8c70d406f6d2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 // indirect github.com/jsimonetti/rtnetlink v1.4.0 // indirect - github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/compress v1.17.7 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/kortschak/wol v0.0.0-20200729010619-da482cc4850a // indirect github.com/mdlayher/genetlink v1.3.2 // indirect @@ -91,12 +91,13 @@ require ( github.com/tailscale/golang-x-crypto v0.0.0-20240604161659-3fde5e568aa4 // indirect github.com/tailscale/goupnp v1.0.1-0.20210804011211-c64d0f06ea05 // indirect github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a // indirect - github.com/tailscale/netlink v1.1.1-0.20240822203006-4d49adab4de7 // indirect + github.com/tailscale/netlink v1.1.1-0.20211101221916-cabfb018fe85 // indirect github.com/tailscale/peercred v0.0.0-20240214030740-b535050b2aa4 // indirect github.com/tailscale/web-client-prebuilt v0.0.0-20240226180453-5db17b287bf1 // indirect - github.com/tailscale/wireguard-go v0.0.0-20241113014420-4e883d38c8d3 // indirect + github.com/tailscale/wireguard-go v0.0.0-20240731203015-71393c576b98 // indirect github.com/tcnksm/go-httpstat v0.2.0 // indirect github.com/u-root/uio v0.0.0-20240118234441-a3c409a6018e // indirect + github.com/vishvananda/netlink v1.2.1-beta.2 // indirect github.com/vishvananda/netns v0.0.4 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect @@ -107,8 +108,8 @@ require ( golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect golang.org/x/mod v0.19.0 // indirect golang.org/x/net v0.27.0 // indirect - golang.org/x/sync v0.9.0 // indirect - golang.org/x/sys v0.27.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.25.0 // indirect golang.org/x/term v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/go.sum b/go.sum index 8d7d4b6..f986f8a 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= filippo.io/mkcert v1.4.4 h1:8eVbbwfVlaqUM7OwuftKc2nuYOoTDQWqsoXmzoXZdbc= filippo.io/mkcert v1.4.4/go.mod h1:VyvOchVuAye3BoUsPUOOofKygVwLV2KQMVFJNRq+1dA= -github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c h1:pxW6RcqyfI9/kWtOwnv/G+AzdKuy2ZrqINhenH4HyNs= -github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= +github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/akutz/memconn v0.1.0 h1:NawI0TORU4hcOMsMr11g7vwlCdkYeLKXBcxWu2W/P8A= github.com/akutz/memconn v0.1.0/go.mod h1:Jo8rI7m0NieZyLI5e2CDlRdRqRRB4S7Xp77ukDjH+Fw= github.com/alexbrainman/sspi v0.0.0-20231016080023-1a75b4708caa h1:LHTHcTQiSGT7VVbI0o4wBRNQIgn917usHWOd6VAffYI= @@ -120,8 +120,8 @@ github.com/hanwen/go-fuse/v2 v2.5.1 h1:OQBE8zVemSocRxA4OaFJbjJ5hlpCmIWbGr7r0M4uo github.com/hanwen/go-fuse/v2 v2.5.1/go.mod h1:xKwi1cF7nXAOBCXujD5ie0ZKsxc8GGSA1rlMJc+8IJs= github.com/hdevalence/ed25519consensus v0.2.0 h1:37ICyZqdyj0lAZ8P4D1d1id3HqbbG1N3iBb1Tb4rdcU= github.com/hdevalence/ed25519consensus v0.2.0/go.mod h1:w3BHWjwJbFU29IRHL1Iqkw3sus+7FctEyM4RqDxYNzo= -github.com/illarion/gonotify/v2 v2.0.3 h1:B6+SKPo/0Sw8cRJh1aLzNEeNVFfzE3c6N+o+vyxM+9A= -github.com/illarion/gonotify/v2 v2.0.3/go.mod h1:38oIJTgFqupkEydkkClkbL6i5lXV/bxdH9do5TALPEE= +github.com/illarion/gonotify v1.0.1 h1:F1d+0Fgbq/sDWjj/r66ekjDG+IDeecQKUFH4wNwsoio= +github.com/illarion/gonotify v1.0.1/go.mod h1:zt5pmDofZpU1f8aqlK0+95eQhoEAn/d4G4B/FjVW4jE= github.com/insomniacslk/dhcp v0.0.0-20231206064809-8c70d406f6d2 h1:9K06NfxkBh25x56yVhWWlKFE8YpicaSfHwoV8SFbueA= github.com/insomniacslk/dhcp v0.0.0-20231206064809-8c70d406f6d2/go.mod h1:3A9PQ1cunSDF/1rbTq99Ts4pVnycWg+vlPkfeD2NLFI= github.com/jellydator/ttlcache/v3 v3.1.0 h1:0gPFG0IHHP6xyUyXq+JaD8fwkDCqgqwohXNJBcYE71g= @@ -135,8 +135,8 @@ github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 h1:elKwZS1OcdQ0 github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86/go.mod h1:aFAMtuldEgx/4q7iSGazk22+IcgvtiC+HIimFO9XlS8= github.com/jsimonetti/rtnetlink v1.4.0 h1:Z1BF0fRgcETPEa0Kt0MRk3yV5+kF1FWTni6KUFKrq2I= github.com/jsimonetti/rtnetlink v1.4.0/go.mod h1:5W1jDvWdnthFJ7fxYX1GMK07BUpI4oskfOqvPteYS6E= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/parsers/json v0.1.0 h1:dzSZl5pf5bBcW0Acnu20Djleto19T0CfHcvZ14NJ6fU= @@ -196,14 +196,10 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus-community/pro-bing v0.4.0 h1:YMbv+i08gQz97OZZBwLyvmmQEEzyfyrrjEaAchdy3R4= github.com/prometheus-community/pro-bing v0.4.0/go.mod h1:b7wRYZtCcPmt4Sz319BykUU241rWLe1VFXyiyWK/dH4= -github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= -github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= -github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= -github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/safchain/ethtool v0.3.0 h1:gimQJpsI6sc1yIqP/y8GYgiXn/NjgvpM0RNoWLVVmP0= github.com/safchain/ethtool v0.3.0/go.mod h1:SA9BwrgyAqNo7M+uaL6IYbxpm5wk3L7Mm6ocLW+CJUs= github.com/shirou/gopsutil v2.21.11+incompatible h1:lOGOyCG67a5dv2hq5Z1BLDUqqKp3HkbjPcz5j6XMS0U= @@ -221,16 +217,16 @@ github.com/tailscale/goupnp v1.0.1-0.20210804011211-c64d0f06ea05 h1:4chzWmimtJPx github.com/tailscale/goupnp v1.0.1-0.20210804011211-c64d0f06ea05/go.mod h1:PdCqy9JzfWMJf1H5UJW2ip33/d4YkoKN0r67yKH1mG8= github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a h1:SJy1Pu0eH1C29XwJucQo73FrleVK6t4kYz4NVhp34Yw= github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a/go.mod h1:DFSS3NAGHthKo1gTlmEcSBiZrRJXi28rLNd/1udP1c8= -github.com/tailscale/netlink v1.1.1-0.20240822203006-4d49adab4de7 h1:uFsXVBE9Qr4ZoF094vE6iYTLDl0qCiKzYXlL6UeWObU= -github.com/tailscale/netlink v1.1.1-0.20240822203006-4d49adab4de7/go.mod h1:NzVQi3Mleb+qzq8VmcWpSkcSYxXIg0DkI6XDzpVkhJ0= +github.com/tailscale/netlink v1.1.1-0.20211101221916-cabfb018fe85 h1:zrsUcqrG2uQSPhaUPjUQwozcRdDdSxxqhNgNZ3drZFk= +github.com/tailscale/netlink v1.1.1-0.20211101221916-cabfb018fe85/go.mod h1:NzVQi3Mleb+qzq8VmcWpSkcSYxXIg0DkI6XDzpVkhJ0= github.com/tailscale/peercred v0.0.0-20240214030740-b535050b2aa4 h1:Gz0rz40FvFVLTBk/K8UNAenb36EbDSnh+q7Z9ldcC8w= github.com/tailscale/peercred v0.0.0-20240214030740-b535050b2aa4/go.mod h1:phI29ccmHQBc+wvroosENp1IF9195449VDnFDhJ4rJU= github.com/tailscale/web-client-prebuilt v0.0.0-20240226180453-5db17b287bf1 h1:tdUdyPqJ0C97SJfjB9tW6EylTtreyee9C44de+UBG0g= github.com/tailscale/web-client-prebuilt v0.0.0-20240226180453-5db17b287bf1/go.mod h1:agQPE6y6ldqCOui2gkIh7ZMztTkIQKH049tv8siLuNQ= github.com/tailscale/wf v0.0.0-20240214030419-6fbb0a674ee6 h1:l10Gi6w9jxvinoiq15g8OToDdASBni4CyJOdHY1Hr8M= github.com/tailscale/wf v0.0.0-20240214030419-6fbb0a674ee6/go.mod h1:ZXRML051h7o4OcI0d3AaILDIad/Xw0IkXaHM17dic1Y= -github.com/tailscale/wireguard-go v0.0.0-20241113014420-4e883d38c8d3 h1:dmoPb3dG27tZgMtrvqfD/LW4w7gA6BSWl8prCPNmkCQ= -github.com/tailscale/wireguard-go v0.0.0-20241113014420-4e883d38c8d3/go.mod h1:BOm5fXUBFM+m9woLNBoxI9TaBXXhGNP50LX/TGIvGb4= +github.com/tailscale/wireguard-go v0.0.0-20240731203015-71393c576b98 h1:RNpJrXfI5u6e+uzyIzvmnXbhmhdRkVf//90sMBH3lso= +github.com/tailscale/wireguard-go v0.0.0-20240731203015-71393c576b98/go.mod h1:BOm5fXUBFM+m9woLNBoxI9TaBXXhGNP50LX/TGIvGb4= github.com/tailscale/xnet v0.0.0-20240729143630-8497ac4dab2e h1:zOGKqN5D5hHhiYUp091JqK7DPCqSARyUfduhGUY8Bek= github.com/tailscale/xnet v0.0.0-20240729143630-8497ac4dab2e/go.mod h1:orPd6JZXXRyuDusYilywte7k094d7dycXXU5YnWsrwg= github.com/tc-hib/winres v0.2.1 h1:YDE0FiP0VmtRaDn7+aaChp1KiF4owBiJa5l964l5ujA= @@ -241,6 +237,8 @@ github.com/u-root/u-root v0.12.0 h1:K0AuBFriwr0w/PGS3HawiAw89e3+MU7ks80GpghAsNs= github.com/u-root/u-root v0.12.0/go.mod h1:FYjTOh4IkIZHhjsd17lb8nYW6udgXdJhG1c0r6u0arI= github.com/u-root/uio v0.0.0-20240118234441-a3c409a6018e h1:BA9O3BmlTmpjbvajAwzWx4Wo2TRVdpPXZEeemGQcajw= github.com/u-root/uio v0.0.0-20240118234441-a3c409a6018e/go.mod h1:eLL9Nub3yfAho7qB0MzZizFhTU2QkLeoVsWdHtDW264= +github.com/vishvananda/netlink v1.2.1-beta.2 h1:Llsql0lnQEbHj0I1OuKyp8otXp0r3q0mPkuhwHfStVs= +github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= @@ -262,8 +260,8 @@ golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA= golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= -golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f h1:phY1HzDcf18Aq9A8KkmRtY9WvOFIxN8wgfvy6Zm1DV8= -golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= +golang.org/x/exp/typeparams v0.0.0-20240119083558-1b970713d09a h1:8qmSSA8Gz/1kTrCe0nqR0R3Gb/NDhykzWw2q2mWZydM= +golang.org/x/exp/typeparams v0.0.0-20240119083558-1b970713d09a/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/image v0.18.0 h1:jGzIakQa/ZXI1I0Fxvaa9W7yP25TqT6cHIHn+6CqvSQ= golang.org/x/image v0.18.0/go.mod h1:4yyo5vMFQjVjUcVk4jEQcU9MGy/rulF5WvUILseCM2E= golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8= @@ -272,8 +270,8 @@ golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= -golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200217220822-9197077df867/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -283,8 +281,8 @@ golang.org/x/sys v0.0.0-20220817070843-5a390386f1f2/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.1-0.20230131160137-e7d7f63158de/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= -golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= @@ -313,11 +311,11 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gvisor.dev/gvisor v0.0.0-20240722211153-64c016c92987 h1:TU8z2Lh3Bbq77w0t1eG8yRlLcNHzZu3x6mhoH2Mk0c8= gvisor.dev/gvisor v0.0.0-20240722211153-64c016c92987/go.mod h1:sxc3Uvk/vHcd3tj7/DHVBoR5wvWT/MmRq2pj7HRJnwU= -honnef.co/go/tools v0.5.1 h1:4bH5o3b5ZULQ4UrBmP+63W9r7qIkqJClEA9ko5YKx+I= -honnef.co/go/tools v0.5.1/go.mod h1:e9irvo83WDG9/irijV44wr3tbhcFeRnfpVlRqVwpzMs= +honnef.co/go/tools v0.4.6 h1:oFEHCKeID7to/3autwsWfnuv69j3NsfcXbvJKuIcep8= +honnef.co/go/tools v0.4.6/go.mod h1:+rnGS1THNh8zMwnd2oVOTL9QF6vmfyG6ZXBULae2uc0= howett.net/plist v1.0.0 h1:7CrbWYbPPO/PyNy38b2EB/+gYbjCe2DXBxgtOOZbSQM= howett.net/plist v1.0.0/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g= software.sslmate.com/src/go-pkcs12 v0.4.0 h1:H2g08FrTvSFKUj+D309j1DPfk5APnIdAQAB8aEykJ5k= software.sslmate.com/src/go-pkcs12 v0.4.0/go.mod h1:Qiz0EyvDRJjjxGyUQa2cCNZn/wMyzrRJ/qcDXOQazLI= -tailscale.com v1.78.3 h1:2BJepIEYA0ba0ZXn2rOuZjYzIV4Az+X9RS5XJF007Ug= -tailscale.com v1.78.3/go.mod h1:gT7ALbLFCr2YIu0kgc9Q3tBVaTlod65D2N6jMLH11Bk= +tailscale.com v1.72.1 h1:hk82jek36ph2S3Tfsh57NVWKEm/pZ9nfUonvlowpfaA= +tailscale.com v1.72.1/go.mod h1:v7OHtg0KLAnhOVf81Z8WrjNefj238QbFhgkWJQoKxbs= From 91cb3e547f7bc10211b5ca9a0c2d48f936bb5a7a Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 09:07:44 -0500 Subject: [PATCH 09/17] address pr comments --- pkg/client.go | 14 +++------- pkg/server.go | 71 +++++++++++++++++++++++++------------------------- pkg/storage.go | 10 +++---- 3 files changed, 44 insertions(+), 51 deletions(-) diff --git a/pkg/client.go b/pkg/client.go index 9ee07ce..14a885c 100644 --- a/pkg/client.go +++ b/pkg/client.go @@ -293,19 +293,11 @@ func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int for { resp, err := stream.Recv() - if err != nil { - if err == io.EOF { - return - } - - c.metadata.RemoveEntryLocation(ctx, hash, host) - c.mu.Lock() - delete(c.localHostCache, hash) - c.mu.Unlock() - break + if err == io.EOF { + return } - if !resp.Ok { + if err != nil || !resp.Ok { c.metadata.RemoveEntryLocation(ctx, hash, host) c.mu.Lock() delete(c.localHostCache, hash) diff --git a/pkg/server.go b/pkg/server.go index 49ae013..9080347 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -26,9 +26,10 @@ import ( ) const ( - writeBufferSizeBytes = 128 * 1024 - getContentBufferPoolSize = 128 - getContentBufferSize = 256 * 1024 + writeBufferSizeBytes = 128 * 1024 + getContentBufferPoolSize = 128 + getContentBufferSize = 256 * 1024 + getContentStreamChunkSize = 64 * 1024 * 1024 // 64MB ) type CacheServiceOpts struct { @@ -210,7 +211,7 @@ func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentReq dst = dst[:req.Length] resp := &proto.GetContentResponse{Content: dst} - err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst) + _, err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst) if err != nil { Logger.Debugf("Get - [%s] - %v", req.Hash, err) return &proto.GetContentResponse{Content: nil, Ok: false}, nil @@ -222,6 +223,37 @@ func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentReq return resp, nil } +func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error { + const chunkSize = getContentStreamChunkSize + offset := req.Offset + dst := make([]byte, chunkSize) + + for { + n, err := cs.cas.Get(req.Hash, offset, chunkSize, dst) + if err != nil { + Logger.Debugf("GetContentStream - [%s] - %v", req.Hash, err) + return status.Errorf(codes.NotFound, "Content not found: %v", err) + } + + Logger.Infof("GetContentStream - [%s] - %d bytes", req.Hash, n) + if err := stream.Send(&proto.GetContentResponse{ + Ok: true, + Content: dst[:n], + }); err != nil { + return status.Errorf(codes.Internal, "Failed to send content chunk: %v", err) + } + + // Break if this is the last chunk + if n < chunkSize { + break + } + + offset += int64(n) + } + + return nil +} + func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourcePath string, sourceOffset int64) (string, error) { content := buffer.Bytes() size := buffer.Len() @@ -338,34 +370,3 @@ func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.S return &proto.StoreContentFromSourceResponse{Ok: true, Hash: hash}, nil } - -func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error { - const chunkSize = int64(1024 * 1024 * 64) // 64MB chunks - offset := req.Offset - - for { - content, err := cs.cas.Get(req.Hash, offset, chunkSize) - if err != nil { - Logger.Debugf("GetContentStream - [%s] - %v", req.Hash, err) - return status.Errorf(codes.NotFound, "Content not found: %v", err) - } - - Logger.Infof("GetContentStream - [%s] - %d bytes", req.Hash, len(content)) - - if err := stream.Send(&proto.GetContentResponse{ - Ok: true, - Content: content, - }); err != nil { - return status.Errorf(codes.Internal, "Failed to send content chunk: %v", err) - } - - // Break if this is the last chunk - if int64(len(content)) < chunkSize { - break - } - - offset += int64(len(content)) - } - - return nil -} diff --git a/pkg/storage.go b/pkg/storage.go index 266be84..d051cef 100644 --- a/pkg/storage.go +++ b/pkg/storage.go @@ -135,7 +135,7 @@ func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, cont return nil } -func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) error { +func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) (int64, error) { remainingLength := length o := offset dstOffset := int64(0) @@ -149,12 +149,12 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst // Check cache for chunk value, found := cas.cache.Get(chunkKey) if !found { - return ErrContentNotFound + return 0, ErrContentNotFound } v, ok := value.(cacheValue) if !ok { - return nil, fmt.Errorf("unexpected cache value type") + return 0, fmt.Errorf("unexpected cache value type") } chunkBytes := v.Content @@ -168,7 +168,7 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst end := start + readLength if start < 0 || end <= start || end > int64(len(chunkBytes)) { - return fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes)) + return 0, fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes)) } copy(dst[dstOffset:dstOffset+readLength], chunkBytes[start:end]) @@ -178,7 +178,7 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst dstOffset += readLength } - return nil + return dstOffset, nil } func (cas *ContentAddressableStorage) onEvict(item *ristretto.Item[interface{}]) { From 79d5d90d83fe0b0a8b18e88b278f22f921120dd4 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 09:11:45 -0500 Subject: [PATCH 10/17] write to disk --- .gitignore | 3 ++- e2e/testclient/main.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index c6fbb0f..9e8f57c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ build.sh tmp/ config.yaml e2e/testclient/testdata/*.bin -daemonset.yaml \ No newline at end of file +daemonset.yaml +output.bin \ No newline at end of file diff --git a/e2e/testclient/main.go b/e2e/testclient/main.go index bc7a8a2..f1d94a9 100644 --- a/e2e/testclient/main.go +++ b/e2e/testclient/main.go @@ -108,7 +108,8 @@ func main() { if err != nil { log.Fatalf("Error writing chunk to file: %v\n", err) } - // content = append(content, chunk...) // Accumulate chunks into content + + content = append(content, chunk...) // Accumulate chunks } close(done) }() From f6a6847e98d9a85f93c47984811bd3915a07c809 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 09:17:51 -0500 Subject: [PATCH 11/17] improve stopping behavior --- pkg/server.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/server.go b/pkg/server.go index 9080347..1a60811 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -26,10 +26,10 @@ import ( ) const ( - writeBufferSizeBytes = 128 * 1024 - getContentBufferPoolSize = 128 - getContentBufferSize = 256 * 1024 - getContentStreamChunkSize = 64 * 1024 * 1024 // 64MB + writeBufferSizeBytes int = 128 * 1024 + getContentBufferPoolSize int = 128 + getContentBufferSize int64 = 256 * 1024 + getContentStreamChunkSize int64 = 64 * 1024 * 1024 // 64MB ) type CacheServiceOpts struct { @@ -226,10 +226,16 @@ func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentReq func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error { const chunkSize = getContentStreamChunkSize offset := req.Offset - dst := make([]byte, chunkSize) + remainingLength := req.Length - for { - n, err := cs.cas.Get(req.Hash, offset, chunkSize, dst) + for remainingLength > 0 { + currentChunkSize := chunkSize + if remainingLength < int64(chunkSize) { + currentChunkSize = remainingLength + } + + dst := make([]byte, currentChunkSize) + n, err := cs.cas.Get(req.Hash, offset, currentChunkSize, dst) if err != nil { Logger.Debugf("GetContentStream - [%s] - %v", req.Hash, err) return status.Errorf(codes.NotFound, "Content not found: %v", err) @@ -244,11 +250,12 @@ func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream pr } // Break if this is the last chunk - if n < chunkSize { + if n < currentChunkSize { break } offset += int64(n) + remainingLength -= int64(n) } return nil From e0b3a7dc309ff4c72b3c5ad9a3bb558d8f8682b6 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 09:24:38 -0500 Subject: [PATCH 12/17] wip --- pkg/server.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/server.go b/pkg/server.go index 1a60811..b55f503 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -21,7 +21,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" ) @@ -160,10 +159,6 @@ func (cs *CacheService) StartServer(port uint) error { grpc.MaxSendMsgSize(maxMessageSize), grpc.WriteBufferSize(writeBufferSizeBytes), grpc.NumStreamWorkers(uint32(runtime.NumCPU())), - grpc.KeepaliveParams(keepalive.ServerParameters{ - Time: 10 * time.Second, - Timeout: 20 * time.Second, - }), ) proto.RegisterBlobCacheServer(s, cs) From 75dcda7d330909994f16067a508ad6db5a6dad28 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 09:36:15 -0500 Subject: [PATCH 13/17] better logs --- pkg/server.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/server.go b/pkg/server.go index b55f503..79e3434 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -212,7 +212,7 @@ func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentReq return &proto.GetContentResponse{Content: nil, Ok: false}, nil } - Logger.Debugf("Get - [%s] (offset=%d, length=%d)", req.Hash, req.Offset, req.Length) + Logger.Debugf("Get[OK] - [%s] (offset=%d, length=%d)", req.Hash, req.Offset, req.Length) resp.Ok = true return resp, nil @@ -222,6 +222,7 @@ func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream pr const chunkSize = getContentStreamChunkSize offset := req.Offset remainingLength := req.Length + Logger.Infof("GetContentStream[ACK] - [%s] - %d bytes", req.Hash, remainingLength) for remainingLength > 0 { currentChunkSize := chunkSize @@ -236,7 +237,7 @@ func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream pr return status.Errorf(codes.NotFound, "Content not found: %v", err) } - Logger.Infof("GetContentStream - [%s] - %d bytes", req.Hash, n) + Logger.Infof("GetContentStream[TX] - [%s] - %d bytes", req.Hash, n) if err := stream.Send(&proto.GetContentResponse{ Ok: true, Content: dst[:n], @@ -260,7 +261,7 @@ func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourceP content := buffer.Bytes() size := buffer.Len() - Logger.Debugf("Store - rx (%d bytes)", size) + Logger.Infof("Store[ACK] (%d bytes)", size) hashBytes := sha256.Sum256(content) hash := hex.EncodeToString(hashBytes[:]) @@ -268,7 +269,7 @@ func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourceP // Store in local in-memory cache err := cs.cas.Add(ctx, hash, content, sourcePath, sourceOffset) if err != nil { - Logger.Infof("Store - [%s] - %v", hash, err) + Logger.Infof("Store[ERR] - [%s] - %v", hash, err) return "", status.Errorf(codes.Internal, "Failed to add content: %v", err) } @@ -276,12 +277,12 @@ func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourceP if cs.cfg.BlobFs.Enabled && sourcePath != "" { err := cs.metadata.StoreContentInBlobFs(ctx, sourcePath, hash, uint64(size)) if err != nil { - Logger.Infof("Store - [%s] unable to store content in blobfs - %v", hash, sourcePath, err) + Logger.Infof("Store[ERR] - [%s] unable to store content in blobfs - %v", hash, sourcePath, err) return "", status.Errorf(codes.Internal, "Failed to store blobfs reference: %v", err) } } - Logger.Infof("Store - [%s]", hash) + Logger.Infof("Store[OK] - [%s]", hash) content = nil return hash, nil } @@ -297,14 +298,13 @@ func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) } if err != nil { - Logger.Infof("Store - error: %v", err) + Logger.Infof("Store[ERR] - error: %v", err) return status.Errorf(codes.Unknown, "Received an error: %v", err) } - Logger.Debugf("Store - rx chunk (%d bytes)", len(req.Content)) - + Logger.Debugf("Store[RX] - chunk (%d bytes)", len(req.Content)) if _, err := buffer.Write(req.Content); err != nil { - Logger.Debugf("Store - failed to write to buffer: %v", err) + Logger.Debugf("Store[ERR] - failed to write to buffer: %v", err) return status.Errorf(codes.Internal, "Failed to write content to buffer: %v", err) } } @@ -335,17 +335,18 @@ func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceResponse, error) { localPath := filepath.Join("/", req.SourcePath) + Logger.Infof("StoreFromContent[ACK] - [%s]", localPath) // Check if the file exists if _, err := os.Stat(localPath); os.IsNotExist(err) { - Logger.Infof("StoreFromContent - source not found: %v", err) + Logger.Infof("StoreFromContent[ERR] - source not found: %v", err) return &proto.StoreContentFromSourceResponse{Ok: false}, status.Errorf(codes.NotFound, "File does not exist: %s", localPath) } // Open the file file, err := os.Open(localPath) if err != nil { - Logger.Infof("StoreFromContent - error reading source: %v", err) + Logger.Infof("StoreFromContent[ERR] - error reading source: %v", err) return &proto.StoreContentFromSourceResponse{Ok: false}, status.Errorf(codes.Internal, "Failed to open file: %v", err) } defer file.Close() @@ -353,19 +354,19 @@ func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.S var buffer bytes.Buffer if _, err := io.Copy(&buffer, file); err != nil { - Logger.Infof("StoreFromContent - error copying source: %v", err) + Logger.Infof("StoreFromContent[ERR] - error copying source: %v", err) return &proto.StoreContentFromSourceResponse{Ok: false}, nil } // Store the content hash, err := cs.store(ctx, &buffer, localPath, req.SourceOffset) if err != nil { - Logger.Infof("StoreFromContent - error storing data in cache: %v", err) + Logger.Infof("StoreFromContent[ERR] - error storing data in cache: %v", err) return &proto.StoreContentFromSourceResponse{Ok: false}, err } buffer.Reset() - Logger.Infof("StoreFromContent - [%s]", hash) + Logger.Infof("StoreFromContent[OK] - [%s]", hash) // HOTFIX: Manually trigger garbage collection go runtime.GC() From 3b7a3dd4c2482a458876a27739630c75adb45d60 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 10:06:26 -0500 Subject: [PATCH 14/17] better test suite --- e2e/testclient/main.go | 271 +++++++++++++++++++++++++---------------- 1 file changed, 167 insertions(+), 104 deletions(-) diff --git a/e2e/testclient/main.go b/e2e/testclient/main.go index f1d94a9..5b10460 100644 --- a/e2e/testclient/main.go +++ b/e2e/testclient/main.go @@ -1,10 +1,10 @@ package main import ( - "bytes" "context" "crypto/sha256" "encoding/hex" + "flag" "io" "log" "os" @@ -14,11 +14,20 @@ import ( ) var ( - totalIterations int = 3 - checkHash bool = false + totalIterations int + checkContent bool ) +type TestResult struct { + ElapsedTime float64 + ContentCheckPassed bool +} + func main() { + flag.IntVar(&totalIterations, "iterations", 3, "Number of iterations to run the tests") + flag.BoolVar(&checkContent, "checkcontent", true, "Check the content hash after receiving data") + flag.Parse() + configManager, err := blobcache.NewConfigManager[blobcache.BlobCacheConfig]() if err != nil { log.Fatalf("Failed to load config: %v\n", err) @@ -44,136 +53,190 @@ func main() { hashBytes := sha256.Sum256(b) fileHash := hex.EncodeToString(hashBytes[:]) - const chunkSize = 1024 * 1024 * 16 // 16MB chunks - var totalTime float64 + hash, err := storeFile(client, filePath) + if err != nil { + log.Fatalf("Failed to store file: %v\n", err) + } - var storedHashed string = "" + var totalStreamResult, totalGetContentResult TestResult for i := 0; i < totalIterations; i++ { - chunks := make(chan []byte) + log.Printf("Iteration %d\n", i) - // Read file in chunks and dump into channel for StoreContent RPC calls - go func() { - file, err := os.Open(filePath) - if err != nil { - log.Fatalf("err: %v\n", err) - } - defer file.Close() + // Call GetContentStream + log.Printf("TestGetContentStream - %v\n", hash) + streamResult, err := TestGetContentStream(client, hash, len(b), fileHash) + if err != nil { + log.Fatalf("TestGetContentStream failed: %v\n", err) + } + totalStreamResult.ElapsedTime += streamResult.ElapsedTime + totalStreamResult.ContentCheckPassed = totalStreamResult.ContentCheckPassed || streamResult.ContentCheckPassed + log.Printf("TestGetContentStream - %v\n", streamResult) + + // Call GetContent + log.Printf("TestGetContent - %v\n", hash) + getContentResult, err := TestGetContent(client, hash, int64(len(b)), fileHash) + if err != nil { + log.Fatalf("TestGetContent failed: %v\n", err) + } + totalGetContentResult.ElapsedTime += getContentResult.ElapsedTime + totalGetContentResult.ContentCheckPassed = totalGetContentResult.ContentCheckPassed || getContentResult.ContentCheckPassed + log.Printf("TestGetContent - %v\n", getContentResult) + } - for { - buf := make([]byte, chunkSize) - n, err := file.Read(buf) + GenerateReport(totalStreamResult, totalGetContentResult, len(b), totalIterations) +} - if err != nil && err != io.EOF { - log.Fatalf("err reading file: %v\n", err) - } +func storeFile(client *blobcache.BlobCacheClient, filePath string) (string, error) { + chunks := make(chan []byte) + go func() { + file, err := os.Open(filePath) + if err != nil { + log.Fatalf("err: %v\n", err) + } + defer file.Close() - if n == 0 { - break - } + const chunkSize = 1024 * 1024 * 16 // 16MB chunks + for { + buf := make([]byte, chunkSize) + n, err := file.Read(buf) - chunks <- buf[:n] + if err != nil && err != io.EOF { + log.Fatalf("err reading file: %v\n", err) } - close(chunks) - }() - - if storedHashed == "" { - hash, err := client.StoreContent(chunks) - if err != nil { - log.Fatalf("Unable to store content: %v\n", err) + if n == 0 { + break } - storedHashed = hash - } - startTime := time.Now() - contentChan, err := client.GetContentStream(storedHashed, 0, int64(len(b))) - if err != nil { - log.Fatalf("Unable to get content stream: %v\n", err) + chunks <- buf[:n] } - var content []byte - chunkQueue := make(chan []byte, 10) // Buffered channel to queue chunks - done := make(chan struct{}) // Channel to signal completion + close(chunks) + }() - // Goroutine to write chunks to file and accumulate content - go func() { - file, err := os.Create("output.bin") - if err != nil { - log.Fatalf("Unable to create output file: %v\n", err) - } - defer file.Close() + hash, err := client.StoreContent(chunks) + if err != nil { + return "", err + } + return hash, nil +} - for chunk := range chunkQueue { - _, err := file.Write(chunk) - if err != nil { - log.Fatalf("Error writing chunk to file: %v\n", err) - } +func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSize int, expectedHash string) (TestResult, error) { + contentCheckPassed := false - content = append(content, chunk...) // Accumulate chunks - } - close(done) - }() + startTime := time.Now() + contentChan, err := client.GetContentStream(hash, 0, int64(fileSize)) + if err != nil { + return TestResult{}, err + } - for { - chunk, ok := <-contentChan - if !ok { - break - } - chunkQueue <- chunk + var contentStream []byte + chunkQueue := make(chan []byte, 10) // Buffered channel to queue chunks + done := make(chan struct{}) // Channel to signal completion + + go func() { + file, err := os.Create("output_stream.bin") + if err != nil { + log.Fatalf("Unable to create output file: %v\n", err) } - close(chunkQueue) // Close the queue to signal no more chunks + defer file.Close() - <-done // Wait for the file writing to complete - elapsedTime := time.Since(startTime).Seconds() - totalTime += elapsedTime + for chunk := range chunkQueue { + _, err := file.Write(chunk) + if err != nil { + log.Fatalf("Error writing chunk to file: %v\n", err) + } - if checkHash { - hashBytes := sha256.Sum256(content) - responseHash := hex.EncodeToString(hashBytes[:]) + contentStream = append(contentStream, chunk...) // Accumulate chunks + } + close(done) + }() - log.Printf("Initial file len: %d\n", len(b)) - log.Printf("Response content len: %d\n", len(content)) - log.Printf("Hash of initial file: %s\n", fileHash) - log.Printf("Hash of stored content: %s\n", storedHashed) - log.Printf("Hash of retrieved content: %s\n", responseHash) - log.Printf("Iteration %d: content length: %d, file length: %d, elapsed time: %f seconds\n", i+1, len(content), len(b), elapsedTime) + for { + chunk, ok := <-contentChan + if !ok { + break + } + chunkQueue <- chunk + } + close(chunkQueue) // Close the queue to signal no more chunks + <-done // Wait for the file writing to complete + + elapsedTime := time.Since(startTime).Seconds() + + // Verify hash + if checkContent { + log.Printf("Verifying hash for GetContentStream\n") + hashBytes := sha256.Sum256(contentStream) + retrievedHash := hex.EncodeToString(hashBytes[:]) + if retrievedHash != expectedHash { + log.Printf("Hash mismatch for GetContentStream: expected %s, got %s", expectedHash, retrievedHash) + contentCheckPassed = false + } + log.Printf("Hash verification for GetContentStream - %v\n", contentCheckPassed) + } - if len(content) != len(b) { - log.Fatalf("length mismatch: content len: %d, file len: %d\n", len(content), len(b)) - } + return TestResult{ElapsedTime: elapsedTime, ContentCheckPassed: contentCheckPassed}, nil +} - // Direct byte comparison loop - mismatchFound := false - for i := range content { - if content[i] != b[i] { - log.Printf("Byte mismatch at position %d: content byte: %x, file byte: %x\n", i, content[i], b[i]) - mismatchFound = true - break - } - } +func TestGetContent(client *blobcache.BlobCacheClient, hash string, fileSize int64, expectedHash string) (TestResult, error) { + startTime := time.Now() + var content []byte + offset := int64(0) + const chunkSize = 1024 * 1024 * 16 // 16MB chunks - if !mismatchFound { - log.Println("Direct byte comparison found no differences.") - } else { - log.Println("Direct byte comparison found differences.") - } + for offset < fileSize { + end := offset + chunkSize + if end > fileSize { + end = fileSize + } - // Cross-check with bytes.Equal - if bytes.Equal(content, b) { - log.Println("bytes.Equal confirms the slices are equal.") - } else { - log.Println("bytes.Equal indicates the slices are not equal.") - } + chunk, err := client.GetContent(hash, offset, end-offset) + if err != nil { + return TestResult{}, err + } + content = append(content, chunk...) + offset = end + } + elapsedTime := time.Since(startTime).Seconds() + + // Verify hash + contentCheckPassed := false + if checkContent { + log.Printf("Verifying hash for GetContent\n") + hashBytes := sha256.Sum256(content) + retrievedHash := hex.EncodeToString(hashBytes[:]) + if retrievedHash != expectedHash { + log.Printf("Hash mismatch for GetContent: expected %s, got %s", expectedHash, retrievedHash) + contentCheckPassed = false } + log.Printf("Hash verification for GetContent - %v\n", contentCheckPassed) } - averageTime := totalTime / float64(totalIterations) - totalBytesReadMB := float64(len(b)*totalIterations) / (1024 * 1024) - mbPerSecond := totalBytesReadMB / totalTime + return TestResult{ElapsedTime: elapsedTime, ContentCheckPassed: contentCheckPassed}, nil +} - log.Printf("Total time: %f seconds\n", totalTime) - log.Printf("Average time per iteration: %f seconds\n", averageTime) +func GenerateReport(streamResult, contentResult TestResult, fileSize, iterations int) { + averageTimeStream := streamResult.ElapsedTime / float64(iterations) + averageTimeContent := contentResult.ElapsedTime / float64(iterations) + totalBytesReadMB := float64(fileSize*iterations) / (1024 * 1024) + mbPerSecondStream := totalBytesReadMB / streamResult.ElapsedTime + mbPerSecondContent := totalBytesReadMB / contentResult.ElapsedTime + + log.Printf("Total time for GetContentStream: %f seconds\n", streamResult.ElapsedTime) + log.Printf("Average time per iteration for GetContentStream: %f seconds\n", averageTimeStream) + log.Printf("Total time for GetContent: %f seconds\n", contentResult.ElapsedTime) + log.Printf("Average time per iteration for GetContent: %f seconds\n", averageTimeContent) log.Printf("Total read: %.2f MB\n", totalBytesReadMB) - log.Printf("Average MB/s rate of reading (GetContent): %f\n", mbPerSecond) + log.Printf("Average MB/s rate of reading (GetContentStream): %f\n", mbPerSecondStream) + log.Printf("Average MB/s rate of reading (GetContent): %f\n", mbPerSecondContent) + + if checkContent { + if streamResult.ContentCheckPassed && contentResult.ContentCheckPassed { + log.Println("Content check passed for all iterations.") + } else { + log.Println("Content check failed for some iterations.") + } + } } From 18436a05d5be4303bde506e25d70da8442fa44f4 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 10:13:23 -0500 Subject: [PATCH 15/17] remove output file --- e2e/testclient/main.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/e2e/testclient/main.go b/e2e/testclient/main.go index 5b10460..53c169a 100644 --- a/e2e/testclient/main.go +++ b/e2e/testclient/main.go @@ -142,11 +142,6 @@ func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSi defer file.Close() for chunk := range chunkQueue { - _, err := file.Write(chunk) - if err != nil { - log.Fatalf("Error writing chunk to file: %v\n", err) - } - contentStream = append(contentStream, chunk...) // Accumulate chunks } close(done) @@ -170,10 +165,12 @@ func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSi hashBytes := sha256.Sum256(contentStream) retrievedHash := hex.EncodeToString(hashBytes[:]) if retrievedHash != expectedHash { - log.Printf("Hash mismatch for GetContentStream: expected %s, got %s", expectedHash, retrievedHash) contentCheckPassed = false + } else { + contentCheckPassed = true } - log.Printf("Hash verification for GetContentStream - %v\n", contentCheckPassed) + + log.Printf("Calculated hash for GetContentStream: expected %s, got %s", expectedHash, retrievedHash) } return TestResult{ElapsedTime: elapsedTime, ContentCheckPassed: contentCheckPassed}, nil @@ -208,10 +205,12 @@ func TestGetContent(client *blobcache.BlobCacheClient, hash string, fileSize int hashBytes := sha256.Sum256(content) retrievedHash := hex.EncodeToString(hashBytes[:]) if retrievedHash != expectedHash { - log.Printf("Hash mismatch for GetContent: expected %s, got %s", expectedHash, retrievedHash) contentCheckPassed = false + } else { + contentCheckPassed = true } - log.Printf("Hash verification for GetContent - %v\n", contentCheckPassed) + + log.Printf("Calculated hash for GetContent: expected %s, got %s", expectedHash, retrievedHash) } return TestResult{ElapsedTime: elapsedTime, ContentCheckPassed: contentCheckPassed}, nil From 3ad04e6ceebd88fef8e53be75fdff2197c2aa089 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 10:15:31 -0500 Subject: [PATCH 16/17] remove file again --- e2e/testclient/main.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/e2e/testclient/main.go b/e2e/testclient/main.go index 53c169a..d2db2e9 100644 --- a/e2e/testclient/main.go +++ b/e2e/testclient/main.go @@ -131,18 +131,12 @@ func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSi } var contentStream []byte - chunkQueue := make(chan []byte, 10) // Buffered channel to queue chunks + chunkQueue := make(chan []byte, 20) // Buffered channel to queue chunks done := make(chan struct{}) // Channel to signal completion go func() { - file, err := os.Create("output_stream.bin") - if err != nil { - log.Fatalf("Unable to create output file: %v\n", err) - } - defer file.Close() - for chunk := range chunkQueue { - contentStream = append(contentStream, chunk...) // Accumulate chunks + contentStream = append(contentStream, chunk...) } close(done) }() @@ -155,13 +149,14 @@ func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSi chunkQueue <- chunk } close(chunkQueue) // Close the queue to signal no more chunks - <-done // Wait for the file writing to complete + <-done elapsedTime := time.Since(startTime).Seconds() - // Verify hash + // Verify received content's hash if checkContent { log.Printf("Verifying hash for GetContentStream\n") + hashBytes := sha256.Sum256(contentStream) retrievedHash := hex.EncodeToString(hashBytes[:]) if retrievedHash != expectedHash { @@ -180,7 +175,7 @@ func TestGetContent(client *blobcache.BlobCacheClient, hash string, fileSize int startTime := time.Now() var content []byte offset := int64(0) - const chunkSize = 1024 * 1024 * 16 // 16MB chunks + const chunkSize = 1024 * 128 // 128k chunks for offset < fileSize { end := offset + chunkSize @@ -198,10 +193,11 @@ func TestGetContent(client *blobcache.BlobCacheClient, hash string, fileSize int elapsedTime := time.Since(startTime).Seconds() - // Verify hash + // Verify received content's hash contentCheckPassed := false if checkContent { log.Printf("Verifying hash for GetContent\n") + hashBytes := sha256.Sum256(content) retrievedHash := hex.EncodeToString(hashBytes[:]) if retrievedHash != expectedHash { From 07c40c111db823039d7d961093fc1ac086e14f31 Mon Sep 17 00:00:00 2001 From: luke-lombardi <33990301+luke-lombardi@users.noreply.github.com> Date: Sat, 11 Jan 2025 10:31:58 -0500 Subject: [PATCH 17/17] wip --- e2e/testclient/main.go | 2 +- pkg/server.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e/testclient/main.go b/e2e/testclient/main.go index d2db2e9..d1d9044 100644 --- a/e2e/testclient/main.go +++ b/e2e/testclient/main.go @@ -131,7 +131,7 @@ func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSi } var contentStream []byte - chunkQueue := make(chan []byte, 20) // Buffered channel to queue chunks + chunkQueue := make(chan []byte, 50) // Buffered channel to queue chunks done := make(chan struct{}) // Channel to signal completion go func() { diff --git a/pkg/server.go b/pkg/server.go index 79e3434..32bf0fd 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -28,7 +28,7 @@ const ( writeBufferSizeBytes int = 128 * 1024 getContentBufferPoolSize int = 128 getContentBufferSize int64 = 256 * 1024 - getContentStreamChunkSize int64 = 64 * 1024 * 1024 // 64MB + getContentStreamChunkSize int64 = 32 * 1024 * 1024 // 32MB ) type CacheServiceOpts struct {