Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the uploading files workflow from various clients #1285

Merged
merged 16 commits into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog/unreleased/uploads-refactor.md
@@ -0,0 +1,8 @@
Enhancement: Refactor the uploading files workflow from various clients

Previously, we were implementing the tus client logic in the ocdav service,
leading to restricting the whole of tus logic to the internal services. This PR
refactors that workflow to accept incoming requests following the tus protocol
while using simpler transmission internally.

https://github.com/cs3org/reva/pull/1285
82 changes: 42 additions & 40 deletions internal/grpc/services/storageprovider/storageprovider.go
Expand Up @@ -57,7 +57,6 @@ type config struct {
TmpFolder string `mapstructure:"tmp_folder" docs:"/var/tmp;Path to temporary folder."`
DataServerURL string `mapstructure:"data_server_url" docs:"http://localhost/data;The URL for the data server."`
ExposeDataServer bool `mapstructure:"expose_data_server" docs:"false;Whether to expose data server."` // if true the client will be able to upload/download directly to it
DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."`
AvailableXS map[string]uint32 `mapstructure:"available_checksums" docs:"nil;List of available checksums."`
MimeTypes map[string]string `mapstructure:"mimetypes" docs:"nil;List of supported mime types and corresponding file extensions."`
}
Expand Down Expand Up @@ -267,17 +266,17 @@ func (s *service) InitiateFileDownload(ctx context.Context, req *provider.Initia
// For example, https://data-server.example.org/home/docs/myfile.txt
// or ownclouds://data-server.example.org/home/docs/myfile.txt
log := appctx.GetLogger(ctx)
url := *s.dataServerURL
u := *s.dataServerURL
newRef, err := s.unwrap(ctx, req.Ref)
if err != nil {
return &provider.InitiateFileDownloadResponse{
Status: status.NewInternal(ctx, err, "error unwrapping path"),
}, nil
}
url.Path = path.Join("/", url.Path, newRef.GetPath())
log.Info().Str("data-server", url.String()).Str("fn", req.Ref.GetPath()).Msg("file download")
u.Path = path.Join(u.Path, newRef.GetPath())
log.Info().Str("data-server", u.String()).Str("fn", req.Ref.GetPath()).Msg("file download")
res := &provider.InitiateFileDownloadResponse{
DownloadEndpoint: url.String(),
DownloadEndpoint: u.String(),
Status: status.NewOK(ctx),
Expose: s.conf.ExposeDataServer,
}
Expand All @@ -298,50 +297,53 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
Status: status.NewInternal(ctx, errors.New("can't upload to mount path"), "can't upload to mount path"),
}, nil
}
url := *s.dataServerURL
if s.conf.DisableTus {
url.Path = path.Join("/", url.Path, newRef.GetPath())
} else {
metadata := map[string]string{}
var uploadLength int64
if req.Opaque != nil && req.Opaque.Map != nil {
if req.Opaque.Map["Upload-Length"] != nil {
var err error
uploadLength, err = strconv.ParseInt(string(req.Opaque.Map["Upload-Length"].Value), 10, 64)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error parsing upload length"),
}, nil
}
}
if req.Opaque.Map["X-OC-Mtime"] != nil {
metadata["mtime"] = string(req.Opaque.Map["X-OC-Mtime"].Value)

metadata := map[string]string{}
var uploadLength int64
if req.Opaque != nil && req.Opaque.Map != nil {
if req.Opaque.Map["Upload-Length"] != nil {
var err error
uploadLength, err = strconv.ParseInt(string(req.Opaque.Map["Upload-Length"].Value), 10, 64)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error parsing upload length"),
}, nil
}
}
uploadID, err := s.storage.InitiateUpload(ctx, newRef, uploadLength, metadata)
if err != nil {
var st *rpc.Status
switch err.(type) {
case errtypes.IsNotFound:
st = status.NewNotFound(ctx, "path not found when initiating upload")
case errtypes.PermissionDenied:
st = status.NewPermissionDenied(ctx, err, "permission denied")
default:
st = status.NewInternal(ctx, err, "error getting upload id: "+req.Ref.String())
}
return &provider.InitiateFileUploadResponse{
Status: st,
}, nil
if req.Opaque.Map["X-OC-Mtime"] != nil {
metadata["mtime"] = string(req.Opaque.Map["X-OC-Mtime"].Value)
}
}
uploadID, err := s.storage.InitiateUpload(ctx, newRef, uploadLength, metadata)
if err != nil {
var st *rpc.Status
switch err.(type) {
case errtypes.IsNotFound:
st = status.NewNotFound(ctx, "path not found when initiating upload")
case errtypes.PermissionDenied:
st = status.NewPermissionDenied(ctx, err, "permission denied")
default:
st = status.NewInternal(ctx, err, "error getting upload id: "+req.Ref.String())
}
url.Path = path.Join("/", url.Path, uploadID)
return &provider.InitiateFileUploadResponse{
Status: st,
}, nil
}

u := *s.dataServerURL
u.Path = path.Join(u.Path, uploadID)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error parsing data server URL"),
}, nil
}

