Support for concurrent nodepool CRUD operations (GoogleCloudPlatform#…
danielvegamyhre authored and hao-nan-li committed Dec 6, 2022
1 parent 9ab081b commit 4d361a2
Showing 4 changed files with 191 additions and 87 deletions.
@@ -389,11 +389,21 @@ func (nodePoolInformation *NodePoolInformation) parent() string {
)
}

func (nodePoolInformation *NodePoolInformation) lockKey() string {
func (nodePoolInformation *NodePoolInformation) clusterLockKey() string {
return containerClusterMutexKey(nodePoolInformation.project,
nodePoolInformation.location, nodePoolInformation.cluster)
}

func (nodePoolInformation *NodePoolInformation) nodePoolLockKey(nodePoolName string) string {
return fmt.Sprintf(
"projects/%s/locations/%s/clusters/%s/nodePools/%s",
nodePoolInformation.project,
nodePoolInformation.location,
nodePoolInformation.cluster,
nodePoolName,
)
}

func extractNodePoolInformation(d *schema.ResourceData, config *Config) (*NodePoolInformation, error) {
cluster := d.Get("cluster").(string)

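(Note: the mutexKV used by the lock calls in the hunks below is defined outside the hunks shown here. The following is a minimal sketch, not the provider's actual implementation, assuming a string-keyed store of sync.RWMutex values; the package name and constructor are hypothetical. It illustrates how a cluster key can be read-locked by many node pool operations at once while each node pool key is write-locked by exactly one, and how a cluster-level operation could still take the write-lock on the same cluster key to exclude all node pool work.)

package mutexkv // hypothetical package name for this sketch

import "sync"

// MutexKV hands out one sync.RWMutex per string key: a cluster key can be
// held as a read-lock by many node pool operations at once, while each node
// pool key is write-locked by a single operation.
type MutexKV struct {
	mu    sync.Mutex
	store map[string]*sync.RWMutex
}

func NewMutexKV() *MutexKV {
	return &MutexKV{store: make(map[string]*sync.RWMutex)}
}

// get lazily creates and returns the mutex for a key.
func (m *MutexKV) get(key string) *sync.RWMutex {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.store[key]; !ok {
		m.store[key] = &sync.RWMutex{}
	}
	return m.store[key]
}

func (m *MutexKV) Lock(key string)    { m.get(key).Lock() }
func (m *MutexKV) Unlock(key string)  { m.get(key).Unlock() }
func (m *MutexKV) RLock(key string)   { m.get(key).RLock() }
func (m *MutexKV) RUnlock(key string) { m.get(key).RUnlock() }

With keys shaped like clusterLockKey and nodePoolLockKey above, node pools in different clusters never contend, and node pools in the same cluster only share the cluster's read-lock.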
@@ -441,8 +451,15 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
return err
}

mutexKV.Lock(nodePoolInfo.lockKey())
defer mutexKV.Unlock(nodePoolInfo.lockKey())
// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.clusterLockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Acquire write-lock on nodepool.
npLockKey := nodePoolInfo.nodePoolLockKey(nodePool.Name)
mutexKV.Lock(npLockKey)
defer mutexKV.Unlock(npLockKey)

req := &container.CreateNodePoolRequest{
NodePool: nodePool,
@@ -526,12 +543,6 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
return err
}

//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
}

state, err := containerNodePoolAwaitRestingState(config, d.Id(), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
@@ -616,12 +627,6 @@ func resourceContainerNodePoolUpdate(d *schema.ResourceData, meta interface{}) e
}
name := getNodePoolName(d.Id())

//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
}

_, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutUpdate))
if err != nil {
return err
@@ -660,16 +665,6 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e

name := getNodePoolName(d.Id())

//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
if isGoogleApiErrorWithCode(err, 404) {
log.Printf("[INFO] GKE cluster %s doesn't exist, skipping node pool %s deletion", nodePoolInfo.cluster, d.Id())
return nil
}
return err
}

_, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutDelete))
if err != nil {
// If the node pool doesn't get created and then we try to delete it, we get an error,
@@ -682,9 +677,15 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e
}
}

mutexKV.Lock(nodePoolInfo.lockKey())
defer mutexKV.Unlock(nodePoolInfo.lockKey())
// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.clusterLockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Acquire write-lock on nodepool.
npLockKey := nodePoolInfo.nodePoolLockKey(name)
mutexKV.Lock(npLockKey)
defer mutexKV.Unlock(npLockKey)

timeout := d.Timeout(schema.TimeoutDelete)
startTime := time.Now()
@@ -1121,13 +1122,19 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
config := meta.(*Config)
name := d.Get(prefix + "name").(string)

lockKey := nodePoolInfo.lockKey()

userAgent, err := generateUserAgentString(d, config.userAgent)
if err != nil {
return err
}

// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.clusterLockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Nodepool write-lock will be acquired when update function is called.
npLockKey := nodePoolInfo.nodePoolLockKey(name)

if d.HasChange(prefix + "autoscaling") {
update := &container.ClusterUpdate{
DesiredNodePoolId: name,
@@ -1170,11 +1177,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated autoscaling in Node Pool %s", d.Id())
}

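(Note: retryWhileIncompatibleOperation, which replaces the lockedCall pattern here and in the hunks below, is not defined in the hunks shown on this page. A rough sketch only, under stated assumptions — the provider's existing lockedCall helper, the plugin SDK's resource.Retry from helper/resource, and a FAILED_PRECONDITION predicate along the lines of isFailedPreconditionError — might look like this:)

// Sketch only: resource refers to the plugin SDK's helper/resource package,
// and lockedCall/isFailedPreconditionError are the provider helpers assumed
// above; the real helper in this commit may differ in detail.
func retryWhileIncompatibleOperation(timeout time.Duration, lockKey string, f func() error) error {
	return resource.Retry(timeout, func() *resource.RetryError {
		// Hold the node pool write-lock only for the duration of one attempt so
		// other operations on the same cluster can make progress in between.
		if err := lockedCall(lockKey, f); err != nil {
			// A FAILED_PRECONDITION error indicates another operation is still
			// running against the cluster; back off and try again until timeout.
			if isFailedPreconditionError(err) {
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		}
		return nil
	})
}

The effect is that updates to the same node pool stay serialized behind npLockKey, while updates to different node pools in one cluster can proceed concurrently and simply retry when the GKE API rejects a second in-flight cluster operation.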
@@ -1210,9 +1215,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated logging_variant for node pool %s", name)
@@ -1264,12 +1268,10 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
"updating GKE node pool tags", userAgent,
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err

if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated tags for node pool %s", name)
}

@@ -1304,8 +1306,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated resource labels for node pool %s", name)
@@ -1336,11 +1338,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated image type in Node Pool %s", d.Id())
}

@@ -1372,11 +1372,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name)
}

@@ -1408,9 +1406,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated kubelet_config for node pool %s", name)
@@ -1442,9 +1439,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated linux_node_config for node pool %s", name)
@@ -1475,12 +1471,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.location, "updating GKE node pool size", userAgent,
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] GKE node pool %s size has been updated to %d", name, newSize)
}

@@ -1513,11 +1506,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.location, "updating GKE node pool management", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated management in Node Pool %s", name)
}

@@ -1542,12 +1533,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.project,
nodePoolInfo.location, "updating GKE node pool version", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated version in Node Pool %s", name)
}

@@ -1570,11 +1558,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool node locations", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated node locations in Node Pool %s", name)
}

@@ -1651,12 +1637,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
// Wait until it's updated
return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool upgrade settings", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated upgrade settings in Node Pool %s", name)
}

@@ -1685,9 +1668,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name)