Skip to content

Commit

Permalink
fix: Operation has completed with phase: Running (#7482)
Browse files Browse the repository at this point in the history
* fix: Operation has completed with phase: Running

Signed-off-by: Alexander Matyushentsev <AMatyushentsev@gmail.com>
  • Loading branch information
Alexander Matyushentsev committed Oct 20, 2021
1 parent 00874af commit 872eff2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 20 deletions.
16 changes: 11 additions & 5 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,16 @@ func (ctrl *ApplicationController) processAppOperationQueueItem() (processNext b
app := origApp.DeepCopy()

if app.Operation != nil {
// If we get here, we are about process an operation but we cannot rely on informer since it might has stale data.
// So always retrieve the latest version to ensure it is not stale to avoid unnecessary syncing.
// We cannot rely on informer since applications might be updated by both application controller and api server.
freshApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.namespace).Get(context.Background(), app.ObjectMeta.Name, metav1.GetOptions{})
if err != nil {
log.Errorf("Failed to retrieve latest application state: %v", err)
return
}
app = freshApp

ctrl.processRequestedAppOperation(app)
} else if app.DeletionTimestamp != nil && app.CascadedDeletion() {
_, err = ctrl.finalizeApplicationDeletion(app)
Expand Down Expand Up @@ -1125,7 +1135,7 @@ func (ctrl *ApplicationController) setOperationState(app *appv1.Application, sta
}

appClient := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.namespace)
patchedApp, err := appClient.Patch(context.Background(), app.Name, types.MergePatchType, patchJSON, metav1.PatchOptions{})
_, err = appClient.Patch(context.Background(), app.Name, types.MergePatchType, patchJSON, metav1.PatchOptions{})
if err != nil {
// Stop retrying updating deleted application
if apierr.IsNotFound(err) {
Expand Down Expand Up @@ -1155,10 +1165,6 @@ func (ctrl *ApplicationController) setOperationState(app *appv1.Application, sta
ctrl.auditLogger.LogAppEvent(app, eventInfo, strings.Join(messages, " "))
ctrl.metricsServer.IncSync(app, state)
}
// write back to informer in order to avoid stale cache
if err := ctrl.appInformer.GetStore().Update(patchedApp); err != nil {
log.Warnf("Fails to update informer: %v", err)
}
return nil
})
}
Expand Down
20 changes: 5 additions & 15 deletions controller/appcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,12 +1084,10 @@ func TestProcessRequestedAppOperation_FailedNoRetries(t *testing.T) {
fakeAppCs := ctrl.applicationClientset.(*appclientset.Clientset)
receivedPatch := map[string]interface{}{}
fakeAppCs.PrependReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
patchedApp := &v1alpha1.Application{}
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &patchedApp))
}
return true, patchedApp, nil
return true, nil, nil
})

ctrl.processRequestedAppOperation(app)
Expand All @@ -1111,12 +1109,10 @@ func TestProcessRequestedAppOperation_InvalidDestination(t *testing.T) {
fakeAppCs.Lock()
defer fakeAppCs.Unlock()
fakeAppCs.PrependReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
patchedApp := &v1alpha1.Application{}
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &patchedApp))
}
return true, patchedApp, nil
return true, nil, nil
})
}()

Expand All @@ -1139,12 +1135,10 @@ func TestProcessRequestedAppOperation_FailedHasRetries(t *testing.T) {
fakeAppCs := ctrl.applicationClientset.(*appclientset.Clientset)
receivedPatch := map[string]interface{}{}
fakeAppCs.PrependReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
patchedApp := &v1alpha1.Application{}
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &patchedApp))
}
return true, patchedApp, nil
return true, nil, nil
})

ctrl.processRequestedAppOperation(app)
Expand Down Expand Up @@ -1184,12 +1178,10 @@ func TestProcessRequestedAppOperation_RunningPreviouslyFailed(t *testing.T) {
fakeAppCs := ctrl.applicationClientset.(*appclientset.Clientset)
receivedPatch := map[string]interface{}{}
fakeAppCs.PrependReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
patchedApp := &v1alpha1.Application{}
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &patchedApp))
}
return true, patchedApp, nil
return true, nil, nil
})

ctrl.processRequestedAppOperation(app)
Expand Down Expand Up @@ -1219,12 +1211,10 @@ func TestProcessRequestedAppOperation_HasRetriesTerminated(t *testing.T) {
fakeAppCs := ctrl.applicationClientset.(*appclientset.Clientset)
receivedPatch := map[string]interface{}{}
fakeAppCs.PrependReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
patchedApp := &v1alpha1.Application{}
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &patchedApp))
}
return true, patchedApp, nil
return true, nil, nil
})

ctrl.processRequestedAppOperation(app)
Expand Down

0 comments on commit 872eff2

Please sign in to comment.