Skip to content

Commit

Permalink
MB-45446 : Add RBAC for indexer level stats in /api/v1/stats endpoint
Browse files Browse the repository at this point in the history
Change-Id: Ie1c3dd0f9ac0c8b0005b00f09740efd563232698
  • Loading branch information
ksaikrishnateja committed Apr 21, 2021
1 parent 15a6a47 commit 0ada856
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 75 deletions.
88 changes: 88 additions & 0 deletions secondary/common/permission_cache.go
@@ -0,0 +1,88 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.

package common

import (
"fmt"

"github.com/couchbase/cbauth"
)

type sessionPermissionsCache struct {
permissions map[string]bool
creds cbauth.Creds
}

func NewSessionPermissionsCache(cr cbauth.Creds) *sessionPermissionsCache {
p := &sessionPermissionsCache{}
p.permissions = make(map[string]bool)
p.creds = cr
return p
}

func (p *sessionPermissionsCache) checkPermissions(permissions []string) (bool, error) {

allow := false
err := error(nil)

for _, permission := range permissions {
allow, err = p.creds.IsAllowed(permission)
if !allow || err != nil {
break
}
}

return allow, err
}

func (p *sessionPermissionsCache) CheckAndAddBucketLevelPermission(bucket, op string) bool {
if bucketLevelPermission, ok := p.permissions[bucket]; ok {
return bucketLevelPermission
} else {
permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!%s", bucket, op)
p.permissions[bucket], _ = p.checkPermissions([]string{permission})
return p.permissions[bucket]
}
}

func (p *sessionPermissionsCache) CheckAndAddScopeLevelPermission(bucket, scope, op string) bool {
scopeLevel := fmt.Sprintf("%s:%s", bucket, scope)
if scopeLevelPermission, ok := p.permissions[scopeLevel]; ok {
return scopeLevelPermission
} else {
permission := fmt.Sprintf("cluster.scope[%s].n1ql.index!%s", scopeLevel, op)
p.permissions[scopeLevel], _ = p.checkPermissions([]string{permission})
return p.permissions[scopeLevel]
}
}

func (p *sessionPermissionsCache) CheckAndAddCollectionLevelPermission(bucket, scope, collection, op string) bool {
collectionLevel := fmt.Sprintf("%s:%s:%s", bucket, scope, collection)
if collectionLevelPermission, ok := p.permissions[collectionLevel]; ok {
return collectionLevelPermission
} else {
permission := fmt.Sprintf("cluster.collection[%s].n1ql.index!%s", collectionLevel, op)
p.permissions[collectionLevel], _ = p.checkPermissions([]string{permission})
return p.permissions[collectionLevel]
}
}

// isAllowed returns true iff a caller's (creds, op) allow access to IndexDefn (bucket, scope, collection).
func (p *sessionPermissionsCache) IsAllowed(bucket, scope, collection, op string) bool {

if p.CheckAndAddBucketLevelPermission(bucket, op) {
return true
} else if p.CheckAndAddScopeLevelPermission(bucket, scope, op) {
return true
} else if p.CheckAndAddCollectionLevelPermission(bucket, scope, collection, op) {
return true
}
return false
}
7 changes: 5 additions & 2 deletions secondary/indexer/api.go
Expand Up @@ -24,6 +24,7 @@ type target struct {
skipEmpty bool
partition bool
pretty bool
creds cbauth.Creds
}

