Skip to content

Commit

Permalink
Allow retries for creating the log group; fixes aws#18
Browse files Browse the repository at this point in the history
  • Loading branch information
PettitWesley committed Oct 20, 2019
1 parent b5dc2e6 commit 855ac9f
Showing 1 changed file with 42 additions and 26 deletions.
68 changes: 42 additions & 26 deletions cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type OutputPlugin struct {
timer *plugins.Timeout
nextLogStreamCleanUpCheckTime time.Time
PluginInstanceID int
logGroupCreated bool
}

// OutputPluginConfig is the input information used by NewOutputPlugin to create a new OutputPlugin
Expand Down Expand Up @@ -136,31 +137,17 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) {
return nil, err
}

if config.AutoCreateGroup {
err = createLogGroup(config.LogGroupName, client)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() != cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
return nil, err
}
logrus.Infof("[cloudwatch %d] Log group %s already exists\n", config.PluginInstanceID, config.LogGroupName)
} else {
return nil, err
}
}
logrus.Infof("[cloudwatch %d] Created log group %s\n", config.PluginInstanceID, config.LogGroupName)
}

return &OutputPlugin{
logGroupName: config.LogGroupName,
logStreamPrefix: config.LogStreamPrefix,
logStreamName: config.LogStreamName,
logKey: config.LogKey,
client: client,
timer: timer,
streams: make(map[string]*logStream),
logGroupName: config.LogGroupName,
logStreamPrefix: config.LogStreamPrefix,
logStreamName: config.LogStreamName,
logKey: config.LogKey,
client: client,
timer: timer,
streams: make(map[string]*logStream),
nextLogStreamCleanUpCheckTime: time.Now().Add(logStreamInactivityCheckInterval),
PluginInstanceID: config.PluginInstanceID,
logGroupCreated: !config.AutoCreateGroup,
}, nil
}

Expand Down Expand Up @@ -192,6 +179,15 @@ func newCloudWatchLogsClient(roleARN string, sess *session.Session, endpoint str
// the return value is one of: FLB_OK, FLB_RETRY
// API Errors lead to an FLB_RETRY, and all other errors are logged, the record is discarded and FLB_OK is returned
func (output *OutputPlugin) AddEvent(tag string, record map[interface{}]interface{}, timestamp time.Time) int {
if !output.logGroupCreated {
err := output.createLogGroup()
if err != nil {
logrus.Error(err)
return fluentbit.FLB_ERROR
}
output.logGroupCreated = true
}

data, err := output.processRecord(record)
if err != nil {
logrus.Errorf("[cloudwatch %d] %v\n", output.PluginInstanceID, err)
Expand Down Expand Up @@ -350,12 +346,24 @@ func (output *OutputPlugin) createStream(tag string) (*logStream, error) {
return stream, nil
}

func createLogGroup(name string, client LogsClient) error {
_, err := client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
LogGroupName: aws.String(name),
func (output *OutputPlugin) createLogGroup() error {
_, err := output.client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
LogGroupName: aws.String(output.logGroupName),
})

return err
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() != cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
return err
}
logrus.Infof("[cloudwatch %d] Log group %s already exists\n", output.PluginInstanceID, output.logGroupName)
} else {
return err
}
}
logrus.Infof("[cloudwatch %d] Created log group %s\n", output.PluginInstanceID, output.logGroupName)

return nil
}

// Takes the byte slice and returns a string
Expand Down Expand Up @@ -419,6 +427,14 @@ func logKey(record map[interface{}]interface{}, logKey string) (*interface{}, er

// Flush sends the current buffer of records (for the stream that corresponds with the given tag)
func (output *OutputPlugin) Flush(tag string) error {
if !output.logGroupCreated {
err := output.createLogGroup()
if err != nil {
return err
}
output.logGroupCreated = true
}

output.cleanUpExpiredLogStreams() // will periodically clean up, otherwise is no-op

stream, err := output.getLogStream(tag)
Expand Down

0 comments on commit 855ac9f

Please sign in to comment.