Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce spaces field mask #2888

Merged
merged 4 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/spaces-field-mask.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: introduce spaces field mask

We now use a field mask to select which properties to retrieve when looking up storage spaces. This allows the gateway to only ask for `root` when trying to forward id or path based requests.

https://github.com/cs3org/reva/pull/2888
12 changes: 11 additions & 1 deletion internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,16 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag

func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
// TODO update CS3 api to forward the filters to the registry so it can filter the number of providers the gateway needs to query
filters := map[string]string{}
filters := map[string]string{
// TODO add opaque / CS3 api to expand 'path,root,stat?' properties / field mask
"mask": "*", // fetch all properties when listing storage spaces
}

mask := utils.ReadPlainFromOpaque(req.Opaque, "mask")
if mask != "" {
// TODO check for allowed filters
filters["mask"] = mask
}

for _, f := range req.Filters {
switch f.Type {
Expand Down Expand Up @@ -1093,6 +1102,7 @@ func (s *svc) findSpaces(ctx context.Context, ref *provider.Reference) ([]*regis
}

filters := map[string]string{
"mask": "root", // we only need the root for routing
"path": ref.Path,
}
if ref.ResourceId != nil {
Expand Down
12 changes: 8 additions & 4 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,10 @@ func (s *service) CreateStorageSpace(ctx context.Context, req *provider.CreateSt
}, nil
}

resp.StorageSpace.Id.OpaqueId = storagespace.FormatStorageID(s.conf.MountID, resp.StorageSpace.Id.GetOpaqueId())
resp.StorageSpace.Root.StorageId = storagespace.FormatStorageID(s.conf.MountID, resp.StorageSpace.Root.GetStorageId())
if resp.StorageSpace != nil {
resp.StorageSpace.Id.OpaqueId = storagespace.FormatStorageID(s.conf.MountID, resp.StorageSpace.Id.GetOpaqueId())
resp.StorageSpace.Root.StorageId = storagespace.FormatStorageID(s.conf.MountID, resp.StorageSpace.Root.GetStorageId())
}
return resp, nil
}

Expand Down Expand Up @@ -578,8 +580,10 @@ func (s *service) UpdateStorageSpace(ctx context.Context, req *provider.UpdateSt
Msg("failed to update storage space")
return nil, err
}
res.StorageSpace.Id.OpaqueId = storagespace.FormatStorageID(s.conf.MountID, res.StorageSpace.Id.GetOpaqueId())
res.StorageSpace.Root.StorageId = storagespace.FormatStorageID(s.conf.MountID, res.StorageSpace.Root.GetStorageId())
if res.StorageSpace != nil {
res.StorageSpace.Id.OpaqueId = storagespace.FormatStorageID(s.conf.MountID, res.StorageSpace.Id.GetOpaqueId())
res.StorageSpace.Root.StorageId = storagespace.FormatStorageID(s.conf.MountID, res.StorageSpace.Root.GetStorageId())
}
return res, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ func (p *Handler) getResourceInfos(ctx context.Context, w http.ResponseWriter, r
}
spacePath := string(space.Opaque.Map["path"].Value)
// TODO separate stats to the path or to the children, after statting all children update the mtime/etag
// TODO get mtime, and size from space as well, so we no longer have to stat here?
// TODO get mtime, and size from space as well, so we no longer have to stat here? would require sending the requested metadata keys as well
// root should be a ResourceInfo so it can contain the full stat, not only the id ... do we even need spaces then?
// metadata keys could all be prefixed with "root." to indicate we want more than the root id ...
spaceRef := spacelookup.MakeRelativeReference(space, requestPath, spacesPropfind)
info, status, err := p.statSpace(ctx, client, space, spaceRef, metadataKeys)
if err != nil || status.GetCode() != rpc.Code_CODE_OK {
Expand Down
56 changes: 41 additions & 15 deletions pkg/storage/registry/spaces/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,18 +270,19 @@ func (r *registry) GetProvider(ctx context.Context, space *providerpb.StorageSpa
func (r *registry) ListProviders(ctx context.Context, filters map[string]string) ([]*registrypb.ProviderInfo, error) {
b, _ := strconv.ParseBool(filters["unique"])
unrestricted, _ := strconv.ParseBool(filters["unrestricted"])
mask := filters["mask"]
switch {
case filters["storage_id"] != "" && filters["opaque_id"] != "":
findMountpoint := filters["type"] == "mountpoint"
findGrant := !findMountpoint && filters["path"] == "" // relvative references, by definition, occur in the correct storage, so do not look for grants
return r.findProvidersForResource(ctx, filters["storage_id"]+"!"+filters["opaque_id"], findMountpoint, findGrant, unrestricted), nil
return r.findProvidersForResource(ctx, filters["storage_id"]+"!"+filters["opaque_id"], findMountpoint, findGrant, unrestricted, mask), nil
case filters["path"] != "":
return r.findProvidersForAbsolutePathReference(ctx, filters["path"], b, unrestricted), nil
return r.findProvidersForAbsolutePathReference(ctx, filters["path"], b, unrestricted, mask), nil
case len(filters) == 0:
// return all providers
return r.findAllProviders(ctx), nil
return r.findAllProviders(ctx, mask), nil
default:
return r.findProvidersForFilter(ctx, r.buildFilters(filters), unrestricted), nil
return r.findProvidersForFilter(ctx, r.buildFilters(filters), unrestricted, mask), nil
}
}

Expand Down Expand Up @@ -321,7 +322,7 @@ func (r *registry) buildFilters(filterMap map[string]string) []*providerpb.ListS
return filters
}

func (r *registry) findProvidersForFilter(ctx context.Context, filters []*providerpb.ListStorageSpacesRequest_Filter, unrestricted bool) []*registrypb.ProviderInfo {
func (r *registry) findProvidersForFilter(ctx context.Context, filters []*providerpb.ListStorageSpacesRequest_Filter, unrestricted bool, _ string) []*registrypb.ProviderInfo {

currentUser := ctxpkg.ContextMustGetUser(ctx)
providerInfos := []*registrypb.ProviderInfo{}
Expand Down Expand Up @@ -367,20 +368,45 @@ func (r *registry) findProvidersForFilter(ctx context.Context, filters []*provid
// findProvidersForResource looks up storage providers based on a resource id
// for the root of a space the res.StorageId is the same as the res.OpaqueId
// for share spaces the res.StorageId tells the registry the spaceid and res.OpaqueId is a node in that space
func (r *registry) findProvidersForResource(ctx context.Context, id string, findMoundpoint, findGrant, unrestricted bool) []*registrypb.ProviderInfo {
func (r *registry) findProvidersForResource(ctx context.Context, id string, findMoundpoint, findGrant, unrestricted bool, mask string) []*registrypb.ProviderInfo {
currentUser := ctxpkg.ContextMustGetUser(ctx)
providerInfos := []*registrypb.ProviderInfo{}
sdid, _ := storagespace.SplitStorageID(id)
providerID, spaceID := storagespace.SplitStorageID(id)
spaceID, nodeID, err := storagespace.SplitID(spaceID)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("splitting spaceid failed")
return nil
}

for address, provider := range r.c.Providers {
// try to find provider based on storageproviderid prefix
if provider.ProviderID != "" && sdid != "" && provider.ProviderID != sdid {
// skip mismatching storageproviders
continue
}
p := &registrypb.ProviderInfo{
Address: address,
ProviderId: id,
ProviderId: providerID,
}
// try to find provider based on storageproviderid prefix if only root is requested
if provider.ProviderID != "" && providerID != "" && provider.ProviderID == providerID && mask == "root" {
// construct space based on configured properties without actually making a ListStorageSpaces call
space := &providerpb.StorageSpace{
Id: &providerpb.StorageSpaceId{OpaqueId: id},
Root: &providerpb.ResourceId{
StorageId: spaceID,
OpaqueId: nodeID,
},
}
// this is a request for requests by id
// setPath(space, provider.Path) // hmm not enough info to build a path.... the space alias is no longer known here we would need to query the provider

validSpaces := []*providerpb.StorageSpace{space}
if err := setSpaces(p, validSpaces); err != nil {
appctx.GetLogger(ctx).Error().Err(err).Interface("provider", provider).Interface("spaces", validSpaces).Msg("marshaling spaces failed, continuing")
return nil
}
providerInfos = append(providerInfos, p)
return providerInfos
}
if provider.ProviderID != "" && providerID != "" && provider.ProviderID != providerID {
// skip mismatching storageproviders
continue
}
filters := []*providerpb.ListStorageSpacesRequest_Filter{{
Type: providerpb.ListStorageSpacesRequest_Filter_TYPE_ID,
Expand Down Expand Up @@ -461,7 +487,7 @@ func (r *registry) findProvidersForResource(ctx context.Context, id string, find

// findProvidersForAbsolutePathReference takes a path and returns the storage provider with the longest matching path prefix
// FIXME use regex to return the correct provider when multiple are configured
func (r *registry) findProvidersForAbsolutePathReference(ctx context.Context, path string, unique, unrestricted bool) []*registrypb.ProviderInfo {
func (r *registry) findProvidersForAbsolutePathReference(ctx context.Context, path string, unique, unrestricted bool, _ string) []*registrypb.ProviderInfo {
currentUser := ctxpkg.ContextMustGetUser(ctx)

deepestMountPath := ""
Expand Down Expand Up @@ -568,7 +594,7 @@ func (r *registry) findProvidersForAbsolutePathReference(ctx context.Context, pa

// findAllProviders returns a list of all storage providers
// This is a dumb call that does not call ListStorageSpaces() on the providers: ListStorageSpaces() in the gateway can cache that better.
func (r *registry) findAllProviders(ctx context.Context) []*registrypb.ProviderInfo {
func (r *registry) findAllProviders(ctx context.Context, _ string) []*registrypb.ProviderInfo {
pis := make([]*registrypb.ProviderInfo, 0, len(r.c.Providers))
for address := range r.c.Providers {
pis = append(pis, &registrypb.ProviderInfo{
Expand Down
22 changes: 14 additions & 8 deletions pkg/storage/utils/decomposedfs/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,12 +410,6 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up
return nil, err
}

if restore {
if err := node.SetDTime(nil); err != nil {
return nil, err
}
}

u, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, fmt.Errorf("decomposedfs: spaces: contextual user not found")
Expand Down Expand Up @@ -462,11 +456,17 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up
}

// TODO change the permission handling
// these two attributes need manager permissions
if space.Name != "" || hasDescription {
// these three attributes need manager permissions
if space.Name != "" || hasDescription || restore {
err = fs.checkManagerPermission(ctx, node)
}
if err != nil {
if restore {
// a disabled space is invisible to non admins
return &provider.UpdateStorageSpaceResponse{
Status: &v1beta11.Status{Code: v1beta11.Code_CODE_NOT_FOUND, Message: err.Error()},
}, nil
}
return &provider.UpdateStorageSpaceResponse{
Status: &v1beta11.Status{Code: v1beta11.Code_CODE_PERMISSION_DENIED, Message: err.Error()},
}, nil
Expand All @@ -484,6 +484,12 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up
return nil, err
}

if restore {
if err := node.SetDTime(nil); err != nil {
return nil, err
}
}

// send back the updated data from the storage
updatedSpace, err := fs.storageSpaceFromNode(ctx, node, node.InternalPath(), false, false)
if err != nil {
Expand Down