Skip to content

Commit

Permalink
Auto failure domain
Browse files Browse the repository at this point in the history
Implement automatic failure domain management on scale-up. Start out
with failure domain OSD, upgrade to Host failure domain once we reach
3 nodes with OSDs.

Signed-off-by: Peter Sabaini <peter.sabaini@canonical.com>
  • Loading branch information
sabaini committed Jun 27, 2023
1 parent de9cf23 commit 14dfe45
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 0 deletions.
26 changes: 26 additions & 0 deletions microceph/ceph/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func Bootstrap(s common.StateInterface) error {
return err
}

// ensure crush rules
err = ensureCrushRules()
if err != nil {
return err
}

// Re-generate the configuration from the database.
err = updateConfig(s)
if err != nil {
Expand Down Expand Up @@ -269,3 +275,23 @@ func initMds(s common.StateInterface, dataPath string) error {
return nil

}

// ensureCrushRules removes the default replicated rule and adds a microceph
// default rule with failure domain OSD.
//
// Each step is skipped when it has already been done: the default rule is only
// removed if it is currently listed, and the microceph rule is only created if
// a rule with that exact name is not yet present.
//
// Returns a wrapped error if removing or adding a rule fails.
func ensureCrushRules() error {
	const (
		defaultRule = "replicated_rule"
		autoOsdRule = "microceph_auto_osd"
	)

	// Remove the default replicated rule if it exists.
	if haveCrushRule(defaultRule) {
		err := removeCrushRule(defaultRule)
		if err != nil {
			return fmt.Errorf("Failed to remove default replicated rule: %w", err)
		}
	}

	// Add a microceph default rule with failure domain OSD if it does not exist.
	// The name checked here must be the same name that is created below,
	// otherwise the guard never matches and the rule is re-added on every call.
	if haveCrushRule(autoOsdRule) {
		return nil
	}

	err := addCrushRule(autoOsdRule, "osd")
	if err != nil {
		return fmt.Errorf("Failed to add microceph default rule: %w", err)
	}

	return nil
}
96 changes: 96 additions & 0 deletions microceph/ceph/crush.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package ceph

import (
"encoding/json"
"fmt"
"github.com/tidwall/gjson"
"strings"
)

// removeCrushRule deletes the crush rule with the given name by running
// "ceph osd crush rule rm <name>". The command's error, if any, is returned
// unchanged; its output is discarded.
func removeCrushRule(name string) error {
	if _, rmErr := processExec.RunCommand("ceph", "osd", "crush", "rule", "rm", name); rmErr != nil {
		return rmErr
	}
	return nil
}

// addCrushRule creates a replicated crush rule called name under the "default"
// root, using failureDomain as the failure domain, by running
// "ceph osd crush rule create-replicated". The command's error, if any, is
// returned unchanged; its output is discarded.
func addCrushRule(name string, failureDomain string) error {
	if _, addErr := processExec.RunCommand("ceph", "osd", "crush", "rule", "create-replicated", name, "default", failureDomain); addErr != nil {
		return addErr
	}
	return nil
}

// listCrushRules runs "ceph osd crush rule ls" and returns its output split
// into one entry per line, after trimming surrounding whitespace from the
// output as a whole. On command failure it returns a nil slice and the error.
func listCrushRules() ([]string, error) {
	raw, lsErr := processExec.RunCommand("ceph", "osd", "crush", "rule", "ls")
	if lsErr != nil {
		return nil, lsErr
	}

	trimmed := strings.TrimSpace(raw)
	return strings.Split(trimmed, "\n"), nil
}

// haveCrushRule reports whether a crush rule named exactly name is listed.
// If the rules cannot be listed, it reports false.
func haveCrushRule(name string) bool {
	known, listErr := listCrushRules()
	if listErr != nil {
		return false
	}

	found := false
	for i := range known {
		if known[i] == name {
			found = true
			break
		}
	}
	return found
}

// getCrushRuleID returns the id of the crush rule with the given name.
//
// It runs "ceph osd crush rule dump <name>", parses the output as a JSON
// object and returns the value of its "rule_id" key formatted as a string.
//
// An error is returned if the command fails, if the output is not valid JSON,
// or if the parsed object has no "rule_id" key.
func getCrushRuleID(name string) (string, error) {
	output, err := processExec.RunCommand("ceph", "osd", "crush", "rule", "dump", name)
	if err != nil {
		return "", err
	}

	var jsond map[string]any
	err = json.Unmarshal([]byte(output), &jsond)
	if err != nil {
		// Report malformed output directly instead of letting it show up as a
		// misleading "rule_id not found" from the lookup below.
		return "", fmt.Errorf("Failed to parse crush rule dump: %w", err)
	}

	val, ok := jsond["rule_id"]
	if !ok {
		return "", fmt.Errorf("rule_id not found in crush rule dump")
	}

	return fmt.Sprintf("%v", val), nil // convert to string
}

// getPoolsForDomain returns the names of all pools whose crush rule is the
// microceph auto rule for the given failure domain ("microceph_auto_<domain>").
//
// The rule's id is looked up first, then the JSON output of
// "ceph osd pool ls detail" is filtered for pools whose crush_rule equals that
// id. The result is nil when no pool matches.
func getPoolsForDomain(domain string) ([]string, error) {
	ruleName := fmt.Sprintf("microceph_auto_%s", domain)
	ruleID, err := getCrushRuleID(ruleName)
	if err != nil {
		return nil, err
	}

	detail, err := processExec.RunCommand("ceph", "osd", "pool", "ls", "detail", "--format=json")
	if err != nil {
		return nil, err
	}

	// gjson query: for every array element with a matching crush_rule, yield its pool_name.
	query := fmt.Sprintf("#(crush_rule==%s)#.pool_name", ruleID)
	matches := gjson.Get(detail, query).Array()

	var names []string
	for i := range matches {
		names = append(names, matches[i].String())
	}
	return names, nil
}

// setPoolCrushRule assigns the crush rule named rule to the given pool by
// running "ceph osd pool set <pool> crush_rule <rule>". The command's error,
// if any, is returned unchanged; its output is discarded.
func setPoolCrushRule(pool string, rule string) error {
	if _, setErr := processExec.RunCommand("ceph", "osd", "pool", "set", pool, "crush_rule", rule); setErr != nil {
		return setErr
	}
	return nil
}
54 changes: 54 additions & 0 deletions microceph/ceph/osd.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,54 @@ func checkEncryptSupport() error {
return nil
}

// setHostFailureDomain moves pools from the OSD failure domain to the host
// failure domain.
//
// It creates the "microceph_auto_host" crush rule if it is not listed yet, then
// assigns that rule to every pool currently using the "microceph_auto_osd"
// rule. It stops at the first error and returns it.
func setHostFailureDomain() error {
	const hostRule = "microceph_auto_host"

	if !haveCrushRule(hostRule) {
		if addErr := addCrushRule(hostRule, "host"); addErr != nil {
			return addErr
		}
	}

	pools, err := getPoolsForDomain("osd")
	if err != nil {
		return err
	}

	for i := range pools {
		if setErr := setPoolCrushRule(pools[i], hostRule); setErr != nil {
			return setErr
		}
	}

	return nil
}

// updateFailureDomain checks whether the crush rules' failure domain should be
// switched to host, and performs the switch if so.
//
// It counts the records returned by database.MembersDiskCnt inside a database
// transaction; once that count reaches 3 it calls setHostFailureDomain.
// With fewer than 3 records nothing is changed.
func updateFailureDomain(s *state.State) error {
	nodesWithDisks := 0

	countNodes := func(ctx context.Context, tx *sql.Tx) error {
		rows, qErr := database.MembersDiskCnt(ctx, tx)
		if qErr != nil {
			return fmt.Errorf("Failed to fetch disks: %w", qErr)
		}
		nodesWithDisks = len(rows)
		return nil
	}

	if txErr := s.Database.Transaction(s.Context, countNodes); txErr != nil {
		return txErr
	}

	// Not enough nodes with OSDs yet: keep the current failure domain.
	if nodesWithDisks < 3 {
		return nil
	}

	if fdErr := setHostFailureDomain(); fdErr != nil {
		return fmt.Errorf("Failed to set host failure domain: %w", fdErr)
	}
	return nil
}

// AddOSD adds an OSD to the cluster, given a device path and a flag for wiping
func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error {
revert := revert.New()
Expand Down Expand Up @@ -377,6 +425,12 @@ func AddOSD(s *state.State, path string, wipe bool, encrypt bool) error {
return err
}

// Maybe update the failure domain
err = updateFailureDomain(s)
if err != nil {
return err
}

revert.Success() // Revert functions added are not run on return.
return nil
}
Expand Down
54 changes: 54 additions & 0 deletions microceph/database/disk_extras.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package database

import (
"context"
"database/sql"
"fmt"
"github.com/canonical/microcluster/cluster"
"github.com/lxc/lxd/lxd/db/query"
"github.com/lxc/lxd/shared/api"
)

// MemberDisk is one result row of the membersDiskCnt query: a cluster member
// together with the number of disk records that belong to it.
type MemberDisk struct {
// Member is the cluster member's name (the query's "member" column).
Member string `db:"member"`
// NumDisks is the count of disk rows for that member (the query's "num_disks" column).
NumDisks int `db:"num_disks"`
}

// Blank assignment that references the api package.
// NOTE(review): this looks like it exists only to keep the api import in use — confirm before removing.
var _ = api.ServerEnvironment{}

// membersDiskCnt is the registered statement used by MembersDiskCnt. It joins
// disks to internal_cluster_members and groups by member, yielding one row per
// member that has disk rows, with the member's name and its disk count.
var membersDiskCnt = cluster.RegisterStmt(`
SELECT internal_cluster_members.name AS member, count(disks.id) AS num_disks
FROM disks
JOIN internal_cluster_members ON disks.member_id = internal_cluster_members.id
GROUP BY internal_cluster_members.id
`)

// MembersDiskCnt returns the number of disks per member, for every member that
// has at least one disk.
//
// It runs the registered membersDiskCnt statement within tx and scans each row
// into a MemberDisk. The result is an empty (non-nil) slice when there are no
// rows. On failure it returns nil and a wrapped error.
func MembersDiskCnt(ctx context.Context, tx *sql.Tx) ([]MemberDisk, error) {
	stmt, err := cluster.Stmt(tx, membersDiskCnt)
	if err != nil {
		return nil, fmt.Errorf("Failed to get \"membersDiskCnt\" prepared statement: %w", err)
	}

	result := make([]MemberDisk, 0)

	// collect is called once per row and appends the scanned row to result.
	collect := func(scan func(dest ...any) error) error {
		var row MemberDisk
		if scanErr := scan(&row.Member, &row.NumDisks); scanErr != nil {
			return scanErr
		}
		result = append(result, row)
		return nil
	}

	if err = query.SelectObjects(ctx, stmt, collect); err != nil {
		return nil, fmt.Errorf("Failed to get \"membersDiskCnt\" objects: %w", err)
	}

	return result, nil
}

0 comments on commit 14dfe45

Please sign in to comment.