diff --git a/config/config.go b/config/config.go
index 5fc82c3e3456..6fb9b2e5127c 100644
--- a/config/config.go
+++ b/config/config.go
@@ -1207,15 +1207,26 @@ func getCPUTargeterConfig(v *viper.Viper) (tracker.TargeterConfig, error) {
 	}
 }
 
-func getDiskSpaceConfig(v *viper.Viper) (requiredAvailableDiskSpace uint64, warningThresholdAvailableDiskSpace uint64, err error) {
-	requiredAvailableDiskSpace = v.GetUint64(SystemTrackerRequiredAvailableDiskSpaceKey)
-	warningThresholdAvailableDiskSpace = v.GetUint64(SystemTrackerWarningThresholdAvailableDiskSpaceKey)
-	switch {
-	case warningThresholdAvailableDiskSpace < requiredAvailableDiskSpace:
-		return 0, 0, fmt.Errorf("%q (%d) < %q (%d)", SystemTrackerWarningThresholdAvailableDiskSpaceKey, warningThresholdAvailableDiskSpace, SystemTrackerRequiredAvailableDiskSpaceKey, requiredAvailableDiskSpace)
-	default:
-		return requiredAvailableDiskSpace, warningThresholdAvailableDiskSpace, nil
-	}
+// getResourceAvailableConfig returns:
+// - requiredThreshold, under which the node may shut down
+// - warningThreshold, under which the node may report unhealthy
+func getResourceAvailableConfig(
+	v *viper.Viper,
+	requiredKey string,
+	warningKey string,
+) (uint64, uint64, error) {
+	requiredThreshold := v.GetUint64(requiredKey)
+	warningThreshold := v.GetUint64(warningKey)
+	if warningThreshold < requiredThreshold {
+		return 0, 0, fmt.Errorf(
+			"%q (%d) < %q (%d)",
+			warningKey,
+			warningThreshold,
+			requiredKey,
+			requiredThreshold,
+		)
+	}
+	return requiredThreshold, warningThreshold, nil
 }
 
 func getDiskTargeterConfig(v *viper.Viper) (tracker.TargeterConfig, error) {
@@ -1479,7 +1490,20 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) {
 	nodeConfig.SystemTrackerCPUHalflife = v.GetDuration(SystemTrackerCPUHalflifeKey)
 	nodeConfig.SystemTrackerDiskHalflife = v.GetDuration(SystemTrackerDiskHalflifeKey)
 
-	nodeConfig.RequiredAvailableDiskSpace, nodeConfig.WarningThresholdAvailableDiskSpace, err = getDiskSpaceConfig(v)
+	nodeConfig.RequiredAvailableMemory, nodeConfig.WarningThresholdAvailableMemory, err = getResourceAvailableConfig(
+		v,
+		SystemTrackerRequiredAvailableMemoryKey,
+		SystemTrackerWarningThresholdAvailableMemoryKey,
+	)
+	if err != nil {
+		return node.Config{}, err
+	}
+
+	nodeConfig.RequiredAvailableDiskSpace, nodeConfig.WarningThresholdAvailableDiskSpace, err = getResourceAvailableConfig(
+		v,
+		SystemTrackerRequiredAvailableDiskSpaceKey,
+		SystemTrackerWarningThresholdAvailableDiskSpaceKey,
+	)
 	if err != nil {
 		return node.Config{}, err
 	}
diff --git a/config/flags.go b/config/flags.go
index 1d257eb7f2ae..674f1d0a6566 100644
--- a/config/flags.go
+++ b/config/flags.go
@@ -365,7 +365,9 @@ func addNodeFlags(fs *pflag.FlagSet) {
 	fs.Duration(SystemTrackerProcessingHalflifeKey, 15*time.Second, "Halflife to use for the processing requests tracker. Larger halflife --> usage metrics change more slowly")
 	fs.Duration(SystemTrackerCPUHalflifeKey, 15*time.Second, "Halflife to use for the cpu tracker. Larger halflife --> cpu usage metrics change more slowly")
 	fs.Duration(SystemTrackerDiskHalflifeKey, time.Minute, "Halflife to use for the disk tracker. Larger halflife --> disk usage metrics change more slowly")
-	fs.Uint64(SystemTrackerRequiredAvailableDiskSpaceKey, units.GiB/2, "Minimum number of available bytes on disk, under which the node will shutdown.")
+	fs.Uint64(SystemTrackerRequiredAvailableMemoryKey, units.GiB/2, "Minimum amount of available memory, under which the node may shut down.")
+	fs.Uint64(SystemTrackerWarningThresholdAvailableMemoryKey, units.GiB, fmt.Sprintf("Warning threshold for the amount of available memory, under which the node will be considered unhealthy. Must be >= [%s]", SystemTrackerRequiredAvailableMemoryKey))
+	fs.Uint64(SystemTrackerRequiredAvailableDiskSpaceKey, units.GiB/2, "Minimum number of available bytes on disk, under which the node may shut down.")
 	fs.Uint64(SystemTrackerWarningThresholdAvailableDiskSpaceKey, units.GiB, fmt.Sprintf("Warning threshold for the number of available bytes on disk, under which the node will be considered unhealthy. Must be >= [%s]", SystemTrackerRequiredAvailableDiskSpaceKey))
 
 	// CPU management
diff --git a/config/keys.go b/config/keys.go
index 788702245675..3aa1199c7215 100644
--- a/config/keys.go
+++ b/config/keys.go
@@ -200,6 +200,8 @@ const (
 	SystemTrackerProcessingHalflifeKey                 = "system-tracker-processing-halflife"
 	SystemTrackerCPUHalflifeKey                        = "system-tracker-cpu-halflife"
 	SystemTrackerDiskHalflifeKey                       = "system-tracker-disk-halflife"
+	SystemTrackerRequiredAvailableMemoryKey            = "system-tracker-memory-required-available"
+	SystemTrackerWarningThresholdAvailableMemoryKey    = "system-tracker-memory-warning-threshold-available"
 	SystemTrackerRequiredAvailableDiskSpaceKey         = "system-tracker-disk-required-available-space"
 	SystemTrackerWarningThresholdAvailableDiskSpaceKey = "system-tracker-disk-warning-threshold-available-space"
 	DiskVdrAllocKey                                    = "throttler-inbound-disk-validator-alloc"
diff --git a/node/config.go b/node/config.go
index 5ff04a76a4d8..a031891d7a3d 100644
--- a/node/config.go
+++ b/node/config.go
@@ -213,6 +213,9 @@ type Config struct {
 	DiskTargeterConfig tracker.TargeterConfig `json:"diskTargeterConfig"`
 
+	RequiredAvailableMemory         uint64 `json:"requiredAvailableMemory"`
+	WarningThresholdAvailableMemory uint64 `json:"warningThresholdAvailableMemory"`
+
 	RequiredAvailableDiskSpace         uint64 `json:"requiredAvailableDiskSpace"`
 	WarningThresholdAvailableDiskSpace uint64 `json:"warningThresholdAvailableDiskSpace"`
 
diff --git a/node/node.go b/node/node.go
index 948821ce4278..a910d55e1c07 100644
--- a/node/node.go
+++ b/node/node.go
@@ -1046,9 +1046,51 @@ func (n *Node) initHealthAPI() error {
 		return fmt.Errorf("couldn't register database health check: %w", err)
 	}
 
+	memoryCheck := health.CheckerFunc(func(context.Context) (interface{}, error) {
+		// Confirm that the node has enough memory to continue operating. If
+		// there is too little memory remaining, first report unhealthy and
+		// then shut down the node.
+
+		usedMemoryBytes := n.resourceManager.MemoryUsage()
+		availableMemoryBytes := n.resourceManager.AvailableMemoryBytes()
+
+		var err error
+		if availableMemoryBytes < n.Config.RequiredAvailableMemory {
+			// TODO: log a FATAL and shut down the node
+			n.Log.Error("critically low on memory",
+				zap.Uint64("usedMemory", usedMemoryBytes),
+				zap.Uint64("availableMemory", availableMemoryBytes),
+			)
+			err = fmt.Errorf(
+				"remaining available memory (%d) is below minimum required available memory (%d) when using (%d)",
+				availableMemoryBytes,
+				n.Config.RequiredAvailableMemory,
+				usedMemoryBytes,
+			)
+		} else if availableMemoryBytes < n.Config.WarningThresholdAvailableMemory {
+			err = fmt.Errorf(
+				"remaining available memory (%d) is below the warning threshold of available memory (%d) when using (%d)",
+				availableMemoryBytes,
+				n.Config.WarningThresholdAvailableMemory,
+				usedMemoryBytes,
+			)
+		}
+
+		return map[string]interface{}{
+			"usedMemoryBytes":      usedMemoryBytes,
+			"availableMemoryBytes": availableMemoryBytes,
+		}, err
+	})
+
+	err = n.health.RegisterHealthCheck("memory", memoryCheck, health.GlobalTag)
+	if err != nil {
+		return fmt.Errorf("couldn't register memory resource health check: %w", err)
+	}
+
 	diskSpaceCheck := health.CheckerFunc(func(context.Context) (interface{}, error) {
-		// confirm that the node has enough disk space to continue operating
-		// if there is too little disk space remaining, first report unhealthy and then shutdown the node
+		// Confirm that the node has enough disk space to continue operating.
+		// If there is too little disk space remaining, first report unhealthy
+		// and then shut down the node.
 
 		availableDiskBytes := n.resourceTracker.DiskTracker().AvailableDiskBytes()
 
@@ -1058,9 +1100,17 @@
 			zap.Uint64("remainingDiskBytes", availableDiskBytes),
 		)
 		go n.Shutdown(1)
-		err = fmt.Errorf("remaining available disk space (%d) is below minimum required available space (%d)", availableDiskBytes, n.Config.RequiredAvailableDiskSpace)
+		err = fmt.Errorf(
+			"remaining available disk space (%d) is below minimum required available space (%d)",
+			availableDiskBytes,
+			n.Config.RequiredAvailableDiskSpace,
+		)
 	} else if availableDiskBytes < n.Config.WarningThresholdAvailableDiskSpace {
-		err = fmt.Errorf("remaining available disk space (%d) is below the warning threshold of disk space (%d)", availableDiskBytes, n.Config.WarningThresholdAvailableDiskSpace)
+		err = fmt.Errorf(
+			"remaining available disk space (%d) is below the warning threshold of disk space (%d)",
+			availableDiskBytes,
+			n.Config.WarningThresholdAvailableDiskSpace,
+		)
 	}
 
 	return map[string]interface{}{
@@ -1070,7 +1120,7 @@
 
 	err = n.health.RegisterHealthCheck("diskspace", diskSpaceCheck, health.GlobalTag)
 	if err != nil {
-		return fmt.Errorf("couldn't register resource health check: %w", err)
+		return fmt.Errorf("couldn't register disk resource health check: %w", err)
 	}
 
 	handler, err := health.NewGetAndPostHandler(n.Log, healthChecker)
@@ -1197,6 +1247,7 @@ func (n *Node) initVdrs() validators.Set {
 
 // Initialize [n.resourceManager].
 func (n *Node) initResourceManager(reg prometheus.Registerer) error {
 	n.resourceManager = resource.NewManager(
+		n.Log,
 		n.Config.DatabaseConfig.Path,
 		n.Config.SystemTrackerFrequency,
 		n.Config.SystemTrackerCPUHalflife,
diff --git a/utils/resource/mock_user.go b/utils/resource/mock_user.go
index 9b344ca1c830..8d4ac6241a39 100644
--- a/utils/resource/mock_user.go
+++ b/utils/resource/mock_user.go
@@ -50,6 +50,20 @@ func (mr *MockUserMockRecorder) AvailableDiskBytes() *gomock.Call {
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AvailableDiskBytes", reflect.TypeOf((*MockUser)(nil).AvailableDiskBytes))
 }
 
+// AvailableMemoryBytes mocks base method.
+func (m *MockUser) AvailableMemoryBytes() uint64 {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "AvailableMemoryBytes")
+	ret0, _ := ret[0].(uint64)
+	return ret0
+}
+
+// AvailableMemoryBytes indicates an expected call of AvailableMemoryBytes.
+func (mr *MockUserMockRecorder) AvailableMemoryBytes() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AvailableMemoryBytes", reflect.TypeOf((*MockUser)(nil).AvailableMemoryBytes))
+}
+
 // CPUUsage mocks base method.
 func (m *MockUser) CPUUsage() float64 {
 	m.ctrl.T.Helper()
@@ -78,3 +92,17 @@ func (mr *MockUserMockRecorder) DiskUsage() *gomock.Call {
 	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiskUsage", reflect.TypeOf((*MockUser)(nil).DiskUsage))
 }
+
+// MemoryUsage mocks base method.
+func (m *MockUser) MemoryUsage() uint64 {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "MemoryUsage")
+	ret0, _ := ret[0].(uint64)
+	return ret0
+}
+
+// MemoryUsage indicates an expected call of MemoryUsage.
+func (mr *MockUserMockRecorder) MemoryUsage() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MemoryUsage", reflect.TypeOf((*MockUser)(nil).MemoryUsage))
+}
diff --git a/utils/resource/no_usage.go b/utils/resource/no_usage.go
index 8a10d11ced2a..a88b1b6f357e 100644
--- a/utils/resource/no_usage.go
+++ b/utils/resource/no_usage.go
@@ -14,6 +14,14 @@ func (noUsage) CPUUsage() float64 {
 	return 0
 }
 
+func (noUsage) MemoryUsage() uint64 {
+	return 0
+}
+
+func (noUsage) AvailableMemoryBytes() uint64 {
+	return math.MaxUint64
+}
+
 func (noUsage) DiskUsage() (float64, float64) {
 	return 0, 0
 }
diff --git a/utils/resource/usage.go b/utils/resource/usage.go
index 2c83aa0db034..5c4fc8ca9260 100644
--- a/utils/resource/usage.go
+++ b/utils/resource/usage.go
@@ -8,8 +8,13 @@ import (
 	"sync"
 	"time"
 
+	"go.uber.org/zap"
+
+	"github.com/shirou/gopsutil/cpu"
+	"github.com/shirou/gopsutil/mem"
 	"github.com/shirou/gopsutil/process"
 
+	"github.com/ava-labs/avalanchego/utils/logging"
 	"github.com/ava-labs/avalanchego/utils/storage"
 )
 
@@ -29,17 +34,27 @@ type CPUUser interface {
 	CPUUsage() float64
 }
 
+type MemoryUser interface {
+	// MemoryUsage returns the number of bytes this user has allocated.
+	MemoryUsage() uint64
+
+	// AvailableMemoryBytes returns the number of bytes available for the OS
+	// to allocate.
+	AvailableMemoryBytes() uint64
+}
+
 type DiskUser interface {
 	// DiskUsage returns the number of bytes per second read from/written to
 	// disk recently.
 	DiskUsage() (read float64, write float64)
 
-	// returns number of bytes available in the db volume
+	// AvailableDiskBytes returns the number of bytes available in the db volume.
 	AvailableDiskBytes() uint64
 }
 
 type User interface {
 	CPUUser
+	MemoryUser
 	DiskUser
 }
 
@@ -62,24 +77,35 @@ type Manager interface {
 }
 
 type manager struct {
+	log           logging.Logger
 	processesLock sync.Mutex
 	processes     map[int]*proc
 
-	usageLock sync.RWMutex
-	cpuUsage  float64
+	usageLock   sync.RWMutex
+	cpuUsage    float64
+	memoryUsage uint64
 
 	// [readUsage] is the number of bytes/second read from disk recently.
 	readUsage float64
 	// [writeUsage] is the number of bytes/second written to disk recently.
 	writeUsage float64
 
-	availableDiskBytes uint64
+	availableMemoryBytes uint64
+	availableDiskBytes   uint64
 
 	closeOnce sync.Once
 	onClose   chan struct{}
}
 
-func NewManager(diskPath string, frequency, cpuHalflife, diskHalflife time.Duration) Manager {
+func NewManager(
+	log logging.Logger,
+	diskPath string,
+	frequency time.Duration,
+	cpuHalflife time.Duration,
+	diskHalflife time.Duration,
+) Manager {
 	m := &manager{
+		log:       log,
 		processes: make(map[int]*proc),
 		onClose:   make(chan struct{}),
+		availableMemoryBytes: math.MaxUint64,
 		availableDiskBytes: math.MaxUint64,
@@ -95,6 +121,20 @@ func (m *manager) CPUUsage() float64 {
 	return m.cpuUsage
 }
 
+func (m *manager) MemoryUsage() uint64 {
+	m.usageLock.RLock()
+	defer m.usageLock.RUnlock()
+
+	return m.memoryUsage
+}
+
+func (m *manager) AvailableMemoryBytes() uint64 {
+	m.usageLock.RLock()
+	defer m.usageLock.RUnlock()
+
+	return m.availableMemoryBytes
+}
+
 func (m *manager) DiskUsage() (float64, float64) {
 	m.usageLock.RLock()
 	defer m.usageLock.RUnlock()
@@ -115,7 +155,10 @@ func (m *manager) TrackProcess(pid int) {
 		return
 	}
 
-	process := &proc{p: p}
+	process := &proc{
+		log: m.log,
+		p:   p,
+	}
 
 	m.processesLock.Lock()
 	m.processes[pid] = process
@@ -143,22 +186,48 @@ func (m *manager) update(diskPath string, frequency, cpuHalflife, diskHalflife t
 	frequencyInSeconds := frequency.Seconds()
 	for {
-		currentCPUUsage, currentReadUsage, currentWriteUsage := m.getActiveUsage(frequencyInSeconds)
+		currentCPUUsage, currentMemoryUsage, currentReadUsage, currentWriteUsage := m.getActiveUsage(frequencyInSeconds)
 		currentScaledCPUUsage := newCPUWeight * currentCPUUsage
 		currentScaledReadUsage := newDiskWeight * currentReadUsage
 		currentScaledWriteUsage := newDiskWeight * currentWriteUsage
 
+		machineMemory, getMemoryErr := mem.VirtualMemory()
+		if getMemoryErr != nil {
+			m.log.Debug("failed to lookup resource",
+				zap.String("resource", "system memory"),
+				zap.Error(getMemoryErr),
+			)
+		}
+		machineSwap, getSwapErr := mem.SwapMemory()
+		if getSwapErr != nil {
+			m.log.Debug("failed to lookup resource",
+				zap.String("resource", "system swap"),
+				zap.Error(getSwapErr),
+			)
+			machineSwap = &mem.SwapMemoryStat{}
+		}
 		availableBytes, getBytesErr := storage.AvailableBytes(diskPath)
+		if getBytesErr != nil {
+			m.log.Debug("failed to lookup resource",
+				zap.String("resource", "system disk"),
+				zap.Error(getBytesErr),
+			)
+		}
 
 		m.usageLock.Lock()
 		m.cpuUsage = oldCPUWeight*m.cpuUsage + currentScaledCPUUsage
+		m.memoryUsage = currentMemoryUsage
 		m.readUsage = oldDiskWeight*m.readUsage + currentScaledReadUsage
 		m.writeUsage = oldDiskWeight*m.writeUsage + currentScaledWriteUsage
 
+		if getMemoryErr == nil {
+			// Note: if [getSwapErr] is non-nil, we report the available swap as
+			// 0.
+			m.availableMemoryBytes = machineMemory.Available + machineSwap.Free
+		}
 		if getBytesErr == nil {
 			m.availableDiskBytes = availableBytes
 		}
-
 		m.usageLock.Unlock()
 
 		select {
@@ -171,29 +240,33 @@ func (m *manager) update(diskPath string, frequency, cpuHalflife, diskHalflife t
 
 // Returns:
 // 1. Current CPU usage by all processes.
-// 2. Current bytes/sec read from disk by all processes.
-// 3. Current bytes/sec written to disk by all processes.
-func (m *manager) getActiveUsage(secondsSinceLastUpdate float64) (float64, float64, float64) {
+// 2. Current memory usage by all processes.
+// 3. Current bytes/sec read from disk by all processes.
+// 4. Current bytes/sec written to disk by all processes.
+func (m *manager) getActiveUsage(secondsSinceLastUpdate float64) (float64, uint64, float64, float64) {
 	m.processesLock.Lock()
 	defer m.processesLock.Unlock()
 
 	var (
-		totalCPU   float64
-		totalRead  float64
-		totalWrite float64
+		totalCPU    float64
+		totalMemory uint64
+		totalRead   float64
+		totalWrite  float64
 	)
 	for _, p := range m.processes {
-		cpu, read, write := p.getActiveUsage(secondsSinceLastUpdate)
+		cpu, memory, read, write := p.getActiveUsage(secondsSinceLastUpdate)
 		totalCPU += cpu
+		totalMemory += memory
 		totalRead += read
 		totalWrite += write
 	}
 
-	return totalCPU, totalRead, totalWrite
+	return totalCPU, totalMemory, totalRead, totalWrite
 }
 
 type proc struct {
-	p *process.Process
+	log logging.Logger
+	p   *process.Process
 
 	initialized bool
 
@@ -207,17 +280,37 @@ type proc struct {
 	lastWriteBytes uint64
 }
 
-func (p *proc) getActiveUsage(secondsSinceLastUpdate float64) (float64, float64, float64) {
-	// If there is an error tracking the CPU/disk utilization of a process,
-	// assume that the utilization is 0.
+func (p *proc) getActiveUsage(secondsSinceLastUpdate float64) (float64, uint64, float64, float64) {
+	// If there is an error tracking the CPU/memory/disk utilization of a
+	// process, assume that the utilization is 0.
 	times, err := p.p.Times()
 	if err != nil {
-		return 0, 0, 0
+		p.log.Debug("failed to lookup resource",
+			zap.String("resource", "process CPU"),
+			zap.Int32("pid", p.p.Pid),
+			zap.Error(err),
+		)
+		times = &cpu.TimesStat{}
 	}
 
 	io, err := p.p.IOCounters()
 	if err != nil {
-		return 0, 0, 0
+		p.log.Debug("failed to lookup resource",
+			zap.String("resource", "process IO"),
+			zap.Int32("pid", p.p.Pid),
+			zap.Error(err),
+		)
+		io = &process.IOCountersStat{}
+	}
+
+	memInfo, err := p.p.MemoryInfo()
+	if err != nil {
+		p.log.Debug("failed to lookup resource",
+			zap.String("resource", "process memory"),
+			zap.Int32("pid", p.p.Pid),
+			zap.Error(err),
+		)
+		memInfo = &process.MemoryInfoStat{}
 	}
 
 	var (
@@ -246,7 +339,7 @@ func (p *proc) getActiveUsage(secondsSinceLastUpdate float64) (float64, float64,
 	p.lastReadBytes = io.ReadBytes
 	p.lastWriteBytes = io.WriteBytes
 
-	return cpu, read, write
+	return cpu, memInfo.RSS, read, write
 }
 
 // getSampleWeights converts the frequency of CPU sampling and the halflife of
diff --git a/vms/registry/vm_getter_test.go b/vms/registry/vm_getter_test.go
index ce659cf4716b..8dfb2b16e535 100644
--- a/vms/registry/vm_getter_test.go
+++ b/vms/registry/vm_getter_test.go
@@ -15,6 +15,7 @@ import (
 
 	"github.com/ava-labs/avalanchego/ids"
 	"github.com/ava-labs/avalanchego/utils/filesystem"
+	"github.com/ava-labs/avalanchego/utils/logging"
 	"github.com/ava-labs/avalanchego/utils/resource"
 	"github.com/ava-labs/avalanchego/vms"
 )
@@ -149,7 +150,13 @@ func initVMGetterTest(t *testing.T) *vmGetterTestResources {
 			FileReader:      mockReader,
 			Manager:         mockManager,
 			PluginDirectory: pluginDir,
-			CPUTracker:      resource.NewManager("", time.Hour, time.Hour, time.Hour),
+			CPUTracker: resource.NewManager(
+				logging.NoLog{},
+				"",
+				time.Hour,
+				time.Hour,
+				time.Hour,
+			),
 		},
 	)
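
Reviewer note: below is a minimal, self-contained sketch (not part of this diff) of how the pieces added here fit together: the new `resource.NewManager` signature, process tracking, and the memory getters that the new "memory" health check consumes. It assumes the `Manager` interface also exposes `TrackProcess` and `Shutdown`, both of which exist in the surrounding code but are only partially visible in this diff.

```go
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/ava-labs/avalanchego/utils/logging"
	"github.com/ava-labs/avalanchego/utils/resource"
)

func main() {
	// NewManager now takes a logger as its first argument; lookup failures
	// are logged at Debug instead of silently zeroing the stats.
	manager := resource.NewManager(
		logging.NoLog{}, // discard debug logs for this sketch
		os.TempDir(),    // disk path to watch for available bytes
		time.Second,     // sampling frequency
		15*time.Second,  // CPU halflife
		time.Minute,     // disk halflife
	)
	defer manager.Shutdown() // assumed from the existing Manager interface

	// Track this process; MemoryUsage() is the sum of tracked processes' RSS.
	manager.TrackProcess(os.Getpid())
	time.Sleep(3 * time.Second) // let a few samples accumulate

	fmt.Println("used memory (bytes):", manager.MemoryUsage())
	// Available RAM plus free swap, as computed in manager.update.
	fmt.Println("available memory (bytes):", manager.AvailableMemoryBytes())
}
```

With the defaults added in flags.go, the health check reports unhealthy once `AvailableMemoryBytes()` drops below `units.GiB` and logs a critically-low error below `units.GiB/2`; both thresholds are tunable via the new `system-tracker-memory-*` keys.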