fix(controller): Implement offloading for workflow updates that are re-applied. Fixes #2856 #2941
@@ -103,7 +100,7 @@ func TestPersistWithLargeWfSupport(t *testing.T) {
assert.Empty(t, wf.Status.Nodes)
assert.Empty(t, wf.Status.CompressedNodes)
// check the updated in-memory version is pre-offloaded state
assert.True(t, woc.wf.Status.IsOffloadNodeStatus())
assert.False(t, woc.wf.Status.IsOffloadNodeStatus())
behaviour change
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)
assert.NotEmpty(t, woc.wf.Status.Nodes)
assert.Empty(t, woc.wf.Status.CompressedNodes)
assert.Equal(t, wfv1.NodeError, wf.Status.Phase)
corrected behaviour
@@ -83,7 +84,12 @@ func compressWorkflow(wf *wfv1.Workflow) error {
return err
}
if large {
compressedSize, _ := getSize(wf)
compressedSize, err := getSize(wf)
wf.Status.CompressedNodes = ""
this was a bug
}

//workflow is too big but offload is disabled
func TestResumeWorkflowOffloadDisabled(t *testing.T) {
these tests are all irrelevant now - they were testing the hydration code by proxy - we don't need that - or want that
Makefile
Outdated
@@ -311,8 +311,7 @@ dist/postgres.yaml: $(MANIFESTS) $(E2E_MANIFESTS) $(VERSION_FILE)
kustomize build --load_restrictor=none test/e2e/manifests/postgres | sed 's/:$(MANIFESTS_VERSION)/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' > dist/postgres.yaml

dist/no-db.yaml: $(MANIFESTS) $(E2E_MANIFESTS) $(VERSION_FILE)
# We additionally disable ALWAYS_OFFLOAD_NODE_STATUS
kustomize build --load_restrictor=none test/e2e/manifests/no-db | sed 's/:$(MANIFESTS_VERSION)/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' | sed 's/"true"/"false"/' > dist/no-db.yaml
kustomize build --load_restrictor=none test/e2e/manifests/no-db | sed 's/:$(MANIFESTS_VERSION)/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' > dist/no-db.yaml
dead code
workflow/controller/operator.go
Outdated
if !woc.controller.hydrator.IsHydrated(woc.wf) {
woc.log.Error("workflow should be hydrated")
return
we should fail here
@@ -0,0 +1,79 @@
package hydrator
any thoughts on another name for hydration?
if err != nil {
woc.log.Errorf("workflow decompression failed: %v", err)
woc.log.Errorf("hydration failed: %v", err)
woc.markWorkflowError(err, true)
markWorkflowError tries to access the workflow nodes, which may not exist if the hydration has failed.
catch-22: if we cannot hydrate the workflow, then we cannot access the nodes
woc.log.Warnf("Error compressing workflow: %v", err)
err := woc.controller.hydrator.Dehydrate(woc.wf)
if err != nil {
woc.log.Warnf("Failed to dehydrate: %v", err)
woc.markWorkflowError(err, true)
Same problem here
I don't think we need to fix this. I've made changes to Dehydrate to ensure that error branches do not result in a dehydrated workflow, although it could still fail to persist.
@@ -554,43 +541,56 @@ func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(wfClient v1alpha1.Workflo
// reapplyUpdate GETs the latest version of the workflow, re-applies the updates and
// retries the UPDATE multiple times. For reasoning behind this technique, see:
// https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency
func (woc *wfOperationCtx) reapplyUpdate(wfClient v1alpha1.WorkflowInterface) (*wfv1.Workflow, error) {
func (woc *wfOperationCtx) reapplyUpdate(wfClient v1alpha1.WorkflowInterface, nodes wfv1.Nodes) (*wfv1.Workflow, error) {
Could this do with extra unit tests, eg around the multiple attempt handling and merge scenarios?
I think you're right about this - toughie to test, but all mockable now.
@@ -80,10 +81,17 @@ func compressWorkflow(wf *wfv1.Workflow) error {
// still too large?
large, err := IsLargeWorkflow(wf)
if err != nil {
wf.Status.CompressedNodes = ""
an existing bug
@mark9white if you're happy that you ran this and it worked, I'll look to get a core team review and approval.
That would be great :)
@sarabala1979 could I please ask you to review this PR for me?
LGTM
Kudos, SonarCloud Quality Gate passed! 0 Bugs