Skip to content

Commit

Permalink
issue-246: manage streaming pulsar tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
pgier committed Jun 27, 2023
1 parent bd0602d commit b645875
Show file tree
Hide file tree
Showing 10 changed files with 376 additions and 15 deletions.
30 changes: 30 additions & 0 deletions docs/resources/streaming_pulsar_token.md
@@ -0,0 +1,30 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "astra_streaming_pulsar_token Resource - terraform-provider-astra"
subcategory: ""
description: |-
A Pulsar Namespace.
---

# astra_streaming_pulsar_token (Resource)

A Pulsar Namespace.



<!-- schema generated by tfplugindocs -->
## Schema

### Required

- `cluster` (String) Cluster where the Pulsar tenant is located.
- `tenant` (String) Name of the tenant.

### Optional

- `time_to_live` (String) The relative time until the token expires. For example 1h, 1d, 1y, etc.

### Read-Only

- `id` (String) Full path to the namespace
- `token` (String, Sensitive) String values of the token
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -3,7 +3,7 @@ module github.com/datastax/terraform-provider-astra/v2
go 1.18

require (
github.com/datastax/astra-client-go/v2 v2.2.49
github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8
github.com/google/uuid v1.3.0
github.com/hashicorp/go-cty v1.4.1-0.20200414143053-d3edf31b6320
github.com/hashicorp/go-retryablehttp v0.7.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -95,8 +95,8 @@ github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEM
github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/datastax/astra-client-go/v2 v2.2.49 h1:O0UH3rk3Zcrcs2E31+aj6bu/lZ2XZsEa6YxJS87GGJc=
github.com/datastax/astra-client-go/v2 v2.2.49/go.mod h1:zxXWuqDkYia7PzFIL3T7RmjChc9LN81UnfI2yB4kE7M=
github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8 h1:lcjdcx7OmSmsXhuD7400+P8WYXGM4Q5y9gw61POOq58=
github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8/go.mod h1:zxXWuqDkYia7PzFIL3T7RmjChc9LN81UnfI2yB4kE7M=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
2 changes: 2 additions & 0 deletions internal/astra/provider.go
Expand Up @@ -173,6 +173,7 @@ func (p *astraProvider) Configure(ctx context.Context, req provider.ConfigureReq
streamingV3Client, err := astrastreaming.NewClientWithResponses(streamingAPIServerURL, func(c *astrastreaming.Client) error {
c.Client = retryClient.StandardClient()
c.RequestEditors = append(c.RequestEditors, func(ctx context.Context, req *http.Request) error {
req.Header.Set("Authorization", authorization)
req.Header.Set("User-Agent", userAgent)
req.Header.Set("X-Astra-Provider-Version", p.Version)
req.Header.Set("X-Astra-Client-Version", clientVersion)
Expand Down Expand Up @@ -209,6 +210,7 @@ func (p *astraProvider) DataSources(_ context.Context) []func() datasource.DataS
func (p *astraProvider) Resources(_ context.Context) []func() resource.Resource {
return []func() resource.Resource{
NewStreamingNamespaceResource,
NewStreamingPulsarTokenResource,
}
}

Expand Down
261 changes: 261 additions & 0 deletions internal/astra/resource_streaming_pulsar_token.go
@@ -0,0 +1,261 @@
package astra

import (
"context"
"fmt"
"io/ioutil"
"strings"

astrastreaming "github.com/datastax/astra-client-go/v2/astra-streaming"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/types"
)

var (
adminPulsarTokenType = "admin"
)

// Ensure the implementation satisfies the expected interfaces.
var (
_ resource.Resource = &StreamingPulsarTokenResource{}
_ resource.ResourceWithConfigure = &StreamingPulsarTokenResource{}
_ resource.ResourceWithImportState = &StreamingPulsarTokenResource{}
)

// NewStreamingPulsarTokenResource is a helper function to simplify the provider implementation.
func NewStreamingPulsarTokenResource() resource.Resource {
return &StreamingPulsarTokenResource{}
}

// StreamingPulsarTokenResource is the resource implementation.
type StreamingPulsarTokenResource struct {
clients *astraClients
}

// StreamingPulsarTokenResourceModel maps the resource schema data.
type StreamingPulsarTokenResourceModel struct {
ID types.String `tfsdk:"id"`
Cluster types.String `tfsdk:"cluster"`
Tenant types.String `tfsdk:"tenant"`
TimeToLive types.String `tfsdk:"time_to_live"`
Token types.String `tfsdk:"token"`
}

// Metadata returns the data source type name.
func (r *StreamingPulsarTokenResource) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) {
resp.TypeName = req.ProviderTypeName + "_streaming_pulsar_token"
}

// Schema defines the schema for the data source.
func (r *StreamingPulsarTokenResource) Schema(_ context.Context, _ resource.SchemaRequest, resp *resource.SchemaResponse) {
resp.Schema = schema.Schema{
Description: "A Pulsar Namespace.",
Attributes: map[string]schema.Attribute{
"id": schema.StringAttribute{
Description: "Full path to the namespace",
Computed: true,
},
"cluster": schema.StringAttribute{
Description: "Cluster where the Pulsar tenant is located.",
Required: true,
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
},
"tenant": schema.StringAttribute{
Description: "Name of the tenant.",
Required: true,
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
},
"time_to_live": schema.StringAttribute{
Description: "The relative time until the token expires. For example 1h, 1d, 1y, etc.",
Optional: true,
// Default: stringdefault.StaticString("1y"),
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
},
"token": schema.StringAttribute{
Description: "String values of the token",
Computed: true,
Sensitive: true,
},
},
}
}

// Configure adds the provider configured client to the data source.
func (r *StreamingPulsarTokenResource) Configure(_ context.Context, req resource.ConfigureRequest, _ *resource.ConfigureResponse) {
if req.ProviderData == nil {
return
}

r.clients = req.ProviderData.(*astraClients)
}

// Create creates the resource and sets the initial Terraform state.
func (r *StreamingPulsarTokenResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) {
var tokenPlan StreamingPulsarTokenResourceModel
diags := req.Plan.Get(ctx, &tokenPlan)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClientv3

astraOrgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error creating Pulsar token",
"Could not get current Astra organization: "+err.Error(),
)
return
}

tokenRequestParams := &astrastreaming.CreateTenantTokenHandlerV3Params{
XDataStaxCurrentOrg: astraOrgID,
XDataStaxPulsarCluster: tokenPlan.Cluster.ValueString(),
}
tokenRequestBody := astrastreaming.CreateTenantTokenV3Request{
Type: &adminPulsarTokenType,
Exp: tokenPlan.TimeToLive.ValueStringPointer(),
}
tokenHTTPResp, err := streamingClient.CreateTenantTokenHandlerV3(ctx,
tokenPlan.Tenant.ValueString(), tokenRequestParams, tokenRequestBody)
if err != nil {
resp.Diagnostics.AddError(
"Error creating Pulsar token",
"Could not get Pulsar token: "+err.Error(),
)
return
} else if tokenHTTPResp.StatusCode >= 300 {
errorMsg, err := ioutil.ReadAll(tokenHTTPResp.Body)
if err != nil {
errorMsg = []byte(err.Error())
}
resp.Diagnostics.AddError(
"Error creating Pulsar token",
fmt.Sprintf("Received unexpected status code, status '%s', body: %s", tokenHTTPResp.Status, string(errorMsg)),
)
return
}

pulsarTokenResp, err := astrastreaming.ParseCreateTenantTokenHandlerV3Response(tokenHTTPResp)
if err != nil {
resp.Diagnostics.AddError(
"Error creating Pulsar token",
fmt.Sprintf("Failed to parse token response, status '%s', body: %s", tokenHTTPResp.Status, err.Error()),
)
return
}

// Manually set the ID because this is computed
tokenPlan.ID = types.StringValue(*pulsarTokenResp.JSON201.ID)
tokenPlan.Token = types.StringValue(*pulsarTokenResp.JSON201.Token)

resp.Diagnostics.Append(resp.State.Set(ctx, &tokenPlan)...)
}

// Read refreshes the Terraform state with the latest data.
func (r *StreamingPulsarTokenResource) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) {
var state StreamingPulsarTokenResourceModel
diags := req.State.Get(ctx, &state)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClientv3

astraOrgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error getting namespace",
"Could not get current organization: "+err.Error(),
)
return
}

pulsarToken, err := getPulsarTokenByID(ctx, streamingClient, astraOrgID, state.Cluster.ValueString(), state.Tenant.ValueString(), state.ID.ValueString())
if err != nil {
resp.Diagnostics.AddError(
"Error getting pulsar token",
"Could not get pulsar token: "+err.Error(),
)
return
}

state.Token = types.StringValue(pulsarToken)

resp.Diagnostics.Append(resp.State.Set(ctx, &state)...)
}

