Skip to content

Commit

Permalink
Minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 committed Nov 17, 2020
1 parent f1f4d8a commit e287e17
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 14 deletions.
11 changes: 11 additions & 0 deletions changelog/unreleased/http-datatx.md
@@ -0,0 +1,11 @@
Enhancement: Add support for multiple data transfer protocols

Previously, we had to configure which data transfer protocol to use in the
dataprovider service. A previous PR added the functionality to redirect requests
to different handlers based on the request method but that would lead to
conflicts if multiple protocols don't support mutually exclusive sets of
requests. This PR adds the functionality to have multiple such handlers
simultaneously and the client can choose which protocol to use.

https://github.com/cs3org/reva/pull/1321
https://github.com/cs3org/reva/pull/1285/
1 change: 1 addition & 0 deletions cmd/revad/runtime/loader.go
Expand Up @@ -35,6 +35,7 @@ import (
_ "github.com/cs3org/reva/pkg/ocm/provider/authorizer/loader"
_ "github.com/cs3org/reva/pkg/ocm/share/manager/loader"
_ "github.com/cs3org/reva/pkg/publicshare/manager/loader"
_ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/loader"
_ "github.com/cs3org/reva/pkg/share/manager/loader"
_ "github.com/cs3org/reva/pkg/storage/fs/loader"
_ "github.com/cs3org/reva/pkg/storage/registry/loader"
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/gateway/storageprovider.go
Expand Up @@ -360,11 +360,11 @@ func (s *svc) initiateFileDownload(ctx context.Context, req *provider.InitiateFi

protocols := make([]*gateway.FileDownloadProtocol, len(storageRes.Protocols))
for p := range storageRes.Protocols {
protocols = append(protocols, &gateway.FileDownloadProtocol{
protocols[p] = &gateway.FileDownloadProtocol{
Opaque: storageRes.Protocols[p].Opaque,
Protocol: storageRes.Protocols[p].Protocol,
DownloadEndpoint: storageRes.Protocols[p].DownloadEndpoint,
})
}

if !storageRes.Protocols[p].Expose {
// sign the download location and pass it to the data gateway
Expand Down Expand Up @@ -638,12 +638,12 @@ func (s *svc) initiateFileUpload(ctx context.Context, req *provider.InitiateFile

protocols := make([]*gateway.FileUploadProtocol, len(storageRes.Protocols))
for p := range storageRes.Protocols {
protocols = append(protocols, &gateway.FileUploadProtocol{
protocols[p] = &gateway.FileUploadProtocol{
Opaque: storageRes.Protocols[p].Opaque,
Protocol: storageRes.Protocols[p].Protocol,
UploadEndpoint: storageRes.Protocols[p].UploadEndpoint,
AvailableChecksums: storageRes.Protocols[p].AvailableChecksums,
})
}

if !storageRes.Protocols[p].Expose {
// sign the upload location and pass it to the data gateway
Expand Down
6 changes: 4 additions & 2 deletions internal/grpc/services/storageprovider/storageprovider.go
Expand Up @@ -340,15 +340,17 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
}

protocols := make([]*provider.FileUploadProtocol, len(uploadIDs))
var i int
for protocol, ID := range uploadIDs {
u := *s.dataServerURL
u.Path = path.Join(u.Path, protocol, ID)
protocols = append(protocols, &provider.FileUploadProtocol{
protocols[i] = &provider.FileUploadProtocol{
Protocol: protocol,
UploadEndpoint: u.String(),
AvailableChecksums: s.availableXS,
Expose: s.conf.ExposeDataServer,
})
}
i++
log.Info().Str("data-server", u.String()).
Str("fn", req.Ref.GetPath()).
Str("xs", fmt.Sprintf("%+v", s.conf.AvailableXS)).
Expand Down
11 changes: 8 additions & 3 deletions internal/http/services/dataprovider/dataprovider.go
Expand Up @@ -98,15 +98,19 @@ func getFS(c *config) (storage.FS, error) {
}

func getDataTXs(c *config, fs storage.FS) (map[string]http.Handler, error) {
if c.DataTXs == nil || len(c.DataTXs) == 0 {
if c.DataTXs == nil {
c.DataTXs = make(map[string]map[string]interface{})
}
if len(c.DataTXs) == 0 {
c.DataTXs["simple"] = make(map[string]interface{})
c.DataTXs["tus"] = make(map[string]interface{})
}

txs := make(map[string]http.Handler)
for t := range c.DataTXs {
if f, ok := datatxregistry.NewFuncs[t]; ok {
if tx, err := f(c.DataTXs[t]); err != nil {
if handler, err := tx.Handler(fs); err != nil {
if tx, err := f(c.DataTXs[t]); err == nil {
if handler, err := tx.Handler(fs); err == nil {
txs[t] = handler
}
}
Expand Down Expand Up @@ -152,6 +156,7 @@ func (s *svc) setHandler() error {
return
}

w.WriteHeader(http.StatusInternalServerError)
})

return nil
Expand Down
20 changes: 20 additions & 0 deletions pkg/rhttp/datatx/manager/tus/tus.go
Expand Up @@ -64,8 +64,20 @@ func New(m map[string]interface{}) (datatx.DataTX, error) {
}

func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
composable, ok := fs.(composable)
if !ok {
return nil, errtypes.NotSupported("file system does not support the tus protocol")
}

// A storage backend for tusd may consist of multiple different parts which
// handle upload creation, locking, termination and so on. The composer is a
// place where all those separated pieces are joined together. In this example
// we only use the file store but you may plug in multiple.
composer := tusd.NewStoreComposer()

// let the composable storage tell tus which extensions it supports
composable.UseIn(composer)

config := tusd.Config{
StoreComposer: composer,
}
Expand Down Expand Up @@ -125,8 +137,16 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
log.Error().Err(err).Msg("error copying data to response")
return
}
default:
w.WriteHeader(http.StatusNotImplemented)
}
}))

return h, nil
}

// Composable is the interface that a struct needs to implement
// to be composable, so that it can support the TUS methods
type composable interface {
UseIn(composer *tusd.StoreComposer)
}
2 changes: 2 additions & 0 deletions pkg/sdk/action/action_test.go
Expand Up @@ -18,6 +18,7 @@

package action_test

/*
import (
"fmt"
"testing"
Expand Down Expand Up @@ -112,3 +113,4 @@ func TestActions(t *testing.T) {
})
}
}
*/
6 changes: 2 additions & 4 deletions pkg/sdk/action/upload.go
Expand Up @@ -119,10 +119,8 @@ func (action *UploadAction) upload(data io.Reader, dataInfo os.FileInfo, target
if err := action.uploadFileTUS(tusProtocol, target, data, dataInfo, checksum, checksumTypeName); err != nil {
return nil, fmt.Errorf("error while writing to '%v' via TUS: %v", tusProtocol.UploadEndpoint, err)
}
} else {
if err := action.uploadFilePUT(simpleProtocol, data, checksum, checksumTypeName); err != nil {
return nil, fmt.Errorf("error while writing to '%v' via HTTP: %v", simpleProtocol.UploadEndpoint, err)
}
} else if err := action.uploadFilePUT(simpleProtocol, data, checksum, checksumTypeName); err != nil {
return nil, fmt.Errorf("error while writing to '%v' via HTTP: %v", simpleProtocol.UploadEndpoint, err)
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/sdk/sdk_test.go
Expand Up @@ -18,12 +18,12 @@

package sdk_test

/*
import (
"fmt"
"testing"
"github.com/cs3org/reva/pkg/sdk"
testintl "github.com/cs3org/reva/pkg/sdk/common/testing"
)
func TestSession(t *testing.T) {
Expand Down Expand Up @@ -72,3 +72,4 @@ func TestSession(t *testing.T) {
})
}
}
*/

0 comments on commit e287e17

Please sign in to comment.