Skip to content

Commit

Permalink
issue-246: manage streaming pulsar tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
pgier committed Jun 27, 2023
1 parent 8bbd78f commit 7c241fb
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 15 deletions.
32 changes: 32 additions & 0 deletions docs/resources/streaming_pulsar_token.md
@@ -0,0 +1,32 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "astra_streaming_pulsar_token Resource - terraform-provider-astra"
subcategory: ""
description: |-
A Pulsar Namespace.
---

# astra_streaming_pulsar_token (Resource)

A Pulsar Namespace.



<!-- schema generated by tfplugindocs -->
## Schema

### Required

- `cluster` (String) Cluster where the Pulsar tenant is located.
- `tenant` (String) Name of the tenant.

### Optional

- `time_to_live` (String) The relative time until the token expires. For example 1h, 1d, 1y, etc.

### Read-Only

- `id` (String) Full path to the namespace
- `token` (String, Sensitive) String values of the token


2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -3,7 +3,7 @@ module github.com/datastax/terraform-provider-astra/v2
go 1.18

require (
github.com/datastax/astra-client-go/v2 v2.2.49
github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8
github.com/google/uuid v1.3.0
github.com/hashicorp/go-cty v1.4.1-0.20200414143053-d3edf31b6320
github.com/hashicorp/go-retryablehttp v0.7.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -92,8 +92,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/datastax/astra-client-go/v2 v2.2.49 h1:O0UH3rk3Zcrcs2E31+aj6bu/lZ2XZsEa6YxJS87GGJc=
github.com/datastax/astra-client-go/v2 v2.2.49/go.mod h1:zxXWuqDkYia7PzFIL3T7RmjChc9LN81UnfI2yB4kE7M=
github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8 h1:lcjdcx7OmSmsXhuD7400+P8WYXGM4Q5y9gw61POOq58=
github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8/go.mod h1:zxXWuqDkYia7PzFIL3T7RmjChc9LN81UnfI2yB4kE7M=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
2 changes: 2 additions & 0 deletions internal/astra/provider.go
Expand Up @@ -173,6 +173,7 @@ func (p *astraProvider) Configure(ctx context.Context, req provider.ConfigureReq
streamingV3Client, err := astrastreaming.NewClientWithResponses(streamingAPIServerURL, func(c *astrastreaming.Client) error {
c.Client = retryClient.StandardClient()
c.RequestEditors = append(c.RequestEditors, func(ctx context.Context, req *http.Request) error {
req.Header.Set("Authorization", authorization)
req.Header.Set("User-Agent", userAgent)
req.Header.Set("X-Astra-Provider-Version", p.Version)
req.Header.Set("X-Astra-Client-Version", clientVersion)
Expand Down Expand Up @@ -209,6 +210,7 @@ func (p *astraProvider) DataSources(_ context.Context) []func() datasource.DataS
func (p *astraProvider) Resources(_ context.Context) []func() resource.Resource {
return []func() resource.Resource{
NewStreamingNamespaceResource,
NewStreamingPulsarTokenResource,
}
}

Expand Down
261 changes: 261 additions & 0 deletions internal/astra/resource_streaming_pulsar_token.go
@@ -0,0 +1,261 @@
package astra

import (
"context"
"fmt"
"io/ioutil"
"strings"

astrastreaming "github.com/datastax/astra-client-go/v2/astra-streaming"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/types"
)

var (
adminPulsarTokenType = "admin"
)

// Ensure the implementation satisfies the expected interfaces.
var (
_ resource.Resource = &StreamingPulsarTokenResource{}
_ resource.ResourceWithConfigure = &StreamingPulsarTokenResource{}
_ resource.ResourceWithImportState = &StreamingPulsarTokenResource{}
)

// NewStreamingPulsarTokenResource is a helper function to simplify the provider implementation.
func NewStreamingPulsarTokenResource() resource.Resource {
return &StreamingPulsarTokenResource{}
}

// StreamingPulsarTokenResource is the resource implementation.
type StreamingPulsarTokenResource struct {
clients *astraClients
}

// StreamingPulsarTokenResourceModel maps the resource schema data.
type StreamingPulsarTokenResourceModel struct {
ID types.String `tfsdk:"id"`
Cluster types.String `tfsdk:"cluster"`
Tenant types.String `tfsdk:"tenant"`
TimeToLive types.String `tfsdk:"time_to_live"`
Token types.String `tfsdk:"token"`
}

// Metadata returns the data source type name.
func (r *StreamingPulsarTokenResource) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) {
resp.TypeName = req.ProviderTypeName + "_streaming_pulsar_token"
}

// Schema defines the schema for the data source.
func (r *StreamingPulsarTokenResource) Schema(_ context.Context, _ resource.SchemaRequest, resp *resource.SchemaResponse) {
resp.Schema = schema.Schema{
Description: "A Pulsar Namespace.",
Attributes: map[string]schema.Attribute{
"id": schema.StringAttribute{
Description: "Full path to the namespace",
Computed: true,
},
"cluster": schema.StringAttribute{
Description: "Cluster where the Pulsar tenant is located.",
Required: true,
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
},
"tenant": schema.StringAttribute{
Description: "Name of the tenant.",
Required: true,
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
},
"time_to_live": schema.StringAttribute{
Description: "The relative time until the token expires. For example 1h, 1d, 1y, etc.",
Optional: true,
// Default: stringdefault.StaticString("1y"),
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
},
"token": schema.StringAttribute{
Description: "String values of the token",
Computed: true,
Sensitive: true,
},
},
}
}

// Configure adds the provider configured client to the data source.
func (r *StreamingPulsarTokenResource) Configure(_ context.Context, req resource.ConfigureRequest, _ *resource.ConfigureResponse) {
if req.ProviderData == nil {
return
}

r.clients = req.ProviderData.(*astraClients)
}

// Create creates the resource and sets the initial Terraform state.
func (r *StreamingPulsarTokenResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) {
var tokenPlan StreamingPulsarTokenResourceModel
diags := req.Plan.Get(ctx, &tokenPlan)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClientv3

astraOrgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error creating Pulsar token",
"Could not get current Astra organization: "+err.Error(),
)
return
}

tokenRequestParams := &astrastreaming.CreateTenantTokenHandlerV3Params{
XDataStaxCurrentOrg: astraOrgID,
XDataStaxPulsarCluster: tokenPlan.Cluster.ValueString(),
}
tokenRequestBody := astrastreaming.CreateTenantTokenV3Request{
Type: &adminPulsarTokenType,
Exp: tokenPlan.TimeToLive.ValueStringPointer(),
}
tokenHTTPResp, err := streamingClient.CreateTenantTokenHandlerV3(ctx,
tokenPlan.Tenant.ValueString(), tokenRequestParams, tokenRequestBody)
if err != nil {
resp.Diagnostics.AddError(
"Error creating Pulsar token",
"Could not get Pulsar token: "+err.Error(),
)
return
} else if tokenHTTPResp.StatusCode >= 300 {
errorMsg, err := ioutil.ReadAll(tokenHTTPResp.Body)
if err != nil {
errorMsg = []byte(err.Error())
}
resp.Diagnostics.AddError(
"Error creating Pulsar token",
fmt.Sprintf("Received unexpected status code, status '%s', body: %s", tokenHTTPResp.Status, string(errorMsg)),
)
return
}

pulsarTokenResp, err := astrastreaming.ParseCreateTenantTokenHandlerV3Response(tokenHTTPResp)
if err != nil {
resp.Diagnostics.AddError(
"Error creating Pulsar token",
fmt.Sprintf("Failed to parse token response, status '%s', body: %s", tokenHTTPResp.Status, err.Error()),
)
return
}

// Manually set the ID because this is computed
tokenPlan.ID = types.StringValue(*pulsarTokenResp.JSON201.ID)
tokenPlan.Token = types.StringValue(*pulsarTokenResp.JSON201.Token)

resp.Diagnostics.Append(resp.State.Set(ctx, &tokenPlan)...)
}

// Read refreshes the Terraform state with the latest data.
func (r *StreamingPulsarTokenResource) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) {
var state StreamingPulsarTokenResourceModel
diags := req.State.Get(ctx, &state)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClientv3

astraOrgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error getting namespace",
"Could not get current organization: "+err.Error(),
)
return
}

pulsarToken, err := getPulsarTokenByID(ctx, streamingClient, astraOrgID, state.Cluster.ValueString(), state.Tenant.ValueString(), state.ID.ValueString())
if err != nil {
resp.Diagnostics.AddError(
"Error getting pulsar token",
"Could not get pulsar token: "+err.Error(),
)
return
}

state.Token = types.StringValue(pulsarToken)

resp.Diagnostics.Append(resp.State.Set(ctx, &state)...)
}

