Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion supernode/host_reporter/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package host_reporter
import (
"context"
"fmt"
"math"
"net"
"os"
"path/filepath"
Expand All @@ -11,6 +12,7 @@ import (
"time"

audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types"
sntypes "github.com/LumeraProtocol/lumera/x/supernode/v1/types"
"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
"github.com/LumeraProtocol/supernode/v2/pkg/lumera"
"github.com/LumeraProtocol/supernode/v2/pkg/reachability"
Expand All @@ -26,6 +28,8 @@ const (
defaultTickTimeout = 30 * time.Second

maxConcurrentTargets = 8

postponeReasonAuditHostRequirements = "audit_host_requirements"
)

// Service submits one MsgSubmitEpochReport per epoch for the local supernode.
Expand Down Expand Up @@ -150,7 +154,16 @@ func (s *Service) tick(ctx context.Context) {
MemUsagePercent: 0,
}
if diskUsagePercent, ok := s.diskUsagePercent(tickCtx); ok {
hostReport.DiskUsagePercent = diskUsagePercent
reportedDiskUsagePercent, compatReason := s.auditDiskUsagePercent(tickCtx, diskUsagePercent)
hostReport.DiskUsagePercent = reportedDiskUsagePercent
if compatReason != "" {
logtrace.Warn(tickCtx, "audit disk usage compatibility override applied", logtrace.Fields{
"epoch_id": epochID,
"actual_disk_usage_percent": diskUsagePercent,
"reported_disk_usage_percent": reportedDiskUsagePercent,
"reason": compatReason,
})
}
}
if cascadeBytes, ok := s.cascadeKademliaDBBytes(tickCtx); ok {
hostReport.CascadeKademliaDbBytes = float64(cascadeBytes)
Expand Down Expand Up @@ -181,6 +194,66 @@ func (s *Service) diskUsagePercent(ctx context.Context) (float64, bool) {
return infos[0].UsagePercent, true
}

// auditDiskUsagePercent decides which disk usage percentage to put in the
// epoch host report. It returns the value to report and a compatibility
// reason; the reason is empty whenever the measured value is passed through
// unchanged.
//
// The measured value is overridden only when all of the following hold:
//   - actual is a real number in (0, 100];
//   - the audit params expose MinDiskFreePercent in (0, 100];
//   - the supernode params expose MaxStorageUsagePercent in (0, 100);
//   - the audit postpone threshold (100 - MinDiskFreePercent) does not exceed
//     the storage-full threshold, and actual has reached the audit threshold;
//   - the latest on-chain state of the local supernode is Postponed (with an
//     empty reason or postponeReasonAuditHostRequirements), Active or
//     StorageFull.
//
// Every query failure or out-of-range parameter falls back to reporting the
// measured value, so the override is strictly best-effort.
func (s *Service) auditDiskUsagePercent(ctx context.Context, actual float64) (float64, string) {
	// NaN fails every ordered comparison, so without the explicit check it
	// would pass this guard and the threshold test below, and could then be
	// replaced by a synthetic value.
	if math.IsNaN(actual) || actual <= 0 || actual > 100 {
		return actual, ""
	}

	auditParamsResp, err := s.lumera.Audit().GetParams(ctx)
	if err != nil || auditParamsResp == nil {
		return actual, ""
	}
	auditMinDiskFree := auditParamsResp.Params.MinDiskFreePercent
	if auditMinDiskFree == 0 || auditMinDiskFree > 100 {
		return actual, ""
	}

	supernodeParamsResp, err := s.lumera.SuperNode().GetParams(ctx)
	if err != nil || supernodeParamsResp == nil {
		return actual, ""
	}

	maxStorageUsage := supernodeParamsResp.Params.MaxStorageUsagePercent
	if maxStorageUsage == 0 || maxStorageUsage >= 100 {
		return actual, ""
	}

	// Usage at which free space equals the audit minimum, and usage configured
	// as the supernode storage maximum.
	auditPostponeUsage := 100 - float64(auditMinDiskFree)
	storageFullUsage := float64(maxStorageUsage)

	// Nothing to reconcile when the audit threshold sits above the storage
	// threshold, or when the host has not reached the audit threshold yet.
	if auditPostponeUsage > storageFullUsage || actual < auditPostponeUsage {
		return actual, ""
	}

	sn, err := s.lumera.SuperNode().GetSupernodeBySupernodeAddress(ctx, s.identity)
	if err != nil || sn == nil {
		return actual, ""
	}

	latestState, latestReason := latestSupernodeState(sn)
	switch latestState {
	case sntypes.SuperNodeStatePostponed:
		// Only postponements with no recorded reason or the host-requirements
		// reason are overridden; any other reason reports the measured value.
		if latestReason == "" || latestReason == postponeReasonAuditHostRequirements {
			return auditPostponeUsage, "postponed_recovery_compat"
		}
	case sntypes.SuperNodeStateActive, sntypes.SuperNodeStateStorageFull:
		// Report the smallest float64 strictly above the storage threshold.
		// NOTE(review): when actual already exceeds storageFullUsage this
		// reports a value lower than the measurement — confirm that is intended.
		return math.Nextafter(storageFullUsage, 100), "storage_full_compat"
	}

	return actual, ""
}

// latestSupernodeState returns the State and Reason of the last non-nil entry
// in sn.States. It yields SuperNodeStateUnspecified and an empty reason when
// sn is nil or holds no non-nil state record.
func latestSupernodeState(sn *sntypes.SuperNode) (sntypes.SuperNodeState, string) {
	if sn != nil {
		records := sn.States
		// Scan from the tail so the most recently appended record wins.
		for pos := len(records); pos > 0; pos-- {
			if rec := records[pos-1]; rec != nil {
				return rec.State, rec.Reason
			}
		}
	}
	return sntypes.SuperNodeStateUnspecified, ""
}

func (s *Service) cascadeKademliaDBBytes(_ context.Context) (uint64, bool) {
dir := strings.TrimSpace(s.p2pDataDir)
if dir == "" {
Expand Down
76 changes: 76 additions & 0 deletions supernode/host_reporter/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"os"
"path/filepath"
"testing"

audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types"
sntypes "github.com/LumeraProtocol/lumera/x/supernode/v1/types"
lumeraMock "github.com/LumeraProtocol/supernode/v2/pkg/lumera"
supernodemod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode"
"go.uber.org/mock/gomock"
)

func TestNormalizeProbeHost(t *testing.T) {
Expand Down Expand Up @@ -68,3 +74,73 @@ func TestCascadeKademliaDBBytes_NoMatches(t *testing.T) {
t.Fatalf("expected ok=false when no sqlite db files exist")
}
}

// TestAuditDiskUsagePercentCompat runs auditDiskUsagePercent over a table of
// measured usages and on-chain states, with the audit minimum free disk fixed
// at 15% and the supernode maximum storage usage fixed at 90%.
func TestAuditDiskUsagePercentCompat(t *testing.T) {
	type scenario struct {
		name       string
		actual     float64
		state      sntypes.SuperNodeState
		reason     string
		want       float64
		wantReason string
	}

	scenarios := []scenario{
		{
			name:   "active below audit threshold reports actual",
			actual: 84.9,
			state:  sntypes.SuperNodeStateActive,
			want:   84.9,
		},
		{
			name:       "active at audit threshold reports storage full signal",
			actual:     85,
			state:      sntypes.SuperNodeStateActive,
			want:       90.00000000000001,
			wantReason: "storage_full_compat",
		},
		{
			name:       "storage full in overlap stays storage full",
			actual:     87,
			state:      sntypes.SuperNodeStateStorageFull,
			want:       90.00000000000001,
			wantReason: "storage_full_compat",
		},
		{
			name:       "postponed host requirements reports recovery value",
			actual:     88,
			state:      sntypes.SuperNodeStatePostponed,
			reason:     postponeReasonAuditHostRequirements,
			want:       85,
			wantReason: "postponed_recovery_compat",
		},
		{
			name:       "postponed old no-reason reports recovery value",
			actual:     88,
			state:      sntypes.SuperNodeStatePostponed,
			want:       85,
			wantReason: "postponed_recovery_compat",
		},
		{
			name:   "postponed non-host reason reports actual",
			actual: 88,
			state:  sntypes.SuperNodeStatePostponed,
			reason: "audit_peer_ports",
			want:   88,
		},
	}

	for _, sc := range scenarios {
		sc := sc
		t.Run(sc.name, func(t *testing.T) {
			ctrl := gomock.NewController(t)
			defer ctrl.Finish()

			auditParams := &audittypes.QueryParamsResponse{
				Params: audittypes.Params{MinDiskFreePercent: 15},
			}
			snParams := &sntypes.QueryParamsResponse{
				Params: sntypes.Params{MaxStorageUsagePercent: 90},
			}

			snMod := supernodemod.NewMockModule(ctrl)
			client := lumeraMock.NewMockClient(ctrl)
			client.EXPECT().Audit().Return(&stubAuditModule{params: auditParams})
			client.EXPECT().SuperNode().Return(snMod).AnyTimes()
			snMod.EXPECT().GetParams(gomock.Any()).Return(snParams, nil)

			// The state lookup is only expected for scenarios whose measured
			// usage reaches the audit threshold: every case that expects an
			// override plus every postponed case in the table.
			expectLookup := sc.wantReason != "" || sc.state == sntypes.SuperNodeStatePostponed
			if expectLookup {
				record := &sntypes.SuperNodeStateRecord{
					State:  sc.state,
					Height: 1,
					Reason: sc.reason,
				}
				snMod.EXPECT().
					GetSupernodeBySupernodeAddress(gomock.Any(), "local-sn").
					Return(&sntypes.SuperNode{
						SupernodeAccount: "local-sn",
						States:           []*sntypes.SuperNodeStateRecord{record},
					}, nil)
			}

			svc := &Service{identity: "local-sn", lumera: client}
			got, reason := svc.auditDiskUsagePercent(context.Background(), sc.actual)
			if got != sc.want {
				t.Fatalf("reported disk=%v want %v", got, sc.want)
			}
			if reason != sc.wantReason {
				t.Fatalf("reason=%q want %q", reason, sc.wantReason)
			}
		})
	}
}

// TestAuditDiskUsagePercentCompatFailsClosedWhenParamsUnavailable checks that
// a zero MinDiskFreePercent makes auditDiskUsagePercent return the measured
// value with no reason, without ever touching the supernode module (the mock
// client has no SuperNode expectation).
func TestAuditDiskUsagePercentCompatFailsClosedWhenParamsUnavailable(t *testing.T) {
	const measured = 88.0

	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	zeroParams := &audittypes.QueryParamsResponse{
		Params: audittypes.Params{MinDiskFreePercent: 0},
	}
	client := lumeraMock.NewMockClient(ctrl)
	client.EXPECT().Audit().Return(&stubAuditModule{params: zeroParams})

	svc := &Service{identity: "local-sn", lumera: client}
	got, reason := svc.auditDiskUsagePercent(context.Background(), measured)
	if got != measured {
		t.Fatalf("reported disk=%v want actual", got)
	}
	if reason != "" {
		t.Fatalf("reason=%q want empty", reason)
	}
}
4 changes: 4 additions & 0 deletions supernode/host_reporter/tick_behavior_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ import (
// stubAuditModule is a hand-written test double whose fields hold canned
// values for its methods to hand back.
type stubAuditModule struct {
// currentEpoch is presumably the canned current-epoch query response; the
// method that reads it is not visible here.
currentEpoch *audittypes.QueryCurrentEpochResponse
// anchor is presumably the canned epoch-anchor query response; verify
// against GetEpochAnchor.
anchor *audittypes.QueryEpochAnchorResponse
// params, when non-nil, is what GetParams returns; when nil GetParams
// returns an empty response instead.
params *audittypes.QueryParamsResponse
// epochReportErr is presumably an error injected into the epoch-report
// query; the method that reads it is not visible here.
epochReportErr error
// assigned is presumably the canned assigned-targets query response; the
// method that reads it is not visible here.
assigned *audittypes.QueryAssignedTargetsResponse
}

// GetParams returns the stub's configured params response, or an empty
// QueryParamsResponse when none was configured. It never returns an error.
func (s *stubAuditModule) GetParams(ctx context.Context) (*audittypes.QueryParamsResponse, error) {
	if s.params == nil {
		return &audittypes.QueryParamsResponse{}, nil
	}
	return s.params, nil
}
func (s *stubAuditModule) GetEpochAnchor(ctx context.Context, epochID uint64) (*audittypes.QueryEpochAnchorResponse, error) {
Expand Down