Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace minio-go with aws-sdk-go for s3-compatible log backend #670

Merged
merged 4 commits into from
Jan 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
481 changes: 425 additions & 56 deletions Gopkg.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ ignored = ["github.com/fnproject/fn/cli"]
branch = "master"
name = "github.com/google/btree"

[[constraint]]
name = "github.com/minio/minio-go"
version = "4.0.1"

[[constraint]]
name = "github.com/openzipkin/zipkin-go-opentracing"
version = "0.3.1"
Expand Down Expand Up @@ -103,4 +99,8 @@ ignored = ["github.com/fnproject/fn/cli"]

[[constraint]]
name = "github.com/prometheus/common"
revision = "2f17f4a9d485bf34b4bfaccc273805040e4f86c8"
revision = "2f17f4a9d485bf34b4bfaccc273805040e4f86c8"

[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "~1.12.59"
138 changes: 101 additions & 37 deletions api/logs/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@
package s3

import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/url"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/fnproject/fn/api/models"
"github.com/minio/minio-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/sirupsen/logrus"
)

Expand All @@ -25,11 +34,45 @@ const (
)

type store struct {
client *minio.Client
bucket string
client *s3.S3
uploader *s3manager.Uploader
downloader *s3manager.Downloader
bucket string
}

// s3://access_key_id:secret_access_key@host/location/bucket_name?ssl=true
// countingReader wraps an io.Reader and tallies how many bytes have passed
// through it, so the size of the stream is known once it has been consumed
// without having to buffer the data a second time.
type countingReader struct {
	r     io.Reader // underlying source that Read delegates to
	count int       // running total of bytes returned by Read so far
}

// Read forwards to the wrapped reader, adds the number of bytes it produced to
// count, and returns the wrapped reader's result unchanged.
func (cr *countingReader) Read(p []byte) (int, error) {
	read, err := cr.r.Read(p)
	cr.count = cr.count + read
	return read, err
}

// createStore builds a store for bucketName whose S3 client, uploader and
// downloader all share a single AWS session.
//
// The session is configured with static credentials (accessKeyID /
// secretAccessKey, empty session token), the given endpoint and region, SSL
// disabled when useSSL is false, and S3ForcePathStyle enabled.
//
// The session is created via session.Must, so a failure to build it panics
// rather than being returned to the caller.
func createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey string, useSSL bool) *store {
	creds := credentials.NewStaticCredentials(accessKeyID, secretAccessKey, "")

	cfg := aws.Config{
		Credentials:      creds,
		Endpoint:         aws.String(endpoint),
		Region:           aws.String(region),
		DisableSSL:       aws.Bool(!useSSL),
		S3ForcePathStyle: aws.Bool(true),
	}

	sess := session.Must(session.NewSession(&cfg))
	s3Client := s3.New(sess)

	// The uploader and downloader reuse the same client so every operation
	// goes through the one configured session.
	st := &store{bucket: bucketName, client: s3Client}
	st.uploader = s3manager.NewUploaderWithClient(s3Client)
	st.downloader = s3manager.NewDownloaderWithClient(s3Client)
	return st
}

