Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait until ACL resources are replicated to the local DC #385

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 61 additions & 0 deletions consul/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package consul

import (
"errors"
"fmt"
"math"
"time"

"github.com/hashicorp/consul/api"
)

func waitForACLTokenReplication(acl *api.ACL, qOpts *api.QueryOptions, index uint64) error {
for attempt := 0; attempt <= 12; attempt++ {
rs, _, err := acl.Replication(qOpts)
if err != nil {
return fmt.Errorf("error fetching ACL replication status: %w", err)
}

if !rs.Enabled || rs.ReplicationType != "tokens" || rs.ReplicatedTokenIndex >= index {
return nil
}

time.Sleep(time.Duration(math.Pow(2, float64(attempt))) * time.Millisecond)
}

return errors.New("timed out waiting for ACL replication")
}

func waitForACLPolicyReplication(acl *api.ACL, qOpts *api.QueryOptions, index uint64) error {
for attempt := 0; attempt <= 12; attempt++ {
rs, _, err := acl.Replication(qOpts)
if err != nil {
return fmt.Errorf("error fetching ACL replication status: %w", err)
}

if !rs.Enabled || rs.ReplicatedIndex >= index {
return nil
}

time.Sleep(time.Duration(math.Pow(2, float64(attempt))) * time.Millisecond)
}

return errors.New("timed out waiting for ACL replication")
}

func waitForACLRoleReplication(acl *api.ACL, qOpts *api.QueryOptions, index uint64) error {
for attempt := 0; attempt <= 12; attempt++ {
rs, _, err := acl.Replication(qOpts)
if err != nil {
return fmt.Errorf("error fetching ACL replication status: %w", err)
}

if !rs.Enabled || rs.ReplicatedRoleIndex >= index {
return nil
}

time.Sleep(time.Duration(math.Pow(2, float64(attempt))) * time.Millisecond)
}

return errors.New("timed out waiting for ACL replication")
}
18 changes: 14 additions & 4 deletions consul/resource_consul_acl_auth_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,23 @@ func resourceConsulACLAuthMethod() *schema.Resource {
}

func resourceConsulACLAuthMethodCreate(d *schema.ResourceData, meta interface{}) error {
client, _, wOpts := getClient(d, meta)
client, qOpts, wOpts := getClient(d, meta)
ACL := client.ACL()

authMethod, err := getAuthMethod(d, meta)
if err != nil {
return err
}

if _, _, err := ACL.AuthMethodCreate(authMethod, wOpts); err != nil {
c, _, err := ACL.AuthMethodCreate(authMethod, wOpts)
if err != nil {
return fmt.Errorf("failed to create auth method '%s': %v", authMethod.Name, err)
}

if err := waitForACLTokenReplication(client.ACL(), qOpts, c.CreateIndex); err != nil {
return err
}

return resourceConsulACLAuthMethodRead(d, meta)
}

Expand Down Expand Up @@ -202,18 +207,23 @@ func resourceConsulACLAuthMethodRead(d *schema.ResourceData, meta interface{}) e
}

