Skip to content

Commit

Permalink
introduce spaces filed mask
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed May 23, 2022
1 parent 80f88df commit dcb251f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 16 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/spaces-field-mask.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: introduce spaces field mask

We now use a field mask to select which properties to retrieve when looking up storage spaces. This allows the gateway to only ask for `root` when trying to forward id or path based requests.

https://github.com/cs3org/reva/pull/2888
6 changes: 5 additions & 1 deletion internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag

func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
// TODO update CS3 api to forward the filters to the registry so it can filter the number of providers the gateway needs to query
filters := map[string]string{}
filters := map[string]string{
// TODO add opaque / CS3 api to expand 'path,root' properties / field mask
"mask": "*", // fetch all properties when listing storage spaces
}

for _, f := range req.Filters {
switch f.Type {
Expand Down Expand Up @@ -1093,6 +1096,7 @@ func (s *svc) findSpaces(ctx context.Context, ref *provider.Reference) ([]*regis
}

filters := map[string]string{
"mask": "root", // we only need the root for routing
"path": ref.Path,
}
if ref.ResourceId != nil {
Expand Down
56 changes: 41 additions & 15 deletions pkg/storage/registry/spaces/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,18 +270,19 @@ func (r *registry) GetProvider(ctx context.Context, space *providerpb.StorageSpa
func (r *registry) ListProviders(ctx context.Context, filters map[string]string) ([]*registrypb.ProviderInfo, error) {
b, _ := strconv.ParseBool(filters["unique"])
unrestricted, _ := strconv.ParseBool(filters["unrestricted"])
mask := filters["mask"]
switch {
case filters["storage_id"] != "" && filters["opaque_id"] != "":
findMountpoint := filters["type"] == "mountpoint"
findGrant := !findMountpoint && filters["path"] == "" // relvative references, by definition, occur in the correct storage, so do not look for grants
return r.findProvidersForResource(ctx, filters["storage_id"]+"!"+filters["opaque_id"], findMountpoint, findGrant, unrestricted), nil
return r.findProvidersForResource(ctx, filters["storage_id"]+"!"+filters["opaque_id"], findMountpoint, findGrant, unrestricted, mask), nil
case filters["path"] != "":
return r.findProvidersForAbsolutePathReference(ctx, filters["path"], b, unrestricted), nil
return r.findProvidersForAbsolutePathReference(ctx, filters["path"], b, unrestricted, mask), nil
case len(filters) == 0:
// return all providers
return r.findAllProviders(ctx), nil
return r.findAllProviders(ctx, mask), nil
default:
return r.findProvidersForFilter(ctx, r.buildFilters(filters), unrestricted), nil
return r.findProvidersForFilter(ctx, r.buildFilters(filters), unrestricted, mask), nil
}
}

Expand Down Expand Up @@ -321,7 +322,7 @@ func (r *registry) buildFilters(filterMap map[string]string) []*providerpb.ListS
return filters
}

func (r *registry) findProvidersForFilter(ctx context.Context, filters []*providerpb.ListStorageSpacesRequest_Filter, unrestricted bool) []*registrypb.ProviderInfo {
func (r *registry) findProvidersForFilter(ctx context.Context, filters []*providerpb.ListStorageSpacesRequest_Filter, unrestricted bool, _ string) []*registrypb.ProviderInfo {

currentUser := ctxpkg.ContextMustGetUser(ctx)
providerInfos := []*registrypb.ProviderInfo{}
Expand Down Expand Up @@ -367,20 +368,45 @@ func (r *registry) findProvidersForFilter(ctx context.Context, filters []*provid
// findProvidersForResource looks up storage providers based on a resource id
// for the root of a space the res.StorageId is the same as the res.OpaqueId
// for share spaces the res.StorageId tells the registry the spaceid and res.OpaqueId is a node in that space
func (r *registry) findProvidersForResource(ctx context.Context, id string, findMoundpoint, findGrant, unrestricted bool) []*registrypb.ProviderInfo {
func (r *registry) findProvidersForResource(ctx context.Context, id string, findMoundpoint, findGrant, unrestricted bool, mask string) []*registrypb.ProviderInfo {
currentUser := ctxpkg.ContextMustGetUser(ctx)
providerInfos := []*registrypb.ProviderInfo{}
sdid, _ := storagespace.SplitStorageID(id)
providerID, spaceID := storagespace.SplitStorageID(id)
spaceID, nodeID, err := storagespace.SplitID(spaceID)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("splitting spaceid failed")
return nil
}

for address, provider := range r.c.Providers {
// try to find provider based on storageproviderid prefix
if provider.ProviderID != "" && sdid != "" && provider.ProviderID != sdid {
// skip mismatching storageproviders
continue
}
p := &registrypb.ProviderInfo{
Address: address,
ProviderId: id,
ProviderId: providerID,
}
// try to find provider based on storageproviderid prefix if only root is requested
if provider.ProviderID != "" && providerID != "" && provider.ProviderID == providerID && mask == "root" {
// construct space based on configured properties without actually making a ListStorageSpaces call
space := &providerpb.StorageSpace{
Id: &providerpb.StorageSpaceId{OpaqueId: id},
Root: &providerpb.ResourceId{
StorageId: spaceID,
OpaqueId: nodeID,
},
}
// this is a request for requests by id
// setPath(space, provider.Path) // hmm not enough info to build a path.... the space alias is no longer known here we would need to query the provider

validSpaces := []*providerpb.StorageSpace{space}
if err := setSpaces(p, validSpaces); err != nil {
appctx.GetLogger(ctx).Error().Err(err).Interface("provider", provider).Interface("spaces", validSpaces).Msg("marshaling spaces failed, continuing")
return nil
}
providerInfos = append(providerInfos, p)
return providerInfos
}
if provider.ProviderID != "" && providerID != "" && provider.ProviderID != providerID {
// skip mismatching storageproviders
continue
}
filters := []*providerpb.ListStorageSpacesRequest_Filter{{
Type: providerpb.ListStorageSpacesRequest_Filter_TYPE_ID,
Expand Down Expand Up @@ -461,7 +487,7 @@ func (r *registry) findProvidersForResource(ctx context.Context, id string, find

// findProvidersForAbsolutePathReference takes a path and returns the storage provider with the longest matching path prefix
// FIXME use regex to return the correct provider when multiple are configured
func (r *registry) findProvidersForAbsolutePathReference(ctx context.Context, path string, unique, unrestricted bool) []*registrypb.ProviderInfo {
func (r *registry) findProvidersForAbsolutePathReference(ctx context.Context, path string, unique, unrestricted bool, _ string) []*registrypb.ProviderInfo {
currentUser := ctxpkg.ContextMustGetUser(ctx)

deepestMountPath := ""
Expand Down Expand Up @@ -568,7 +594,7 @@ func (r *registry) findProvidersForAbsolutePathReference(ctx context.Context, pa

// findAllProviders returns a list of all storage providers
// This is a dumb call that does not call ListStorageSpaces() on the providers: ListStorageSpaces() in the gateway can cache that better.
func (r *registry) findAllProviders(ctx context.Context) []*registrypb.ProviderInfo {
func (r *registry) findAllProviders(ctx context.Context, _ string) []*registrypb.ProviderInfo {
pis := make([]*registrypb.ProviderInfo, 0, len(r.c.Providers))
for address := range r.c.Providers {
pis = append(pis, &registrypb.ProviderInfo{
Expand Down

0 comments on commit dcb251f

Please sign in to comment.