type restServer struct {
Expand Down Expand Up @@ -139,7 +140,7 @@ func (api *restServer) statsHandler(req request) {
stats := api.statsMgr.stats.Get()
segs := strings.Split(req.url, "/")
t := &target{version: req.version, skipEmpty: skipEmpty,
partition: partition, pretty: pretty}
partition: partition, pretty: pretty, creds: req.creds}
switch req.version {
case "v1":
if len(segs) == 3 { // Indexer node level stats
Expand Down Expand Up @@ -196,7 +197,7 @@ func (api *restServer) statsHandler(req request) {
return
}
}
if !api.authorizeStats(req, t) {
if t.level != "indexer" && !api.authorizeStats(req, t) {
return
}
if bytes, err := stats.VersionedJSON(t); err != nil {
Expand All @@ -218,6 +219,8 @@ func (api *restServer) statsHandler(req request) {
}
}

// Dont use this function for indexer level stats. For indexer level stats
// we must check permissions for every index.
func (api *restServer) authorizeStats(req request, t *target) bool {

permissions := ([]string)(nil)
Expand Down
11 changes: 9 additions & 2 deletions secondary/indexer/stats_manager.go
Expand Up @@ -1127,6 +1127,7 @@ func (is *IndexerStats) GetStats(spec *statsSpec) interface{} {

func (is *IndexerStats) GetVersionedStats(t *target) (common.Statistics, bool) {
statsMap := make(map[string]interface{})

var found bool

addToStatsMap := func(s *IndexStats) {
Expand All @@ -1145,9 +1146,15 @@ func (is *IndexerStats) GetVersionedStats(t *target) (common.Statistics, bool) {
}

if t.level == "indexer" {
statsMap["indexer"] = is.constructIndexerStats(t.skipEmpty, t.version)
querySystemCatalog, _ := t.creds.IsAllowed("cluster.n1ql.meta!read")
if querySystemCatalog {
statsMap["indexer"] = is.constructIndexerStats(t.skipEmpty, t.version)
}
permissionCache := common.NewSessionPermissionsCache(t.creds)
for _, s := range is.indexes {
addToStatsMap(s)
if querySystemCatalog || permissionCache.IsAllowed(s.bucket, s.scope, s.collection, "list") {
addToStatsMap(s)
}
}
found = true
} else if t.level == "bucket" {
Expand Down
87 changes: 16 additions & 71 deletions secondary/manager/request_handler.go
Expand Up @@ -157,10 +157,6 @@ type IndexStatus struct {

type indexStatusSorter []IndexStatus

type permissionsCache struct {
permissions map[string]bool
}

//
// Response
//
Expand Down Expand Up @@ -703,7 +699,7 @@ func (m *requestHandlerContext) getIndexStatus(creds cbauth.Creds, t *target, ge

defnToHostMap := make(map[common.IndexDefnId][]string)
isInstanceDeferred := make(map[common.IndexInstId]bool)
permissionCache := initPermissionsCache()
permissionCache := common.NewSessionPermissionsCache(creds)

keepKeys := make([]string, 0, len(nids)) // memory cache keys of current indexer nodes
for _, nid := range nids {
Expand Down Expand Up @@ -792,7 +788,7 @@ func (m *requestHandlerContext) getIndexStatus(creds cbauth.Creds, t *target, ge
fullSet = false
continue
}
accessAllowed := permissionCache.isAllowed(creds, defn.Bucket, defn.Scope, defn.Collection, "list")
accessAllowed := permissionCache.IsAllowed(defn.Bucket, defn.Scope, defn.Collection, "list")
if !accessAllowed {
// Do not cache partial results
metaToCache[hostKey] = nil
Expand Down Expand Up @@ -1279,7 +1275,7 @@ func (m *requestHandlerContext) getIndexMetadata(creds cbauth.Creds, t *target)
return nil, err
}

permissionsCache := initPermissionsCache()
permissionsCache := common.NewSessionPermissionsCache(creds)

// find all nodes that has a index http service
nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE)
Expand Down Expand Up @@ -1325,13 +1321,13 @@ func (m *requestHandlerContext) getIndexMetadata(creds cbauth.Creds, t *target)
}

for _, topology := range localMeta.IndexTopologies {
if permissionsCache.isAllowed(creds, topology.Bucket, topology.Scope, topology.Collection, "list") {
if permissionsCache.IsAllowed(topology.Bucket, topology.Scope, topology.Collection, "list") {
newLocalMeta.IndexTopologies = append(newLocalMeta.IndexTopologies, topology)
}
}

for _, defn := range localMeta.IndexDefinitions {
if permissionsCache.isAllowed(creds, defn.Bucket, defn.Scope, defn.Collection, "list") {
if permissionsCache.IsAllowed(defn.Bucket, defn.Scope, defn.Collection, "list") {
newLocalMeta.IndexDefinitions = append(newLocalMeta.IndexDefinitions, defn)
}
}
Expand Down Expand Up @@ -1708,7 +1704,7 @@ func (m *requestHandlerContext) getLocalIndexMetadata(creds cbauth.Creds,
t0 := time.Now()

repo := m.mgr.getMetadataRepo()
permissionsCache := initPermissionsCache()
permissionsCache := common.NewSessionPermissionsCache(creds)

meta = &LocalIndexMetadata{IndexTopologies: nil, IndexDefinitions: nil}
indexerId, err := repo.GetLocalIndexerId()
Expand Down Expand Up @@ -1745,7 +1741,7 @@ func (m *requestHandlerContext) getLocalIndexMetadata(creds cbauth.Creds,
_, defn, err = iter.Next()
for err == nil {
if applyFilters(bucket, defn.Bucket, defn.Scope, defn.Collection, defn.Name, filters, filterType) &&
permissionsCache.isAllowed(creds, defn.Bucket, defn.Scope, defn.Collection, "list") {
permissionsCache.IsAllowed(defn.Bucket, defn.Scope, defn.Collection, "list") {
meta.IndexDefinitions = append(meta.IndexDefinitions, *defn)
} else {
fullSet = false
Expand All @@ -1765,7 +1761,7 @@ func (m *requestHandlerContext) getLocalIndexMetadata(creds cbauth.Creds,
topology, err = iter1.Next()
for err == nil {
if applyFilters(bucket, topology.Bucket, topology.Scope, topology.Collection, "", filters, filterType) &&
permissionsCache.isAllowed(creds, topology.Bucket, topology.Scope, topology.Collection, "list") {
permissionsCache.IsAllowed(topology.Bucket, topology.Scope, topology.Collection, "list") {
meta.IndexTopologies = append(meta.IndexTopologies, *topology)
} else {
fullSet = false
Expand Down Expand Up @@ -1843,57 +1839,6 @@ func shouldProcess(t *target, defnBucket, defnScope, defnColl, defnName string)
return false
}

func initPermissionsCache() *permissionsCache {
p := &permissionsCache{}
p.permissions = make(map[string]bool)
return p
}

// isAllowed returns true iff a caller's (creds, op) allow access to IndexDefn (bucket, scope, collection).
func (p *permissionsCache) isAllowed(creds cbauth.Creds, bucket, scope, collection, op string) bool {

checkAndAddBucketLevelPermission := func(bucket string) bool {
if bucketLevelPermission, ok := p.permissions[bucket]; ok {
return bucketLevelPermission
} else {
permission := fmt.Sprintf("cluster.bucket[%s].n1ql.index!%s", bucket, op)
p.permissions[bucket] = isAllowed(creds, []string{permission}, nil)
return p.permissions[bucket]
}
}

checkAndAddScopeLevelPermission := func(bucket, scope string) bool {
scopeLevel := fmt.Sprintf("%s:%s", bucket, scope)
if scopeLevelPermission, ok := p.permissions[scopeLevel]; ok {
return scopeLevelPermission
} else {
permission := fmt.Sprintf("cluster.scope[%s].n1ql.index!%s", scopeLevel, op)
p.permissions[scopeLevel] = isAllowed(creds, []string{permission}, nil)
return p.permissions[scopeLevel]
}
}

checkAndAddCollectionLevelPermission := func(bucket, scope, collection string) bool {
collectionLevel := fmt.Sprintf("%s:%s:%s", bucket, scope, collection)
if collectionLevelPermission, ok := p.permissions[collectionLevel]; ok {
return collectionLevelPermission
} else {
permission := fmt.Sprintf("cluster.collection[%s].n1ql.index!%s", collectionLevel, op)
p.permissions[collectionLevel] = isAllowed(creds, []string{permission}, nil)
return p.permissions[collectionLevel]
}
}

if checkAndAddBucketLevelPermission(bucket) {
return true
} else if checkAndAddScopeLevelPermission(bucket, scope) {
return true
} else if checkAndAddCollectionLevelPermission(bucket, scope, collection) {
return true
}
return false
}

///////////////////////////////////////////////////////
// Cached LocalIndexMetadata and Stats
///////////////////////////////////////////////////////
Expand All @@ -1905,7 +1850,7 @@ func (m *requestHandlerContext) handleCachedLocalIndexMetadataRequest(w http.Res
return
}

permissionsCache := initPermissionsCache()
permissionsCache := common.NewSessionPermissionsCache(creds)
host := r.FormValue("host")
host = strings.Trim(host, "\"")

Expand All @@ -1916,13 +1861,13 @@ func (m *requestHandlerContext) handleCachedLocalIndexMetadataRequest(w http.Res
newMeta.IndexTopologies = make([]IndexTopology, 0, len(meta.IndexTopologies))

for _, defn := range meta.IndexDefinitions {
if permissionsCache.isAllowed(creds, defn.Bucket, defn.Scope, defn.Collection, "list") {
if permissionsCache.IsAllowed(defn.Bucket, defn.Scope, defn.Collection, "list") {
newMeta.IndexDefinitions = append(newMeta.IndexDefinitions, defn)
}
}

for _, topology := range meta.IndexTopologies {
if permissionsCache.isAllowed(creds, topology.Bucket, topology.Scope, topology.Collection, "list") {
if permissionsCache.IsAllowed(topology.Bucket, topology.Scope, topology.Collection, "list") {
newMeta.IndexTopologies = append(newMeta.IndexTopologies, topology)
}
}
Expand Down Expand Up @@ -1977,7 +1922,7 @@ func (m *requestHandlerContext) handleRestoreIndexMetadataRequest(w http.Respons
return
}

permissionsCache := initPermissionsCache()
permissionsCache := common.NewSessionPermissionsCache(creds)
// convert backup image into runtime data structure
image := m.convertIndexMetadataRequest(r)
if image == nil {
Expand All @@ -1987,13 +1932,13 @@ func (m *requestHandlerContext) handleRestoreIndexMetadataRequest(w http.Respons

for _, localMeta := range image.Metadata {
for _, topology := range localMeta.IndexTopologies {
if !permissionsCache.isAllowed(creds, topology.Bucket, topology.Scope, topology.Collection, "write") {
if !permissionsCache.IsAllowed(topology.Bucket, topology.Scope, topology.Collection, "write") {
return
}
}

for _, defn := range localMeta.IndexDefinitions {
if !permissionsCache.isAllowed(creds, defn.Bucket, defn.Scope, defn.Collection, "write") {
if !permissionsCache.IsAllowed(defn.Bucket, defn.Scope, defn.Collection, "write") {
return
}
}
Expand Down Expand Up @@ -2268,11 +2213,11 @@ func (m *requestHandlerContext) getLocalReplicaCount(creds cbauth.Creds) (map[co
defer iter.Close()

var defn *common.IndexDefn
permissionsCache := initPermissionsCache()
permissionsCache := common.NewSessionPermissionsCache(creds)

_, defn, err = iter.Next()
for err == nil {
if !permissionsCache.isAllowed(creds, defn.Bucket, defn.Scope, defn.Collection, "list") {
if !permissionsCache.IsAllowed(defn.Bucket, defn.Scope, defn.Collection, "list") {
return nil, fmt.Errorf("Permission denied on reading metadata for keyspace %v:%v:%v", defn.Bucket, defn.Scope, defn.Collection)
}

Expand Down

0 comments on commit 0ada856

Please sign in to comment.