From 7c241fb47b0149b18f3bdde81b86551ff528e7ef Mon Sep 17 00:00:00 2001 From: Paul Gier Date: Mon, 26 Jun 2023 11:59:42 -0500 Subject: [PATCH] issue-246: manage streaming pulsar tokens --- docs/resources/streaming_pulsar_token.md | 32 +++ go.mod | 2 +- go.sum | 4 +- internal/astra/provider.go | 2 + .../astra/resource_streaming_pulsar_token.go | 261 ++++++++++++++++++ internal/astra/util_streaming.go | 33 ++- .../data_source_streaming_tenant_tokens.go | 8 +- internal/provider/resource_cdc.go | 8 +- .../provider/resource_streaming_tenant.go | 2 + 9 files changed, 337 insertions(+), 15 deletions(-) create mode 100644 docs/resources/streaming_pulsar_token.md create mode 100644 internal/astra/resource_streaming_pulsar_token.go diff --git a/docs/resources/streaming_pulsar_token.md b/docs/resources/streaming_pulsar_token.md new file mode 100644 index 00000000..bf90e036 --- /dev/null +++ b/docs/resources/streaming_pulsar_token.md @@ -0,0 +1,32 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "astra_streaming_pulsar_token Resource - terraform-provider-astra" +subcategory: "" +description: |- + A Pulsar Namespace. +--- + +# astra_streaming_pulsar_token (Resource) + +A Pulsar Namespace. + + + + +## Schema + +### Required + +- `cluster` (String) Cluster where the Pulsar tenant is located. +- `tenant` (String) Name of the tenant. + +### Optional + +- `time_to_live` (String) The relative time until the token expires. For example 1h, 1d, 1y, etc. + +### Read-Only + +- `id` (String) Full path to the namespace +- `token` (String, Sensitive) String values of the token + + diff --git a/go.mod b/go.mod index d82e9b05..2c236a97 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/datastax/terraform-provider-astra/v2 go 1.18 require ( - github.com/datastax/astra-client-go/v2 v2.2.49 + github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8 github.com/google/uuid v1.3.0 github.com/hashicorp/go-cty v1.4.1-0.20200414143053-d3edf31b6320 github.com/hashicorp/go-retryablehttp v0.7.4 diff --git a/go.sum b/go.sum index b12c0a1c..4f0ee426 100644 --- a/go.sum +++ b/go.sum @@ -92,8 +92,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/datastax/astra-client-go/v2 v2.2.49 h1:O0UH3rk3Zcrcs2E31+aj6bu/lZ2XZsEa6YxJS87GGJc= -github.com/datastax/astra-client-go/v2 v2.2.49/go.mod h1:zxXWuqDkYia7PzFIL3T7RmjChc9LN81UnfI2yB4kE7M= +github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8 h1:lcjdcx7OmSmsXhuD7400+P8WYXGM4Q5y9gw61POOq58= +github.com/datastax/astra-client-go/v2 v2.2.50-0.20230626192203-43de11466bf8/go.mod h1:zxXWuqDkYia7PzFIL3T7RmjChc9LN81UnfI2yB4kE7M= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/internal/astra/provider.go b/internal/astra/provider.go index 8912841e..f1fa8aac 100644 --- a/internal/astra/provider.go +++ b/internal/astra/provider.go @@ -173,6 +173,7 @@ func (p *astraProvider) Configure(ctx context.Context, req provider.ConfigureReq streamingV3Client, err := astrastreaming.NewClientWithResponses(streamingAPIServerURL, func(c *astrastreaming.Client) error { c.Client = retryClient.StandardClient() c.RequestEditors = append(c.RequestEditors, func(ctx context.Context, req *http.Request) error { + req.Header.Set("Authorization", authorization) req.Header.Set("User-Agent", userAgent) req.Header.Set("X-Astra-Provider-Version", p.Version) req.Header.Set("X-Astra-Client-Version", clientVersion) @@ -209,6 +210,7 @@ func (p *astraProvider) DataSources(_ context.Context) []func() datasource.DataS func (p *astraProvider) Resources(_ context.Context) []func() resource.Resource { return []func() resource.Resource{ NewStreamingNamespaceResource, + NewStreamingPulsarTokenResource, } } diff --git a/internal/astra/resource_streaming_pulsar_token.go b/internal/astra/resource_streaming_pulsar_token.go new file mode 100644 index 00000000..65daec1a --- /dev/null +++ b/internal/astra/resource_streaming_pulsar_token.go @@ -0,0 +1,261 @@ +package astra + +import ( + "context" + "fmt" + "io/ioutil" + "strings" + + astrastreaming "github.com/datastax/astra-client-go/v2/astra-streaming" + "github.com/hashicorp/terraform-plugin-framework/path" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/resource/schema" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/types" +) + +var ( + adminPulsarTokenType = "admin" +) + +// Ensure the implementation satisfies the expected interfaces. +var ( + _ resource.Resource = &StreamingPulsarTokenResource{} + _ resource.ResourceWithConfigure = &StreamingPulsarTokenResource{} + _ resource.ResourceWithImportState = &StreamingPulsarTokenResource{} +) + +// NewStreamingPulsarTokenResource is a helper function to simplify the provider implementation. +func NewStreamingPulsarTokenResource() resource.Resource { + return &StreamingPulsarTokenResource{} +} + +// StreamingPulsarTokenResource is the resource implementation. +type StreamingPulsarTokenResource struct { + clients *astraClients +} + +// StreamingPulsarTokenResourceModel maps the resource schema data. +type StreamingPulsarTokenResourceModel struct { + ID types.String `tfsdk:"id"` + Cluster types.String `tfsdk:"cluster"` + Tenant types.String `tfsdk:"tenant"` + TimeToLive types.String `tfsdk:"time_to_live"` + Token types.String `tfsdk:"token"` +} + +// Metadata returns the data source type name. +func (r *StreamingPulsarTokenResource) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) { + resp.TypeName = req.ProviderTypeName + "_streaming_pulsar_token" +} + +// Schema defines the schema for the data source. +func (r *StreamingPulsarTokenResource) Schema(_ context.Context, _ resource.SchemaRequest, resp *resource.SchemaResponse) { + resp.Schema = schema.Schema{ + Description: "A Pulsar Namespace.", + Attributes: map[string]schema.Attribute{ + "id": schema.StringAttribute{ + Description: "Full path to the namespace", + Computed: true, + }, + "cluster": schema.StringAttribute{ + Description: "Cluster where the Pulsar tenant is located.", + Required: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.RequiresReplace(), + }, + }, + "tenant": schema.StringAttribute{ + Description: "Name of the tenant.", + Required: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.RequiresReplace(), + }, + }, + "time_to_live": schema.StringAttribute{ + Description: "The relative time until the token expires. For example 1h, 1d, 1y, etc.", + Optional: true, + // Default: stringdefault.StaticString("1y"), + PlanModifiers: []planmodifier.String{ + stringplanmodifier.RequiresReplace(), + }, + }, + "token": schema.StringAttribute{ + Description: "String values of the token", + Computed: true, + Sensitive: true, + }, + }, + } +} + +// Configure adds the provider configured client to the data source. +func (r *StreamingPulsarTokenResource) Configure(_ context.Context, req resource.ConfigureRequest, _ *resource.ConfigureResponse) { + if req.ProviderData == nil { + return + } + + r.clients = req.ProviderData.(*astraClients) +} + +// Create creates the resource and sets the initial Terraform state. +func (r *StreamingPulsarTokenResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) { + var tokenPlan StreamingPulsarTokenResourceModel + diags := req.Plan.Get(ctx, &tokenPlan) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + astraClient := r.clients.astraClient + streamingClient := r.clients.astraStreamingClientv3 + + astraOrgID, err := getCurrentOrgID(ctx, astraClient) + if err != nil { + resp.Diagnostics.AddError( + "Error creating Pulsar token", + "Could not get current Astra organization: "+err.Error(), + ) + return + } + + tokenRequestParams := &astrastreaming.CreateTenantTokenHandlerV3Params{ + XDataStaxCurrentOrg: astraOrgID, + XDataStaxPulsarCluster: tokenPlan.Cluster.ValueString(), + } + tokenRequestBody := astrastreaming.CreateTenantTokenV3Request{ + Type: &adminPulsarTokenType, + Exp: tokenPlan.TimeToLive.ValueStringPointer(), + } + tokenHTTPResp, err := streamingClient.CreateTenantTokenHandlerV3(ctx, + tokenPlan.Tenant.ValueString(), tokenRequestParams, tokenRequestBody) + if err != nil { + resp.Diagnostics.AddError( + "Error creating Pulsar token", + "Could not get Pulsar token: "+err.Error(), + ) + return + } else if tokenHTTPResp.StatusCode >= 300 { + errorMsg, err := ioutil.ReadAll(tokenHTTPResp.Body) + if err != nil { + errorMsg = []byte(err.Error()) + } + resp.Diagnostics.AddError( + "Error creating Pulsar token", + fmt.Sprintf("Received unexpected status code, status '%s', body: %s", tokenHTTPResp.Status, string(errorMsg)), + ) + return + } + + pulsarTokenResp, err := astrastreaming.ParseCreateTenantTokenHandlerV3Response(tokenHTTPResp) + if err != nil { + resp.Diagnostics.AddError( + "Error creating Pulsar token", + fmt.Sprintf("Failed to parse token response, status '%s', body: %s", tokenHTTPResp.Status, err.Error()), + ) + return + } + + // Manually set the ID because this is computed + tokenPlan.ID = types.StringValue(*pulsarTokenResp.JSON201.ID) + tokenPlan.Token = types.StringValue(*pulsarTokenResp.JSON201.Token) + + resp.Diagnostics.Append(resp.State.Set(ctx, &tokenPlan)...) +} + +// Read refreshes the Terraform state with the latest data. +func (r *StreamingPulsarTokenResource) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) { + var state StreamingPulsarTokenResourceModel + diags := req.State.Get(ctx, &state) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + astraClient := r.clients.astraClient + streamingClient := r.clients.astraStreamingClientv3 + + astraOrgID, err := getCurrentOrgID(ctx, astraClient) + if err != nil { + resp.Diagnostics.AddError( + "Error getting namespace", + "Could not get current organization: "+err.Error(), + ) + return + } + + pulsarToken, err := getPulsarTokenByID(ctx, streamingClient, astraOrgID, state.Cluster.ValueString(), state.Tenant.ValueString(), state.ID.ValueString()) + if err != nil { + resp.Diagnostics.AddError( + "Error getting pulsar token", + "Could not get pulsar token: "+err.Error(), + ) + return + } + + state.Token = types.StringValue(pulsarToken) + + resp.Diagnostics.Append(resp.State.Set(ctx, &state)...) +} + +// Update updates the resource and sets the updated Terraform state on success. +func (r *StreamingPulsarTokenResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) { + // Not implemented + +} + +// Delete deletes the resource and removes the Terraform state on success. +func (r *StreamingPulsarTokenResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) { + // Retrieve values from state + var state StreamingPulsarTokenResourceModel + diags := req.State.Get(ctx, &state) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + astraClient := r.clients.astraClient + streamingClient := r.clients.astraStreamingClientv3 + + astraOrgID, err := getCurrentOrgID(ctx, astraClient) + if err != nil { + resp.Diagnostics.AddError( + "Error deleting pulsar token", + "Could not get current organization: "+err.Error(), + ) + return + } + + params := astrastreaming.DeletePulsarTokenByIDParams{ + XDataStaxCurrentOrg: astraOrgID, + XDataStaxPulsarCluster: state.Cluster.ValueString(), + } + httpResp, err := streamingClient.DeletePulsarTokenByID(ctx, state.Tenant.ValueString(), state.ID.ValueString(), ¶ms) + if err != nil { + resp.Diagnostics.AddError( + "Error deleting token", + "Could not create Pulsar token: "+err.Error(), + ) + } else if httpResp.StatusCode > 300 { + resp.Diagnostics.AddError( + "Error deleting token", + fmt.Sprintf("Unexpected status code: %v", httpResp.StatusCode), + ) + } +} + +func (r *StreamingPulsarTokenResource) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) { + tokenPath := strings.Split(req.ID, "/") + if len(tokenPath) != 3 { + resp.Diagnostics.AddError( + "Error importing token", + "ID must be in the format //", + ) + return + } + resource.ImportStatePassthroughID(ctx, path.Root("id"), req, resp) + resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("cluster"), tokenPath[0])...) + resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("tenant"), tokenPath[1])...) + resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("id"), tokenPath[2])...) +} diff --git a/internal/astra/util_streaming.go b/internal/astra/util_streaming.go index d779a30b..f3486a61 100644 --- a/internal/astra/util_streaming.go +++ b/internal/astra/util_streaming.go @@ -44,6 +44,31 @@ type StreamingToken struct { Tokenid string `json:"tokenid"` } +func getPulsarTokenByID(ctx context.Context, streamingClient *astrastreaming.ClientWithResponses, orgID string, pulsarCluster string, tenantName string, tokenID string) (string, error) { + + if pulsarCluster == "" { + return "", fmt.Errorf("missing pulsar cluster") + } + if tenantName == "" { + return "", fmt.Errorf("missing tenant name") + } + if tokenID == "" { + return "", fmt.Errorf("missing token ID") + } + tokenParams := astrastreaming.GetPulsarTokenByIDParams{ + XDataStaxCurrentOrg: orgID, + XDataStaxPulsarCluster: pulsarCluster, + } + + pulsarTokenResponse, err := streamingClient.GetPulsarTokenByIDWithResponse(ctx, tenantName, tokenID, &tokenParams) + if err != nil { + return "", fmt.Errorf("failed to get pulsar tokens: %w", err) + } + + pulsarToken := string(pulsarTokenResponse.Body) + return pulsarToken, nil +} + func getPulsarToken(ctx context.Context, streamingClient *astrastreaming.ClientWithResponses, astraToken string, orgID string, pulsarCluster string, tenantName string) (string, error) { if pulsarCluster == "" { @@ -52,13 +77,13 @@ func getPulsarToken(ctx context.Context, streamingClient *astrastreaming.ClientW if tenantName == "" { return "", fmt.Errorf("missing tenant name") } - tenantTokenParams := astrastreaming.IdListTenantTokensParams{ + tenantTokenParams := astrastreaming.GetPulsarTokensByTenantParams{ Authorization: fmt.Sprintf("Bearer %s", astraToken), XDataStaxCurrentOrg: orgID, XDataStaxPulsarCluster: pulsarCluster, } - pulsarTokenResponse, err := streamingClient.IdListTenantTokens(ctx, tenantName, &tenantTokenParams) + pulsarTokenResponse, err := streamingClient.GetPulsarTokensByTenant(ctx, tenantName, &tenantTokenParams) if err != nil { return "", fmt.Errorf("failed to get pulsar tokens: %w", err) } @@ -80,13 +105,13 @@ func getPulsarToken(ctx context.Context, streamingClient *astrastreaming.ClientW return "", fmt.Errorf("no valid pulsar tokens found for tenant '%s'", tenantName) } tokenId := streamingTokens[0].Tokenid - getTokenByIdParams := astrastreaming.GetTokenByIDParams{ + getTokenByIdParams := astrastreaming.GetPulsarTokenByIDParams{ Authorization: fmt.Sprintf("Bearer %s", astraToken), XDataStaxCurrentOrg: orgID, XDataStaxPulsarCluster: pulsarCluster, } - getTokenResponse, err := streamingClient.GetTokenByIDWithResponse(ctx, tenantName, tokenId, &getTokenByIdParams) + getTokenResponse, err := streamingClient.GetPulsarTokenByIDWithResponse(ctx, tenantName, tokenId, &getTokenByIdParams) if err != nil { return "", fmt.Errorf("failed to get pulsar token: %w", err) } diff --git a/internal/provider/data_source_streaming_tenant_tokens.go b/internal/provider/data_source_streaming_tenant_tokens.go index 88c5e803..9f540d30 100644 --- a/internal/provider/data_source_streaming_tenant_tokens.go +++ b/internal/provider/data_source_streaming_tenant_tokens.go @@ -74,10 +74,10 @@ func dataSourceStreamingTenantTokensRead(ctx context.Context, d *schema.Resource tenantName := d.Get("tenant_name").(string) clusterName := d.Get("cluster_name").(string) - params := astrastreaming.IdListTenantTokensParams{ + params := astrastreaming.GetPulsarTokensByTenantParams{ XDataStaxPulsarCluster: clusterName, } - tokenResponse, err := streamingClient.IdListTenantTokensWithResponse(ctx, tenantName, ¶ms) + tokenResponse, err := streamingClient.GetPulsarTokensByTenantWithResponse(ctx, tenantName, ¶ms) if err != nil { return diag.FromErr(err) @@ -99,7 +99,7 @@ func dataSourceStreamingTenantTokensRead(ctx context.Context, d *schema.Resource func setTenantTokensData(ctx context.Context, d *schema.ResourceData, streamingClient *astrastreaming.ClientWithResponses, tokenList []astrastreaming.TenantToken) error { tenantName := d.Get("tenant_name").(string) clusterName := d.Get("cluster_name").(string) - params := astrastreaming.GetTokenByIDParams{ + params := astrastreaming.GetPulsarTokenByIDParams{ XDataStaxPulsarCluster: clusterName, } tokens := make([]map[string]interface{}, 0, len(tokenList)) @@ -111,7 +111,7 @@ func setTenantTokensData(ctx context.Context, d *schema.ResourceData, streamingC "sub": token.Sub, } // now fetch the JWT token - tokenResponse, err := streamingClient.GetTokenByIDWithResponse(ctx, tenantName, *token.TokenID, ¶ms) + tokenResponse, err := streamingClient.GetPulsarTokenByIDWithResponse(ctx, tenantName, *token.TokenID, ¶ms) if err != nil { return err } diff --git a/internal/provider/resource_cdc.go b/internal/provider/resource_cdc.go index 32569920..7b999801 100644 --- a/internal/provider/resource_cdc.go +++ b/internal/provider/resource_cdc.go @@ -429,13 +429,13 @@ func GetPulsarCluster(cloudProvider string, rawRegion string) string { func getPulsarToken(ctx context.Context, pulsarCluster string, token string, org OrgId, err error, streamingClient *astrastreaming.ClientWithResponses, tenantName string) (string, error) { - tenantTokenParams := astrastreaming.IdListTenantTokensParams{ + tenantTokenParams := astrastreaming.GetPulsarTokensByTenantParams{ Authorization: fmt.Sprintf("Bearer %s", token), XDataStaxCurrentOrg: org.ID, XDataStaxPulsarCluster: pulsarCluster, } - pulsarTokenResponse, err := streamingClient.IdListTenantTokensWithResponse(ctx, tenantName, &tenantTokenParams) + pulsarTokenResponse, err := streamingClient.GetPulsarTokensByTenantWithResponse(ctx, tenantName, &tenantTokenParams) if err != nil { fmt.Println("Can't generate token", err) diag.Errorf("Can't get pulsar token") @@ -450,13 +450,13 @@ func getPulsarToken(ctx context.Context, pulsarCluster string, token string, org } tokenId := streamingTokens[0].Tokenid - getTokenByIdParams := astrastreaming.GetTokenByIDParams{ + getTokenByIdParams := astrastreaming.GetPulsarTokenByIDParams{ Authorization: fmt.Sprintf("Bearer %s", token), XDataStaxCurrentOrg: org.ID, XDataStaxPulsarCluster: pulsarCluster, } - getTokenResponse, err := streamingClient.GetTokenByIDWithResponse(ctx, tenantName, tokenId, &getTokenByIdParams) + getTokenResponse, err := streamingClient.GetPulsarTokenByIDWithResponse(ctx, tenantName, tokenId, &getTokenByIdParams) if err != nil { fmt.Println("Can't get token", err) diff --git a/internal/provider/resource_streaming_tenant.go b/internal/provider/resource_streaming_tenant.go index fbb9b563..a776f971 100644 --- a/internal/provider/resource_streaming_tenant.go +++ b/internal/provider/resource_streaming_tenant.go @@ -275,6 +275,8 @@ func getCurrentOrgID(ctx context.Context, astraClient *astra.ClientWithResponses err = json.NewDecoder(orgResponse.Body).Decode(&orgID) if err != nil { return "", fmt.Errorf("failed to unmarshal current organization ID: %w", err) + } else if orgID.ID == "" { + return "", errors.New("streaming API returned an empty organization ID") } return orgID.ID, nil }