Skip to content

Commit

Permalink
Add ingestion settings to google_pubsub_topic (#9985) (#7111)
Browse files Browse the repository at this point in the history
* Add ingestion settings to `google_pubsub_topic`

* Add ingestion settings to `google_pubsub_topic`

* Add create/update tests for `google_pubsub_topic` ingestion settings

* Update topic ingestion settings test

[upstream:046681494d8e99d6517c273e0a806513fc8afbe6]

Signed-off-by: Modular Magician <magic-modules@google.com>
  • Loading branch information
modular-magician committed Mar 15, 2024
1 parent d81379e commit bea913c
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/9985.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
pubsub: added `ingestion_data_source_settings` field to `google_pubsub_topic` resource
```
189 changes: 189 additions & 0 deletions google-beta/services/pubsub/resource_pubsub_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,53 @@ func ResourcePubsubTopic() *schema.Resource {
DiffSuppressFunc: tpgresource.CompareSelfLinkOrResourceName,
Description: `Name of the topic.`,
},
"ingestion_data_source_settings": {
Type: schema.TypeList,
Optional: true,
Description: `Settings for ingestion from a data source into this topic.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"aws_kinesis": {
Type: schema.TypeList,
Optional: true,
Description: `Settings for ingestion from Amazon Kinesis Data Streams.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"aws_role_arn": {
Type: schema.TypeString,
Required: true,
Description: `AWS role ARN to be used for Federated Identity authentication with
Kinesis. Check the Pub/Sub docs for how to set up this role and the
required permissions that need to be attached to it.`,
},
"consumer_arn": {
Type: schema.TypeString,
Required: true,
Description: `The Kinesis consumer ARN to used for ingestion in
Enhanced Fan-Out mode. The consumer must be already
created and ready to be used.`,
},
"gcp_service_account": {
Type: schema.TypeString,
Required: true,
Description: `The GCP service account to be used for Federated Identity authentication
with Kinesis (via a 'AssumeRoleWithWebIdentity' call for the provided
role). The 'awsRoleArn' must be set up with 'accounts.google.com:sub'
equals to this service account number.`,
},
"stream_arn": {
Type: schema.TypeString,
Required: true,
Description: `The Kinesis stream ARN to ingest data from.`,
},
},
},
},
},
},
},
"kms_key_name": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -207,6 +254,12 @@ func resourcePubsubTopicCreate(d *schema.ResourceData, meta interface{}) error {
} else if v, ok := d.GetOkExists("message_retention_duration"); !tpgresource.IsEmptyValue(reflect.ValueOf(messageRetentionDurationProp)) && (ok || !reflect.DeepEqual(v, messageRetentionDurationProp)) {
obj["messageRetentionDuration"] = messageRetentionDurationProp
}
ingestionDataSourceSettingsProp, err := expandPubsubTopicIngestionDataSourceSettings(d.Get("ingestion_data_source_settings"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("ingestion_data_source_settings"); !tpgresource.IsEmptyValue(reflect.ValueOf(ingestionDataSourceSettingsProp)) && (ok || !reflect.DeepEqual(v, ingestionDataSourceSettingsProp)) {
obj["ingestionDataSourceSettings"] = ingestionDataSourceSettingsProp
}
labelsProp, err := expandPubsubTopicEffectiveLabels(d.Get("effective_labels"), d, config)
if err != nil {
return err
Expand Down Expand Up @@ -371,6 +424,9 @@ func resourcePubsubTopicRead(d *schema.ResourceData, meta interface{}) error {
if err := d.Set("message_retention_duration", flattenPubsubTopicMessageRetentionDuration(res["messageRetentionDuration"], d, config)); err != nil {
return fmt.Errorf("Error reading Topic: %s", err)
}
if err := d.Set("ingestion_data_source_settings", flattenPubsubTopicIngestionDataSourceSettings(res["ingestionDataSourceSettings"], d, config)); err != nil {
return fmt.Errorf("Error reading Topic: %s", err)
}
if err := d.Set("terraform_labels", flattenPubsubTopicTerraformLabels(res["labels"], d, config)); err != nil {
return fmt.Errorf("Error reading Topic: %s", err)
}
Expand Down Expand Up @@ -421,6 +477,12 @@ func resourcePubsubTopicUpdate(d *schema.ResourceData, meta interface{}) error {
} else if v, ok := d.GetOkExists("message_retention_duration"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, messageRetentionDurationProp)) {
obj["messageRetentionDuration"] = messageRetentionDurationProp
}
ingestionDataSourceSettingsProp, err := expandPubsubTopicIngestionDataSourceSettings(d.Get("ingestion_data_source_settings"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("ingestion_data_source_settings"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, ingestionDataSourceSettingsProp)) {
obj["ingestionDataSourceSettings"] = ingestionDataSourceSettingsProp
}
labelsProp, err := expandPubsubTopicEffectiveLabels(d.Get("effective_labels"), d, config)
if err != nil {
return err
Expand Down Expand Up @@ -457,6 +519,10 @@ func resourcePubsubTopicUpdate(d *schema.ResourceData, meta interface{}) error {
updateMask = append(updateMask, "messageRetentionDuration")
}

if d.HasChange("ingestion_data_source_settings") {
updateMask = append(updateMask, "ingestionDataSourceSettings")
}

if d.HasChange("effective_labels") {
updateMask = append(updateMask, "labels")
}
Expand Down Expand Up @@ -632,6 +698,54 @@ func flattenPubsubTopicMessageRetentionDuration(v interface{}, d *schema.Resourc
return v
}

func flattenPubsubTopicIngestionDataSourceSettings(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["aws_kinesis"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsKinesis(original["awsKinesis"], d, config)
return []interface{}{transformed}
}
func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesis(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["stream_arn"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisStreamArn(original["streamArn"], d, config)
transformed["consumer_arn"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisConsumerArn(original["consumerArn"], d, config)
transformed["aws_role_arn"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisAwsRoleArn(original["awsRoleArn"], d, config)
transformed["gcp_service_account"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisGcpServiceAccount(original["gcpServiceAccount"], d, config)
return []interface{}{transformed}
}
func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisStreamArn(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisConsumerArn(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisAwsRoleArn(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesisGcpServiceAccount(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicTerraformLabels(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return v
Expand Down Expand Up @@ -720,6 +834,81 @@ func expandPubsubTopicMessageRetentionDuration(v interface{}, d tpgresource.Terr
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettings(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedAwsKinesis, err := expandPubsubTopicIngestionDataSourceSettingsAwsKinesis(original["aws_kinesis"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedAwsKinesis); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["awsKinesis"] = transformedAwsKinesis
}

return transformed, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsKinesis(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedStreamArn, err := expandPubsubTopicIngestionDataSourceSettingsAwsKinesisStreamArn(original["stream_arn"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedStreamArn); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["streamArn"] = transformedStreamArn
}

transformedConsumerArn, err := expandPubsubTopicIngestionDataSourceSettingsAwsKinesisConsumerArn(original["consumer_arn"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedConsumerArn); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["consumerArn"] = transformedConsumerArn
}

transformedAwsRoleArn, err := expandPubsubTopicIngestionDataSourceSettingsAwsKinesisAwsRoleArn(original["aws_role_arn"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedAwsRoleArn); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["awsRoleArn"] = transformedAwsRoleArn
}

transformedGcpServiceAccount, err := expandPubsubTopicIngestionDataSourceSettingsAwsKinesisGcpServiceAccount(original["gcp_service_account"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedGcpServiceAccount); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["gcpServiceAccount"] = transformedGcpServiceAccount
}

return transformed, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsKinesisStreamArn(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsKinesisConsumerArn(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsKinesisAwsRoleArn(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsKinesisGcpServiceAccount(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicEffectiveLabels(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (map[string]string, error) {
if v == nil {
return map[string]string{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,49 @@ resource "google_pubsub_topic" "example" {
`, context)
}

