APIF-1775: Add import for kafka topics (#97)
linouk23 committed Nov 3, 2021
1 parent df50bc3 commit ba366e1
Showing 7 changed files with 166 additions and 64 deletions.
13 changes: 13 additions & 0 deletions docs/resources/confluentcloud_kafka_topic.md
@@ -72,3 +72,16 @@ The following arguments are supported:
In addition to the preceding arguments, the following attributes are exported:

- `id` - (String) The ID of the Kafka topic, in the format `<Kafka cluster ID>/<Kafka topic name>`, for example, `lkc-abc123/orders-1`.

## Import

-> **Note:** The `KAFKA_API_KEY` (`credentials.key`), `KAFKA_API_SECRET` (`credentials.secret`), and `KAFKA_HTTP_ENDPOINT` (`http_endpoint`) environment variables must be set before importing a Kafka topic.

Import Kafka topics by using the Kafka cluster ID and Kafka topic name in the format `<Kafka cluster ID>/<Kafka topic name>`, for example:

```shell
$ export KAFKA_API_KEY="<kafka_api_key>"
$ export KAFKA_API_SECRET="<kafka_api_secret>"
$ export KAFKA_HTTP_ENDPOINT="<kafka_http_endpoint>"
$ terraform import confluentcloud_kafka_topic.my_topic lkc-abc123/orders-123
```
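
Note that `terraform import` only populates Terraform state: the configuration must already contain a `confluentcloud_kafka_topic` resource block whose address matches the one passed to `terraform import`. A minimal sketch (every value below is a placeholder; match them to the actual topic):

```hcl
resource "confluentcloud_kafka_topic" "my_topic" {
  kafka_cluster    = "lkc-abc123"
  topic_name       = "orders-123"
  partitions_count = 4
  http_endpoint    = "<kafka_http_endpoint>"

  # Only configs set explicitly on the topic (not broker defaults) are imported.
  config = {
    "retention.ms" = "6789"
  }

  credentials {
    key    = "<kafka_api_key>"
    secret = "<kafka_api_secret>"
  }
}
```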
8 changes: 0 additions & 8 deletions internal/provider/provider_test.go
@@ -59,11 +59,3 @@ func testAccPreCheck(t *testing.T) {
t.Fatal("Both CONFLUENT_CLOUD_API_KEY and CONFLUENT_CLOUD_API_SECRET must be set for acceptance tests (having them set to fake values is fine)")
}
}

func getEnv(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return value
}
9 changes: 6 additions & 3 deletions internal/provider/resource_kafka_acl.go
@@ -166,7 +166,8 @@ func kafkaAclResource() *schema.Resource {

func kafkaAclCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
c := meta.(*Client)
updateKafkaRestClient(c, d)
httpEndpoint := extractHttpEndpoint(d)
updateKafkaRestClient(c, httpEndpoint)

clusterId := extractClusterId(d)
clusterApiKey, clusterApiSecret, err := extractClusterApiKeyAndApiSecret(d)
@@ -208,7 +209,8 @@ func executeKafkaAclCreate(ctx context.Context, c *Client, clusterId string, req

func kafkaAclDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
c := meta.(*Client)
updateKafkaRestClient(c, d)
httpEndpoint := extractHttpEndpoint(d)
updateKafkaRestClient(c, httpEndpoint)

clusterId := extractClusterId(d)
clusterApiKey, clusterApiSecret, err := extractClusterApiKeyAndApiSecret(d)
@@ -246,7 +248,8 @@ func kafkaAclRead(ctx context.Context, d *schema.ResourceData, meta interface{})
log.Printf("[INFO] Kafka ACL read for %s", d.Id())

c := meta.(*Client)
updateKafkaRestClient(c, d)
httpEndpoint := extractHttpEndpoint(d)
updateKafkaRestClient(c, httpEndpoint)

clusterId := extractClusterId(d)
clusterApiKey, clusterApiSecret, err := extractClusterApiKeyAndApiSecret(d)
2 changes: 1 addition & 1 deletion internal/provider/resource_kafka_acl_test.go
@@ -184,7 +184,7 @@ func testAccCheckAclDestroy(s *terraform.State) error {
continue
}
deletedAclId := rs.Primary.ID
aclList, _, err := c.kafkaRestClient.ACLV3Api.GetKafkaV3Acls(c.kafkaRestApiContext(context.Background(), clusterApiKey, clusterApiSecret), clusterId, nil)
aclList, _, err := c.kafkaRestClient.ACLV3Api.GetKafkaV3Acls(c.kafkaRestApiContext(context.Background(), kafkaApiKey, kafkaApiSecret), clusterId, nil)

if len(aclList.Data) == 0 {
return nil
140 changes: 94 additions & 46 deletions internal/provider/resource_kafka_topic.go
@@ -25,6 +25,7 @@ import (
"log"
"net/http"
"regexp"
"strings"
)

const (
@@ -101,6 +102,9 @@ func kafkaTopicResource() *schema.Resource {
CreateContext: kafkaTopicCreate,
ReadContext: kafkaTopicRead,
DeleteContext: kafkaTopicDelete,
Importer: &schema.ResourceImporter{
StateContext: kafkaTopicImport,
},
Schema: map[string]*schema.Schema{
paramClusterId: clusterIdSchema(),
paramTopicName: {
@@ -139,7 +143,8 @@ func kafkaTopicResource() *schema.Resource {

func kafkaTopicCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
c := meta.(*Client)
updateKafkaRestClient(c, d)
httpEndpoint := extractHttpEndpoint(d)
updateKafkaRestClient(c, httpEndpoint)

clusterId := extractClusterId(d)
clusterApiKey, clusterApiSecret, err := extractClusterApiKeyAndApiSecret(d)
@@ -182,7 +187,8 @@ func kafkaTopicDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
log.Printf("[INFO] Kafka topic delete for %s", d.Id())

c := meta.(*Client)
updateKafkaRestClient(c, d)
httpEndpoint := extractHttpEndpoint(d)
updateKafkaRestClient(c, httpEndpoint)

clusterId := extractClusterId(d)
topicName := extractTopicName(d)
@@ -203,54 +209,15 @@ func kafkaTopicDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
func kafkaTopicRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
log.Printf("[INFO] Kafka topic read for %s", d.Id())

c := meta.(*Client)
updateKafkaRestClient(c, d)

clusterId := extractClusterId(d)
topicName := extractTopicName(d)
clusterApiKey, clusterApiSecret, err := extractClusterApiKeyAndApiSecret(d)
if err != nil {
return diag.FromErr(err)
}
httpEndpoint := extractHttpEndpoint(d)

ctx = c.kafkaRestApiContext(ctx, clusterApiKey, clusterApiSecret)

kafkaTopic, resp, err := c.kafkaRestClient.TopicV3Api.GetKafkaV3Topic(ctx, clusterId, topicName)
if resp != nil && resp.StatusCode == http.StatusNotFound {
// https://learn.hashicorp.com/tutorials/terraform/provider-setup?in=terraform/providers
// If the resource isn't available, set the ID to an empty string so Terraform "destroys" the resource in state.
d.SetId("")
return nil
}
if err != nil {
log.Printf("[ERROR] Kafka topic get failed for id %s, %v, %s", d.Id(), resp, err)
}
if err == nil {
err = d.Set(paramClusterId, kafkaTopic.ClusterId)
}
if err == nil {
err = d.Set(paramTopicName, kafkaTopic.TopicName)
}
if err == nil {
err = d.Set(paramPartitionsCount, kafkaTopic.PartitionsCount)
}
if err == nil {
topicConfigList, resp, err := c.kafkaRestClient.ConfigsV3Api.ListKafkaV3TopicConfigs(ctx, clusterId, topicName)
if err != nil {
log.Printf("[ERROR] Kafka topic config get failed for id %s, %v, %s", d.Id(), resp, err)
return diag.FromErr(err)
}

config := make(map[string]string)
for _, remoteConfig := range topicConfigList.Data {
// Extract configs that were set via terraform vs set by default
if remoteConfig.Source == kafkarestv3.CONFIGSOURCE_DYNAMIC_TOPIC_CONFIG && remoteConfig.Value != nil {
config[remoteConfig.Name] = *remoteConfig.Value
}
}
err = d.Set(paramConfigs, config)
return diag.FromErr(err)
}
_, err = readAndSetTopicResourceConfigurationArguments(ctx, d, meta, clusterId, topicName, clusterApiKey, clusterApiSecret, httpEndpoint)

return diag.FromErr(err)
}
@@ -259,12 +226,12 @@ func createKafkaTopicId(clusterId, topicName string) string {
return fmt.Sprintf("%s/%s", clusterId, topicName)
}

func updateKafkaRestClient(c *Client, d *schema.ResourceData) {
if c.kafkaRestClient.GetConfig().BasePath == extractHttpEndpoint(d) {
func updateKafkaRestClient(c *Client, httpEndpoint string) {
if c.kafkaRestClient.GetConfig().BasePath == httpEndpoint {
return
}
kafkaRestConfig := kafkarestv3.NewConfiguration()
kafkaRestConfig.BasePath = extractHttpEndpoint(d)
kafkaRestConfig.BasePath = httpEndpoint
c.kafkaRestClient = kafkarestv3.NewAPIClient(kafkaRestConfig)
}

@@ -306,3 +273,84 @@ func clusterIdSchema() *schema.Schema {
ValidateFunc: validation.StringMatch(regexp.MustCompile("^lkc-"), "the Kafka cluster ID must be of the form 'lkc-'"),
}
}

func kafkaTopicImport(ctx context.Context, d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
log.Printf("[INFO] Kafka topic import for %s", d.Id())

kafkaImportEnvVars, err := checkEnvironmentVariablesForKafkaImportAreSet()
if err != nil {
return nil, err
}

clusterIDAndTopicName := d.Id()
parts := strings.Split(clusterIDAndTopicName, "/")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid format for kafka topic import: expected '<lkc ID>/<topic name>'")
}

clusterId := parts[0]
topicName := parts[1]

return readAndSetTopicResourceConfigurationArguments(ctx, d, meta, clusterId, topicName, kafkaImportEnvVars.kafkaApiKey, kafkaImportEnvVars.kafkaApiSecret, kafkaImportEnvVars.kafkaHttpEndpoint)
}

func readAndSetTopicResourceConfigurationArguments(ctx context.Context, d *schema.ResourceData, meta interface{}, clusterId, topicName, kafkaApiKey, kafkaApiSecret, httpEndpoint string) ([]*schema.ResourceData, error) {
c := meta.(*Client)
updateKafkaRestClient(c, httpEndpoint)

ctx = c.kafkaRestApiContext(ctx, kafkaApiKey, kafkaApiSecret)

kafkaTopic, resp, err := c.kafkaRestClient.TopicV3Api.GetKafkaV3Topic(ctx, clusterId, topicName)
if resp != nil && resp.StatusCode == http.StatusNotFound {
// https://learn.hashicorp.com/tutorials/terraform/provider-setup?in=terraform/providers
// If the resource isn't available, set the ID to an empty string so Terraform "destroys" the resource in state.
d.SetId("")
return nil, nil
}
if err != nil {
log.Printf("[ERROR] Kafka topic get failed for id %s, %v, %s", topicName, resp, err)
}
if err == nil {
err = d.Set(paramClusterId, kafkaTopic.ClusterId)
}
if err == nil {
err = d.Set(paramTopicName, kafkaTopic.TopicName)
}
if err == nil {
err = d.Set(paramPartitionsCount, kafkaTopic.PartitionsCount)
}
if err == nil {
topicConfigList, resp, err := c.kafkaRestClient.ConfigsV3Api.ListKafkaV3TopicConfigs(ctx, clusterId, topicName)
if err != nil {
log.Printf("[ERROR] Kafka topic config get failed for id %s, %v, %s", d.Id(), resp, err)
return nil, err
}

config := make(map[string]string)
for _, remoteConfig := range topicConfigList.Data {
// Extract configs that were set via terraform vs set by default
if remoteConfig.Source == kafkarestv3.CONFIGSOURCE_DYNAMIC_TOPIC_CONFIG && remoteConfig.Value != nil {
config[remoteConfig.Name] = *remoteConfig.Value
}
}
err = d.Set(paramConfigs, config)
if err != nil {
return nil, err
}
}

if err == nil {
err = setKafkaCredentials(kafkaApiKey, kafkaApiSecret, d)
}
if err == nil {
err = d.Set(paramHttpEndpoint, httpEndpoint)
}
return []*schema.ResourceData{d}, err
}

func setKafkaCredentials(kafkaApiKey, kafkaApiSecret string, d *schema.ResourceData) error {
return d.Set(paramCredentials, []interface{}{map[string]interface{}{
paramKey: kafkaApiKey,
paramSecret: kafkaApiSecret,
}})
}
28 changes: 22 additions & 6 deletions internal/provider/resource_kafka_topic_test.go
@@ -24,6 +24,7 @@ import (
"github.com/walkerus/go-wiremock"
"io/ioutil"
"net/http"
"os"
"strconv"
"testing"

@@ -43,8 +44,9 @@
secondConfigValue = "6789"
topicName = "test_topic_name"
topicResourceLabel = "test_topic_resource_label"
clusterApiKey = "foo"
clusterApiSecret = "bar"
kafkaApiKey = "test_key"
kafkaApiSecret = "test_secret"
numberOfResourceAttributes = "7"
)

var fullTopicResourceLabel = fmt.Sprintf("confluentcloud_kafka_topic.%s", topicResourceLabel)
Expand Down Expand Up @@ -146,6 +148,11 @@ func TestAccTopic(t *testing.T) {
)
_ = wiremockClient.StubFor(deleteTopicStub)

// Set fake values for secrets since those are required for importing
_ = os.Setenv("KAFKA_API_KEY", kafkaApiKey)
_ = os.Setenv("KAFKA_API_SECRET", kafkaApiSecret)
_ = os.Setenv("KAFKA_HTTP_ENDPOINT", mockServerUrl)

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ProviderFactories: testAccProviderFactories,
@@ -159,6 +166,7 @@
testAccCheckTopicExists(fullTopicResourceLabel),
resource.TestCheckResourceAttr(fullTopicResourceLabel, "kafka_cluster", clusterId),
resource.TestCheckResourceAttr(fullTopicResourceLabel, "id", fmt.Sprintf("%s/%s", clusterId, topicName)),
resource.TestCheckResourceAttr(fullTopicResourceLabel, "%", numberOfResourceAttributes),
resource.TestCheckResourceAttr(fullTopicResourceLabel, "topic_name", topicName),
resource.TestCheckResourceAttr(fullTopicResourceLabel, "partitions_count", strconv.Itoa(partitionCount)),
resource.TestCheckResourceAttr(fullTopicResourceLabel, "http_endpoint", mockServerUrl),
@@ -167,8 +175,16 @@
resource.TestCheckResourceAttr(fullTopicResourceLabel, "config.retention.ms", "6789"),
resource.TestCheckResourceAttr(fullTopicResourceLabel, "credentials.#", "1"),
resource.TestCheckResourceAttr(fullTopicResourceLabel, "credentials.0.%", "2"),
resource.TestCheckResourceAttr(fullTopicResourceLabel, "credentials.0.key", kafkaApiKey),
resource.TestCheckResourceAttr(fullTopicResourceLabel, "credentials.0.secret", kafkaApiSecret),
),
},
{
// https://www.terraform.io/docs/extend/resources/import.html
ResourceName: fullTopicResourceLabel,
ImportState: true,
ImportStateVerify: true,
},
},
})

@@ -184,7 +200,7 @@ func testAccCheckTopicDestroy(s *terraform.State) error {
continue
}
deletedTopicId := rs.Primary.ID
_, response, err := c.kafkaRestClient.TopicV3Api.GetKafkaV3Topic(c.kafkaRestApiContext(context.Background(), clusterApiKey, clusterApiSecret), clusterId, topicName)
_, response, err := c.kafkaRestClient.TopicV3Api.GetKafkaV3Topic(c.kafkaRestApiContext(context.Background(), kafkaApiKey, kafkaApiSecret), clusterId, topicName)
if response != nil && (response.StatusCode == http.StatusForbidden || response.StatusCode == http.StatusNotFound) {
return nil
} else if err == nil && deletedTopicId != "" {
@@ -216,11 +232,11 @@ func testAccCheckTopicConfig(confluentCloudBaseUrl, mockServerUrl string) string
}
credentials {
key = "test_key"
secret = "test_secret"
key = "%s"
secret = "%s"
}
}
`, confluentCloudBaseUrl, topicResourceLabel, clusterId, topicName, partitionCount, mockServerUrl, firstConfigName, firstConfigValue, secondConfigName, secondConfigValue)
`, confluentCloudBaseUrl, topicResourceLabel, clusterId, topicName, partitionCount, mockServerUrl, firstConfigName, firstConfigValue, secondConfigName, secondConfigValue, kafkaApiKey, kafkaApiSecret)
}

func testAccCheckTopicExists(n string) resource.TestCheckFunc {
30 changes: 30 additions & 0 deletions internal/provider/utils.go
@@ -22,6 +22,7 @@ import (
kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3"
org "github.com/confluentinc/ccloud-sdk-go-v2/org/v2"
"log"
"os"
"time"
)

@@ -168,3 +169,32 @@ type Acl struct {
Operation kafkarestv3.AclOperation
Permission kafkarestv3.AclPermission
}

type KafkaImportEnvVars struct {
kafkaApiKey string
kafkaApiSecret string
kafkaHttpEndpoint string
}

func getEnv(key, defaultValue string) string {
value := os.Getenv(key)
if value == "" {
return defaultValue
}
return value
}

func checkEnvironmentVariablesForKafkaImportAreSet() (KafkaImportEnvVars, error) {
kafkaApiKey := getEnv("KAFKA_API_KEY", "")
kafkaApiSecret := getEnv("KAFKA_API_SECRET", "")
kafkaHttpEndpoint := getEnv("KAFKA_HTTP_ENDPOINT", "")
canImport := kafkaApiKey != "" && kafkaApiSecret != "" && kafkaHttpEndpoint != ""
if !canImport {
return KafkaImportEnvVars{}, fmt.Errorf("KAFKA_API_KEY, KAFKA_API_SECRET, and KAFKA_HTTP_ENDPOINT must be set for kafka topic / ACL import")
}
return KafkaImportEnvVars{
kafkaApiKey: kafkaApiKey,
kafkaApiSecret: kafkaApiSecret,
kafkaHttpEndpoint: kafkaHttpEndpoint,
}, nil
}
