Skip to content

Commit

Permalink
Replace minio-go with aws-sdk-go for s3-compatible log backend (#670)
Browse files Browse the repository at this point in the history
* Logs should support specifying region when using S3-compatible object store

* Use aws-sdk-go client for s3 backed logstore

* fixes vendor with aws-sdk-go dependencies
  • Loading branch information
gviedma authored and Reed Allman committed Jan 10, 2018
1 parent 930d1e8 commit 60d2e92
Show file tree
Hide file tree
Showing 2,818 changed files with 1,881,230 additions and 33,199 deletions.
481 changes: 425 additions & 56 deletions Gopkg.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ ignored = ["github.com/fnproject/fn/cli"]
branch = "master"
name = "github.com/google/btree"

[[constraint]]
name = "github.com/minio/minio-go"
version = "4.0.1"

[[constraint]]
name = "github.com/openzipkin/zipkin-go-opentracing"
version = "0.3.1"
Expand Down Expand Up @@ -103,4 +99,8 @@ ignored = ["github.com/fnproject/fn/cli"]

[[constraint]]
name = "github.com/prometheus/common"
revision = "2f17f4a9d485bf34b4bfaccc273805040e4f86c8"
revision = "2f17f4a9d485bf34b4bfaccc273805040e4f86c8"

[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "~1.12.59"
138 changes: 101 additions & 37 deletions api/logs/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@
package s3

import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/url"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/fnproject/fn/api/models"
"github.com/minio/minio-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/sirupsen/logrus"
)

Expand All @@ -25,11 +34,45 @@ const (
)

type store struct {
client *minio.Client
bucket string
client *s3.S3
uploader *s3manager.Uploader
downloader *s3manager.Downloader
bucket string
}

// s3://access_key_id:secret_access_key@host/location/bucket_name?ssl=true
// decorator around the Reader interface that keeps track of the number of bytes read
// in order to avoid double buffering and track Reader size
type countingReader struct {
r io.Reader
count int
}

func (cr *countingReader) Read(p []byte) (n int, err error) {
n, err = cr.r.Read(p)
cr.count += n
return n, err
}

func createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey string, useSSL bool) *store {
config := &aws.Config{
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
Endpoint: aws.String(endpoint),
Region: aws.String(region),
DisableSSL: aws.Bool(!useSSL),
S3ForcePathStyle: aws.Bool(true),
}
session := session.Must(session.NewSession(config))
client := s3.New(session)

return &store{
client: client,
uploader: s3manager.NewUploaderWithClient(client),
downloader: s3manager.NewDownloaderWithClient(client),
bucket: bucketName,
}
}

