Skip to content

Commit

Permalink
fix(archive): upsert archive + ci: Pin images on CI, add readiness pr…
Browse files Browse the repository at this point in the history
…obes, clean-up logging and other tweaks (#2038)
  • Loading branch information
alexec committed Jan 23, 2020
1 parent c46c683 commit 1db74e1
Show file tree
Hide file tree
Showing 21 changed files with 149 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Expand Up @@ -60,9 +60,9 @@ commands:
name: Pull some Docker images now, to save time later
command: |
docker pull golang:1.11.5
docker pull minio/minio
docker pull minio/minio:RELEASE.2019-12-17T23-16-33Z
docker pull docker/whalesay:latest
docker pull bitnami/kubectl
docker pull bitnami/kubectl:1.15.3-ol-7-r165
background: true
- restore_go_cache
- install_golang
Expand Down
1 change: 1 addition & 0 deletions .dockerignore
Expand Up @@ -5,6 +5,7 @@
assets
coverage.out
dist
sdks
vendor
ui/dist
ui/node_modules
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -196,7 +196,7 @@ verify-codegen:
diff ./dist/swagger.json ./api/openapi-spec/swagger.json

.PHONY: manifests
manifests: manifests/install.yaml manifests/namespace-install.yaml manifests/quick-start-mysql.yaml manifests/quick-start-postgres.yaml
manifests: manifests/install.yaml manifests/namespace-install.yaml manifests/quick-start-mysql.yaml manifests/quick-start-postgres.yaml test/e2e/manifests/postgres.yaml test/e2e/manifests/mysql.yaml

manifests/install.yaml: $(MANIFESTS)
env VERSION=$(VERSION) ./hack/update-manifests.sh
Expand Down
Expand Up @@ -56,7 +56,7 @@ const wftStr2 = `
"metadata": {
"name": "workflow-template-whalesay-template2",
"namespace": "default"
},
"spec": {
"templates": [
Expand Down
14 changes: 13 additions & 1 deletion manifests/quick-start-mysql.yaml
Expand Up @@ -413,6 +413,18 @@ spec:
name: main
ports:
- containerPort: 5432
readinessProbe:
exec:
command:
- mysql
- -u
- mysql
- -ppassword
- argo
- -e
- SELECT 1
initialDelaySeconds: 15
timeoutSeconds: 2
---
apiVersion: apps/v1
kind: Deployment
Expand Down Expand Up @@ -457,7 +469,7 @@ spec:
value: admin
- name: MINIO_SECRET_KEY
value: password
image: minio/minio
image: minio/minio:RELEASE.2019-12-17T23-16-33Z
lifecycle:
postStart:
exec:
Expand Down
12 changes: 11 additions & 1 deletion manifests/quick-start-postgres.yaml
Expand Up @@ -407,6 +407,16 @@ spec:
name: main
ports:
- containerPort: 5432
readinessProbe:
exec:
command:
- psql
- -U
- postgres
- -c
- SELECT 1
initialDelaySeconds: 15
timeoutSeconds: 2
---
apiVersion: apps/v1
kind: Deployment
Expand Down Expand Up @@ -451,7 +461,7 @@ spec:
value: admin
- name: MINIO_SECRET_KEY
value: password
image: minio/minio
image: minio/minio:RELEASE.2019-12-17T23-16-33Z
lifecycle:
postStart:
exec:
Expand Down
2 changes: 1 addition & 1 deletion manifests/quick-start/base/minio/minio-pod.yaml
Expand Up @@ -7,7 +7,7 @@ metadata:
spec:
containers:
- name: main
image: minio/minio
image: minio/minio:RELEASE.2019-12-17T23-16-33Z
env:
- name: MINIO_ACCESS_KEY
value: admin
Expand Down
7 changes: 6 additions & 1 deletion manifests/quick-start/mysql/mysql-deployment.yaml
Expand Up @@ -27,4 +27,9 @@ spec:
- name: MYSQL_RANDOM_ROOT_PASSWORD
value: "yes"
ports:
- containerPort: 5432
- containerPort: 5432
readinessProbe:
exec:
command: ["mysql", "-u", "mysql", "-ppassword", "argo", "-e", "SELECT 1"]
initialDelaySeconds: 15
timeoutSeconds: 2
7 changes: 6 additions & 1 deletion manifests/quick-start/postgres/postgres-deployment.yaml
Expand Up @@ -21,4 +21,9 @@ spec:
- name: POSTGRES_PASSWORD
value: password
ports:
- containerPort: 5432
- containerPort: 5432
readinessProbe:
exec:
command: ["psql", "-U", "postgres", "-c", "SELECT 1"]
initialDelaySeconds: 15
timeoutSeconds: 2
38 changes: 32 additions & 6 deletions persist/sqldb/workflow_archive.go
Expand Up @@ -47,15 +47,15 @@ func NewWorkflowArchive(session sqlbuilder.Database, clusterName string) Workflo
}

func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
log.WithField("uid", wf.UID).Debug("Archiving workflow")
err := r.DeleteWorkflow(string(wf.UID))
if err != nil {
return err
}
logCtx := log.WithField("uid", wf.UID)
log.Debug("Archiving workflow")
workflow, err := json.Marshal(wf)
if err != nil {
return err
}
// We assume that we're much more likely to be inserting rows that updating them, so we try and insert,
// and if that fails, then we update.
// There is no check for race condition here, last writer wins.
_, err = r.session.Collection(archiveTableName).
Insert(&archivedWorkflowRecord{
archivedWorkflowMetadata: archivedWorkflowMetadata{
Expand All @@ -69,7 +69,33 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
},
Workflow: string(workflow),
})
return err
if err != nil {
if isDuplicateKeyError(err) {
res, err := r.session.
Update(archiveTableName).
Set("workflow", string(workflow)).
Set("phase", wf.Status.Phase).
Set("startedat", wf.Status.StartedAt.Time).
Set("finishedat", wf.Status.FinishedAt.Time).
Where(db.Cond{"clustername": r.clusterName}).
And(db.Cond{"uuid": wf.UID}).
Exec()
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected != 1 {
logCtx.WithField("rowsAffected", rowsAffected).Warn("Expected exactly one row affected")
}
} else {
return err
}
}

return nil
}

func (r *workflowArchive) ListWorkflows(namespace string, limit int, offset int) (wfv1.Workflows, error) {
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/argo_server_test.go
Expand Up @@ -566,7 +566,7 @@ func (s *ArgoServerSuite) TestArtifactServer() {
Expect().
Status(200).
Body().
Contains("😀 Hello Argo!")
Contains(":) Hello Argo!")
})

s.Run("GetArtifactByUid", func(t *testing.T) {
Expand All @@ -579,7 +579,7 @@ func (s *ArgoServerSuite) TestArtifactServer() {
Expect().
Status(200).
Body().
Contains("😀 Hello Argo!")
Contains(":) Hello Argo!")
})

}
Expand Down Expand Up @@ -645,7 +645,7 @@ func (s *ArgoServerSuite) TestWorkflowServiceStream() {
s := bufio.NewScanner(resp.Body)
for s.Scan() {
line := s.Text()
if strings.Contains(line, "😀 Hello Argo!") {
if strings.Contains(line, ":) Hello Argo!") {
break
}
}
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/expectedfailures/pod-termination-failure.yaml
Expand Up @@ -39,7 +39,7 @@ spec:
args: ["echo sleeping for {{inputs.parameters.seconds}} seconds; sleep {{inputs.parameters.seconds}}; echo done"]
- name: check-status
script:
image: bitnami/kubectl
image: bitnami/kubectl:1.15.3-ol-7-r165
command: [bash]
source: |
host=`hostname`;
Expand All @@ -57,7 +57,7 @@ spec:
parameters:
- name: pod
container:
image: bitnami/kubectl
image: bitnami/kubectl:1.15.3-ol-7-r165
command: [bash, -c]
args: ["kubectl delete po {{inputs.parameters.pod}}"]

30 changes: 14 additions & 16 deletions test/e2e/fixtures/e2e_suite.go
Expand Up @@ -73,13 +73,25 @@ func (s *E2ESuite) TearDownSuite() {
func (s *E2ESuite) BeforeTest(_, _ string) {
s.Diagnostics = &Diagnostics{}

// delete all cron workflows
cronList, err := s.cronClient.List(metav1.ListOptions{LabelSelector: label})
if err != nil {
panic(err)
}
for _, cronWf := range cronList.Items {
log.WithFields(log.Fields{"cronWorkflow": cronWf.Name}).Info("Deleting cron workflow")
err = s.cronClient.Delete(cronWf.Name, nil)
if err != nil {
panic(err)
}
}
// delete all workflows
list, err := s.wfClient.List(metav1.ListOptions{LabelSelector: label})
if err != nil {
panic(err)
}
for _, wf := range list.Items {
logCtx := log.WithFields(log.Fields{"test": s.T().Name(), "workflow": wf.Name})
logCtx := log.WithFields(log.Fields{"workflow": wf.Name})
logCtx.Infof("Deleting workflow")
err = s.wfClient.Delete(wf.Name, &metav1.DeleteOptions{})
if err != nil {
Expand Down Expand Up @@ -112,27 +124,13 @@ func (s *E2ESuite) BeforeTest(_, _ string) {
time.Sleep(3 * time.Second)
}
}
// delete all cron workflows
cronList, err := s.cronClient.List(metav1.ListOptions{LabelSelector: label})
if err != nil {
panic(err)
}
for _, cronWf := range cronList.Items {
logCtx := log.WithFields(log.Fields{"test": s.T().Name(), "cron workflow": cronWf.Name})
logCtx.Infof("Deleting cron workflow")
err = s.cronClient.Delete(cronWf.Name, nil)
if err != nil {
panic(err)
}
}
// delete all workflow templates
wfTmpl, err := s.wfTemplateClient.List(metav1.ListOptions{LabelSelector: label})
if err != nil {
panic(err)
}
for _, wfTmpl := range wfTmpl.Items {
logCtx := log.WithFields(log.Fields{"test": s.T().Name(), "workflow template": wfTmpl.Name})
logCtx.Infof("Deleting workflow template")
log.WithField("template", wfTmpl.Name).Info("Deleting workflow template")
err = s.wfTemplateClient.Delete(wfTmpl.Name, nil)
if err != nil {
panic(err)
Expand Down
15 changes: 12 additions & 3 deletions test/e2e/fixtures/given.go
Expand Up @@ -6,7 +6,6 @@ import (
"testing"

"k8s.io/client-go/kubernetes"

"sigs.k8s.io/yaml"

"github.com/argoproj/argo/persist/sqldb"
Expand Down Expand Up @@ -63,6 +62,12 @@ func (g *Given) Workflow(text string) *Given {
g.t.Fatal(err)
}
}
if g.wf.GetLabels() == nil {
g.wf.SetLabels(map[string]string{})
}
if g.wf.GetLabels()[label] == "" {
g.wf.GetLabels()[label] = "true"
}
return g
}

Expand Down Expand Up @@ -104,7 +109,9 @@ func (g *Given) WorkflowTemplate(text string) *Given {
if wfTemplate.GetLabels() == nil {
wfTemplate.SetLabels(map[string]string{})
}
wfTemplate.GetLabels()[label] = "true"
if wfTemplate.GetLabels()[label] == "" {
wfTemplate.GetLabels()[label] = "true"
}
g.wfTemplates = append(g.wfTemplates, wfTemplate)
}
return g
Expand Down Expand Up @@ -143,7 +150,9 @@ func (g *Given) CronWorkflow(text string) *Given {
if g.cronWf.GetLabels() == nil {
g.cronWf.SetLabels(map[string]string{})
}
g.cronWf.GetLabels()[label] = "true"
if g.cronWf.GetLabels()[label] == "" {
g.cronWf.GetLabels()[label] = "true"
}
}
return g
}
Expand Down
11 changes: 5 additions & 6 deletions test/e2e/fixtures/then.go
Expand Up @@ -29,7 +29,7 @@ func (t *Then) Expect(block func(t *testing.T, metadata *metav1.ObjectMeta, stat
if t.workflowName == "" {
t.t.Fatal("No workflow to test")
}
log.WithFields(log.Fields{"test": t.t.Name(), "workflow": t.workflowName}).Info("Checking expectation")
log.WithFields(log.Fields{"workflow": t.workflowName}).Info("Checking expectation")
wf, err := t.client.Get(t.workflowName, metav1.GetOptions{})
if err != nil {
t.t.Fatal(err)
Expand All @@ -49,7 +49,7 @@ func (t *Then) ExpectCron(block func(t *testing.T, cronWf *wfv1.CronWorkflow)) *
if t.cronWorkflowName == "" {
t.t.Fatal("No cron workflow to test")
}
log.WithFields(log.Fields{"test": t.t.Name(), "cron workflow": t.cronWorkflowName}).Info("Checking expectation")
log.WithFields(log.Fields{"cronWorkflow": t.cronWorkflowName}).Info("Checking cron expectation")
cronWf, err := t.cronClient.Get(t.cronWorkflowName, metav1.GetOptions{})
if err != nil {
t.t.Fatal(err)
Expand All @@ -59,13 +59,12 @@ func (t *Then) ExpectCron(block func(t *testing.T, cronWf *wfv1.CronWorkflow)) *
}

func (t *Then) ExpectWorkflowList(listOptions metav1.ListOptions, block func(t *testing.T, wfList *wfv1.WorkflowList)) *Then {
log.WithFields(log.Fields{"test": t.t.Name()}).Info("Getting relevant workflows")
log.Info("Listing workflows")
wfList, err := t.client.List(listOptions)
if err != nil {
t.t.Fatal(err)
}
log.WithFields(log.Fields{"test": t.t.Name()}).Info("Got relevant workflows")
log.WithFields(log.Fields{"test": t.t.Name()}).Info("Checking expectation")
log.Info("Checking expectation")
block(t.t, wfList)
return t
}
Expand All @@ -74,7 +73,7 @@ func (t *Then) ExpectAuditEvents(block func(*testing.T, *apiv1.EventList)) *Then
if t.workflowName == "" {
t.t.Fatal("No workflow to test")
}
log.WithFields(log.Fields{"test": t.t.Name(), "workflow": t.workflowName}).Info("Checking expectation")
log.WithFields(log.Fields{"workflow": t.workflowName}).Info("Checking expectation")
wf, err := t.client.Get(t.workflowName, metav1.GetOptions{})
if err != nil {
t.t.Fatal(err)
Expand Down

0 comments on commit 1db74e1

Please sign in to comment.