func TestAccPubsubTopic_pubsubTopicIngestionKinesisExample(t *testing.T) {
t.Parallel()

context := map[string]interface{}{
"random_suffix": acctest.RandString(t, 10),
}

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckPubsubTopicDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubTopic_pubsubTopicIngestionKinesisExample(context),
},
{
ResourceName: "google_pubsub_topic.example",
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"labels", "terraform_labels"},
},
},
})
}

func testAccPubsubTopic_pubsubTopicIngestionKinesisExample(context map[string]interface{}) string {
return acctest.Nprintf(`
resource "google_pubsub_topic" "example" {
name = "tf-test-example-topic%{random_suffix}"
# Outside of automated terraform-provider-google CI tests, these values must be of actual AWS resources for the test to pass.
ingestion_data_source_settings {
aws_kinesis {
stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name"
consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111"
aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name"
gcp_service_account = "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"
}
}
}
`, context)
}

func testAccCheckPubsubTopicDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
for name, rs := range s.RootModule().Resources {
Expand Down
68 changes: 68 additions & 0 deletions google-beta/services/pubsub/resource_pubsub_topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,38 @@ func TestAccPubsubTopic_migration(t *testing.T) {
})
}

func TestAccPubsubTopic_kinesisIngestionUpdate(t *testing.T) {
t.Parallel()

topic := fmt.Sprintf("tf-test-topic-%s", acctest.RandString(t, 10))

acctest.VcrTest(t, resource.TestCase{
PreCheck: func() { acctest.AccTestPreCheck(t) },
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
CheckDestroy: testAccCheckPubsubTopicDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccPubsubTopic_updateWithKinesisIngestionSettings(topic),
},
{
ResourceName: "google_pubsub_topic.foo",
ImportStateId: topic,
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccPubsubTopic_updateWithUpdatedKinesisIngestionSettings(topic),
},
{
ResourceName: "google_pubsub_topic.foo",
ImportStateId: topic,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func testAccPubsubTopic_update(topic, key, value string) string {
return fmt.Sprintf(`
resource "google_pubsub_topic" "foo" {
Expand Down Expand Up @@ -214,3 +246,39 @@ resource "google_pubsub_topic" "bar" {
}
`, schema, topic)
}

func testAccPubsubTopic_updateWithKinesisIngestionSettings(topic string) string {
return fmt.Sprintf(`
resource "google_pubsub_topic" "foo" {
name = "%s"
# Outside of automated terraform-provider-google CI tests, these values must be of actual AWS resources for the test to pass.
ingestion_data_source_settings {
aws_kinesis {
stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name"
consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111"
aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name"
gcp_service_account = "fake-service-account@fake-gcp-project.iam.gserviceaccount.com"
}
}
}
`, topic)
}

func testAccPubsubTopic_updateWithUpdatedKinesisIngestionSettings(topic string) string {
return fmt.Sprintf(`
resource "google_pubsub_topic" "foo" {
name = "%s"
# Outside of automated terraform-provider-google CI tests, these values must be of actual AWS resources for the test to pass.
ingestion_data_source_settings {
aws_kinesis {
stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/updated-fake-stream-name"
consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/updated-fake-stream-name/consumer/consumer-1:1111111111"
aws_role_arn = "arn:aws:iam::111111111111:role/updated-fake-role-name"
gcp_service_account = "updated-fake-service-account@fake-gcp-project.iam.gserviceaccount.com"
}
}
}
`, topic)
}

0 comments on commit bea913c

Please sign in to comment.