diff --git a/.gitignore b/.gitignore index c6fbb0f..9e8f57c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ build.sh tmp/ config.yaml e2e/testclient/testdata/*.bin -daemonset.yaml \ No newline at end of file +daemonset.yaml +output.bin \ No newline at end of file diff --git a/Makefile b/Makefile index a37a20c..e956356 100644 --- a/Makefile +++ b/Makefile @@ -25,4 +25,4 @@ publish-chart: rm beam-blobcache-v2-chart-$(chartVersion).tgz testclient: - go build -o bin/testclient e2e/testclient/main.go + GOOS=linux GOARCH=amd64 go build -o bin/testclient e2e/testclient/main.go diff --git a/e2e/testclient/main.go b/e2e/testclient/main.go index 72f7985..d1d9044 100644 --- a/e2e/testclient/main.go +++ b/e2e/testclient/main.go @@ -1,10 +1,10 @@ package main import ( - "bytes" "context" "crypto/sha256" "encoding/hex" + "flag" "io" "log" "os" @@ -13,9 +13,21 @@ import ( blobcache "github.com/beam-cloud/blobcache-v2/pkg" ) -var totalIterations int = 1 +var ( + totalIterations int + checkContent bool +) + +type TestResult struct { + ElapsedTime float64 + ContentCheckPassed bool +} func main() { + flag.IntVar(&totalIterations, "iterations", 3, "Number of iterations to run the tests") + flag.BoolVar(&checkContent, "checkcontent", true, "Check the content hash after receiving data") + flag.Parse() + configManager, err := blobcache.NewConfigManager[blobcache.BlobCacheConfig]() if err != nil { log.Fatalf("Failed to load config: %v\n", err) @@ -33,7 +45,7 @@ func main() { log.Fatalf("Unable to create client: %v\n", err) } - filePath := "e2e/testclient/testdata/test2.bin" + filePath := "e2e/testclient/testdata/test3.bin" b, err := os.ReadFile(filePath) if err != nil { log.Fatalf("Unable to read input file: %v\n", err) @@ -41,97 +53,185 @@ func main() { hashBytes := sha256.Sum256(b) fileHash := hex.EncodeToString(hashBytes[:]) - const chunkSize = 1024 * 1024 * 16 // 16MB chunks - var totalTime float64 + hash, err := storeFile(client, filePath) + if err != nil { + log.Fatalf("Failed to store file: %v\n", err) + } + var totalStreamResult, totalGetContentResult TestResult for i := 0; i < totalIterations; i++ { - chunks := make(chan []byte) + log.Printf("Iteration %d\n", i) - // Read file in chunks and dump into channel for StoreContent RPC calls - go func() { - file, err := os.Open(filePath) - if err != nil { - log.Fatalf("err: %v\n", err) - } - defer file.Close() + // Call GetContentStream + log.Printf("TestGetContentStream - %v\n", hash) + streamResult, err := TestGetContentStream(client, hash, len(b), fileHash) + if err != nil { + log.Fatalf("TestGetContentStream failed: %v\n", err) + } + totalStreamResult.ElapsedTime += streamResult.ElapsedTime + totalStreamResult.ContentCheckPassed = totalStreamResult.ContentCheckPassed || streamResult.ContentCheckPassed + log.Printf("TestGetContentStream - %v\n", streamResult) + + // Call GetContent + log.Printf("TestGetContent - %v\n", hash) + getContentResult, err := TestGetContent(client, hash, int64(len(b)), fileHash) + if err != nil { + log.Fatalf("TestGetContent failed: %v\n", err) + } + totalGetContentResult.ElapsedTime += getContentResult.ElapsedTime + totalGetContentResult.ContentCheckPassed = totalGetContentResult.ContentCheckPassed || getContentResult.ContentCheckPassed + log.Printf("TestGetContent - %v\n", getContentResult) + } - for { - buf := make([]byte, chunkSize) - n, err := file.Read(buf) + GenerateReport(totalStreamResult, totalGetContentResult, len(b), totalIterations) +} - if err != nil && err != io.EOF { - log.Fatalf("err reading file: %v\n", err) - } +func storeFile(client *blobcache.BlobCacheClient, filePath string) (string, error) { + chunks := make(chan []byte) + go func() { + file, err := os.Open(filePath) + if err != nil { + log.Fatalf("err: %v\n", err) + } + defer file.Close() - if n == 0 { - break - } + const chunkSize = 1024 * 1024 * 16 // 16MB chunks + for { + buf := make([]byte, chunkSize) + n, err := file.Read(buf) - chunks <- buf[:n] + if err != nil && err != io.EOF { + log.Fatalf("err reading file: %v\n", err) } - close(chunks) - }() + if n == 0 { + break + } - hash, err := client.StoreContent(chunks) - if err != nil { - log.Fatalf("Unable to store content: %v\n", err) + chunks <- buf[:n] } - startTime := time.Now() - content, err := client.GetContent(hash, 0, int64(len(b))) - if err != nil { - log.Fatalf("Unable to get content: %v\n", err) - } - elapsedTime := time.Since(startTime).Seconds() - totalTime += elapsedTime + close(chunks) + }() - hashBytes := sha256.Sum256(content) - responseHash := hex.EncodeToString(hashBytes[:]) + hash, err := client.StoreContent(chunks) + if err != nil { + return "", err + } + return hash, nil +} - log.Printf("Initial file len: %d\n", len(b)) - log.Printf("Response content len: %d\n", len(content)) - log.Printf("Hash of initial file: %s\n", fileHash) - log.Printf("Hash of stored content: %s\n", hash) - log.Printf("Hash of retrieved content: %s\n", responseHash) +func TestGetContentStream(client *blobcache.BlobCacheClient, hash string, fileSize int, expectedHash string) (TestResult, error) { + contentCheckPassed := false - log.Printf("Iteration %d: content length: %d, file length: %d, elapsed time: %f seconds\n", i+1, len(content), len(b), elapsedTime) + startTime := time.Now() + contentChan, err := client.GetContentStream(hash, 0, int64(fileSize)) + if err != nil { + return TestResult{}, err + } + + var contentStream []byte + chunkQueue := make(chan []byte, 50) // Buffered channel to queue chunks + done := make(chan struct{}) // Channel to signal completion - if len(content) != len(b) { - log.Fatalf("length mismatch: content len: %d, file len: %d\n", len(content), len(b)) + go func() { + for chunk := range chunkQueue { + contentStream = append(contentStream, chunk...) } + close(done) + }() - // Direct byte comparison loop - mismatchFound := false - for i := range content { - if content[i] != b[i] { - log.Printf("Byte mismatch at position %d: content byte: %x, file byte: %x\n", i, content[i], b[i]) - mismatchFound = true - break - } + for { + chunk, ok := <-contentChan + if !ok { + break } + chunkQueue <- chunk + } + close(chunkQueue) // Close the queue to signal no more chunks + <-done + + elapsedTime := time.Since(startTime).Seconds() + + // Verify received content's hash + if checkContent { + log.Printf("Verifying hash for GetContentStream\n") - if !mismatchFound { - log.Println("Direct byte comparison found no differences.") + hashBytes := sha256.Sum256(contentStream) + retrievedHash := hex.EncodeToString(hashBytes[:]) + if retrievedHash != expectedHash { + contentCheckPassed = false } else { - log.Println("Direct byte comparison found differences.") + contentCheckPassed = true } - // Cross-check with bytes.Equal - if bytes.Equal(content, b) { - log.Println("bytes.Equal confirms the slices are equal.") + log.Printf("Calculated hash for GetContentStream: expected %s, got %s", expectedHash, retrievedHash) + } + + return TestResult{ElapsedTime: elapsedTime, ContentCheckPassed: contentCheckPassed}, nil +} + +func TestGetContent(client *blobcache.BlobCacheClient, hash string, fileSize int64, expectedHash string) (TestResult, error) { + startTime := time.Now() + var content []byte + offset := int64(0) + const chunkSize = 1024 * 128 // 128k chunks + + for offset < fileSize { + end := offset + chunkSize + if end > fileSize { + end = fileSize + } + + chunk, err := client.GetContent(hash, offset, end-offset) + if err != nil { + return TestResult{}, err + } + content = append(content, chunk...) + offset = end + } + + elapsedTime := time.Since(startTime).Seconds() + + // Verify received content's hash + contentCheckPassed := false + if checkContent { + log.Printf("Verifying hash for GetContent\n") + + hashBytes := sha256.Sum256(content) + retrievedHash := hex.EncodeToString(hashBytes[:]) + if retrievedHash != expectedHash { + contentCheckPassed = false } else { - log.Println("bytes.Equal indicates the slices are not equal.") + contentCheckPassed = true } + log.Printf("Calculated hash for GetContent: expected %s, got %s", expectedHash, retrievedHash) } - averageTime := totalTime / 10 - mbPerSecond := (float64(len(b)*totalIterations) / (1024 * 1024)) / averageTime - log.Printf("Average MB/s rate of reading (GetContent): %f\n", mbPerSecond) + return TestResult{ElapsedTime: elapsedTime, ContentCheckPassed: contentCheckPassed}, nil +} - _, err = client.StoreContentFromSource("images/agent.yaml", 0) - if err != nil { - log.Fatalf("Unable to store content from source: %v\n", err) +func GenerateReport(streamResult, contentResult TestResult, fileSize, iterations int) { + averageTimeStream := streamResult.ElapsedTime / float64(iterations) + averageTimeContent := contentResult.ElapsedTime / float64(iterations) + totalBytesReadMB := float64(fileSize*iterations) / (1024 * 1024) + mbPerSecondStream := totalBytesReadMB / streamResult.ElapsedTime + mbPerSecondContent := totalBytesReadMB / contentResult.ElapsedTime + + log.Printf("Total time for GetContentStream: %f seconds\n", streamResult.ElapsedTime) + log.Printf("Average time per iteration for GetContentStream: %f seconds\n", averageTimeStream) + log.Printf("Total time for GetContent: %f seconds\n", contentResult.ElapsedTime) + log.Printf("Average time per iteration for GetContent: %f seconds\n", averageTimeContent) + log.Printf("Total read: %.2f MB\n", totalBytesReadMB) + log.Printf("Average MB/s rate of reading (GetContentStream): %f\n", mbPerSecondStream) + log.Printf("Average MB/s rate of reading (GetContent): %f\n", mbPerSecondContent) + + if checkContent { + if streamResult.ContentCheckPassed && contentResult.ContentCheckPassed { + log.Println("Content check passed for all iterations.") + } else { + log.Println("Content check failed for some iterations.") + } } } diff --git a/pkg/blobfs_node.go b/pkg/blobfs_node.go index a4c081c..663b654 100644 --- a/pkg/blobfs_node.go +++ b/pkg/blobfs_node.go @@ -128,6 +128,9 @@ func (n *FSNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (* return nil, syscall.ENOENT } + // TODO: stream file to a temp file in the container somewhere + // /tmp/cache/path/to/file + out.Attr = *attr return node, fs.OK } @@ -172,8 +175,7 @@ func (n *FSNode) Read(ctx context.Context, f fs.FileHandle, dest []byte, off int return nil, syscall.EIO } - nRead := copy(dest, buffer) - return fuse.ReadResultData(dest[:nRead]), fs.OK + return fuse.ReadResultData(buffer), fs.OK } func (n *FSNode) Readlink(ctx context.Context) ([]byte, syscall.Errno) { diff --git a/pkg/client.go b/pkg/client.go index 89da6ef..14a885c 100644 --- a/pkg/client.go +++ b/pkg/client.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "fmt" + "io" "math" "net" "sync" @@ -22,6 +23,7 @@ import ( const ( getContentRequestTimeout = 30 * time.Second + getContentStreamRequestTimeout = 600 * time.Second storeContentRequestTimeout = 300 * time.Second closestHostTimeout = 30 * time.Second localClientCacheCleanupInterval = 5 * time.Second @@ -263,6 +265,54 @@ func (c *BlobCacheClient) GetContent(hash string, offset int64, length int64) ([ return nil, ErrContentNotFound } +func (c *BlobCacheClient) GetContentStream(hash string, offset int64, length int64) (chan []byte, error) { + ctx, cancel := context.WithTimeout(c.ctx, getContentRequestTimeout) + contentChan := make(chan []byte) + + go func() { + defer close(contentChan) + defer cancel() + + for attempt := 0; attempt < 3; attempt++ { + client, host, err := c.getGRPCClient(ctx, &ClientRequest{ + rt: ClientRequestTypeRetrieval, + hash: hash, + }) + if err != nil { + return + } + + stream, err := client.GetContentStream(ctx, &proto.GetContentRequest{Hash: hash, Offset: offset, Length: length}) + if err != nil { + c.metadata.RemoveEntryLocation(ctx, hash, host) + c.mu.Lock() + delete(c.localHostCache, hash) + c.mu.Unlock() + continue + } + + for { + resp, err := stream.Recv() + if err == io.EOF { + return + } + + if err != nil || !resp.Ok { + c.metadata.RemoveEntryLocation(ctx, hash, host) + c.mu.Lock() + delete(c.localHostCache, hash) + c.mu.Unlock() + break + } + + contentChan <- resp.Content + } + } + }() + + return contentChan, nil +} + func (c *BlobCacheClient) manageLocalClientCache(ttl time.Duration, interval time.Duration) { go func() { ticker := time.NewTicker(interval) diff --git a/pkg/server.go b/pkg/server.go index 5032285..32bf0fd 100644 --- a/pkg/server.go +++ b/pkg/server.go @@ -25,9 +25,10 @@ import ( ) const ( - writeBufferSizeBytes = 128 * 1024 - getContentBufferPoolSize = 128 - getContentBufferSize = 256 * 1024 + writeBufferSizeBytes int = 128 * 1024 + getContentBufferPoolSize int = 128 + getContentBufferSize int64 = 256 * 1024 + getContentStreamChunkSize int64 = 32 * 1024 * 1024 // 32MB ) type CacheServiceOpts struct { @@ -157,6 +158,7 @@ func (cs *CacheService) StartServer(port uint) error { grpc.MaxRecvMsgSize(maxMessageSize), grpc.MaxSendMsgSize(maxMessageSize), grpc.WriteBufferSize(writeBufferSizeBytes), + grpc.NumStreamWorkers(uint32(runtime.NumCPU())), ) proto.RegisterBlobCacheServer(s, cs) @@ -204,23 +206,62 @@ func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentReq dst = dst[:req.Length] resp := &proto.GetContentResponse{Content: dst} - err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst) + _, err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst) if err != nil { Logger.Debugf("Get - [%s] - %v", req.Hash, err) return &proto.GetContentResponse{Content: nil, Ok: false}, nil } - Logger.Debugf("Get - [%s] (offset=%d, length=%d)", req.Hash, req.Offset, req.Length) + Logger.Debugf("Get[OK] - [%s] (offset=%d, length=%d)", req.Hash, req.Offset, req.Length) resp.Ok = true return resp, nil } +func (cs *CacheService) GetContentStream(req *proto.GetContentRequest, stream proto.BlobCache_GetContentStreamServer) error { + const chunkSize = getContentStreamChunkSize + offset := req.Offset + remainingLength := req.Length + Logger.Infof("GetContentStream[ACK] - [%s] - %d bytes", req.Hash, remainingLength) + + for remainingLength > 0 { + currentChunkSize := chunkSize + if remainingLength < int64(chunkSize) { + currentChunkSize = remainingLength + } + + dst := make([]byte, currentChunkSize) + n, err := cs.cas.Get(req.Hash, offset, currentChunkSize, dst) + if err != nil { + Logger.Debugf("GetContentStream - [%s] - %v", req.Hash, err) + return status.Errorf(codes.NotFound, "Content not found: %v", err) + } + + Logger.Infof("GetContentStream[TX] - [%s] - %d bytes", req.Hash, n) + if err := stream.Send(&proto.GetContentResponse{ + Ok: true, + Content: dst[:n], + }); err != nil { + return status.Errorf(codes.Internal, "Failed to send content chunk: %v", err) + } + + // Break if this is the last chunk + if n < currentChunkSize { + break + } + + offset += int64(n) + remainingLength -= int64(n) + } + + return nil +} + func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourcePath string, sourceOffset int64) (string, error) { content := buffer.Bytes() size := buffer.Len() - Logger.Debugf("Store - rx (%d bytes)", size) + Logger.Infof("Store[ACK] (%d bytes)", size) hashBytes := sha256.Sum256(content) hash := hex.EncodeToString(hashBytes[:]) @@ -228,7 +269,7 @@ func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourceP // Store in local in-memory cache err := cs.cas.Add(ctx, hash, content, sourcePath, sourceOffset) if err != nil { - Logger.Infof("Store - [%s] - %v", hash, err) + Logger.Infof("Store[ERR] - [%s] - %v", hash, err) return "", status.Errorf(codes.Internal, "Failed to add content: %v", err) } @@ -236,12 +277,12 @@ func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourceP if cs.cfg.BlobFs.Enabled && sourcePath != "" { err := cs.metadata.StoreContentInBlobFs(ctx, sourcePath, hash, uint64(size)) if err != nil { - Logger.Infof("Store - [%s] unable to store content in blobfs - %v", hash, sourcePath, err) + Logger.Infof("Store[ERR] - [%s] unable to store content in blobfs - %v", hash, sourcePath, err) return "", status.Errorf(codes.Internal, "Failed to store blobfs reference: %v", err) } } - Logger.Infof("Store - [%s]", hash) + Logger.Infof("Store[OK] - [%s]", hash) content = nil return hash, nil } @@ -257,14 +298,13 @@ func (cs *CacheService) StoreContent(stream proto.BlobCache_StoreContentServer) } if err != nil { - Logger.Infof("Store - error: %v", err) + Logger.Infof("Store[ERR] - error: %v", err) return status.Errorf(codes.Unknown, "Received an error: %v", err) } - Logger.Debugf("Store - rx chunk (%d bytes)", len(req.Content)) - + Logger.Debugf("Store[RX] - chunk (%d bytes)", len(req.Content)) if _, err := buffer.Write(req.Content); err != nil { - Logger.Debugf("Store - failed to write to buffer: %v", err) + Logger.Debugf("Store[ERR] - failed to write to buffer: %v", err) return status.Errorf(codes.Internal, "Failed to write content to buffer: %v", err) } } @@ -286,7 +326,6 @@ func (cs *CacheService) usagePct() float64 { } func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error) { - return &proto.GetStateResponse{ Version: BlobCacheVersion, PrivateIpAddr: cs.privateIpAddr, @@ -296,17 +335,18 @@ func (cs *CacheService) GetState(ctx context.Context, req *proto.GetStateRequest func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.StoreContentFromSourceRequest) (*proto.StoreContentFromSourceResponse, error) { localPath := filepath.Join("/", req.SourcePath) + Logger.Infof("StoreFromContent[ACK] - [%s]", localPath) // Check if the file exists if _, err := os.Stat(localPath); os.IsNotExist(err) { - Logger.Infof("StoreFromContent - source not found: %v", err) + Logger.Infof("StoreFromContent[ERR] - source not found: %v", err) return &proto.StoreContentFromSourceResponse{Ok: false}, status.Errorf(codes.NotFound, "File does not exist: %s", localPath) } // Open the file file, err := os.Open(localPath) if err != nil { - Logger.Infof("StoreFromContent - error reading source: %v", err) + Logger.Infof("StoreFromContent[ERR] - error reading source: %v", err) return &proto.StoreContentFromSourceResponse{Ok: false}, status.Errorf(codes.Internal, "Failed to open file: %v", err) } defer file.Close() @@ -314,19 +354,19 @@ func (cs *CacheService) StoreContentFromSource(ctx context.Context, req *proto.S var buffer bytes.Buffer if _, err := io.Copy(&buffer, file); err != nil { - Logger.Infof("StoreFromContent - error copying source: %v", err) + Logger.Infof("StoreFromContent[ERR] - error copying source: %v", err) return &proto.StoreContentFromSourceResponse{Ok: false}, nil } // Store the content hash, err := cs.store(ctx, &buffer, localPath, req.SourceOffset) if err != nil { - Logger.Infof("StoreFromContent - error storing data in cache: %v", err) + Logger.Infof("StoreFromContent[ERR] - error storing data in cache: %v", err) return &proto.StoreContentFromSourceResponse{Ok: false}, err } buffer.Reset() - Logger.Infof("StoreFromContent - [%s]", hash) + Logger.Infof("StoreFromContent[OK] - [%s]", hash) // HOTFIX: Manually trigger garbage collection go runtime.GC() diff --git a/pkg/storage.go b/pkg/storage.go index db9362c..d051cef 100644 --- a/pkg/storage.go +++ b/pkg/storage.go @@ -135,7 +135,7 @@ func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, cont return nil } -func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) error { +func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) (int64, error) { remainingLength := length o := offset dstOffset := int64(0) @@ -149,10 +149,13 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst // Check cache for chunk value, found := cas.cache.Get(chunkKey) if !found { - return ErrContentNotFound + return 0, ErrContentNotFound } - v := value.(cacheValue) + v, ok := value.(cacheValue) + if !ok { + return 0, fmt.Errorf("unexpected cache value type") + } chunkBytes := v.Content start := o % cas.config.PageSizeBytes @@ -165,7 +168,7 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst end := start + readLength if start < 0 || end <= start || end > int64(len(chunkBytes)) { - return fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes)) + return 0, fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes)) } copy(dst[dstOffset:dstOffset+readLength], chunkBytes[start:end]) @@ -175,7 +178,7 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst dstOffset += readLength } - return nil + return dstOffset, nil } func (cas *ContentAddressableStorage) onEvict(item *ristretto.Item[interface{}]) { diff --git a/proto/blobcache.pb.go b/proto/blobcache.pb.go index 33810cb..85226db 100644 --- a/proto/blobcache.pb.go +++ b/proto/blobcache.pb.go @@ -483,32 +483,38 @@ var file_blobcache_proto_rawDesc = []byte{ 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x32, 0xe5, 0x02, 0x0a, 0x09, 0x42, 0x6c, 0x6f, 0x62, + 0x09, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x32, 0xba, 0x03, 0x0a, 0x09, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x61, 0x63, 0x68, 0x65, 0x12, 0x4b, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x53, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, - 0x6e, 0x74, 0x12, 0x1e, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x53, - 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x53, - 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x12, 0x6f, 0x0a, 0x16, 0x53, 0x74, 0x6f, 0x72, 0x65, - 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x53, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x12, 0x28, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x53, 0x74, - 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x53, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x62, 0x6c, - 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, - 0x74, 0x65, 0x6e, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, - 0x74, 0x61, 0x74, 0x65, 0x12, 0x1a, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, - 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1b, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x47, 0x65, 0x74, - 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, - 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x65, - 0x61, 0x6d, 0x2d, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, - 0x68, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0x00, 0x12, 0x53, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, + 0x68, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, + 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x53, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x72, 0x65, + 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x1e, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, + 0x63, 0x68, 0x65, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, + 0x63, 0x68, 0x65, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x12, 0x6f, 0x0a, 0x16, + 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, 0x72, 0x6f, 0x6d, + 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x28, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, + 0x68, 0x65, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, + 0x72, 0x6f, 0x6d, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x29, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x53, 0x74, 0x6f, + 0x72, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, 0x72, 0x6f, 0x6d, 0x53, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x45, 0x0a, + 0x08, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1a, 0x2e, 0x62, 0x6c, 0x6f, 0x62, + 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x62, 0x6c, 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, + 0x65, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2d, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2f, 0x62, 0x6c, + 0x6f, 0x62, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -536,15 +542,17 @@ var file_blobcache_proto_goTypes = []interface{}{ } var file_blobcache_proto_depIdxs = []int32{ 0, // 0: blobcache.BlobCache.GetContent:input_type -> blobcache.GetContentRequest - 2, // 1: blobcache.BlobCache.StoreContent:input_type -> blobcache.StoreContentRequest - 6, // 2: blobcache.BlobCache.StoreContentFromSource:input_type -> blobcache.StoreContentFromSourceRequest - 4, // 3: blobcache.BlobCache.GetState:input_type -> blobcache.GetStateRequest - 1, // 4: blobcache.BlobCache.GetContent:output_type -> blobcache.GetContentResponse - 3, // 5: blobcache.BlobCache.StoreContent:output_type -> blobcache.StoreContentResponse - 7, // 6: blobcache.BlobCache.StoreContentFromSource:output_type -> blobcache.StoreContentFromSourceResponse - 5, // 7: blobcache.BlobCache.GetState:output_type -> blobcache.GetStateResponse - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type + 0, // 1: blobcache.BlobCache.GetContentStream:input_type -> blobcache.GetContentRequest + 2, // 2: blobcache.BlobCache.StoreContent:input_type -> blobcache.StoreContentRequest + 6, // 3: blobcache.BlobCache.StoreContentFromSource:input_type -> blobcache.StoreContentFromSourceRequest + 4, // 4: blobcache.BlobCache.GetState:input_type -> blobcache.GetStateRequest + 1, // 5: blobcache.BlobCache.GetContent:output_type -> blobcache.GetContentResponse + 1, // 6: blobcache.BlobCache.GetContentStream:output_type -> blobcache.GetContentResponse + 3, // 7: blobcache.BlobCache.StoreContent:output_type -> blobcache.StoreContentResponse + 7, // 8: blobcache.BlobCache.StoreContentFromSource:output_type -> blobcache.StoreContentFromSourceResponse + 5, // 9: blobcache.BlobCache.GetState:output_type -> blobcache.GetStateResponse + 5, // [5:10] is the sub-list for method output_type + 0, // [0:5] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/proto/blobcache.proto b/proto/blobcache.proto index fc6f366..6ef6c2b 100644 --- a/proto/blobcache.proto +++ b/proto/blobcache.proto @@ -6,6 +6,7 @@ package blobcache; service BlobCache { rpc GetContent(GetContentRequest) returns (GetContentResponse) {} + rpc GetContentStream(GetContentRequest) returns (stream GetContentResponse) {} rpc StoreContent(stream StoreContentRequest) returns (StoreContentResponse) {} rpc StoreContentFromSource(StoreContentFromSourceRequest) returns (StoreContentFromSourceResponse) {} rpc GetState(GetStateRequest) returns (GetStateResponse) {} diff --git a/proto/blobcache_grpc.pb.go b/proto/blobcache_grpc.pb.go index 9204ba4..fa90b75 100644 --- a/proto/blobcache_grpc.pb.go +++ b/proto/blobcache_grpc.pb.go @@ -20,6 +20,7 @@ const _ = grpc.SupportPackageIsVersion7 const ( BlobCache_GetContent_FullMethodName = "/blobcache.BlobCache/GetContent" + BlobCache_GetContentStream_FullMethodName = "/blobcache.BlobCache/GetContentStream" BlobCache_StoreContent_FullMethodName = "/blobcache.BlobCache/StoreContent" BlobCache_StoreContentFromSource_FullMethodName = "/blobcache.BlobCache/StoreContentFromSource" BlobCache_GetState_FullMethodName = "/blobcache.BlobCache/GetState" @@ -30,6 +31,7 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type BlobCacheClient interface { GetContent(ctx context.Context, in *GetContentRequest, opts ...grpc.CallOption) (*GetContentResponse, error) + GetContentStream(ctx context.Context, in *GetContentRequest, opts ...grpc.CallOption) (BlobCache_GetContentStreamClient, error) StoreContent(ctx context.Context, opts ...grpc.CallOption) (BlobCache_StoreContentClient, error) StoreContentFromSource(ctx context.Context, in *StoreContentFromSourceRequest, opts ...grpc.CallOption) (*StoreContentFromSourceResponse, error) GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error) @@ -52,8 +54,40 @@ func (c *blobCacheClient) GetContent(ctx context.Context, in *GetContentRequest, return out, nil } +func (c *blobCacheClient) GetContentStream(ctx context.Context, in *GetContentRequest, opts ...grpc.CallOption) (BlobCache_GetContentStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &BlobCache_ServiceDesc.Streams[0], BlobCache_GetContentStream_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &blobCacheGetContentStreamClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type BlobCache_GetContentStreamClient interface { + Recv() (*GetContentResponse, error) + grpc.ClientStream +} + +type blobCacheGetContentStreamClient struct { + grpc.ClientStream +} + +func (x *blobCacheGetContentStreamClient) Recv() (*GetContentResponse, error) { + m := new(GetContentResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *blobCacheClient) StoreContent(ctx context.Context, opts ...grpc.CallOption) (BlobCache_StoreContentClient, error) { - stream, err := c.cc.NewStream(ctx, &BlobCache_ServiceDesc.Streams[0], BlobCache_StoreContent_FullMethodName, opts...) + stream, err := c.cc.NewStream(ctx, &BlobCache_ServiceDesc.Streams[1], BlobCache_StoreContent_FullMethodName, opts...) if err != nil { return nil, err } @@ -109,6 +143,7 @@ func (c *blobCacheClient) GetState(ctx context.Context, in *GetStateRequest, opt // for forward compatibility type BlobCacheServer interface { GetContent(context.Context, *GetContentRequest) (*GetContentResponse, error) + GetContentStream(*GetContentRequest, BlobCache_GetContentStreamServer) error StoreContent(BlobCache_StoreContentServer) error StoreContentFromSource(context.Context, *StoreContentFromSourceRequest) (*StoreContentFromSourceResponse, error) GetState(context.Context, *GetStateRequest) (*GetStateResponse, error) @@ -122,6 +157,9 @@ type UnimplementedBlobCacheServer struct { func (UnimplementedBlobCacheServer) GetContent(context.Context, *GetContentRequest) (*GetContentResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetContent not implemented") } +func (UnimplementedBlobCacheServer) GetContentStream(*GetContentRequest, BlobCache_GetContentStreamServer) error { + return status.Errorf(codes.Unimplemented, "method GetContentStream not implemented") +} func (UnimplementedBlobCacheServer) StoreContent(BlobCache_StoreContentServer) error { return status.Errorf(codes.Unimplemented, "method StoreContent not implemented") } @@ -162,6 +200,27 @@ func _BlobCache_GetContent_Handler(srv interface{}, ctx context.Context, dec fun return interceptor(ctx, in, info, handler) } +func _BlobCache_GetContentStream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetContentRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(BlobCacheServer).GetContentStream(m, &blobCacheGetContentStreamServer{stream}) +} + +type BlobCache_GetContentStreamServer interface { + Send(*GetContentResponse) error + grpc.ServerStream +} + +type blobCacheGetContentStreamServer struct { + grpc.ServerStream +} + +func (x *blobCacheGetContentStreamServer) Send(m *GetContentResponse) error { + return x.ServerStream.SendMsg(m) +} + func _BlobCache_StoreContent_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(BlobCacheServer).StoreContent(&blobCacheStoreContentServer{stream}) } @@ -245,6 +304,11 @@ var BlobCache_ServiceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{ + { + StreamName: "GetContentStream", + Handler: _BlobCache_GetContentStream_Handler, + ServerStreams: true, + }, { StreamName: "StoreContent", Handler: _BlobCache_StoreContent_Handler,