// s3://access_key_id:secret_access_key@host/region/bucket_name?ssl=true
// Note that access_key_id and secret_access_key must be URL encoded if they contain unsafe characters!
func New(u *url.URL) (models.LogStore, error) {
endpoint := u.Host
Expand All @@ -45,37 +88,33 @@ func New(u *url.URL) (models.LogStore, error) {
if len(strs) < 3 {
return nil, errors.New("must provide bucket name and region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
}
location := strs[1]
region := strs[1]
bucketName := strs[2]
if location == "" {
return nil, errors.New("must provide non-empty location in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
if region == "" {
return nil, errors.New("must provide non-empty region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
} else if bucketName == "" {
return nil, errors.New("must provide non-empty bucket name in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
}

logrus.WithFields(logrus.Fields{"bucketName": bucketName, "location": location, "endpoint": endpoint, "access_key_id": accessKeyID, "useSSL": useSSL}).Info("checking / creating s3 bucket")

client, err := minio.NewWithRegion(endpoint, accessKeyID, secretAccessKey, useSSL, location)
if err != nil {
return nil, err
}
logrus.WithFields(logrus.Fields{"bucketName": bucketName, "region": region, "endpoint": endpoint, "access_key_id": accessKeyID, "useSSL": useSSL}).Info("checking / creating s3 bucket")
store := createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey, useSSL)

// ensure the bucket exists, creating if it does not
err = client.MakeBucket(bucketName, location)
if errMake := err; err != nil {
// Check to see if we already own this bucket (which happens if you run this twice)
exists, err := client.BucketExists(bucketName)
if err != nil {
return nil, err
} else if !exists {
return nil, errors.New("could not create bucket and bucket does not exist, please check permissions: " + errMake.Error())
_, err := store.client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucketName)})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeBucketAlreadyOwnedByYou, s3.ErrCodeBucketAlreadyExists:
// bucket already exists, NO-OP
default:
return nil, fmt.Errorf("failed to create bucket %s: %s", bucketName, aerr.Message())
}
} else {
return nil, fmt.Errorf("unexpected error creating bucket %s: %s", bucketName, err.Error())
}
}

return &store{
client: client,
bucket: bucketName,
}, nil
return store, nil
}

func path(appName, callID string) string {
Expand All @@ -85,26 +124,51 @@ func path(appName, callID string) string {
}

func (s *store) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error {
objectName := path(appName, callID)
_, err := s.client.PutObjectWithContext(ctx, s.bucket, objectName, callLog, -1, minio.PutObjectOptions{ContentType: contentType})
return err
}
span, ctx := opentracing.StartSpanFromContext(ctx, "s3_insert_log")
defer span.Finish()

// wrap original reader in a decorator to keep track of read bytes without buffering
cr := &countingReader{r: callLog}

func (s *store) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) {
objectName := path(appName, callID)
obj, err := s.client.GetObjectWithContext(ctx, s.bucket, objectName, minio.GetObjectOptions{})
params := &s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(objectName),
Body: cr,
ContentType: aws.String(contentType),
}

logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading log")
_, err := s.uploader.UploadWithContext(ctx, params)
if err != nil {
return nil, err // this is always nil, for now, thanks minio :(
return fmt.Errorf("failed to write log, %v", err)
}

_, err = obj.Stat()
span.LogFields(log.Int("fn_s3_log_upload_size", cr.count))
return nil
}

func (s *store) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "s3_get_log")
defer span.Finish()

objectName := path(appName, callID)
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading log")

// stream the logs to an in-memory buffer
target := &aws.WriteAtBuffer{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have used https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#S3.GetObject with some success in the past to avoid having to copy into a buffer here; I am less familiar with the s3manager API.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

happy to punt on optimizing here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that should work as well. Will look at making the change tomorrow.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdallman I've looked into replacing this with s3.GetObject and the challenge there is that we wouldn't be able to simply return the Body of s3.GetObjectOutput as the io.Reader result of GetLog. The reason is that the result body needs to be closed in order to not leak connections (see aws/aws-sdk-go#408), so unfortunately we wouldn't be able to avoid copying into an intermediate buffer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. We could change LogStore.GetLog to return an io.ReadCloser?

size, err := s.downloader.DownloadWithContext(ctx, target, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(objectName),
})
if err != nil {
errResp := minio.ToErrorResponse(err)
if errResp.StatusCode == 404 {
aerr, ok := err.(awserr.Error)
if ok && aerr.Code() == s3.ErrCodeNoSuchKey {
return nil, models.ErrCallLogNotFound
}
return nil, err
return nil, fmt.Errorf("failed to read log, %v", err)
}

return obj, nil
span.LogFields(log.Int64("fn_s3_log_download_size", size))
return bytes.NewReader(target.Bytes()), nil
}
2 changes: 1 addition & 1 deletion api/logs/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestS3(t *testing.T) {

ls, err := New(uLog)
if err != nil {
t.Fatalf("failed to create sqlite3 datastore: %v", err)
t.Fatalf("failed to create s3 datastore: %v", err)
}
logTesting.Test(t, ls)
}
14 changes: 14 additions & 0 deletions vendor/github.com/aws/aws-sdk-go/.github/ISSUE_TEMPLATE.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions vendor/github.com/aws/aws-sdk-go/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/github.com/aws/aws-sdk-go/.godoc_config

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions vendor/github.com/aws/aws-sdk-go/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.