log.Info().Str("data-server", url.String()).
log.Info().Str("data-server", u.String()).
Str("fn", req.Ref.GetPath()).
Str("xs", fmt.Sprintf("%+v", s.conf.AvailableXS)).
Msg("file upload")
res := &provider.InitiateFileUploadResponse{
UploadEndpoint: url.String(),
UploadEndpoint: u.String(),
Status: status.NewOK(ctx),
AvailableChecksums: s.availableXS,
Expose: s.conf.ExposeDataServer,
Expand Down
157 changes: 70 additions & 87 deletions internal/http/services/dataprovider/dataprovider.go
Expand Up @@ -36,28 +36,20 @@ func init() {
}

type config struct {
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."`
TempDirectory string `mapstructure:"temp_directory"`
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
}

func (c *config) init() {
if c.Prefix == "" {
c.Prefix = "data"
}

if c.Driver == "" {
c.Driver = "localhome"
}

if c.TempDirectory == "" {
c.TempDirectory = "/var/tmp/reva/tmp"
}

}

type svc struct {
Expand Down Expand Up @@ -100,10 +92,7 @@ func (s *svc) Unprotected() []string {

// Create a new DataStore instance which is responsible for
// storing the uploaded file on disk in the specified directory.
// This path _must_ exist before tusd will store uploads in it.
// If you want to save them on a different medium, for example
// a remote FTP server, you can implement your own storage backend
// by implementing the tusd.DataStore interface.
// This path _must_ exist before we store uploads in it.
func getFS(c *config) (storage.FS, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
Expand All @@ -119,14 +108,70 @@ func (s *svc) Handler() http.Handler {
return s.handler
}

// Composable is the interface that a struct needs to implement to be composable by this composer
type Composable interface {
func (s *svc) setHandler() error {

tusHandler := s.getTusHandler()

s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log := appctx.GetLogger(r.Context())
log.Info().Msgf("dataprovider routing: path=%s", r.URL.Path)

method := r.Method
// https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override
if r.Header.Get("X-HTTP-Method-Override") != "" {
method = r.Header.Get("X-HTTP-Method-Override")
}

switch method {
// old fashioned download.
// GET is not part of the tus.io protocol
// TODO allow range based get requests? that end before the current offset
case "GET":
s.doGet(w, r)
case "PUT":
s.doPut(w, r)
case "HEAD":
w.WriteHeader(http.StatusOK)

// tus.io based uploads
// uploads are initiated using the CS3 APIs Initiate Upload call
case "POST":
if tusHandler != nil {
tusHandler.PostFile(w, r)
} else {
w.WriteHeader(http.StatusNotImplemented)
}
case "PATCH":
if tusHandler != nil {
tusHandler.PatchFile(w, r)
} else {
w.WriteHeader(http.StatusNotImplemented)
}
// TODO Only attach the DELETE handler if the Terminate() method is provided
case "DELETE":
if tusHandler != nil {
tusHandler.DelFile(w, r)
} else {
w.WriteHeader(http.StatusNotImplemented)
}
default:
w.WriteHeader(http.StatusNotImplemented)
return
}
})

return nil
}

// Composable is the interface that a struct needs to implement
// to be composable, so that it can support the TUS methods
type composable interface {
UseIn(composer *tusd.StoreComposer)
}

func (s *svc) setHandler() (err error) {
composable, ok := s.storage.(Composable)
if ok && !s.conf.DisableTus {
func (s *svc) getTusHandler() *tusd.UnroutedHandler {
composable, ok := s.storage.(composable)
if ok {
// A storage backend for tusd may consist of multiple different parts which
// handle upload creation, locking, termination and so on. The composer is a
// place where all those separated pieces are joined together. In this example
Expand All @@ -144,71 +189,9 @@ func (s *svc) setHandler() (err error) {

handler, err := tusd.NewUnroutedHandler(config)
if err != nil {
return err
return nil
}

s.handler = handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

log := appctx.GetLogger(r.Context())
log.Info().Msgf("tusd routing: path=%s", r.URL.Path)

method := r.Method
// https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override
if r.Header.Get("X-HTTP-Method-Override") != "" {
method = r.Header.Get("X-HTTP-Method-Override")
}

switch method {
// old fashioned download.

// GET is not part of the tus.io protocol
// currently there is no way to GET an upload that is in progress
// TODO allow range based get requests? that end before the current offset
case "GET":
s.doGet(w, r)

// tus.io based upload

// uploads are initiated using the CS3 APIs Initiate Download call
case "POST":
handler.PostFile(w, r)
case "HEAD":
handler.HeadFile(w, r)
case "PATCH":
handler.PatchFile(w, r)
// PUT provides a wrapper around the POST call, to save the caller from
// the trouble of configuring the tus client.
case "PUT":
s.doTusPut(w, r)
// TODO Only attach the DELETE handler if the Terminate() method is provided
case "DELETE":
handler.DelFile(w, r)
}
}))
} else {
s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
method := r.Method
// https://github.com/tus/tus-resumable-upload-protocol/blob/master/protocol.md#x-http-method-override
if r.Header.Get("X-HTTP-Method-Override") != "" {
method = r.Header.Get("X-HTTP-Method-Override")
}

switch method {
case "HEAD":
w.WriteHeader(http.StatusOK)
return
case "GET":
s.doGet(w, r)
return
case "PUT":
s.doPut(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
}
})
return handler
}

return err
return nil
}