Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): Implement offloading for workflow updates that are re-applied. Fixes #2856 #2941

Merged
merged 37 commits into from
May 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
dd2341d
chore(controller): Refactor hydration code
alexec May 4, 2020
715a0ba
Merge branch 'master' into chore-hydrate
alexec May 6, 2020
e9e0a27
chore-hydrate
alexec May 6, 2020
be91d47
chore-hydrate
alexec May 6, 2020
bdaacf8
chore-hydrate
alexec May 6, 2020
1349d40
has noop methods
alexec May 6, 2020
9371463
chore-hydrate
alexec May 6, 2020
c436a9b
chore-hydrate
alexec May 6, 2020
6ae55b3
chore-hydrate
alexec May 6, 2020
4851421
chore-hydrate
alexec May 6, 2020
f5402da
chore-hydrate
alexec May 6, 2020
6bbc743
lint
alexec May 6, 2020
3870f26
chore-hydrate
alexec May 6, 2020
08d9e48
chore-hydrate
alexec May 7, 2020
8791ca4
chore-hydrate
alexec May 7, 2020
250efc6
Merge branch 'master' into chore-hydrate
alexec May 7, 2020
4aafdf1
Merge branch 'chore-hydrate' of github.com:alexec/argo into chore-hyd…
alexec May 7, 2020
5e9eb02
chore-hydrate
alexec May 7, 2020
f321487
Merge branch 'master' into chore-hydrate
alexec May 7, 2020
ba42342
chore-hydrate
alexec May 7, 2020
bf39ee7
Merge branch 'master' into chore-hydrate
alexec May 8, 2020
ac2b455
Merge branch 'master' into chore-hydrate
alexec May 11, 2020
17eb397
chore-hydrate
alexec May 11, 2020
1e2f397
chore-hydrate
alexec May 11, 2020
64f8bb0
chore-hydrate
alexec May 11, 2020
1577b07
Merge branch 'master' into chore-hydrate
alexec May 11, 2020
4cc9dc2
chore-hydrate
alexec May 11, 2020
d0e92ea
Merge branch 'master' into chore-hydrate
alexec May 11, 2020
b47739a
chore-hydrate
alexec May 11, 2020
97fc1ac
chore-hydrate
alexec May 11, 2020
84d1e08
chore-hydrate
alexec May 12, 2020
cb07707
Merge branch 'master' into chore-hydrate
alexec May 12, 2020
3fe1bb7
chore-hydrate
alexec May 12, 2020
e5aa52c
chore-hydrate
alexec May 13, 2020
ad6bb44
chore-hydrate
alexec May 13, 2020
043e058
Merge branch 'master' into chore-hydrate
alexec May 22, 2020
ac9ae1e
chore-hydrate
alexec May 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ CI ?= false
DB ?= postgres
K3D := $(shell if [ "`which kubectl`" != '' ] && [ "`kubectl config current-context`" = "k3s-default" ]; then echo true; else echo false; fi)
LOG_LEVEL := debug
ALWAYS_OFFLOAD_NODE_STATUS := true

ifeq ($(DB),no-db)
ALWAYS_OFFLOAD_NODE_STATUS := false
else
ALWAYS_OFFLOAD_NODE_STATUS := true
endif

ifeq ($(CI),true)
Expand Down Expand Up @@ -318,7 +319,6 @@ $(VERSION_FILE):
touch $(VERSION_FILE)

dist/$(DB).yaml: $(MANIFESTS) $(E2E_MANIFESTS) $(VERSION_FILE)
# We additionally disable ALWAYS_OFFLOAD_NODE_STATUS
kustomize build --load_restrictor=none test/e2e/manifests/$(DB) | sed 's/:$(MANIFESTS_VERSION)/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' > dist/$(DB).yaml

.PHONY: install
Expand Down
12 changes: 6 additions & 6 deletions persist/sqldb/explosive_offload_node_status_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

var ExplosiveOffloadNodeStatusRepo OffloadNodeStatusRepo = &explosiveOffloadNodeStatusRepo{}
var notSupportedError = fmt.Errorf("offload node status is not supported")
var OffloadNotSupportedError = fmt.Errorf("offload node status is not supported")