// Update updates the resource and sets the updated Terraform state on success.
func (r *StreamingPulsarTokenResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) {
// Not implemented

}

// Delete deletes the resource and removes the Terraform state on success.
func (r *StreamingPulsarTokenResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) {
// Retrieve values from state
var state StreamingPulsarTokenResourceModel
diags := req.State.Get(ctx, &state)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}

astraClient := r.clients.astraClient
streamingClient := r.clients.astraStreamingClientv3

astraOrgID, err := getCurrentOrgID(ctx, astraClient)
if err != nil {
resp.Diagnostics.AddError(
"Error deleting pulsar token",
"Could not get current organization: "+err.Error(),
)
return
}

params := astrastreaming.DeletePulsarTokenByIDParams{
XDataStaxCurrentOrg: astraOrgID,
XDataStaxPulsarCluster: state.Cluster.ValueString(),
}
httpResp, err := streamingClient.DeletePulsarTokenByID(ctx, state.Tenant.ValueString(), state.ID.ValueString(), &params)
if err != nil {
resp.Diagnostics.AddError(
"Error deleting token",
"Could not create Pulsar token: "+err.Error(),
)
} else if httpResp.StatusCode > 300 {
resp.Diagnostics.AddError(
"Error deleting token",
fmt.Sprintf("Unexpected status code: %v", httpResp.StatusCode),
)
}
}

