Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
APIF-1776: Implement import for ACLs (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
linouk23 committed Nov 4, 2021
1 parent c70b714 commit 075a27b
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 28 deletions.
8 changes: 4 additions & 4 deletions docs/guides/sample-project.md
Expand Up @@ -340,10 +340,10 @@ Save your Kafka API key and secret in a secure location.
```
confluentcloud_kafka_acl.describe-test-basic-cluster: Creating...
confluentcloud_kafka_topic.orders: Creating...
confluentcloud_kafka_acl.describe-test-basic-cluster: Creation complete after 1s [id=lkc-odgpo/CLUSTER/kafka-cluster/LITERAL/User:309715/*/DESCRIBE/ALLOW]
confluentcloud_kafka_acl.describe-test-basic-cluster: Creation complete after 1s [id=lkc-odgpo/CLUSTER#kafka-cluster#LITERAL#User:309715#*#DESCRIBE#ALLOW]
confluentcloud_kafka_topic.orders: Creation complete after 2s [id=lkc-odgpo/orders]
confluentcloud_kafka_acl.describe-orders: Creating...
confluentcloud_kafka_acl.describe-orders: Creation complete after 0s [id=lkc-odgpo/TOPIC/orders/LITERAL/User:309715/*/DESCRIBE/ALLOW]
confluentcloud_kafka_acl.describe-orders: Creation complete after 0s [id=lkc-odgpo/TOPIC#orders#LITERAL#User:309715#*#DESCRIBE#ALLOW]
Apply complete! Resources: 3 added, 0 changed, 0 destroyed.
```
Expand All @@ -360,8 +360,8 @@ Your output should resemble:

```
confluentcloud_service_account.test-sa: Destroying... [id=sa-l7v772]
confluentcloud_kafka_acl.describe-orders: Destroying... [id=lkc-odgpo/TOPIC/orders/LITERAL/User:309715/*/DESCRIBE/ALLOW]
confluentcloud_kafka_acl.describe-test-basic-cluster: Destroying... [id=lkc-odgpo/CLUSTER/kafka-cluster/LITERAL/User:309715/*/DESCRIBE/ALLOW]
confluentcloud_kafka_acl.describe-orders: Destroying... [id=lkc-odgpo/TOPIC#orders#LITERAL#User:309715#*#DESCRIBE#ALLOW]
confluentcloud_kafka_acl.describe-test-basic-cluster: Destroying... [id=lkc-odgpo/CLUSTER#kafka-cluster#LITERAL#User:309715#*#DESCRIBE#ALLOW]
confluentcloud_kafka_acl.describe-orders: Destruction complete after 2s
confluentcloud_kafka_acl.describe-test-basic-cluster: Destruction complete after 2s
confluentcloud_kafka_topic.orders: Destroying... [id=lkc-odgpo/orders]
Expand Down
15 changes: 14 additions & 1 deletion docs/resources/confluentcloud_kafka_acl.md
Expand Up @@ -72,4 +72,17 @@ The following arguments are supported:

In addition to the preceding arguments, the following attributes are exported:

- `id` - (String) The ID of the Kafka ACL in the format `<Kafka cluster ID>/<Kafka ACL resource type>/<Kafka ACL resource name>/<Kafka ACL pattern type>/<Kafka ACL principal>/<Kafka ACL host>/<Kafka ACL operation>/<Kafka ACL permission>`.
- `id` - (String) The ID of the Kafka ACL in the format `<Kafka cluster ID>/<Kafka ACL resource type>#<Kafka ACL resource name>#<Kafka ACL pattern type>#<Kafka ACL principal>#<Kafka ACL host>#<Kafka ACL operation>#<Kafka ACL permission>`.

## Import

-> **Note:** `KAFKA_API_KEY` (`credentials.key`), `KAFKA_API_SECRET` (`credentials.secret`), and `KAFKA_HTTP_ENDPOINT` (`http_endpoint`) environment variables must be set before importing a Kafka ACL.

Import Kafka ACLs by using the Kafka cluster ID and the attributes of the `confluentcloud_kafka_acl` resource in the format `<Kafka cluster ID>/<Kafka ACL resource type>#<Kafka ACL resource name>#<Kafka ACL pattern type>#<Kafka ACL principal>#<Kafka ACL host>#<Kafka ACL operation>#<Kafka ACL permission>`, for example:

```shell
$ export KAFKA_API_KEY="<kafka_api_key>"
$ export KAFKA_API_SECRET="<kafka_api_secret>"
$ export KAFKA_HTTP_ENDPOINT="<kafka_http_endpoint>"
$ terraform import confluentcloud_kafka_acl.describe-cluster "lkc-12345/CLUSTER#kafka-cluster#LITERAL#User:67890#*#DESCRIBE#ALLOW"
```
145 changes: 123 additions & 22 deletions internal/provider/resource_kafka_acl.go
Expand Up @@ -104,6 +104,9 @@ func kafkaAclResource() *schema.Resource {
CreateContext: kafkaAclCreate,
ReadContext: kafkaAclRead,
DeleteContext: kafkaAclDelete,
Importer: &schema.ResourceImporter{
StateContext: kafkaAclImport,
},
Schema: map[string]*schema.Schema{
paramClusterId: clusterIdSchema(),
paramResourceType: {
Expand Down Expand Up @@ -247,19 +250,40 @@ func executeKafkaAclRead(ctx context.Context, c *Client, clusterId string, opts
func kafkaAclRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
log.Printf("[INFO] Kafka ACL read for %s", d.Id())

c := meta.(*Client)
httpEndpoint := extractHttpEndpoint(d)
updateKafkaRestClient(c, httpEndpoint)

clusterId := extractClusterId(d)
clusterApiKey, clusterApiSecret, err := extractClusterApiKeyAndApiSecret(d)
acl, err := extractAcl(d)
if err != nil {
return diag.FromErr(err)
}
acl, err := extractAcl(d)
clusterApiKey, clusterApiSecret, err := extractClusterApiKeyAndApiSecret(d)
if err != nil {
return diag.FromErr(err)
}
httpEndpoint := extractHttpEndpoint(d)

_, err = readAndSetAclResourceConfigurationArguments(ctx, d, meta, clusterId, acl, clusterApiKey, clusterApiSecret, httpEndpoint)

return diag.FromErr(err)
}

// createKafkaAclId builds the Terraform resource ID for a Kafka ACL:
// the cluster ID, a "/" separator, then the seven ACL attributes
// (resource type, resource name, pattern type, principal, host,
// operation, permission) joined with "#".
func createKafkaAclId(clusterId string, acl Acl) string {
	attributes := []string{
		string(acl.ResourceType),
		acl.ResourceName,
		string(acl.PatternType),
		acl.Principal,
		acl.Host,
		string(acl.Operation),
		string(acl.Permission),
	}
	return clusterId + "/" + strings.Join(attributes, "#")
}

func readAndSetAclResourceConfigurationArguments(ctx context.Context, d *schema.ResourceData, meta interface{}, clusterId string, acl Acl, kafkaApiKey, kafkaApiSecret, httpEndpoint string) ([]*schema.ResourceData, error) {
c := meta.(*Client)
updateKafkaRestClient(c, httpEndpoint)

ctx = c.kafkaRestApiContext(ctx, kafkaApiKey, kafkaApiSecret)

opts := &kafkarestv3.GetKafkaV3AclsOpts{
ResourceType: optional.NewInterface(acl.ResourceType),
ResourceName: optional.NewString(acl.ResourceName),
Expand All @@ -270,30 +294,107 @@ func kafkaAclRead(ctx context.Context, d *schema.ResourceData, meta interface{})
Permission: optional.NewInterface(acl.Permission),
}

_, resp, err := executeKafkaAclRead(c.kafkaRestApiContext(ctx, clusterApiKey, clusterApiSecret), c, clusterId, opts)
_, resp, err := executeKafkaAclRead(ctx, c, clusterId, opts)
if resp != nil && resp.StatusCode == http.StatusNotFound {
// https://learn.hashicorp.com/tutorials/terraform/provider-setup?in=terraform/providers
// If the resource isn't available, set the ID to an empty string so Terraform "destroys" the resource in state.
d.SetId("")
return nil
return nil, nil
}
if err != nil {
log.Printf("[ERROR] Kafka ACL get failed for id %s, %v, %s", d.Id(), resp, err)
return diag.FromErr(err)
log.Printf("[ERROR] Kafka ACL get failed for id %s, %v, %s", acl, resp, err)
}
if err == nil {
err = d.Set(paramClusterId, clusterId)
}
if err == nil {
err = d.Set(paramResourceType, acl.ResourceType)
}
if err == nil {
err = d.Set(paramResourceName, acl.ResourceName)
}
if err == nil {
err = d.Set(paramPatternType, acl.PatternType)
}
if err == nil {
err = d.Set(paramPrincipal, acl.Principal)
}
if err == nil {
err = d.Set(paramHost, acl.Host)
}
if err == nil {
err = d.Set(paramOperation, acl.Operation)
}
if err == nil {
err = d.Set(paramPermission, acl.Permission)
}
if err == nil {
err = setKafkaCredentials(kafkaApiKey, kafkaApiSecret, d)
}
if err == nil {
err = d.Set(paramHttpEndpoint, httpEndpoint)
}
d.SetId(createKafkaAclId(clusterId, acl))
return nil
return []*schema.ResourceData{d}, err
}

func createKafkaAclId(clusterId string, acl Acl) string {
return strings.Join([]string{
clusterId,
string(acl.ResourceType),
acl.ResourceName,
string(acl.PatternType),
acl.Principal,
acl.Host,
string(acl.Operation),
string(acl.Permission),
}, "/")
// kafkaAclImport implements schema.ResourceImporter.StateContext for the
// Kafka ACL resource. It expects d.Id() in the form
// "<lkc ID>/<resource type>#<resource name>#<pattern type>#<principal>#<host>#<operation>#<permission>",
// reads the Kafka credentials and endpoint from environment variables
// (via checkEnvironmentVariablesForKafkaImportAreSet), and delegates to
// readAndSetAclResourceConfigurationArguments to populate state.
func kafkaAclImport(ctx context.Context, d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
	log.Printf("[INFO] Kafka ACL import for %s", d.Id())

	// Credentials come from env vars rather than the config during import.
	envVars, err := checkEnvironmentVariablesForKafkaImportAreSet()
	if err != nil {
		return nil, err
	}

	// The ID must contain exactly one "/" separating the cluster ID from
	// the "#"-serialized ACL attributes.
	idParts := strings.Split(d.Id(), "/")
	if len(idParts) != 2 {
		return nil, fmt.Errorf("invalid format for kafka ACL import: expected '<lkc ID>/<resource type>#<resource name>#<pattern type>#<principal>#<host>#<operation>#<permission>'")
	}

	acl, err := deserializeAcl(idParts[1])
	if err != nil {
		return nil, err
	}

	return readAndSetAclResourceConfigurationArguments(ctx, d, meta, idParts[0], acl, envVars.kafkaApiKey, envVars.kafkaApiSecret, envVars.kafkaHttpEndpoint)
}

// deserializeAcl parses the "#"-separated ACL attribute string used in
// import IDs (everything after the "<lkc ID>/" prefix) into an Acl.
// The expected order is: resource type, resource name, pattern type,
// principal, host, operation, permission. Enum-valued attributes are
// validated via the stringToAcl* converters; the first conversion
// failure is returned along with a zero Acl.
func deserializeAcl(serializedAcl string) (Acl, error) {
	parts := strings.Split(serializedAcl, "#")
	if len(parts) != 7 {
		return Acl{}, fmt.Errorf("invalid format for kafka ACL import: expected '<lkc ID>/<resource type>#<resource name>#<pattern type>#<principal>#<host>#<operation>#<permission>'")
	}

	// Free-form attributes need no validation and can be set directly.
	acl := Acl{
		ResourceName: parts[1],
		Principal:    parts[3],
		Host:         parts[4],
	}

	var err error
	if acl.ResourceType, err = stringToAclResourceType(parts[0]); err != nil {
		return Acl{}, err
	}
	if acl.PatternType, err = stringToAclPatternType(parts[2]); err != nil {
		return Acl{}, err
	}
	if acl.Operation, err = stringToAclOperation(parts[5]); err != nil {
		return Acl{}, err
	}
	if acl.Permission, err = stringToAclPermission(parts[6]); err != nil {
		return Acl{}, err
	}

	return acl, nil
}
19 changes: 18 additions & 1 deletion internal/provider/resource_kafka_acl_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/walkerus/go-wiremock"
"io/ioutil"
"net/http"
"os"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
Expand Down Expand Up @@ -145,6 +146,16 @@ func TestAccAcls(t *testing.T) {
)
_ = wiremockClient.StubFor(deleteAclStub)

// Set fake values for secrets since those are required for importing
_ = os.Setenv("KAFKA_API_KEY", kafkaApiKey)
_ = os.Setenv("KAFKA_API_SECRET", kafkaApiSecret)
_ = os.Setenv("KAFKA_HTTP_ENDPOINT", mockServerUrl)
defer func() {
_ = os.Unsetenv("KAFKA_API_KEY")
_ = os.Unsetenv("KAFKA_API_SECRET")
_ = os.Unsetenv("KAFKA_HTTP_ENDPOINT")
}()

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ProviderFactories: testAccProviderFactories,
Expand All @@ -157,7 +168,7 @@ func TestAccAcls(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testAccCheckAclExists(fullAclResourceLabel),
resource.TestCheckResourceAttr(fullAclResourceLabel, "kafka_cluster", clusterId),
resource.TestCheckResourceAttr(fullAclResourceLabel, "id", fmt.Sprintf("%s/%s/%s/%s/%s/%s/%s/%s", clusterId, aclResourceType, aclResourceName, aclPatternType, aclPrincipal, aclHost, aclOperation, aclPermission)),
resource.TestCheckResourceAttr(fullAclResourceLabel, "id", fmt.Sprintf("%s/%s#%s#%s#%s#%s#%s#%s", clusterId, aclResourceType, aclResourceName, aclPatternType, aclPrincipal, aclHost, aclOperation, aclPermission)),
resource.TestCheckResourceAttr(fullAclResourceLabel, "resource_type", aclResourceType),
resource.TestCheckResourceAttr(fullAclResourceLabel, "resource_name", aclResourceName),
resource.TestCheckResourceAttr(fullAclResourceLabel, "pattern_type", aclPatternType),
Expand All @@ -169,6 +180,12 @@ func TestAccAcls(t *testing.T) {
resource.TestCheckResourceAttr(fullAclResourceLabel, "credentials.0.%", "2"),
),
},
{
// https://www.terraform.io/docs/extend/resources/import.html
ResourceName: fullAclResourceLabel,
ImportState: true,
ImportStateVerify: true,
},
},
})

Expand Down
5 changes: 5 additions & 0 deletions internal/provider/resource_kafka_topic_test.go
Expand Up @@ -152,6 +152,11 @@ func TestAccTopic(t *testing.T) {
_ = os.Setenv("KAFKA_API_KEY", kafkaApiKey)
_ = os.Setenv("KAFKA_API_SECRET", kafkaApiSecret)
_ = os.Setenv("KAFKA_HTTP_ENDPOINT", mockServerUrl)
defer func() {
_ = os.Unsetenv("KAFKA_API_KEY")
_ = os.Unsetenv("KAFKA_API_SECRET")
_ = os.Unsetenv("KAFKA_HTTP_ENDPOINT")
}()

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Expand Down

0 comments on commit 075a27b

Please sign in to comment.