diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3f27ecf86..d74b7bb69 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@
- Migrate `elasticstack_kibana_action_connector` to the Terraform plugin framework ([#1269](https://github.com/elastic/terraform-provider-elasticstack/pull/1269))
- Migrate `elasticstack_elasticsearch_security_role_mapping` resource and data source to Terraform Plugin Framework ([#1279](https://github.com/elastic/terraform-provider-elasticstack/pull/1279))
- Add support for `inactivity_timeout` in `elasticstack_fleet_agent_policy` ([#641](https://github.com/elastic/terraform-provider-elasticstack/issues/641))
+- Add support for the `kafka` output type in `elasticstack_fleet_output` ([#1302](https://github.com/elastic/terraform-provider-elasticstack/pull/1302))
- Add support for `prevent_initial_backfill` to `elasticstack_kibana_slo` ([#1071](https://github.com/elastic/terraform-provider-elasticstack/pull/1071))
- [Refactor] Regenerate the SLO client using the current OpenAPI spec ([#1303](https://github.com/elastic/terraform-provider-elasticstack/pull/1303))
- Add support for `data_view_id` in the `elasticstack_kibana_slo` resource ([#1305](https://github.com/elastic/terraform-provider-elasticstack/pull/1305))
diff --git a/docs/resources/fleet_output.md b/docs/resources/fleet_output.md
index 0af674f1c..23b909903 100644
--- a/docs/resources/fleet_output.md
+++ b/docs/resources/fleet_output.md
@@ -1,4 +1,3 @@
-
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "elasticstack_fleet_output Resource - terraform-provider-elasticstack"
@@ -13,6 +12,8 @@ Creates a new Fleet Output.
## Example Usage
+### Basic output
+
```terraform
provider "elasticstack" {
kibana {}
@@ -32,6 +33,168 @@ resource "elasticstack_fleet_output" "test_output" {
}
```
+### Basic Kafka output
+
+```terraform
+terraform {
+ required_providers {
+ elasticstack = {
+ source = "elastic/elasticstack"
+ version = "~> 0.11"
+ }
+ }
+}
+
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+# Basic Kafka Fleet Output
+resource "elasticstack_fleet_output" "kafka_basic" {
+ name = "Basic Kafka Output"
+ output_id = "kafka-basic-output"
+ type = "kafka"
+ default_integrations = false
+ default_monitoring = false
+
+ hosts = [
+ "kafka:9092"
+ ]
+
+ # Basic Kafka configuration
+ kafka = {
+ auth_type = "user_pass"
+ username = "kafka_user"
+ password = "kafka_password"
+ topic = "elastic-beats"
+ partition = "hash"
+ compression = "gzip"
+ required_acks = 1
+
+ headers = [
+ {
+ key = "environment"
+ value = "production"
+ }
+ ]
+ }
+}
+```
+
+### Advanced Kafka output
+
+```terraform
+terraform {
+ required_providers {
+ elasticstack = {
+ source = "elastic/elasticstack"
+ version = "~> 0.11"
+ }
+ }
+}
+
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+# Advanced Kafka Fleet Output with SSL authentication
+resource "elasticstack_fleet_output" "kafka_advanced" {
+ name = "Advanced Kafka Output"
+ output_id = "kafka-advanced-output"
+ type = "kafka"
+ default_integrations = false
+ default_monitoring = false
+
+ hosts = [
+ "kafka1:9092",
+ "kafka2:9092",
+ "kafka3:9092"
+ ]
+
+ # Advanced Kafka configuration
+ kafka = {
+ auth_type = "ssl"
+ topic = "elastic-logs"
+ partition = "hash"
+ compression = "snappy"
+ required_acks = -1
+ broker_timeout = 10
+ timeout = 30
+ version = "2.6.0"
+ client_id = "elastic-beats-client"
+
+ # Custom headers for message metadata
+ headers = [
+ {
+ key = "datacenter"
+ value = "us-west-1"
+ },
+ {
+ key = "service"
+ value = "beats"
+ },
+ {
+ key = "environment"
+ value = "production"
+ }
+ ]
+
+ # Hash-based partitioning
+ hash = {
+ hash = "host.name"
+ random = false
+ }
+
+ # SASL configuration
+ sasl = {
+ mechanism = "SCRAM-SHA-256"
+ }
+ }
+
+ # SSL configuration (shared top-level ssl attribute)
+ ssl = {
+ certificate_authorities = [
+ file("${path.module}/ca.crt")
+ ]
+ certificate = file("${path.module}/client.crt")
+ key = file("${path.module}/client.key")
+ }
+
+ # Additional YAML configuration for advanced settings
+ config_yaml = yamlencode({
+ "ssl.verification_mode" = "full"
+ "ssl.supported_protocols" = ["TLSv1.2", "TLSv1.3"]
+ "max.message.bytes" = 1000000
+ })
+}
+
+# Example showing round-robin partitioning with event grouping
+resource "elasticstack_fleet_output" "kafka_round_robin" {
+ name = "Kafka Round Robin Output"
+ output_id = "kafka-round-robin-output"
+ type = "kafka"
+ default_integrations = false
+ default_monitoring = false
+
+ hosts = ["kafka:9092"]
+
+ kafka = {
+ auth_type = "none"
+ topic = "elastic-metrics"
+ partition = "round_robin"
+ compression = "lz4"
+
+ round_robin = {
+ group_events = 100
+ }
+ }
+}
+```
+
## Schema
@@ -48,14 +211,83 @@ resource "elasticstack_fleet_output" "test_output" {
- `default_integrations` (Boolean) Make this output the default for agent integrations.
- `default_monitoring` (Boolean) Make this output the default for agent monitoring.
- `hosts` (List of String) A list of hosts.
+- `kafka` (Attributes) Kafka-specific configuration. (see [below for nested schema](#nestedatt--kafka))
- `output_id` (String) Unique identifier of the output.
-- `ssl` (Block List) SSL configuration. (see [below for nested schema](#nestedblock--ssl))
+- `ssl` (Attributes) SSL configuration. (see [below for nested schema](#nestedatt--ssl))
### Read-Only
- `id` (String) The ID of this resource.
-
+
+### Nested Schema for `kafka`
+
+Optional:
+
+- `auth_type` (String) Authentication type for Kafka output.
+- `broker_timeout` (Number) Kafka broker timeout.
+- `client_id` (String) Kafka client ID.
+- `compression` (String) Compression type for Kafka output.
+- `compression_level` (Number) Compression level for Kafka output.
+- `connection_type` (String) Connection type for Kafka output.
+- `hash` (Attributes) Hash configuration for Kafka partition. (see [below for nested schema](#nestedatt--kafka--hash))
+- `headers` (Attributes List) Headers for Kafka messages. (see [below for nested schema](#nestedatt--kafka--headers))
+- `key` (String) Key field for Kafka messages.
+- `partition` (String) Partition strategy for Kafka output.
+- `password` (String, Sensitive) Password for Kafka authentication.
+- `random` (Attributes) Random configuration for Kafka partition. (see [below for nested schema](#nestedatt--kafka--random))
+- `required_acks` (Number) Number of acknowledgments required for Kafka output.
+- `round_robin` (Attributes) Round robin configuration for Kafka partition. (see [below for nested schema](#nestedatt--kafka--round_robin))
+- `sasl` (Attributes) SASL configuration for Kafka authentication. (see [below for nested schema](#nestedatt--kafka--sasl))
+- `timeout` (Number) Timeout for Kafka output.
+- `topic` (String) Kafka topic.
+- `username` (String) Username for Kafka authentication.
+- `version` (String) Kafka version.
+
+
+### Nested Schema for `kafka.hash`
+
+Optional:
+
+- `hash` (String) Hash field.
+- `random` (Boolean) Use random hash.
+
+
+
+### Nested Schema for `kafka.headers`
+
+Required:
+
+- `key` (String) Header key.
+- `value` (String) Header value.
+
+
+
+### Nested Schema for `kafka.random`
+
+Optional:
+
+- `group_events` (Number) Number of events to group.
+
+
+
+### Nested Schema for `kafka.round_robin`
+
+Optional:
+
+- `group_events` (Number) Number of events to group.
+
+
+
+### Nested Schema for `kafka.sasl`
+
+Optional:
+
+- `mechanism` (String) SASL mechanism.
+
+
+
+
### Nested Schema for `ssl`
Required:
diff --git a/examples/resources/elasticstack_fleet_output/kafka_advanced.tf b/examples/resources/elasticstack_fleet_output/kafka_advanced.tf
new file mode 100644
index 000000000..3b093a8dd
--- /dev/null
+++ b/examples/resources/elasticstack_fleet_output/kafka_advanced.tf
@@ -0,0 +1,108 @@
+terraform {
+ required_providers {
+ elasticstack = {
+ source = "elastic/elasticstack"
+ version = "~> 0.11"
+ }
+ }
+}
+
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+# Advanced Kafka Fleet Output with SSL authentication
+resource "elasticstack_fleet_output" "kafka_advanced" {
+ name = "Advanced Kafka Output"
+ output_id = "kafka-advanced-output"
+ type = "kafka"
+ default_integrations = false
+ default_monitoring = false
+
+ hosts = [
+ "kafka1:9092",
+ "kafka2:9092",
+ "kafka3:9092"
+ ]
+
+ # Advanced Kafka configuration
+ kafka = {
+ auth_type = "ssl"
+ topic = "elastic-logs"
+ partition = "hash"
+ compression = "snappy"
+ required_acks = -1
+ broker_timeout = 10
+ timeout = 30
+ version = "2.6.0"
+ client_id = "elastic-beats-client"
+
+ # Custom headers for message metadata
+ headers = [
+ {
+ key = "datacenter"
+ value = "us-west-1"
+ },
+ {
+ key = "service"
+ value = "beats"
+ },
+ {
+ key = "environment"
+ value = "production"
+ }
+ ]
+
+ # Hash-based partitioning
+ hash = {
+ hash = "host.name"
+ random = false
+ }
+
+ # SASL configuration
+ sasl = {
+ mechanism = "SCRAM-SHA-256"
+ }
+ }
+
+ # SSL configuration (shared top-level ssl attribute)
+ ssl = {
+ certificate_authorities = [
+ file("${path.module}/ca.crt")
+ ]
+ certificate = file("${path.module}/client.crt")
+ key = file("${path.module}/client.key")
+ }
+
+ # Additional YAML configuration for advanced settings
+ config_yaml = yamlencode({
+ "ssl.verification_mode" = "full"
+ "ssl.supported_protocols" = ["TLSv1.2", "TLSv1.3"]
+ "max.message.bytes" = 1000000
+ })
+}
+
+# Example showing round-robin partitioning with event grouping
+resource "elasticstack_fleet_output" "kafka_round_robin" {
+ name = "Kafka Round Robin Output"
+ output_id = "kafka-round-robin-output"
+ type = "kafka"
+ default_integrations = false
+ default_monitoring = false
+
+ hosts = ["kafka:9092"]
+
+ kafka = {
+ auth_type = "none"
+ topic = "elastic-metrics"
+ partition = "round_robin"
+ compression = "lz4"
+
+ round_robin = {
+ group_events = 100
+ }
+ }
+}
diff --git a/examples/resources/elasticstack_fleet_output/kafka_basic.tf b/examples/resources/elasticstack_fleet_output/kafka_basic.tf
new file mode 100644
index 000000000..b1efba34e
--- /dev/null
+++ b/examples/resources/elasticstack_fleet_output/kafka_basic.tf
@@ -0,0 +1,44 @@
+terraform {
+ required_providers {
+ elasticstack = {
+ source = "elastic/elasticstack"
+ version = "~> 0.11"
+ }
+ }
+}
+
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+# Basic Kafka Fleet Output
+resource "elasticstack_fleet_output" "kafka_basic" {
+ name = "Basic Kafka Output"
+ output_id = "kafka-basic-output"
+ type = "kafka"
+ default_integrations = false
+ default_monitoring = false
+
+ hosts = [
+ "kafka:9092"
+ ]
+
+ # Basic Kafka configuration
+ kafka = {
+ auth_type = "user_pass"
+ username = "kafka_user"
+ password = "kafka_password"
+ topic = "elastic-beats"
+ partition = "hash"
+ compression = "gzip"
+ required_acks = 1
+
+ headers = [
+ {
+ key = "environment"
+ value = "production"
+ }
+ ]
+ }
+}
diff --git a/generated/kbapi/kibana.gen.go b/generated/kbapi/kibana.gen.go
index 97e146c28..3c947f747 100644
--- a/generated/kbapi/kibana.gen.go
+++ b/generated/kbapi/kibana.gen.go
@@ -20779,9 +20779,9 @@ type NewOutputKafka struct {
CaTrustedFingerprint *string `json:"ca_trusted_fingerprint,omitempty"`
ClientId *string `json:"client_id,omitempty"`
Compression *NewOutputKafkaCompression `json:"compression,omitempty"`
- CompressionLevel interface{} `json:"compression_level"`
+ CompressionLevel *int `json:"compression_level,omitempty"`
ConfigYaml *string `json:"config_yaml,omitempty"`
- ConnectionType interface{} `json:"connection_type"`
+ ConnectionType *string `json:"connection_type,omitempty"`
Hash *struct {
Hash *string `json:"hash,omitempty"`
Random *bool `json:"random,omitempty"`
@@ -20799,7 +20799,7 @@ type NewOutputKafka struct {
Key *string `json:"key,omitempty"`
Name string `json:"name"`
Partition *NewOutputKafkaPartition `json:"partition,omitempty"`
- Password interface{} `json:"password"`
+ Password *string `json:"password,omitempty"`
ProxyId *string `json:"proxy_id,omitempty"`
Random *struct {
GroupEvents *float32 `json:"group_events,omitempty"`
@@ -20822,7 +20822,7 @@ type NewOutputKafka struct {
Timeout *float32 `json:"timeout,omitempty"`
Topic *string `json:"topic,omitempty"`
Type NewOutputKafkaType `json:"type"`
- Username interface{} `json:"username"`
+ Username *string `json:"username,omitempty"`
Version *string `json:"version,omitempty"`
WriteToLogsStreams *bool `json:"write_to_logs_streams,omitempty"`
}
@@ -21082,9 +21082,9 @@ type OutputKafka struct {
CaTrustedFingerprint *string `json:"ca_trusted_fingerprint,omitempty"`
ClientId *string `json:"client_id,omitempty"`
Compression *OutputKafkaCompression `json:"compression,omitempty"`
- CompressionLevel interface{} `json:"compression_level"`
+ CompressionLevel *int `json:"compression_level,omitempty"`
ConfigYaml *string `json:"config_yaml,omitempty"`
- ConnectionType interface{} `json:"connection_type"`
+ ConnectionType *string `json:"connection_type,omitempty"`
Hash *OutputKafka_Hash `json:"hash,omitempty"`
Headers *[]OutputKafka_Headers_Item `json:"headers,omitempty"`
Hosts []string `json:"hosts"`
@@ -21096,7 +21096,7 @@ type OutputKafka struct {
Key *string `json:"key,omitempty"`
Name string `json:"name"`
Partition *OutputKafkaPartition `json:"partition,omitempty"`
- Password interface{} `json:"password"`
+ Password *string `json:"password,omitempty"`
ProxyId *string `json:"proxy_id,omitempty"`
Random *OutputKafka_Random `json:"random,omitempty"`
RequiredAcks *OutputKafkaRequiredAcks `json:"required_acks,omitempty"`
@@ -21108,7 +21108,7 @@ type OutputKafka struct {
Timeout *float32 `json:"timeout,omitempty"`
Topic *string `json:"topic,omitempty"`
Type OutputKafkaType `json:"type"`
- Username interface{} `json:"username"`
+ Username *string `json:"username,omitempty"`
Version *string `json:"version,omitempty"`
WriteToLogsStreams *bool `json:"write_to_logs_streams,omitempty"`
AdditionalProperties map[string]interface{} `json:"-"`
@@ -23771,9 +23771,9 @@ type UpdateOutputKafka struct {
CaTrustedFingerprint *string `json:"ca_trusted_fingerprint,omitempty"`
ClientId *string `json:"client_id,omitempty"`
Compression *UpdateOutputKafkaCompression `json:"compression,omitempty"`
- CompressionLevel interface{} `json:"compression_level"`
+ CompressionLevel *int `json:"compression_level,omitempty"`
ConfigYaml *string `json:"config_yaml,omitempty"`
- ConnectionType interface{} `json:"connection_type"`
+ ConnectionType *string `json:"connection_type,omitempty"`
Hash *struct {
Hash *string `json:"hash,omitempty"`
Random *bool `json:"random,omitempty"`
@@ -23790,7 +23790,7 @@ type UpdateOutputKafka struct {
Key *string `json:"key,omitempty"`
Name string `json:"name"`
Partition *UpdateOutputKafkaPartition `json:"partition,omitempty"`
- Password interface{} `json:"password"`
+ Password *string `json:"password,omitempty"`
ProxyId *string `json:"proxy_id,omitempty"`
Random *struct {
GroupEvents *float32 `json:"group_events,omitempty"`
@@ -23813,7 +23813,7 @@ type UpdateOutputKafka struct {
Timeout *float32 `json:"timeout,omitempty"`
Topic *string `json:"topic,omitempty"`
Type *UpdateOutputKafkaType `json:"type,omitempty"`
- Username interface{} `json:"username"`
+ Username *string `json:"username,omitempty"`
Version *string `json:"version,omitempty"`
WriteToLogsStreams *bool `json:"write_to_logs_streams,omitempty"`
}
@@ -41967,9 +41967,11 @@ func (a OutputKafka) MarshalJSON() ([]byte, error) {
}
}
- object["compression_level"], err = json.Marshal(a.CompressionLevel)
- if err != nil {
- return nil, fmt.Errorf("error marshaling 'compression_level': %w", err)
+ if a.CompressionLevel != nil {
+ object["compression_level"], err = json.Marshal(a.CompressionLevel)
+ if err != nil {
+ return nil, fmt.Errorf("error marshaling 'compression_level': %w", err)
+ }
}
if a.ConfigYaml != nil {
@@ -41979,9 +41981,11 @@ func (a OutputKafka) MarshalJSON() ([]byte, error) {
}
}
- object["connection_type"], err = json.Marshal(a.ConnectionType)
- if err != nil {
- return nil, fmt.Errorf("error marshaling 'connection_type': %w", err)
+ if a.ConnectionType != nil {
+ object["connection_type"], err = json.Marshal(a.ConnectionType)
+ if err != nil {
+ return nil, fmt.Errorf("error marshaling 'connection_type': %w", err)
+ }
}
if a.Hash != nil {
@@ -42057,9 +42061,11 @@ func (a OutputKafka) MarshalJSON() ([]byte, error) {
}
}
- object["password"], err = json.Marshal(a.Password)
- if err != nil {
- return nil, fmt.Errorf("error marshaling 'password': %w", err)
+ if a.Password != nil {
+ object["password"], err = json.Marshal(a.Password)
+ if err != nil {
+ return nil, fmt.Errorf("error marshaling 'password': %w", err)
+ }
}
if a.ProxyId != nil {
@@ -42137,9 +42143,11 @@ func (a OutputKafka) MarshalJSON() ([]byte, error) {
return nil, fmt.Errorf("error marshaling 'type': %w", err)
}
- object["username"], err = json.Marshal(a.Username)
- if err != nil {
- return nil, fmt.Errorf("error marshaling 'username': %w", err)
+ if a.Username != nil {
+ object["username"], err = json.Marshal(a.Username)
+ if err != nil {
+ return nil, fmt.Errorf("error marshaling 'username': %w", err)
+ }
}
if a.Version != nil {
@@ -51569,7 +51577,7 @@ func (t SLOsIndicatorPropertiesTimesliceMetric_Params_Metric_Metrics_Item) AsSLO
// FromSLOsTimesliceMetricBasicMetricWithField overwrites any union data inside the SLOsIndicatorPropertiesTimesliceMetric_Params_Metric_Metrics_Item as the provided SLOsTimesliceMetricBasicMetricWithField
func (t *SLOsIndicatorPropertiesTimesliceMetric_Params_Metric_Metrics_Item) FromSLOsTimesliceMetricBasicMetricWithField(v SLOsTimesliceMetricBasicMetricWithField) error {
- v.Aggregation = "max"
+ v.Aggregation = "avg"
b, err := json.Marshal(v)
t.union = b
return err
@@ -51577,7 +51585,7 @@ func (t *SLOsIndicatorPropertiesTimesliceMetric_Params_Metric_Metrics_Item) From
// MergeSLOsTimesliceMetricBasicMetricWithField performs a merge with any union data inside the SLOsIndicatorPropertiesTimesliceMetric_Params_Metric_Metrics_Item, using the provided SLOsTimesliceMetricBasicMetricWithField
func (t *SLOsIndicatorPropertiesTimesliceMetric_Params_Metric_Metrics_Item) MergeSLOsTimesliceMetricBasicMetricWithField(v SLOsTimesliceMetricBasicMetricWithField) error {
- v.Aggregation = "max"
+ v.Aggregation = "avg"
b, err := json.Marshal(v)
if err != nil {
return err
@@ -51658,10 +51666,10 @@ func (t SLOsIndicatorPropertiesTimesliceMetric_Params_Metric_Metrics_Item) Value
return nil, err
}
switch discriminator {
+ case "avg":
+ return t.AsSLOsTimesliceMetricBasicMetricWithField()
case "doc_count":
return t.AsSLOsTimesliceMetricDocCountMetric()
- case "max":
- return t.AsSLOsTimesliceMetricBasicMetricWithField()
case "percentile":
return t.AsSLOsTimesliceMetricPercentileMetric()
default:
diff --git a/generated/kbapi/transform_schema.go b/generated/kbapi/transform_schema.go
index 20e5ea8e2..009fcb3e8 100644
--- a/generated/kbapi/transform_schema.go
+++ b/generated/kbapi/transform_schema.go
@@ -926,10 +926,11 @@ func transformFleetPaths(schema *Schema) {
for _, name := range []string{"output", "new_output", "update_output"} {
// Ref each index in the anyOf union
+ kafkaComponent := fmt.Sprintf("%s_kafka", name)
schema.Components.CreateRef(schema, fmt.Sprintf("%s_elasticsearch", name), fmt.Sprintf("schemas.%s_union.anyOf.0", name))
schema.Components.CreateRef(schema, fmt.Sprintf("%s_remote_elasticsearch", name), fmt.Sprintf("schemas.%s_union.anyOf.1", name))
schema.Components.CreateRef(schema, fmt.Sprintf("%s_logstash", name), fmt.Sprintf("schemas.%s_union.anyOf.2", name))
- schema.Components.CreateRef(schema, fmt.Sprintf("%s_kafka", name), fmt.Sprintf("schemas.%s_union.anyOf.3", name))
+ schema.Components.CreateRef(schema, kafkaComponent, fmt.Sprintf("schemas.%s_union.anyOf.3", name))
// Extract child structs
for _, typ := range []string{"elasticsearch", "remote_elasticsearch", "logstash", "kafka"} {
@@ -954,10 +955,24 @@ func transformFleetPaths(schema *Schema) {
- not: {}
*/
- props := schema.Components.MustGetMap(fmt.Sprintf("schemas.%s_kafka.properties", name))
- for _, key := range []string{"compression_level", "connection_type", "password", "username"} {
- props.Set(key, Map{})
+ // https://github.com/elastic/kibana/issues/197153
+ kafkaRequiredName := fmt.Sprintf("schemas.%s.required", kafkaComponent)
+ props := schema.Components.MustGetMap(fmt.Sprintf("schemas.%s.properties", kafkaComponent))
+ required := schema.Components.MustGetSlice(kafkaRequiredName)
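+ // Override these properties with concrete scalar types and drop them from the required list so the generated structs use optional pointer fields.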
+ for key, apiType := range map[string]string{"compression_level": "integer", "connection_type": "string", "password": "string", "username": "string"} {
+ props.Set(key, Map{
+ "type": apiType,
+ })
+ required = slices.DeleteFunc(required, func(item any) bool {
+ itemStr, ok := item.(string)
+ if !ok {
+ return false
+ }
+
+ return itemStr == key
+ })
}
+ schema.Components.Set(kafkaRequiredName, required)
}
// Add the missing discriminator to the response union
diff --git a/internal/fleet/output/acc_test.go b/internal/fleet/output/acc_test.go
new file mode 100644
index 000000000..cd87c6b6d
--- /dev/null
+++ b/internal/fleet/output/acc_test.go
@@ -0,0 +1,553 @@
+package output_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/elastic/terraform-provider-elasticstack/internal/acctest"
+ "github.com/elastic/terraform-provider-elasticstack/internal/clients"
+ "github.com/elastic/terraform-provider-elasticstack/internal/clients/fleet"
+ "github.com/elastic/terraform-provider-elasticstack/internal/fleet/output"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils"
+ "github.com/elastic/terraform-provider-elasticstack/internal/versionutils"
+ "github.com/hashicorp/go-version"
+ sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest"
+ "github.com/hashicorp/terraform-plugin-testing/helper/resource"
+ "github.com/hashicorp/terraform-plugin-testing/terraform"
+)
+
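+// minVersionOutput is the minimum stack version required by the Fleet output acceptance tests.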
+var minVersionOutput = version.Must(version.NewVersion("8.6.0"))
+
+func TestAccResourceOutputElasticsearchFromSDK(t *testing.T) {
+ policyName := sdkacctest.RandString(22)
+
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { acctest.PreCheck(t) },
+ CheckDestroy: checkResourceOutputDestroy,
+ Steps: []resource.TestStep{
+ {
+ ExternalProviders: map[string]resource.ExternalProvider{
+ "elasticstack": {
+ Source: "elastic/elasticstack",
+ VersionConstraint: "0.11.7",
+ },
+ },
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
+ Config: testAccResourceOutputCreateElasticsearch(policyName),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Elasticsearch Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-elasticsearch-output", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "elasticsearch"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "https://elasticsearch:9200"),
+ ),
+ },
+ {
+ ProtoV6ProviderFactories: acctest.Providers,
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
+ Config: testAccResourceOutputCreateElasticsearch(policyName),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Elasticsearch Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-elasticsearch-output", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "elasticsearch"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "https://elasticsearch:9200"),
+ ),
+ },
+ },
+ })
+}
+
+func TestAccResourceOutputElasticsearch(t *testing.T) {
+ policyName := sdkacctest.RandString(22)
+
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { acctest.PreCheck(t) },
+ CheckDestroy: checkResourceOutputDestroy,
+ ProtoV6ProviderFactories: acctest.Providers,
+ Steps: []resource.TestStep{
+ {
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
+ Config: testAccResourceOutputCreateElasticsearch(policyName),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Elasticsearch Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-elasticsearch-output", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "elasticsearch"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "https://elasticsearch:9200"),
+ ),
+ },
+ {
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
+ Config: testAccResourceOutputUpdateElasticsearch(policyName),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Updated Elasticsearch Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-elasticsearch-output", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "elasticsearch"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "https://elasticsearch:9200"),
+ ),
+ },
+ {
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
+ Config: testAccResourceOutputUpdateElasticsearch(policyName),
+ ResourceName: "elasticstack_fleet_output.test_output",
+ ImportState: true,
+ ImportStateVerify: true,
+ },
+ },
+ })
+}
+
+func TestAccResourceOutputLogstashFromSDK(t *testing.T) {
+ policyName := sdkacctest.RandString(22)
+
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { acctest.PreCheck(t) },
+ CheckDestroy: checkResourceOutputDestroy,
+ Steps: []resource.TestStep{
+ {
+ ExternalProviders: map[string]resource.ExternalProvider{
+ "elasticstack": {
+ Source: "elastic/elasticstack",
+ VersionConstraint: "0.11.7",
+ },
+ },
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
+ Config: testAccResourceOutputCreateLogstash(policyName, true),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Logstash Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-logstash-output", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "logstash"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "logstash:5044"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.certificate_authorities.0", "placeholder"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.certificate", "placeholder"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.key", "placeholder"),
+ ),
+ },
+ {
+ ProtoV6ProviderFactories: acctest.Providers,
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
+ Config: testAccResourceOutputCreateLogstash(policyName, false),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Logstash Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-logstash-output", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "logstash"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "logstash:5044"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.certificate_authorities.0", "placeholder"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.certificate", "placeholder"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.key", "placeholder"),
+ ),
+ },
+ },
+ })
+}
+
+func TestAccResourceOutputLogstash(t *testing.T) {
+ policyName := sdkacctest.RandString(22)
+
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { acctest.PreCheck(t) },
+ CheckDestroy: checkResourceOutputDestroy,
+ ProtoV6ProviderFactories: acctest.Providers,
+ Steps: []resource.TestStep{
+ {
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
+ Config: testAccResourceOutputCreateLogstash(policyName, false),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Logstash Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-logstash-output", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "logstash"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "logstash:5044"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.certificate_authorities.0", "placeholder"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.certificate", "placeholder"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.key", "placeholder"),
+ ),
+ },
+ {
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
+ Config: testAccResourceOutputUpdateLogstash(policyName),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Updated Logstash Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-logstash-output", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "logstash"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "logstash:5044"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.certificate_authorities.0", "placeholder"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.certificate", "placeholder"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.key", "placeholder"),
+ ),
+ },
+ },
+ })
+}
+
+func TestAccResourceOutputKafka(t *testing.T) {
+ policyName := sdkacctest.RandString(22)
+
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { acctest.PreCheck(t) },
+ CheckDestroy: checkResourceOutputDestroy,
+ ProtoV6ProviderFactories: acctest.Providers,
+ Steps: []resource.TestStep{
+ {
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(output.MinVersionOutputKafka),
+ Config: testAccResourceOutputCreateKafka(policyName),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Kafka Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-kafka-output", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "kafka"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "kafka:9092"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.auth_type", "none"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.topic", "beats"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.partition", "hash"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.compression", "gzip"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.compression_level", "6"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.connection_type", "plaintext"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.required_acks", "1"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.headers.0.key", "environment"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.headers.0.value", "test"),
+ ),
+ },
+ {
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(output.MinVersionOutputKafka),
+ Config: testAccResourceOutputUpdateKafka(policyName),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Updated Kafka Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-kafka-output", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "kafka"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "kafka:9092"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.auth_type", "none"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.topic", "logs"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.partition", "round_robin"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.compression", "snappy"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.connection_type", "encryption"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.required_acks", "-1"),
+ ),
+ },
+ },
+ })
+}
+
+func TestAccResourceOutputKafkaComplex(t *testing.T) {
+ policyName := sdkacctest.RandString(22)
+
+ resource.Test(t, resource.TestCase{
+ PreCheck: func() { acctest.PreCheck(t) },
+ CheckDestroy: checkResourceOutputDestroy,
+ ProtoV6ProviderFactories: acctest.Providers,
+ Steps: []resource.TestStep{
+ {
+ SkipFunc: versionutils.CheckIfVersionIsUnsupported(output.MinVersionOutputKafka),
+ Config: testAccResourceOutputCreateKafkaComplex(policyName),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Complex Kafka Output %s", policyName)),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "kafka"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.auth_type", "none"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.topic", "complex-topic"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.partition", "hash"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.compression", "lz4"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.required_acks", "0"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.broker_timeout", "10"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.timeout", "30"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.version", "2.6.0"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.headers.0.key", "datacenter"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.headers.0.value", "us-west-1"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.headers.1.key", "service"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.headers.1.value", "beats"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.hash.hash", "event.hash"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.hash.random", "false"),
+ resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "kafka.sasl.mechanism", "SCRAM-SHA-256"),
+ ),
+ },
+ },
+ })
+}
+
+func testAccResourceOutputCreateElasticsearch(id string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+resource "elasticstack_fleet_output" "test_output" {
+ name = "Elasticsearch Output %s"
+ output_id = "%s-elasticsearch-output"
+ type = "elasticsearch"
+ config_yaml = yamlencode({
+ "ssl.verification_mode" : "none"
+ })
+ default_integrations = false
+ default_monitoring = false
+ hosts = [
+ "https://elasticsearch:9200"
+ ]
+}
+`, id, id)
+}
+
+func testAccResourceOutputUpdateElasticsearch(id string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+resource "elasticstack_fleet_output" "test_output" {
+ name = "Updated Elasticsearch Output %s"
+ output_id = "%s-elasticsearch-output"
+ type = "elasticsearch"
+ config_yaml = yamlencode({
+ "ssl.verification_mode" : "none"
+ })
+ default_integrations = false
+ default_monitoring = false
+ hosts = [
+ "https://elasticsearch:9200"
+ ]
+}
+`, id, id)
+}
+
+func testAccResourceOutputCreateLogstash(id string, forSDK bool) string {
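+ // Provider releases built on the SDK used block syntax (ssl { ... }); the plugin-framework resource uses attribute syntax (ssl = { ... }).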
+ sslInfix := ""
+ if !forSDK {
+ sslInfix = "="
+ }
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+resource "elasticstack_fleet_output" "test_output" {
+ name = "Logstash Output %s"
+ type = "logstash"
+ output_id = "%s-logstash-output"
+ config_yaml = yamlencode({
+ "ssl.verification_mode" : "none"
+ })
+ default_integrations = false
+ default_monitoring = false
+ hosts = [
+ "logstash:5044"
+ ]
+ ssl %s {
+ certificate_authorities = ["placeholder"]
+ certificate = "placeholder"
+ key = "placeholder"
+ }
+}
+`, id, id, sslInfix)
+}
+
+func testAccResourceOutputUpdateLogstash(id string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+resource "elasticstack_fleet_output" "test_output" {
+ name = "Updated Logstash Output %s"
+ output_id = "%s-logstash-output"
+ type = "logstash"
+ config_yaml = yamlencode({
+ "ssl.verification_mode" : "none"
+ })
+ default_integrations = false
+ default_monitoring = false
+ hosts = [
+ "logstash:5044"
+ ]
+ ssl = {
+ certificate_authorities = ["placeholder"]
+ certificate = "placeholder"
+ key = "placeholder"
+ }
+}
+`, id, id)
+}
+
+func testAccResourceOutputCreateKafka(id string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+resource "elasticstack_fleet_output" "test_output" {
+ name = "Kafka Output %s"
+ output_id = "%s-kafka-output"
+ type = "kafka"
+ config_yaml = yamlencode({
+ "ssl.verification_mode" : "none"
+ })
+ default_integrations = false
+ default_monitoring = false
+ hosts = [
+ "kafka:9092"
+ ]
+
+ # Kafka-specific configuration
+ kafka = {
+ auth_type = "none"
+ topic = "beats"
+ partition = "hash"
+ compression = "gzip"
+ compression_level = 6
+ connection_type = "plaintext"
+ required_acks = 1
+
+ headers = [{
+ key = "environment"
+ value = "test"
+ }]
+ }
+}
+`, id, id)
+}
+
+func testAccResourceOutputUpdateKafka(id string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+resource "elasticstack_fleet_output" "test_output" {
+ name = "Updated Kafka Output %s"
+ output_id = "%s-kafka-output"
+ type = "kafka"
+ config_yaml = yamlencode({
+ "ssl.verification_mode" : "none"
+ })
+ default_integrations = false
+ default_monitoring = false
+ hosts = [
+ "kafka:9092"
+ ]
+
+ # Updated Kafka-specific configuration
+ kafka = {
+ auth_type = "none"
+ topic = "logs"
+ partition = "round_robin"
+ compression = "snappy"
+ connection_type = "encryption"
+ required_acks = -1
+ }
+}
+`, id, id)
+}
+
+func testAccResourceOutputCreateKafkaComplex(id string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+ kibana {}
+}
+
+resource "elasticstack_fleet_output" "test_output" {
+ name = "Complex Kafka Output %s"
+ output_id = "%s-kafka-complex-output"
+ type = "kafka"
+ config_yaml = yamlencode({
+ "ssl.verification_mode" : "none"
+ })
+ default_integrations = false
+ default_monitoring = false
+ hosts = [
+ "kafka1:9092",
+ "kafka2:9092",
+ "kafka3:9092"
+ ]
+
+ # Complex Kafka configuration exercising headers, hash partitioning, and SASL
+ kafka = {
+ auth_type = "none"
+ topic = "complex-topic"
+ partition = "hash"
+ compression = "lz4"
+ connection_type = "encryption"
+ required_acks = 0
+ broker_timeout = 10
+ timeout = 30
+ version = "2.6.0"
+
+ headers = [
+ {
+ key = "datacenter"
+ value = "us-west-1"
+ },
+ {
+ key = "service"
+ value = "beats"
+ }
+ ]
+
+ hash = {
+ hash = "event.hash"
+ random = false
+ }
+
+ sasl = {
+ mechanism = "SCRAM-SHA-256"
+ }
+ }
+}
+`, id, id)
+}
+
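+// checkResourceOutputDestroy verifies that no Fleet output created during the test remains after destroy.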
+func checkResourceOutputDestroy(s *terraform.State) error {
+ client, err := clients.NewAcceptanceTestingClient()
+ if err != nil {
+ return err
+ }
+
+ for _, rs := range s.RootModule().Resources {
+ if rs.Type != "elasticstack_fleet_output" {
+ continue
+ }
+
+ fleetClient, err := client.GetFleetClient()
+ if err != nil {
+ return err
+ }
+ output, diags := fleet.GetOutput(context.Background(), fleetClient, rs.Primary.ID)
+ if diags.HasError() {
+ return utils.FwDiagsAsError(diags)
+ }
+ if output != nil {
+ return fmt.Errorf("output id=%v still exists, but it should have been removed", rs.Primary.ID)
+ }
+ }
+ return nil
+}
diff --git a/internal/fleet/output/create.go b/internal/fleet/output/create.go
index b4f0c7858..19e59b145 100644
--- a/internal/fleet/output/create.go
+++ b/internal/fleet/output/create.go
@@ -22,7 +22,7 @@ func (r *outputResource) Create(ctx context.Context, req resource.CreateRequest,
return
}
- body, diags := planModel.toAPICreateModel(ctx)
+ body, diags := planModel.toAPICreateModel(ctx, r.client)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
diff --git a/internal/fleet/output/models.go b/internal/fleet/output/models.go
index 2bece8c9f..3c1847368 100644
--- a/internal/fleet/output/models.go
+++ b/internal/fleet/output/models.go
@@ -5,9 +5,9 @@ import (
"fmt"
"github.com/elastic/terraform-provider-elasticstack/generated/kbapi"
+ "github.com/elastic/terraform-provider-elasticstack/internal/clients"
"github.com/elastic/terraform-provider-elasticstack/internal/utils"
"github.com/hashicorp/terraform-plugin-framework/diag"
- "github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/types"
)
@@ -21,14 +21,9 @@ type outputModel struct {
CaTrustedFingerprint types.String `tfsdk:"ca_trusted_fingerprint"`
DefaultIntegrations types.Bool `tfsdk:"default_integrations"`
DefaultMonitoring types.Bool `tfsdk:"default_monitoring"`
- Ssl types.List `tfsdk:"ssl"` //> outputSslModel
ConfigYaml types.String `tfsdk:"config_yaml"`
-}
-
-type outputSslModel struct {
- CertificateAuthorities types.List `tfsdk:"certificate_authorities"` //> string
- Certificate types.String `tfsdk:"certificate"`
- Key types.String `tfsdk:"key"`
+ Ssl types.Object `tfsdk:"ssl"` //> outputSslModel
+ Kafka types.Object `tfsdk:"kafka"` //> outputKafkaModel
}
func (model *outputModel) populateFromAPI(ctx context.Context, union *kbapi.OutputUnion) (diags diag.Diagnostics) {
@@ -36,196 +31,82 @@ func (model *outputModel) populateFromAPI(ctx context.Context, union *kbapi.Outp
return
}
- doSsl := func(ssl *kbapi.OutputSsl) types.List {
- if ssl != nil {
- p := path.Root("ssl")
- sslModels := []outputSslModel{{
- CertificateAuthorities: utils.SliceToListType_String(ctx, utils.Deref(ssl.CertificateAuthorities), p.AtName("certificate_authorities"), &diags),
- Certificate: types.StringPointerValue(ssl.Certificate),
- Key: types.StringPointerValue(ssl.Key),
- }}
- list, nd := types.ListValueFrom(ctx, getSslAttrTypes(), sslModels)
- diags.Append(nd...)
- return list
- } else {
- return types.ListNull(getSslAttrTypes())
- }
- }
-
- discriminator, err := union.Discriminator()
+ output, err := union.ValueByDiscriminator()
if err != nil {
diags.AddError(err.Error(), "")
return
}
- switch discriminator {
- case "elasticsearch":
- data, err := union.AsOutputElasticsearch()
- if err != nil {
- diags.AddError(err.Error(), "")
- return
- }
-
- model.ID = types.StringPointerValue(data.Id)
- model.OutputID = types.StringPointerValue(data.Id)
- model.Name = types.StringValue(data.Name)
- model.Type = types.StringValue(string(data.Type))
- model.Hosts = utils.SliceToListType_String(ctx, data.Hosts, path.Root("hosts"), &diags)
- model.CaSha256 = types.StringPointerValue(data.CaSha256)
- model.CaTrustedFingerprint = types.StringPointerValue(data.CaTrustedFingerprint)
- model.DefaultIntegrations = types.BoolPointerValue(data.IsDefault)
- model.DefaultMonitoring = types.BoolPointerValue(data.IsDefaultMonitoring)
- model.ConfigYaml = types.StringPointerValue(data.ConfigYaml)
- model.Ssl = doSsl(data.Ssl)
-
- case "logstash":
- data, err := union.AsOutputLogstash()
- if err != nil {
- diags.AddError(err.Error(), "")
- return
- }
+ switch output := output.(type) {
+ case kbapi.OutputElasticsearch:
+ diags.Append(model.fromAPIElasticsearchModel(ctx, &output)...)
- model.ID = types.StringPointerValue(data.Id)
- model.OutputID = types.StringPointerValue(data.Id)
- model.Name = types.StringValue(data.Name)
- model.Type = types.StringValue(string(data.Type))
- model.Hosts = utils.SliceToListType_String(ctx, data.Hosts, path.Root("hosts"), &diags)
- model.CaSha256 = types.StringPointerValue(data.CaSha256)
- model.CaTrustedFingerprint = types.StringPointerValue(data.CaTrustedFingerprint)
- model.DefaultIntegrations = types.BoolPointerValue(data.IsDefault)
- model.DefaultMonitoring = types.BoolPointerValue(data.IsDefaultMonitoring)
- model.ConfigYaml = types.StringPointerValue(data.ConfigYaml)
- model.Ssl = doSsl(data.Ssl)
+ case kbapi.OutputLogstash:
+ diags.Append(model.fromAPILogstashModel(ctx, &output)...)
+ case kbapi.OutputKafka:
+ diags.Append(model.fromAPIKafkaModel(ctx, &output)...)
default:
- diags.AddError(fmt.Sprintf("unhandled output type: %s", discriminator), "")
+ diags.AddError(fmt.Sprintf("unhandled output type: %T", output), "")
}
return
}
-func (model outputModel) toAPICreateModel(ctx context.Context) (union kbapi.NewOutputUnion, diags diag.Diagnostics) {
- doSsl := func() *kbapi.NewOutputSsl {
- if utils.IsKnown(model.Ssl) {
- sslModels := utils.ListTypeAs[outputSslModel](ctx, model.Ssl, path.Root("ssl"), &diags)
- if len(sslModels) > 0 {
- return &kbapi.NewOutputSsl{
- Certificate: sslModels[0].Certificate.ValueStringPointer(),
- CertificateAuthorities: utils.SliceRef(utils.ListTypeToSlice_String(ctx, sslModels[0].CertificateAuthorities, path.Root("certificate_authorities"), &diags)),
- Key: sslModels[0].Key.ValueStringPointer(),
- }
- }
- }
- return nil
- }
-
+func (model outputModel) toAPICreateModel(ctx context.Context, client *clients.ApiClient) (kbapi.NewOutputUnion, diag.Diagnostics) {
outputType := model.Type.ValueString()
+
switch outputType {
case "elasticsearch":
- body := kbapi.NewOutputElasticsearch{
- Type: kbapi.NewOutputElasticsearchTypeElasticsearch,
- CaSha256: model.CaSha256.ValueStringPointer(),
- CaTrustedFingerprint: model.CaTrustedFingerprint.ValueStringPointer(),
- ConfigYaml: model.ConfigYaml.ValueStringPointer(),
- Hosts: utils.ListTypeToSlice_String(ctx, model.Hosts, path.Root("hosts"), &diags),
- Id: model.OutputID.ValueStringPointer(),
- IsDefault: model.DefaultIntegrations.ValueBoolPointer(),
- IsDefaultMonitoring: model.DefaultMonitoring.ValueBoolPointer(),
- Name: model.Name.ValueString(),
- Ssl: doSsl(),
- }
-
- err := union.FromNewOutputElasticsearch(body)
- if err != nil {
- diags.AddError(err.Error(), "")
- return
- }
-
+ return model.toAPICreateElasticsearchModel(ctx)
case "logstash":
- body := kbapi.NewOutputLogstash{
- Type: kbapi.NewOutputLogstashTypeLogstash,
- CaSha256: model.CaSha256.ValueStringPointer(),
- CaTrustedFingerprint: model.CaTrustedFingerprint.ValueStringPointer(),
- ConfigYaml: model.ConfigYaml.ValueStringPointer(),
- Hosts: utils.ListTypeToSlice_String(ctx, model.Hosts, path.Root("hosts"), &diags),
- Id: model.OutputID.ValueStringPointer(),
- IsDefault: model.DefaultIntegrations.ValueBoolPointer(),
- IsDefaultMonitoring: model.DefaultMonitoring.ValueBoolPointer(),
- Name: model.Name.ValueString(),
- Ssl: doSsl(),
- }
-
- err := union.FromNewOutputLogstash(body)
- if err != nil {
- diags.AddError(err.Error(), "")
- return
+ return model.toAPICreateLogstashModel(ctx)
+ case "kafka":
+ if diags := assertKafkaSupport(ctx, client); diags.HasError() {
+ return kbapi.NewOutputUnion{}, diags
}
+ return model.toAPICreateKafkaModel(ctx)
default:
- diags.AddError(fmt.Sprintf("unhandled output type: %s", outputType), "")
- }
-
- return
-}
-
-func (model outputModel) toAPIUpdateModel(ctx context.Context) (union kbapi.UpdateOutputUnion, diags diag.Diagnostics) {
- doSsl := func() *kbapi.UpdateOutputSsl {
- if utils.IsKnown(model.Ssl) {
- sslModels := utils.ListTypeAs[outputSslModel](ctx, model.Ssl, path.Root("ssl"), &diags)
- if len(sslModels) > 0 {
- return &kbapi.UpdateOutputSsl{
- Certificate: sslModels[0].Certificate.ValueStringPointer(),
- CertificateAuthorities: utils.SliceRef(utils.ListTypeToSlice_String(ctx, sslModels[0].CertificateAuthorities, path.Root("certificate_authorities"), &diags)),
- Key: sslModels[0].Key.ValueStringPointer(),
- }
- }
+ return kbapi.NewOutputUnion{}, diag.Diagnostics{
+ diag.NewErrorDiagnostic(fmt.Sprintf("unhandled output type: %s", outputType), ""),
}
- return nil
}
+}
+func (model outputModel) toAPIUpdateModel(ctx context.Context, client *clients.ApiClient) (union kbapi.UpdateOutputUnion, diags diag.Diagnostics) {
outputType := model.Type.ValueString()
+
switch outputType {
case "elasticsearch":
- body := kbapi.UpdateOutputElasticsearch{
- Type: utils.Pointer(kbapi.Elasticsearch),
- CaSha256: model.CaSha256.ValueStringPointer(),
- CaTrustedFingerprint: model.CaTrustedFingerprint.ValueStringPointer(),
- ConfigYaml: model.ConfigYaml.ValueStringPointer(),
- Hosts: utils.SliceRef(utils.ListTypeToSlice_String(ctx, model.Hosts, path.Root("hosts"), &diags)),
- IsDefault: model.DefaultIntegrations.ValueBoolPointer(),
- IsDefaultMonitoring: model.DefaultMonitoring.ValueBoolPointer(),
- Name: model.Name.ValueStringPointer(),
- Ssl: doSsl(),
- }
-
- err := union.FromUpdateOutputElasticsearch(body)
- if err != nil {
- diags.AddError(err.Error(), "")
- return
- }
-
+ return model.toAPIUpdateElasticsearchModel(ctx)
case "logstash":
- body := kbapi.UpdateOutputLogstash{
- Type: utils.Pointer(kbapi.Logstash),
- CaSha256: model.CaSha256.ValueStringPointer(),
- CaTrustedFingerprint: model.CaTrustedFingerprint.ValueStringPointer(),
- ConfigYaml: model.ConfigYaml.ValueStringPointer(),
- Hosts: utils.SliceRef(utils.ListTypeToSlice_String(ctx, model.Hosts, path.Root("hosts"), &diags)),
- IsDefault: model.DefaultIntegrations.ValueBoolPointer(),
- IsDefaultMonitoring: model.DefaultMonitoring.ValueBoolPointer(),
- Name: model.Name.ValueStringPointer(),
- Ssl: doSsl(),
- }
-
- err := union.FromUpdateOutputLogstash(body)
- if err != nil {
- diags.AddError(err.Error(), "")
- return
+ return model.toAPIUpdateLogstashModel(ctx)
+ case "kafka":
+ if diags := assertKafkaSupport(ctx, client); diags.HasError() {
+ return kbapi.UpdateOutputUnion{}, diags
}
+ return model.toAPIUpdateKafkaModel(ctx)
default:
diags.AddError(fmt.Sprintf("unhandled output type: %s", outputType), "")
}
return
}
+
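+// assertKafkaSupport returns an error diagnostic when the connected stack does not meet MinVersionOutputKafka.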
+func assertKafkaSupport(ctx context.Context, client *clients.ApiClient) diag.Diagnostics {
+ var diags diag.Diagnostics
+
+ // Check minimum version requirement for Kafka output type
+ if supported, versionDiags := client.EnforceMinVersion(ctx, MinVersionOutputKafka); versionDiags.HasError() {
+ diags.Append(utils.FrameworkDiagsFromSDK(versionDiags)...)
+ return diags
+ } else if !supported {
+ diags.AddError("Unsupported version for Kafka output",
+ fmt.Sprintf("Kafka output type requires server version %s or higher", MinVersionOutputKafka.String()))
+ return diags
+ }
+
+ return nil
+}
diff --git a/internal/fleet/output/models_elasticsearch.go b/internal/fleet/output/models_elasticsearch.go
new file mode 100644
index 000000000..d9fba285a
--- /dev/null
+++ b/internal/fleet/output/models_elasticsearch.go
@@ -0,0 +1,82 @@
+package output
+
+import (
+ "context"
+
+ "github.com/elastic/terraform-provider-elasticstack/generated/kbapi"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils"
+ "github.com/hashicorp/terraform-plugin-framework/diag"
+ "github.com/hashicorp/terraform-plugin-framework/path"
+ "github.com/hashicorp/terraform-plugin-framework/types"
+)
+
+func (model *outputModel) fromAPIElasticsearchModel(ctx context.Context, data *kbapi.OutputElasticsearch) (diags diag.Diagnostics) {
+ model.ID = types.StringPointerValue(data.Id)
+ model.OutputID = types.StringPointerValue(data.Id)
+ model.Name = types.StringValue(data.Name)
+ model.Type = types.StringValue(string(data.Type))
+ model.Hosts = utils.SliceToListType_String(ctx, data.Hosts, path.Root("hosts"), &diags)
+ model.CaSha256 = types.StringPointerValue(data.CaSha256)
+ model.CaTrustedFingerprint = types.StringPointerValue(data.CaTrustedFingerprint)
+ model.DefaultIntegrations = types.BoolPointerValue(data.IsDefault)
+ model.DefaultMonitoring = types.BoolPointerValue(data.IsDefaultMonitoring)
+ model.ConfigYaml = types.StringPointerValue(data.ConfigYaml)
+ model.Ssl, diags = sslToObjectValue(ctx, data.Ssl)
+ return
+}
+
+func (model outputModel) toAPICreateElasticsearchModel(ctx context.Context) (kbapi.NewOutputUnion, diag.Diagnostics) {
+ ssl, diags := objectValueToSSL(ctx, model.Ssl)
+ if diags.HasError() {
+ return kbapi.NewOutputUnion{}, diags
+ }
+
+ body := kbapi.NewOutputElasticsearch{
+ Type: kbapi.NewOutputElasticsearchTypeElasticsearch,
+ CaSha256: model.CaSha256.ValueStringPointer(),
+ CaTrustedFingerprint: model.CaTrustedFingerprint.ValueStringPointer(),
+ ConfigYaml: model.ConfigYaml.ValueStringPointer(),
+ Hosts: utils.ListTypeToSlice_String(ctx, model.Hosts, path.Root("hosts"), &diags),
+ Id: model.OutputID.ValueStringPointer(),
+ IsDefault: model.DefaultIntegrations.ValueBoolPointer(),
+ IsDefaultMonitoring: model.DefaultMonitoring.ValueBoolPointer(),
+ Name: model.Name.ValueString(),
+ Ssl: ssl,
+ }
+
+ var union kbapi.NewOutputUnion
+ err := union.FromNewOutputElasticsearch(body)
+ if err != nil {
+ diags.AddError(err.Error(), "")
+ return kbapi.NewOutputUnion{}, diags
+ }
+
+ return union, diags
+}
+
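+// toAPIUpdateElasticsearchModel builds the kbapi.UpdateOutputUnion update body for an elasticsearch output.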
+func (model outputModel) toAPIUpdateElasticsearchModel(ctx context.Context) (kbapi.UpdateOutputUnion, diag.Diagnostics) {
+ ssl, diags := objectValueToSSLUpdate(ctx, model.Ssl)
+ if diags.HasError() {
+ return kbapi.UpdateOutputUnion{}, diags
+ }
+ body := kbapi.UpdateOutputElasticsearch{
+ Type: utils.Pointer(kbapi.Elasticsearch),
+ CaSha256: model.CaSha256.ValueStringPointer(),
+ CaTrustedFingerprint: model.CaTrustedFingerprint.ValueStringPointer(),
+ ConfigYaml: model.ConfigYaml.ValueStringPointer(),
+ Hosts: utils.SliceRef(utils.ListTypeToSlice_String(ctx, model.Hosts, path.Root("hosts"), &diags)),
+ IsDefault: model.DefaultIntegrations.ValueBoolPointer(),
+ IsDefaultMonitoring: model.DefaultMonitoring.ValueBoolPointer(),
+ Name: model.Name.ValueStringPointer(),
+ Ssl: ssl,
+ }
+
+ var union kbapi.UpdateOutputUnion
+ err := union.FromUpdateOutputElasticsearch(body)
+ if err != nil {
+ diags.AddError(err.Error(), "")
+ return kbapi.UpdateOutputUnion{}, diags
+ }
+
+ return union, diags
+}
diff --git a/internal/fleet/output/models_kafka.go b/internal/fleet/output/models_kafka.go
new file mode 100644
index 000000000..a5b831624
--- /dev/null
+++ b/internal/fleet/output/models_kafka.go
@@ -0,0 +1,559 @@
+package output
+
+import (
+ "context"
+
+ "github.com/elastic/terraform-provider-elasticstack/generated/kbapi"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils"
+ "github.com/hashicorp/terraform-plugin-framework/diag"
+ "github.com/hashicorp/terraform-plugin-framework/path"
+ "github.com/hashicorp/terraform-plugin-framework/types"
+ "github.com/hashicorp/terraform-plugin-framework/types/basetypes"
+)
+
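+// outputKafkaModel holds the values of the nested `kafka` attribute on the output model.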
+type outputKafkaModel struct {
+ AuthType types.String `tfsdk:"auth_type"`
+ BrokerTimeout types.Float32 `tfsdk:"broker_timeout"`
+ ClientId types.String `tfsdk:"client_id"`
+ Compression types.String `tfsdk:"compression"`
+ CompressionLevel types.Int64 `tfsdk:"compression_level"`
+ ConnectionType types.String `tfsdk:"connection_type"`
+ Topic types.String `tfsdk:"topic"`
+ Partition types.String `tfsdk:"partition"`
+ RequiredAcks types.Int64 `tfsdk:"required_acks"`
+ Timeout types.Float32 `tfsdk:"timeout"`
+ Version types.String `tfsdk:"version"`
+ Username types.String `tfsdk:"username"`
+ Password types.String `tfsdk:"password"`
+ Key types.String `tfsdk:"key"`
+ Headers types.List `tfsdk:"headers"` //> outputHeadersModel
+ Hash types.Object `tfsdk:"hash"` //> outputHashModel
+ Random types.Object `tfsdk:"random"` //> outputRandomModel
+ RoundRobin types.Object `tfsdk:"round_robin"` //> outputRoundRobinModel
+ Sasl types.Object `tfsdk:"sasl"` //> outputSaslModel
+}
+
+type outputHeadersModel struct {
+ Key types.String `tfsdk:"key"`
+ Value types.String `tfsdk:"value"`
+}
+
+type outputHashModel struct {
+ Hash types.String `tfsdk:"hash"`
+ Random types.Bool `tfsdk:"random"`
+}
+
+type outputRandomModel struct {
+ GroupEvents types.Float64 `tfsdk:"group_events"`
+}
+
+type outputRoundRobinModel struct {
+ GroupEvents types.Float64 `tfsdk:"group_events"`
+}
+
+type outputSaslModel struct {
+ Mechanism types.String `tfsdk:"mechanism"`
+}
+
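+// toAPIHash converts the optional hash partitioning settings into the anonymous struct used by the
+// generated API types, returning nil when the attribute is null or unknown.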
+func (m outputKafkaModel) toAPIHash(ctx context.Context) (*struct {
+ Hash *string `json:"hash,omitempty"`
+ Random *bool `json:"random,omitempty"`
+}, diag.Diagnostics) {
+ if !utils.IsKnown(m.Hash) {
+ return nil, nil
+ }
+
+ var hashModel outputHashModel
+ diags := m.Hash.As(ctx, &hashModel, basetypes.ObjectAsOptions{})
+ if diags.HasError() {
+ return nil, diags
+ }
+
+ return &struct {
+ Hash *string `json:"hash,omitempty"`
+ Random *bool `json:"random,omitempty"`
+ }{
+ Hash: hashModel.Hash.ValueStringPointer(),
+ Random: hashModel.Random.ValueBoolPointer(),
+ }, diags
+}
+
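+// toAPIHeaders converts the headers list into the key/value slice used by the generated API types,
+// returning nil when the list is null, unknown, or empty.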
+func (m outputKafkaModel) toAPIHeaders(ctx context.Context) (*[]struct {
+ Key string `json:"key"`
+ Value string `json:"value"`
+}, diag.Diagnostics) {
+ if !utils.IsKnown(m.Headers) {
+ return nil, nil
+ }
+
+ var diags diag.Diagnostics
+ headerModels := utils.ListTypeAs[outputHeadersModel](ctx, m.Headers, path.Root("kafka").AtName("headers"), &diags)
+ if len(headerModels) == 0 {
+ return nil, diags
+ }
+
+ headers := make([]struct {
+ Key string `json:"key"`
+ Value string `json:"value"`
+ }, len(headerModels))
+ for i, h := range headerModels {
+ headers[i] = struct {
+ Key string `json:"key"`
+ Value string `json:"value"`
+ }{
+ Key: h.Key.ValueString(),
+ Value: h.Value.ValueString(),
+ }
+ }
+ return &headers, diags
+}
+
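+// toAPIRandom converts the optional random partitioning settings, returning nil when the attribute is null or unknown.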
+func (m outputKafkaModel) toAPIRandom(ctx context.Context) (*struct {
+ GroupEvents *float32 `json:"group_events,omitempty"`
+}, diag.Diagnostics) {
+ if !utils.IsKnown(m.Random) {
+ return nil, nil
+ }
+
+ var randomModel outputRandomModel
+ diags := m.Random.As(ctx, &randomModel, basetypes.ObjectAsOptions{})
+ if diags.HasError() {
+ return nil, diags
+ }
+
+ return &struct {
+ GroupEvents *float32 `json:"group_events,omitempty"`
+ }{
+ GroupEvents: func() *float32 {
+ if !randomModel.GroupEvents.IsNull() {
+ val := float32(randomModel.GroupEvents.ValueFloat64())
+ return &val
+ }
+ return nil
+ }(),
+ }, diags
+}
+
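+// toAPIRoundRobin converts the optional round_robin partitioning settings, returning nil when the attribute is null or unknown.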
+func (m outputKafkaModel) toAPIRoundRobin(ctx context.Context) (*struct {
+ GroupEvents *float32 `json:"group_events,omitempty"`
+}, diag.Diagnostics) {
+ if !utils.IsKnown(m.RoundRobin) {
+ return nil, nil
+ }
+
+ var roundRobinModel outputRoundRobinModel
+ diags := m.RoundRobin.As(ctx, &roundRobinModel, basetypes.ObjectAsOptions{})
+ if diags.HasError() {
+ return nil, diags
+ }
+ return &struct {
+ GroupEvents *float32 `json:"group_events,omitempty"`
+ }{
+ GroupEvents: func() *float32 {
+ if !roundRobinModel.GroupEvents.IsNull() {
+ val := float32(roundRobinModel.GroupEvents.ValueFloat64())
+ return &val
+ }
+ return nil
+ }(),
+	}, diags
+}
+
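+// toAPISasl converts the optional SASL settings for create requests, returning nil when the attribute
+// or its mechanism is unset.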
+func (m outputKafkaModel) toAPISasl(ctx context.Context) (*struct {
+ Mechanism *kbapi.NewOutputKafkaSaslMechanism `json:"mechanism,omitempty"`
+}, diag.Diagnostics) {
+ if !utils.IsKnown(m.Sasl) {
+ return nil, nil
+ }
+ var saslModel outputSaslModel
+ diags := m.Sasl.As(ctx, &saslModel, basetypes.ObjectAsOptions{})
+ if diags.HasError() {
+ return nil, diags
+ }
+
+ if saslModel.Mechanism.IsNull() {
+ return nil, diags
+ }
+
+ mechanism := kbapi.NewOutputKafkaSaslMechanism(saslModel.Mechanism.ValueString())
+ return &struct {
+ Mechanism *kbapi.NewOutputKafkaSaslMechanism `json:"mechanism,omitempty"`
+ }{
+ Mechanism: &mechanism,
+ }, diags
+}
+
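+// toUpdateAPISasl reuses toAPISasl and re-types the mechanism for update requests.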
+func (m outputKafkaModel) toUpdateAPISasl(ctx context.Context) (*struct {
+ Mechanism *kbapi.UpdateOutputKafkaSaslMechanism `json:"mechanism,omitempty"`
+}, diag.Diagnostics) {
+ sasl, diags := m.toAPISasl(ctx)
+ if diags.HasError() || sasl == nil {
+ return nil, diags
+ }
+
+ mechanism := kbapi.UpdateOutputKafkaSaslMechanism(*sasl.Mechanism)
+ return &struct {
+		Mechanism *kbapi.UpdateOutputKafkaSaslMechanism `json:"mechanism,omitempty"`
+ }{
+ Mechanism: &mechanism,
+ }, diags
+}
+
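+// toAuthType returns the configured auth_type, falling back to kbapi.NewOutputKafkaAuthTypeNone when
+// the attribute is null or unknown.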
+func (m outputKafkaModel) toAuthType() kbapi.NewOutputKafkaAuthType {
+ if !utils.IsKnown(m.AuthType) {
+ return kbapi.NewOutputKafkaAuthTypeNone
+ }
+
+ return kbapi.NewOutputKafkaAuthType(m.AuthType.ValueString())
+}
+
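+// toUpdateAuthType returns the configured auth_type for update requests, or nil when the attribute is null or unknown.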
+func (m outputKafkaModel) toUpdateAuthType() *kbapi.UpdateOutputKafkaAuthType {
+ if !utils.IsKnown(m.AuthType) {
+ return nil
+ }
+
+ return utils.Pointer(kbapi.UpdateOutputKafkaAuthType(m.AuthType.ValueString()))
+}
+
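+// toAPICreateKafkaModel builds the kbapi.NewOutputUnion create body for a kafka output, mapping the
+// nested kafka attribute onto the generated API struct.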
+func (model outputModel) toAPICreateKafkaModel(ctx context.Context) (kbapi.NewOutputUnion, diag.Diagnostics) {
+ ssl, diags := objectValueToSSL(ctx, model.Ssl)
+ if diags.HasError() {
+ return kbapi.NewOutputUnion{}, diags
+ }
+
+ // Extract kafka model from nested structure
+ var kafkaModel outputKafkaModel
+ if !model.Kafka.IsNull() {
+		// The converted object may be nil if the conversion added diagnostics; guard before dereferencing.
+		if kafkaObj := utils.ObjectTypeAs[outputKafkaModel](ctx, model.Kafka, path.Root("kafka"), &diags); kafkaObj != nil {
+			kafkaModel = *kafkaObj
+		}
+ }
+
+ hash, hashDiags := kafkaModel.toAPIHash(ctx)
+ diags.Append(hashDiags...)
+
+ headers, headersDiags := kafkaModel.toAPIHeaders(ctx)
+ diags.Append(headersDiags...)
+
+ random, randomDiags := kafkaModel.toAPIRandom(ctx)
+ diags.Append(randomDiags...)
+
+ roundRobin, rrDiags := kafkaModel.toAPIRoundRobin(ctx)
+ diags.Append(rrDiags...)
+
+ sasl, saslDiags := kafkaModel.toAPISasl(ctx)
+ diags.Append(saslDiags...)
+
+ body := kbapi.NewOutputKafka{
+ Type: kbapi.NewOutputKafkaTypeKafka,
+ CaSha256: model.CaSha256.ValueStringPointer(),
+ CaTrustedFingerprint: model.CaTrustedFingerprint.ValueStringPointer(),
+ ConfigYaml: model.ConfigYaml.ValueStringPointer(),
+ Hosts: utils.ListTypeToSlice_String(ctx, model.Hosts, path.Root("hosts"), &diags),
+ Id: model.OutputID.ValueStringPointer(),
+ IsDefault: model.DefaultIntegrations.ValueBoolPointer(),
+ IsDefaultMonitoring: model.DefaultMonitoring.ValueBoolPointer(),
+ Name: model.Name.ValueString(),
+ Ssl: ssl,
+ // Kafka-specific fields
+ AuthType: kafkaModel.toAuthType(),
+ BrokerTimeout: func() *float32 {
+ if !utils.IsKnown(kafkaModel.BrokerTimeout) {
+ return nil
+ }
+ val := kafkaModel.BrokerTimeout.ValueFloat32()
+ return &val
+ }(),
+ ClientId: kafkaModel.ClientId.ValueStringPointer(),
+ Compression: func() *kbapi.NewOutputKafkaCompression {
+ if !utils.IsKnown(kafkaModel.Compression) {
+ return nil
+ }
+ comp := kbapi.NewOutputKafkaCompression(kafkaModel.Compression.ValueString())
+ return &comp
+ }(),
+ CompressionLevel: func() *int {
+ if !utils.IsKnown(kafkaModel.CompressionLevel) || kafkaModel.Compression.ValueString() != "gzip" {
+ return nil
+ }
+
+ val := int(kafkaModel.CompressionLevel.ValueInt64())
+ return &val
+ }(),
+ ConnectionType: kafkaModel.ConnectionType.ValueStringPointer(),
+ Topic: kafkaModel.Topic.ValueStringPointer(),
+ Partition: func() *kbapi.NewOutputKafkaPartition {
+ if !utils.IsKnown(kafkaModel.Partition) {
+ return nil
+ }
+ part := kbapi.NewOutputKafkaPartition(kafkaModel.Partition.ValueString())
+ return &part
+ }(),
+ RequiredAcks: func() *kbapi.NewOutputKafkaRequiredAcks {
+ if !utils.IsKnown(kafkaModel.RequiredAcks) {
+ return nil
+ }
+ val := kbapi.NewOutputKafkaRequiredAcks(kafkaModel.RequiredAcks.ValueInt64())
+ return &val
+ }(),
+ Timeout: func() *float32 {
+ if !utils.IsKnown(kafkaModel.Timeout) {
+ return nil
+ }
+
+ val := kafkaModel.Timeout.ValueFloat32()
+ return &val
+ }(),
+ Version: kafkaModel.Version.ValueStringPointer(),
+ Username: kafkaModel.Username.ValueStringPointer(),
+ Password: kafkaModel.Password.ValueStringPointer(),
+ Key: kafkaModel.Key.ValueStringPointer(),
+ Headers: headers,
+ Hash: hash,
+ Random: random,
+ RoundRobin: roundRobin,
+ Sasl: sasl,
+ }
+
+ var union kbapi.NewOutputUnion
+ err := union.FromNewOutputKafka(body)
+ if err != nil {
+ diags.AddError(err.Error(), "")
+ return kbapi.NewOutputUnion{}, diags
+ }
+
+ return union, diags
+}
+
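+// toAPIUpdateKafkaModel builds the kbapi.UpdateOutputUnion update body for a kafka output, mapping the
+// nested kafka attribute onto the generated API struct.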
+func (model outputModel) toAPIUpdateKafkaModel(ctx context.Context) (kbapi.UpdateOutputUnion, diag.Diagnostics) {
+ ssl, diags := objectValueToSSLUpdate(ctx, model.Ssl)
+ if diags.HasError() {
+ return kbapi.UpdateOutputUnion{}, diags
+ }
+
+ // Extract kafka model from nested structure
+ var kafkaModel outputKafkaModel
+ if !model.Kafka.IsNull() {
+		// The converted object may be nil if the conversion added diagnostics; guard before dereferencing.
+		if kafkaObj := utils.ObjectTypeAs[outputKafkaModel](ctx, model.Kafka, path.Root("kafka"), &diags); kafkaObj != nil {
+			kafkaModel = *kafkaObj
+		}
+ }
+
+ hash, hashDiags := kafkaModel.toAPIHash(ctx)
+ diags.Append(hashDiags...)
+
+ headers, headersDiags := kafkaModel.toAPIHeaders(ctx)
+ diags.Append(headersDiags...)
+
+ random, randomDiags := kafkaModel.toAPIRandom(ctx)
+ diags.Append(randomDiags...)
+
+ roundRobin, rrDiags := kafkaModel.toAPIRoundRobin(ctx)
+ diags.Append(rrDiags...)
+
+ sasl, saslDiags := kafkaModel.toUpdateAPISasl(ctx)
+ diags.Append(saslDiags...)
+
+ body := kbapi.UpdateOutputKafka{
+ Type: utils.Pointer(kbapi.Kafka),
+ CaSha256: model.CaSha256.ValueStringPointer(),
+ CaTrustedFingerprint: model.CaTrustedFingerprint.ValueStringPointer(),
+ ConfigYaml: model.ConfigYaml.ValueStringPointer(),
+ Hosts: utils.SliceRef(utils.ListTypeToSlice_String(ctx, model.Hosts, path.Root("hosts"), &diags)),
+ IsDefault: model.DefaultIntegrations.ValueBoolPointer(),
+ IsDefaultMonitoring: model.DefaultMonitoring.ValueBoolPointer(),
+ Name: model.Name.ValueString(),
+ Ssl: ssl,
+ // Kafka-specific fields
+ AuthType: kafkaModel.toUpdateAuthType(),
+ BrokerTimeout: func() *float32 {
+ if !utils.IsKnown(kafkaModel.BrokerTimeout) {
+ return nil
+ }
+ val := kafkaModel.BrokerTimeout.ValueFloat32()
+ return &val
+ }(),
+ ClientId: kafkaModel.ClientId.ValueStringPointer(),
+ Compression: func() *kbapi.UpdateOutputKafkaCompression {
+ if !utils.IsKnown(kafkaModel.Compression) {
+ return nil
+ }
+ comp := kbapi.UpdateOutputKafkaCompression(kafkaModel.Compression.ValueString())
+ return &comp
+ }(),
+ CompressionLevel: func() *int {
+ if !utils.IsKnown(kafkaModel.CompressionLevel) || kafkaModel.Compression.ValueString() != "gzip" {
+ return nil
+ }
+ val := int(kafkaModel.CompressionLevel.ValueInt64())
+ return &val
+ }(),
+ ConnectionType: kafkaModel.ConnectionType.ValueStringPointer(),
+ Topic: kafkaModel.Topic.ValueStringPointer(),
+ Partition: func() *kbapi.UpdateOutputKafkaPartition {
+ if !utils.IsKnown(kafkaModel.Partition) {
+ return nil
+ }
+ part := kbapi.UpdateOutputKafkaPartition(kafkaModel.Partition.ValueString())
+ return &part
+ }(),
+ RequiredAcks: func() *kbapi.UpdateOutputKafkaRequiredAcks {
+ if !utils.IsKnown(kafkaModel.RequiredAcks) {
+ return nil
+ }
+ val := kbapi.UpdateOutputKafkaRequiredAcks(kafkaModel.RequiredAcks.ValueInt64())
+ return &val
+ }(),
+ Timeout: func() *float32 {
+ if !utils.IsKnown(kafkaModel.Timeout) {
+ return nil
+ }
+ val := kafkaModel.Timeout.ValueFloat32()
+ return &val
+ }(),
+ Version: kafkaModel.Version.ValueStringPointer(),
+ Username: kafkaModel.Username.ValueStringPointer(),
+ Password: kafkaModel.Password.ValueStringPointer(),
+ Key: kafkaModel.Key.ValueStringPointer(),
+ Headers: headers,
+ Hash: hash,
+ Random: random,
+ RoundRobin: roundRobin,
+ Sasl: sasl,
+ }
+
+ var union kbapi.UpdateOutputUnion
+ err := union.FromUpdateOutputKafka(body)
+ if err != nil {
+ diags.AddError(err.Error(), "")
+ return kbapi.UpdateOutputUnion{}, diags
+ }
+
+ return union, diags
+}
+
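+// fromAPIKafkaModel populates the Terraform model, including the nested kafka object, from a kbapi.OutputKafka value.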
+func (model *outputModel) fromAPIKafkaModel(ctx context.Context, data *kbapi.OutputKafka) (diags diag.Diagnostics) {
+ model.ID = types.StringPointerValue(data.Id)
+ model.OutputID = types.StringPointerValue(data.Id)
+ model.Name = types.StringValue(data.Name)
+ model.Type = types.StringValue(string(data.Type))
+ model.Hosts = utils.SliceToListType_String(ctx, data.Hosts, path.Root("hosts"), &diags)
+ model.CaSha256 = types.StringPointerValue(data.CaSha256)
+ model.CaTrustedFingerprint = types.StringPointerValue(data.CaTrustedFingerprint)
+ model.DefaultIntegrations = types.BoolPointerValue(data.IsDefault)
+ model.DefaultMonitoring = types.BoolPointerValue(data.IsDefaultMonitoring)
+ model.ConfigYaml = types.StringPointerValue(data.ConfigYaml)
+ model.Ssl, diags = sslToObjectValue(ctx, data.Ssl)
+
+ // Kafka-specific fields - initialize kafka nested object
+ kafkaModel := outputKafkaModel{}
+ kafkaModel.AuthType = types.StringValue(string(data.AuthType))
+ kafkaModel.BrokerTimeout = types.Float32PointerValue(data.BrokerTimeout)
+ kafkaModel.ClientId = types.StringPointerValue(data.ClientId)
+ kafkaModel.Compression = types.StringPointerValue((*string)(data.Compression))
+ // Handle CompressionLevel
+ if data.CompressionLevel != nil {
+ kafkaModel.CompressionLevel = types.Int64Value(int64(*data.CompressionLevel))
+ } else {
+ kafkaModel.CompressionLevel = types.Int64Null()
+ }
+ // Handle ConnectionType
+ kafkaModel.ConnectionType = types.StringPointerValue(data.ConnectionType)
+ kafkaModel.Topic = types.StringPointerValue(data.Topic)
+ kafkaModel.Partition = types.StringPointerValue((*string)(data.Partition))
+ if data.RequiredAcks != nil {
+ kafkaModel.RequiredAcks = types.Int64Value(int64(*data.RequiredAcks))
+ } else {
+ kafkaModel.RequiredAcks = types.Int64Null()
+ }
+
+ kafkaModel.Timeout = types.Float32PointerValue(data.Timeout)
+ kafkaModel.Version = types.StringPointerValue(data.Version)
+ kafkaModel.Username = types.StringPointerValue(data.Username)
+ kafkaModel.Password = types.StringPointerValue(data.Password)
+ kafkaModel.Key = types.StringPointerValue(data.Key)
+
+ // Handle headers
+ if data.Headers != nil {
+ headerModels := make([]outputHeadersModel, len(*data.Headers))
+ for i, header := range *data.Headers {
+ headerModels[i] = outputHeadersModel{
+ Key: types.StringValue(header.Key),
+ Value: types.StringValue(header.Value),
+ }
+ }
+ list, nd := types.ListValueFrom(ctx, getHeadersAttrTypes(), headerModels)
+ diags.Append(nd...)
+ kafkaModel.Headers = list
+ } else {
+ kafkaModel.Headers = types.ListNull(getHeadersAttrTypes())
+ }
+
+ // Handle hash
+ if data.Hash != nil {
+ hashModel := outputHashModel{
+ Hash: types.StringPointerValue(data.Hash.Hash),
+ Random: types.BoolPointerValue(data.Hash.Random),
+ }
+ obj, nd := types.ObjectValueFrom(ctx, getHashAttrTypes(), hashModel)
+ diags.Append(nd...)
+ kafkaModel.Hash = obj
+ } else {
+ kafkaModel.Hash = types.ObjectNull(getHashAttrTypes())
+ }
+
+ // Handle random
+ if data.Random != nil {
+ randomModel := outputRandomModel{
+ GroupEvents: func() types.Float64 {
+ if data.Random.GroupEvents != nil {
+ return types.Float64Value(float64(*data.Random.GroupEvents))
+ }
+ return types.Float64Null()
+ }(),
+ }
+ obj, nd := types.ObjectValueFrom(ctx, getRandomAttrTypes(), randomModel)
+ diags.Append(nd...)
+ kafkaModel.Random = obj
+ } else {
+ kafkaModel.Random = types.ObjectNull(getRandomAttrTypes())
+ }
+
+ // Handle round_robin
+ if data.RoundRobin != nil {
+ roundRobinModel := outputRoundRobinModel{
+ GroupEvents: func() types.Float64 {
+ if data.RoundRobin.GroupEvents != nil {
+ return types.Float64Value(float64(*data.RoundRobin.GroupEvents))
+ }
+ return types.Float64Null()
+ }(),
+ }
+ obj, nd := types.ObjectValueFrom(ctx, getRoundRobinAttrTypes(), roundRobinModel)
+ diags.Append(nd...)
+ kafkaModel.RoundRobin = obj
+ } else {
+ kafkaModel.RoundRobin = types.ObjectNull(getRoundRobinAttrTypes())
+ }
+
+ // Handle sasl
+ if data.Sasl != nil {
+ saslModel := outputSaslModel{
+ Mechanism: func() types.String {
+ if data.Sasl.Mechanism != nil {
+ return types.StringValue(string(*data.Sasl.Mechanism))
+ }
+ return types.StringNull()
+ }(),
+ }
+ obj, nd := types.ObjectValueFrom(ctx, getSaslAttrTypes(), saslModel)
+ diags.Append(nd...)
+ kafkaModel.Sasl = obj
+ } else {
+ kafkaModel.Sasl = types.ObjectNull(getSaslAttrTypes())
+ }
+
+ // Set the kafka nested object on the main model
+ kafkaObj, nd := types.ObjectValueFrom(ctx, getKafkaAttrTypes(), kafkaModel)
+ diags.Append(nd...)
+ model.Kafka = kafkaObj
+ return
+}
diff --git a/internal/fleet/output/models_kafka_test.go b/internal/fleet/output/models_kafka_test.go
new file mode 100644
index 000000000..fc76760dc
--- /dev/null
+++ b/internal/fleet/output/models_kafka_test.go
@@ -0,0 +1,393 @@
+package output
+
+import (
+ "context"
+ "testing"
+
+ "github.com/elastic/terraform-provider-elasticstack/generated/kbapi"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils"
+ "github.com/hashicorp/terraform-plugin-framework/attr"
+ "github.com/hashicorp/terraform-plugin-framework/types"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_outputKafkaModel_toAPIHash(t *testing.T) {
+ type fields struct {
+ Hash types.Object
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want *struct {
+ Hash *string `json:"hash,omitempty"`
+ Random *bool `json:"random,omitempty"`
+ }
+ wantErr bool
+ }{
+ {
+ name: "returns nil when hash is unknown",
+ fields: fields{
+ Hash: types.ObjectUnknown(getHashAttrTypes()),
+ },
+ },
+ {
+ name: "returns a hash object when all fields are set",
+ fields: fields{
+ Hash: types.ObjectValueMust(
+ getHashAttrTypes(),
+ map[string]attr.Value{
+ "hash": types.StringValue("field"),
+ "random": types.BoolValue(true),
+ },
+ ),
+ },
+ want: &struct {
+ Hash *string `json:"hash,omitempty"`
+ Random *bool `json:"random,omitempty"`
+ }{
+ Hash: utils.Pointer("field"),
+ Random: utils.Pointer(true),
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := outputKafkaModel{
+ Hash: tt.fields.Hash,
+ }
+ got, diags := m.toAPIHash(context.Background())
+ if (diags.HasError()) != tt.wantErr {
+ t.Errorf("outputKafkaModel.toAPIHash() error = %v, wantErr %v", diags.HasError(), tt.wantErr)
+ return
+ }
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+func Test_outputKafkaModel_toAPIHeaders(t *testing.T) {
+ type fields struct {
+ Headers types.List
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want *[]struct {
+ Key string `json:"key"`
+ Value string `json:"value"`
+ }
+ wantErr bool
+ }{
+ {
+ name: "returns nil when headers are unknown",
+ fields: fields{
+ Headers: types.ListUnknown(getHeadersAttrTypes()),
+ },
+ },
+ {
+ name: "returns headers when populated",
+ fields: fields{
+ Headers: types.ListValueMust(
+ getHeadersAttrTypes(),
+ []attr.Value{
+ types.ObjectValueMust(getHeadersAttrTypes().(types.ObjectType).AttrTypes, map[string]attr.Value{
+ "key": types.StringValue("key-1"),
+ "value": types.StringValue("value-1"),
+ }),
+ types.ObjectValueMust(getHeadersAttrTypes().(types.ObjectType).AttrTypes, map[string]attr.Value{
+ "key": types.StringValue("key-2"),
+ "value": types.StringValue("value-2"),
+ }),
+ },
+ ),
+ },
+ want: &[]struct {
+ Key string `json:"key"`
+ Value string `json:"value"`
+ }{
+ {Key: "key-1", Value: "value-1"},
+ {Key: "key-2", Value: "value-2"},
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := outputKafkaModel{
+ Headers: tt.fields.Headers,
+ }
+ got, diags := m.toAPIHeaders(context.Background())
+ if (diags.HasError()) != tt.wantErr {
+ t.Errorf("outputKafkaModel.toAPIHeaders() error = %v, wantErr %v", diags.HasError(), tt.wantErr)
+ return
+ }
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+func Test_outputKafkaModel_toAPIRandom(t *testing.T) {
+ type fields struct {
+ Random types.Object
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want *struct {
+ GroupEvents *float32 `json:"group_events,omitempty"`
+ }
+ wantErr bool
+ }{
+ {
+ name: "returns nil when random is unknown",
+ fields: fields{
+ Random: types.ObjectUnknown(getRandomAttrTypes()),
+ },
+ },
+ {
+ name: "returns a random object when populated",
+ fields: fields{
+ Random: types.ObjectValueMust(
+ getRandomAttrTypes(),
+ map[string]attr.Value{
+ "group_events": types.Float64Value(1),
+ },
+ ),
+ },
+ want: &struct {
+ GroupEvents *float32 `json:"group_events,omitempty"`
+ }{
+ GroupEvents: utils.Pointer(float32(1)),
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := outputKafkaModel{
+ Random: tt.fields.Random,
+ }
+ got, diags := m.toAPIRandom(context.Background())
+ if (diags.HasError()) != tt.wantErr {
+ t.Errorf("outputKafkaModel.toAPIRandom() error = %v, wantErr %v", diags.HasError(), tt.wantErr)
+ return
+ }
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+func Test_outputKafkaModel_toAPIRoundRobin(t *testing.T) {
+ type fields struct {
+ RoundRobin types.Object
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want *struct {
+ GroupEvents *float32 `json:"group_events,omitempty"`
+ }
+ wantErr bool
+ }{
+ {
+ name: "returns nil when round_robin is unknown",
+ fields: fields{
+ RoundRobin: types.ObjectUnknown(getRoundRobinAttrTypes()),
+ },
+ },
+ {
+ name: "returns a round_robin object when populated",
+ fields: fields{
+ RoundRobin: types.ObjectValueMust(
+ getRoundRobinAttrTypes(),
+ map[string]attr.Value{
+ "group_events": types.Float64Value(1),
+ },
+ ),
+ },
+ want: &struct {
+ GroupEvents *float32 `json:"group_events,omitempty"`
+ }{
+ GroupEvents: utils.Pointer(float32(1)),
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := outputKafkaModel{
+ RoundRobin: tt.fields.RoundRobin,
+ }
+ got, diags := m.toAPIRoundRobin(context.Background())
+ if (diags.HasError()) != tt.wantErr {
+ t.Errorf("outputKafkaModel.toAPIRoundRobin() error = %v, wantErr %v", diags.HasError(), tt.wantErr)
+ return
+ }
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+func Test_outputKafkaModel_toAPISasl(t *testing.T) {
+ type fields struct {
+ Sasl types.Object
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want *struct {
+ Mechanism *kbapi.NewOutputKafkaSaslMechanism `json:"mechanism,omitempty"`
+ }
+ wantErr bool
+ }{
+ {
+ name: "returns nil when sasl is unknown",
+ fields: fields{
+ Sasl: types.ObjectUnknown(getSaslAttrTypes()),
+ },
+ },
+ {
+ name: "returns a sasl object when populated",
+ fields: fields{
+ Sasl: types.ObjectValueMust(
+ getSaslAttrTypes(),
+ map[string]attr.Value{
+ "mechanism": types.StringValue("plain"),
+ },
+ ),
+ },
+ want: &struct {
+ Mechanism *kbapi.NewOutputKafkaSaslMechanism `json:"mechanism,omitempty"`
+ }{
+ Mechanism: utils.Pointer(kbapi.NewOutputKafkaSaslMechanism("plain")),
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := outputKafkaModel{
+ Sasl: tt.fields.Sasl,
+ }
+ got, diags := m.toAPISasl(context.Background())
+ if (diags.HasError()) != tt.wantErr {
+ t.Errorf("outputKafkaModel.toAPISasl() error = %v, wantErr %v", diags.HasError(), tt.wantErr)
+ return
+ }
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+func Test_outputKafkaModel_toUpdateAPISasl(t *testing.T) {
+ type fields struct {
+ Sasl types.Object
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want *struct {
+ Mechanism *kbapi.UpdateOutputKafkaSaslMechanism `json:"mechanism,omitempty"`
+ }
+ wantErr bool
+ }{
+ {
+ name: "returns nil when sasl is unknown",
+ fields: fields{
+ Sasl: types.ObjectUnknown(getSaslAttrTypes()),
+ },
+ },
+ {
+ name: "returns a sasl object when populated",
+ fields: fields{
+ Sasl: types.ObjectValueMust(
+ getSaslAttrTypes(),
+ map[string]attr.Value{
+ "mechanism": types.StringValue("plain"),
+ },
+ ),
+ },
+ want: &struct {
+ Mechanism *kbapi.UpdateOutputKafkaSaslMechanism `json:"mechanism,omitempty"`
+ }{
+ Mechanism: utils.Pointer(kbapi.UpdateOutputKafkaSaslMechanism("plain")),
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := outputKafkaModel{
+ Sasl: tt.fields.Sasl,
+ }
+ got, diags := m.toUpdateAPISasl(context.Background())
+ if (diags.HasError()) != tt.wantErr {
+ t.Errorf("outputKafkaModel.toUpdateAPISasl() error = %v, wantErr %v", diags.HasError(), tt.wantErr)
+ return
+ }
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+func Test_outputKafkaModel_toAuthType(t *testing.T) {
+ type fields struct {
+ AuthType types.String
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want kbapi.NewOutputKafkaAuthType
+ }{
+ {
+ name: "returns none when auth_type is unknown",
+ fields: fields{
+ AuthType: types.StringUnknown(),
+ },
+ want: kbapi.NewOutputKafkaAuthTypeNone,
+ },
+ {
+ name: "returns an auth_type object when populated",
+ fields: fields{
+ AuthType: types.StringValue("user"),
+ },
+ want: kbapi.NewOutputKafkaAuthType("user"),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := outputKafkaModel{
+ AuthType: tt.fields.AuthType,
+ }
+ assert.Equal(t, tt.want, m.toAuthType())
+ })
+ }
+}
+
+func Test_outputKafkaModel_toUpdateAuthType(t *testing.T) {
+ type fields struct {
+ AuthType types.String
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want *kbapi.UpdateOutputKafkaAuthType
+ }{
+ {
+ name: "returns nil when auth_type is unknown",
+ fields: fields{
+ AuthType: types.StringUnknown(),
+ },
+ },
+ {
+ name: "returns an auth_type object when populated",
+ fields: fields{
+ AuthType: types.StringValue("user"),
+ },
+ want: utils.Pointer(kbapi.UpdateOutputKafkaAuthType("user")),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := outputKafkaModel{
+ AuthType: tt.fields.AuthType,
+ }
+ assert.Equal(t, tt.want, m.toUpdateAuthType())
+ })
+ }
+}
diff --git a/internal/fleet/output/models_logstash.go b/internal/fleet/output/models_logstash.go
new file mode 100644
index 000000000..680d5809b
--- /dev/null
+++ b/internal/fleet/output/models_logstash.go
@@ -0,0 +1,81 @@
+package output
+
+import (
+ "context"
+
+ "github.com/elastic/terraform-provider-elasticstack/generated/kbapi"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils"
+ "github.com/hashicorp/terraform-plugin-framework/diag"
+ "github.com/hashicorp/terraform-plugin-framework/path"
+ "github.com/hashicorp/terraform-plugin-framework/types"
+)
+
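+// fromAPILogstashModel populates the Terraform model from a kbapi.OutputLogstash value.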
+func (model *outputModel) fromAPILogstashModel(ctx context.Context, data *kbapi.OutputLogstash) (diags diag.Diagnostics) {
+ model.ID = types.StringPointerValue(data.Id)
+ model.OutputID = types.StringPointerValue(data.Id)
+ model.Name = types.StringValue(data.Name)
+ model.Type = types.StringValue(string(data.Type))
+ model.Hosts = utils.SliceToListType_String(ctx, data.Hosts, path.Root("hosts"), &diags)
+ model.CaSha256 = types.StringPointerValue(data.CaSha256)
+ model.CaTrustedFingerprint = types.StringPointerValue(data.CaTrustedFingerprint)
+ model.DefaultIntegrations = types.BoolPointerValue(data.IsDefault)
+ model.DefaultMonitoring = types.BoolPointerValue(data.IsDefaultMonitoring)
+ model.ConfigYaml = types.StringPointerValue(data.ConfigYaml)
+ model.Ssl, diags = sslToObjectValue(ctx, data.Ssl)
+ return
+}
+
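+// toAPICreateLogstashModel builds the kbapi.NewOutputUnion create body for a logstash output.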
+func (model outputModel) toAPICreateLogstashModel(ctx context.Context) (kbapi.NewOutputUnion, diag.Diagnostics) {
+ ssl, diags := objectValueToSSL(ctx, model.Ssl)
+ if diags.HasError() {
+ return kbapi.NewOutputUnion{}, diags
+ }
+ body := kbapi.NewOutputLogstash{
+ Type: kbapi.NewOutputLogstashTypeLogstash,
+ CaSha256: model.CaSha256.ValueStringPointer(),
+ CaTrustedFingerprint: model.CaTrustedFingerprint.ValueStringPointer(),
+ ConfigYaml: model.ConfigYaml.ValueStringPointer(),
+ Hosts: utils.ListTypeToSlice_String(ctx, model.Hosts, path.Root("hosts"), &diags),
+ Id: model.OutputID.ValueStringPointer(),
+ IsDefault: model.DefaultIntegrations.ValueBoolPointer(),
+ IsDefaultMonitoring: model.DefaultMonitoring.ValueBoolPointer(),
+ Name: model.Name.ValueString(),
+ Ssl: ssl,
+ }
+
+ var union kbapi.NewOutputUnion
+ err := union.FromNewOutputLogstash(body)
+ if err != nil {
+ diags.AddError(err.Error(), "")
+ return kbapi.NewOutputUnion{}, diags
+ }
+
+ return union, diags
+}
+
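+// toAPIUpdateLogstashModel builds the kbapi.UpdateOutputUnion update body for a logstash output.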
+func (model outputModel) toAPIUpdateLogstashModel(ctx context.Context) (kbapi.UpdateOutputUnion, diag.Diagnostics) {
+ ssl, diags := objectValueToSSLUpdate(ctx, model.Ssl)
+ if diags.HasError() {
+ return kbapi.UpdateOutputUnion{}, diags
+ }
+ body := kbapi.UpdateOutputLogstash{
+ Type: utils.Pointer(kbapi.Logstash),
+ CaSha256: model.CaSha256.ValueStringPointer(),
+ CaTrustedFingerprint: model.CaTrustedFingerprint.ValueStringPointer(),
+ ConfigYaml: model.ConfigYaml.ValueStringPointer(),
+ Hosts: utils.SliceRef(utils.ListTypeToSlice_String(ctx, model.Hosts, path.Root("hosts"), &diags)),
+ IsDefault: model.DefaultIntegrations.ValueBoolPointer(),
+ IsDefaultMonitoring: model.DefaultMonitoring.ValueBoolPointer(),
+ Name: model.Name.ValueStringPointer(),
+ Ssl: ssl,
+ }
+
+ var union kbapi.UpdateOutputUnion
+ err := union.FromUpdateOutputLogstash(body)
+ if err != nil {
+ diags.AddError(err.Error(), "")
+ return kbapi.UpdateOutputUnion{}, diags
+ }
+
+ return union, diags
+}
diff --git a/internal/fleet/output/models_ssl.go b/internal/fleet/output/models_ssl.go
new file mode 100644
index 000000000..e1f05d09a
--- /dev/null
+++ b/internal/fleet/output/models_ssl.go
@@ -0,0 +1,69 @@
+package output
+
+import (
+ "context"
+
+ "github.com/elastic/terraform-provider-elasticstack/generated/kbapi"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils"
+ "github.com/hashicorp/terraform-plugin-framework/diag"
+ "github.com/hashicorp/terraform-plugin-framework/path"
+ "github.com/hashicorp/terraform-plugin-framework/types"
+)
+
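+// outputSslModel holds the values of the nested `ssl` attribute on the output model.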
+type outputSslModel struct {
+ CertificateAuthorities types.List `tfsdk:"certificate_authorities"` //> string
+ Certificate types.String `tfsdk:"certificate"`
+ Key types.String `tfsdk:"key"`
+}
+
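+// objectValueToSSL converts the ssl object into the create-request SSL settings, returning nil when the
+// attribute is null or unknown.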
+func objectValueToSSL(ctx context.Context, obj types.Object) (*kbapi.NewOutputSsl, diag.Diagnostics) {
+ if !utils.IsKnown(obj) {
+ return nil, nil
+ }
+
+ var diags diag.Diagnostics
+ sslModel := utils.ObjectTypeAs[outputSslModel](ctx, obj, path.Root("ssl"), &diags)
+ if diags.HasError() {
+ return nil, diags
+ }
+
+ if sslModel == nil {
+ return nil, diags
+ }
+
+ return &kbapi.NewOutputSsl{
+ Certificate: sslModel.Certificate.ValueStringPointer(),
+ CertificateAuthorities: utils.SliceRef(utils.ListTypeToSlice_String(ctx, sslModel.CertificateAuthorities, path.Root("certificate_authorities"), &diags)),
+ Key: sslModel.Key.ValueStringPointer(),
+ }, diags
+}
+
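+// objectValueToSSLUpdate converts the ssl object into the update-request SSL settings, reusing objectValueToSSL.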
+func objectValueToSSLUpdate(ctx context.Context, obj types.Object) (*kbapi.UpdateOutputSsl, diag.Diagnostics) {
+ ssl, diags := objectValueToSSL(ctx, obj)
+ if diags.HasError() || ssl == nil {
+ return nil, diags
+ }
+
+ return &kbapi.UpdateOutputSsl{
+ Certificate: ssl.Certificate,
+ CertificateAuthorities: ssl.CertificateAuthorities,
+ Key: ssl.Key,
+ }, diags
+}
+
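+// sslToObjectValue converts SSL settings returned by the API into an ssl object value, returning a null
+// object when the API value is nil.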
+func sslToObjectValue(ctx context.Context, ssl *kbapi.OutputSsl) (types.Object, diag.Diagnostics) {
+ if ssl == nil {
+ return types.ObjectNull(getSslAttrTypes()), nil
+ }
+
+ var diags diag.Diagnostics
+ p := path.Root("ssl")
+ sslModel := outputSslModel{
+ CertificateAuthorities: utils.SliceToListType_String(ctx, utils.Deref(ssl.CertificateAuthorities), p.AtName("certificate_authorities"), &diags),
+ Certificate: types.StringPointerValue(ssl.Certificate),
+ Key: types.StringPointerValue(ssl.Key),
+ }
+ obj, diagTemp := types.ObjectValueFrom(ctx, getSslAttrTypes(), sslModel)
+ diags.Append(diagTemp...)
+ return obj, diags
+}
diff --git a/internal/fleet/output/models_ssl_test.go b/internal/fleet/output/models_ssl_test.go
new file mode 100644
index 000000000..19f9b79eb
--- /dev/null
+++ b/internal/fleet/output/models_ssl_test.go
@@ -0,0 +1,154 @@
+package output
+
+import (
+ "context"
+ "testing"
+
+ "github.com/elastic/terraform-provider-elasticstack/generated/kbapi"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils"
+ "github.com/hashicorp/terraform-plugin-framework/attr"
+ "github.com/hashicorp/terraform-plugin-framework/types"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_objectValueToSSL(t *testing.T) {
+ type args struct {
+ obj types.Object
+ }
+ tests := []struct {
+ name string
+ args args
+ want *kbapi.NewOutputSsl
+ wantErr bool
+ }{
+ {
+ name: "returns nil when object is unknown",
+ args: args{
+ obj: types.ObjectUnknown(getSslAttrTypes()),
+ },
+ },
+ {
+ name: "returns an ssl object when populated",
+ args: args{
+ obj: types.ObjectValueMust(
+ getSslAttrTypes(),
+ map[string]attr.Value{
+ "certificate_authorities": types.ListValueMust(types.StringType, []attr.Value{types.StringValue("ca")}),
+ "certificate": types.StringValue("cert"),
+ "key": types.StringValue("key"),
+ },
+ ),
+ },
+ want: &kbapi.NewOutputSsl{
+ Certificate: utils.Pointer("cert"),
+ CertificateAuthorities: &[]string{"ca"},
+ Key: utils.Pointer("key"),
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, diags := objectValueToSSL(context.Background(), tt.args.obj)
+ if (diags.HasError()) != tt.wantErr {
+ t.Errorf("objectValueToSSL() error = %v, wantErr %v", diags.HasError(), tt.wantErr)
+ return
+ }
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+func Test_objectValueToSSLUpdate(t *testing.T) {
+ type args struct {
+ obj types.Object
+ }
+ tests := []struct {
+ name string
+ args args
+ want *kbapi.UpdateOutputSsl
+ wantErr bool
+ }{
+ {
+ name: "returns nil when object is unknown",
+ args: args{
+ obj: types.ObjectUnknown(getSslAttrTypes()),
+ },
+ },
+ {
+ name: "returns an ssl object when populated",
+ args: args{
+ obj: types.ObjectValueMust(
+ getSslAttrTypes(),
+ map[string]attr.Value{
+ "certificate_authorities": types.ListValueMust(types.StringType, []attr.Value{types.StringValue("ca")}),
+ "certificate": types.StringValue("cert"),
+ "key": types.StringValue("key"),
+ },
+ ),
+ },
+ want: &kbapi.UpdateOutputSsl{
+ Certificate: utils.Pointer("cert"),
+ CertificateAuthorities: &[]string{"ca"},
+ Key: utils.Pointer("key"),
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, diags := objectValueToSSLUpdate(context.Background(), tt.args.obj)
+ if (diags.HasError()) != tt.wantErr {
+ t.Errorf("objectValueToSSLUpdate() error = %v, wantErr %v", diags.HasError(), tt.wantErr)
+ return
+ }
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
+
+func Test_sslToObjectValue(t *testing.T) {
+ type args struct {
+ ssl *kbapi.OutputSsl
+ }
+ tests := []struct {
+ name string
+ args args
+ want types.Object
+ wantErr bool
+ }{
+ {
+ name: "returns nil when ssl is nil",
+ args: args{
+ ssl: nil,
+ },
+ want: types.ObjectNull(getSslAttrTypes()),
+ },
+ {
+ name: "returns an object when populated",
+ args: args{
+ ssl: &kbapi.OutputSsl{
+ Certificate: utils.Pointer("cert"),
+ CertificateAuthorities: &[]string{"ca"},
+ Key: utils.Pointer("key"),
+ },
+ },
+ want: types.ObjectValueMust(
+ getSslAttrTypes(),
+ map[string]attr.Value{
+ "certificate_authorities": types.ListValueMust(types.StringType, []attr.Value{types.StringValue("ca")}),
+ "certificate": types.StringValue("cert"),
+ "key": types.StringValue("key"),
+ },
+ ),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, diags := sslToObjectValue(context.Background(), tt.args.ssl)
+ if (diags.HasError()) != tt.wantErr {
+ t.Errorf("sslToObjectValue() error = %v, wantErr %v", diags.HasError(), tt.wantErr)
+ return
+ }
+ assert.Equal(t, tt.want, got)
+ })
+ }
+}
diff --git a/internal/fleet/output/resource.go b/internal/fleet/output/resource.go
index 79918917e..5f6a90ea7 100644
--- a/internal/fleet/output/resource.go
+++ b/internal/fleet/output/resource.go
@@ -2,19 +2,25 @@ package output
import (
"context"
+ "encoding/json"
"fmt"
"github.com/elastic/terraform-provider-elasticstack/internal/clients"
+ "github.com/hashicorp/go-version"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource"
+ "github.com/hashicorp/terraform-plugin-go/tfprotov6"
)
var (
- _ resource.Resource = &outputResource{}
- _ resource.ResourceWithConfigure = &outputResource{}
- _ resource.ResourceWithImportState = &outputResource{}
+ _ resource.Resource = &outputResource{}
+ _ resource.ResourceWithConfigure = &outputResource{}
+ _ resource.ResourceWithImportState = &outputResource{}
+ _ resource.ResourceWithUpgradeState = &outputResource{}
)
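+// MinVersionOutputKafka is the minimum stack version required for the kafka output type; see assertKafkaSupport.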
+var MinVersionOutputKafka = version.Must(version.NewVersion("8.13.0"))
+
// NewResource is a helper function to simplify the provider implementation.
func NewResource() resource.Resource {
return &outputResource{}
@@ -37,3 +43,58 @@ func (r *outputResource) Metadata(ctx context.Context, req resource.MetadataRequ
func (r *outputResource) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) {
resource.ImportStatePassthroughID(ctx, path.Root("output_id"), req, resp)
}
+
+func (r *outputResource) UpgradeState(context.Context) map[int64]resource.StateUpgrader {
+ return map[int64]resource.StateUpgrader{
+ 0: {
+			// Legacy provider versions defined the `ssl` attribute as a block, so it was stored in state as a list.
+			// This upgrader migrates that list to a single object when it is present in the raw state.
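+			// For example, a version 0 state fragment of
+			//   "ssl": [{"certificate": "cert", "key": "key"}]
+			// becomes
+			//   "ssl": {"certificate": "cert", "key": "key"}
+			// and an empty list removes the attribute entirely.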
+ StateUpgrader: func(ctx context.Context, req resource.UpgradeStateRequest, resp *resource.UpgradeStateResponse) {
+ if req.RawState == nil || req.RawState.JSON == nil {
+ resp.Diagnostics.AddError("Invalid raw state", "Raw state or JSON is nil")
+ return
+ }
+
+ // Default to returning the original state if no changes are needed
+ resp.DynamicValue = &tfprotov6.DynamicValue{
+ JSON: req.RawState.JSON,
+ }
+
+ var stateMap map[string]interface{}
+ err := json.Unmarshal(req.RawState.JSON, &stateMap)
+ if err != nil {
+ resp.Diagnostics.AddError("Failed to unmarshal raw state", err.Error())
+ return
+ }
+
+ sslInterface, ok := stateMap["ssl"]
+ if !ok {
+ return
+ }
+
+ sslList, ok := sslInterface.([]any)
+ if !ok {
+ resp.Diagnostics.AddAttributeError(path.Root("ssl"),
+ "Unexpected type for legacy ssl attribute",
+ fmt.Sprintf("Expected []any, got %T", sslInterface),
+ )
+ return
+ }
+
+ if len(sslList) > 0 {
+ stateMap["ssl"] = sslList[0]
+ } else {
+ delete(stateMap, "ssl")
+ }
+
+ stateJSON, err := json.Marshal(stateMap)
+ if err != nil {
+ resp.Diagnostics.AddError("Failed to marshal raw state", err.Error())
+ return
+ }
+
+ resp.DynamicValue.JSON = stateJSON
+ },
+ },
+ }
+}
diff --git a/internal/fleet/output/resource_test.go b/internal/fleet/output/resource_test.go
index 47d324567..aa525f0c2 100644
--- a/internal/fleet/output/resource_test.go
+++ b/internal/fleet/output/resource_test.go
@@ -1,330 +1,171 @@
-package output_test
+package output
import (
"context"
- "fmt"
+ "encoding/json"
"testing"
- "github.com/elastic/terraform-provider-elasticstack/internal/acctest"
- "github.com/elastic/terraform-provider-elasticstack/internal/clients"
- "github.com/elastic/terraform-provider-elasticstack/internal/clients/fleet"
- "github.com/elastic/terraform-provider-elasticstack/internal/utils"
- "github.com/elastic/terraform-provider-elasticstack/internal/versionutils"
- "github.com/hashicorp/go-version"
- sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest"
- "github.com/hashicorp/terraform-plugin-testing/helper/resource"
- "github.com/hashicorp/terraform-plugin-testing/terraform"
+ "github.com/hashicorp/terraform-plugin-framework/resource"
+ "github.com/hashicorp/terraform-plugin-go/tfprotov6"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
-var minVersionOutput = version.Must(version.NewVersion("8.6.0"))
-
-func TestAccResourceOutputElasticsearchFromSDK(t *testing.T) {
- policyName := sdkacctest.RandString(22)
-
- resource.Test(t, resource.TestCase{
- PreCheck: func() { acctest.PreCheck(t) },
- CheckDestroy: checkResourceOutputDestroy,
- Steps: []resource.TestStep{
- {
- ExternalProviders: map[string]resource.ExternalProvider{
- "elasticstack": {
- Source: "elastic/elasticstack",
- VersionConstraint: "0.11.7",
+func TestOutputResourceUpgradeState(t *testing.T) {
+ t.Parallel()
+
+ tests := []struct {
+ name string
+ rawState map[string]interface{}
+ expectedState map[string]interface{}
+ expectError bool
+ errorContains string
+ }{
+ {
+ name: "successful upgrade - ssl list to object",
+ rawState: map[string]interface{}{
+ "id": "test-output",
+ "name": "Test Output",
+ "type": "elasticsearch",
+ "ssl": []interface{}{
+ map[string]interface{}{
+ "certificate": "cert-content",
+ "key": "key-content",
+ "certificate_authorities": []interface{}{"ca1", "ca2"},
},
},
- SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
- Config: testAccResourceOutputCreateElasticsearch(policyName),
- Check: resource.ComposeTestCheckFunc(
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Elasticsearch Output %s", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-elasticsearch-output", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "elasticsearch"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "https://elasticsearch:9200"),
- ),
+ "hosts": []interface{}{"https://localhost:9200"},
},
- {
- ProtoV6ProviderFactories: acctest.Providers,
- SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
- Config: testAccResourceOutputCreateElasticsearch(policyName),
- Check: resource.ComposeTestCheckFunc(
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Elasticsearch Output %s", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-elasticsearch-output", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "elasticsearch"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "https://elasticsearch:9200"),
- ),
+ expectedState: map[string]interface{}{
+ "id": "test-output",
+ "name": "Test Output",
+ "type": "elasticsearch",
+ "ssl": map[string]interface{}{
+ "certificate": "cert-content",
+ "key": "key-content",
+ "certificate_authorities": []interface{}{"ca1", "ca2"},
+ },
+ "hosts": []interface{}{"https://localhost:9200"},
},
+ expectError: false,
},
- })
-}
-
-func TestAccResourceOutputElasticsearch(t *testing.T) {
- policyName := sdkacctest.RandString(22)
-
- resource.Test(t, resource.TestCase{
- PreCheck: func() { acctest.PreCheck(t) },
- CheckDestroy: checkResourceOutputDestroy,
- ProtoV6ProviderFactories: acctest.Providers,
- Steps: []resource.TestStep{
- {
- SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
- Config: testAccResourceOutputCreateElasticsearch(policyName),
- Check: resource.ComposeTestCheckFunc(
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Elasticsearch Output %s", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-elasticsearch-output", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "elasticsearch"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "https://elasticsearch:9200"),
- ),
+ {
+ name: "no ssl field - no changes",
+ rawState: map[string]interface{}{
+ "id": "test-output",
+ "name": "Test Output",
+ "type": "elasticsearch",
+ "hosts": []interface{}{"https://localhost:9200"},
},
- {
- SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
- Config: testAccResourceOutputUpdateElasticsearch(policyName),
- Check: resource.ComposeTestCheckFunc(
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Updated Elasticsearch Output %s", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-elasticsearch-output", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "elasticsearch"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "https://elasticsearch:9200"),
- ),
- },
- {
- SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
- Config: testAccResourceOutputUpdateElasticsearch(policyName),
- ResourceName: "elasticstack_fleet_output.test_output",
- ImportState: true,
- ImportStateVerify: true,
+ expectedState: map[string]interface{}{
+ "id": "test-output",
+ "name": "Test Output",
+ "type": "elasticsearch",
+ "hosts": []interface{}{"https://localhost:9200"},
},
+ expectError: false,
},
- })
-}
-
-func TestAccResourceOutputLogstashFromSDK(t *testing.T) {
- policyName := sdkacctest.RandString(22)
-
- resource.Test(t, resource.TestCase{
- PreCheck: func() { acctest.PreCheck(t) },
- CheckDestroy: checkResourceOutputDestroy,
- Steps: []resource.TestStep{
- {
- ExternalProviders: map[string]resource.ExternalProvider{
- "elasticstack": {
- Source: "elastic/elasticstack",
- VersionConstraint: "0.11.7",
- },
- },
- SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
- Config: testAccResourceOutputCreateLogstash(policyName),
- Check: resource.ComposeTestCheckFunc(
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Logstash Output %s", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-logstash-output", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "logstash"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "logstash:5044"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.certificate_authorities.0", "placeholder"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.certificate", "placeholder"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.key", "placeholder"),
- ),
+ {
+ name: "empty ssl list - removes ssl field",
+ rawState: map[string]interface{}{
+ "id": "test-output",
+ "name": "Test Output",
+ "type": "elasticsearch",
+ "ssl": []interface{}{},
+ "hosts": []interface{}{"https://localhost:9200"},
},
- {
- ProtoV6ProviderFactories: acctest.Providers,
- SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
- Config: testAccResourceOutputCreateLogstash(policyName),
- Check: resource.ComposeTestCheckFunc(
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Logstash Output %s", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-logstash-output", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "logstash"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "logstash:5044"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.certificate_authorities.0", "placeholder"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.certificate", "placeholder"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.key", "placeholder"),
- ),
+ expectedState: map[string]interface{}{
+ "id": "test-output",
+ "name": "Test Output",
+ "type": "elasticsearch",
+ "hosts": []interface{}{"https://localhost:9200"},
},
+ expectError: false,
},
- })
-}
-
-func TestAccResourceOutputLogstash(t *testing.T) {
- policyName := sdkacctest.RandString(22)
-
- resource.Test(t, resource.TestCase{
- PreCheck: func() { acctest.PreCheck(t) },
- CheckDestroy: checkResourceOutputDestroy,
- ProtoV6ProviderFactories: acctest.Providers,
- Steps: []resource.TestStep{
- {
- SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
- Config: testAccResourceOutputCreateLogstash(policyName),
- Check: resource.ComposeTestCheckFunc(
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Logstash Output %s", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-logstash-output", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "logstash"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "logstash:5044"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.certificate_authorities.0", "placeholder"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.certificate", "placeholder"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.key", "placeholder"),
- ),
+ {
+ name: "ssl not an array - returns error",
+ rawState: map[string]interface{}{
+ "id": "test-output",
+ "name": "Test Output",
+ "type": "elasticsearch",
+ "ssl": "invalid-type",
+ "hosts": []interface{}{"https://localhost:9200"},
+ },
+ expectedState: nil,
+ expectError: true,
+ errorContains: "Unexpected type for legacy ssl attribute",
+ },
+ {
+ name: "multiple ssl items - takes first item",
+ rawState: map[string]interface{}{
+ "id": "test-output",
+ "name": "Test Output",
+ "type": "elasticsearch",
+ "ssl": []interface{}{
+ map[string]interface{}{"certificate": "cert1"},
+ map[string]interface{}{"certificate": "cert2"},
+ },
+ "hosts": []interface{}{"https://localhost:9200"},
},
- {
- SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionOutput),
- Config: testAccResourceOutputUpdateLogstash(policyName),
- Check: resource.ComposeTestCheckFunc(
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "name", fmt.Sprintf("Updated Logstash Output %s", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "id", fmt.Sprintf("%s-logstash-output", policyName)),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "type", "logstash"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "config_yaml", "\"ssl.verification_mode\": \"none\"\n"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_integrations", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "default_monitoring", "false"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "hosts.0", "logstash:5044"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.certificate_authorities.0", "placeholder"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.certificate", "placeholder"),
- resource.TestCheckResourceAttr("elasticstack_fleet_output.test_output", "ssl.0.key", "placeholder"),
- ),
+ expectedState: map[string]interface{}{
+ "id": "test-output",
+ "name": "Test Output",
+ "type": "elasticsearch",
+ "ssl": map[string]interface{}{"certificate": "cert1"},
+ "hosts": []interface{}{"https://localhost:9200"},
},
+ expectError: false,
},
- })
-}
-
-func testAccResourceOutputCreateElasticsearch(id string) string {
- return fmt.Sprintf(`
-provider "elasticstack" {
- elasticsearch {}
- kibana {}
-}
-
-resource "elasticstack_fleet_output" "test_output" {
- name = "Elasticsearch Output %s"
- output_id = "%s-elasticsearch-output"
- type = "elasticsearch"
- config_yaml = yamlencode({
- "ssl.verification_mode" : "none"
- })
- default_integrations = false
- default_monitoring = false
- hosts = [
- "https://elasticsearch:9200"
- ]
-}
-`, id, id)
-}
-
-func testAccResourceOutputUpdateElasticsearch(id string) string {
- return fmt.Sprintf(`
-provider "elasticstack" {
- elasticsearch {}
- kibana {}
-}
-
-resource "elasticstack_fleet_output" "test_output" {
- name = "Updated Elasticsearch Output %s"
- output_id = "%s-elasticsearch-output"
- type = "elasticsearch"
- config_yaml = yamlencode({
- "ssl.verification_mode" : "none"
- })
- default_integrations = false
- default_monitoring = false
- hosts = [
- "https://elasticsearch:9200"
- ]
-}
-`, id, id)
-}
-
-func testAccResourceOutputCreateLogstash(id string) string {
- return fmt.Sprintf(`
-provider "elasticstack" {
- elasticsearch {}
- kibana {}
-}
-
-resource "elasticstack_fleet_output" "test_output" {
- name = "Logstash Output %s"
- type = "logstash"
- output_id = "%s-logstash-output"
- config_yaml = yamlencode({
- "ssl.verification_mode" : "none"
- })
- default_integrations = false
- default_monitoring = false
- hosts = [
- "logstash:5044"
- ]
- ssl {
- certificate_authorities = ["placeholder"]
- certificate = "placeholder"
- key = "placeholder"
- }
-}
-`, id, id)
-}
-
-func testAccResourceOutputUpdateLogstash(id string) string {
- return fmt.Sprintf(`
-provider "elasticstack" {
- elasticsearch {}
- kibana {}
-}
-
-resource "elasticstack_fleet_output" "test_output" {
- name = "Updated Logstash Output %s"
- output_id = "%s-logstash-output"
- type = "logstash"
- config_yaml = yamlencode({
- "ssl.verification_mode" : "none"
- })
- default_integrations = false
- default_monitoring = false
- hosts = [
- "logstash:5044"
- ]
- ssl {
- certificate_authorities = ["placeholder"]
- certificate = "placeholder"
- key = "placeholder"
- }
-}
-`, id, id)
-}
-
-func checkResourceOutputDestroy(s *terraform.State) error {
- client, err := clients.NewAcceptanceTestingClient()
- if err != nil {
- return err
}
- for _, rs := range s.RootModule().Resources {
- if rs.Type != "elasticstack_fleet_output" {
- continue
- }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
- fleetClient, err := client.GetFleetClient()
- if err != nil {
- return err
- }
- output, diags := fleet.GetOutput(context.Background(), fleetClient, rs.Primary.ID)
- if diags.HasError() {
- return utils.FwDiagsAsError(diags)
- }
- if output != nil {
- return fmt.Errorf("output id=%v still exists, but it should have been removed", rs.Primary.ID)
- }
+ // Marshal the raw state to JSON
+ rawStateJSON, err := json.Marshal(tt.rawState)
+ require.NoError(t, err)
+
+ // Create the upgrade request
+ req := resource.UpgradeStateRequest{
+ RawState: &tfprotov6.RawState{
+ JSON: rawStateJSON,
+ },
+ }
+
+ // Create a response
+ resp := &resource.UpgradeStateResponse{}
+
+ // Create the resource and call UpgradeState
+ r := &outputResource{}
+ upgraders := r.UpgradeState(context.Background())
+ upgrader := upgraders[0]
+ upgrader.StateUpgrader(context.Background(), req, resp)
+
+ if tt.expectError {
+ require.True(t, resp.Diagnostics.HasError(), "Expected error but got none")
+ if tt.errorContains != "" {
+ errorSummary := ""
+ for _, diag := range resp.Diagnostics.Errors() {
+ errorSummary += diag.Summary() + " " + diag.Detail()
+ }
+ assert.Contains(t, errorSummary, tt.errorContains)
+ }
+ return
+ }
+
+ // Check no errors occurred
+ require.False(t, resp.Diagnostics.HasError(), "Unexpected error: %v", resp.Diagnostics.Errors())
+
+ // Check that a DynamicValue is always returned
+ require.NotNil(t, resp.DynamicValue, "DynamicValue should always be returned")
+
+ // Unmarshal the upgraded state to compare
+ var actualState map[string]interface{}
+ err = json.Unmarshal(resp.DynamicValue.JSON, &actualState)
+ require.NoError(t, err)
+
+ assert.Equal(t, tt.expectedState, actualState)
+ })
}
- return nil
}
diff --git a/internal/fleet/output/schema.go b/internal/fleet/output/schema.go
index 74919729c..590374c21 100644
--- a/internal/fleet/output/schema.go
+++ b/internal/fleet/output/schema.go
@@ -3,12 +3,18 @@ package output
import (
"context"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils/validators"
+ "github.com/hashicorp/terraform-plugin-framework-validators/int64validator"
"github.com/hashicorp/terraform-plugin-framework-validators/listvalidator"
"github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator"
"github.com/hashicorp/terraform-plugin-framework/attr"
+ "github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/booldefault"
+ "github.com/hashicorp/terraform-plugin-framework/resource/schema/float32planmodifier"
+ "github.com/hashicorp/terraform-plugin-framework/resource/schema/int64planmodifier"
+ "github.com/hashicorp/terraform-plugin-framework/resource/schema/listplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/schema/validator"
@@ -21,6 +27,7 @@ func (r *outputResource) Schema(ctx context.Context, req resource.SchemaRequest,
func getSchema() schema.Schema {
return schema.Schema{
+ Version: 1,
Description: "Creates a new Fleet Output.",
Attributes: map[string]schema.Attribute{
"id": schema.StringAttribute{
@@ -47,7 +54,7 @@ func getSchema() schema.Schema {
Description: "The output type.",
Required: true,
Validators: []validator.String{
- stringvalidator.OneOf("elasticsearch", "logstash"),
+ stringvalidator.OneOf("elasticsearch", "logstash", "kafka"),
},
},
"hosts": schema.ListAttribute{
@@ -83,28 +90,197 @@ func getSchema() schema.Schema {
Optional: true,
Sensitive: true,
},
- },
- Blocks: map[string]schema.Block{
- "ssl": schema.ListNestedBlock{
+ "ssl": schema.SingleNestedAttribute{
Description: "SSL configuration.",
- Validators: []validator.List{
- listvalidator.SizeAtMost(1),
+ Optional: true,
+ Attributes: map[string]schema.Attribute{
+ "certificate_authorities": schema.ListAttribute{
+ Description: "Server SSL certificate authorities.",
+ Optional: true,
+ ElementType: types.StringType,
+ },
+ "certificate": schema.StringAttribute{
+ Description: "Client SSL certificate.",
+ Required: true,
+ },
+ "key": schema.StringAttribute{
+ Description: "Client SSL certificate key.",
+ Required: true,
+ Sensitive: true,
+ },
},
- NestedObject: schema.NestedBlockObject{
- Attributes: map[string]schema.Attribute{
- "certificate_authorities": schema.ListAttribute{
- Description: "Server SSL certificate authorities.",
- Optional: true,
- ElementType: types.StringType,
+ },
+ "kafka": schema.SingleNestedAttribute{
+ Description: "Kafka-specific configuration.",
+ Optional: true,
+ Attributes: map[string]schema.Attribute{
+ "auth_type": schema.StringAttribute{
+ Description: "Authentication type for Kafka output.",
+ Optional: true,
+ Validators: []validator.String{
+ stringvalidator.OneOf("none", "user_pass", "ssl", "kerberos"),
+ },
+ },
+ "broker_timeout": schema.Float32Attribute{
+ Description: "Kafka broker timeout.",
+ Optional: true,
+ Computed: true,
+ PlanModifiers: []planmodifier.Float32{
+ float32planmodifier.UseStateForUnknown(),
+ },
+ },
+ "client_id": schema.StringAttribute{
+ Description: "Kafka client ID.",
+ Optional: true,
+ Computed: true,
+ PlanModifiers: []planmodifier.String{
+ stringplanmodifier.UseStateForUnknown(),
+ },
+ },
+ "compression": schema.StringAttribute{
+ Description: "Compression type for Kafka output.",
+ Optional: true,
+ Validators: []validator.String{
+ stringvalidator.OneOf("gzip", "snappy", "lz4", "none"),
+ },
+ },
+ "compression_level": schema.Int64Attribute{
+ Description: "Compression level for Kafka output.",
+ Optional: true,
+ Computed: true,
+ PlanModifiers: []planmodifier.Int64{
+ int64planmodifier.UseStateForUnknown(),
+ },
+ Validators: []validator.Int64{
+ validators.Int64ConditionalRequirement(
+ path.Root("kafka").AtName("compression"),
+ []string{"gzip"},
+ ),
+ },
+ },
+ "connection_type": schema.StringAttribute{
+ Description: "Connection type for Kafka output.",
+ Optional: true,
+ Validators: []validator.String{
+ stringvalidator.OneOf("plaintext", "encryption"),
+ validators.StringConditionalRequirementSingle(
+ path.Root("kafka").AtName("auth_type"),
+ "none",
+ ),
+ },
+ },
+ "topic": schema.StringAttribute{
+ Description: "Kafka topic.",
+ Optional: true,
+ },
+ "partition": schema.StringAttribute{
+ Description: "Partition strategy for Kafka output.",
+ Optional: true,
+ Validators: []validator.String{
+ stringvalidator.OneOf("random", "round_robin", "hash"),
+ },
+ },
+ "required_acks": schema.Int64Attribute{
+ Description: "Number of acknowledgments required for Kafka output.",
+ Optional: true,
+ Validators: []validator.Int64{
+ int64validator.OneOf(-1, 0, 1),
+ },
+ },
+ "timeout": schema.Float32Attribute{
+ Description: "Timeout for Kafka output.",
+ Optional: true,
+ Computed: true,
+ PlanModifiers: []planmodifier.Float32{
+ float32planmodifier.UseStateForUnknown(),
},
- "certificate": schema.StringAttribute{
- Description: "Client SSL certificate.",
- Required: true,
+ },
+ "version": schema.StringAttribute{
+ Description: "Kafka version.",
+ Optional: true,
+ Computed: true,
+ PlanModifiers: []planmodifier.String{
+ stringplanmodifier.UseStateForUnknown(),
+ },
+ },
+ "username": schema.StringAttribute{
+ Description: "Username for Kafka authentication.",
+ Optional: true,
+ },
+ "password": schema.StringAttribute{
+ Description: "Password for Kafka authentication.",
+ Optional: true,
+ Sensitive: true,
+ },
+ "key": schema.StringAttribute{
+ Description: "Key field for Kafka messages.",
+ Optional: true,
+ },
+ "headers": schema.ListNestedAttribute{
+ Description: "Headers for Kafka messages.",
+ Optional: true,
+ Computed: true,
+ PlanModifiers: []planmodifier.List{
+ listplanmodifier.UseStateForUnknown(),
+ },
+ NestedObject: schema.NestedAttributeObject{
+ Attributes: map[string]schema.Attribute{
+ "key": schema.StringAttribute{
+ Description: "Header key.",
+ Required: true,
+ },
+ "value": schema.StringAttribute{
+ Description: "Header value.",
+ Required: true,
+ },
+ },
+ },
+ },
+ "hash": schema.SingleNestedAttribute{
+ Description: "Hash configuration for Kafka partition.",
+ Optional: true,
+ Attributes: map[string]schema.Attribute{
+ "hash": schema.StringAttribute{
+ Description: "Hash field.",
+ Optional: true,
+ },
+ "random": schema.BoolAttribute{
+ Description: "Use random hash.",
+ Optional: true,
+ },
+ },
+ },
+ "random": schema.SingleNestedAttribute{
+ Description: "Random configuration for Kafka partition.",
+ Optional: true,
+ Attributes: map[string]schema.Attribute{
+ "group_events": schema.Float64Attribute{
+ Description: "Number of events to group.",
+ Optional: true,
+ },
+ },
+ },
+ "round_robin": schema.SingleNestedAttribute{
+ Description: "Round robin configuration for Kafka partition.",
+ Optional: true,
+ Attributes: map[string]schema.Attribute{
+ "group_events": schema.Float64Attribute{
+ Description: "Number of events to group.",
+ Optional: true,
+ },
},
- "key": schema.StringAttribute{
- Description: "Client SSL certificate key.",
- Required: true,
- Sensitive: true,
+ },
+ "sasl": schema.SingleNestedAttribute{
+ Description: "SASL configuration for Kafka authentication.",
+ Optional: true,
+ Attributes: map[string]schema.Attribute{
+ "mechanism": schema.StringAttribute{
+ Description: "SASL mechanism.",
+ Optional: true,
+ Validators: []validator.String{
+ stringvalidator.OneOf("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"),
+ },
+ },
},
},
},
@@ -113,6 +289,30 @@ func getSchema() schema.Schema {
}
}
-func getSslAttrTypes() attr.Type {
- return getSchema().Blocks["ssl"].Type().(attr.TypeWithElementType).ElementType()
+func getSslAttrTypes() map[string]attr.Type {
+ return getSchema().Attributes["ssl"].GetType().(attr.TypeWithAttributeTypes).AttributeTypes()
+}
+
+func getHeadersAttrTypes() attr.Type {
+ return getSchema().Attributes["kafka"].(schema.SingleNestedAttribute).Attributes["headers"].GetType().(attr.TypeWithElementType).ElementType()
+}
+
+func getHashAttrTypes() map[string]attr.Type {
+ return getSchema().Attributes["kafka"].(schema.SingleNestedAttribute).Attributes["hash"].GetType().(attr.TypeWithAttributeTypes).AttributeTypes()
+}
+
+func getRandomAttrTypes() map[string]attr.Type {
+ return getSchema().Attributes["kafka"].(schema.SingleNestedAttribute).Attributes["random"].GetType().(attr.TypeWithAttributeTypes).AttributeTypes()
+}
+
+func getRoundRobinAttrTypes() map[string]attr.Type {
+ return getSchema().Attributes["kafka"].(schema.SingleNestedAttribute).Attributes["round_robin"].GetType().(attr.TypeWithAttributeTypes).AttributeTypes()
+}
+
+func getSaslAttrTypes() map[string]attr.Type {
+ return getSchema().Attributes["kafka"].(schema.SingleNestedAttribute).Attributes["sasl"].GetType().(attr.TypeWithAttributeTypes).AttributeTypes()
+}
+
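+// getKafkaAttrTypes returns the framework attribute types of the kafka nested
+// attribute defined in the schema above.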
+func getKafkaAttrTypes() map[string]attr.Type {
+ return getSchema().Attributes["kafka"].(schema.SingleNestedAttribute).GetType().(attr.TypeWithAttributeTypes).AttributeTypes()
}
diff --git a/internal/fleet/output/update.go b/internal/fleet/output/update.go
index ca95177a9..46688d5b9 100644
--- a/internal/fleet/output/update.go
+++ b/internal/fleet/output/update.go
@@ -22,7 +22,7 @@ func (r *outputResource) Update(ctx context.Context, req resource.UpdateRequest,
return
}
- body, diags := planModel.toAPIUpdateModel(ctx)
+ body, diags := planModel.toAPIUpdateModel(ctx, r.client)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
diff --git a/internal/utils/validators/conditional.go b/internal/utils/validators/conditional.go
new file mode 100644
index 000000000..a13d400a5
--- /dev/null
+++ b/internal/utils/validators/conditional.go
@@ -0,0 +1,144 @@
+package validators
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/hashicorp/terraform-plugin-framework/attr"
+ "github.com/hashicorp/terraform-plugin-framework/diag"
+ "github.com/hashicorp/terraform-plugin-framework/path"
+ "github.com/hashicorp/terraform-plugin-framework/schema/validator"
+ "github.com/hashicorp/terraform-plugin-framework/tfsdk"
+ "github.com/hashicorp/terraform-plugin-framework/types"
+)
+
+// conditionalRequirement represents a validator which ensures that an attribute
+// can only be set if another attribute at a specified path equals one of the specified values.
+// This is a shared implementation that backs both the string and int64 validators.
+type conditionalRequirement struct {
+ dependentPath path.Path
+ allowedValues []string
+}
+
+// Description describes the validation in plain text formatting.
+func (v conditionalRequirement) Description(_ context.Context) string {
+ if len(v.allowedValues) == 1 {
+ return fmt.Sprintf("value can only be set when %s equals %q", v.dependentPath, v.allowedValues[0])
+ }
+ return fmt.Sprintf("value can only be set when %s is one of %v", v.dependentPath, v.allowedValues)
+}
+
+// MarkdownDescription describes the validation in Markdown formatting.
+func (v conditionalRequirement) MarkdownDescription(ctx context.Context) string {
+ return v.Description(ctx)
+}
+
+func (v conditionalRequirement) validate(ctx context.Context, config tfsdk.Config, val attr.Value, p path.Path) diag.Diagnostics {
+ if val.IsNull() || val.IsUnknown() {
+ return nil
+ }
+
+ // Get the value at the dependent path
+ var dependentValue types.String
+ diags := config.GetAttribute(ctx, v.dependentPath, &dependentValue)
+ if diags.HasError() {
+ return diags
+ }
+
+ // If dependent value is null, unknown, or doesn't match any allowed values,
+ // then the current attribute should not be set
+ dependentValueStr := dependentValue.ValueString()
+ isAllowed := false
+
+ if !dependentValue.IsNull() && !dependentValue.IsUnknown() {
+ for _, allowedValue := range v.allowedValues {
+ if dependentValueStr == allowedValue {
+ isAllowed = true
+ break
+ }
+ }
+ }
+
+ if !isAllowed {
+ if len(v.allowedValues) == 1 {
+ diags.AddAttributeError(p, "Invalid Configuration",
+ fmt.Sprintf("Attribute %s can only be set when %s equals %q, but %s is %q",
+ p,
+ v.dependentPath,
+ v.allowedValues[0],
+ v.dependentPath,
+ dependentValueStr,
+ ),
+ )
+ return diags
+ } else {
+ diags.AddAttributeError(p, "Invalid Configuration",
+ fmt.Sprintf("Attribute %s can only be set when %s is one of %v, but %s is %q",
+ p,
+ v.dependentPath,
+ v.allowedValues,
+ v.dependentPath,
+ dependentValueStr,
+ ),
+ )
+ return diags
+ }
+ }
+
+ return nil
+}
+
+// ValidateString and ValidateInt64 below both delegate to the shared validate method above,
+// so string and int64 attributes reuse the same conditional-requirement logic.
+
+// ValidateString performs the validation for string attributes.
+func (v conditionalRequirement) ValidateString(ctx context.Context, request validator.StringRequest, response *validator.StringResponse) {
+ response.Diagnostics.Append(v.validate(ctx, request.Config, request.ConfigValue, request.Path)...)
+}
+
+// ValidateInt64 performs the validation for int64 attributes.
+func (v conditionalRequirement) ValidateInt64(ctx context.Context, request validator.Int64Request, response *validator.Int64Response) {
+ response.Diagnostics.Append(v.validate(ctx, request.Config, request.ConfigValue, request.Path)...)
+}
+
+// StringConditionalRequirement returns a validator which ensures that a string attribute
+// can only be set if another attribute at the specified path equals one of the specified values.
+//
+// The dependentPath parameter should use path.Root() to specify the attribute path.
+// For example: path.Root("auth_type")
+//
+// Example usage:
+//
+// "connection_type": schema.StringAttribute{
+// Optional: true,
+// Validators: []validator.String{
+// validators.StringConditionalRequirement(
+// path.Root("auth_type"),
+// []string{"none"},
+// ),
+// },
+// },
+func StringConditionalRequirement(dependentPath path.Path, allowedValues []string) validator.String {
+ return conditionalRequirement{
+ dependentPath: dependentPath,
+ allowedValues: allowedValues,
+ }
+}
+
+// StringConditionalRequirementSingle is a convenience function for when there's only one allowed value.
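+//
+// For example (illustrative, mirroring the kafka connection_type attribute in schema.go):
+//
+//	validators.StringConditionalRequirementSingle(
+//	    path.Root("kafka").AtName("auth_type"),
+//	    "none",
+//	)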
+func StringConditionalRequirementSingle(dependentPath path.Path, requiredValue string) validator.String {
+ return StringConditionalRequirement(dependentPath, []string{requiredValue})
+}
+
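+// Int64ConditionalRequirement returns a validator which ensures that an int64 attribute
+// can only be set if another attribute at the specified path equals one of the specified values.
+//
+// Example usage (illustrative, mirroring the kafka compression_level attribute in schema.go):
+//
+//	"compression_level": schema.Int64Attribute{
+//	    Optional: true,
+//	    Validators: []validator.Int64{
+//	        validators.Int64ConditionalRequirement(
+//	            path.Root("kafka").AtName("compression"),
+//	            []string{"gzip"},
+//	        ),
+//	    },
+//	},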
+func Int64ConditionalRequirement(dependentPath path.Path, allowedValues []string) validator.Int64 {
+ return conditionalRequirement{
+ dependentPath: dependentPath,
+ allowedValues: allowedValues,
+ }
+}
+
+// Int64ConditionalRequirementSingle is a convenience function for when there's only one allowed value.
+func Int64ConditionalRequirementSingle(dependentPath path.Path, requiredValue string) validator.Int64 {
+ return Int64ConditionalRequirement(dependentPath, []string{requiredValue})
+}
diff --git a/internal/utils/validators/conditional_test.go b/internal/utils/validators/conditional_test.go
new file mode 100644
index 000000000..faca2f8e5
--- /dev/null
+++ b/internal/utils/validators/conditional_test.go
@@ -0,0 +1,281 @@
+package validators
+
+import (
+ "context"
+ "testing"
+
+ "github.com/hashicorp/terraform-plugin-framework/path"
+ "github.com/hashicorp/terraform-plugin-framework/resource/schema"
+ "github.com/hashicorp/terraform-plugin-framework/schema/validator"
+ "github.com/hashicorp/terraform-plugin-framework/tfsdk"
+ "github.com/hashicorp/terraform-plugin-framework/types"
+ "github.com/hashicorp/terraform-plugin-go/tftypes"
+)
+
+func TestStringConditionalRequirement(t *testing.T) {
+ t.Parallel()
+
+ type testCase struct {
+ name string
+ currentValue types.String
+ dependentValue types.String
+ expectedError bool
+ }
+
+ testCases := []testCase{
+ {
+ name: "valid - current null, dependent any value",
+ currentValue: types.StringNull(),
+ dependentValue: types.StringValue("user_pass"),
+ expectedError: false,
+ },
+ {
+ name: "valid - current unknown, dependent any value",
+ currentValue: types.StringUnknown(),
+ dependentValue: types.StringValue("user_pass"),
+ expectedError: false,
+ },
+ {
+ name: "valid - current set, dependent matches required value",
+ currentValue: types.StringValue("plaintext"),
+ dependentValue: types.StringValue("none"),
+ expectedError: false,
+ },
+ {
+ name: "invalid - current set, dependent doesn't match required value",
+ currentValue: types.StringValue("plaintext"),
+ dependentValue: types.StringValue("user_pass"),
+ expectedError: true,
+ },
+ {
+ name: "invalid - current set, dependent is null",
+ currentValue: types.StringValue("plaintext"),
+ dependentValue: types.StringNull(),
+ expectedError: true,
+ },
+ {
+ name: "invalid - current set, dependent is unknown",
+ currentValue: types.StringValue("plaintext"),
+ dependentValue: types.StringUnknown(),
+ expectedError: true,
+ },
+ }
+
+ for _, testCase := range testCases {
+ testCase := testCase
+ t.Run(testCase.name, func(t *testing.T) {
+ t.Parallel()
+
+ // Create a simple schema for testing
+ testSchema := schema.Schema{
+ Attributes: map[string]schema.Attribute{
+ "connection_type": schema.StringAttribute{
+ Optional: true,
+ },
+ "auth_type": schema.StringAttribute{
+ Optional: true,
+ },
+ },
+ }
+
+ // Create raw config values
+ currentTfValue, err := testCase.currentValue.ToTerraformValue(context.Background())
+ if err != nil {
+ t.Fatalf("Error converting current value: %v", err)
+ }
+ dependentTfValue, err := testCase.dependentValue.ToTerraformValue(context.Background())
+ if err != nil {
+ t.Fatalf("Error converting dependent value: %v", err)
+ }
+
+ rawConfigValues := map[string]tftypes.Value{
+ "connection_type": currentTfValue,
+ "auth_type": dependentTfValue,
+ }
+
+ rawConfig := tftypes.NewValue(
+ tftypes.Object{
+ AttributeTypes: map[string]tftypes.Type{
+ "connection_type": tftypes.String,
+ "auth_type": tftypes.String,
+ },
+ },
+ rawConfigValues,
+ )
+
+ config := tfsdk.Config{
+ Raw: rawConfig,
+ Schema: testSchema,
+ }
+
+ // Create validator
+ v := StringConditionalRequirement(
+ path.Root("auth_type"),
+ []string{"none"},
+ )
+
+ // Create validation request
+ request := validator.StringRequest{
+ Path: path.Root("connection_type"),
+ ConfigValue: testCase.currentValue,
+ Config: config,
+ }
+
+ // Run validation
+ response := &validator.StringResponse{}
+ v.ValidateString(context.Background(), request, response)
+
+ // Check result
+ if testCase.expectedError {
+ if !response.Diagnostics.HasError() {
+ t.Errorf("Expected validation error but got none")
+ }
+ } else {
+ if response.Diagnostics.HasError() {
+ t.Errorf("Expected no validation error but got: %v", response.Diagnostics.Errors())
+ }
+ }
+ })
+ }
+}
+
+func TestStringConditionalRequirement_Description(t *testing.T) {
+ v := StringConditionalRequirement(
+ path.Root("auth_type"),
+ []string{"none"},
+ )
+
+ description := v.Description(context.Background())
+ expected := "value can only be set when auth_type equals \"none\""
+
+ if description != expected {
+ t.Errorf("Expected description %q, got %q", expected, description)
+ }
+}
+
+func TestInt64ConditionalRequirement(t *testing.T) {
+ t.Parallel()
+
+ type testCase struct {
+ name string
+ currentValue types.Int64
+ dependentValue types.String
+ expectedError bool
+ }
+
+ testCases := []testCase{
+ {
+ name: "valid - current null, dependent any value",
+ currentValue: types.Int64Null(),
+ dependentValue: types.StringValue("none"),
+ expectedError: false,
+ },
+ {
+ name: "valid - current unknown, dependent any value",
+ currentValue: types.Int64Unknown(),
+ dependentValue: types.StringValue("none"),
+ expectedError: false,
+ },
+ {
+ name: "valid - current set, dependent matches required value",
+ currentValue: types.Int64Value(6),
+ dependentValue: types.StringValue("gzip"),
+ expectedError: false,
+ },
+ {
+ name: "invalid - current set, dependent doesn't match required value",
+ currentValue: types.Int64Value(6),
+ dependentValue: types.StringValue("none"),
+ expectedError: true,
+ },
+ {
+ name: "invalid - current set, dependent is null",
+ currentValue: types.Int64Value(6),
+ dependentValue: types.StringNull(),
+ expectedError: true,
+ },
+ {
+ name: "invalid - current set, dependent is unknown",
+ currentValue: types.Int64Value(6),
+ dependentValue: types.StringUnknown(),
+ expectedError: true,
+ },
+ }
+
+ for _, testCase := range testCases {
+ testCase := testCase
+ t.Run(testCase.name, func(t *testing.T) {
+ t.Parallel()
+
+ // Create a simple schema for testing
+ testSchema := schema.Schema{
+ Attributes: map[string]schema.Attribute{
+				"compression_level": schema.Int64Attribute{
+ Optional: true,
+ },
+ "compression": schema.StringAttribute{
+ Optional: true,
+ },
+ },
+ }
+
+ // Create raw config values
+ currentTfValue, err := testCase.currentValue.ToTerraformValue(context.Background())
+ if err != nil {
+ t.Fatalf("Error converting current value: %v", err)
+ }
+ dependentTfValue, err := testCase.dependentValue.ToTerraformValue(context.Background())
+ if err != nil {
+ t.Fatalf("Error converting dependent value: %v", err)
+ }
+
+ rawConfigValues := map[string]tftypes.Value{
+ "compression_level": currentTfValue,
+ "compression": dependentTfValue,
+ }
+
+ rawConfig := tftypes.NewValue(
+ tftypes.Object{
+ AttributeTypes: map[string]tftypes.Type{
+ "compression_level": tftypes.Number,
+ "compression": tftypes.String,
+ },
+ },
+ rawConfigValues,
+ )
+
+ config := tfsdk.Config{
+ Raw: rawConfig,
+ Schema: testSchema,
+ }
+
+ // Create validator
+ v := Int64ConditionalRequirement(
+ path.Root("compression"),
+ []string{"gzip"},
+ )
+
+ // Create validation request
+ request := validator.Int64Request{
+ Path: path.Root("compression_level"),
+ ConfigValue: testCase.currentValue,
+ Config: config,
+ }
+
+ // Run validation
+ response := &validator.Int64Response{}
+ v.ValidateInt64(context.Background(), request, response)
+
+ // Check result
+ if testCase.expectedError {
+ if !response.Diagnostics.HasError() {
+ t.Errorf("Expected validation error but got none")
+ }
+ } else {
+ if response.Diagnostics.HasError() {
+ t.Errorf("Expected no validation error but got: %v", response.Diagnostics.Errors())
+ }
+ }
+ })
+ }
+}
diff --git a/templates/resources/fleet_output.md.tmpl b/templates/resources/fleet_output.md.tmpl
new file mode 100644
index 000000000..7abe5e204
--- /dev/null
+++ b/templates/resources/fleet_output.md.tmpl
@@ -0,0 +1,53 @@
+---
+# generated by https://github.com/hashicorp/terraform-plugin-docs
+page_title: "{{.Name}} {{.Type}} - {{.RenderedProviderName}}"
+subcategory: "Fleet"
+description: |-
+{{ .Description | plainmarkdown | trimspace | prefixlines " " }}
+---
+
+# {{.Name}} ({{.Type}})
+
+{{ .Description | trimspace }}
+
+## Example Usage
+
+### Basic output
+
+{{ tffile "examples/resources/elasticstack_fleet_output/resource.tf" }}
+
+### Basic Kafka output
+
+{{ tffile "examples/resources/elasticstack_fleet_output/kafka_basic.tf" }}
+
+### Advanced Kafka output
+
+{{ tffile "examples/resources/elasticstack_fleet_output/kafka_advanced.tf" }}
+
+{{ .SchemaMarkdown | trimspace }}
+{{- if or .HasImport .HasImportIDConfig .HasImportIdentityConfig }}
+
+## Import
+
+Import is supported using the following syntax:
+{{- end }}
+{{- if .HasImportIdentityConfig }}
+
+In Terraform v1.12.0 and later, the [`import` block](https://developer.hashicorp.com/terraform/language/import) can be used with the `identity` attribute, for example:
+
+{{tffile .ImportIdentityConfigFile }}
+
+{{ .IdentitySchemaMarkdown | trimspace }}
+{{- end }}
+{{- if .HasImportIDConfig }}
+
+In Terraform v1.5.0 and later, the [`import` block](https://developer.hashicorp.com/terraform/language/import) can be used with the `id` attribute, for example:
+
+{{tffile .ImportIDConfigFile }}
+{{- end }}
+{{- if .HasImport }}
+
+The [`terraform import` command](https://developer.hashicorp.com/terraform/cli/commands/import) can be used, for example:
+
+{{codefile "shell" .ImportFile }}
+{{- end }}