Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artifact up/download to/from Azure Blob Storage #2318

Merged
merged 6 commits into from Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
108 changes: 63 additions & 45 deletions agent/artifact_downloader.go
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/aws/aws-sdk-go/service/s3"
"github.com/buildkite/agent/v3/api"
iartifact "github.com/buildkite/agent/v3/internal/artifact"
"github.com/buildkite/agent/v3/logger"
"github.com/buildkite/agent/v3/pool"
)
Expand Down Expand Up @@ -57,14 +58,14 @@ func NewArtifactDownloader(l logger.Logger, ac APIClient, c ArtifactDownloaderCo

func (a *ArtifactDownloader) Download(ctx context.Context) error {
// Turn the download destination into an absolute path and confirm it exists
downloadDestination, _ := filepath.Abs(a.conf.Destination)
fileInfo, err := os.Stat(downloadDestination)
destination, _ := filepath.Abs(a.conf.Destination)
fileInfo, err := os.Stat(destination)
if err != nil {
return fmt.Errorf("Could not find information about destination: %s %v",
downloadDestination, err)
destination, err)
}
if !fileInfo.IsDir() {
return fmt.Errorf("%s is not a directory", downloadDestination)
return fmt.Errorf("%s is not a directory", destination)
}

artifacts, err := NewArtifactSearcher(a.logger, a.apiClient, a.conf.BuildID).
Expand All @@ -79,7 +80,7 @@ func (a *ArtifactDownloader) Download(ctx context.Context) error {
return errors.New("No artifacts found for downloading")
}

a.logger.Info("Found %d artifacts. Starting to download to: %s", artifactCount, downloadDestination)
a.logger.Info("Found %d artifacts. Starting to download to: %s", artifactCount, destination)

p := pool.New(pool.MaxConcurrencyLimit)
errors := []error{}
Expand All @@ -101,46 +102,7 @@ func (a *ArtifactDownloader) Download(ctx context.Context) error {
path = strings.Replace(path, `\`, `/`, -1)
}

// Handle downloading from S3, GS, or RT
var dler interface {
Start(context.Context) error
}
switch {
case strings.HasPrefix(artifact.UploadDestination, "s3://"):
bucketName, _ := ParseS3Destination(artifact.UploadDestination)
dler = NewS3Downloader(a.logger, S3DownloaderConfig{
S3Client: s3Clients[bucketName],
Path: path,
S3Path: artifact.UploadDestination,
Destination: downloadDestination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})
case strings.HasPrefix(artifact.UploadDestination, "gs://"):
dler = NewGSDownloader(a.logger, GSDownloaderConfig{
Path: path,
Bucket: artifact.UploadDestination,
Destination: downloadDestination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})
case strings.HasPrefix(artifact.UploadDestination, "rt://"):
dler = NewArtifactoryDownloader(a.logger, ArtifactoryDownloaderConfig{
Path: path,
Repository: artifact.UploadDestination,
Destination: downloadDestination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})
default:
dler = NewDownload(a.logger, http.DefaultClient, DownloadConfig{
URL: artifact.URL,
Path: path,
Destination: downloadDestination,
Retries: 5,
DebugHTTP: a.conf.DebugHTTP,
})
}
dler := a.createDownloader(artifact, path, destination, s3Clients)

// If the downloaded encountered an error, lock
// the pool, collect it, then unlock the pool
Expand Down Expand Up @@ -188,3 +150,59 @@ func (a *ArtifactDownloader) generateS3Clients(artifacts []*api.Artifact) (map[s

return s3Clients, nil
}

// downloader is the minimal contract shared by all artifact download
// implementations returned by createDownloader (S3, GCS, Artifactory,
// Azure Blob, and plain HTTP): kick off the download and report any error.
type downloader interface {
	Start(context.Context) error
}

// createDownloader inspects the artifact's upload destination and returns the
// matching downloader implementation: S3 (s3://), Google Cloud Storage
// (gs://), Artifactory (rt://), Azure Blob Storage, or a plain HTTP download
// of artifact.URL as the fallback.
//
// path is the (slash-normalised) relative file path to write, destination is
// the local directory to download into, and s3Clients maps bucket names to
// pre-built S3 clients (only consulted for s3:// destinations).
func (a *ArtifactDownloader) createDownloader(artifact *api.Artifact, path, destination string, s3Clients map[string]*s3.S3) downloader {
	// Every backend retries the same number of times; name the constant once
	// instead of repeating the magic number in each branch.
	const retries = 5

	// Handle downloading from S3, GS, RT, or Azure
	switch {
	case strings.HasPrefix(artifact.UploadDestination, "s3://"):
		bucketName, _ := ParseS3Destination(artifact.UploadDestination)
		return NewS3Downloader(a.logger, S3DownloaderConfig{
			S3Client:    s3Clients[bucketName],
			Path:        path,
			S3Path:      artifact.UploadDestination,
			Destination: destination,
			Retries:     retries,
			DebugHTTP:   a.conf.DebugHTTP,
		})

	case strings.HasPrefix(artifact.UploadDestination, "gs://"):
		return NewGSDownloader(a.logger, GSDownloaderConfig{
			Path:        path,
			Bucket:      artifact.UploadDestination,
			Destination: destination,
			Retries:     retries,
			DebugHTTP:   a.conf.DebugHTTP,
		})

	case strings.HasPrefix(artifact.UploadDestination, "rt://"):
		return NewArtifactoryDownloader(a.logger, ArtifactoryDownloaderConfig{
			Path:        path,
			Repository:  artifact.UploadDestination,
			Destination: destination,
			Retries:     retries,
			DebugHTTP:   a.conf.DebugHTTP,
		})

	case iartifact.IsAzureBlobPath(artifact.UploadDestination):
		return iartifact.NewAzureBlobDownloader(a.logger, iartifact.AzureBlobDownloaderConfig{
			Path:        path,
			Repository:  artifact.UploadDestination,
			Destination: destination,
			Retries:     retries,
			DebugHTTP:   a.conf.DebugHTTP,
		})

	default:
		// No recognised scheme: fetch directly over HTTP from the URL the
		// Buildkite API supplied for this artifact.
		return NewDownload(a.logger, http.DefaultClient, DownloadConfig{
			URL:         artifact.URL,
			Path:        path,
			Destination: destination,
			Retries:     retries,
			DebugHTTP:   a.conf.DebugHTTP,
		})
	}
}
77 changes: 48 additions & 29 deletions agent/artifact_uploader.go
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/buildkite/agent/v3/api"
"github.com/buildkite/agent/v3/internal/artifact"
"github.com/buildkite/agent/v3/internal/experiments"
"github.com/buildkite/agent/v3/internal/mime"
"github.com/buildkite/agent/v3/logger"
Expand Down Expand Up @@ -246,41 +247,59 @@ func (a *ArtifactUploader) build(path string, absolutePath string, globPath stri
return artifact, nil
}

func (a *ArtifactUploader) upload(ctx context.Context, artifacts []*api.Artifact) error {
var uploader Uploader
var err error

// Determine what uploader to use
if a.conf.Destination != "" {
if strings.HasPrefix(a.conf.Destination, "s3://") {
uploader, err = NewS3Uploader(a.logger, S3UploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})
} else if strings.HasPrefix(a.conf.Destination, "gs://") {
uploader, err = NewGSUploader(a.logger, GSUploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})
} else if strings.HasPrefix(a.conf.Destination, "rt://") {
uploader, err = NewArtifactoryUploader(a.logger, ArtifactoryUploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})
} else {
return fmt.Errorf("invalid upload destination: '%v'. Only s3://, gs:// or rt:// upload schemes are allowed. Did you forget to surround your artifact upload pattern in double quotes?", a.conf.Destination)
// createUploader applies some heuristics to the destination to infer which
// uploader to use.
func (a *ArtifactUploader) createUploader() (uploader Uploader, err error) {
var dest string
defer func() {
if err != nil || dest == "" {
return
}
a.logger.Info("Uploading to %s (%q), using your agent configuration", dest, a.conf.Destination)
}()

a.logger.Info("Uploading to %q, using your agent configuration", a.conf.Destination)
} else {
uploader = NewFormUploader(a.logger, FormUploaderConfig{
switch {
case a.conf.Destination == "":
a.logger.Info("Uploading to default Buildkite artifact storage")
return NewFormUploader(a.logger, FormUploaderConfig{
DebugHTTP: a.conf.DebugHTTP,
}), nil

case strings.HasPrefix(a.conf.Destination, "s3://"):
dest = "Amazon S3"
return NewS3Uploader(a.logger, S3UploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})

a.logger.Info("Uploading to default Buildkite artifact storage")
case strings.HasPrefix(a.conf.Destination, "gs://"):
dest = "Google Cloud Storage"
return NewGSUploader(a.logger, GSUploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})

case strings.HasPrefix(a.conf.Destination, "rt://"):
dest = "Artifactory"
return NewArtifactoryUploader(a.logger, ArtifactoryUploaderConfig{
Destination: a.conf.Destination,
DebugHTTP: a.conf.DebugHTTP,
})

case artifact.IsAzureBlobPath(a.conf.Destination):
dest = "Azure Blob storage"
return artifact.NewAzureBlobUploader(a.logger, artifact.AzureBlobUploaderConfig{
Destination: a.conf.Destination,
})

default:
return nil, fmt.Errorf("invalid upload destination: '%v'. Only s3://*, gs://*, rt://*, or https://*.blob.core.windows.net destinations are allowed. Did you forget to surround your artifact upload pattern in double quotes?", a.conf.Destination)
}
}

// Check if creation caused an error
func (a *ArtifactUploader) upload(ctx context.Context, artifacts []*api.Artifact) error {
// Determine what uploader to use
uploader, err := a.createUploader()
if err != nil {
return fmt.Errorf("creating uploader: %v", err)
}
Expand Down Expand Up @@ -413,7 +432,7 @@ func (a *ArtifactUploader) upload(ctx context.Context, artifacts []*api.Artifact
roko.WithMaxAttempts(10),
roko.WithStrategy(roko.Constant(5*time.Second)),
).DoWithContext(ctx, func(r *roko.Retrier) error {
if err := uploader.Upload(artifact); err != nil {
if err := uploader.Upload(ctx, artifact); err != nil {
a.logger.Warn("%s (%s)", err, r)
return err
}
Expand Down
3 changes: 2 additions & 1 deletion agent/artifactory_uploader.go
@@ -1,6 +1,7 @@
package agent

import (
"context"
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
Expand Down Expand Up @@ -98,7 +99,7 @@ func (u *ArtifactoryUploader) URL(artifact *api.Artifact) string {
return url.String()
}

func (u *ArtifactoryUploader) Upload(artifact *api.Artifact) error {
func (u *ArtifactoryUploader) Upload(_ context.Context, artifact *api.Artifact) error {
// Open file from filesystem
u.logger.Debug("Reading file \"%s\"", artifact.AbsolutePath)
f, err := os.Open(artifact.AbsolutePath)
Expand Down
3 changes: 2 additions & 1 deletion agent/form_uploader.go
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"bytes"
"context"
_ "crypto/sha512" // import sha512 to make sha512 ssl certs work
"fmt"
"io"
Expand Down Expand Up @@ -52,7 +53,7 @@ func (u *FormUploader) URL(artifact *api.Artifact) string {
return ""
}

func (u *FormUploader) Upload(artifact *api.Artifact) error {
func (u *FormUploader) Upload(_ context.Context, artifact *api.Artifact) error {
if artifact.FileSize > maxFormUploadedArtifactSize {
return errArtifactTooLarge{Size: artifact.FileSize}
}
Expand Down
11 changes: 8 additions & 3 deletions agent/form_uploader_test.go
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand All @@ -16,6 +17,8 @@ import (
)

func TestFormUploading(t *testing.T) {
ctx := context.Background()

server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case "/buildkiteartifacts.com":
Expand Down Expand Up @@ -106,7 +109,7 @@ func TestFormUploading(t *testing.T) {
}},
}

if err := uploader.Upload(artifact); err != nil {
if err := uploader.Upload(ctx, artifact); err != nil {
t.Errorf("uploader.Upload(artifact) = %v", err)
}
}
Expand All @@ -117,6 +120,7 @@ func TestFormUploading(t *testing.T) {
}

func TestFormUploadFileMissing(t *testing.T) {
ctx := context.Background()
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
http.Error(rw, "Not found", http.StatusNotFound)
}))
Expand Down Expand Up @@ -154,12 +158,13 @@ func TestFormUploadFileMissing(t *testing.T) {
}},
}

if err := uploader.Upload(artifact); !os.IsNotExist(err) {
if err := uploader.Upload(ctx, artifact); !os.IsNotExist(err) {
t.Errorf("uploader.Upload(artifact) = %v, want os.ErrNotExist", err)
}
}

func TestFormUploadTooBig(t *testing.T) {
ctx := context.Background()
uploader := NewFormUploader(logger.Discard, FormUploaderConfig{})
const size = int64(6442450944) // 6Gb
artifact := &api.Artifact{
Expand All @@ -172,7 +177,7 @@ func TestFormUploadTooBig(t *testing.T) {
UploadInstructions: &api.ArtifactUploadInstructions{},
}

if err := uploader.Upload(artifact); !errors.Is(err, errArtifactTooLarge{Size: size}) {
if err := uploader.Upload(ctx, artifact); !errors.Is(err, errArtifactTooLarge{Size: size}) {
t.Errorf("uploader.Upload(artifact) = %v, want errArtifactTooLarge", err)
}
}
2 changes: 1 addition & 1 deletion agent/gs_uploader.go
Expand Up @@ -119,7 +119,7 @@ func (u *GSUploader) URL(artifact *api.Artifact) string {
return artifactURL.String()
}

func (u *GSUploader) Upload(artifact *api.Artifact) error {
func (u *GSUploader) Upload(_ context.Context, artifact *api.Artifact) error {
permission := os.Getenv("BUILDKITE_GS_ACL")

// The dirtiest validation method ever...
Expand Down
3 changes: 2 additions & 1 deletion agent/s3_uploader.go
@@ -1,6 +1,7 @@
package agent

import (
"context"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (u *S3Uploader) URL(artifact *api.Artifact) string {
return url.String()
}

func (u *S3Uploader) Upload(artifact *api.Artifact) error {
func (u *S3Uploader) Upload(_ context.Context, artifact *api.Artifact) error {

permission, err := u.resolvePermission()
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion agent/uploader.go
@@ -1,6 +1,8 @@
package agent

import (
"context"

"github.com/buildkite/agent/v3/api"
)

Expand All @@ -10,5 +12,5 @@ type Uploader interface {
URL(*api.Artifact) string

// The actual uploading of the file
Upload(*api.Artifact) error
Upload(context.Context, *api.Artifact) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay!

}
3 changes: 3 additions & 0 deletions clicommand/global.go
Expand Up @@ -103,6 +103,9 @@ var RedactedVars = cli.StringSliceFlag{
"*_PRIVATE_KEY",
"*_ACCESS_KEY",
"*_SECRET_KEY",
// Connection strings frequently contain passwords, e.g.
// https://user:pass@host/ or Server=foo;Database=my-db;User Id=user;Password=pass;
"*_CONNECTION_STRING",
DrJosh9000 marked this conversation as resolved.
Show resolved Hide resolved
},
}

Expand Down