Skip to content

Commit

Permalink
fix(kafka_topic): ignore default values
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Apr 3, 2024
1 parent 4bd589b commit b9a7003
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 143 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ nav_order: 1
## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Fix incorrect timeout values used in certain cases
- Fix `aiven_kafka_topic`: ignore config default values

## [4.15.0] - 2024-03-21

Expand Down
203 changes: 91 additions & 112 deletions internal/sdkprovider/service/kafkatopic/kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,167 +24,140 @@ var errLocalRetentionBytesDependency = fmt.Errorf("local_retention_bytes can't b

var aivenKafkaTopicConfigSchema = map[string]*schema.Schema{
"cleanup_policy": {
Type: schema.TypeString,
Description: "cleanup.policy value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "cleanup.policy value",
Optional: true,
},
"compression_type": {
Type: schema.TypeString,
Description: "compression.type value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "compression.type value",
Optional: true,
},
"delete_retention_ms": {
Type: schema.TypeString,
Description: "delete.retention.ms value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "delete.retention.ms value",
Optional: true,
},
"file_delete_delay_ms": {
Type: schema.TypeString,
Description: "file.delete.delay.ms value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "file.delete.delay.ms value",
Optional: true,
},
"flush_messages": {
Type: schema.TypeString,
Description: "flush.messages value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "flush.messages value",
Optional: true,
},
"flush_ms": {
Type: schema.TypeString,
Description: "flush.ms value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "flush.ms value",
Optional: true,
},
"index_interval_bytes": {
Type: schema.TypeString,
Description: "index.interval.bytes value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "index.interval.bytes value",
Optional: true,
},
"max_compaction_lag_ms": {
Type: schema.TypeString,
Description: "max.compaction.lag.ms value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "max.compaction.lag.ms value",
Optional: true,
},
"max_message_bytes": {
Type: schema.TypeString,
Description: "max.message.bytes value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "max.message.bytes value",
Optional: true,
},
"message_downconversion_enable": {
Type: schema.TypeBool,
Description: "message.downconversion.enable value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeBool,
Description: "message.downconversion.enable value",
Optional: true,
},
"message_format_version": {
Type: schema.TypeString,
Description: "message.format.version value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "message.format.version value",
Optional: true,
},
"message_timestamp_difference_max_ms": {
Type: schema.TypeString,
Description: "message.timestamp.difference.max.ms value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "message.timestamp.difference.max.ms value",
Optional: true,
},
"message_timestamp_type": {
Type: schema.TypeString,
Description: "message.timestamp.type value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "message.timestamp.type value",
Optional: true,
},
"min_cleanable_dirty_ratio": {
Type: schema.TypeFloat,
Description: "min.cleanable.dirty.ratio value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeFloat,
Description: "min.cleanable.dirty.ratio value",
Optional: true,
},
"min_compaction_lag_ms": {
Type: schema.TypeString,
Description: "min.compaction.lag.ms value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "min.compaction.lag.ms value",
Optional: true,
},
"min_insync_replicas": {
Type: schema.TypeString,
Description: "min.insync.replicas value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "min.insync.replicas value",
Optional: true,
},
"preallocate": {
Type: schema.TypeBool,
Description: "preallocate value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeBool,
Description: "preallocate value",
Optional: true,
},
"retention_bytes": {
Type: schema.TypeString,
Description: "retention.bytes value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "retention.bytes value",
Optional: true,
},
"retention_ms": {
Type: schema.TypeString,
Description: "retention.ms value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "retention.ms value",
Optional: true,
},
"segment_bytes": {
Type: schema.TypeString,
Description: "segment.bytes value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "segment.bytes value",
Optional: true,
},
"segment_index_bytes": {
Type: schema.TypeString,
Description: "segment.index.bytes value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "segment.index.bytes value",
Optional: true,
},
"segment_jitter_ms": {
Type: schema.TypeString,
Description: "segment.jitter.ms value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "segment.jitter.ms value",
Optional: true,
},
"segment_ms": {
Type: schema.TypeString,
Description: "segment.ms value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "segment.ms value",
Optional: true,
},
"unclean_leader_election_enable": {
Type: schema.TypeBool,
Description: "unclean.leader.election.enable value; This field is deprecated and no longer functional.",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Deprecated: "This field is deprecated and no longer functional.",
Type: schema.TypeBool,
Description: "unclean.leader.election.enable value; This field is deprecated and no longer functional.",
Optional: true,
Deprecated: "This field is deprecated and no longer functional.",
},
"remote_storage_enable": {
Type: schema.TypeBool,
Description: "remote.storage.enable value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeBool,
Description: "remote.storage.enable value",
Optional: true,
},
"local_retention_bytes": {
Type: schema.TypeString,
Description: "local.retention.bytes value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "local.retention.bytes value",
Optional: true,
},
"local_retention_ms": {
Type: schema.TypeString,
Description: "local.retention.ms value",
Optional: true,
DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc,
Type: schema.TypeString,
Description: "local.retention.ms value",
Optional: true,
},
}

Expand Down Expand Up @@ -436,7 +409,7 @@ func resourceKafkaTopicRead(ctx context.Context, d *schema.ResourceData, m inter
return diag.FromErr(err)
}

config, err := FlattenKafkaTopicConfig(topic)
config, err := ReadKafkaTopicConfig(topic)
if err != nil {
return diag.FromErr(err)
}
Expand Down Expand Up @@ -520,9 +493,10 @@ func resourceKafkaTopicDelete(ctx context.Context, d *schema.ResourceData, m int
return nil
}

func FlattenKafkaTopicConfig(t *aiven.KafkaTopic) ([]map[string]interface{}, error) {
func ReadKafkaTopicConfig(t *aiven.KafkaTopic) ([]map[string]interface{}, error) {
source := make(map[string]struct {
Value any `json:"value"`
Source string `json:"source"`
Value any `json:"value"`
})

data, err := json.Marshal(t.Config)
Expand All @@ -537,6 +511,11 @@ func FlattenKafkaTopicConfig(t *aiven.KafkaTopic) ([]map[string]interface{}, err

config := make(map[string]any)
for k, v := range source {
// Takes user specified values only, marked as "topic_config"
if v.Source != "topic_config" {
continue
}

if aivenKafkaTopicConfigSchema[k].Type == schema.TypeString {
config[k] = schemautil.ToOptionalString(v.Value)
} else {
Expand Down
70 changes: 39 additions & 31 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ resource "aiven_kafka_topic" "topic" {
})
}

func TestFlattenKafkaTopicConfig(t *testing.T) {
func TestReadKafkaTopicConfig(t *testing.T) {
cases := []struct {
name string
expect map[string]any
Expand Down Expand Up @@ -682,33 +682,33 @@ func TestFlattenKafkaTopicConfig(t *testing.T) {
"unclean_leader_election_enable": true,
},
config: aiven.KafkaTopicConfigResponse{
CleanupPolicy: &aiven.KafkaTopicConfigResponseString{Value: "foo"},
CompressionType: &aiven.KafkaTopicConfigResponseString{Value: "bar"},
DeleteRetentionMs: &aiven.KafkaTopicConfigResponseInt{Value: 0},
FileDeleteDelayMs: &aiven.KafkaTopicConfigResponseInt{Value: 1},
FlushMessages: &aiven.KafkaTopicConfigResponseInt{Value: 2},
FlushMs: &aiven.KafkaTopicConfigResponseInt{Value: 3},
IndexIntervalBytes: &aiven.KafkaTopicConfigResponseInt{Value: 4},
LocalRetentionBytes: &aiven.KafkaTopicConfigResponseInt{Value: 5},
LocalRetentionMs: &aiven.KafkaTopicConfigResponseInt{Value: 6},
MaxCompactionLagMs: &aiven.KafkaTopicConfigResponseInt{Value: 7},
MaxMessageBytes: &aiven.KafkaTopicConfigResponseInt{Value: 8},
MessageDownconversionEnable: &aiven.KafkaTopicConfigResponseBool{Value: false},
MessageFormatVersion: &aiven.KafkaTopicConfigResponseString{Value: ""},
MessageTimestampDifferenceMaxMs: &aiven.KafkaTopicConfigResponseInt{Value: 0},
MessageTimestampType: &aiven.KafkaTopicConfigResponseString{Value: ""},
MinCleanableDirtyRatio: &aiven.KafkaTopicConfigResponseFloat{Value: 0.2},
MinCompactionLagMs: &aiven.KafkaTopicConfigResponseInt{Value: 0},
MinInsyncReplicas: &aiven.KafkaTopicConfigResponseInt{Value: 0},
Preallocate: &aiven.KafkaTopicConfigResponseBool{Value: true},
RemoteStorageEnable: &aiven.KafkaTopicConfigResponseBool{Value: false},
RetentionBytes: &aiven.KafkaTopicConfigResponseInt{Value: 0},
RetentionMs: &aiven.KafkaTopicConfigResponseInt{Value: 0},
SegmentBytes: &aiven.KafkaTopicConfigResponseInt{Value: 0},
SegmentIndexBytes: &aiven.KafkaTopicConfigResponseInt{Value: 0},
SegmentJitterMs: &aiven.KafkaTopicConfigResponseInt{Value: 0},
SegmentMs: &aiven.KafkaTopicConfigResponseInt{Value: 0},
UncleanLeaderElectionEnable: &aiven.KafkaTopicConfigResponseBool{Value: true},
CleanupPolicy: &aiven.KafkaTopicConfigResponseString{Source: "topic_config", Value: "foo"},
CompressionType: &aiven.KafkaTopicConfigResponseString{Source: "topic_config", Value: "bar"},
DeleteRetentionMs: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 0},
FileDeleteDelayMs: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 1},
FlushMessages: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 2},
FlushMs: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 3},
IndexIntervalBytes: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 4},
LocalRetentionBytes: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 5},
LocalRetentionMs: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 6},
MaxCompactionLagMs: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 7},
MaxMessageBytes: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 8},
MessageDownconversionEnable: &aiven.KafkaTopicConfigResponseBool{Source: "topic_config", Value: false},
MessageFormatVersion: &aiven.KafkaTopicConfigResponseString{Source: "topic_config", Value: ""},
MessageTimestampDifferenceMaxMs: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 0},
MessageTimestampType: &aiven.KafkaTopicConfigResponseString{Source: "topic_config", Value: ""},
MinCleanableDirtyRatio: &aiven.KafkaTopicConfigResponseFloat{Source: "topic_config", Value: 0.2},
MinCompactionLagMs: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 0},
MinInsyncReplicas: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 0},
Preallocate: &aiven.KafkaTopicConfigResponseBool{Source: "topic_config", Value: true},
RemoteStorageEnable: &aiven.KafkaTopicConfigResponseBool{Source: "topic_config", Value: false},
RetentionBytes: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 0},
RetentionMs: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 0},
SegmentBytes: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 0},
SegmentIndexBytes: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 0},
SegmentJitterMs: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 0},
SegmentMs: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 0},
UncleanLeaderElectionEnable: &aiven.KafkaTopicConfigResponseBool{Source: "topic_config", Value: true},
},
},
{
Expand All @@ -718,15 +718,23 @@ func TestFlattenKafkaTopicConfig(t *testing.T) {
"retention_bytes": "2",
},
config: aiven.KafkaTopicConfigResponse{
LocalRetentionBytes: &aiven.KafkaTopicConfigResponseInt{Value: 1},
RetentionBytes: &aiven.KafkaTopicConfigResponseInt{Value: 2},
LocalRetentionBytes: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 1},
RetentionBytes: &aiven.KafkaTopicConfigResponseInt{Source: "topic_config", Value: 2},
},
},
{
name: "ignores default values",
expect: map[string]any{},
config: aiven.KafkaTopicConfigResponse{
LocalRetentionBytes: &aiven.KafkaTopicConfigResponseInt{Source: "default_config", Value: 1},
RetentionBytes: &aiven.KafkaTopicConfigResponseInt{Source: "default_config", Value: 2},
},
},
}

for _, opt := range cases {
t.Run(opt.name, func(t *testing.T) {
result, err := kafkatopic.FlattenKafkaTopicConfig(&aiven.KafkaTopic{Config: opt.config})
result, err := kafkatopic.ReadKafkaTopicConfig(&aiven.KafkaTopic{Config: opt.config})
assert.NoError(t, err)
assert.Empty(t, cmp.Diff([]map[string]any{opt.expect}, result))
})
Expand Down

0 comments on commit b9a7003

Please sign in to comment.