// explosiveOffloadNodeStatusRepo is a field-less OffloadNodeStatusRepo
// implementation, exposed through the ExplosiveOffloadNodeStatusRepo variable.
// Its Save, Get, List, Delete and ListOldOffloads methods do no work and only
// return the package's "offload node status is not supported" error.
type explosiveOffloadNodeStatusRepo struct {
}
Expand All @@ -17,21 +17,21 @@ func (n *explosiveOffloadNodeStatusRepo) IsEnabled() bool {
}

func (n *explosiveOffloadNodeStatusRepo) Save(string, string, wfv1.Nodes) (string, error) {
return "", notSupportedError
return "", OffloadNotSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) Get(string, string) (wfv1.Nodes, error) {
return nil, notSupportedError
return nil, OffloadNotSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) List(string) (map[UUIDVersion]wfv1.Nodes, error) {
return nil, notSupportedError
return nil, OffloadNotSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) Delete(string, string) error {
return notSupportedError
return OffloadNotSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) ([]UUIDVersion, error) {
return nil, notSupportedError
return nil, OffloadNotSupportedError
}
3 changes: 2 additions & 1 deletion server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
grpcutil "github.com/argoproj/argo/util/grpc"
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/util/json"
"github.com/argoproj/argo/workflow/hydrator"
)

const (
Expand Down Expand Up @@ -142,7 +143,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
// disable the archiving - and still read old records
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, instanceIDService)
}
artifactServer := artifacts.NewArtifactServer(as.authenticator, offloadRepo, wfArchive, instanceIDService)
artifactServer := artifacts.NewArtifactServer(as.authenticator, hydrator.New(offloadRepo), wfArchive, instanceIDService)
grpcServer := as.newGRPCServer(instanceIDService, offloadRepo, wfArchive, configMap.Links)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

Expand Down
27 changes: 8 additions & 19 deletions server/artifacts/artifact_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ import (
"github.com/argoproj/argo/server/auth"
"github.com/argoproj/argo/util/instanceid"
artifact "github.com/argoproj/argo/workflow/artifacts"
"github.com/argoproj/argo/workflow/packer"
"github.com/argoproj/argo/workflow/hydrator"
)

type ArtifactServer struct {
authN auth.Gatekeeper
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
wfArchive sqldb.WorkflowArchive
instanceIDService instanceid.Service
authN auth.Gatekeeper
hydrator hydrator.Interface
wfArchive sqldb.WorkflowArchive
instanceIDService instanceid.Service
}

