Skip to content

Commit

Permalink
MB-52517: Disallow Alter Index and Create Index With parameters in Se…
Browse files Browse the repository at this point in the history
…rverless.

Implemented CreatePrimaryIndex5 and CreateIndex5 functions as a part of Indexer5{} interface. This interface passes the user crdentials as IndexConnection which can be used to perform a RBAC check.

1. IsOperationAllowed() returns fasle only if the user does not have permission to use certain paarmeters in WITH Clause in serverless mode.
2. IsParameterAllowed() returns true only if the WITH clause has either "defer_build" or "retain_deleted_xattr" or both.

Change-Id: I32d340b5db69c117945c844f7912dba0cbc4d60f
  • Loading branch information
singlaabhinandan committed Nov 10, 2022
1 parent 1b5cb71 commit fad6f20
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 18 deletions.
18 changes: 0 additions & 18 deletions secondary/queryport/client/meta_client.go
Expand Up @@ -325,12 +325,6 @@ func (b *metadataClient) CreateIndex(
}
}

if common.GetDeploymentModel() == common.SERVERLESS_DEPLOYMENT {
if err := IsParameterAllowed(plan); err != nil {
return 0, err
}
}

refreshCnt := 0
RETRY:
defnID, err, needRefresh := b.mdClient.CreateIndexWithPlan(
Expand All @@ -350,18 +344,6 @@ RETRY:
return uint64(defnID), err
}

func IsParameterAllowed(plan map[string]interface{}) error {

allowedParams := map[string]bool{"defer_build": true, "retain_deleted_xattr": true}

for attr, _ := range plan {
if _, ok := allowedParams[attr]; !ok {
return errors.New("Unsupported parameters in the With clause. Create Index With defer_build and retain_deleted_xattr is supported.")
}
}
return nil
}

// BuildIndexes implements BridgeAccessor{} interface.
func (b *metadataClient) BuildIndexes(defnIDs []uint64) error {
currmeta := (*indexTopology)(atomic.LoadPointer(&b.indexers))
Expand Down
79 changes: 79 additions & 0 deletions secondary/queryport/n1ql/secondary_index.go
Expand Up @@ -428,6 +428,20 @@ func (gsi *gsiKeyspace) CreatePrimaryIndex3(
return index.(datastore.PrimaryIndex), nil
}

// CreatePrimaryIndex5 implements datastore.Indexer5{} interface. Create a secondary
// index on this keyspace
func (gsi *gsiKeyspace) CreatePrimaryIndex5(
requestId, name string, indexPartition *datastore.IndexPartition,
with value.Value, conn *datastore.IndexConnection) (pi datastore.PrimaryIndex, retErr errors.Error) {

keyspace := fmt.Sprintf("%s:%s:%s", gsi.bucket, gsi.scope, gsi.keyspace)
if isAllowed, err := IsOperationAllowed(with, conn, keyspace, "CreatePrimaryIndex5"); isAllowed {
return gsi.CreatePrimaryIndex3(requestId, name, indexPartition, with)
} else {
return nil, err
}
}

// CreateIndex implements datastore.Indexer{} interface. Create a secondary
// index on this keyspace
func (gsi *gsiKeyspace) CreateIndex(
Expand Down Expand Up @@ -606,6 +620,71 @@ func (gsi *gsiKeyspace) CreateIndex3(
return gsi.IndexById(defnID2String(defnID))
}

// CreateIndex5 implements datastore.Indexer5{} interface. Create a secondary
// index on this keyspace
func (gsi *gsiKeyspace) CreateIndex5(
requestId, name string, rangeKey datastore.IndexKeys, indexPartition *datastore.IndexPartition,
where expression.Expression, with value.Value, conn *datastore.IndexConnection) (si datastore.Index, retErr errors.Error) {

keyspace := fmt.Sprintf("%s:%s:%s", gsi.bucket, gsi.scope, gsi.keyspace)
if isAllowed, err := IsOperationAllowed(with, conn, keyspace, "CreateIndex5"); isAllowed {
return gsi.CreateIndex3(requestId, name, rangeKey, indexPartition, where, with)
} else {
return nil, err
}
}

func IsOperationAllowed(with value.Value, conn *datastore.IndexConnection, keyspace string, method string) (bool, errors.Error) {

if common.GetDeploymentModel() == common.SERVERLESS_DEPLOYMENT && with != nil {
var withJSON []byte
var err error
if withJSON, err = with.MarshalJSON(); err != nil {
return false, errors.NewError(err, "GSI error marshalling WITH clause.")
}
plan := make(map[string]interface{})
if withJSON != nil && len(withJSON) > 0 {
err := json.Unmarshal(withJSON, &plan)
if err != nil {
return false, errors.NewError(err, "GSI error Unmarshalling WITH clause.")
}
if IsParameterAllowed(plan) {
l.Debugf("%s: The WITH clause parameters are allowed in Serverless mode.", method)
return true, nil
} else {
l.Debugf("%s: The WITH clause parameters are not allowed in Serverless mode.", method)
http := conn.Context().Credentials().HttpRequest
auth, valid, err := c.IsAuthValid(http)
if valid {
permission := fmt.Sprintf("cluster.collection[%s].n1ql.index.parameterized!create", keyspace)
if isPermitted, err := auth.IsAllowed(permission); isPermitted {
return true, nil
} else {
l.Errorf("%s: The user does not have permission to use these parameters in WITH clause of CREATE PRIMARY INDEX/CREATE INDEX. WITH = %v", method, plan)
return false, errors.NewError(err, "User does not permission to perform this operation.")
}
} else {
l.Errorf("%s: The user credential are Invalid.", method)
return false, errors.NewError(err, "Invalid credentials.")
}
}
}
}
return true, nil
}

func IsParameterAllowed(plan map[string]interface{}) bool {

allowedParams := map[string]bool{"defer_build": true, "retain_deleted_xattr": true}

for attr, _ := range plan {
if _, ok := allowedParams[attr]; !ok {
return false
}
}
return true
}

func partitionKey(partitionType datastore.PartitionType) c.PartitionScheme {
if partitionType == datastore.HASH_PARTITION {
return c.PartitionScheme(c.KEY)
Expand Down

0 comments on commit fad6f20

Please sign in to comment.