Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-246: manage streaming pulsar tokens #257

Merged
merged 1 commit into from Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/resources/streaming_pulsar_token.md
@@ -0,0 +1,30 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "astra_streaming_pulsar_token Resource - terraform-provider-astra"
subcategory: ""
description: |-
A Pulsar Namespace.
---

# astra_streaming_pulsar_token (Resource)

A Pulsar Namespace.



<!-- schema generated by tfplugindocs -->
## Schema

### Required

- `cluster` (String) Cluster where the Pulsar tenant is located.
- `tenant` (String) Name of the tenant.

### Optional

- `time_to_live` (String) The relative time until the token expires. For example 1h, 1d, 1y, etc.

### Read-Only

- `id` (String) Full path to the namespace
- `token` (String, Sensitive) String values of the token
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -3,7 +3,7 @@ module github.com/datastax/terraform-provider-astra/v2
go 1.18

require (
github.com/datastax/astra-client-go/v2 v2.2.49
github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8
github.com/google/uuid v1.3.0
github.com/hashicorp/go-cty v1.4.1-0.20200414143053-d3edf31b6320
github.com/hashicorp/go-retryablehttp v0.7.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -95,8 +95,8 @@ github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEM
github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/datastax/astra-client-go/v2 v2.2.49 h1:O0UH3rk3Zcrcs2E31+aj6bu/lZ2XZsEa6YxJS87GGJc=
github.com/datastax/astra-client-go/v2 v2.2.49/go.mod h1:zxXWuqDkYia7PzFIL3T7RmjChc9LN81UnfI2yB4kE7M=
github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8 h1:lcjdcx7OmSmsXhuD7400+P8WYXGM4Q5y9gw61POOq58=
github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8/go.mod h1:zxXWuqDkYia7PzFIL3T7RmjChc9LN81UnfI2yB4kE7M=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
2 changes: 2 additions & 0 deletions internal/astra/provider.go
Expand Up @@ -173,6 +173,7 @@ func (p *astraProvider) Configure(ctx context.Context, req provider.ConfigureReq
streamingV3Client, err := astrastreaming.NewClientWithResponses(streamingAPIServerURL, func(c *astrastreaming.Client) error {
c.Client = retryClient.StandardClient()
c.RequestEditors = append(c.RequestEditors, func(ctx context.Context, req *http.Request) error {
req.Header.Set("Authorization", authorization)
req.Header.Set("User-Agent", userAgent)
req.Header.Set("X-Astra-Provider-Version", p.Version)
req.Header.Set("X-Astra-Client-Version", clientVersion)
Expand Down Expand Up @@ -209,6 +210,7 @@ func (p *astraProvider) DataSources(_ context.Context) []func() datasource.DataS
func (p *astraProvider) Resources(_ context.Context) []func() resource.Resource {
return []func() resource.Resource{
NewStreamingNamespaceResource,
NewStreamingPulsarTokenResource,
}
}

Expand Down
261 changes: 261 additions & 0 deletions internal/astra/resource_streaming_pulsar_token.go
@@ -0,0 +1,261 @@
package astra

import (
"context"
"fmt"
"io/ioutil"
"strings"

astrastreaming "github.com/datastax/astra-client-go/v2/astra-streaming"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/types"
)

var (
adminPulsarTokenType = "admin"
)

// Ensure the implementation satisfies the expected interfaces.
var (
_ resource.Resource = &StreamingPulsarTokenResource{}
_ resource.ResourceWithConfigure = &StreamingPulsarTokenResource{}
_ resource.ResourceWithImportState = &StreamingPulsarTokenResource{}
)

// NewStreamingPulsarTokenResource is a helper function to simplify the provider implementation.
func NewStreamingPulsarTokenResource() resource.Resource {
return &StreamingPulsarTokenResource{}
}

// StreamingPulsarTokenResource is the resource implementation.
type StreamingPulsarTokenResource struct {
clients *astraClients
}

// StreamingPulsarTokenResourceModel maps the resource schema data.
type StreamingPulsarTokenResourceModel struct {
ID types.String `tfsdk:"id"`
Cluster types.String `tfsdk:"cluster"`
Tenant types.String `tfsdk:"tenant"`
TimeToLive types.String `tfsdk:"time_to_live"`
Token types.String `tfsdk:"token"`
}

// Metadata returns the data source type name.
func (r *StreamingPulsarTokenResource) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) {
resp.TypeName = req.ProviderTypeName + "_streaming_pulsar_token"
}

// Schema defines the schema for the data source.
func (r *StreamingPulsarTokenResource) Schema(_ context.Context, _ resource.SchemaRequest, resp *resource.SchemaResponse) {
resp.Schema = schema.Schema{
Description: "A Pulsar Namespace.",
Attributes: map[string]schema.Attribute{
"id": schema.StringAttribute{
Description: "Full path to the namespace",
Computed: true,
},
"cluster": schema.StringAttribute{
Description: "Cluster where the Pulsar tenant is located.",
Required: true,
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
},
"tenant": schema.StringAttribute{
Description: "Name of the tenant.",
Required: true,
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
},
"time_to_live": schema.StringAttribute{
Description: "The relative time until the token expires. For example 1h, 1d, 1y, etc.",
Optional: true,
// Default: stringdefault.StaticString("1y"),
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
},
"token": schema.StringAttribute{
Description: "String values of the token",
Computed: true,
Sensitive: true,
},
},
}
}

// Configure adds the provider configured client to the data source.
func (r *StreamingPulsarTokenResource) Configure(_ context.Context, req resource.ConfigureRequest, _ *resource.ConfigureResponse) {
if req.ProviderData == nil {
return
}

r.clients = req.ProviderData.(*astraClients)
}

// Create creates the resource and sets the initial Terraform state.
func (r *StreamingPulsarTokenResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) {
var tokenPlan StreamingPulsarTokenResourceModel
diags := req.Plan.Get(ctx, &tokenPlan)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClientv3

astraOrgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error creating Pulsar token",
"Could not get current Astra organization: "+err.Error(),
)
return
}

tokenRequestParams := &astrastreaming.CreateTenantTokenHandlerV3Params{
XDataStaxCurrentOrg: astraOrgID,
XDataStaxPulsarCluster: tokenPlan.Cluster.ValueString(),
}
tokenRequestBody := astrastreaming.CreateTenantTokenV3Request{
Type: &adminPulsarTokenType,
Exp: tokenPlan.TimeToLive.ValueStringPointer(),
}
tokenHTTPResp, err := streamingClient.CreateTenantTokenHandlerV3(ctx,
tokenPlan.Tenant.ValueString(), tokenRequestParams, tokenRequestBody)
if err != nil {
resp.Diagnostics.AddError(
"Error creating Pulsar token",
"Could not get Pulsar token: "+err.Error(),
)
return
} else if tokenHTTPResp.StatusCode >= 300 {
errorMsg, err := ioutil.ReadAll(tokenHTTPResp.Body)
if err != nil {
errorMsg = []byte(err.Error())
}
resp.Diagnostics.AddError(
"Error creating Pulsar token",
fmt.Sprintf("Received unexpected status code, status '%s', body: %s", tokenHTTPResp.Status, string(errorMsg)),
)
return
}

pulsarTokenResp, err := astrastreaming.ParseCreateTenantTokenHandlerV3Response(tokenHTTPResp)
if err != nil {
resp.Diagnostics.AddError(
"Error creating Pulsar token",
fmt.Sprintf("Failed to parse token response, status '%s', body: %s", tokenHTTPResp.Status, err.Error()),
)
return
}

// Manually set the ID because this is computed
tokenPlan.ID = types.StringValue(*pulsarTokenResp.JSON201.ID)
tokenPlan.Token = types.StringValue(*pulsarTokenResp.JSON201.Token)

resp.Diagnostics.Append(resp.State.Set(ctx, &tokenPlan)...)
}

// Read refreshes the Terraform state with the latest data.
func (r *StreamingPulsarTokenResource) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) {
var state StreamingPulsarTokenResourceModel
diags := req.State.Get(ctx, &state)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClientv3

astraOrgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error getting namespace",
"Could not get current organization: "+err.Error(),
)
return
}

pulsarToken, err := getPulsarTokenByID(ctx, streamingClient, astraOrgID, state.Cluster.ValueString(), state.Tenant.ValueString(), state.ID.ValueString())
if err != nil {
resp.Diagnostics.AddError(
"Error getting pulsar token",
"Could not get pulsar token: "+err.Error(),
)
return
}

state.Token = types.StringValue(pulsarToken)

resp.Diagnostics.Append(resp.State.Set(ctx, &state)...)
}

