From f5c834f996934ea8c26de38cee15616aa7694554 Mon Sep 17 00:00:00 2001 From: Denis Biondic Date: Mon, 13 Feb 2023 17:55:40 +0100 Subject: [PATCH] refactor: update for review comments / improvements --- internal/vigilante/database/database.go | 16 -------- .../vigilante/database/database_suite_test.go | 13 ------- internal/vigilante/database/in_memory.go | 25 ------------ internal/vigilante/database/in_memory_test.go | 39 ------------------- internal/vigilante/scheduler/scheduler.go | 16 ++++---- internal/vigilante/tasks/snat/factory.go | 4 +- internal/vigilante/tasks/snat/worker.go | 26 ++++++------- internal/vigilante/tasks/snat/worker_test.go | 3 +- 8 files changed, 24 insertions(+), 118 deletions(-) delete mode 100644 internal/vigilante/database/database.go delete mode 100644 internal/vigilante/database/database_suite_test.go delete mode 100644 internal/vigilante/database/in_memory.go delete mode 100644 internal/vigilante/database/in_memory_test.go diff --git a/internal/vigilante/database/database.go b/internal/vigilante/database/database.go deleted file mode 100644 index c89c884..0000000 --- a/internal/vigilante/database/database.go +++ /dev/null @@ -1,16 +0,0 @@ -package database - -// Database is a simple interface for any key-value store -type Database interface { - // Set sets the key with value, creating a new key or overwriting an existing one - Set(key string, value interface{}) - - // Get returns the value found by key. Returns nil if nothing found - Get(key string) interface{} - - // GetAll returns all elements in a normal map representation - GetAll() map[string]interface{} - - // Delete removes the key from the database - Delete(key string) -} diff --git a/internal/vigilante/database/database_suite_test.go b/internal/vigilante/database/database_suite_test.go deleted file mode 100644 index 1684cda..0000000 --- a/internal/vigilante/database/database_suite_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package database_test - -import ( - "testing" - - . 
"github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -func TestDatabase(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Database Suite") -} diff --git a/internal/vigilante/database/in_memory.go b/internal/vigilante/database/in_memory.go deleted file mode 100644 index 18d1138..0000000 --- a/internal/vigilante/database/in_memory.go +++ /dev/null @@ -1,25 +0,0 @@ -package database - -type InMemoryDatabase struct { - store map[string]interface{} -} - -func NewInMemoryDatabase() Database { - return &InMemoryDatabase{store: map[string]interface{}{}} -} - -func (db *InMemoryDatabase) Get(key string) interface{} { - return db.store[key] -} - -func (db *InMemoryDatabase) GetAll() map[string]interface{} { - return db.store -} - -func (db *InMemoryDatabase) Set(key string, value interface{}) { - db.store[key] = value -} - -func (db *InMemoryDatabase) Delete(key string) { - delete(db.store, key) -} diff --git a/internal/vigilante/database/in_memory_test.go b/internal/vigilante/database/in_memory_test.go deleted file mode 100644 index 0a71d91..0000000 --- a/internal/vigilante/database/in_memory_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package database_test - -import ( - . "github.com/conplementag/cops-vigilante/internal/vigilante/database" - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" -) - -var _ = Describe("InMemory", func() { - var db Database - - BeforeEach(func() { - db = NewInMemoryDatabase() - }) - - Describe("With some values set", func() { - BeforeEach(func() { - db.Set("a", 123) - db.Set("b", "any value here") - db.Set("a", 345) - db.Set("b", "any value here 2") - }) - - It("should get the values without any problems", func() { - Expect(db.Get("a")).To(Equal(345)) - Expect(db.Get("b")).To(Equal("any value here 2")) - Expect(db.Get("c")).To(BeNil()) - }) - - It("should return all when requested", func() { - Expect(len(db.GetAll())).To(Equal(2)) - }) - - It("should be able to delete", func() { - db.Delete("a") - Expect(db.Get("a")).To(BeNil()) - }) - }) -}) diff --git a/internal/vigilante/scheduler/scheduler.go b/internal/vigilante/scheduler/scheduler.go index 47e3387..a59d1ec 100644 --- a/internal/vigilante/scheduler/scheduler.go +++ b/internal/vigilante/scheduler/scheduler.go @@ -3,7 +3,6 @@ package scheduler import ( "context" "fmt" - "github.com/conplementag/cops-vigilante/internal/vigilante/database" "github.com/conplementag/cops-vigilante/internal/vigilante/errors" "github.com/conplementag/cops-vigilante/internal/vigilante/services" "github.com/conplementag/cops-vigilante/internal/vigilante/tasks/snat" @@ -39,13 +38,16 @@ func InitializeAndStart(scheduleIntervalInSeconds int) *Scheduler { func (s *Scheduler) start(scheduleIntervalInSeconds int) { duration, _ := time.ParseDuration(fmt.Sprintf("%ds", scheduleIntervalInSeconds)) - _, err := s.taskScheduler.ScheduleAtFixedRate(func(ctx context.Context) { - k8sClient, err := services.NewKubernetesService() - if err != nil { - errors.PanicOnError(err) - } + k8sClient, err := services.NewKubernetesService() + if err != nil { + errors.PanicOnError(err) + } + + // task instances need to be kept "outside" the scheduled loop to preserve state (if any) + snatTask := snat.NewSnatTask(k8sClient, &snatmetrics.SnatMetricsRecorder{}, &clock.RealClock{}) - 
snat.NewSnatTask(k8sClient, database.NewInMemoryDatabase(), &snatmetrics.SnatMetricsRecorder{}, &clock.RealClock{}).Run() + _, err = s.taskScheduler.ScheduleAtFixedRate(func(ctx context.Context) { + snatTask.Run() }, duration) if err != nil { diff --git a/internal/vigilante/tasks/snat/factory.go b/internal/vigilante/tasks/snat/factory.go index ea28d76..3e0beb1 100644 --- a/internal/vigilante/tasks/snat/factory.go +++ b/internal/vigilante/tasks/snat/factory.go @@ -2,7 +2,6 @@ package snat import ( "github.com/conplementag/cops-vigilante/internal/vigilante/clock" - "github.com/conplementag/cops-vigilante/internal/vigilante/database" "github.com/conplementag/cops-vigilante/internal/vigilante/services" "github.com/conplementag/cops-vigilante/internal/vigilante/tasks" "github.com/conplementag/cops-vigilante/internal/vigilante/tasks/snat/metrics" @@ -10,14 +9,13 @@ import ( func NewSnatTask( kubernetesService services.KubernetesService, - stateDatabase database.Database, metrics metrics.SnatMetrics, clock clock.Clock, ) tasks.Task { return &snatTask{ kubernetesService: kubernetesService, - stateDatabase: stateDatabase, metrics: metrics, clock: clock, + state: map[string]interface{}{}, } } diff --git a/internal/vigilante/tasks/snat/worker.go b/internal/vigilante/tasks/snat/worker.go index 943b257..15d3044 100644 --- a/internal/vigilante/tasks/snat/worker.go +++ b/internal/vigilante/tasks/snat/worker.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/ahmetb/go-linq/v3" "github.com/conplementag/cops-vigilante/internal/vigilante/clock" - "github.com/conplementag/cops-vigilante/internal/vigilante/database" "github.com/conplementag/cops-vigilante/internal/vigilante/services" "github.com/conplementag/cops-vigilante/internal/vigilante/tasks/snat/consts" "github.com/conplementag/cops-vigilante/internal/vigilante/tasks/snat/metrics" @@ -12,12 +11,13 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" apimachinerymetav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + "strings" "time" ) type snatTask struct { kubernetesService services.KubernetesService - stateDatabase database.Database + state map[string]interface{} metrics metrics.SnatMetrics clock clock.Clock } @@ -53,11 +53,11 @@ func (s *snatTask) heal() { readyNonHealedWindowsNodes := s.filterForReadyNonHealedWindowsNodes(allNodes) // Some cases, like nodes becoming unready, require us to restart the healing process. - s.updateStateDatabase(readyNonHealedWindowsNodes) + s.updateState(readyNonHealedWindowsNodes) for _, node := range readyNonHealedWindowsNodes { s.initializeHealingStateIfRequired(node.Name) - healingState := s.stateDatabase.Get(node.Name).(*NodeHealingState) + healingState := s.state[node.Name].(*NodeHealingState) if healingState.NumberOfErrorRuns >= 10 { logrus.Debugf("Skipping the healing for node %s to to number of errors reached: %d", node.Name, healingState.NumberOfErrorRuns) @@ -100,7 +100,7 @@ func (s *snatTask) filterForReadyNonHealedWindowsNodes(nodes []*corev1.Node) []* for _, node := range nodes { if val, ok := node.Labels["kubernetes.io/os"]; ok { - if val != "windows" && val != "Windows" { + if !strings.EqualFold(val, "windows") { continue // other node types are of no interest } } @@ -137,14 +137,14 @@ func (s *snatTask) filterForReadyNonHealedWindowsNodes(nodes []*corev1.Node) []* return results } -func (s *snatTask) updateStateDatabase(readyWindowsNodes []*corev1.Node) { +func (s *snatTask) updateState(readyWindowsNodes []*corev1.Node) { // If the healing is recorded in our state, and the node becomes un-ready or with unknown state, then we should remove // the healing record and forget about the node. Once it becomes ready again, the healing process will // effectively restart because the new state will be written for that node. In this case, we should also // not attempt any. 
var itemKeysToRemoveFromState []string - for key, _ := range s.stateDatabase.GetAll() { + for key := range s.state { readyNode := linq.From(readyWindowsNodes).WhereT(func(node *corev1.Node) bool { return node.Name == key }).Single() @@ -154,18 +154,18 @@ func (s *snatTask) updateStateDatabase(readyWindowsNodes []*corev1.Node) { } } - // We delete outside the loop above to prevent modifying the state database in the same loop (removing items from + // We delete outside the loop above to prevent modifying the state in the same loop (removing items from // a "collection" while iterating the same collection is never a good idea). for _, key := range itemKeysToRemoveFromState { - s.stateDatabase.Delete(key) + delete(s.state, key) } } func (s *snatTask) initializeHealingStateIfRequired(nodeName string) { - if s.stateDatabase.Get(nodeName) == nil { - s.stateDatabase.Set(nodeName, &NodeHealingState{ + if s.state[nodeName] == nil { + s.state[nodeName] = &NodeHealingState{ HealingStartedAt: s.clock.Now(), - }) + } } } @@ -207,7 +207,7 @@ func (s *snatTask) createHealingPod(nodeName string) error { // handleError handles errors that occur during a node healing step func (s *snatTask) handleError(nodeName string, err error) { logrus.Warnf("[SNAT Task] error occured, will be recorded. Error: %v", err) - healingState := s.stateDatabase.Get(nodeName).(*NodeHealingState) + healingState, _ := s.state[nodeName].(*NodeHealingState) if healingState == nil { panic(fmt.Sprintf("Got an error during healing of a node %s, but the state for that node was not found. 
"+ diff --git a/internal/vigilante/tasks/snat/worker_test.go b/internal/vigilante/tasks/snat/worker_test.go index a373a32..2776e14 100644 --- a/internal/vigilante/tasks/snat/worker_test.go +++ b/internal/vigilante/tasks/snat/worker_test.go @@ -3,7 +3,6 @@ package snat_test import ( "errors" "github.com/conplementag/cops-vigilante/internal/vigilante/clock/testing" - "github.com/conplementag/cops-vigilante/internal/vigilante/database" "github.com/conplementag/cops-vigilante/internal/vigilante/services" "github.com/conplementag/cops-vigilante/internal/vigilante/tasks" "github.com/conplementag/cops-vigilante/internal/vigilante/tasks/snat" @@ -30,7 +29,7 @@ var _ = Describe("SNAT Worker", func() { metricsRecorderMock.On("IncHealAttemptsCounter", mock.Anything) metricsRecorderMock.On("IncNumberOfNotReadyNodesSeen", mock.Anything) - task = snat.NewSnatTask(kubernetesServiceMock, database.NewInMemoryDatabase(), metricsRecorderMock, fakeClock) + task = snat.NewSnatTask(kubernetesServiceMock, metricsRecorderMock, fakeClock) }) When("running on a fresh cluster", func() {