Skip to content

Commit

Permalink
configurable S3 PUT options
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed Feb 19, 2024
1 parent e7d6bc3 commit e48a559
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 5 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/configurable-s3-put-options.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: configurable s3 put options

The s3ng blobstore can now be configured with several options: `s3.disable_content_sha254`, `s3.disable_multipart`, `s3.send_content_md5`, `s3.concurrent_stream_parts`, `s3.num_threads` and `s3.part_size`. If unset we default to `s3.send_content_md5: true`, which was hardcoded before. We also default to `s3.concurrent_stream_parts: true` and `s3.num_threads: 4` to allow concurrent uploads even when `s3.send_content_md5` is set to `true`. When tweaking the uploads try setting `s3.send_content_md5: false` and `s3.concurrent_stream_parts: false` first, as this will try to concurrently stream an uploaded file to the s3 store without cutting it into parts first.

https://github.com/cs3org/reva/pull/
28 changes: 24 additions & 4 deletions pkg/storage/fs/s3ng/blobstore/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,22 @@ import (
type Blobstore struct {
client *minio.Client

defaultPutOptions Options

bucket string
}

type Options struct {
DisableContentSha256 bool
DisableMultipart bool
SendContentMd5 bool
ConcurrentStreamParts bool
NumThreads uint
PartSize uint64
}

// New returns a new Blobstore
func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, error) {
func New(endpoint, region, bucket, accessKey, secretKey string, defaultPutOptions Options) (*Blobstore, error) {
u, err := url.Parse(endpoint)
if err != nil {
return nil, errors.Wrap(err, "failed to parse s3 endpoint")
Expand All @@ -58,8 +69,9 @@ func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, err
}

return &Blobstore{
client: client,
bucket: bucket,
client: client,
bucket: bucket,
defaultPutOptions: defaultPutOptions,
}, nil
}

Expand All @@ -71,7 +83,15 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {
}
defer reader.Close()

_, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{ContentType: "application/octet-stream", SendContentMd5: true})
_, err = bs.client.PutObject(context.Background(), bs.bucket, bs.path(node), reader, node.Blobsize, minio.PutObjectOptions{
ContentType: "application/octet-stream",
SendContentMd5: bs.defaultPutOptions.SendContentMd5,
ConcurrentStreamParts: bs.defaultPutOptions.ConcurrentStreamParts,
NumThreads: bs.defaultPutOptions.NumThreads,
PartSize: bs.defaultPutOptions.PartSize,
DisableMultipart: bs.defaultPutOptions.DisableMultipart,
DisableContentSha256: bs.defaultPutOptions.DisableContentSha256,
})

if err != nil {
return errors.Wrapf(err, "could not store object '%s' into bucket '%s'", bs.path(node), bs.bucket)
Expand Down
29 changes: 29 additions & 0 deletions pkg/storage/fs/s3ng/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ type Options struct {

// Secret key for the s3 blobstore
S3SecretKey string `mapstructure:"s3.secret_key"`

// disable sending content sha256
DisableContentSha256 bool `mapstructure:"s3.disable_content_sha254"`

// disable multipart uploads
DisableMultipart bool `mapstructure:"s3.disable_multipart"`

// enable sending content md5, defaults to true if unset
SendContentMd5 bool `mapstructure:"s3.send_content_md5"`

// use concurrent stream parts
ConcurrentStreamParts bool `mapstructure:"s3.concurrent_stream_parts"`

// number of concurrent uploads
NumThreads uint `mapstructure:"s3.num_threads"`

// part size for concurrent uploads
PartSize uint64 `mapstructure:"s3.part_size"`
}

// S3ConfigComplete return true if all required s3 fields are set
Expand All @@ -60,5 +78,16 @@ func parseConfig(m map[string]interface{}) (*Options, error) {
err = errors.Wrap(err, "error decoding conf")
return nil, err
}

// if unset we set these defaults
if m["s3.send_content_md5"] == nil {
o.SendContentMd5 = true
}
if m["s3.concurrent_stream_parts"] == nil {
o.ConcurrentStreamParts = true
}
if m["s3.num_threads"] == nil {
o.NumThreads = 4
}
return o, nil
}
11 changes: 10 additions & 1 deletion pkg/storage/fs/s3ng/s3ng.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,16 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
return nil, fmt.Errorf("S3 configuration incomplete")
}

bs, err := blobstore.New(o.S3Endpoint, o.S3Region, o.S3Bucket, o.S3AccessKey, o.S3SecretKey)
defaultPutOptions := blobstore.Options{
DisableContentSha256: o.DisableContentSha256,
DisableMultipart: o.DisableMultipart,
SendContentMd5: o.SendContentMd5,
ConcurrentStreamParts: o.ConcurrentStreamParts,
NumThreads: o.NumThreads,
PartSize: o.PartSize,
}

bs, err := blobstore.New(o.S3Endpoint, o.S3Region, o.S3Bucket, o.S3AccessKey, o.S3SecretKey, defaultPutOptions)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit e48a559

Please sign in to comment.