Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement OCM TUS uploads #4410

Merged
merged 5 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/unreleased/ocm.md
Expand Up @@ -2,4 +2,5 @@ Bugfix: Improve OCM support

We fixed several bugs with OCM support.

https://github.com/cs3org/reva/pull/4410
https://github.com/cs3org/reva/pull/4333
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -231,4 +231,4 @@ require (

replace github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20231207143248-4d424e3ae348

replace github.com/studio-b12/gowebdav => github.com/aduffeck/gowebdav v0.0.0-20231123085457-ff658b6ea159
replace github.com/studio-b12/gowebdav => github.com/aduffeck/gowebdav v0.0.0-20231215102054-212d4a4374f6
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -425,8 +425,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/acomagu/bufpipe v1.0.3 h1:fxAGrHZTgQ9w5QqVItgzwj235/uYZYgbXitB+dLupOk=
github.com/acomagu/bufpipe v1.0.3/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4=
github.com/aduffeck/gowebdav v0.0.0-20231123085457-ff658b6ea159 h1:m63hhLqbqmLGGPtyTtjTdxae61d9tMbRdKvMaDHWcDs=
github.com/aduffeck/gowebdav v0.0.0-20231123085457-ff658b6ea159/go.mod h1:bHA7t77X/QFExdeAnDzK6vKM34kEZAcE1OX4MfiwjkE=
github.com/aduffeck/gowebdav v0.0.0-20231215102054-212d4a4374f6 h1:ws0yvsikTQdmheKINP16tBzAHdttrHwbz/q3Fgl9X1Y=
github.com/aduffeck/gowebdav v0.0.0-20231215102054-212d4a4374f6/go.mod h1:bHA7t77X/QFExdeAnDzK6vKM34kEZAcE1OX4MfiwjkE=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down
10 changes: 9 additions & 1 deletion internal/grpc/services/ocmshareprovider/ocmshareprovider.go
Expand Up @@ -515,6 +515,14 @@ func (s *service) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateRec

func (s *service) GetReceivedOCMShare(ctx context.Context, req *ocm.GetReceivedOCMShareRequest) (*ocm.GetReceivedOCMShareResponse, error) {
user := ctxpkg.ContextMustGetUser(ctx)
if user.Id.GetType() == userpb.UserType_USER_TYPE_SERVICE {
var uid userpb.UserId
_ = utils.ReadJSONFromOpaque(req.Opaque, "userid", &uid)
user = &userpb.User{
Id: &uid,
}
}

ocmshare, err := s.repo.GetReceivedShare(ctx, user, req.Ref)
if err != nil {
if errors.Is(err, share.ErrShareNotFound) {
Expand All @@ -523,7 +531,7 @@ func (s *service) GetReceivedOCMShare(ctx context.Context, req *ocm.GetReceivedO
}, nil
}
return &ocm.GetReceivedOCMShareResponse{
Status: status.NewInternal(ctx, "error getting received share"),
Status: status.NewInternal(ctx, "error getting received share: "+err.Error()),
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions internal/http/services/owncloud/ocdav/put.go
Expand Up @@ -137,6 +137,7 @@ func (s *svc) handlePut(ctx context.Context, w http.ResponseWriter, r *http.Requ

length, err := getContentLength(w, r)
if err != nil {
log.Error().Err(err).Msg("error getting the content length")
w.WriteHeader(http.StatusBadRequest)
return
}
Expand Down
73 changes: 30 additions & 43 deletions pkg/ocm/storage/received/ocm.go
Expand Up @@ -30,6 +30,7 @@ import (
"strings"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
ocmpb "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
Expand Down Expand Up @@ -59,8 +60,11 @@ type driver struct {
}

type config struct {
GatewaySVC string `mapstructure:"gatewaysvc"`
Insecure bool `mapstructure:"insecure"`
GatewaySVC string `mapstructure:"gatewaysvc"`
Insecure bool `mapstructure:"insecure"`
StorageRoot string `mapstructure:"storage_root"`
ServiceAccountID string `mapstructure:"service_account_id"`
ServiceAccountSecret string `mapstructure:"service_account_secret"`
}

func (c *config) ApplyDefaults() {
Expand Down Expand Up @@ -136,15 +140,19 @@ func shareInfoFromReference(ref *provider.Reference) (*ocmpb.ShareId, string) {

}

func (d *driver) getWebDAVFromShare(ctx context.Context, shareID *ocmpb.ShareId) (*ocmpb.ReceivedShare, string, string, error) {
func (d *driver) getWebDAVFromShare(ctx context.Context, forUser *userpb.UserId, shareID *ocmpb.ShareId) (*ocmpb.ReceivedShare, string, string, error) {
// TODO: we may want to cache the share
res, err := d.gateway.GetReceivedOCMShare(ctx, &ocmpb.GetReceivedOCMShareRequest{
req := &ocmpb.GetReceivedOCMShareRequest{
Ref: &ocmpb.ShareReference{
Spec: &ocmpb.ShareReference_Id{
Id: shareID,
},
},
})
}
if forUser != nil {
req.Opaque = utils.AppendJSONToOpaque(nil, "userid", forUser)
}
res, err := d.gateway.GetReceivedOCMShare(ctx, req)
if err != nil {
return nil, "", "", err
}
Expand Down Expand Up @@ -173,10 +181,10 @@ func getWebDAVProtocol(protocols []*ocmpb.Protocol) (*ocmpb.WebDAVProtocol, bool
return nil, false
}

func (d *driver) webdavClient(ctx context.Context, ref *provider.Reference) (*gowebdav.Client, *ocmpb.ReceivedShare, string, error) {
func (d *driver) webdavClient(ctx context.Context, forUser *userpb.UserId, ref *provider.Reference) (*gowebdav.Client, *ocmpb.ReceivedShare, string, error) {
id, rel := shareInfoFromReference(ref)

share, endpoint, secret, err := d.getWebDAVFromShare(ctx, id)
share, endpoint, secret, err := d.getWebDAVFromShare(ctx, forUser, id)
if err != nil {
return nil, nil, "", err
}
Expand All @@ -199,31 +207,31 @@ func (d *driver) webdavClient(ctx context.Context, ref *provider.Reference) (*go
}

func (d *driver) CreateDir(ctx context.Context, ref *provider.Reference) error {
client, _, rel, err := d.webdavClient(ctx, ref)
client, _, rel, err := d.webdavClient(ctx, nil, ref)
if err != nil {
return err
}
return client.MkdirAll(rel, 0)
}

func (d *driver) Delete(ctx context.Context, ref *provider.Reference) error {
client, _, rel, err := d.webdavClient(ctx, ref)
client, _, rel, err := d.webdavClient(ctx, nil, ref)
if err != nil {
return err
}
return client.RemoveAll(rel)
}

func (d *driver) TouchFile(ctx context.Context, ref *provider.Reference, markprocessing bool, mtime string) error {
client, _, rel, err := d.webdavClient(ctx, ref)
client, _, rel, err := d.webdavClient(ctx, nil, ref)
if err != nil {
return err
}
return client.Write(rel, []byte{}, 0)
}

func (d *driver) Move(ctx context.Context, oldRef, newRef *provider.Reference) error {
client, _, relOld, err := d.webdavClient(ctx, oldRef)
client, _, relOld, err := d.webdavClient(ctx, nil, oldRef)
if err != nil {
return err
}
Expand Down Expand Up @@ -263,7 +271,7 @@ func convertStatToResourceInfo(ref *provider.Reference, f fs.FileInfo, share *oc
}
webdavProtocol, _ := getWebDAVProtocol(share.Protocols)

return &provider.ResourceInfo{
ri := provider.ResourceInfo{
Type: t,
Id: id,
MimeType: mime.Detect(f.IsDir(), f.Name()),
Expand All @@ -278,11 +286,17 @@ func convertStatToResourceInfo(ref *provider.Reference, f fs.FileInfo, share *oc
Checksum: &provider.ResourceChecksum{
Type: provider.ResourceChecksumType_RESOURCE_CHECKSUM_TYPE_INVALID,
},
}, nil
}

if f.(gowebdav.File).StatusCode() == 425 {
ri.Opaque = utils.AppendPlainToOpaque(ri.Opaque, "status", "processing")
}

return &ri, nil
}

func (d *driver) GetMD(ctx context.Context, ref *provider.Reference, _ []string, _ []string) (*provider.ResourceInfo, error) {
client, share, rel, err := d.webdavClient(ctx, ref)
client, share, rel, err := d.webdavClient(ctx, nil, ref)
if err != nil {
return nil, err
}
Expand All @@ -299,7 +313,7 @@ func (d *driver) GetMD(ctx context.Context, ref *provider.Reference, _ []string,
}

func (d *driver) ListFolder(ctx context.Context, ref *provider.Reference, _ []string, _ []string) ([]*provider.ResourceInfo, error) {
client, share, rel, err := d.webdavClient(ctx, ref)
client, share, rel, err := d.webdavClient(ctx, nil, ref)
if err != nil {
return nil, err
}
Expand All @@ -320,35 +334,8 @@ func (d *driver) ListFolder(ctx context.Context, ref *provider.Reference, _ []st
return res, nil
}

func (d *driver) InitiateUpload(ctx context.Context, ref *provider.Reference, _ int64, _ map[string]string) (map[string]string, error) {
shareID, rel := shareInfoFromReference(ref)
p := getPathFromShareIDAndRelPath(shareID, rel)

return map[string]string{
"simple": p,
}, nil
}

func (d *driver) Upload(ctx context.Context, req storage.UploadRequest, _ storage.UploadFinishedFunc) (provider.ResourceInfo, error) {
client, _, rel, err := d.webdavClient(ctx, req.Ref)
if err != nil {
return provider.ResourceInfo{}, err
}
client.SetInterceptor(func(method string, rq *http.Request) {
// Set the content length on the request struct directly instead of the header.
// The content-length header gets reset by the golang http library before
// sendind out the request, resulting in chunked encoding to be used which
// breaks the quota checks in ocdav.
if method == "PUT" {
rq.ContentLength = req.Length
}
})

return provider.ResourceInfo{}, client.WriteStream(rel, req.Body, 0)
}

func (d *driver) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
client, _, rel, err := d.webdavClient(ctx, ref)
client, _, rel, err := d.webdavClient(ctx, nil, ref)
if err != nil {
return nil, err
}
Expand Down