-
Notifications
You must be signed in to change notification settings - Fork 323
/
messageutil.go
339 lines (293 loc) · 13.4 KB
/
messageutil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not
// use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing`
// permissions and limitations under the License.
// Package utils provides utility functions to be used by interactors
package utils
import (
"encoding/json"
"errors"
"fmt"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
"github.com/Jeffail/gabs"
"github.com/aws/amazon-ssm-agent/agent/context"
"github.com/aws/amazon-ssm-agent/agent/contracts"
"github.com/aws/amazon-ssm-agent/agent/framework/docparser"
"github.com/aws/amazon-ssm-agent/agent/jsonutil"
"github.com/aws/amazon-ssm-agent/agent/log"
"github.com/aws/amazon-ssm-agent/agent/log/logger"
model "github.com/aws/amazon-ssm-agent/agent/messageservice/contracts"
messageContracts "github.com/aws/amazon-ssm-agent/agent/runcommand/contracts"
"github.com/aws/amazon-ssm-agent/agent/times"
"github.com/aws/amazon-ssm-agent/common/identity/identity"
"github.com/aws/aws-sdk-go/service/ssmmds"
)
// TopicPrefix is the prefix of the Topic field in an MDS message.
type TopicPrefix string
const (
// CloudWatchLogGroupNamePrefix CloudWatch output's log group name prefix
CloudWatchLogGroupNamePrefix = "/aws/ssm/"
documentContent = "DocumentContent"
runtimeConfig = "runtimeConfig"
cloudwatchPlugin = "aws:cloudWatch"
properties = "properties"
parameters = "Parameters"
// MDS service will mark document as timeout if it didn't receive any response from the agent after 2 hours
documentLevelTimeOutDurationHour = 2
// SendCommandTopicPrefix is the topic prefix for a send command MDS message.
SendCommandTopicPrefix TopicPrefix = "aws.ssm.sendCommand"
// CancelCommandTopicPrefix is the topic prefix for a cancel command MDS message.
CancelCommandTopicPrefix TopicPrefix = "aws.ssm.cancelCommand"
// SendFailedReplyFrequencyMinutes is the frequency at which to send failed reply requests back to MDS
SendFailedReplyFrequencyMinutes = 5
)
// empty returns true if string is empty
func empty(s *string) bool {
return s == nil || *s == ""
}
// Validate returns error if the message is invalid
func Validate(msg *ssmmds.Message) error {
if msg == nil {
return errors.New("Message is nil")
}
if empty(msg.Topic) {
return errors.New("Topic is missing")
}
if empty(msg.Destination) {
return errors.New("Destination is missing")
}
if empty(msg.MessageId) {
return errors.New("MessageId is missing")
}
if empty(msg.CreatedDate) {
return errors.New("CreatedDate is missing")
}
return nil
}
// newDocumentInfo initializes new DocumentInfo object
func newDocumentInfo(msg model.InstanceMessage, parsedMsg messageContracts.SendCommandPayload) contracts.DocumentInfo {
documentInfo := new(contracts.DocumentInfo)
documentInfo.CommandID, _ = messageContracts.GetCommandID(msg.MessageId)
documentInfo.DocumentID = documentInfo.CommandID
documentInfo.InstanceID = msg.Destination
documentInfo.MessageID = msg.MessageId
documentInfo.RunID = times.ToIsoDashUTC(times.DefaultClock.Now())
documentInfo.CreatedDate = msg.CreatedDate
documentInfo.DocumentName = parsedMsg.DocumentName
documentInfo.DocumentStatus = contracts.ResultStatusInProgress
return *documentInfo
}
// ParseCancelCommandMessage parses send command message
func ParseCancelCommandMessage(context context.T, msg model.InstanceMessage, upstreamService contracts.UpstreamServiceName) (*contracts.DocumentState, error) {
log := context.Log()
log.Debug("Processing cancel command message - ", msg.MessageId)
var payload messageContracts.CancelPayload
err := json.Unmarshal([]byte(msg.Payload), &payload)
if err != nil {
return nil, err
}
commandID, _ := messageContracts.GetCommandID(msg.MessageId)
var docState contracts.DocumentState
documentInfo := contracts.DocumentInfo{
DocumentID: commandID,
CommandID: commandID,
InstanceID: msg.Destination,
MessageID: msg.MessageId,
RunID: times.ToIsoDashUTC(times.DefaultClock.Now()),
CreatedDate: msg.CreatedDate,
DocumentStatus: contracts.ResultStatusInProgress,
}
cancelCommand := new(contracts.CancelCommandInfo)
cancelCommand.Payload = msg.Payload
cancelCommand.CancelMessageID = payload.CancelMessageID
cancelCommandID, _ := messageContracts.GetCommandID(payload.CancelMessageID)
cancelCommand.CancelCommandID = cancelCommandID
cancelCommand.DebugInfo = fmt.Sprintf("Command %v is yet to be cancelled", commandID)
docState = contracts.DocumentState{
DocumentInformation: documentInfo,
CancelInformation: *cancelCommand,
DocumentType: contracts.CancelCommand,
UpstreamServiceName: upstreamService,
}
return &docState, nil
}
// generateCloudWatchLogStreamPrefix creates the LogStreamPrefix for cloudWatch output. LogStreamPrefix = <CommandID>/<InstanceID>
func generateCloudWatchLogStreamPrefix(context context.T, commandID string) (string, error) {
instanceID, err := context.Identity().ShortInstanceID()
if err != nil {
return "", err
}
return fmt.Sprintf("%s/%s", commandID, instanceID), nil
}
func generateCloudWatchConfigFromPayload(context context.T, parsedMessage messageContracts.SendCommandPayload) (contracts.CloudWatchConfiguration, error) {
cloudWatchOutputEnabled, err := strconv.ParseBool(parsedMessage.CloudWatchOutputEnabled)
cloudWatchConfig := contracts.CloudWatchConfiguration{}
if err != nil || !cloudWatchOutputEnabled {
return cloudWatchConfig, err
}
cloudWatchConfig.LogStreamPrefix, err = generateCloudWatchLogStreamPrefix(context, parsedMessage.CommandID)
if err != nil {
return cloudWatchConfig, err
}
if parsedMessage.CloudWatchLogGroupName != "" {
cloudWatchConfig.LogGroupName = parsedMessage.CloudWatchLogGroupName
} else {
logGroupName := fmt.Sprintf("%s%s", CloudWatchLogGroupNamePrefix, parsedMessage.DocumentName)
cloudWatchConfig.LogGroupName = cleanupLogGroupName(logGroupName)
}
return cloudWatchConfig, nil
}
func cleanupLogGroupName(logGroupName string) string {
// log group pattern referred from below URL
// https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html
if reg, err := regexp.Compile(`[^a-zA-Z0-9_\-/\.#]`); reg != nil && err == nil {
// replace invalid chars with dot(.)
return reg.ReplaceAllString(logGroupName, ".")
}
return logGroupName
}
// ParseSendCommandMessage parses send command message
func ParseSendCommandMessage(context context.T, msg model.InstanceMessage, messagesOrchestrationRootDir string, upstreamService contracts.UpstreamServiceName) (*contracts.DocumentState, error) {
log := context.Log()
commandID, _ := messageContracts.GetCommandID(msg.MessageId)
log.Debug("Processing send command message ", msg.MessageId)
log.Trace("Processing send command payload: ", jsonutil.Indent(msg.Payload))
// parse message to retrieve parameters
var parsedMessage messageContracts.SendCommandPayload
err := json.Unmarshal([]byte(msg.Payload), &parsedMessage)
if err != nil {
errorMsg := fmt.Errorf("encountered error while parsing input - internal error %v", err)
log.Error(errorMsg)
return nil, errorMsg
}
// adapt plugin configuration format from MDS to plugin expected format
s3KeyPrefix := path.Join(parsedMessage.OutputS3KeyPrefix, parsedMessage.CommandID, msg.Destination)
cloudWatchConfig, err := generateCloudWatchConfigFromPayload(context, parsedMessage)
if err != nil {
log.Errorf("encountered error while generating cloudWatch config from send command payload, err: %s", err)
}
messageOrchestrationDirectory := filepath.Join(messagesOrchestrationRootDir, commandID)
documentType := contracts.SendCommand
documentInfo := newDocumentInfo(msg, parsedMessage)
parserInfo := docparser.DocumentParserInfo{
OrchestrationDir: messageOrchestrationDirectory,
S3Bucket: parsedMessage.OutputS3BucketName,
S3Prefix: s3KeyPrefix,
MessageId: documentInfo.MessageID,
DocumentId: documentInfo.DocumentID,
CloudWatchConfig: cloudWatchConfig,
}
docContent := &docparser.DocContent{
SchemaVersion: parsedMessage.DocumentContent.SchemaVersion,
Description: parsedMessage.DocumentContent.Description,
RuntimeConfig: parsedMessage.DocumentContent.RuntimeConfig,
MainSteps: parsedMessage.DocumentContent.MainSteps,
Parameters: parsedMessage.DocumentContent.Parameters}
//Data format persisted in Current Folder is defined by the struct - CommandState
docState, err := docparser.InitializeDocState(context, documentType, docContent, documentInfo, parserInfo, parsedMessage.Parameters)
if err != nil {
return nil, err
}
docState.UpstreamServiceName = upstreamService
parsedMessageContent, _ := jsonutil.Marshal(parsedMessage)
var parsedContentJson *gabs.Container
if parsedContentJson, err = gabs.ParseJSON([]byte(parsedMessageContent)); err != nil {
log.Debugf("Parsed message is in the wrong json format. Error is ", err)
}
// Search for "DocumentContent" > "runtimeConfig" > "aws:cloudWatch" > "properties" which has the cloudwatch
// config file and scrub the credentials, if present
obj := parsedContentJson.Search(documentContent, runtimeConfig, cloudwatchPlugin, properties).String()
if obj != "{}" {
//This will be true only for aws:cloudwatch
stripConfig := strings.Replace(strings.Replace(strings.Replace(obj, "\\t", "", -1), "\\n", "", -1), "\\", "", -1)
stripConfig = strings.TrimSuffix(strings.TrimPrefix(stripConfig, "\""), "\"")
finalLogConfig := logger.PrintCWConfig(stripConfig, log)
// Parameters > properties is another path where the config file is printed
if _, err = parsedContentJson.Set(finalLogConfig, parameters, properties); err != nil {
log.Debug("Error occurred when setting Parameters->properties with scrubbed credentials - ", err)
}
if _, err = parsedContentJson.Set(finalLogConfig, documentContent, runtimeConfig, cloudwatchPlugin, properties); err != nil {
log.Debug("Error occurred when setting aws:cloudWatch->properties with scrubbed credentials - ", err)
}
log.Debug("ParsedMessage is ", parsedContentJson.StringIndent("", " "))
} else {
//For plugins that are not aws:cloudwatch
log.Debug("ParsedMessage is ", jsonutil.Indent(parsedMessageContent))
}
// Check if it is a managed instance and is executing managed instance incompatible AWS SSM public document.
// A few public AWS SSM documents contain code which is not compatible when run on managed instances.
// isManagedInstanceIncompatibleAWSSSMDocument makes sure to find such documents at runtime and replace the incompatible code.
isMI := identity.IsOnPremInstance(context.Identity())
if isMI && contracts.IsManagedInstanceIncompatibleAWSSSMDocument(docState.DocumentInformation.DocumentName) {
log.Debugf("Running incompatible AWS SSM Document %v on managed instance", docState.DocumentInformation.DocumentName)
if err = contracts.RemoveDependencyOnInstanceMetadata(context, &docState); err != nil {
return nil, err
}
}
return &docState, nil
}
// IsValidReplyRequest checks whether the reply is valid and had timed or not
func IsValidReplyRequest(filename string, name contracts.UpstreamServiceName) bool {
splitFileName := strings.Split(filename, "_")
if len(splitFileName) < 2 {
return false
}
timeInFileName := ""
if name == contracts.MessageGatewayService { // MGS uses this format to have proper time based sorting
timeInFileName = splitFileName[0]
} else {
timeInFileName = splitFileName[1]
}
t, _ := time.Parse("2006-01-02T15-04-05", timeInFileName)
curTime := time.Now().UTC()
delta := curTime.Sub(t).Hours()
if delta > documentLevelTimeOutDurationHour {
return false
} else {
return true
}
}
// PrepareReplyPayloadToUpdateDocumentStatus creates the payload object for SendReply based on document status change.
func PrepareReplyPayloadToUpdateDocumentStatus(agentInfo contracts.AgentInfo, documentStatus contracts.ResultStatus, documentTraceOutput string, ableToOpenMGSConnection *bool) (payload messageContracts.SendReplyPayload) {
payload = messageContracts.SendReplyPayload{
AdditionalInfo: contracts.AdditionalInfo{
Agent: agentInfo,
DateTime: times.ToIso8601UTC(times.DefaultClock.Now()),
AbleToOpenMGSConnection: ableToOpenMGSConnection,
},
DocumentStatus: documentStatus,
DocumentTraceOutput: documentTraceOutput,
RuntimeStatus: nil,
}
return
}
// PrepareReplyPayloadFromIntermediatePluginResults parses send reply payload
func PrepareReplyPayloadFromIntermediatePluginResults(log log.T, pluginID string, agentInfo contracts.AgentInfo, outputs map[string]*contracts.PluginResult, ableToOpenMGSConnection *bool) (payload messageContracts.SendReplyPayload) {
status, statusCount, runtimeStatuses, _ := contracts.DocumentResultAggregator(log, pluginID, outputs)
additionalInfo := contracts.AdditionalInfo{
Agent: agentInfo,
DateTime: times.ToIso8601UTC(time.Now()),
RuntimeStatusCounts: statusCount,
AbleToOpenMGSConnection: ableToOpenMGSConnection,
}
payload = messageContracts.SendReplyPayload{
AdditionalInfo: additionalInfo,
DocumentStatus: status,
DocumentTraceOutput: "", // TODO: Fill me appropriately
RuntimeStatus: runtimeStatuses,
}
return
}