forked from aws/amazon-ssm-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor_scheduler.go
158 lines (136 loc) · 4.39 KB
/
processor_scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package processor implements the MDS plugin processor.
// processor_scheduler contains the GetMessages scheduler implementation.
package processor
import (
"math/rand"
"sync"
"time"
"github.com/aws/amazon-ssm-agent/agent/sdkutil"
"github.com/carlescere/scheduler"
)
var (
	// lastPollTimeMap records, per processor name, the time at which the most
	// recent poll loop for that processor started. Access it only through
	// updateLastPollTime / getLastPollTime, which hold lock.
	lastPollTimeMap = make(map[string]time.Time)

	// lock guards lastPollTimeMap.
	lock sync.RWMutex

	// processMessage is indirected through a variable so the per-message
	// handler can be replaced (presumably for tests — confirm against callers).
	processMessage = (*Processor).processMessage
)
// updateLastPollTime records currentTime as the start time of the latest poll
// loop for processorType. The write lock is held for the map update, so this
// is safe to call from concurrently running poll loops.
func updateLastPollTime(processorType string, currentTime time.Time) {
	lock.Lock()
	lastPollTimeMap[processorType] = currentTime
	lock.Unlock()
}
// getLastPollTime returns the start time most recently recorded for
// processorType, or the zero time.Time if none has been recorded yet.
// It takes only the read lock.
func getLastPollTime(processorType string) time.Time {
	lock.RLock()
	last := lastPollTimeMap[processorType]
	lock.RUnlock()
	return last
}
// loop runs one polling iteration: it records when it started, checks (or
// lazily creates) the processor's stop policy, polls MDS once via pollOnce,
// and then asks the scheduler to start the next iteration right away.
//
// If the stop policy reports the processor as unhealthy, the processor is
// reset and no further run is requested; polling then resumes on the
// scheduler's regular tick. If a stop has been requested (isDone), nothing
// is polled and no further run is requested.
func (p *Processor) loop() {
	// Time lock: remember when this loop started so that, after polling, we
	// can tell whether another loop for the same processor started meanwhile.
	// This is extra insurance against races caused by the scheduler.
	pollStartTime := time.Now()
	updateLastPollTime(p.name, pollStartTime)

	log := p.context.Log()
	if p.isDone() {
		return
	}

	if p.processorStopPolicy == nil {
		log.Debugf("creating new stop-policy.")
		p.processorStopPolicy = newStopPolicy(p.name)
	} else {
		if p.name == mdsName {
			log.Debugf("%v's stoppolicy before polling is %v", p.name, p.processorStopPolicy)
		}
		if !p.processorStopPolicy.IsHealthy() {
			log.Errorf("%v stopped temporarily due to internal failure. We will retry automatically after %v minutes", p.name, pollMessageFrequencyMinutes)
			p.reset()
			return
		}
	}

	p.pollOnce()
	if p.name == mdsName {
		log.Debugf("%v's stoppolicy after polling is %v", p.name, p.processorStopPolicy)
	}

	// Slow down a bit in case GetMessages returns without blocking, which
	// could otherwise flood the service with requests.
	if time.Since(pollStartTime) < time.Second {
		time.Sleep(time.Duration(2000+rand.Intn(500)) * time.Millisecond)
	}

	// A stop may have been requested while we were polling or sleeping; in
	// that case do not ask the (quitting) scheduler for another run.
	if p.isDone() {
		return
	}

	// Only the most recently started loop for this processor may trigger the
	// next run. == (rather than Equal) is deliberate: it checks that the
	// stored value is exactly the one this loop wrote.
	if getLastPollTime(p.name) == pollStartTime {
		// Skip waiting for the next scheduler polling event and poll immediately.
		scheduleNextRun(p.messagePollJob)
	}
}
// scheduleNextRun sends true on job.SkipWait so the scheduler starts the next
// run without waiting for its regular polling interval. It is a package-level
// variable rather than a plain function so it can be swapped out
// (presumably in tests — confirm).
var scheduleNextRun = func(job *scheduler.Job) {
	job.SkipWait <- true
}
// reset clears the stop policy's error count so that the scheduler can
// restart polling after pollMessageFrequencyMinutes. For the MDS processor it
// also replaces p.service with a freshly built MDS service object.
func (p *Processor) reset() {
	p.context.Log().Debugf("Resetting processor:%v", p.name)

	// With the error count cleared, the next scheduled run can poll again.
	p.processorStopPolicy.ResetErrorCount()

	// Rebuild the MDS service object for the retry. This is extra insurance,
	// adding resiliency in case the previous object got corrupted.
	appConfig := p.context.AppConfig()
	if p.name != mdsName {
		return
	}
	p.service = newMdsService(appConfig)
}
// stop shuts the processor down. It stops the underlying service, closes
// stopSignal (only if it is not already done) so that isDone reports true from
// then on, sends a quit request to the message polling job if one exists, and
// stops the association processor if one exists.
func (p *Processor) stop() {
	p.context.Log().Debugf("Stopping processor:%v", p.name)
	p.service.Stop()

	// Guard the close: closing an already-closed channel panics, so a repeated
	// stop must not close stopSignal again.
	if alreadyDone := p.isDone(); !alreadyDone {
		close(p.stopSignal)
	}

	if job := p.messagePollJob; job != nil {
		job.Quit <- true
	}

	if assoc := p.assocProcessor; assoc != nil {
		assoc.Stop()
	}
}
// isDone reports whether a stop has been requested. It performs a
// non-blocking receive on stopSignal: a closed channel (or one with a pending
// value) yields true; otherwise it returns false immediately.
func (p *Processor) isDone() bool {
	select {
	case <-p.stopSignal:
		return true
	default:
	}
	return false
}
// pollOnce calls GetMessages once for this instance and passes each returned
// message, in order, to processMessage.
//
// If GetMessages fails, the error is handed to sdkutil.HandleAwsError along
// with the processor's stop policy, and nothing is processed. A nil response
// without an error is treated as "no messages" instead of being dereferenced.
func (p *Processor) pollOnce() {
	log := p.context.Log()
	if p.name == mdsName {
		log.Debugf("Polling for messages")
	}

	messages, err := p.service.GetMessages(log, p.config.InstanceID)
	if err != nil {
		sdkutil.HandleAwsError(log, err, p.processorStopPolicy)
		return
	}

	// Defensive: never dereference a nil response, even when err is nil.
	if messages != nil {
		if count := len(messages.Messages); count > 0 {
			log.Debugf("Got %v messages", count)
		}
		for _, msg := range messages.Messages {
			processMessage(p, msg)
		}
	}

	if p.name == mdsName {
		log.Debugf("Done poll once")
	}
}