Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implementing support for randomized start times in Tasks #2465

Merged
merged 3 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion cmd/poller/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"golang.org/x/text/cases"
"golang.org/x/text/language"
"math"
"math/rand"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -52,6 +53,7 @@ type Collector interface {
Start(*sync.WaitGroup)
GetName() string
GetObject() string
GetLogger() *logging.Logger
GetParams() *node.Node
GetOptions() *options.Options
GetCollectCount() uint64
Expand Down Expand Up @@ -142,13 +144,26 @@ func Init(c Collector) error {
opts := c.GetOptions()
name := c.GetName()
object := c.GetObject()
logger := c.GetLogger()
var jitterR time.Duration

// Initialize schedule and tasks (polls)
tasks := params.GetChildS("schedule")
if tasks == nil || len(tasks.GetChildren()) == 0 {
return errs.New(errs.ErrMissingParam, "schedule")
}

jitterS := params.GetChildContentS("jitter")
if jitterS != "" {
jitter, err := time.ParseDuration(jitterS)
if err != nil {
return errs.New(errs.ErrInvalidParam, "jitter ("+jitterS+"): "+err.Error())
}
if jitter > 0 {
jitterR = time.Duration(rand.Int63n(int64(jitter))) //nolint:gosec
}
}

s := schedule.New()

// Each task will be mapped to a collector method
Expand All @@ -160,7 +175,11 @@ func Init(c Collector) error {

if m := reflect.ValueOf(c).MethodByName(methodName); m.IsValid() {
if foo, ok := m.Interface().(func() (map[string]*matrix.Matrix, error)); ok {
if err := s.NewTaskString(task.GetNameS(), task.GetContentS(), foo, true, "Collector_"+c.GetName()+"_"+c.GetObject()); err != nil {
logger.Debug().Str("task", task.GetNameS()).
Str("delay", jitterR.String()).
Str("schedule", task.GetContentS()).
Send()
if err := s.NewTaskString(task.GetNameS(), task.GetContentS(), jitterR, foo, true, "Collector_"+c.GetName()+"_"+c.GetObject()); err != nil {
return errs.New(errs.ErrInvalidParam, "schedule ("+task.GetNameS()+"): "+err.Error())
}
} else {
Expand Down Expand Up @@ -490,6 +509,11 @@ func (c *AbstractCollector) GetName() string {
return c.Name
}

// GetLogger returns logger of the collector
func (c *AbstractCollector) GetLogger() *logging.Logger {
return c.Logger
}

// GetObject returns object of the collector
func (c *AbstractCollector) GetObject() string {
return c.Object
Expand Down
4 changes: 2 additions & 2 deletions cmd/poller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (p *Poller) Init() error {
pollerSchedule = p.params.PollerSchedule
}
p.schedule = schedule.New()
if err = p.schedule.NewTaskString("poller", pollerSchedule, nil, true, "poller_"+p.name); err != nil {
if err = p.schedule.NewTaskString("poller", pollerSchedule, 0, nil, true, "poller_"+p.name); err != nil {
logger.Error().Stack().Err(err).Msg("set schedule:")
return err
}
Expand All @@ -349,7 +349,7 @@ func (p *Poller) Init() error {
p.firstAutoSupport()
})
}
if err = p.schedule.NewTaskString("asup", asupSchedule, p.startAsup, p.options.Asup, "asup_"+p.name); err != nil {
if err = p.schedule.NewTaskString("asup", asupSchedule, 0, p.startAsup, p.options.Asup, "asup_"+p.name); err != nil {
return err
}
logger.Info().
Expand Down
10 changes: 5 additions & 5 deletions cmd/poller/schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ func (s *Schedule) Recover() {
// should be positive.
// The order in which tasks are added is maintained: GetTasks() will
// return tasks in FIFO order.
func (s *Schedule) NewTask(n string, i time.Duration, f func() (map[string]*matrix.Matrix, error), runNow bool, identifier string) error {
func (s *Schedule) NewTask(n string, i time.Duration, jitter time.Duration, f func() (map[string]*matrix.Matrix, error), runNow bool, identifier string) error {
if s.GetTask(n) == nil {
if i > 0 {
t := &Task{Name: n, interval: i, foo: f, identifier: identifier}
s.cachedInterval[n] = t.interval // remember normal interval of task
if runNow {
t.timer = time.Now().Add(-i) // set to run immediately
t.timer = time.Now().Add(-i + jitter) // set to run after jitter
} else {
t.timer = time.Now().Add(0) // run after interval has elapsed
t.timer = time.Now().Add(jitter) // run after interval has elapsed
}
s.tasks = append(s.tasks, t)
return nil
Expand All @@ -189,12 +189,12 @@ func (s *Schedule) NewTask(n string, i time.Duration, f func() (map[string]*matr
}

// NewTaskString creates a new task, the interval is parsed from string i
func (s *Schedule) NewTaskString(n, i string, f func() (map[string]*matrix.Matrix, error), runNow bool, identifier string) error {
func (s *Schedule) NewTaskString(n, i string, jitter time.Duration, f func() (map[string]*matrix.Matrix, error), runNow bool, identifier string) error {
d, err := time.ParseDuration(i)
if err != nil {
return err
}
return s.NewTask(n, d, f, runNow, identifier)
return s.NewTask(n, d, jitter, f, runNow, identifier)
}

// GetTasks returns scheduled tasks
Expand Down
54 changes: 51 additions & 3 deletions cmd/poller/schedule/schedule_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package schedule

import (
"github.com/netapp/harvest/v2/pkg/matrix"
"testing"
"time"
)

func setupSchedule() *Schedule {
s := New()
err1 := s.NewTaskString("counter", "1200s", nil, false, "")
err2 := s.NewTaskString("data", "180s", nil, false, "")
err3 := s.NewTaskString("instance", "600s", nil, false, "")
err1 := s.NewTaskString("counter", "1200s", 0, nil, false, "")
err2 := s.NewTaskString("data", "180s", 0, nil, false, "")
err3 := s.NewTaskString("instance", "600s", 0, nil, false, "")
if err1 != nil || err2 != nil || err3 != nil {
panic("error creating tasks")
}
Expand Down Expand Up @@ -43,3 +44,50 @@ func TestSchedule_Recover(t *testing.T) {
}
}
}

func TestNewTaskString(t *testing.T) {
// Define a dummy function for the task
f := func() (map[string]*matrix.Matrix, error) {
return nil, nil
}

testCases := []struct {
name string
interval string
jitter time.Duration
runNow bool
}{
{"test1", "10s", 5 * time.Second, true},
{"test2", "10s", 5 * time.Second, false},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
s := &Schedule{
tasks: []*Task{},
cachedInterval: map[string]time.Duration{},
}

// Create a task with runNow set to tc.runNow
err := s.NewTaskString(tc.name+"1", tc.interval, tc.jitter, f, tc.runNow, "testID1")
if err != nil {
t.Errorf("NewTaskString returned an error: %v", err)
}

// create another task with runNow set to tc.runNow and jitter 0
err = s.NewTaskString(tc.name+"2", tc.interval, 0, f, tc.runNow, "testID2")
if err != nil {
t.Errorf("NewTaskString returned an error: %v", err)
}

if len(s.tasks) != 2 {
t.Errorf("Expected 2 tasks, got %d", len(s.tasks))
}

// Check that the task with jitter should run after the task with jitter set to 0
if s.tasks[0].timer.Before(s.tasks[1].timer) {
t.Errorf("Expected first task to run after second task, got first task timer '%s' and second task timer '%s'", s.tasks[0].timer, s.tasks[1].timer)
}
})
}
}
Loading