Support for concurrent nodepool CRUD operations #6748

Merged (16 commits) on Dec 5, 2022
Changes from 6 commits
127 changes: 116 additions & 11 deletions mmv1/third_party/terraform/resources/resource_container_node_pool.go.erb
@@ -378,9 +378,25 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
if err != nil {
return err
}

<% if version == "ga" -%>
mutexKV.Lock(nodePoolInfo.lockKey())
defer mutexKV.Unlock(nodePoolInfo.lockKey())
<% else -%>
clusterHash, err := getClusterHash(nodePoolInfo.parent(), nodePoolInfo.project, userAgent, config)
if err != nil {
return err
}

// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.lockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Acquire write-lock on nodepool.
npLockKey := nodePoolLockKey(clusterHash, nodePool.Name)
mutexKV.Lock(npLockKey)
defer mutexKV.Unlock(npLockKey)
<% end -%>

req := &container.CreateNodePoolRequest{
NodePool: nodePool,
@@ -464,11 +480,13 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
return err
}

<% if version == "ga" %>
//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
}
<% end -%>

state, err := containerNodePoolAwaitRestingState(config, d.Id(), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
@@ -554,11 +572,13 @@ func resourceContainerNodePoolUpdate(d *schema.ResourceData, meta interface{}) e
}
name := getNodePoolName(d.Id())

<% if version == "ga" -%>
//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
}
<% end -%>

_, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutUpdate))
if err != nil {
@@ -598,6 +618,7 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e

name := getNodePoolName(d.Id())

<% if version == "ga" -%>
//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
@@ -607,6 +628,7 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e
}
return err
}
<% end -%>

_, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutDelete))
if err != nil {
@@ -620,9 +642,25 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e
}
}

<% if version == "ga" -%>
mutexKV.Lock(nodePoolInfo.lockKey())
defer mutexKV.Unlock(nodePoolInfo.lockKey())
<% else -%>
clusterHash, err := getClusterHash(nodePoolInfo.parent(), nodePoolInfo.project, userAgent, config)
if err != nil {
return err
}

// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.lockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Acquire write-lock on nodepool.
npLockKey := nodePoolLockKey(clusterHash, name)
mutexKV.Lock(npLockKey)
defer mutexKV.Unlock(npLockKey)
<% end -%>

timeout := d.Timeout(schema.TimeoutDelete)
startTime := time.Now()
@@ -980,12 +1018,27 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
config := meta.(*Config)
name := d.Get(prefix + "name").(string)

-lockKey := nodePoolInfo.lockKey()

userAgent, err := generateUserAgentString(d, config.userAgent)
if err != nil {
return err
}

<% if version == "ga" -%>
lockKey := nodePoolInfo.lockKey()
<% else -%>
clusterHash, err := getClusterHash(nodePoolInfo.parent(), nodePoolInfo.project, userAgent, config)
if err != nil {
return err
}

// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.lockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Nodepool write-lock will be acquired when update function is called.
npLockKey := nodePoolLockKey(clusterHash, name)
<% end -%>

if d.HasChange(prefix + "autoscaling") {
update := &container.ClusterUpdate{
@@ -1029,11 +1082,16 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

<% if version == "ga" -%>
// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
}

<% else -%>
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}
<% end -%>
log.Printf("[INFO] Updated autoscaling in Node Pool %s", d.Id())
}

@@ -1084,11 +1142,18 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
"updating GKE node pool tags", userAgent,
timeout)
}



<% if version == "ga" -%>
// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
}
<% else -%>
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}
<% end -%>

log.Printf("[INFO] Updated tags for node pool %s", name)
}
@@ -1118,10 +1183,16 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

<% if version == "ga" -%>
// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
}
<% else -%>
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}
<% end -%>

log.Printf("[INFO] Updated image type in Node Pool %s", d.Id())
}
@@ -1154,10 +1225,16 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

<% if version == "ga" -%>
// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
}
<% else -%>
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}
<% end -%>

log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name)
}
@@ -1190,9 +1267,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

-// Call update serially.
-if err := lockedCall(lockKey, updateF); err != nil {
-return err
+if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
+return err
}

log.Printf("[INFO] Updated kubelet_config for node pool %s", name)
@@ -1224,9 +1300,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

-// Call update serially.
-if err := lockedCall(lockKey, updateF); err != nil {
-return err
+if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
+return err
}

log.Printf("[INFO] Updated linux_node_config for node pool %s", name)
@@ -1258,10 +1333,16 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

<% if version == "ga" -%>
// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
}
<% else -%>
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}
<% end -%>

log.Printf("[INFO] GKE node pool %s size has been updated to %d", name, newSize)
}
@@ -1295,10 +1376,16 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.location, "updating GKE node pool management", userAgent, timeout)
}

<% if version == "ga" -%>
// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
}
<% else -%>
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}
<% end -%>

log.Printf("[INFO] Updated management in Node Pool %s", name)
}
@@ -1325,10 +1412,16 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.location, "updating GKE node pool version", userAgent, timeout)
}

<% if version == "ga" -%>
// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
}
<% else -%>
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}
<% end -%>

log.Printf("[INFO] Updated version in Node Pool %s", name)
}
@@ -1352,10 +1445,16 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool node locations", userAgent, timeout)
}

<% if version == "ga" -%>
// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
}
<% else -%>
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}
<% end -%>

log.Printf("[INFO] Updated node locations in Node Pool %s", name)
}
@@ -1385,10 +1484,16 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool upgrade settings", userAgent, timeout)
}

<% if version == "ga" -%>
// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
}
<% else -%>
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}
<% end -%>

log.Printf("[INFO] Updated upgrade settings in Node Pool %s", name)
}
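In the beta branch above, each serialized lockedCall(lockKey, updateF) is replaced by retryWhileIncompatibleOperation(timeout, npLockKey, updateF), whose definition is not included in this diff. The following is only a sketch of the behavior the call sites imply, not the PR's actual implementation: it assumes a lockedCall wrapper like the one the GA path uses, and a hypothetical isFailedPreconditionError predicate for the error GKE returns while a conflicting operation is in flight.

package google

import (
    "strings"
    "time"
)

// Package-level lock table, as in the provider.
var mutexKV = NewMutexKV()

// lockedCall exists in the provider already (see the GA path); simplified here.
func lockedCall(lockKey string, f func() error) error {
    mutexKV.Lock(lockKey)
    defer mutexKV.Unlock(lockKey)
    return f()
}

// isFailedPreconditionError is an assumption: detect GKE rejecting the call
// because an incompatible operation is still running on the cluster.
func isFailedPreconditionError(err error) bool {
    return err != nil && strings.Contains(err.Error(), "FAILED_PRECONDITION")
}

// retryWhileIncompatibleOperation: hypothetical reconstruction. It holds the
// nodepool write-lock only for the duration of each attempt, and keeps
// retrying until the operation succeeds, fails for another reason, or the
// timeout expires.
func retryWhileIncompatibleOperation(timeout time.Duration, lockKey string, f func() error) error {
    deadline := time.Now().Add(timeout)
    for {
        err := lockedCall(lockKey, f)
        if err == nil {
            return nil
        }
        if !isFailedPreconditionError(err) || time.Now().After(deadline) {
            return err
        }
        time.Sleep(10 * time.Second) // assumed backoff interval
    }
}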
23 changes: 19 additions & 4 deletions mmv1/third_party/terraform/utils/mutexkv.go
@@ -13,7 +13,7 @@ import (
// their access to individual security groups based on SG ID.
type MutexKV struct {
lock sync.Mutex
-store map[string]*sync.Mutex
+store map[string]*sync.RWMutex
}

// Locks the mutex for the given key. Caller is responsible for calling Unlock
@@ -31,13 +31,28 @@ func (m *MutexKV) Unlock(key string) {
log.Printf("[DEBUG] Unlocked %q", key)
}

// Acquires a read-lock on the mutex for the given key. Caller is responsible for calling RUnlock
// for the same key
func (m *MutexKV) RLock(key string) {
log.Printf("[DEBUG] RLocking %q", key)
m.get(key).RLock()
log.Printf("[DEBUG] RLocked %q", key)
}

// Releases a read-lock on the mutex for the given key. Caller must have called RLock for the same key first
func (m *MutexKV) RUnlock(key string) {
log.Printf("[DEBUG] RUnlocking %q", key)
m.get(key).RUnlock()
log.Printf("[DEBUG] RUnlocked %q", key)
}

// Returns a mutex for the given key, no guarantee of its lock status
-func (m *MutexKV) get(key string) *sync.Mutex {
+func (m *MutexKV) get(key string) *sync.RWMutex {
m.lock.Lock()
defer m.lock.Unlock()
mutex, ok := m.store[key]
if !ok {
-mutex = &sync.Mutex{}
+mutex = &sync.RWMutex{}
m.store[key] = mutex
}
return mutex
@@ -46,6 +61,6 @@ func (m *MutexKV) get(key string) *sync.Mutex {
// Returns a properly initialized MutexKV
func NewMutexKV() *MutexKV {
return &MutexKV{
-store: make(map[string]*sync.Mutex),
+store: make(map[string]*sync.RWMutex),
}
}
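To make the locking scheme concrete, here is a minimal, self-contained sketch of how the reworked MutexKV is meant to be used: nodepool operations take a shared (read) lock on the cluster key plus an exclusive lock on a per-nodepool key, so operations on different nodepools run concurrently while operations on the same nodepool serialize, and a cluster-wide operation taking the write lock on the cluster key still excludes everything. The key strings and pool names are made up for illustration.

package main

import (
    "fmt"
    "sync"
)

// Condensed copy of the MutexKV above (logging omitted).
type MutexKV struct {
    lock  sync.Mutex
    store map[string]*sync.RWMutex
}

func NewMutexKV() *MutexKV { return &MutexKV{store: make(map[string]*sync.RWMutex)} }

func (m *MutexKV) get(key string) *sync.RWMutex {
    m.lock.Lock()
    defer m.lock.Unlock()
    if _, ok := m.store[key]; !ok {
        m.store[key] = &sync.RWMutex{}
    }
    return m.store[key]
}

func (m *MutexKV) Lock(key string)    { m.get(key).Lock() }
func (m *MutexKV) Unlock(key string)  { m.get(key).Unlock() }
func (m *MutexKV) RLock(key string)   { m.get(key).RLock() }
func (m *MutexKV) RUnlock(key string) { m.get(key).RUnlock() }

func main() {
    kv := NewMutexKV()
    clusterKey := "projects/p/locations/l/clusters/c" // hypothetical lock-key format

    var wg sync.WaitGroup
    for _, pool := range []string{"pool-a", "pool-b"} {
        wg.Add(1)
        go func(pool string) {
            defer wg.Done()
            // Nodepool CRUD: shared lock on the cluster key...
            kv.RLock(clusterKey)
            defer kv.RUnlock(clusterKey)
            // ...and an exclusive lock on the nodepool key, so two
            // operations on the same pool would still serialize.
            npKey := clusterKey + "/nodePools/" + pool
            kv.Lock(npKey)
            defer kv.Unlock(npKey)
            fmt.Println("updating", pool) // both pools can reach here concurrently
        }(pool)
    }
    wg.Wait()

    // A cluster-wide operation would instead take kv.Lock(clusterKey),
    // blocking until no nodepool operation holds the read lock.
}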