func resourceConsulACLAuthMethodUpdate(d *schema.ResourceData, meta interface{}) error {
client, _, wOpts := getClient(d, meta)
client, qOpts, wOpts := getClient(d, meta)
ACL := client.ACL()

authMethod, err := getAuthMethod(d, meta)
if err != nil {
return err
}

if _, _, err := ACL.AuthMethodUpdate(authMethod, wOpts); err != nil {
u, _, err := ACL.AuthMethodUpdate(authMethod, wOpts)
if err != nil {
return fmt.Errorf("failed to update the auth method '%s': %v", authMethod.Name, err)
}

if err := waitForACLTokenReplication(client.ACL(), qOpts, u.ModifyIndex); err != nil {
return err
}

return resourceConsulACLAuthMethodRead(d, meta)
}

Expand Down
14 changes: 11 additions & 3 deletions consul/resource_consul_acl_binding_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func resourceConsulACLBindingRule() *schema.Resource {
}

func resourceConsulACLBindingRuleCreate(d *schema.ResourceData, meta interface{}) error {
client, _, wOpts := getClient(d, meta)
client, qOpts, wOpts := getClient(d, meta)
ACL := client.ACL()

rule := getBindingRule(d, meta)
Expand All @@ -96,6 +96,10 @@ func resourceConsulACLBindingRuleCreate(d *schema.ResourceData, meta interface{}
return fmt.Errorf("failed to create binding rule: %v", err)
}

if err := waitForACLTokenReplication(client.ACL(), qOpts, rule.CreateIndex); err != nil {
return err
}

d.SetId(rule.ID)

return resourceConsulACLBindingRuleRead(d, meta)
Expand Down Expand Up @@ -133,16 +137,20 @@ func resourceConsulACLBindingRuleRead(d *schema.ResourceData, meta interface{})
}

func resourceConsulACLBindingRuleUpdate(d *schema.ResourceData, meta interface{}) error {
client, _, wOpts := getClient(d, meta)
client, qOpts, wOpts := getClient(d, meta)
ACL := client.ACL()

rule := getBindingRule(d, meta)

_, _, err := ACL.BindingRuleUpdate(rule, wOpts)
u, _, err := ACL.BindingRuleUpdate(rule, wOpts)
if err != nil {
return fmt.Errorf("failed to update binding rule '%s': %v", d.Id(), err)
}

if err := waitForACLTokenReplication(client.ACL(), qOpts, u.ModifyIndex); err != nil {
return err
}

return resourceConsulACLBindingRuleRead(d, meta)
}

Expand Down
14 changes: 10 additions & 4 deletions consul/resource_consul_acl_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func resourceConsulACLPolicy() *schema.Resource {
}

func resourceConsulACLPolicyCreate(d *schema.ResourceData, meta interface{}) error {
client, _, wOpts := getClient(d, meta)
client, qOpts, wOpts := getClient(d, meta)

log.Printf("[DEBUG] Creating ACL policy")

Expand All @@ -87,7 +87,9 @@ func resourceConsulACLPolicyCreate(d *schema.ResourceData, meta interface{}) err
return fmt.Errorf("error creating ACL policy: %s", err)
}

log.Printf("[DEBUG] Created ACL policy %q", policy.ID)
if err := waitForACLPolicyReplication(client.ACL(), qOpts, policy.CreateIndex); err != nil {
return err
}

d.SetId(policy.ID)

Expand Down Expand Up @@ -120,7 +122,7 @@ func resourceConsulACLPolicyRead(d *schema.ResourceData, meta interface{}) error
}

func resourceConsulACLPolicyUpdate(d *schema.ResourceData, meta interface{}) error {
client, _, wOpts := getClient(d, meta)
client, qOpts, wOpts := getClient(d, meta)

id := d.Id()
log.Printf("[DEBUG] Updating ACL policy %q", id)
Expand All @@ -142,12 +144,16 @@ func resourceConsulACLPolicyUpdate(d *schema.ResourceData, meta interface{}) err
aclPolicy.Datacenters = s
}

_, _, err := client.ACL().PolicyUpdate(&aclPolicy, wOpts)
policy, _, err := client.ACL().PolicyUpdate(&aclPolicy, wOpts)
if err != nil {
return fmt.Errorf("error updating ACL policy %q: %s", id, err)
}
log.Printf("[DEBUG] Updated ACL policy %q", id)

if err := waitForACLPolicyReplication(client.ACL(), qOpts, policy.ModifyIndex); err != nil {
return err
}

return resourceConsulACLPolicyRead(d, meta)
}

Expand Down
8 changes: 8 additions & 0 deletions consul/resource_consul_acl_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func resourceConsulACLRoleCreate(d *schema.ResourceData, meta interface{}) error
return fmt.Errorf("failed to create role '%s': %s", name, err)
}

if err := waitForACLRoleReplication(client.ACL(), qOpts, role.CreateIndex); err != nil {
return err
}

d.SetId(role.ID)
return resourceConsulACLRoleRead(d, meta)
}
Expand Down Expand Up @@ -238,6 +242,10 @@ func resourceConsulACLRoleUpdate(d *schema.ResourceData, meta interface{}) error
return fmt.Errorf("failed to update role '%s': %s", d.Id(), err)
}

if err := waitForACLRoleReplication(client.ACL(), qOpts, role.ModifyIndex); err != nil {
return err
}

