Commit 10246f1

Merge branch 'master' into jjm-no-doc-topology-expand

raghavendra-talur committed Apr 4, 2019
2 parents 1b975b4 + 68b9e09 commit 10246f1
Showing 13 changed files with 566 additions and 30 deletions.
29 changes: 27 additions & 2 deletions apps/glusterfs/app.go
@@ -11,6 +11,7 @@ package glusterfs

import (
"fmt"
"math"
"net/http"
"os"
"strconv"
@@ -219,6 +220,14 @@ func (app *App) initDB() error {
return logger.LogError("Unable to initialize buckets: %v", err)
}

// Check that this is a db we can safely use
validAttributes := validDbAttributeKeys(tx, mapDbAtrributeKeys())
if !validAttributes {
return logger.LogError(
"Unable to initialize db, unknown attributes are present" +
" (db from a newer version of heketi?)")
}

// Handle Upgrade Changes
err = UpgradeDB(tx)
if err != nil {
@@ -228,7 +237,7 @@ func (app *App) initDB() error {
return nil
})
}
-	return nil
+	return err
}

func (app *App) initNodeMonitor() {
@@ -368,6 +377,14 @@ func (a *App) setFromEnvironmentalVariable() {
if "" != env {
a.conf.ZoneChecking = env
}

env = os.Getenv("HEKETI_GLUSTER_MAX_VOLUMES_PER_CLUSTER")
if env != "" {
a.conf.MaxVolumesPerCluster, err = strconv.Atoi(env)
if err != nil {
logger.LogError("Error: While parsing HEKETI_GLUSTER_MAX_VOLUMES_PER_CLUSTER: %v", err)
}
}
}

func (a *App) setAdvSettings() {
@@ -403,11 +420,19 @@ func (a *App) setAdvSettings() {
logger.Info("Post Request Volume Options: %v", a.conf.PostReqVolumeOptions)
PostReqVolumeOptions = a.conf.PostReqVolumeOptions
}

if a.conf.ZoneChecking != "" {
logger.Info("Zone checking: '%v'", a.conf.ZoneChecking)
ZoneChecking = ZoneCheckingStrategy(a.conf.ZoneChecking)
}
if a.conf.MaxVolumesPerCluster < 0 {
logger.Info("Volumes per cluster limit is removed as it is set to %v", a.conf.MaxVolumesPerCluster)
maxVolumesPerCluster = math.MaxInt32
} else if a.conf.MaxVolumesPerCluster == 0 {
logger.Info("Volumes per cluster limit is set to default value of %v", maxVolumesPerCluster)
} else {
logger.Info("Volumes per cluster limit is set to %v", a.conf.MaxVolumesPerCluster)
maxVolumesPerCluster = a.conf.MaxVolumesPerCluster
}
}

func (a *App) setBlockSettings() {
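For context, a hedged sketch of using the environment override that setFromEnvironmentalVariable() reads above. The variable name is taken from this diff; the binary name and --config flag are assumptions based on heketi's usual invocation:

    # Remove the per-cluster volume limit (negative values map to math.MaxInt32)
    HEKETI_GLUSTER_MAX_VOLUMES_PER_CLUSTER=-1 heketi --config=heketi.json

    # Tighten the limit to 100 volumes per cluster
    HEKETI_GLUSTER_MAX_VOLUMES_PER_CLUSTER=100 heketi --config=heketi.json

Leaving the value at 0 (or unset, with no config entry) keeps the built-in default of 1000 defined in limits.go.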
1 change: 1 addition & 0 deletions apps/glusterfs/app_config.go
@@ -36,6 +36,7 @@ type GlusterFSConfig struct {
PreReqVolumeOptions string `json:"pre_request_volume_options"`
PostReqVolumeOptions string `json:"post_request_volume_options"`
ZoneChecking string `json:"zone_checking"`
MaxVolumesPerCluster int `json:"max_volumes_per_cluster"`

//block settings
CreateBlockHostingVolumes bool `json:"auto_create_block_hosting_volume"`
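For illustration, a minimal sketch of setting the new field in the JSON config. The key names come from the struct tags above; the enclosing "glusterfs" section is an assumption based on heketi's standard heketi.json layout:

    {
      "glusterfs": {
        "zone_checking": "strict",
        "max_volumes_per_cluster": 300
      }
    }

A negative value here removes the limit, matching the environment-variable behavior in app.go.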
5 changes: 5 additions & 0 deletions apps/glusterfs/cluster_entry.go
@@ -156,6 +156,11 @@ func (c *ClusterEntry) VolumeDelete(id string) {
c.Info.Volumes = sortedstrings.Delete(c.Info.Volumes, id)
}

// volumeCount returns the number of volumes in the cluster, *including* pending ones
func (c *ClusterEntry) volumeCount() int {
return len(c.Info.Volumes)
}

func (c *ClusterEntry) BlockVolumeAdd(id string) {
c.Info.BlockVolumes = append(c.Info.BlockVolumes, id)
c.Info.BlockVolumes.Sort()
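The enforcement that uses volumeCount() lives in one of the 13 changed files not shown here; judging from the error string the tests below assert on ("has 5 volumes and limit is 5"), it presumably resembles this sketch (placement and exact message are assumptions):

    // hypothetical sketch of the limit check on the volume create path
    if c.volumeCount() >= maxVolumesPerCluster {
        return fmt.Errorf("cluster %v has %v volumes and limit is %v",
            c.Info.Id, c.volumeCount(), maxVolumesPerCluster)
    }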
36 changes: 36 additions & 0 deletions apps/glusterfs/dbattribute_entry.go
@@ -17,6 +17,15 @@ import (
"github.com/lpabon/godbc"
)

var (
dbAttributeKeys = []string{
DB_BRICK_HAS_SUBTYPE_FIELD,
DB_CLUSTER_HAS_FILE_BLOCK_FLAG,
DB_GENERATION_ID,
DB_HAS_PENDING_OPS_BUCKET,
}
)

type DbAttributeEntry struct {
Key string
Value string
@@ -80,3 +89,30 @@ func DbAttributeList(tx *bolt.Tx) ([]string, error) {
}
return list, nil
}

// validDbAttributeKeys returns true if all dbattribute keys in the
// database match keys in the knownKeys map.
func validDbAttributeKeys(tx *bolt.Tx, knownKeys map[string]bool) bool {
list := EntryKeys(tx, BOLTDB_BUCKET_DBATTRIBUTE)
if list == nil {
logger.LogError("unable to list keys in dbattribute bucket")
return false
}
for _, key := range list {
if !knownKeys[key] {
logger.LogError("unknown dbattribute key: %+v", key)
return false
}
}
return true
}

// mapDbAtrributeKeys returns a map equivalent of dbAttributeKeys
// for fast lookup.
func mapDbAtrributeKeys() map[string]bool {
m := map[string]bool{}
for _, k := range dbAttributeKeys {
m[k] = true
}
return m
}
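A minimal sketch of the intended call pattern for these two helpers inside a read transaction, mirroring the initDB() change above (the error text is illustrative, not heketi's):

    // inside package glusterfs, with db an open *bolt.DB
    err := db.View(func(tx *bolt.Tx) error {
        if !validDbAttributeKeys(tx, mapDbAtrributeKeys()) {
            return fmt.Errorf("db contains attribute keys from a newer heketi")
        }
        return nil
    })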
63 changes: 63 additions & 0 deletions apps/glusterfs/dbattribute_entry_test.go
@@ -0,0 +1,63 @@
//
// Copyright (c) 2019 The heketi Authors
//
// This file is licensed to you under your choice of the GNU Lesser
// General Public License, version 3 or any later version (LGPLv3 or
// later), or the GNU General Public License, version 2 (GPLv2), in all
// cases as published by the Free Software Foundation.
//

package glusterfs

import (
"os"
"testing"

"github.com/boltdb/bolt"
"github.com/heketi/tests"
)

func TestMapDbAtrributeKeys(t *testing.T) {
x := mapDbAtrributeKeys()
tests.Assert(t, len(x) == len(dbAttributeKeys), "different lengths")
for _, k := range dbAttributeKeys {
tests.Assert(t, x[k], k, "missing from", x)
}
}

func TestValidDbAttributeKeys(t *testing.T) {
tmpfile := tests.Tempfile()
defer os.Remove(tmpfile)

app := NewTestApp(tmpfile)

// test normal db attributes (for this version)
app.db.View(func(tx *bolt.Tx) error {
v := validDbAttributeKeys(tx, mapDbAtrributeKeys())
tests.Assert(t, v, "expected db attributes valid")
return nil
})

// add some fake known attributes. This is still valid
// if the db lacks keys we know about. DB is probably just old.
app.db.View(func(tx *bolt.Tx) error {
m := mapDbAtrributeKeys()
m["LOVELY_WATER"] = true
m["THAT_SINKING_FEELING"] = true
v := validDbAttributeKeys(tx, m)
tests.Assert(t, v, "expected db attributes valid")
return nil
})

app.db.Update(func(tx *bolt.Tx) error {
entry := NewDbAttributeEntry()
entry.Key = "LOVELY_FISH"
entry.Value = "no"
err := entry.Save(tx)
tests.Assert(t, err == nil, "expected err == nil, got:", err)

v := validDbAttributeKeys(tx, mapDbAtrributeKeys())
tests.Assert(t, !v, "expected db attributes not valid")
return nil
})
}
7 changes: 4 additions & 3 deletions apps/glusterfs/limits.go
@@ -11,7 +11,8 @@ package glusterfs

var (
// Default limits
-	BrickMinSize = uint64(1 * GB)
-	BrickMaxSize = uint64(4 * TB)
-	BrickMaxNum  = 32
+	BrickMinSize         = uint64(1 * GB)
+	BrickMaxSize         = uint64(4 * TB)
+	BrickMaxNum          = 32
+	maxVolumesPerCluster = 1000
)
159 changes: 159 additions & 0 deletions apps/glusterfs/operations_volume_test.go
@@ -1001,3 +1001,162 @@ func TestListCompleteVolumesDuringOperation(t *testing.T) {
})
})
}

func TestVolumeCreateLimits(t *testing.T) {
tmpfile := tests.Tempfile()
defer os.Remove(tmpfile)

// Create the app
app := NewTestApp(tmpfile)
defer app.Close()

err := setupSampleDbWithTopology(app,
1, // clusters
3, // nodes_per_cluster
1, // devices_per_node
2*TB, // disksize
)
tests.Assert(t, err == nil, "expected err == nil, got:", err)

// Back up the global value and set the limit to 5
oldMaxVolumesPerCluster := maxVolumesPerCluster
maxVolumesPerCluster = 5
defer func() { maxVolumesPerCluster = oldMaxVolumesPerCluster }()

var cleanupVolume = func(vol *VolumeEntry) {
vdo := NewVolumeDeleteOperation(vol, app.db)
e := RunOperation(vdo, app.executor)
tests.Assert(t, e == nil, "expected e == nil, got:", e)

app.db.View(func(tx *bolt.Tx) error {
vols, err := VolumeList(tx)
tests.Assert(t, err == nil, "expected err == nil, got:", err)
tests.Assert(t, len(vols) == 4,
"expected len(vols) == 4, got:", len(vols))
return nil
})
}

// Create 4 volumes
for i := 0; i < 4; i++ {
req := &api.VolumeCreateRequest{}
req.Size = 1
req.Durability.Type = api.DurabilityReplicate
req.Durability.Replicate.Replica = 3
vol := NewVolumeEntryFromRequest(req)
vco := NewVolumeCreateOperation(vol, app.db)
e := RunOperation(vco, app.executor)
tests.Assert(t, e == nil, "expected e == nil, got:", e)
}

t.Run("InLimit", func(t *testing.T) {
req := &api.VolumeCreateRequest{}
req.Size = 1
req.Durability.Type = api.DurabilityReplicate
req.Durability.Replicate.Replica = 3
vol := NewVolumeEntryFromRequest(req)
vco := NewVolumeCreateOperation(vol, app.db)
e := RunOperation(vco, app.executor)
tests.Assert(t, e == nil, "expected e == nil, got:", e)
defer cleanupVolume(vol)

app.db.View(func(tx *bolt.Tx) error {
vols, err := VolumeList(tx)
tests.Assert(t, err == nil, "expected err == nil, got:", err)
tests.Assert(t, len(vols) == 5,
"expected len(vols) == 5, got:", len(vols))
return nil
})

})

t.Run("BeyondLimit", func(t *testing.T) {
// Hit the limit
req := &api.VolumeCreateRequest{}
req.Size = 1
req.Durability.Type = api.DurabilityReplicate
req.Durability.Replicate.Replica = 3
vol := NewVolumeEntryFromRequest(req)
vco := NewVolumeCreateOperation(vol, app.db)
e := RunOperation(vco, app.executor)
tests.Assert(t, e == nil, "expected e == nil, got:", e)
defer cleanupVolume(vol)

// Next volume create should fail
errstring := "has 5 volumes and limit is 5"
newvol := NewVolumeEntryFromRequest(req)
vco = NewVolumeCreateOperation(newvol, app.db)
e = RunOperation(vco, app.executor)
tests.Assert(t, strings.Contains(e.Error(), errstring),
"expected strings.Contains(e.Error(),", errstring, " got:", e)

// Check that we don't leave any pending volume
// and the volume count is still the same as before
app.db.View(func(tx *bolt.Tx) error {
vols, err := VolumeList(tx)
tests.Assert(t, err == nil, "expected err == nil, got:", err)
tests.Assert(t, len(vols) == 5,
"expected len(vols) == 5, got:", len(vols))
vols, err = ListCompleteVolumes(tx)
tests.Assert(t, err == nil, "expected err == nil, got:", err)
tests.Assert(t, len(vols) == 5,
"expected len(vols) == 5, got:", len(vols))
return nil
})

})

t.Run("BeyondLimitWhenPendingVolsExist", func(t *testing.T) {
// Hit the limit but only as Pending
req := &api.VolumeCreateRequest{}
req.Size = 1
req.Durability.Type = api.DurabilityReplicate
req.Durability.Replicate.Replica = 3
vol := NewVolumeEntryFromRequest(req)
vco := NewVolumeCreateOperation(vol, app.db)
e := vco.Build()
tests.Assert(t, e == nil, "expected e == nil, got", e)

// Next volume create should fail
newvol := NewVolumeEntryFromRequest(req)
newvco := NewVolumeCreateOperation(newvol, app.db)
e = RunOperation(newvco, app.executor)
errstring := "has 5 volumes and limit is 5"
tests.Assert(t, strings.Contains(e.Error(), errstring),
"expected strings.Contains(e.Error(),", errstring, " got:", e)

// Check that we don't leave any pending volume
// and the volume count is still the same as before
app.db.View(func(tx *bolt.Tx) error {
vols, err := VolumeList(tx)
tests.Assert(t, err == nil, "expected err == nil, got:", err)
tests.Assert(t, len(vols) == 5,
"expected len(vols) == 5, got:", len(vols))
vols, err = ListCompleteVolumes(tx)
tests.Assert(t, err == nil, "expected err == nil, got:", err)
tests.Assert(t, len(vols) == 4,
"expected len(vols) == 4, got:", len(vols))
return nil
})

// Check the volume in pending can still proceed
e = vco.Exec(app.executor)
tests.Assert(t, e == nil, "expected e == nil, got", e)
e = vco.Finalize()
tests.Assert(t, e == nil, "expected e == nil, got", e)
app.db.View(func(tx *bolt.Tx) error {
vols, err := VolumeList(tx)
tests.Assert(t, err == nil, "expected err == nil, got:", err)
tests.Assert(t, len(vols) == 5,
"expected len(vols) == 5, got:", len(vols))
vols, err = ListCompleteVolumes(tx)
tests.Assert(t, err == nil, "expected err == nil, got:", err)
tests.Assert(t, len(vols) == 5,
"expected len(vols) == 5, got:", len(vols))
return nil
})
cleanupVolume(vol)

})

}
