Skip to content

Commit

Permalink
feat: Unified workflows list UI and API (#11121)
Browse files Browse the repository at this point in the history
Signed-off-by: Yuan Tang <terrytangyuan@gmail.com>
  • Loading branch information
terrytangyuan committed Jul 10, 2023
1 parent 9264f76 commit 82310dd
Show file tree
Hide file tree
Showing 19 changed files with 218 additions and 1,054 deletions.
28 changes: 10 additions & 18 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (

log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"upper.io/db.v3"
"upper.io/db.v3/lib/sqlbuilder"

Expand Down Expand Up @@ -144,7 +142,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
}

func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (wfv1.Workflows, error) {
var archivedWfs []archivedWorkflowMetadata
var archivedWfs []archivedWorkflowRecord
clause, err := labelsClause(r.dbType, labelRequirements)
if err != nil {
return nil, err
Expand All @@ -158,7 +156,7 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
}

err = r.session.
Select("name", "namespace", "uid", "phase", "startedat", "finishedat").
Select("workflow").
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID()).
And(namespaceEqual(namespace)).
Expand All @@ -173,20 +171,14 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
if err != nil {
return nil, err
}
wfs := make(wfv1.Workflows, len(archivedWfs))
for i, md := range archivedWfs {
wfs[i] = wfv1.Workflow{
ObjectMeta: v1.ObjectMeta{
Name: md.Name,
Namespace: md.Namespace,
UID: types.UID(md.UID),
CreationTimestamp: v1.Time{Time: md.StartedAt},
},
Status: wfv1.WorkflowStatus{
Phase: md.Phase,
StartedAt: v1.Time{Time: md.StartedAt},
FinishedAt: v1.Time{Time: md.FinishedAt},
},
wfs := make(wfv1.Workflows, 0)
for _, archivedWf := range archivedWfs {
wf := wfv1.Workflow{}
err = json.Unmarshal([]byte(archivedWf.Workflow), &wf)
if err != nil {
log.WithFields(log.Fields{"workflowUID": archivedWf.UID, "workflowName": archivedWf.Name}).Errorln("unable to unmarshal workflow from database")
} else {
wfs = append(wfs, wf)
}
}
return wfs, nil
Expand Down
40 changes: 40 additions & 0 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/argoproj/argo-workflows/v3/server/auth"
Expand Down Expand Up @@ -128,6 +129,32 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
return wf, nil
}

func mergeWithArchivedWorkflows(liveWfs v1alpha1.WorkflowList, archivedWfs v1alpha1.WorkflowList, numWfsToKeep int) *v1alpha1.WorkflowList {
var mergedWfs []v1alpha1.Workflow
var uidToWfs = map[types.UID][]v1alpha1.Workflow{}
for _, item := range liveWfs.Items {
uidToWfs[item.UID] = append(uidToWfs[item.UID], item)
}
for _, item := range archivedWfs.Items {
uidToWfs[item.UID] = append(uidToWfs[item.UID], item)
}

for _, v := range uidToWfs {
mergedWfs = append(mergedWfs, v[0])
}
mergedWfsList := v1alpha1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta}
sort.Sort(mergedWfsList.Items)
numWfs := 0
var finalWfs []v1alpha1.Workflow
for _, item := range mergedWfsList.Items {
if numWfsToKeep == 0 || numWfs < numWfsToKeep {
finalWfs = append(finalWfs, item)
numWfs += 1
}
}
return &v1alpha1.WorkflowList{Items: finalWfs, ListMeta: liveWfs.ListMeta}
}

func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) {
wfClient := auth.GetWfClient(ctx)

Expand All @@ -140,6 +167,19 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
archivedWfList, err := s.wfArchiveServer.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{
ListOptions: listOption,
NamePrefix: "",
Namespace: req.Namespace,
})
if err != nil {
log.Warnf("unable to list archived workflows:%v", err)
} else {
if archivedWfList != nil {
wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList, int(listOption.Limit))
}
}

cleaner := fields.NewCleaner(req.Fields)
if s.offloadNodeStatusRepo.IsEnabled() && !cleaner.WillExclude("items.status.nodes") {
offloadedNodes, err := s.offloadNodeStatusRepo.List(req.Namespace)
Expand Down
17 changes: 17 additions & 0 deletions server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/go-jose/go-jose/v3/jwt"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -648,6 +649,22 @@ func (t testWatchWorkflowServer) Send(*workflowpkg.WorkflowWatchEvent) error {
panic("implement me")
}

func TestMergeWithArchivedWorkflows(t *testing.T) {
timeNow := time.Now()
wf1 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)}}}
wf2 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "2", CreationTimestamp: metav1.Time{Time: timeNow.Add(2 * time.Second)}}}
wf3 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "3", CreationTimestamp: metav1.Time{Time: timeNow.Add(3 * time.Second)}}}
liveWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1, wf2}}
archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1, wf3, wf2}}
expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1}}
expectedShortWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2}}
assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 0).Items)
assert.Equal(t, expectedShortWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 2).Items)
}

func TestWatchWorkflows(t *testing.T) {
server, ctx := getWorkflowServer()
wf := &v1alpha1.Workflow{
Expand Down
8 changes: 0 additions & 8 deletions ui/src/app/app-router.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {useEffect, useState} from 'react';
import {Redirect, Route, Router, Switch} from 'react-router';
import {Version} from '../models';
import apidocs from './apidocs';
import archivedWorkflows from './archived-workflows';
import clusterWorkflowTemplates from './cluster-workflow-templates';
import cronWorkflows from './cron-workflows';
import eventflow from './event-flow';
Expand Down Expand Up @@ -35,7 +34,6 @@ const workflowsEventBindingsUrl = uiUrl('workflow-event-bindings');
const workflowTemplatesUrl = uiUrl('workflow-templates');
const clusterWorkflowTemplatesUrl = uiUrl('cluster-workflow-templates');
const cronWorkflowsUrl = uiUrl('cron-workflows');
const archivedWorkflowsUrl = uiUrl('archived-workflows');
const eventSourceUrl = uiUrl('event-sources');
const pluginsUrl = uiUrl('plugins');
const helpUrl = uiUrl('help');
Expand Down Expand Up @@ -130,11 +128,6 @@ export const AppRouter = ({popupManager, history, notificationsManager}: {popupM
path: workflowsEventBindingsUrl + namespaceSuffix,
iconClassName: 'fa fa-link'
},
{
title: 'Archived Workflows',
path: archivedWorkflowsUrl + namespaceSuffix,
iconClassName: 'fa fa-archive'
},
{
title: 'Reports',
path: reportsUrl + namespaceSuffix,
Expand Down Expand Up @@ -176,7 +169,6 @@ export const AppRouter = ({popupManager, history, notificationsManager}: {popupM
<Route path={workflowTemplatesUrl} component={workflowTemplates.component} />
<Route path={clusterWorkflowTemplatesUrl} component={clusterWorkflowTemplates.component} />
<Route path={cronWorkflowsUrl} component={cronWorkflows.component} />
<Route path={archivedWorkflowsUrl} component={archivedWorkflows.component} />
<Route path={reportsUrl} component={reports.component} />
<Route path={pluginsUrl} component={plugins.component} />
<Route exact={true} strict={true} path={helpUrl} component={help.component} />
Expand Down

This file was deleted.

Loading

0 comments on commit 82310dd

Please sign in to comment.