Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AWS MSK serverless cluster type #25684

Merged
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6f86efb
Add MSKServerless Configration
moremagic Jun 27, 2022
df05a43
Fixed to use constant value for ClusterType
moremagic Jul 6, 2022
6cbcdef
Updated documentation
moremagic Jul 6, 2022
b657db4
Add changelog
moremagic Jul 6, 2022
a6a9f50
fix timeout value
moremagic Jul 11, 2022
98743f1
Revert "fix timeout value"
ewbankkit Aug 10, 2022
12fa0c9
Revert "Add changelog"
ewbankkit Aug 10, 2022
db77c2e
Revert "Updated documentation"
ewbankkit Aug 10, 2022
72a7b2f
Revert "Fixed to use constant value for ClusterType"
ewbankkit Aug 10, 2022
43b296a
Revert "Add MSKServerless Configration"
ewbankkit Aug 10, 2022
ac5f072
Merge branch 'main' into HEAD
ewbankkit Aug 10, 2022
5dfbc13
Add CHANGELOG entry.
ewbankkit Aug 10, 2022
a53ff99
MSK: No need to generate 'ListTags'.
ewbankkit Aug 11, 2022
463143a
r/aws_msk_cluster: Use 'UpdateTagsWithContext'.
ewbankkit Aug 11, 2022
859784c
Merge branch 'main' into HEAD
ewbankkit Aug 17, 2022
9556447
Add 'FindClusterV2ByARN' and 'FindServerlessClusterByARN'.
ewbankkit Aug 17, 2022
83cea95
'statusClusterState' uses 'findClusterV2ByARN'.
ewbankkit Aug 17, 2022
ffe4afe
r/aws_msk_serverless_cluster: Skeleton.
ewbankkit Aug 17, 2022
0bd38c8
r/aws_msk_serverless_cluster: Add documentation.
ewbankkit Aug 17, 2022
6bae639
r/aws_msk_serverless_cluster: Add flex.
ewbankkit Aug 17, 2022
a6e3923
Add documentation about cluster types (provisioned vs. serverless).
ewbankkit Aug 17, 2022
28d1081
r/aws_msk_serverless_cluster: Add acceptance tests.
ewbankkit Aug 17, 2022
f32eecb
Update CHANGELOG entry.
ewbankkit Aug 17, 2022
3c8762b
Fix typo.
ewbankkit Aug 17, 2022
998a71e
MSK: Sweep both provisioned and serverless clusters.
ewbankkit Aug 17, 2022
610be0c
Fix terrafmt error.
ewbankkit Aug 17, 2022
0cdcb0f
Fix semgrep 'ci.caps2-in-func-name'.
ewbankkit Aug 17, 2022
735f553
Fix golangi-lint 'unparam'.
ewbankkit Aug 17, 2022
a1567f7
Add 'TestAccKafkaCluster_disappears'.
ewbankkit Aug 17, 2022
6691496
Fix semgrep 'ci.caps4-in-func-name'.
ewbankkit Aug 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/25684.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:new-resource
aws_msk_serverless_cluster
```
1 change: 1 addition & 0 deletions internal/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -1620,6 +1620,7 @@ func New(_ context.Context) (*schema.Provider, error) {
"aws_msk_cluster": kafka.ResourceCluster(),
"aws_msk_configuration": kafka.ResourceConfiguration(),
"aws_msk_scram_secret_association": kafka.ResourceScramSecretAssociation(),
"aws_msk_serverless_cluster": kafka.ResourceServerlessCluster(),

"aws_mskconnect_connector": kafkaconnect.ResourceConnector(),
"aws_mskconnect_custom_plugin": kafkaconnect.ResourceCustomPlugin(),
Expand Down
2 changes: 1 addition & 1 deletion internal/service/kafka/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta int
if d.HasChange("tags_all") {
o, n := d.GetChange("tags_all")

if err := UpdateTags(conn, d.Id(), o, n); err != nil {
if err := UpdateTagsWithContext(ctx, conn, d.Id(), o, n); err != nil {
return diag.Errorf("updating MSK Cluster (%s) tags: %s", d.Id(), err)
}
}
Expand Down
119 changes: 71 additions & 48 deletions internal/service/kafka/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,77 @@ func TestAccKafkaCluster_basic(t *testing.T) {
})
}

func TestAccKafkaCluster_disappears(t *testing.T) {
var cluster kafka.ClusterInfo
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_msk_cluster.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckClusterDestroy,
Steps: []resource.TestStep{
{
Config: testAccClusterConfig_basic(rName),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckClusterExists(resourceName, &cluster),
acctest.CheckResourceDisappears(acctest.Provider, tfkafka.ResourceCluster(), resourceName),
),
ExpectNonEmptyPlan: true,
},
},
})
}

func TestAccKafkaCluster_tags(t *testing.T) {
var cluster kafka.ClusterInfo
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_msk_cluster.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckClusterDestroy,
Steps: []resource.TestStep{
{
Config: testAccClusterConfig_tags1(rName, "key1", "value1"),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckClusterExists(resourceName, &cluster),
resource.TestCheckResourceAttr(resourceName, "tags.%", "1"),
resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"current_version",
},
},
{
Config: testAccClusterConfig_tags2(rName, "key1", "value1updated", "key2", "value2"),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckClusterExists(resourceName, &cluster),
resource.TestCheckResourceAttr(resourceName, "tags.%", "2"),
resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1updated"),
resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"),
),
},
{
Config: testAccClusterConfig_tags1(rName, "key2", "value2"),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckClusterExists(resourceName, &cluster),
resource.TestCheckResourceAttr(resourceName, "tags.%", "1"),
resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"),
),
},
},
})
}

func TestAccKafkaCluster_BrokerNodeGroupInfo_ebsVolumeSize(t *testing.T) {
var cluster1, cluster2 kafka.ClusterInfo
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
Expand Down Expand Up @@ -1090,54 +1161,6 @@ func TestAccKafkaCluster_kafkaVersionUpgradeWithInfo(t *testing.T) {
})
}

func TestAccKafkaCluster_tags(t *testing.T) {
var cluster kafka.ClusterInfo
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_msk_cluster.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t); testAccPreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckClusterDestroy,
Steps: []resource.TestStep{
{
Config: testAccClusterConfig_tags1(rName, "key1", "value1"),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckClusterExists(resourceName, &cluster),
resource.TestCheckResourceAttr(resourceName, "tags.%", "1"),
resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"current_version",
},
},
{
Config: testAccClusterConfig_tags2(rName, "key1", "value1updated", "key2", "value2"),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckClusterExists(resourceName, &cluster),
resource.TestCheckResourceAttr(resourceName, "tags.%", "2"),
resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1updated"),
resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"),
),
},
{
Config: testAccClusterConfig_tags1(rName, "key2", "value2"),
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckClusterExists(resourceName, &cluster),
resource.TestCheckResourceAttr(resourceName, "tags.%", "1"),
resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"),
),
},
},
})
}

func testAccCheckResourceAttrIsSortedCSV(resourceName, attributeName string) resource.TestCheckFunc {
return func(s *terraform.State) error {
is, err := acctest.PrimaryInstanceState(s, resourceName)
Expand Down
39 changes: 39 additions & 0 deletions internal/service/kafka/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,31 @@ func FindClusterByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafk
return output.ClusterInfo, nil
}

func findClusterV2ByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafka.Cluster, error) {
input := &kafka.DescribeClusterV2Input{
ClusterArn: aws.String(arn),
}

output, err := conn.DescribeClusterV2WithContext(ctx, input)

if tfawserr.ErrCodeEquals(err, kafka.ErrCodeNotFoundException) {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil || output.ClusterInfo == nil {
return nil, tfresource.NewEmptyResultError(input)
}

return output.ClusterInfo, nil
}

func FindClusterOperationByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafka.ClusterOperationInfo, error) {
input := &kafka.DescribeClusterOperationInput{
ClusterOperationArn: aws.String(arn),
Expand Down Expand Up @@ -102,3 +127,17 @@ func FindScramSecrets(conn *kafka.Kafka, clusterArn string) ([]*string, error) {

return scramSecrets, err
}

func FindServerlessClusterByARN(ctx context.Context, conn *kafka.Kafka, arn string) (*kafka.Cluster, error) {
output, err := findClusterV2ByARN(ctx, conn, arn)

if err != nil {
return nil, err
}

if output.Serverless == nil {
return nil, tfresource.NewEmptyResultError(arn)
}

return output, nil
}
2 changes: 1 addition & 1 deletion internal/service/kafka/generate.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:generate go run ../../generate/tags/main.go -ListTags -ServiceTagsMap -UpdateTags
//go:generate go run ../../generate/tags/main.go -ServiceTagsMap -UpdateTags
// ONLY generate directives and package declaration! Do not add anything else to this file.

package kafka