d.SetId(role.ID)
return resourceConsulACLRoleRead(d, meta)
}
Expand Down
6 changes: 5 additions & 1 deletion consul/resource_consul_acl_role_policy_attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@ func resourceConsulACLRolePolicyAttachmentCreate(d *schema.ResourceData, meta in
Name: newPolicyName,
})

_, _, err = client.ACL().RoleUpdate(role, wOpts)
u, _, err := client.ACL().RoleUpdate(role, wOpts)
if err != nil {
return fmt.Errorf("error updating role '%q' to set new policy attachment: '%s'", roleID, err)
}

if err := waitForACLRoleReplication(client.ACL(), qOpts, u.ModifyIndex); err != nil {
return err
}

id := fmt.Sprintf("%s:%s", roleID, newPolicyName)

d.SetId(id)
Expand Down
18 changes: 15 additions & 3 deletions consul/resource_consul_acl_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func resourceConsulACLToken() *schema.Resource {
}

func resourceConsulACLTokenCreate(d *schema.ResourceData, meta interface{}) error {
client, _, wOpts := getClient(d, meta)
client, qOpts, wOpts := getClient(d, meta)

log.Printf("[DEBUG] Creating ACL token")

Expand All @@ -176,6 +176,12 @@ func resourceConsulACLTokenCreate(d *schema.ResourceData, meta interface{}) erro

log.Printf("[DEBUG] Created ACL token %q", token.AccessorID)

if !aclToken.Local {
if err := waitForACLTokenReplication(client.ACL(), qOpts, token.CreateIndex); err != nil {
return err
}
}

d.SetId(token.AccessorID)

return resourceConsulACLTokenRead(d, meta)
Expand Down Expand Up @@ -265,20 +271,26 @@ func getTemplateVariables(templatedPolicy *consulapi.ACLTemplatedPolicy) []map[s
}

func resourceConsulACLTokenUpdate(d *schema.ResourceData, meta interface{}) error {
client, _, wOpts := getClient(d, meta)
client, qOpts, wOpts := getClient(d, meta)

id := d.Id()
log.Printf("[DEBUG] Updating ACL token %q", id)

aclToken := getToken(d)
aclToken.AccessorID = id

_, _, err := client.ACL().TokenUpdate(aclToken, wOpts)
u, _, err := client.ACL().TokenUpdate(aclToken, wOpts)
if err != nil {
return fmt.Errorf("error updating ACL token %q: %s", id, err)
}
log.Printf("[DEBUG] Updated ACL token %q", id)

if !aclToken.Local {
if err := waitForACLTokenReplication(client.ACL(), qOpts, u.ModifyIndex); err != nil {
return err
}
}

return resourceConsulACLTokenRead(d, meta)
}

Expand Down
8 changes: 7 additions & 1 deletion consul/resource_consul_acl_token_policy_attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,17 @@ func resourceConsulACLTokenPolicyAttachmentCreate(d *schema.ResourceData, meta i
Name: newPolicyName,
})

_, _, err = client.ACL().TokenUpdate(aclToken, wOpts)
u, _, err := client.ACL().TokenUpdate(aclToken, wOpts)
if err != nil {
return fmt.Errorf("error updating ACL token '%q' to set new policy attachment: '%s'", tokenID, err)
}

if !aclToken.Local {
if err := waitForACLTokenReplication(client.ACL(), qOpts, u.ModifyIndex); err != nil {
return err
}
}

id := fmt.Sprintf("%s:%s", tokenID, newPolicyName)

d.SetId(id)
Expand Down
8 changes: 7 additions & 1 deletion consul/resource_consul_acl_token_role_attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,17 @@ func resourceConsulACLTokenRoleAttachmentCreate(d *schema.ResourceData, meta int
Name: roleName,
})

_, _, err = client.ACL().TokenUpdate(aclToken, wOpts)
u, _, err := client.ACL().TokenUpdate(aclToken, wOpts)
if err != nil {
return fmt.Errorf("error updating ACL token '%q' to set new role attachment: '%s'", tokenID, err)
}

if !aclToken.Local {
if err := waitForACLTokenReplication(client.ACL(), qOpts, u.CreateIndex); err != nil {
return err
}
}

id := fmt.Sprintf("%s:%s", tokenID, roleName)

d.SetId(id)
Expand Down