func NewArtifactServer(authN auth.Gatekeeper, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, instanceIDService instanceid.Service) *ArtifactServer {
return &ArtifactServer{authN, offloadNodeStatusRepo, wfArchive, instanceIDService}
func NewArtifactServer(authN auth.Gatekeeper, hydrator hydrator.Interface, wfArchive sqldb.WorkflowArchive, instanceIDService instanceid.Service) *ArtifactServer {
return &ArtifactServer{authN, hydrator, wfArchive, instanceIDService}
}

func (a *ArtifactServer) GetArtifact(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -171,21 +171,10 @@ func (a *ArtifactServer) getWorkflowAndValidate(ctx context.Context, namespace s
if err != nil {
return nil, err
}
err = packer.DecompressWorkflow(wf)
err = a.hydrator.Hydrate(wf)
if err != nil {
return nil, err
}
if wf.Status.IsOffloadNodeStatus() {
if a.offloadNodeStatusRepo.IsEnabled() {
offloadedNodes, err := a.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, err
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": namespace, "name": workflowName}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
return wf, nil
}

Expand Down
40 changes: 9 additions & 31 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/util/logs"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/packer"
"github.com/argoproj/argo/workflow/hydrator"
"github.com/argoproj/argo/workflow/templateresolution"
"github.com/argoproj/argo/workflow/util"
"github.com/argoproj/argo/workflow/validate"
Expand All @@ -27,11 +27,12 @@ import (
// workflowServer holds the dependencies shared by the workflow service
// handlers. NewWorkflowServer fills the fields positionally, so their order
// must not change.
type workflowServer struct {
instanceIDService instanceid.Service
// offloadNodeStatusRepo is the repository that NewWorkflowServer wraps to build hydrator.
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
// hydrator is used to Hydrate workflows before they are returned or streamed,
// and is handed to the util retry/resume/stop helpers.
hydrator hydrator.Interface
}

// NewWorkflowServer returns a new workflowServer
func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) workflowpkg.WorkflowServiceServer {
return &workflowServer{instanceIDService, offloadNodeStatusRepo}
return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo)}
}

func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*v1alpha1.Workflow, error) {
Expand Down Expand Up @@ -83,23 +84,11 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
if err != nil {
return nil, err
}

if wf.Status.IsOffloadNodeStatus() {
if s.offloadNodeStatusRepo.IsEnabled() {
offloadedNodes, err := s.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, err
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
err = packer.DecompressWorkflow(wf)
err = s.hydrator.Hydrate(wf)
if err != nil {
return nil, err
}
return wf, nil
return wf, err
}

func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*v1alpha1.WorkflowList, error) {
Expand Down Expand Up @@ -173,21 +162,10 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
return fmt.Errorf("watch object was not a workflow %v", reflect.TypeOf(event.Object))
}
logCtx := log.WithFields(log.Fields{"workflow": wf.Name, "type": event.Type, "phase": wf.Status.Phase})
err := packer.DecompressWorkflow(wf)
err := s.hydrator.Hydrate(wf)
if err != nil {
return err
}
if wf.Status.IsOffloadNodeStatus() {
if s.offloadNodeStatusRepo.IsEnabled() {
offloadedNodes, err := s.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return err
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
logCtx.Debug("Sending event")
err = ws.Send(&workflowpkg.WorkflowWatchEvent{Type: string(event.Type), Object: wf})
if err != nil {
Expand Down Expand Up @@ -218,7 +196,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
return nil, err
}

wf, err = util.RetryWorkflow(kubeClient, s.offloadNodeStatusRepo, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf, req.RestartSuccessful, req.NodeFieldSelector)
wf, err = util.RetryWorkflow(kubeClient, s.hydrator, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf, req.RestartSuccessful, req.NodeFieldSelector)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -251,7 +229,7 @@ func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.Wo
return nil, err
}

err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.offloadNodeStatusRepo, req.Name, req.NodeFieldSelector)
err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.hydrator, req.Name, req.NodeFieldSelector)
if err != nil {
log.Warnf("Failed to resume %s: %+v", req.Name, err)
return nil, err
Expand Down Expand Up @@ -312,7 +290,7 @@ func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.Work
if err != nil {
return nil, err
}
err = util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.offloadNodeStatusRepo, req.Name, req.NodeFieldSelector, req.Message)
err = util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.hydrator, req.Name, req.NodeFieldSelector, req.Message)
if err != nil {
return nil, err
}
Expand Down
5 changes: 1 addition & 4 deletions server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,10 +581,7 @@ func generateNameReactor(action ktesting.Action) (handled bool, ret runtime.Obje
}

func getWorkflow(ctx context.Context, server workflowpkg.WorkflowServiceServer, namespace string, wfName string) (*v1alpha1.Workflow, error) {
return server.GetWorkflow(ctx, &workflowpkg.WorkflowGetRequest{
Name: wfName,
Namespace: namespace,
})
return server.GetWorkflow(ctx, &workflowpkg.WorkflowGetRequest{Name: wfName, Namespace: namespace})
}

