Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion app/artifact-cas/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Its structure contains the following top to down layers.

## System Dependencies

The CAS proxy **has only one running dependency**. A secret storage backend to retrieve the OCI repository credentials. Currently we support both [Hashicorp Vault](https://www.vaultproject.io/) and [AWS Secret Manager](https://aws.amazon.com/secrets-manager/).
The CAS proxy **has only one running dependency**. A secret storage backend to retrieve the OCI repository credentials. Currently, we support both [Hashicorp Vault](https://www.vaultproject.io/) and [AWS Secret Manager](https://aws.amazon.com/secrets-manager/).

This secret backend is used to download OCI repository credentials (repository path + key pair) during upload/downloads. This makes the Artifact CAS multi-tenant by default since the destination OCI backend gets selected at runtime.

Expand All @@ -34,6 +34,10 @@ The token gets signed by the control plane with a private key and verified by th

Note: there are plans to support [JWKS endpoints](https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-key-sets) to enable easy rotation of credentials.

## Client

The client code can be found [here](/internal/casclient/)

## Runbook

We use `make` for most development tasks. Run `make -C app/artifact-cas` to see a list of the available tasks.
Expand Down
2 changes: 1 addition & 1 deletion app/cli/cmd/attestation_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func newAttestationAddCmd() *cobra.Command {
},
RunE: func(cmd *cobra.Command, args []string) error {
a := action.NewAttestationAdd(
&action.AttestationAddOpts{ActionsOpts: actionOpts, ArtifacsCASConn: artifactCASConn},
&action.AttestationAddOpts{ActionsOpts: actionOpts, ArtifactsCASConn: artifactCASConn},
)

err := a.Run(name, value)
Expand Down
50 changes: 48 additions & 2 deletions app/cli/internal/action/artifact_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"io"
"os"
"path"
"time"

casclient "github.com/chainloop-dev/chainloop/app/cli/internal/casclient/grpc"
"github.com/chainloop-dev/chainloop/internal/casclient"
"github.com/jedib0t/go-pretty/v6/progress"
"google.golang.org/grpc"

crv1 "github.com/google/go-containerregistry/pkg/v1"
Expand All @@ -50,7 +52,7 @@ func (a *ArtifactDownload) Run(downloadPath, digest string) error {
return fmt.Errorf("invalid digest: %w", err)
}

client := casclient.NewDownloader(a.artifactsCASConn, casclient.WithLogger(a.Logger), casclient.WithProgressRender(true))
client := casclient.NewDownloader(a.artifactsCASConn)
ctx := context.Background()
info, err := client.Describe(ctx, h.Hex)
if err != nil {
Expand All @@ -77,6 +79,11 @@ func (a *ArtifactDownload) Run(downloadPath, digest string) error {
w := io.MultiWriter(f, hash)

a.Logger.Info().Str("name", info.Filename).Str("to", downloadPath).Msg("downloading file")

// render progress bar
go renderOperationStatus(ctx, client.ProgressStatus, a.Logger)
defer close(client.ProgressStatus)

err = client.Download(ctx, w, h.Hex, info.Size)
if err != nil {
a.Logger.Debug().Err(err).Msg("problem downloading file")
Expand All @@ -87,7 +94,46 @@ func (a *ArtifactDownload) Run(downloadPath, digest string) error {
return fmt.Errorf("checksums mismatch: got: %s, expected: %s", got, want)
}

// Give some time for the progress renderer to finish
// TODO: Implement with proper subroutine messaging
time.Sleep(progress.DefaultUpdateFrequency)

a.Logger.Info().Str("path", downloadPath).Msg("file downloaded!")

return nil
}

func renderOperationStatus(ctx context.Context, progressChan casclient.ProgressStatusChan, output io.Writer) {
pw := progress.NewWriter()
pw.Style().Visibility.ETA = true
pw.Style().Visibility.Speed = true
pw.SetUpdateFrequency(progress.DefaultUpdateFrequency)

var tracker *progress.Tracker
go pw.Render()
defer pw.Stop()

for {
select {
case <-ctx.Done():
return
case status, ok := <-progressChan:
if !ok {
return
}

// Initialize tracker
if tracker == nil {
// Hack: Add 1 to the total to make sure the tracker is not marked as done before the upload is finished
// this way the current value will never reach the total
// but instead the tracker will be marked as done by the defer statement
total := status.TotalSizeBytes + 1
tracker = &progress.Tracker{Total: total, Units: progress.UnitsBytes}
defer tracker.MarkAsDone()
pw.AppendTracker(tracker)
}

tracker.SetValue(status.ProcessedBytes)
}
}
}
14 changes: 12 additions & 2 deletions app/cli/internal/action/artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package action

import (
"context"
"time"

casclient "github.com/chainloop-dev/chainloop/app/cli/internal/casclient/grpc"
"github.com/chainloop-dev/chainloop/internal/casclient"
"github.com/jedib0t/go-pretty/v6/progress"
"google.golang.org/grpc"
)

Expand All @@ -41,11 +43,19 @@ func NewArtifactUpload(opts *ArtifactUploadOpts) *ArtifactUpload {
}

func (a *ArtifactUpload) Run(filePath string) (*CASArtifact, error) {
client := casclient.NewUploader(a.artifactsCASConn, casclient.WithLogger(a.Logger), casclient.WithProgressRender(true))
client := casclient.NewUploader(a.artifactsCASConn, casclient.WithLogger(a.Logger))
// render progress bar
go renderOperationStatus(context.Background(), client.ProgressStatus, a.Logger)
defer close(client.ProgressStatus)

res, err := client.Upload(context.Background(), filePath)
if err != nil {
return nil, err
}

// Give some time for the progress renderer to finish
// TODO: Implement with proper subroutine messaging
time.Sleep(progress.DefaultUpdateFrequency)

return &CASArtifact{Digest: res.Digest, fileName: res.Filename}, nil
}
8 changes: 4 additions & 4 deletions app/cli/internal/action/attestation_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package action
import (
"errors"

casclient "github.com/chainloop-dev/chainloop/app/cli/internal/casclient/grpc"
"github.com/chainloop-dev/chainloop/internal/attestation/crafter"
"github.com/chainloop-dev/chainloop/internal/casclient"
"google.golang.org/grpc"
)

type AttestationAddOpts struct {
*ActionsOpts
ArtifacsCASConn *grpc.ClientConn
ArtifactsCASConn *grpc.ClientConn
}

type AttestationAdd struct {
Expand All @@ -39,9 +39,9 @@ func NewAttestationAdd(cfg *AttestationAddOpts) *AttestationAdd {
ActionsOpts: cfg.ActionsOpts,
c: crafter.NewCrafter(
crafter.WithLogger(&cfg.Logger),
crafter.WithUploader(casclient.NewUploader(cfg.ArtifacsCASConn, casclient.WithLogger(cfg.Logger), casclient.WithProgressRender(true))),
crafter.WithUploader(casclient.NewUploader(cfg.ArtifactsCASConn, casclient.WithLogger(cfg.Logger))),
),
artifactsCASConn: cfg.ArtifacsCASConn,
artifactsCASConn: cfg.ArtifactsCASConn,
}
}

Expand Down
5 changes: 5 additions & 0 deletions internal/casclient/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Artifact Content Addressable Storage (CAS) Client code

Client code used to talk to the [Artifact Storage Proxy](/app/artifact-cas/).

It's a [bytestream gRPC client](https://pkg.go.dev/google.golang.org/api/transport/bytestream) that currently supports download by content digest (sha256) and upload methods.
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc
package casclient

import (
"context"
"errors"
"fmt"
"io"
"os"
"time"

v1 "github.com/chainloop-dev/chainloop/app/artifact-cas/api/cas/v1"
"github.com/chainloop-dev/chainloop/internal/attestation/crafter/materials"
"github.com/jedib0t/go-pretty/v6/progress"
"github.com/rs/zerolog"
"google.golang.org/genproto/googleapis/bytestream"
"google.golang.org/grpc"
Expand All @@ -39,8 +36,8 @@ func NewDownloader(conn *grpc.ClientConn, opts ...ClientOpts) *DownloaderClient
client := &DownloaderClient{
casClient: &casClient{
conn: conn,
logger: zerolog.New(os.Stderr),
progressStatus: make(chan *materials.UpDownStatus, 2),
ProgressStatus: make(chan *materials.UpDownStatus, 2),
logger: zerolog.Nop(),
},
}

Expand All @@ -61,11 +58,6 @@ func (c *DownloaderClient) Download(ctx context.Context, w io.Writer, digest str
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if c.renderProgress {
go c.renderDownloadStatus(ctx, c.logger)
defer close(c.progressStatus)
}

// Open the stream to start reading chunks
reader, err := bytestream.NewByteStreamClient(c.conn).Read(ctx, &bytestream.ReadRequest{ResourceName: digest})
if err != nil {
Expand Down Expand Up @@ -96,23 +88,19 @@ func (c *DownloaderClient) Download(ctx context.Context, w io.Writer, digest str
TotalSizeBytes: totalBytes, ProcessedBytes: totalDownloaded,
}

if c.renderProgress {
c.progressStatus <- latestStatus
select {
case c.ProgressStatus <- latestStatus:
// message sent
default:
c.logger.Debug().Msg("nobody listening to progress updates, dropping message")
}
}

// Give some time for the progress renderer to finish
// TODO: Implement with proper subroutine messaging
if c.renderProgress {
time.Sleep(renderUpdateFrequency)
// Block until the buffer has been filled or the upload process has been canceled
}

return nil
}

// Describe returns the metadata of a resource by its digest
// We use this to get the filename and the total size of the artifacct
// We use this to get the filename and the total size of the artifact
func (c *DownloaderClient) Describe(ctx context.Context, digest string) (*materials.ResourceInfo, error) {
client := v1.NewResourceServiceClient(c.conn)
resp, err := client.Describe(ctx, &v1.ResourceServiceDescribeRequest{Digest: digest})
Expand All @@ -124,38 +112,3 @@ func (c *DownloaderClient) Describe(ctx context.Context, digest string) (*materi
Digest: resp.GetResult().GetDigest(), Filename: resp.Result.GetFileName(), Size: resp.Result.GetSize(),
}, nil
}

func (c *DownloaderClient) renderDownloadStatus(ctx context.Context, output io.Writer) {
pw := progress.NewWriter()
pw.Style().Visibility.ETA = true
pw.Style().Visibility.Speed = true
pw.SetUpdateFrequency(renderUpdateFrequency)

var tracker *progress.Tracker
go pw.Render()
defer pw.Stop()

for {
select {
case <-ctx.Done():
return
case s, ok := <-c.progressStatus:
if !ok {
return
}

// Initialize tracker
if tracker == nil {
total := s.TotalSizeBytes
tracker = &progress.Tracker{
Total: total,
Units: progress.UnitsBytes,
}
defer tracker.MarkAsDone()
pw.AppendTracker(tracker)
}

tracker.SetValue(s.ProcessedBytes)
}
}
}
Loading