Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' into feature/sort-records-by-date
Browse files Browse the repository at this point in the history
  • Loading branch information
Panziera, Luca committed May 11, 2017
2 parents 467ba4c + 946d554 commit dff97c3
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 1 deletion.
8 changes: 8 additions & 0 deletions cycles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ cycles:
collection: methode
throttle: 1m
coolDown: 5m


- name: wordpress-whole-archive
type: ThrottledWholeCollection
origin: wordpress
collection: wordpress
throttle: 1m
coolDown: 5m

# - name: methode-one-hour
# type: ScalingWindow
Expand Down
32 changes: 31 additions & 1 deletion scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,46 @@ func (s *defaultScheduler) Start() error {
if s.IsRunning() {
return errors.New("Scheduler is already running")
}

s.setCurrentExecutionState(running)

startInterval := s.archiveCycleStartInterval()

for id, cycle := range s.cycles {
log.WithField("id", id).Info("Starting cycle.")
cycle.Start()
time.Sleep(startInterval)

}
return nil
}

func (s *defaultScheduler) archiveCycleStartInterval() time.Duration {

var temp time.Duration
var archiveCycles []*ThrottledWholeCollectionCycle

for _, cycle := range s.cycles {
if "ThrottledWholeCollection" == cycle.TransformToConfig().Type {
archiveCycles = append(archiveCycles, cycle.(*ThrottledWholeCollectionCycle))
}
}
numArchiveCycles := len(archiveCycles)

if numArchiveCycles > 1 {

minimumStartInterval := archiveCycles[0].throttle.Interval()

for _, cycle := range archiveCycles {
temp = cycle.throttle.Interval()
if temp < minimumStartInterval {
minimumStartInterval = temp
}
}
return minimumStartInterval / time.Duration(numArchiveCycles)
}
return temp
}

func (s *defaultScheduler) Shutdown() error {
s.cycleLock.RLock()
defer s.cycleLock.RUnlock()
Expand Down
39 changes: 39 additions & 0 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func TestSchedulerShouldStartWhenEnabled(t *testing.T) {
c2.On("ID").Return("id2")
c1.On("Start").Return()
c2.On("Start").Return()
c1.On("TransformToConfig").Return(&CycleConfig{Type: "test"})
c2.On("TransformToConfig").Return(&CycleConfig{Type: "test"})

s.AddCycle(c1)
s.AddCycle(c2)
Expand Down Expand Up @@ -85,6 +87,8 @@ func TestSchedulerResumeAfterDisable(t *testing.T) {
c2.On("Start").Return()
c1.On("Stop").Return()
c2.On("Stop").Return()
c1.On("TransformToConfig").Return(&CycleConfig{Type: "test"})
c2.On("TransformToConfig").Return(&CycleConfig{Type: "test"})

s.AddCycle(c1)
s.AddCycle(c2)
Expand Down Expand Up @@ -162,6 +166,7 @@ func TestSaveCycleMetadata(t *testing.T) {
c1 := new(MockCycle)
c1.On("ID").Return(id1)
c1.On("Start").Return()
c1.On("TransformToConfig").Return(&CycleConfig{Type: "test"})

db := new(native.MockDB)
dbCollection := "testCollection"
Expand All @@ -183,3 +188,37 @@ func TestSaveCycleMetadata(t *testing.T) {

rw.AssertExpectations(t)
}

func TestCalculateArchiveCycleStartInterval(t *testing.T) {
assert := assert.New(t)
id1 := "id1"

c1 := new(MockCycle)
c1.On("ID").Return(id1)
c1.On("Start").Return()
c1.On("TransformToConfig").Return(&CycleConfig{Type: "test"})

db := new(native.MockDB)
dbCollection := "testCollection"
origin := "testOrigin"
coolDown := time.Minute
throttle, _ := NewThrottle(time.Second, 1)
throttle2, _ := NewThrottle(time.Second, 2)
c2 := NewThrottledWholeCollectionCycle("test", db, dbCollection, origin, coolDown, throttle, nil)
c3 := NewThrottledWholeCollectionCycle("test2", db, dbCollection, origin, coolDown, throttle2, nil)

rw := MockMetadataRW{}

s := NewScheduler(db, &tasks.MockTask{}, &rw, 1*time.Minute)

s.AddCycle(c1)
s.AddCycle(c2)
s.AddCycle(c3)

testIterval := s.(*defaultScheduler).archiveCycleStartInterval()

//we have 2 archive cycles out of 3, cycles and the shortest throttle is 1 sec
//therefore the startup interval is 500ms
expected, _ := time.ParseDuration("500ms")
assert.Equal(expected, testIterval, "test interval should be 500ms")
}

0 comments on commit dff97c3

Please sign in to comment.