// s3://access_key_id:secret_access_key@host/region/bucket_name?ssl=true
// Note that access_key_id and secret_access_key must be URL encoded if they contain unsafe characters!
func New(u *url.URL) (models.LogStore, error) {
endpoint := u.Host
Expand All @@ -45,37 +88,33 @@ func New(u *url.URL) (models.LogStore, error) {
if len(strs) < 3 {
return nil, errors.New("must provide bucket name and region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
}
location := strs[1]
region := strs[1]
bucketName := strs[2]
if location == "" {
return nil, errors.New("must provide non-empty location in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
if region == "" {
return nil, errors.New("must provide non-empty region in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
} else if bucketName == "" {
return nil, errors.New("must provide non-empty bucket name in path of s3 api url. e.g. s3://s3.com/us-east-1/my_bucket")
}

logrus.WithFields(logrus.Fields{"bucketName": bucketName, "location": location, "endpoint": endpoint, "access_key_id": accessKeyID, "useSSL": useSSL}).Info("checking / creating s3 bucket")

client, err := minio.NewWithRegion(endpoint, accessKeyID, secretAccessKey, useSSL, location)
if err != nil {
return nil, err
}
logrus.WithFields(logrus.Fields{"bucketName": bucketName, "region": region, "endpoint": endpoint, "access_key_id": accessKeyID, "useSSL": useSSL}).Info("checking / creating s3 bucket")
store := createStore(bucketName, endpoint, region, accessKeyID, secretAccessKey, useSSL)

// ensure the bucket exists, creating if it does not
err = client.MakeBucket(bucketName, location)
if errMake := err; err != nil {
// Check to see if we already own this bucket (which happens if you run this twice)
exists, err := client.BucketExists(bucketName)
if err != nil {
return nil, err
} else if !exists {
return nil, errors.New("could not create bucket and bucket does not exist, please check permissions: " + errMake.Error())
_, err := store.client.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(bucketName)})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeBucketAlreadyOwnedByYou, s3.ErrCodeBucketAlreadyExists:
// bucket already exists, NO-OP
default:
return nil, fmt.Errorf("failed to create bucket %s: %s", bucketName, aerr.Message())
}
} else {
return nil, fmt.Errorf("unexpected error creating bucket %s: %s", bucketName, err.Error())
}
}

return &store{
client: client,
bucket: bucketName,
}, nil
return store, nil
}

func path(appName, callID string) string {
Expand All @@ -85,26 +124,51 @@ func path(appName, callID string) string {
}

func (s *store) InsertLog(ctx context.Context, appName, callID string, callLog io.Reader) error {
objectName := path(appName, callID)
_, err := s.client.PutObjectWithContext(ctx, s.bucket, objectName, callLog, -1, minio.PutObjectOptions{ContentType: contentType})
return err
}
span, ctx := opentracing.StartSpanFromContext(ctx, "s3_insert_log")
defer span.Finish()

// wrap original reader in a decorator to keep track of read bytes without buffering
cr := &countingReader{r: callLog}

func (s *store) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) {
objectName := path(appName, callID)
obj, err := s.client.GetObjectWithContext(ctx, s.bucket, objectName, minio.GetObjectOptions{})
params := &s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(objectName),
Body: cr,
ContentType: aws.String(contentType),
}

logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Uploading log")
_, err := s.uploader.UploadWithContext(ctx, params)
if err != nil {
return nil, err // this is always nil, for now, thanks minio :(
return fmt.Errorf("failed to write log, %v", err)
}

_, err = obj.Stat()
span.LogFields(log.Int("fn_s3_log_upload_size", cr.count))
return nil
}

func (s *store) GetLog(ctx context.Context, appName, callID string) (io.Reader, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "s3_get_log")
defer span.Finish()

objectName := path(appName, callID)
logrus.WithFields(logrus.Fields{"bucketName": s.bucket, "key": objectName}).Debug("Downloading log")

// stream the logs to an in-memory buffer
target := &aws.WriteAtBuffer{}
size, err := s.downloader.DownloadWithContext(ctx, target, &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(objectName),
})
if err != nil {
errResp := minio.ToErrorResponse(err)
if errResp.StatusCode == 404 {
aerr, ok := err.(awserr.Error)
if ok && aerr.Code() == s3.ErrCodeNoSuchKey {
return nil, models.ErrCallLogNotFound
}
return nil, err
return nil, fmt.Errorf("failed to read log, %v", err)
}

return obj, nil
span.LogFields(log.Int64("fn_s3_log_download_size", size))
return bytes.NewReader(target.Bytes()), nil
}
2 changes: 1 addition & 1 deletion api/logs/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestS3(t *testing.T) {

ls, err := New(uLog)
if err != nil {
t.Fatalf("failed to create sqlite3 datastore: %v", err)
t.Fatalf("failed to create s3 datastore: %v", err)
}
logTesting.Test(t, ls)
}
14 changes: 14 additions & 0 deletions vendor/github.com/aws/aws-sdk-go/.github/ISSUE_TEMPLATE.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions vendor/github.com/aws/aws-sdk-go/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/github.com/aws/aws-sdk-go/.godoc_config

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions vendor/github.com/aws/aws-sdk-go/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 60d2e92

Please sign in to comment.