Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(casbackends): support for s3 compatible endpoints (minio, cloudflare R2, ...) #1055

Merged
merged 4 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions app/cli/cmd/casbackend_add_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
package cmd

import (
"fmt"

"github.com/chainloop-dev/chainloop/app/cli/internal/action"
"github.com/chainloop-dev/chainloop/pkg/blobmanager/s3"
"github.com/go-kratos/kratos/v2/log"
"github.com/spf13/cobra"
)

func newCASBackendAddAWSS3Cmd() *cobra.Command {
var bucketName, accessKeyID, secretAccessKey, region string
var bucketName, accessKeyID, secretAccessKey, region, endpoint string
cmd := &cobra.Command{
Use: "aws-s3",
Short: "Register a AWS S3 storage bucket",
Expand All @@ -46,9 +48,15 @@ func newCASBackendAddAWSS3Cmd() *cobra.Command {
}
}

location := bucketName
// If there is a custom endpoint we want to store it as part of the fqdn location
if endpoint != "" {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason of storing the endpoint in the location instead of the credentials is so it's shown to the user and can't be updated later on.

location = fmt.Sprintf("%s/%s", endpoint, bucketName)
}

opts := &action.NewCASBackendAddOpts{
Name: name,
Location: bucketName,
Location: location,
Provider: s3.ProviderID,
Description: description,
Credentials: map[string]any{
Expand Down Expand Up @@ -83,8 +91,9 @@ func newCASBackendAddAWSS3Cmd() *cobra.Command {
cobra.CheckErr(err)

cmd.Flags().StringVar(&region, "region", "", "AWS region for the bucket")
err = cmd.MarkFlagRequired("region")
cobra.CheckErr(err)

cmd.Flags().StringVar(&endpoint, "endpoint", "", "Custom Endpoint URL for other S3 compatible backends i.e MinIO")

return cmd
}
131 changes: 96 additions & 35 deletions pkg/blobmanager/s3/backend.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2023 The Chainloop Authors.
// Copyright 2024 The Chainloop Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"io"
"net/url"
"strings"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -40,28 +41,16 @@ const (
)

type Backend struct {
client *s3.S3
bucket string
client *s3.S3
bucket string
customEndpoint string
}

var _ backend.UploaderDownloader = (*Backend)(nil)

type ConnOpt func(*aws.Config)
const defaultRegion = "us-east-1"

// Optional endpoint configuration
func WithEndpoint(endpoint string) ConnOpt {
return func(cfg *aws.Config) {
cfg.Endpoint = aws.String(endpoint)
}
}

func WithForcedS3PathStyle(force bool) ConnOpt {
return func(cfg *aws.Config) {
cfg.S3ForcePathStyle = aws.Bool(force)
}
}

func NewBackend(creds *Credentials, connOpts ...ConnOpt) (*Backend, error) {
func NewBackend(creds *Credentials) (*Backend, error) {
if creds == nil {
return nil, errors.New("credentials cannot be nil")
}
Expand All @@ -70,11 +59,27 @@ func NewBackend(creds *Credentials, connOpts ...ConnOpt) (*Backend, error) {
return nil, fmt.Errorf("invalid credentials: %w", err)
}

// Set a default region if not provided
var region = defaultRegion
if creds.Region != "" {
region = creds.Region
}

c := credentials.NewStaticCredentials(creds.AccessKeyID, creds.SecretAccessKey, "")
// Configure AWS session
cfg := &aws.Config{Credentials: c, Region: aws.String(creds.Region)}
for _, opt := range connOpts {
opt(cfg)
cfg := &aws.Config{Credentials: c, Region: aws.String(region)}

// The bucket field might contain not only the bucket name but also the endpoint
endpoint, bucket, err := extractLocationAndBucket(creds)
if err != nil {
return nil, fmt.Errorf("failed to parse bucket name: %w", err)
}

// we have a custom endpoint
// in some cases the server-side checksum verification is not supported like in the case of cloudflare r2
if endpoint != "" {
cfg.Endpoint = aws.String(endpoint)
cfg.S3ForcePathStyle = aws.Bool(true)
}

session, err := session.NewSession(cfg)
Expand All @@ -83,11 +88,55 @@ func NewBackend(creds *Credentials, connOpts ...ConnOpt) (*Backend, error) {
}

return &Backend{
client: s3.New(session),
bucket: creds.BucketName,
client: s3.New(session),
bucket: bucket,
customEndpoint: endpoint,
}, nil
}

// For now we are aware that the checksum verification is not supported by cloudflare r2
// https://developers.cloudflare.com/r2/api/s3/api/
func (b *Backend) checksumVerificationEnabled() bool {
var enabled = true
if b.customEndpoint != "" && strings.Contains(b.customEndpoint, "r2.cloudflarestorage.com") {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cloudflare doesn't support this feature.

enabled = false
}

return enabled
}

// Extract the custom endpoint and the bucket name from the location string
// The location string can be either a bucket name or a URL
// i.e bucket-name or https://custom-domain/bucket-name
// It returns ("", bucket, nil) for plain bucket names and (endpoint, bucket, nil)
// when the location embeds a custom endpoint URL.
func extractLocationAndBucket(creds *Credentials) (string, string, error) {
	// Older versions of the credentials didn't have the location field
	// and just the bucket name was stored in the bucket name field
	if creds.BucketName != "" {
		return "", creds.BucketName, nil
	}

	// Newer versions of the credentials have the location field which can contain the endpoint
	// so we override the bucket and set the endpoint if needed
	parsedLocation, err := url.Parse(creds.Location)
	if err != nil {
		return "", "", fmt.Errorf("failed to parse location: %w", err)
	}

	host := parsedLocation.Host
	// It's a bucket name
	if host == "" {
		return "", creds.Location, nil
	}

	endpoint := fmt.Sprintf("%s://%s", parsedLocation.Scheme, host)
	// It's a URL, extract bucket name from the path.
	// Require the segment to be non-empty so that a trailing-slash-only URL
	// (i.e "https://custom-domain/") is rejected instead of yielding an empty bucket
	if pathSegments := strings.Split(parsedLocation.Path, "/"); len(pathSegments) > 1 && pathSegments[1] != "" {
		return endpoint, pathSegments[1], nil
	}

	return "", "", errors.New("the location doesn't contain a bucket name")
}

// Exists check that the artifact is already present in the repository
func (b *Backend) Exists(ctx context.Context, digest string) (bool, error) {
_, err := b.Describe(ctx, digest)
Expand All @@ -100,29 +149,41 @@ func (b *Backend) Exists(ctx context.Context, digest string) (bool, error) {

func (b *Backend) Upload(ctx context.Context, r io.Reader, resource *pb.CASResource) error {
uploader := s3manager.NewUploaderWithClient(b.client)

_, err := uploader.UploadWithContext(ctx, &s3manager.UploadInput{
input := &s3manager.UploadInput{
Bucket: aws.String(b.bucket),
Key: aws.String(resourceName(resource.Digest)),
Body: r,
// Check that the object is uploaded correctly
ChecksumSHA256: aws.String(hexSha256ToBinaryB64(resource.Digest)),
Metadata: map[string]*string{
annotationNameAuthor: aws.String(backend.AuthorAnnotation),
annotationNameFilename: aws.String(resource.FileName),
},
})
}

return err
if b.checksumVerificationEnabled() {
// Check that the object is uploaded correctly
input.ChecksumSHA256 = aws.String(hexSha256ToBinaryB64(resource.Digest))
}

if _, err := uploader.UploadWithContext(ctx, input); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on another topic, does this do streaming already?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underlying library does, the service layer that servers bytestream does not.

return fmt.Errorf("failed to upload to bucket: %w", err)
}

return nil
}

func (b *Backend) Describe(ctx context.Context, digest string) (*pb.CASResource, error) {
// and read the object back + validate integrity
resp, err := b.client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: aws.String(b.bucket),
Key: aws.String(resourceName(digest)),
ChecksumMode: aws.String("ENABLED"),
})
input := &s3.HeadObjectInput{
Bucket: aws.String(b.bucket),
Key: aws.String(resourceName(digest)),
}

if b.checksumVerificationEnabled() {
// Enable checksum verification
input.ChecksumMode = aws.String("ENABLED")
}

// and read the object back
resp, err := b.client.HeadObjectWithContext(ctx, input)

// check error is aws error
var awsErr awserr.Error
Expand Down
106 changes: 102 additions & 4 deletions pkg/blobmanager/s3/backend_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2023 The Chainloop Authors.
// Copyright 2024 The Chainloop Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -129,6 +129,101 @@ func (s *testSuite) TestDescribe() {
s.Equal(int64(4), artifact.Size)
})
}
// TestChecksumVerificationEnabled checks that server-side checksum verification
// is disabled only for Cloudflare R2 endpoints.
func (s *testSuite) TestChecksumVerificationEnabled() {
	cases := []struct {
		name     string
		endpoint string
		want     bool
	}{
		{name: "no endpoint, a.k.a AWS", endpoint: "", want: true},
		{name: "custom endpoint, i.e minio", endpoint: s.minio.ConnectionString(s.T()), want: true},
		{name: "custom endpoint", endpoint: "https://123.r2.cloudflarestorage.com/bucket-name", want: false},
	}

	for _, tc := range cases {
		s.Run(tc.name, func() {
			b := &Backend{customEndpoint: tc.endpoint}
			s.Equal(tc.want, b.checksumVerificationEnabled())
		})
	}
}

// TestExtractLocationAndBucket covers legacy bucket-name credentials as well as
// location strings that are plain bucket names, full URLs, or missing a bucket.
func (s *testSuite) TestExtractLocationAndBucket() {
	type want struct {
		endpoint string
		bucket   string
		err      string
	}

	cases := []struct {
		name  string
		creds *Credentials
		want  *want
	}{
		{
			name:  "no location",
			creds: &Credentials{BucketName: "bucket"},
			want:  &want{bucket: "bucket"},
		},
		{
			name:  "location is a bucket name",
			creds: &Credentials{Location: "bucket"},
			want:  &want{bucket: "bucket"},
		},
		{
			name:  "location is a URL",
			creds: &Credentials{Location: "https://custom-domain/bucket"},
			want:  &want{endpoint: "https://custom-domain", bucket: "bucket"},
		},
		{
			name:  "invalid URL",
			creds: &Credentials{Location: "https://custom-domain"},
			want:  &want{err: "doesn't contain a bucket name"},
		},
	}

	for _, tc := range cases {
		s.Run(tc.name, func() {
			endpoint, bucket, err := extractLocationAndBucket(tc.creds)
			if tc.want.err != "" {
				s.ErrorContains(err, tc.want.err)
				return
			}

			s.NoError(err)
			s.Equal(tc.want.endpoint, endpoint)
			s.Equal(tc.want.bucket, bucket)
		})
	}
}

func (s *testSuite) TestDownload() {
s.T().Run("exist but not uploaded by Chainloop", func(t *testing.T) {
Expand Down Expand Up @@ -183,14 +278,15 @@ const testBucket = "test-bucket"

func (s *testSuite) SetupTest() {
s.minio = newMinioInstance(s.T())
location := fmt.Sprintf("http://%s/%s", s.minio.ConnectionString(s.T()), testBucket)

// Create backend
backend, err := NewBackend(&Credentials{
AccessKeyID: "root",
SecretAccessKey: "test-password",
Region: "us-east-1",
BucketName: testBucket,
}, WithEndpoint(fmt.Sprintf("http://%s", s.minio.ConnectionString(s.T()))), WithForcedS3PathStyle(true))
Location: location,
})
require.NoError(s.T(), err)
s.backend = backend

Expand All @@ -199,7 +295,9 @@ func (s *testSuite) SetupTest() {
SecretAccessKey: "wrong-password",
Region: "us-east-1",
BucketName: testBucket,
}, WithEndpoint(fmt.Sprintf("http://%s", s.minio.ConnectionString(s.T()))), WithForcedS3PathStyle(true))
Location: location,
})

require.NoError(s.T(), err)
s.invalidBackend = invalidBackend

Expand Down
Loading
Loading