// Update updates the resource and sets the updated Terraform state on success.
func (r *StreamingPulsarTokenResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) {
// Not implemented

}

// Delete deletes the resource and removes the Terraform state on success.
func (r *StreamingPulsarTokenResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) {
// Retrieve values from state
var state StreamingPulsarTokenResourceModel
diags := req.State.Get(ctx, &state)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClientv3

astraOrgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error deleting pulsar token",
"Could not get current organization: "+err.Error(),
)
return
}

params := astrastreaming.DeletePulsarTokenByIDParams{
XDataStaxCurrentOrg: astraOrgID,
XDataStaxPulsarCluster: state.Cluster.ValueString(),
}
httpResp, err := streamingClient.DeletePulsarTokenByID(ctx, state.Tenant.ValueString(), state.ID.ValueString(), &params)
if err != nil {
resp.Diagnostics.AddError(
"Error deleting token",
"Could not create Pulsar token: "+err.Error(),
)
} else if httpResp.StatusCode > 300 {
resp.Diagnostics.AddError(
"Error deleting token",
fmt.Sprintf("Unexpected status code: %v", httpResp.StatusCode),
)
}
}

func (r *StreamingPulsarTokenResource) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) {
tokenPath := strings.Split(req.ID, "/")
if len(tokenPath) != 3 {
resp.Diagnostics.AddError(
"Error importing token",
"ID must be in the format <cluster>/<tenant>/<tokenID>",
)
return
}
resource.ImportStatePassthroughID(ctx, path.Root("id"), req, resp)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("cluster"), tokenPath[0])...)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("tenant"), tokenPath[1])...)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("id"), tokenPath[2])...)
}
41 changes: 41 additions & 0 deletions internal/astra/resource_streaming_pulsar_token_test.go
@@ -0,0 +1,41 @@
package astra

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

func TestAccStreamingPulsarTokenResource(t *testing.T) {
clusterName := getEnvVarOrDefault("ASTRA_TEST_STREAMING_CLUSTER_NAME", testDefaultStreamingClusterName)
tenant := getEnvVarOrDefault("ASTRA_TEST_STREAMING_TENANT_NAME", "terraform-"+randomString(4))

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ProtoV5ProviderFactories: testAccProtoV5ProviderFactories,
Steps: []resource.TestStep{
{
Config: testProviderConfig + "\n" + streamingPulsarTokenTestConfig(clusterName, tenant),
},
},
})
}

func streamingPulsarTokenTestConfig(cluster, tenant string) string {
return fmt.Sprintf(`
resource "astra_streaming_tenant" "streaming_tenant_1" {
cluster_name = "%s"
tenant_name = "%s"
user_email = "terraform-test-user@datastax.com"
deletion_protection = false
}
resource "astra_streaming_pulsar_token" "pulsar_token_1" {
depends_on = [
astra_streaming_tenant.streaming_tenant_1
]
cluster = "%s"
tenant = "%s"
}`, cluster, tenant, cluster, tenant)
}

0 comments on commit b645875

Please sign in to comment.