-
Notifications
You must be signed in to change notification settings - Fork 9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
resource/aws_kinesis_firehose: add splunk configuration #3117
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -301,6 +301,22 @@ func flattenKinesisFirehoseDeliveryStream(d *schema.ResourceData, s *firehose.De | |
elasticsearchConfList[0] = elasticsearchConfiguration | ||
d.Set("elasticsearch_configuration", elasticsearchConfList) | ||
d.Set("s3_configuration", flattenFirehoseS3Configuration(*destination.ElasticsearchDestinationDescription.S3DestinationDescription)) | ||
} else if destination.SplunkDestinationDescription != nil { | ||
d.Set("destination", "splunk") | ||
|
||
splunkConfiguration := map[string]interface{}{ | ||
"hec_acknowledgment_timeout": *destination.SplunkDestinationDescription.HECAcknowledgmentTimeoutInSeconds, | ||
"hec_endpoint": *destination.SplunkDestinationDescription.HECEndpoint, | ||
"hec_endpoint_type": *destination.SplunkDestinationDescription.HECEndpointType, | ||
"hec_token": *destination.SplunkDestinationDescription.HECToken, | ||
"s3_backup_mode": *destination.SplunkDestinationDescription.S3BackupMode, | ||
"retry_duration": *destination.SplunkDestinationDescription.RetryOptions.DurationInSeconds, | ||
"cloudwatch_logging_options": flattenCloudwatchLoggingOptions(*destination.SplunkDestinationDescription.CloudWatchLoggingOptions), | ||
} | ||
splunkConfList := make([]map[string]interface{}, 1) | ||
splunkConfList[0] = splunkConfiguration | ||
d.Set("splunk_configuration", splunkConfList) | ||
d.Set("s3_configuration", flattenFirehoseS3Configuration(*destination.SplunkDestinationDescription.S3DestinationDescription)) | ||
} else if d.Get("destination").(string) == "s3" { | ||
d.Set("destination", "s3") | ||
d.Set("s3_configuration", flattenFirehoseS3Configuration(*destination.S3DestinationDescription)) | ||
|
@@ -404,9 +420,9 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { | |
}, | ||
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { | ||
value := v.(string) | ||
if value != "s3" && value != "extended_s3" && value != "redshift" && value != "elasticsearch" { | ||
if value != "s3" && value != "extended_s3" && value != "redshift" && value != "elasticsearch" && value != "splunk" { | ||
errors = append(errors, fmt.Errorf( | ||
"%q must be one of 's3', 'extended_s3', 'redshift', 'elasticsearch'", k)) | ||
"%q must be one of 's3', 'extended_s3', 'redshift', 'elasticsearch', 'splunk'", k)) | ||
} | ||
return | ||
}, | ||
|
@@ -653,6 +669,85 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource { | |
}, | ||
}, | ||
|
||
"splunk_configuration": { | ||
Type: schema.TypeList, | ||
Optional: true, | ||
MaxItems: 1, | ||
Elem: &schema.Resource{ | ||
Schema: map[string]*schema.Schema{ | ||
"hec_acknowledgment_timeout": { | ||
Type: schema.TypeInt, | ||
Optional: true, | ||
Default: 180, | ||
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have a function to make this easier. 😄 |
||
value := v.(int) | ||
if value < 180 || value > 600 { | ||
errors = append(errors, fmt.Errorf( | ||
"%q must be in the range from 180 to 600 seconds.", k)) | ||
} | ||
return | ||
}, | ||
}, | ||
|
||
"hec_endpoint": { | ||
Type: schema.TypeString, | ||
Required: true, | ||
}, | ||
|
||
"hec_endpoint_type": { | ||
Type: schema.TypeString, | ||
Optional: true, | ||
Default: "Raw", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: The SDK provides a constant for |
||
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { | ||
value := v.(string) | ||
if value != "Raw" && value != "Event" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same note about SDK constants here. |
||
errors = append(errors, fmt.Errorf( | ||
"%q must be one of 'Raw', 'Event'", k)) | ||
} | ||
return | ||
}, | ||
}, | ||
|
||
"hec_token": { | ||
Type: schema.TypeString, | ||
Required: true, | ||
}, | ||
|
||
"s3_backup_mode": { | ||
Type: schema.TypeString, | ||
Optional: true, | ||
Default: "FailedEventsOnly", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: SDK constants available for here and validation below:
|
||
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { | ||
value := v.(string) | ||
if value != "FailedEventsOnly" && value != "AllEvents" { | ||
errors = append(errors, fmt.Errorf( | ||
"%q must be one of 'FailedEventsOnly', 'AllEvents'", k)) | ||
} | ||
return | ||
}, | ||
}, | ||
|
||
"retry_duration": { | ||
Type: schema.TypeInt, | ||
Optional: true, | ||
Default: 3600, | ||
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar note as above: |
||
value := v.(int) | ||
if value < 0 || value > 7200 { | ||
errors = append(errors, fmt.Errorf( | ||
"%q must be in the range from 0 to 7200 seconds.", k)) | ||
} | ||
return | ||
}, | ||
}, | ||
|
||
"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(), | ||
|
||
"processing_configuration": processingConfigurationSchema(), | ||
}, | ||
}, | ||
}, | ||
|
||
"arn": { | ||
Type: schema.TypeString, | ||
Optional: true, | ||
|
@@ -1052,6 +1147,62 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest | |
return update, nil | ||
} | ||
|
||
func createSplunkConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.SplunkDestinationConfiguration, error) { | ||
splunkRaw, ok := d.GetOk("splunk_configuration") | ||
if !ok { | ||
return nil, fmt.Errorf("[ERR] Error loading Splunk Configuration for Kinesis Firehose: splunk_configuration not found") | ||
} | ||
sl := splunkRaw.([]interface{}) | ||
|
||
splunk := sl[0].(map[string]interface{}) | ||
|
||
configuration := &firehose.SplunkDestinationConfiguration{ | ||
HECToken: aws.String(splunk["hec_token"].(string)), | ||
HECEndpointType: aws.String(splunk["hec_endpoint_type"].(string)), | ||
HECEndpoint: aws.String(splunk["hec_endpoint"].(string)), | ||
HECAcknowledgmentTimeoutInSeconds: aws.Int64(int64(splunk["hec_acknowledgment_timeout"].(int))), | ||
RetryOptions: extractSplunkRetryOptions(splunk), | ||
S3Configuration: s3Config, | ||
} | ||
|
||
if _, ok := splunk["cloudwatch_logging_options"]; ok { | ||
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(splunk) | ||
} | ||
if s3BackupMode, ok := splunk["s3_backup_mode"]; ok { | ||
configuration.S3BackupMode = aws.String(s3BackupMode.(string)) | ||
} | ||
|
||
return configuration, nil | ||
} | ||
|
||
func updateSplunkConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.SplunkDestinationUpdate, error) { | ||
splunkRaw, ok := d.GetOk("splunk_configuration") | ||
if !ok { | ||
return nil, fmt.Errorf("[ERR] Error loading Splunk Configuration for Kinesis Firehose: splunk_configuration not found") | ||
} | ||
sl := splunkRaw.([]interface{}) | ||
|
||
splunk := sl[0].(map[string]interface{}) | ||
|
||
configuration := &firehose.SplunkDestinationUpdate{ | ||
HECToken: aws.String(splunk["hec_token"].(string)), | ||
HECEndpointType: aws.String(splunk["hec_endpoint_type"].(string)), | ||
HECEndpoint: aws.String(splunk["hec_endpoint"].(string)), | ||
HECAcknowledgmentTimeoutInSeconds: aws.Int64(int64(splunk["hec_acknowledgment_timeout"].(int))), | ||
RetryOptions: extractSplunkRetryOptions(splunk), | ||
S3Update: s3Update, | ||
} | ||
|
||
if _, ok := splunk["cloudwatch_logging_options"]; ok { | ||
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(splunk) | ||
} | ||
if s3BackupMode, ok := splunk["s3_backup_mode"]; ok { | ||
configuration.S3BackupMode = aws.String(s3BackupMode.(string)) | ||
} | ||
|
||
return configuration, nil | ||
} | ||
|
||
func extractBufferingHints(es map[string]interface{}) *firehose.ElasticsearchBufferingHints { | ||
bufferingHints := &firehose.ElasticsearchBufferingHints{} | ||
|
||
|
@@ -1085,6 +1236,16 @@ func extractRedshiftRetryOptions(redshift map[string]interface{}) *firehose.Reds | |
return retryOptions | ||
} | ||
|
||
func extractSplunkRetryOptions(splunk map[string]interface{}) *firehose.SplunkRetryOptions { | ||
retryOptions := &firehose.SplunkRetryOptions{} | ||
|
||
if retryDuration, ok := splunk["retry_duration"].(int); ok { | ||
retryOptions.DurationInSeconds = aws.Int64(int64(retryDuration)) | ||
} | ||
|
||
return retryOptions | ||
} | ||
|
||
func extractCopyCommandConfiguration(redshift map[string]interface{}) *firehose.CopyCommand { | ||
cmd := &firehose.CopyCommand{ | ||
DataTableName: aws.String(redshift["data_table_name"].(string)), | ||
|
@@ -1136,12 +1297,18 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta | |
return err | ||
} | ||
createInput.ElasticsearchDestinationConfiguration = esConfig | ||
} else { | ||
} else if d.Get("destination").(string) == "redshift" { | ||
rc, err := createRedshiftConfig(d, s3Config) | ||
if err != nil { | ||
return err | ||
} | ||
createInput.RedshiftDestinationConfiguration = rc | ||
} else if d.Get("destination").(string) == "splunk" { | ||
rc, err := createSplunkConfig(d, s3Config) | ||
if err != nil { | ||
return err | ||
} | ||
createInput.SplunkDestinationConfiguration = rc | ||
} | ||
} | ||
|
||
|
@@ -1258,12 +1425,18 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta | |
return err | ||
} | ||
updateInput.ElasticsearchDestinationUpdate = esUpdate | ||
} else { | ||
} else if d.Get("destination").(string) == "redshift" { | ||
rc, err := updateRedshiftConfig(d, s3Config) | ||
if err != nil { | ||
return err | ||
} | ||
updateInput.RedshiftDestinationUpdate = rc | ||
} else if d.Get("destination").(string) == "splunk" { | ||
rc, err := updateSplunkConfig(d, s3Config) | ||
if err != nil { | ||
return err | ||
} | ||
updateInput.SplunkDestinationUpdate = rc | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for this PR, but we currently are getting crashes dereferencing CloudWatch logging options for other destination types as this information is not always returned by the API. We'll probably want to fix all of them at once.