Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
(7.0) Refactor etcd disk check to be more tolerant (#1847)
Browse files Browse the repository at this point in the history
  • Loading branch information
r0mant committed Jul 15, 2020
1 parent c2bff24 commit 5703fa3
Show file tree
Hide file tree
Showing 16 changed files with 300 additions and 116 deletions.
80 changes: 33 additions & 47 deletions lib/checks/checks.go
Expand Up @@ -262,6 +262,8 @@ type Checker interface {
// CheckNodes executes multi-node checks (such as network reachability,
// bandwidth, etc) on the provided set of servers.
CheckNodes(ctx context.Context, servers []Server) []*agentpb.Probe
// Check executes all checks on configured servers and returns failed probes.
Check(ctx context.Context) []*agentpb.Probe
}

type checker struct {
Expand Down Expand Up @@ -322,14 +324,22 @@ type Server struct {

// Run runs a full set of checks on the servers specified in r.servers
// and returns an error if any of the probes failed.
func (r *checker) Run(ctx context.Context) error {
	if failed := r.Check(ctx); len(failed) != 0 {
		return trace.BadParameter("The following checks failed:\n%v",
			FormatFailedChecks(failed))
	}
	return nil
}

// Check executes checks on r.servers and returns a list of failed probes.
func (r *checker) Check(ctx context.Context) (failed []*agentpb.Probe) {
if ifTestsDisabled() {
log.Infof("Skipping checks due to %q set.",
constants.PreflightChecksOffEnvVar)
return nil
}

var failed []*agentpb.Probe

// check each server against its profile
for _, server := range r.Servers {
failed = append(failed, r.CheckNode(ctx, server)...)
Expand All @@ -338,12 +348,7 @@ func (r *checker) Run(ctx context.Context) error {
// run checks that take all servers into account
failed = append(failed, r.CheckNodes(ctx, r.Servers)...)

if len(failed) != 0 {
return trace.BadParameter("The following checks failed:\n%v",
FormatFailedChecks(failed))
}

return nil
return failed
}

// CheckNode executes checks for the provided individual server.
Expand All @@ -365,50 +370,39 @@ func (r *checker) CheckNode(ctx context.Context, server Server) (failed []*agent
})
if err != nil {
log.WithError(err).Warn("Failed to validate remote node.")
failed = append(failed, &agentpb.Probe{
Detail: err.Error(),
Error: fmt.Sprintf("failed to validate node %v", server),
})
failed = append(failed, newFailedProbe(
fmt.Sprintf("Failed to validate node %v", server), err.Error()))
}

err = checkServerProfile(server, requirements)
if err != nil {
log.WithError(err).Warn("Failed to validate profile requirements.")
failed = append(failed, &agentpb.Probe{
Detail: err.Error(),
Error: "failed to validate profile requirements",
})
failed = append(failed, newFailedProbe(
"Failed to validate profile requirements", err.Error()))
}

err = r.checkTempDir(ctx, server)
if err != nil {
log.WithError(err).Warn("Failed to validate temporary directory.")
failed = append(failed, &agentpb.Probe{
Detail: err.Error(),
Error: "failed to validate temporary directory",
})
failed = append(failed, newFailedProbe(
"Failed to validate temporary directory", err.Error()))
}

if server.IsMaster() && r.TestEtcdDisk {
err = r.checkEtcdDisk(ctx, server)
probes, err := r.checkEtcdDisk(ctx, server)
if err != nil {
log.WithError(err).Warn("Failed to validate etcd disk requirements.")
if isFioTestError(err) {
failed = append(failed, &agentpb.Probe{
Detail: err.Error(),
Error: "failed to validate etcd disk requirements",
})
}
}
// The checker will only return probes if etcd disk test succeeded and
// some iops/latency requirements are not met.
failed = append(failed, probes...)
}

err = r.checkDisks(ctx, server)
if err != nil {
log.WithError(err).Warn("Failed to validate disk requirements.")
failed = append(failed, &agentpb.Probe{
Detail: err.Error(),
Error: "failed to validate disk requirements",
})
failed = append(failed, newFailedProbe(
"Failed to validate disk requirements", err.Error()))
}

return failed
Expand All @@ -425,40 +419,32 @@ func (r *checker) CheckNodes(ctx context.Context, servers []Server) (failed []*a
err := checkSameOS(servers)
if err != nil {
log.WithError(err).Warn("Failed to validate same OS requirements.")
failed = append(failed, &agentpb.Probe{
Detail: err.Error(),
Error: "failed to validate same OS requirement",
})
failed = append(failed, newFailedProbe(
"Failed to validate same OS requirement", err.Error()))
}

err = checkTime(time.Now().UTC(), servers)
if err != nil {
log.WithError(err).Warn("Failed to validate time drift requirements.")
failed = append(failed, &agentpb.Probe{
Detail: err.Error(),
Error: "failed to validate time drift requirement",
})
failed = append(failed, newFailedProbe(
"Failed to validate time drift requirement", err.Error()))
}

if r.TestPorts {
err = r.checkPorts(ctx, servers)
if err != nil {
log.WithError(err).Warn("Failed to validate port requirements.")
failed = append(failed, &agentpb.Probe{
Detail: err.Error(),
Error: "failed to validate port requirements",
})
failed = append(failed, newFailedProbe(
"Failed to validate port requirements", err.Error()))
}
}

if r.TestBandwidth {
err = r.checkBandwidth(ctx, servers)
if err != nil {
log.WithError(err).Warn("Failed to validate bandwidth requirements.")
failed = append(failed, &agentpb.Probe{
Detail: err.Error(),
Error: "failed to validate network bandwidth requirements",
})
failed = append(failed, newFailedProbe(
"Failed to validate network bandwidth requirements", err.Error()))
}
}

Expand Down
142 changes: 105 additions & 37 deletions lib/checks/disks.go
Expand Up @@ -20,36 +20,38 @@ import (
"context"
"fmt"
"path/filepath"
"strings"

"github.com/gravitational/gravity/lib/defaults"
"github.com/gravitational/gravity/lib/network/validation/proto"
"github.com/gravitational/gravity/lib/state"
"github.com/gravitational/gravity/lib/utils"

"github.com/gravitational/satellite/agent/proto/agentpb"
"github.com/gravitational/trace"
)

// checkEtcdDisk makes sure that the disk used for etcd wal satisfies
// performance requirements.
func (r *checker) checkEtcdDisk(ctx context.Context, server Server) error {
func (r *checker) checkEtcdDisk(ctx context.Context, server Server) ([]*agentpb.Probe, error) {
// Test file should reside where etcd data will be.
testPath := state.InEtcdDir(server.ServerInfo.StateDir, testFile)
res, err := r.Remote.CheckDisks(ctx, server.AdvertiseIP, fioEtcdJob(testPath))
if err != nil {
return trace.Wrap(err)
return nil, trace.Wrap(err)
}
log.Debugf("Server %v disk test results: %s.", server.Hostname, res.String())
if len(res.Jobs) != 1 {
return trace.BadParameter("expected 1 job result: %v", res)
return nil, trace.BadParameter("expected 1 job result: %v", res)
}
iops := res.Jobs[0].GetWriteIOPS()
latency := res.Jobs[0].GetFsyncLatency()
if iops < EtcdMinWriteIOPS || latency > EtcdMaxFsyncLatencyMs {
return formatEtcdErrors(server, testPath, iops, latency)
probes := formatEtcdProbes(server, testPath, iops, latency)
if len(probes) > 0 {
return probes, nil
}
log.Infof("Server %v passed etcd disk check, has %v sequential write iops and %vms fsync latency.",
server.Hostname, iops, latency)
return nil
return nil, nil
}

// fioEtcdJob constructs a request to check etcd disk performance.
Expand Down Expand Up @@ -80,55 +82,121 @@ func fioEtcdJob(filename string) *proto.CheckDisksRequest {
}
}

// formatEtcdErrors returns appropritate formatted error messages based
// on the etcd disk performance test results.
func formatEtcdErrors(server Server, testPath string, iops float64, latency int64) error {
err := &fioTestError{}
if iops < EtcdMinWriteIOPS {
err.messages = append(err.messages, fmt.Sprintf("server %v has low sequential write IOPS of %v on %v (required minimum is %v)",
server.Hostname, iops, filepath.Dir(testPath), EtcdMinWriteIOPS))
// formatEtcdProbes returns appropritate probes based on the etcd disk
// performance test results.
func formatEtcdProbes(server Server, testPath string, iops float64, latency int64) (probes []*agentpb.Probe) {
if iops < getEtcdMinIOPSHard() {
probes = append(probes, newFailedProbe("",
fmt.Sprintf("Node %v sequential write IOPS on %v is lower than %v (%v)",
server.Hostname, filepath.Dir(testPath), getEtcdMinIOPSHard(), int(iops))))
} else if iops < getEtcdMinIOPSSoft() {
probes = append(probes, newWarningProbe("",
fmt.Sprintf("Node %v sequential write IOPS on %v is lower than %v (%v) which may result in poor etcd performance",
server.Hostname, filepath.Dir(testPath), getEtcdMinIOPSSoft(), int(iops))))
}
if latency > EtcdMaxFsyncLatencyMs {
err.messages = append(err.messages, fmt.Sprintf("server %v has high fsync latency of %vms on %v (required maximum is %vms)",
server.Hostname, latency, filepath.Dir(testPath), EtcdMaxFsyncLatencyMs))
if latency > getEtcdMaxLatencyHard() {
probes = append(probes, newFailedProbe("",
fmt.Sprintf("Node %v fsync latency on %v is higher than %vms (%vms)",
server.Hostname, filepath.Dir(testPath), getEtcdMaxLatencyHard(), latency)))
} else if latency > getEtcdMaxLatencySoft() {
probes = append(probes, newWarningProbe("",
fmt.Sprintf("Node %v fsync latency on %v is higher than %vms (%vms) which may result in poor etcd performance",
server.Hostname, filepath.Dir(testPath), getEtcdMaxLatencySoft(), latency)))
}
return err
return probes
}

// fioTestError is returned when fio disk test fails to validate requirements.
type fioTestError struct {
messages []string
}

// Error returns all errors encountered during fio disk test.
func (e *fioTestError) Error() string {
return strings.Join(e.messages, ", ")
// newFailedProbe returns a failed probe with critical severity carrying
// the given error message and detail text.
func newFailedProbe(message, detail string) *agentpb.Probe {
	probe := agentpb.Probe{
		Status:   agentpb.Probe_Failed,
		Severity: agentpb.Probe_Critical,
		Error:    message,
		Detail:   detail,
	}
	return &probe
}

// isFioTestError returns true if the provided error is the fio disk test error.
func isFioTestError(err error) bool {
_, ok := trace.Unwrap(err).(*fioTestError)
return ok
// newWarningProbe returns a failed probe with warning severity carrying
// the given error message and detail text.
func newWarningProbe(message, detail string) *agentpb.Probe {
	probe := agentpb.Probe{
		Status:   agentpb.Probe_Failed,
		Severity: agentpb.Probe_Warning,
		Error:    message,
		Detail:   detail,
	}
	return &probe
}

const (
	// EtcdMinWriteIOPSSoft defines the soft threshold for a minimum number of
	// sequential write iops required for etcd to perform effectively.
	//
	// The number is recommended by etcd documentation:
	// https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/hardware.md#disks
	//
	// The soft threshold will generate a warning.
	EtcdMinWriteIOPSSoft = 50
	// EtcdMinWriteIOPSHard is the lowest number of IOPS Gravity will tolerate
	// before generating a critical probe failure.
	EtcdMinWriteIOPSHard = 10

	// EtcdMaxFsyncLatencyMsSoft defines the soft threshold for a maximum fsync
	// latency required for etcd to perform effectively, in milliseconds.
	//
	// Etcd documentation recommends 10ms for optimal performance but we're
	// being conservative here to ensure better dev/test experience:
	// https://github.com/etcd-io/etcd/blob/master/Documentation/faq.md#what-does-the-etcd-warning-failed-to-send-out-heartbeat-on-time-mean
	//
	// The soft threshold will generate a warning.
	EtcdMaxFsyncLatencyMsSoft = 50
	// EtcdMaxFsyncLatencyMsHard is the highest fsync latency Gravity prechecks
	// will tolerate before generating a critical probe failure.
	EtcdMaxFsyncLatencyMsHard = 150

	// testFile is the name of the disk performance test file.
	testFile = "fio.test"
)

// getEtcdMinIOPSSoft returns the soft limit for minimum number of IOPS.
//
// The built-in default can be overridden via EtcdMinIOPSSoftEnvVar.
func getEtcdMinIOPSSoft() float64 {
	if value, err := utils.GetenvInt(EtcdMinIOPSSoftEnvVar); err == nil {
		return float64(value)
	}
	return EtcdMinWriteIOPSSoft
}

// getEtcdMinIOPSHard returns the hard limit for minimum number of IOPS.
//
// The built-in default can be overridden via EtcdMinIOPSHardEnvVar.
func getEtcdMinIOPSHard() float64 {
	if value, err := utils.GetenvInt(EtcdMinIOPSHardEnvVar); err == nil {
		return float64(value)
	}
	return EtcdMinWriteIOPSHard
}

// getEtcdMaxLatencySoft returns the soft limit for maximum fsync latency.
//
// The built-in default can be overridden via EtcdMaxLatencySoftEnvVar.
func getEtcdMaxLatencySoft() int64 {
	if value, err := utils.GetenvInt(EtcdMaxLatencySoftEnvVar); err == nil {
		return int64(value)
	}
	return EtcdMaxFsyncLatencyMsSoft
}

// getEtcdMaxLatencyHard returns the hard limit for maximum fsync latency.
//
// The built-in default can be overridden via EtcdMaxLatencyHardEnvVar.
func getEtcdMaxLatencyHard() int64 {
	if value, err := utils.GetenvInt(EtcdMaxLatencyHardEnvVar); err == nil {
		return int64(value)
	}
	return EtcdMaxFsyncLatencyMsHard
}

const (
	// EtcdMinIOPSSoftEnvVar is the environment variable that overrides the
	// soft (warning) limit for minimum etcd disk write IOPS.
	EtcdMinIOPSSoftEnvVar = "GRAVITY_ETCD_MIN_IOPS_SOFT"
	// EtcdMinIOPSHardEnvVar is the environment variable that overrides the
	// hard (critical) limit for minimum etcd disk write IOPS.
	EtcdMinIOPSHardEnvVar = "GRAVITY_ETCD_MIN_IOPS_HARD"
	// EtcdMaxLatencySoftEnvVar is the environment variable that overrides the
	// soft (warning) limit for maximum etcd fsync latency, in milliseconds.
	EtcdMaxLatencySoftEnvVar = "GRAVITY_ETCD_MAX_LATENCY_SOFT"
	// EtcdMaxLatencyHardEnvVar is the environment variable that overrides the
	// hard (critical) limit for maximum etcd fsync latency, in milliseconds.
	EtcdMaxLatencyHardEnvVar = "GRAVITY_ETCD_MAX_LATENCY_HARD"
)
3 changes: 3 additions & 0 deletions lib/constants/constants.go
Expand Up @@ -208,6 +208,9 @@ const (
// If not empty, turns the preflight checks off
PreflightChecksOffEnvVar = "GRAVITY_CHECKS_OFF"

// GravityEnvVarPrefix is the prefix for gravity-specific environment variables.
GravityEnvVarPrefix = "GRAVITY_"

// Localhost is local host
Localhost = "127.0.0.1"

Expand Down
3 changes: 2 additions & 1 deletion lib/install/client/install.go
Expand Up @@ -79,11 +79,12 @@ func (r *InstallerStrategy) installSelfAsService() error {
Timeout: int(time.Duration(defaults.ServiceConnectTimeout).Seconds()),
WantedBy: "multi-user.target",
WorkingDirectory: r.ApplicationDir,
// Propagate all gravity-related environment variables to the service.
Environment: utils.GetenvsByPrefix(constants.GravityEnvVarPrefix),
},
NoBlock: true,
Name: r.ServicePath,
}
req.ServiceSpec.Environment = utils.Getenv(constants.PreflightChecksOffEnvVar)
r.WithField("req", fmt.Sprintf("%+v", req)).Info("Install service.")
return trace.Wrap(service.Reinstall(req))
}
Expand Down

0 comments on commit 5703fa3

Please sign in to comment.