Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for concurrent nodepool CRUD operations #6748

Merged
merged 16 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,21 @@ func (nodePoolInformation *NodePoolInformation) parent() string {
)
}

func (nodePoolInformation *NodePoolInformation) lockKey() string {
func (nodePoolInformation *NodePoolInformation) clusterLockKey() string {
return containerClusterMutexKey(nodePoolInformation.project,
nodePoolInformation.location, nodePoolInformation.cluster)
}

// nodePoolLockKey builds the mutex key that identifies a single node pool
// within its cluster, so concurrent operations on distinct node pools do not
// serialize against one another.
func (nodePoolInformation *NodePoolInformation) nodePoolLockKey(nodePoolName string) string {
	info := nodePoolInformation
	return fmt.Sprintf("projects/%s/locations/%s/clusters/%s/nodePools/%s",
		info.project, info.location, info.cluster, nodePoolName)
}

func extractNodePoolInformation(d *schema.ResourceData, config *Config) (*NodePoolInformation, error) {
cluster := d.Get("cluster").(string)

Expand Down Expand Up @@ -441,8 +451,15 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
return err
}

mutexKV.Lock(nodePoolInfo.lockKey())
defer mutexKV.Unlock(nodePoolInfo.lockKey())
// Acquire read-lock on cluster.
rileykarson marked this conversation as resolved.
Show resolved Hide resolved
clusterLockKey := nodePoolInfo.clusterLockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Acquire write-lock on nodepool.
npLockKey := nodePoolInfo.nodePoolLockKey(nodePool.Name)
mutexKV.Lock(npLockKey)
defer mutexKV.Unlock(npLockKey)

req := &container.CreateNodePoolRequest{
NodePool: nodePool,
Expand Down Expand Up @@ -526,12 +543,6 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
return err
}

//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
}

state, err := containerNodePoolAwaitRestingState(config, d.Id(), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
Expand Down Expand Up @@ -616,12 +627,6 @@ func resourceContainerNodePoolUpdate(d *schema.ResourceData, meta interface{}) e
}
name := getNodePoolName(d.Id())

//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
}

_, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutUpdate))
if err != nil {
return err
Expand Down Expand Up @@ -660,16 +665,6 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e

name := getNodePoolName(d.Id())

//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
if isGoogleApiErrorWithCode(err, 404) {
log.Printf("[INFO] GKE cluster %s doesn't exist, skipping node pool %s deletion", nodePoolInfo.cluster, d.Id())
return nil
}
return err
}

_, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutDelete))
if err != nil {
// If the node pool doesn't get created and then we try to delete it, we get an error,
Expand All @@ -682,9 +677,15 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e
}
}

mutexKV.Lock(nodePoolInfo.lockKey())
defer mutexKV.Unlock(nodePoolInfo.lockKey())
// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.clusterLockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Acquire write-lock on nodepool.
npLockKey := nodePoolInfo.nodePoolLockKey(name)
mutexKV.Lock(npLockKey)
defer mutexKV.Unlock(npLockKey)

timeout := d.Timeout(schema.TimeoutDelete)
startTime := time.Now()
Expand Down Expand Up @@ -1121,13 +1122,19 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
config := meta.(*Config)
name := d.Get(prefix + "name").(string)

lockKey := nodePoolInfo.lockKey()

userAgent, err := generateUserAgentString(d, config.userAgent)
if err != nil {
return err
}

// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.clusterLockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Nodepool write-lock will be acquired when update function is called.
npLockKey := nodePoolInfo.nodePoolLockKey(name)

if d.HasChange(prefix + "autoscaling") {
update := &container.ClusterUpdate{
DesiredNodePoolId: name,
Expand Down Expand Up @@ -1170,11 +1177,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated autoscaling in Node Pool %s", d.Id())
}

Expand Down Expand Up @@ -1210,9 +1215,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated logging_variant for node pool %s", name)
Expand Down Expand Up @@ -1264,12 +1268,10 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
"updating GKE node pool tags", userAgent,
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err

if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated tags for node pool %s", name)
}

Expand Down Expand Up @@ -1304,8 +1306,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated resource labels for node pool %s", name)
Expand Down Expand Up @@ -1336,11 +1338,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated image type in Node Pool %s", d.Id())
}

Expand Down Expand Up @@ -1372,11 +1372,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name)
}

Expand Down Expand Up @@ -1408,9 +1406,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated kubelet_config for node pool %s", name)
Expand Down Expand Up @@ -1442,9 +1439,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated linux_node_config for node pool %s", name)
Expand Down Expand Up @@ -1475,12 +1471,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.location, "updating GKE node pool size", userAgent,
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] GKE node pool %s size has been updated to %d", name, newSize)
}

Expand Down Expand Up @@ -1513,11 +1506,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.location, "updating GKE node pool management", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated management in Node Pool %s", name)
}

Expand All @@ -1542,12 +1533,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.project,
nodePoolInfo.location, "updating GKE node pool version", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated version in Node Pool %s", name)
}

Expand All @@ -1570,11 +1558,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool node locations", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated node locations in Node Pool %s", name)
}

Expand Down Expand Up @@ -1643,12 +1629,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
// Wait until it's updated
return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool upgrade settings", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated upgrade settings in Node Pool %s", name)
}

Expand Down Expand Up @@ -1677,9 +1660,8 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
return err
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name)
Expand Down