func (r *StreamingPulsarTokenResource) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) {
tokenPath := strings.Split(req.ID, "/")
if len(tokenPath) != 3 {
resp.Diagnostics.AddError(
"Error importing token",
"ID must be in the format <cluster>/<tenant>/<tokenID>",
)
return
}
resource.ImportStatePassthroughID(ctx, path.Root("id"), req, resp)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("cluster"), tokenPath[0])...)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("tenant"), tokenPath[1])...)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("id"), tokenPath[2])...)
}
33 changes: 29 additions & 4 deletions internal/astra/util_streaming.go
Expand Up @@ -44,6 +44,31 @@ type StreamingToken struct {
Tokenid string `json:"tokenid"`
}

func getPulsarTokenByID(ctx context.Context, streamingClient *astrastreaming.ClientWithResponses, orgID string, pulsarCluster string, tenantName string, tokenID string) (string, error) {

if pulsarCluster == "" {
return "", fmt.Errorf("missing pulsar cluster")
}
if tenantName == "" {
return "", fmt.Errorf("missing tenant name")
}
if tokenID == "" {
return "", fmt.Errorf("missing token ID")
}
tokenParams := astrastreaming.GetPulsarTokenByIDParams{
XDataStaxCurrentOrg: orgID,
XDataStaxPulsarCluster: pulsarCluster,
}

pulsarTokenResponse, err := streamingClient.GetPulsarTokenByIDWithResponse(ctx, tenantName, tokenID, &tokenParams)
if err != nil {
return "", fmt.Errorf("failed to get pulsar tokens: %w", err)
}

pulsarToken := string(pulsarTokenResponse.Body)
return pulsarToken, nil
}

func getPulsarToken(ctx context.Context, streamingClient *astrastreaming.ClientWithResponses, astraToken string, orgID string, pulsarCluster string, tenantName string) (string, error) {

if pulsarCluster == "" {
Expand All @@ -52,13 +77,13 @@ func getPulsarToken(ctx context.Context, streamingClient *astrastreaming.ClientW
if tenantName == "" {
return "", fmt.Errorf("missing tenant name")
}
tenantTokenParams := astrastreaming.IdListTenantTokensParams{
tenantTokenParams := astrastreaming.GetPulsarTokensByTenantParams{
Authorization: fmt.Sprintf("Bearer %s", astraToken),
XDataStaxCurrentOrg: orgID,
XDataStaxPulsarCluster: pulsarCluster,
}

pulsarTokenResponse, err := streamingClient.IdListTenantTokens(ctx, tenantName, &tenantTokenParams)
pulsarTokenResponse, err := streamingClient.GetPulsarTokensByTenant(ctx, tenantName, &tenantTokenParams)
if err != nil {
return "", fmt.Errorf("failed to get pulsar tokens: %w", err)
}
Expand All @@ -80,13 +105,13 @@ func getPulsarToken(ctx context.Context, streamingClient *astrastreaming.ClientW
return "", fmt.Errorf("no valid pulsar tokens found for tenant '%s'", tenantName)
}
tokenId := streamingTokens[0].Tokenid
getTokenByIdParams := astrastreaming.GetTokenByIDParams{
getTokenByIdParams := astrastreaming.GetPulsarTokenByIDParams{
Authorization: fmt.Sprintf("Bearer %s", astraToken),
XDataStaxCurrentOrg: orgID,
XDataStaxPulsarCluster: pulsarCluster,
}

getTokenResponse, err := streamingClient.GetTokenByIDWithResponse(ctx, tenantName, tokenId, &getTokenByIdParams)
getTokenResponse, err := streamingClient.GetPulsarTokenByIDWithResponse(ctx, tenantName, tokenId, &getTokenByIdParams)
if err != nil {
return "", fmt.Errorf("failed to get pulsar token: %w", err)
}
Expand Down

0 comments on commit 7c241fb

Please sign in to comment.