diff --git a/app/artifact-cas/README.md b/app/artifact-cas/README.md index 6c783d7cc..41aa59e67 100644 --- a/app/artifact-cas/README.md +++ b/app/artifact-cas/README.md @@ -20,7 +20,7 @@ Its structure contains the following top to down layers. ## System Dependencies -The CAS proxy **has only one running dependency**. A secret storage backend to retrieve the OCI repository credentials. Currently we support both [Hashicorp Vault](https://www.vaultproject.io/) and [AWS Secret Manager](https://aws.amazon.com/secrets-manager/). +The CAS proxy **has only one running dependency**. A secret storage backend to retrieve the OCI repository credentials. Currently, we support both [Hashicorp Vault](https://www.vaultproject.io/) and [AWS Secret Manager](https://aws.amazon.com/secrets-manager/). This secret backend is used to download OCI repository credentials (repository path + key pair) during upload/downloads. This makes the Artifact CAS multi-tenant by default since the destination OCI backend gets selected at runtime. @@ -34,6 +34,10 @@ The token gets signed by the control plane with a private key and verified by th Note: there are plans to support [JWKS endpoints](https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-key-sets) to enable easy rotation of credentials. +## Client + +The client code can be found [here](/internal/casclient/) + ## Runbook We use `make` for most development tasks. Run `make -C app/artifact-cas` to see a list of the available tasks. diff --git a/app/cli/cmd/attestation_add.go b/app/cli/cmd/attestation_add.go index 36888a514..6f330c25e 100644 --- a/app/cli/cmd/attestation_add.go +++ b/app/cli/cmd/attestation_add.go @@ -57,7 +57,7 @@ func newAttestationAddCmd() *cobra.Command { }, RunE: func(cmd *cobra.Command, args []string) error { a := action.NewAttestationAdd( - &action.AttestationAddOpts{ActionsOpts: actionOpts, ArtifacsCASConn: artifactCASConn}, + &action.AttestationAddOpts{ActionsOpts: actionOpts, ArtifactsCASConn: artifactCASConn}, ) err := a.Run(name, value) diff --git a/app/cli/internal/action/artifact_download.go b/app/cli/internal/action/artifact_download.go index f7600d03c..687b01cc5 100644 --- a/app/cli/internal/action/artifact_download.go +++ b/app/cli/internal/action/artifact_download.go @@ -23,8 +23,10 @@ import ( "io" "os" "path" + "time" - casclient "github.com/chainloop-dev/chainloop/app/cli/internal/casclient/grpc" + "github.com/chainloop-dev/chainloop/internal/casclient" + "github.com/jedib0t/go-pretty/v6/progress" "google.golang.org/grpc" crv1 "github.com/google/go-containerregistry/pkg/v1" @@ -50,7 +52,7 @@ func (a *ArtifactDownload) Run(downloadPath, digest string) error { return fmt.Errorf("invalid digest: %w", err) } - client := casclient.NewDownloader(a.artifactsCASConn, casclient.WithLogger(a.Logger), casclient.WithProgressRender(true)) + client := casclient.NewDownloader(a.artifactsCASConn) ctx := context.Background() info, err := client.Describe(ctx, h.Hex) if err != nil { @@ -77,6 +79,11 @@ func (a *ArtifactDownload) Run(downloadPath, digest string) error { w := io.MultiWriter(f, hash) a.Logger.Info().Str("name", info.Filename).Str("to", downloadPath).Msg("downloading file") + + // render progress bar + go renderOperationStatus(ctx, client.ProgressStatus, a.Logger) + defer close(client.ProgressStatus) + err = client.Download(ctx, w, h.Hex, info.Size) if err != nil { a.Logger.Debug().Err(err).Msg("problem downloading file") @@ -87,7 +94,46 @@ func (a *ArtifactDownload) Run(downloadPath, digest string) error { return fmt.Errorf("checksums mismatch: got: %s, expected: %s", got, want) } + // Give some time for the progress renderer to finish + // TODO: Implement with proper subroutine messaging + time.Sleep(progress.DefaultUpdateFrequency) + a.Logger.Info().Str("path", downloadPath).Msg("file downloaded!") return nil } + +func renderOperationStatus(ctx context.Context, progressChan casclient.ProgressStatusChan, output io.Writer) { + pw := progress.NewWriter() + pw.Style().Visibility.ETA = true + pw.Style().Visibility.Speed = true + pw.SetUpdateFrequency(progress.DefaultUpdateFrequency) + + var tracker *progress.Tracker + go pw.Render() + defer pw.Stop() + + for { + select { + case <-ctx.Done(): + return + case status, ok := <-progressChan: + if !ok { + return + } + + // Initialize tracker + if tracker == nil { + // Hack: Add 1 to the total to make sure the tracker is not marked as done before the upload is finished + // this way the current value will never reach the total + // but instead the tracker will be marked as done by the defer statement + total := status.TotalSizeBytes + 1 + tracker = &progress.Tracker{Total: total, Units: progress.UnitsBytes} + defer tracker.MarkAsDone() + pw.AppendTracker(tracker) + } + + tracker.SetValue(status.ProcessedBytes) + } + } +} diff --git a/app/cli/internal/action/artifact_upload.go b/app/cli/internal/action/artifact_upload.go index 89ddb3949..97f5e8e01 100644 --- a/app/cli/internal/action/artifact_upload.go +++ b/app/cli/internal/action/artifact_upload.go @@ -17,8 +17,10 @@ package action import ( "context" + "time" - casclient "github.com/chainloop-dev/chainloop/app/cli/internal/casclient/grpc" + "github.com/chainloop-dev/chainloop/internal/casclient" + "github.com/jedib0t/go-pretty/v6/progress" "google.golang.org/grpc" ) @@ -41,11 +43,19 @@ func NewArtifactUpload(opts *ArtifactUploadOpts) *ArtifactUpload { } func (a *ArtifactUpload) Run(filePath string) (*CASArtifact, error) { - client := casclient.NewUploader(a.artifactsCASConn, casclient.WithLogger(a.Logger), casclient.WithProgressRender(true)) + client := casclient.NewUploader(a.artifactsCASConn, casclient.WithLogger(a.Logger)) + // render progress bar + go renderOperationStatus(context.Background(), client.ProgressStatus, a.Logger) + defer close(client.ProgressStatus) + res, err := client.Upload(context.Background(), filePath) if err != nil { return nil, err } + // Give some time for the progress renderer to finish + // TODO: Implement with proper subroutine messaging + time.Sleep(progress.DefaultUpdateFrequency) + return &CASArtifact{Digest: res.Digest, fileName: res.Filename}, nil } diff --git a/app/cli/internal/action/attestation_add.go b/app/cli/internal/action/attestation_add.go index 681481f81..0531ea296 100644 --- a/app/cli/internal/action/attestation_add.go +++ b/app/cli/internal/action/attestation_add.go @@ -18,14 +18,14 @@ package action import ( "errors" - casclient "github.com/chainloop-dev/chainloop/app/cli/internal/casclient/grpc" "github.com/chainloop-dev/chainloop/internal/attestation/crafter" + "github.com/chainloop-dev/chainloop/internal/casclient" "google.golang.org/grpc" ) type AttestationAddOpts struct { *ActionsOpts - ArtifacsCASConn *grpc.ClientConn + ArtifactsCASConn *grpc.ClientConn } type AttestationAdd struct { @@ -39,9 +39,9 @@ func NewAttestationAdd(cfg *AttestationAddOpts) *AttestationAdd { ActionsOpts: cfg.ActionsOpts, c: crafter.NewCrafter( crafter.WithLogger(&cfg.Logger), - crafter.WithUploader(casclient.NewUploader(cfg.ArtifacsCASConn, casclient.WithLogger(cfg.Logger), casclient.WithProgressRender(true))), + crafter.WithUploader(casclient.NewUploader(cfg.ArtifactsCASConn, casclient.WithLogger(cfg.Logger))), ), - artifactsCASConn: cfg.ArtifacsCASConn, + artifactsCASConn: cfg.ArtifactsCASConn, } } diff --git a/internal/casclient/README.md b/internal/casclient/README.md new file mode 100644 index 000000000..8d2e2359d --- /dev/null +++ b/internal/casclient/README.md @@ -0,0 +1,5 @@ +# Artifact Content Addressable Storage (CAS) Client code + +Client code used to talk to the [Artifact Storage Proxy](/app/artifact-cas/). + +It's a [bytestream gRPC client](https://pkg.go.dev/google.golang.org/api/transport/bytestream) that currently supports download by content digest (sha256) and upload methods. diff --git a/app/cli/internal/casclient/grpc/downloader.go b/internal/casclient/downloader.go similarity index 69% rename from app/cli/internal/casclient/grpc/downloader.go rename to internal/casclient/downloader.go index fac5a2729..b437d4ad9 100644 --- a/app/cli/internal/casclient/grpc/downloader.go +++ b/internal/casclient/downloader.go @@ -13,19 +13,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package grpc +package casclient import ( "context" "errors" "fmt" "io" - "os" - "time" v1 "github.com/chainloop-dev/chainloop/app/artifact-cas/api/cas/v1" "github.com/chainloop-dev/chainloop/internal/attestation/crafter/materials" - "github.com/jedib0t/go-pretty/v6/progress" "github.com/rs/zerolog" "google.golang.org/genproto/googleapis/bytestream" "google.golang.org/grpc" @@ -39,8 +36,8 @@ func NewDownloader(conn *grpc.ClientConn, opts ...ClientOpts) *DownloaderClient client := &DownloaderClient{ casClient: &casClient{ conn: conn, - logger: zerolog.New(os.Stderr), - progressStatus: make(chan *materials.UpDownStatus, 2), + ProgressStatus: make(chan *materials.UpDownStatus, 2), + logger: zerolog.Nop(), }, } @@ -61,11 +58,6 @@ func (c *DownloaderClient) Download(ctx context.Context, w io.Writer, digest str ctx, cancel := context.WithCancel(ctx) defer cancel() - if c.renderProgress { - go c.renderDownloadStatus(ctx, c.logger) - defer close(c.progressStatus) - } - // Open the stream to start reading chunks reader, err := bytestream.NewByteStreamClient(c.conn).Read(ctx, &bytestream.ReadRequest{ResourceName: digest}) if err != nil { @@ -96,23 +88,19 @@ func (c *DownloaderClient) Download(ctx context.Context, w io.Writer, digest str TotalSizeBytes: totalBytes, ProcessedBytes: totalDownloaded, } - if c.renderProgress { - c.progressStatus <- latestStatus + select { + case c.ProgressStatus <- latestStatus: + // message sent + default: + c.logger.Debug().Msg("nobody listening to progress updates, dropping message") } } - // Give some time for the progress renderer to finish - // TODO: Implement with proper subroutine messaging - if c.renderProgress { - time.Sleep(renderUpdateFrequency) - // Block until the buffer has been filled or the upload process has been canceled - } - return nil } // Describe returns the metadata of a resource by its digest -// We use this to get the filename and the total size of the artifacct +// We use this to get the filename and the total size of the artifact func (c *DownloaderClient) Describe(ctx context.Context, digest string) (*materials.ResourceInfo, error) { client := v1.NewResourceServiceClient(c.conn) resp, err := client.Describe(ctx, &v1.ResourceServiceDescribeRequest{Digest: digest}) @@ -124,38 +112,3 @@ func (c *DownloaderClient) Describe(ctx context.Context, digest string) (*materi Digest: resp.GetResult().GetDigest(), Filename: resp.Result.GetFileName(), Size: resp.Result.GetSize(), }, nil } - -func (c *DownloaderClient) renderDownloadStatus(ctx context.Context, output io.Writer) { - pw := progress.NewWriter() - pw.Style().Visibility.ETA = true - pw.Style().Visibility.Speed = true - pw.SetUpdateFrequency(renderUpdateFrequency) - - var tracker *progress.Tracker - go pw.Render() - defer pw.Stop() - - for { - select { - case <-ctx.Done(): - return - case s, ok := <-c.progressStatus: - if !ok { - return - } - - // Initialize tracker - if tracker == nil { - total := s.TotalSizeBytes - tracker = &progress.Tracker{ - Total: total, - Units: progress.UnitsBytes, - } - defer tracker.MarkAsDone() - pw.AppendTracker(tracker) - } - - tracker.SetValue(s.ProcessedBytes) - } - } -} diff --git a/app/cli/internal/casclient/grpc/uploader.go b/internal/casclient/uploader.go similarity index 74% rename from app/cli/internal/casclient/grpc/uploader.go rename to internal/casclient/uploader.go index cfa72b916..132f121f8 100644 --- a/app/cli/internal/casclient/grpc/uploader.go +++ b/internal/casclient/uploader.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package grpc +package casclient import ( "bytes" @@ -25,12 +25,10 @@ import ( "io" "os" "path" - "time" "code.cloudfoundry.org/bytefmt" v1 "github.com/chainloop-dev/chainloop/app/artifact-cas/api/cas/v1" "github.com/chainloop-dev/chainloop/internal/attestation/crafter/materials" - "github.com/jedib0t/go-pretty/v6/progress" "github.com/rs/zerolog" "google.golang.org/genproto/googleapis/bytestream" "google.golang.org/grpc" @@ -38,13 +36,12 @@ import ( cr_v1 "github.com/google/go-containerregistry/pkg/v1" ) +type ProgressStatusChan chan (*materials.UpDownStatus) type casClient struct { conn *grpc.ClientConn logger zerolog.Logger // channel to send progress status to the go-routine that's rendering the progress bar - progressStatus chan (*materials.UpDownStatus) - // wether to render progress bar - renderProgress bool + ProgressStatus ProgressStatusChan } type UploaderClient struct { *casClient @@ -53,12 +50,6 @@ type UploaderClient struct { type ClientOpts func(u *casClient) -func WithProgressRender(b bool) ClientOpts { - return func(u *casClient) { - u.renderProgress = b - } -} - func WithLogger(l zerolog.Logger) ClientOpts { return func(u *casClient) { u.logger = l @@ -71,8 +62,8 @@ func NewUploader(conn *grpc.ClientConn, opts ...ClientOpts) *UploaderClient { client := &UploaderClient{ casClient: &casClient{ conn: conn, - progressStatus: make(chan *materials.UpDownStatus, 2), // Adding some buffer - logger: zerolog.New(os.Stderr), + ProgressStatus: make(chan *materials.UpDownStatus), + logger: zerolog.Nop(), }, bufferSize: defaultUploadChunkSize, } @@ -89,12 +80,6 @@ func (c *UploaderClient) Upload(ctx context.Context, filepath string) (*material ctx, cancel := context.WithCancel(ctx) defer cancel() - // inititate progress bar - if c.renderProgress { - go c.renderUploadStatus(ctx, c.logger) - defer close(c.progressStatus) - } - // open file and calculate digest f, err := os.Open(filepath) if err != nil { @@ -104,7 +89,7 @@ func (c *UploaderClient) Upload(ctx context.Context, filepath string) (*material hash, _, err := cr_v1.SHA256(f) if err != nil { - return nil, fmt.Errorf("genering digest: %w", err) + return nil, fmt.Errorf("generating digest: %w", err) } // Since we have already iterated on the file to calculate the digest @@ -190,8 +175,11 @@ doUpload: Digest: hash.String(), TotalSizeBytes: info.Size(), ProcessedBytes: totalUploaded, } - if c.renderProgress { - c.progressStatus <- latestStatus + select { + case c.ProgressStatus <- latestStatus: + // message sent + default: + c.logger.Debug().Msg("nobody listening to progress updates, dropping message") } c.logger.Debug(). @@ -204,52 +192,9 @@ doUpload: return nil, err } - // Give some time for the progress renderer to finish - // TODO: Implement with proper subroutine messaging - if c.renderProgress { - time.Sleep(renderUpdateFrequency) - // Block until the buffer has been filled or the upload process has been canceled - } - return latestStatus, nil } -var renderUpdateFrequency = progress.DefaultUpdateFrequency - -func (c *UploaderClient) renderUploadStatus(ctx context.Context, output io.Writer) { - pw := progress.NewWriter() - pw.Style().Visibility.ETA = true - pw.Style().Visibility.Speed = true - pw.SetUpdateFrequency(renderUpdateFrequency) - - var tracker *progress.Tracker - go pw.Render() - defer pw.Stop() - - for { - select { - case <-ctx.Done(): - return - case status, ok := <-c.progressStatus: - if !ok { - return - } - - if tracker == nil { - // Hack: Add 1 to the total to make sure the tracker is not marked as done before the upload is finished - // this way the current value will never reach the total - // but instead the tracker will be marked as done by the defer statement - total := status.TotalSizeBytes + 1 - tracker = &progress.Tracker{Total: total, Units: progress.UnitsBytes} - defer tracker.MarkAsDone() - pw.AppendTracker(tracker) - } - - tracker.SetValue(status.ProcessedBytes) - } - } -} - // encodedResource returns a base64-encoded v1.UploadResource which wraps both the digest and fileName func encodeResource(fileName, digest string) (string, error) { if fileName == "" { diff --git a/app/cli/internal/casclient/grpc/uploader_test.go b/internal/casclient/uploader_test.go similarity index 99% rename from app/cli/internal/casclient/grpc/uploader_test.go rename to internal/casclient/uploader_test.go index 0b2a9a6a8..8048bfef1 100644 --- a/app/cli/internal/casclient/grpc/uploader_test.go +++ b/internal/casclient/uploader_test.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package grpc +package casclient import ( "bytes"