
use tusd datastorages #4148

Closed
wants to merge 81 commits into from
Commits
81 commits
1293660
use tusd datastorages
butonic Sep 1, 2023
7d2a741
add comments
butonic Nov 1, 2023
27936aa
test msgpack metadata
butonic Nov 1, 2023
11ebb76
run all tests again
butonic Nov 1, 2023
80cad58
test copy
butonic Nov 1, 2023
21301b4
log error when trying to read upload info
butonic Nov 1, 2023
6d1eb80
log upload info details when copying fails
butonic Nov 1, 2023
aa14928
log as error
butonic Nov 1, 2023
eaf5632
reinitialize logger
butonic Nov 2, 2023
69b14f5
more logging
butonic Nov 2, 2023
fb88329
declare length
butonic Nov 2, 2023
32f5141
use oc mtime as revision mtime
butonic Nov 9, 2023
1afb42a
fix lint
butonic Nov 9, 2023
1aaec6c
Merge branch 'edge' into use-tus-data-storage
butonic Nov 9, 2023
96ece53
declare length only if it is available
butonic Nov 9, 2023
b421107
allow overwriting existing revisions
butonic Nov 13, 2023
366dbd4
do not hide current revision
butonic Nov 13, 2023
303440a
Revert "do not hide current revision"
butonic Nov 13, 2023
8fc873a
persist revisions by current time, not oc mtime
butonic Nov 13, 2023
30ab0b6
filter multiple revisions for the same mtime in ocdav
butonic Nov 14, 2023
2204836
use metadata to store and filter current revision
butonic Nov 14, 2023
7e295be
use oc mtime as revision time if given
butonic Nov 14, 2023
1b8c07c
reuse current revision if upload has same mtime
butonic Nov 14, 2023
052b04a
use mtime as revision time if given
butonic Nov 14, 2023
7bf1c79
track uploads in dedicated metadata
butonic Nov 15, 2023
0b7abc5
convert uploads to revisions
butonic Nov 15, 2023
0df306e
restore old revision logic
butonic Nov 15, 2023
d75bf9e
fix blobid passing
butonic Nov 16, 2023
0077f19
fix HasUploadMetadata interface
butonic Nov 16, 2023
a832581
fix legacy chunking
butonic Nov 16, 2023
84b9b8f
fix chunking v1
butonic Nov 16, 2023
1ec133d
only create revision on first overwrite
butonic Nov 16, 2023
c520b49
restore writing revision before async processing
butonic Nov 16, 2023
ba50382
fix oldchunking revisions
butonic Nov 17, 2023
90092e4
refactor revisionkey split and join
butonic Nov 17, 2023
01b80cd
no need to store the current revision, use mtime
butonic Nov 17, 2023
508780c
fix mtime handling
butonic Nov 17, 2023
53c509b
try to fix versions
butonic Nov 17, 2023
b91e7b3
Merge branch 'edge' into use-tus-data-storage
butonic Nov 20, 2023
a4b88d2
make test helper always send size
butonic Nov 20, 2023
b4cbd8e
fix nc upload mock
butonic Nov 20, 2023
ea4d951
more test fixes
butonic Nov 20, 2023
d790238
revert dying if dataprovider does not support tus
butonic Nov 22, 2023
26524bf
tus datatx logging cleanup
butonic Nov 22, 2023
88d4cb9
remove unused enable_home config for decomposedfs
butonic Nov 22, 2023
aae59e3
cleanup
butonic Nov 22, 2023
ad21320
fix linting
butonic Nov 22, 2023
39c6bb4
Merge branch 'edge' into use-tus-data-storage
butonic Nov 22, 2023
12b9979
always send upload length
butonic Nov 22, 2023
7bfc0e7
if index does not exist return empty list of matches
butonic Nov 22, 2023
22f68c1
do not require sending upload length
butonic Nov 23, 2023
25ad42b
Merge branch 'edge' into use-tus-data-storage
butonic Nov 24, 2023
b9e0f3e
also send content length to dataprovider
butonic Nov 24, 2023
f3b262b
fmt.Println debugging
butonic Nov 24, 2023
98250c7
Add logging around file locks
aduffeck Nov 27, 2023
e8155c8
More logging
aduffeck Nov 27, 2023
3538d62
Test if the hang is related to closing the response body
aduffeck Nov 29, 2023
cd73b5c
More logging
aduffeck Nov 29, 2023
767df60
Merge branch 'edge' into use-tus-data-storage
butonic Dec 4, 2023
5e8383c
Merge branch 'edge' into use-tus-data-storage
butonic Dec 4, 2023
89f9606
remove debug logging
butonic Dec 4, 2023
a5edbe5
more cleanup
butonic Dec 4, 2023
d99a424
Apply suggestions from code review
butonic Dec 5, 2023
2d95dcc
use log from context
butonic Dec 5, 2023
87916ac
simplify if conditions
butonic Dec 5, 2023
5053013
fix typo
butonic Dec 5, 2023
1a35958
fix listing and purging upload sessions
butonic Dec 5, 2023
ea559e1
drop comment
butonic Dec 5, 2023
829df6d
fix upload progress creation and read offset from tus upload
butonic Dec 5, 2023
747f8f7
read uploads concurrently
butonic Dec 5, 2023
b643432
fix loading upload sessions
butonic Dec 5, 2023
5c9beca
Purge both the metadata and the upload info when finalizing
aduffeck Dec 5, 2023
9c2ed70
Use Size when reporting the size
aduffeck Dec 5, 2023
9167e4b
Implement tus datastores which support cleaning up metadata only
aduffeck Dec 6, 2023
086e321
Directly write blob to the right location in s3
aduffeck Dec 6, 2023
c11ca05
Clarify naming
aduffeck Dec 7, 2023
be393b3
WIP: Get rid of separate tus fileinfo metadata
aduffeck Dec 7, 2023
f8264b5
make filestore upload session compatible
butonic Dec 7, 2023
7d3ae90
Do not read the session we just created just to get the ID
aduffeck Dec 7, 2023
d41ec1e
Simplify finishing the upload
aduffeck Dec 7, 2023
e3ba91d
Retrieve the upload offset from the session instead of reading the parts
aduffeck Dec 7, 2023
5 changes: 5 additions & 0 deletions changelog/unreleased/tusd-data-storage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Use Tusd data storage implementations

Decomposedfs now uses the data store implementation for uploads that comes with tusd instead of implementing the interface itself. This allows storing uploads directly in s3. When all bytes are transferred, tusd will call `PreFinishResponseCallback` if the storage driver implements it.

https://github.com/cs3org/reva/pull/4148
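The split the changelog describes can be sketched as follows: the driver hands out an upstream tusd store and hooks upload completion, instead of implementing the upload interface itself. The `hasTusDatastore` interface mirrors the one added in `pkg/rhttp/datatx/manager/tus/tus.go` below; `DataStore` and `HookEvent` here are minimal stand-ins for the real `tusd` types, so treat this as an illustrative sketch, not the actual reva wiring.

```go
package main

import "fmt"

// DataStore is a minimal stand-in for tusd.DataStore.
type DataStore interface {
	NewUpload(id string) error
}

// HookEvent is a minimal stand-in for tusd.HookEvent.
type HookEvent struct{ ID string }

// hasTusDatastore mirrors the interface the tus datatx manager checks for.
type hasTusDatastore interface {
	GetDataStore() DataStore
	PreFinishResponseCallback(hook HookEvent) error
}

// fileStore is a toy store standing in for tusd's filestore or s3store.
type fileStore struct{ root string }

func (s fileStore) NewUpload(id string) error { return nil }

// driver composes the upstream store instead of implementing uploads itself.
type driver struct{ store DataStore }

func (d driver) GetDataStore() DataStore { return d.store }

func (d driver) PreFinishResponseCallback(hook HookEvent) error {
	// all bytes are transferred; e.g. turn the finished upload into a revision
	fmt.Println("finalizing upload", hook.ID)
	return nil
}

func main() {
	var fs interface{} = driver{store: fileStore{root: "/tmp"}}
	if cb, ok := fs.(hasTusDatastore); ok {
		_ = cb.PreFinishResponseCallback(HookEvent{ID: "1293660"})
		fmt.Println("driver exposes a tus datastore:", cb.GetDataStore() != nil)
	}
}
```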
9 changes: 9 additions & 0 deletions internal/grpc/services/storageprovider/storageprovider.go
Expand Up @@ -338,6 +338,15 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
}, nil
}

// FIXME: This is a hack to transport more metadata to the storage.FS InitiateUpload implementation
Contributor (review comment):
I know, we already talked about it, but just for the record 😛 I would rather track the FIXME issues and thought-comments on github or drop them entirely. Either they are important and should be tracked and prioritized properly or they aren't, in which case the FIXMEs in the code are just ballast we'll carry on indefinitely.

// we should use a request object that can carry
// * if-match
// * if-unmodified-since
// * uploadLength from the tus Upload-Length header
// * checksum from the tus Upload-Checksum header
// * mtime from the X-OC-Mtime header
// * expires from the s.conf.UploadExpiration ... should that not be part of the driver?
// * providerID
metadata := map[string]string{}
ifMatch := req.GetIfMatch()
if ifMatch != "" {
Expand Down
6 changes: 6 additions & 0 deletions internal/http/services/dataprovider/dataprovider.go
Expand Up @@ -143,7 +143,13 @@ func getDataTXs(c *config, fs storage.FS, publisher events.Publisher) (map[strin
if tx, err := f(c.DataTXs[t], publisher); err == nil {
if handler, err := tx.Handler(fs); err == nil {
txs[t] = handler
// FIXME we at least need to log this. the ocm received storage e.g. does not support tus
// } else {
// return nil, err
}
// FIXME we at least need to log this. the ocm received storage e.g. does not support tus
// } else {
// return nil, err
}
}
}
Expand Down
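The commented-out FIXME in `getDataTXs` says an unsupported tus driver (e.g. the ocm received storage) should at least be logged instead of being silently skipped. A minimal sketch of that behavior, assuming a stand-in `handlerFor` factory in place of `tx.Handler(fs)` and a plain stdlib logger in place of reva's:

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"os"
)

// errNotSupported stands in for reva's errtypes.NotSupported.
var errNotSupported = errors.New("storage driver does not support the tus protocol")

// handlerFor stands in for tx.Handler(fs); the ocm received storage,
// for example, would return a not-supported error here.
func handlerFor(driver string) (string, error) {
	if driver == "ocmreceived" {
		return "", errNotSupported
	}
	return "handler:" + driver, nil
}

func main() {
	logger := log.New(os.Stdout, "", 0)
	txs := map[string]string{}
	for _, driver := range []string{"decomposedfs", "ocmreceived"} {
		h, err := handlerFor(driver)
		if err != nil {
			// log and skip instead of failing the whole dataprovider,
			// as the FIXME in dataprovider.go suggests
			logger.Printf("skipping data tx for %s: %v", driver, err)
			continue
		}
		txs[driver] = h
	}
	fmt.Println("registered handlers:", len(txs))
}
```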
2 changes: 2 additions & 0 deletions internal/http/services/owncloud/ocdav/put.go
Expand Up @@ -302,11 +302,13 @@ func (s *svc) handlePut(ctx context.Context, w http.ResponseWriter, r *http.Requ

httpReq, err := rhttp.NewRequest(ctx, http.MethodPut, ep, r.Body)
if err != nil {
log.Error().Err(err).Msg("error creating new request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
Propagator.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))
httpReq.Header.Set(datagateway.TokenTransportHeader, token)
httpReq.ContentLength = r.ContentLength

httpRes, err := s.client.Do(httpReq)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/http/services/owncloud/ocdav/tus.go
Expand Up @@ -88,6 +88,10 @@ func (s *svc) handleSpacesTusPost(w http.ResponseWriter, r *http.Request, spaceI

sublog := appctx.GetLogger(ctx).With().Str("spaceid", spaceID).Str("path", r.URL.Path).Logger()

// use filename to build a storage space reference
// but what if the upload happens directly to the resourceid .. and filename is empty?
// currently there is always a validator that requires the filename is not empty ...
// hm -> bug: clients currently cannot POST to an existing resource with a resource id only
ref, err := spacelookup.MakeStorageSpaceReference(spaceID, path.Join(r.URL.Path, meta["filename"]))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
Expand Down
1 change: 1 addition & 0 deletions internal/http/services/owncloud/ocdav/versions.go
Expand Up @@ -180,6 +180,7 @@ func (h *VersionsHandler) doListVersions(w http.ResponseWriter, r *http.Request,
Type: provider.ResourceType_RESOURCE_TYPE_FILE,
Id: &provider.ResourceId{
StorageId: "versions",
SpaceId: info.Id.SpaceId,
OpaqueId: info.Id.OpaqueId + "@" + versions[i].GetKey(),
},
// Checksum
Expand Down
138 changes: 106 additions & 32 deletions pkg/rhttp/datatx/manager/tus/tus.go
Expand Up @@ -16,6 +16,9 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

// Package tus implements a data tx manager that handles uploads using the TUS protocol.
// reva storage drivers should implement the hasTusDatastore interface by using composition
// of an upstream tusd.DataStore. If necessary they can also implement a tusd.DataStore directly.
package tus

import (
Expand All @@ -33,12 +36,12 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/registry"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/mitchellh/mapstructure"
)
Expand Down Expand Up @@ -76,46 +79,80 @@ func New(m map[string]interface{}, publisher events.Publisher) (datatx.DataTX, e
}

func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
composable, ok := fs.(composable)
if !ok {
return nil, errtypes.NotSupported("file system does not support the tus protocol")
zlog, err := logger.FromConfig(&logger.LogConf{
Output: "stderr",
Mode: "json",
Level: "error", // FIXME introduce shared config for logging
})
if err != nil {
return nil, errtypes.NotSupported("could not initialize log")
}

// A storage backend for tusd may consist of multiple different parts which
// handle upload creation, locking, termination and so on. The composer is a
// place where all those separated pieces are joined together. In this example
// we only use the file store but you may plug in multiple.
composer := tusd.NewStoreComposer()

// let the composable storage tell tus which extensions it supports
composable.UseIn(composer)

config := tusd.Config{
StoreComposer: composer,
StoreComposer: composer,
PreUploadCreateCallback: func(hook tusd.HookEvent) error {
return errors.New("uploads must be created with a cs3 InitiateUpload call")
},
NotifyCompleteUploads: true,
Logger: log.New(appctx.GetLogger(context.Background()), "", 0),
Logger: log.New(zlog, "", 0),
}

var dataStore tusd.DataStore

cb, ok := fs.(hasTusDatastore)
if ok {
dataStore = cb.GetDataStore()
composable, ok := dataStore.(composable)
if !ok {
return nil, errtypes.NotSupported("tus datastore is not composable")
}
composable.UseIn(composer)
config.PreFinishResponseCallback = cb.PreFinishResponseCallback
} else {
composable, ok := fs.(composable)
if !ok {
return nil, errtypes.NotSupported("storage driver does not support the tus protocol")
}

// let the composable storage tell tus which extensions it supports
composable.UseIn(composer)
dataStore, ok = fs.(tusd.DataStore)
if !ok {
return nil, errtypes.NotSupported("storage driver does not support the tus datastore")
}
}

handler, err := tusd.NewUnroutedHandler(config)
if err != nil {
return nil, err
}

if _, ok := fs.(storage.UploadSessionLister); ok {
usl, ok := fs.(storage.UploadSessionLister)
if ok {
// We can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info
go func() {
for {
ev := <-handler.CompleteUploads
// We should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files
// so we create a Progress instance here that is used to read the correct properties
up := upload.Progress{
Info: ev.Upload,
sessions, err := usl.ListUploadSessions(context.Background(), storage.UploadSessionFilter{ID: &ev.Upload.ID})
if err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Str("id", ev.Upload.ID).Msg("failed to list upload session for upload")
continue
}
if len(sessions) != 1 {
appctx.GetLogger(context.Background()).Error().Err(err).Str("id", ev.Upload.ID).Msg("no upload session found")
continue
}
executant := up.Executant()
ref := up.Reference()
us := sessions[0]

executant := us.Executant()
ref := us.Reference()
datatx.InvalidateCache(&executant, &ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(up.SpaceOwner(), &executant, &ref, m.publisher); err != nil {
if err := datatx.EmitFileUploadedEvent(us.SpaceOwner(), &executant, &ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
}
}
Expand All @@ -137,7 +174,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
metrics.UploadsActive.Sub(1)
}()
// set etag, mtime and file id
setHeaders(fs, w, r)
setHeaders(dataStore, usl, w, r)
handler.PostFile(w, r)
case "HEAD":
handler.HeadFile(w, r)
Expand All @@ -147,7 +184,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
metrics.UploadsActive.Sub(1)
}()
// set etag, mtime and file id
setHeaders(fs, w, r)
setHeaders(dataStore, usl, w, r)
handler.PatchFile(w, r)
case "DELETE":
handler.DelFile(w, r)
Expand All @@ -174,14 +211,14 @@ type composable interface {
UseIn(composer *tusd.StoreComposer)
}

func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) {
type hasTusDatastore interface {
PreFinishResponseCallback(hook tusd.HookEvent) error
GetDataStore() tusd.DataStore
}

func setHeaders(datastore tusd.DataStore, usl storage.UploadSessionLister, w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id := path.Base(r.URL.Path)
datastore, ok := fs.(tusd.DataStore)
if !ok {
appctx.GetLogger(ctx).Error().Interface("fs", fs).Msg("storage is not a tus datastore")
return
}
upload, err := datastore.GetUpload(ctx, id)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload from storage")
Expand All @@ -192,14 +229,51 @@ func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) {
appctx.GetLogger(ctx).Error().Err(err).Msg("could not get upload info for upload")
return
}
expires := info.MetaData["expires"]
var expires string

var resourceid provider.ResourceId
var uploadSession storage.UploadSession
if usl != nil {
sessions, err := usl.ListUploadSessions(ctx, storage.UploadSessionFilter{ID: &id})
if err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Str("id", id).Msg("failed to list upload session for upload")
return
}
if len(sessions) != 1 {
appctx.GetLogger(context.Background()).Error().Err(err).Str("id", id).Msg("no upload session found")
return
}
uploadSession = sessions[0]

t := time.Time{}
if uploadSession.Expires() != t {
expires = uploadSession.Expires().Format(net.RFC1123)
}

reference := uploadSession.Reference()
resourceid = *reference.GetResourceId()
}

// FIXME expires should be part of the tus handler
// fallback for outdated storageproviders that implement a tus datastore
if expires == "" {
expires = info.MetaData["expires"]
}

if expires != "" {
w.Header().Set(net.HeaderTusUploadExpires, expires)
}
resourceid := provider.ResourceId{
StorageId: info.MetaData["providerID"],
SpaceId: info.Storage["SpaceRoot"],
OpaqueId: info.Storage["NodeId"],

// fallback for outdated storageproviders that implement a tus datastore
if resourceid.GetStorageId() == "" {
resourceid.StorageId = info.MetaData["providerID"]
}
if resourceid.GetSpaceId() == "" {
resourceid.SpaceId = info.MetaData["SpaceRoot"]
}
if resourceid.GetOpaqueId() == "" {
resourceid.OpaqueId = info.MetaData["NodeId"]
}

w.Header().Set(net.HeaderOCFileID, storagespace.FormatResourceID(resourceid))
}
3 changes: 2 additions & 1 deletion pkg/storage/fs/nextcloud/nextcloud_server_mock.go
Expand Up @@ -113,7 +113,8 @@ var responses = map[string]Response{

`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/GetMD {"ref":{"path":"/file"},"mdKeys":null}`: {404, ``, serverStateEmpty},
`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"path":"/file"},"uploadLength":0,"metadata":{"providerID":""}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty},
`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"resource_id":{"storage_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"path":"/versionedFile"},"uploadLength":0,"metadata":{}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty},
`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"resource_id":{"storage_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"path":"/versionedFile"},"uploadLength":1,"metadata":{}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty},
`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"resource_id":{"storage_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"path":"/versionedFile"},"uploadLength":2,"metadata":{}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty},

`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/GetMD {"ref":{"path":"/yes"},"mdKeys":[]}`: {200, `{"opaque":{},"type":1,"id":{"opaque_id":"fileid-/yes"},"checksum":{},"etag":"deadbeef","mime_type":"text/plain","mtime":{"seconds":1234567890},"path":"/yes","permission_set":{},"size":1,"canonical_metadata":{},"arbitrary_metadata":{}}`, serverStateEmpty},

Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/fs/ocis/ocis.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/tus"
)

func init() {
Expand All @@ -46,5 +47,7 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
return nil, err
}

return decomposedfs.NewDefault(m, bs, stream)
tusDataStore := tus.NewFileStore(o.Root)

return decomposedfs.NewDefault(m, bs, tusDataStore, stream)
}
4 changes: 4 additions & 0 deletions pkg/storage/fs/s3ng/blobstore/blobstore.go
Expand Up @@ -63,6 +63,10 @@ func New(endpoint, region, bucket, accessKey, secretKey string) (*Blobstore, err
}, nil
}

func (bs *Blobstore) MoveBlob(node *node.Node, source, bucket, key string) error {
return nil
}

// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, source string) error {
reader, err := os.Open(source)
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/fs/s3ng/option.go
Expand Up @@ -43,6 +43,24 @@ type Options struct {

// Secret key for the s3 blobstore
S3SecretKey string `mapstructure:"s3.secret_key"`

// UploadObjectPrefix for the s3 blobstore
S3UploadObjectPrefix string `mapstructure:"s3.upload_object_prefix"`

// UploadMetadataPrefix for the s3 blobstore
S3UploadMetadataPrefix string `mapstructure:"s3.upload_metadata_prefix"`

// UploadTemporaryDirectory for the s3 blobstore
S3UploadTemporaryDirectory string `mapstructure:"s3.upload_temporary_directory"`

// DisableSSL for the s3 blobstore
S3DisableSSL bool `mapstructure:"s3.disable_ssl"`

// ForcePathStyle for the s3 blobstore
S3ForcePathStyle bool `mapstructure:"s3.force_path_style"`

// Root for the upload sessions
Root string `mapstructure:"root"`
}

// S3ConfigComplete return true if all required s3 fields are set
Expand Down
19 changes: 18 additions & 1 deletion pkg/storage/fs/s3ng/s3ng.go
Expand Up @@ -21,11 +21,16 @@ package s3ng
import (
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs"
"github.com/cs3org/reva/v2/pkg/storage/utils/tus"
)

func init() {
Expand All @@ -49,5 +54,17 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
return nil, err
}

return decomposedfs.NewDefault(m, bs, stream)
s3Config := aws.NewConfig()
s3Config.WithCredentials(credentials.NewStaticCredentials(o.S3AccessKey, o.S3SecretKey, "")).
WithEndpoint(o.S3Endpoint).
WithRegion(o.S3Region).
WithS3ForcePathStyle(o.S3ForcePathStyle).
WithDisableSSL(o.S3DisableSSL)

tusDataStore := tus.NewS3Store(o.Root, o.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config))
tusDataStore.ObjectPrefix = o.S3UploadObjectPrefix
tusDataStore.MetadataObjectPrefix = o.S3UploadMetadataPrefix
tusDataStore.TemporaryDirectory = o.S3UploadTemporaryDirectory

return decomposedfs.NewDefault(m, bs, tusDataStore, stream)
}