Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Commit

Permalink
refactor: update for review comments / improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiondic committed Feb 13, 2023
1 parent a6f98d4 commit f5c834f
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 118 deletions.
16 changes: 0 additions & 16 deletions internal/vigilante/database/database.go

This file was deleted.

13 changes: 0 additions & 13 deletions internal/vigilante/database/database_suite_test.go

This file was deleted.

25 changes: 0 additions & 25 deletions internal/vigilante/database/in_memory.go

This file was deleted.

39 changes: 0 additions & 39 deletions internal/vigilante/database/in_memory_test.go

This file was deleted.

16 changes: 9 additions & 7 deletions internal/vigilante/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scheduler
import (
"context"
"fmt"
"github.com/conplementag/cops-vigilante/internal/vigilante/database"
"github.com/conplementag/cops-vigilante/internal/vigilante/errors"
"github.com/conplementag/cops-vigilante/internal/vigilante/services"
"github.com/conplementag/cops-vigilante/internal/vigilante/tasks/snat"
Expand Down Expand Up @@ -39,13 +38,16 @@ func InitializeAndStart(scheduleIntervalInSeconds int) *Scheduler {
func (s *Scheduler) start(scheduleIntervalInSeconds int) {
duration, _ := time.ParseDuration(fmt.Sprintf("%ds", scheduleIntervalInSeconds))

_, err := s.taskScheduler.ScheduleAtFixedRate(func(ctx context.Context) {
k8sClient, err := services.NewKubernetesService()
if err != nil {
errors.PanicOnError(err)
}
k8sClient, err := services.NewKubernetesService()
if err != nil {
errors.PanicOnError(err)
}

// task instances need to be kept "outside" the scheduled loop to preserve state (if any)
snatTask := snat.NewSnatTask(k8sClient, &snatmetrics.SnatMetricsRecorder{}, &clock.RealClock{})

snat.NewSnatTask(k8sClient, database.NewInMemoryDatabase(), &snatmetrics.SnatMetricsRecorder{}, &clock.RealClock{}).Run()
_, err = s.taskScheduler.ScheduleAtFixedRate(func(ctx context.Context) {
snatTask.Run()
}, duration)

if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions internal/vigilante/tasks/snat/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@ package snat

import (
"github.com/conplementag/cops-vigilante/internal/vigilante/clock"
"github.com/conplementag/cops-vigilante/internal/vigilante/database"
"github.com/conplementag/cops-vigilante/internal/vigilante/services"
"github.com/conplementag/cops-vigilante/internal/vigilante/tasks"
"github.com/conplementag/cops-vigilante/internal/vigilante/tasks/snat/metrics"
)

func NewSnatTask(
kubernetesService services.KubernetesService,
stateDatabase database.Database,
metrics metrics.SnatMetrics,
clock clock.Clock,
) tasks.Task {
return &snatTask{
kubernetesService: kubernetesService,
stateDatabase: stateDatabase,
metrics: metrics,
clock: clock,
state: map[string]interface{}{},
}
}
26 changes: 13 additions & 13 deletions internal/vigilante/tasks/snat/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import (
"fmt"
"github.com/ahmetb/go-linq/v3"
"github.com/conplementag/cops-vigilante/internal/vigilante/clock"
"github.com/conplementag/cops-vigilante/internal/vigilante/database"
"github.com/conplementag/cops-vigilante/internal/vigilante/services"
"github.com/conplementag/cops-vigilante/internal/vigilante/tasks/snat/consts"
"github.com/conplementag/cops-vigilante/internal/vigilante/tasks/snat/metrics"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimachinerymetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
"time"
)

type snatTask struct {
kubernetesService services.KubernetesService
stateDatabase database.Database
state map[string]interface{}
metrics metrics.SnatMetrics
clock clock.Clock
}
Expand Down Expand Up @@ -53,11 +53,11 @@ func (s *snatTask) heal() {
readyNonHealedWindowsNodes := s.filterForReadyNonHealedWindowsNodes(allNodes)

// Some cases, like nodes becoming unready, require us to restart the healing process.
s.updateStateDatabase(readyNonHealedWindowsNodes)
s.updateState(readyNonHealedWindowsNodes)

for _, node := range readyNonHealedWindowsNodes {
s.initializeHealingStateIfRequired(node.Name)
healingState := s.stateDatabase.Get(node.Name).(*NodeHealingState)
healingState := s.state[node.Name].(*NodeHealingState)

if healingState.NumberOfErrorRuns >= 10 {
logrus.Debugf("Skipping the healing for node %s to to number of errors reached: %d", node.Name, healingState.NumberOfErrorRuns)
Expand Down Expand Up @@ -100,7 +100,7 @@ func (s *snatTask) filterForReadyNonHealedWindowsNodes(nodes []*corev1.Node) []*

for _, node := range nodes {
if val, ok := node.Labels["kubernetes.io/os"]; ok {
if val != "windows" && val != "Windows" {
if !strings.EqualFold(val, "windows") {
continue // other node types are of no interest
}
}
Expand Down Expand Up @@ -137,14 +137,14 @@ func (s *snatTask) filterForReadyNonHealedWindowsNodes(nodes []*corev1.Node) []*
return results
}

func (s *snatTask) updateStateDatabase(readyWindowsNodes []*corev1.Node) {
func (s *snatTask) updateState(readyWindowsNodes []*corev1.Node) {
// If the healing is recorded in our state, and the node becomes un-ready or with unknown state, then we should remove
// the healing record and forget about the node. Once it becomes ready again, the healing process will
// effectively restart because the new state will be written for that node. In this case, we should also
// not attempt any.
var itemKeysToRemoveFromState []string

for key, _ := range s.stateDatabase.GetAll() {
for key, _ := range s.state {
readyNode := linq.From(readyWindowsNodes).WhereT(func(node *corev1.Node) bool {
return node.Name == key
}).Single()
Expand All @@ -154,18 +154,18 @@ func (s *snatTask) updateStateDatabase(readyWindowsNodes []*corev1.Node) {
}
}

// We delete outside the loop above to prevent modifying the state database in the same loop (removing items from
// We delete outside the loop above to prevent modifying the state in the same loop (removing items from
// a "collection" while iterating the same collection is never a good idea).
for _, key := range itemKeysToRemoveFromState {
s.stateDatabase.Delete(key)
delete(s.state, key)
}
}

func (s *snatTask) initializeHealingStateIfRequired(nodeName string) {
if s.stateDatabase.Get(nodeName) == nil {
s.stateDatabase.Set(nodeName, &NodeHealingState{
if s.state[nodeName] == nil {
s.state[nodeName] = &NodeHealingState{
HealingStartedAt: s.clock.Now(),
})
}
}
}

Expand Down Expand Up @@ -207,7 +207,7 @@ func (s *snatTask) createHealingPod(nodeName string) error {
// handleError handles errors that occur during a node healing step
func (s *snatTask) handleError(nodeName string, err error) {
logrus.Warnf("[SNAT Task] error occured, will be recorded. Error: %v", err)
healingState := s.stateDatabase.Get(nodeName).(*NodeHealingState)
healingState := s.state[nodeName].(*NodeHealingState)

if healingState == nil {
panic(fmt.Sprintf("Got an error during healing of a node %s, but the state for that node was not found. "+
Expand Down
3 changes: 1 addition & 2 deletions internal/vigilante/tasks/snat/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package snat_test
import (
"errors"
"github.com/conplementag/cops-vigilante/internal/vigilante/clock/testing"
"github.com/conplementag/cops-vigilante/internal/vigilante/database"
"github.com/conplementag/cops-vigilante/internal/vigilante/services"
"github.com/conplementag/cops-vigilante/internal/vigilante/tasks"
"github.com/conplementag/cops-vigilante/internal/vigilante/tasks/snat"
Expand All @@ -30,7 +29,7 @@ var _ = Describe("SNAT Worker", func() {
metricsRecorderMock.On("IncHealAttemptsCounter", mock.Anything)
metricsRecorderMock.On("IncNumberOfNotReadyNodesSeen", mock.Anything)

task = snat.NewSnatTask(kubernetesServiceMock, database.NewInMemoryDatabase(), metricsRecorderMock, fakeClock)
task = snat.NewSnatTask(kubernetesServiceMock, metricsRecorderMock, fakeClock)
})

When("running on a fresh cluster", func() {
Expand Down

0 comments on commit f5c834f

Please sign in to comment.