// Update updates the resource and sets the updated Terraform state on success.
func (r *StreamingPulsarTokenResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) {
// Not implemented

}

// Delete deletes the resource and removes the Terraform state on success.
func (r *StreamingPulsarTokenResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) {
// Retrieve values from state
var state StreamingPulsarTokenResourceModel
diags := req.State.Get(ctx, &state)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClientv3

astraOrgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error deleting pulsar token",
"Could not get current organization: "+err.Error(),
)
return
}

params := astrastreaming.DeletePulsarTokenByIDParams{
XDataStaxCurrentOrg: astraOrgID,
XDataStaxPulsarCluster: state.Cluster.ValueString(),
}
httpResp, err := streamingClient.DeletePulsarTokenByID(ctx, state.Tenant.ValueString(), state.ID.ValueString(), &params)
if err != nil {
resp.Diagnostics.AddError(
"Error deleting token",
"Could not create Pulsar token: "+err.Error(),
)
} else if httpResp.StatusCode > 300 {
resp.Diagnostics.AddError(
"Error deleting token",
fmt.Sprintf("Unexpected status code: %v", httpResp.StatusCode),
)
}
}

func (r *StreamingPulsarTokenResource) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) {
tokenPath := strings.Split(req.ID, "/")
if len(tokenPath) != 3 {
resp.Diagnostics.AddError(
"Error importing token",
"ID must be in the format <cluster>/<tenant>/<tokenID>",
)
return
}
resource.ImportStatePassthroughID(ctx, path.Root("id"), req, resp)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("cluster"), tokenPath[0])...)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("tenant"), tokenPath[1])...)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("id"), tokenPath[2])...)
}
41 changes: 41 additions & 0 deletions internal/astra/resource_streaming_pulsar_token_test.go
@@ -0,0 +1,41 @@
package astra

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

func TestAccStreamingPulsarTokenResource(t *testing.T) {
clusterName := getEnvVarOrDefault("ASTRA_TEST_STREAMING_CLUSTER_NAME", testDefaultStreamingClusterName)
tenant := getEnvVarOrDefault("ASTRA_TEST_STREAMING_TENANT_NAME", "terraform-"+randomString(4))

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ProtoV5ProviderFactories: testAccProtoV5ProviderFactories,
Steps: []resource.TestStep{
{
Config: testProviderConfig + "\n" + streamingPulsarTokenTestConfig(clusterName, tenant),
},
},
})
}

func streamingPulsarTokenTestConfig(cluster, tenant string) string {
return fmt.Sprintf(`
resource "astra_streaming_tenant" "streaming_tenant_1" {
cluster_name = "%s"
tenant_name = "%s"
user_email = "terraform-test-user@datastax.com"
deletion_protection = false
}

resource "astra_streaming_pulsar_token" "pulsar_token_1" {
depends_on = [
astra_streaming_tenant.streaming_tenant_1
]
cluster = "%s"
tenant = "%s"
}`, cluster, tenant, cluster, tenant)
}