func getWorkflowList(ctx context.Context, server workflowpkg.WorkflowServiceServer, namespace string) (*v1alpha1.WorkflowList, error) {
Expand Down
11 changes: 0 additions & 11 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,11 +487,6 @@ func (s *ArgoServerSuite) TestWorkflowService() {
Array().
Length().
Equal(1)
if s.Persistence.IsEnabled() {
// check we are loading offloaded node status
j.Path("$.items[0].status.offloadNodeStatusVersion").
NotNull()
}
j.Path("$.items[0].status.nodes").
NotNull()
})
Expand All @@ -518,12 +513,6 @@ func (s *ArgoServerSuite) TestWorkflowService() {
Expect().
Status(200).
JSON()
if s.Persistence.IsEnabled() {
// check we are loading offloaded node status
j.
Path("$.status.offloadNodeStatusVersion").
NotNull()
}
j.Path("$.status.nodes").
NotNull()
s.e(s.T()).GET("/api/v1/workflows/argo/not-found").
Expand Down
38 changes: 17 additions & 21 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ func (s *CLISuite) BeforeTest(suiteName, testName string) {
_ = os.Unsetenv("ARGO_TOKEN")
}

// testNeedsOffloading skips the calling test when persistence is enabled for
// the suite but the ARGO_SERVER environment variable is empty or unset.
// In every other configuration it returns without doing anything.
func (s *CLISuite) testNeedsOffloading() {
	if s.Persistence.IsEnabled() && os.Getenv("ARGO_SERVER") == "" {
		s.T().Skip("test needs offloading, but no Argo Server is available")
	}
}

func (s *CLISuite) TestCompletion() {
s.Given().RunCli([]string{"completion", "bash"}, func(t *testing.T, output string, err error) {
assert.NoError(t, err)
Expand Down Expand Up @@ -214,6 +221,7 @@ func (s *CLISuite) TestRoot() {
})
})
s.Run("List", func() {
s.testNeedsOffloading()
for i := 0; i < 3; i++ {
s.Given().
Workflow("@smoke/basic-generate-name.yaml").
Expand All @@ -232,6 +240,7 @@ func (s *CLISuite) TestRoot() {
})
})
s.Run("Get", func() {
s.testNeedsOffloading()
s.Given().RunCli([]string{"get", "basic"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "Name:")
Expand Down Expand Up @@ -267,11 +276,7 @@ func (s *CLISuite) TestRoot() {
}

func (s *CLISuite) TestWorkflowSuspendResume() {
if s.Persistence.IsEnabled() {
// Persistence is enabled for this test, but it is not enabled for the Argo Server in this test suite.
// When this is the case, this behavior is tested in cli_with_server_test.go
s.T().SkipNow()
}
s.testNeedsOffloading()
s.Given().
Workflow("@testdata/sleep-3s.yaml").
When().
Expand All @@ -294,15 +299,8 @@ func (s *CLISuite) TestWorkflowSuspendResume() {
})
}

func (s *CLISuite) TestNodeSuspendResumeNoPersistence() {
if s.Persistence.IsEnabled() {
// Persistence is enabled for this test, but it is not enabled for the Argo Server in this test suite.
s.T().SkipNow()
}
NodeSuspendResumeCommon(s.E2ESuite)
}

func NodeSuspendResumeCommon(s fixtures.E2ESuite) {
func (s *CLISuite) TestNodeSuspendResume() {
s.testNeedsOffloading()
s.Given().
Workflow("@testdata/node-suspend.yaml").
When().
Expand Down Expand Up @@ -497,13 +495,8 @@ func (s *CLISuite) TestWorkflowLint() {
})
}

func (s *CLISuite) TestWorkflowRetryNoPersistence() {
if s.Persistence.IsEnabled() {
// Persistence is enabled for this test, but it is not enabled for the Argo Server in this test suite.
// When this is the case, this behavior is tested in cli_with_server_test.go
s.T().SkipNow()
}

func (s *CLISuite) TestWorkflowRetry() {
s.testNeedsOffloading()
var retryTime corev1.Time

s.Given().
Expand Down Expand Up @@ -556,6 +549,7 @@ func (s *CLISuite) TestWorkflowTerminate() {
}

func (s *CLISuite) TestWorkflowWait() {
s.testNeedsOffloading()
s.Given().
Workflow("@smoke/basic.yaml").
When().
Expand All @@ -569,6 +563,7 @@ func (s *CLISuite) TestWorkflowWait() {
}

func (s *CLISuite) TestWorkflowWatch() {
s.testNeedsOffloading()
s.Given().
Workflow("@smoke/basic.yaml").
When().
Expand Down Expand Up @@ -620,6 +615,7 @@ func (s *CLISuite) TestTemplate() {
})
})
s.Run("Submittable-Template", func() {
s.testNeedsOffloading()
s.Given().RunCli([]string{"submit", "--from", "workflowtemplate/workflow-template-whalesay-template", "-l", "argo-e2e=true"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "Name:")
Expand Down