diff --git a/pkg/apiclient/argo-kube-client.go b/pkg/apiclient/argo-kube-client.go index 8f4f32c91d29..3ae83fe809db 100644 --- a/pkg/apiclient/argo-kube-client.go +++ b/pkg/apiclient/argo-kube-client.go @@ -11,6 +11,8 @@ import ( restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "github.com/argoproj/argo-workflows/v3/server/workflow/store" + "github.com/argoproj/argo-workflows/v3" "github.com/argoproj/argo-workflows/v3/persist/sqldb" "github.com/argoproj/argo-workflows/v3/pkg/apiclient/clusterworkflowtemplate" @@ -25,7 +27,6 @@ import ( cronworkflowserver "github.com/argoproj/argo-workflows/v3/server/cronworkflow" "github.com/argoproj/argo-workflows/v3/server/types" workflowserver "github.com/argoproj/argo-workflows/v3/server/workflow" - "github.com/argoproj/argo-workflows/v3/server/workflow/store" workflowtemplateserver "github.com/argoproj/argo-workflows/v3/server/workflowtemplate" "github.com/argoproj/argo-workflows/v3/util/help" "github.com/argoproj/argo-workflows/v3/util/instanceid" @@ -39,7 +40,6 @@ var ( type argoKubeClient struct { instanceIDService instanceid.Service wfClient workflow.Interface - wfStore store.WorkflowStore } var _ Client = &argoKubeClient{} @@ -86,16 +86,13 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, if err != nil { return nil, nil, err } - wfStore, err := store.NewSQLiteStore(instanceIDService) - if err != nil { - return nil, nil, err - } - return ctx, &argoKubeClient{instanceIDService, wfClient, wfStore}, nil + return ctx, &argoKubeClient{instanceIDService, wfClient}, nil } func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient { wfArchive := sqldb.NullWorkflowArchive - return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, a.wfStore)}} + wfLister := store.NewKubeLister(a.wfClient) + return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil)}} } func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) { diff --git a/server/apiserver/argoserver.go b/server/apiserver/argoserver.go index 83b7ac0d34bc..27188374d229 100644 --- a/server/apiserver/argoserver.go +++ b/server/apiserver/argoserver.go @@ -236,7 +236,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st if err != nil { log.Fatal(err) } - workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore) + workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore) grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor) httpServer := as.newHTTPServer(ctx, port, artifactServer) diff --git a/server/utils/list_options.go b/server/utils/list_options.go index 697c09b43905..69a03456cbd5 100644 --- a/server/utils/list_options.go +++ b/server/utils/list_options.go @@ -51,10 +51,7 @@ func (l ListOptions) WithStartedAtAscending(ascending bool) ListOptions { return l } -func BuildListOptions(options *metav1.ListOptions, ns, namePrefix string) (ListOptions, error) { - if options == nil { - options = &metav1.ListOptions{} - } +func BuildListOptions(options metav1.ListOptions, ns, namePrefix string) (ListOptions, error) { if options.Continue == "" { options.Continue = "0" } diff --git a/server/workflow/store/lister.go b/server/workflow/store/lister.go new file mode 100644 index 000000000000..3a2dc0870a7a --- /dev/null +++ b/server/workflow/store/lister.go @@ -0,0 +1,41 @@ +package store + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" +) + +type WorkflowLister interface { + ListWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (*wfv1.WorkflowList, error) + CountWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (int64, error) +} + +type kubeLister struct { + wfClient versioned.Interface +} + +var _ WorkflowLister = &kubeLister{} + +func NewKubeLister(wfClient versioned.Interface) WorkflowLister { + return &kubeLister{wfClient: wfClient} +} + +func (k *kubeLister) ListWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (*wfv1.WorkflowList, error) { + wfList, err := k.wfClient.ArgoprojV1alpha1().Workflows(namespace).List(ctx, listOptions) + if err != nil { + return nil, err + } + return wfList, nil +} + +func (k *kubeLister) CountWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (int64, error) { + wfList, err := k.wfClient.ArgoprojV1alpha1().Workflows(namespace).List(ctx, listOptions) + if err != nil { + return 0, err + } + return int64(len(wfList.Items)), nil +} diff --git a/server/workflow/store/sqlite_store.go b/server/workflow/store/sqlite_store.go index 8819219e55c1..c3518a8f5e16 100644 --- a/server/workflow/store/sqlite_store.go +++ b/server/workflow/store/sqlite_store.go @@ -1,17 +1,20 @@ package store import ( + "context" "encoding/json" "fmt" log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" "zombiezen.com/go/sqlite" "zombiezen.com/go/sqlite/sqlitex" + sutils "github.com/argoproj/argo-workflows/v3/server/utils" + "github.com/argoproj/argo-workflows/v3/persist/sqldb" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - sutils "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/util/instanceid" "github.com/argoproj/argo-workflows/v3/workflow/common" ) @@ -64,33 +67,36 @@ func initDB() (*sqlite.Conn, error) { type WorkflowStore interface { cache.Store - ListWorkflows(options sutils.ListOptions) ([]wfv1.Workflow, error) - CountWorkflows(options sutils.ListOptions) (int64, error) } -// sqliteStore is a sqlite-based store. -type sqliteStore struct { +// SQLiteStore is a sqlite-based store. +type SQLiteStore struct { conn *sqlite.Conn instanceService instanceid.Service } -var _ WorkflowStore = &sqliteStore{} +var _ WorkflowStore = &SQLiteStore{} +var _ WorkflowLister = &SQLiteStore{} -func NewSQLiteStore(instanceService instanceid.Service) (WorkflowStore, error) { +func NewSQLiteStore(instanceService instanceid.Service) (*SQLiteStore, error) { conn, err := initDB() if err != nil { return nil, err } - return &sqliteStore{conn: conn, instanceService: instanceService}, nil + return &SQLiteStore{conn: conn, instanceService: instanceService}, nil } -func (s *sqliteStore) ListWorkflows(options sutils.ListOptions) ([]wfv1.Workflow, error) { +func (s *SQLiteStore) ListWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (*wfv1.WorkflowList, error) { + options, err := sutils.BuildListOptions(listOptions, namespace, namePrefix) + if err != nil { + return nil, err + } query := `select workflow from argo_workflows where instanceid = ? ` args := []any{s.instanceService.InstanceID()} - query, args, err := sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, false) + query, args, err = sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, false) if err != nil { return nil, err } @@ -114,10 +120,16 @@ where instanceid = ? return nil, err } - return workflows, nil + return &wfv1.WorkflowList{ + Items: workflows, + }, nil } -func (s *sqliteStore) CountWorkflows(options sutils.ListOptions) (int64, error) { +func (s *SQLiteStore) CountWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (int64, error) { + options, err := sutils.BuildListOptions(listOptions, namespace, namePrefix) + if err != nil { + return 0, err + } query := `select count(*) as total from argo_workflows where instanceid = ? ` @@ -125,7 +137,7 @@ where instanceid = ? options.Limit = 0 options.Offset = 0 - query, args, err := sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, true) + query, args, err = sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, true) if err != nil { return 0, err } @@ -144,7 +156,7 @@ where instanceid = ? return total, nil } -func (s *sqliteStore) Add(obj interface{}) error { +func (s *SQLiteStore) Add(obj interface{}) error { wf, ok := obj.(*wfv1.Workflow) if !ok { return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) @@ -155,7 +167,7 @@ func (s *sqliteStore) Add(obj interface{}) error { return err } -func (s *sqliteStore) Update(obj interface{}) error { +func (s *SQLiteStore) Update(obj interface{}) error { wf, ok := obj.(*wfv1.Workflow) if !ok { return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) @@ -166,7 +178,7 @@ func (s *sqliteStore) Update(obj interface{}) error { return err } -func (s *sqliteStore) Delete(obj interface{}) error { +func (s *SQLiteStore) Delete(obj interface{}) error { wf, ok := obj.(*wfv1.Workflow) if !ok { return fmt.Errorf("unable to convert object to Workflow. object: %v", obj) @@ -174,7 +186,7 @@ func (s *sqliteStore) Delete(obj interface{}) error { return sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}}) } -func (s *sqliteStore) Replace(list []interface{}, resourceVersion string) error { +func (s *SQLiteStore) Replace(list []interface{}, resourceVersion string) error { wfs := make([]*wfv1.Workflow, 0, len(list)) for _, obj := range list { wf, ok := obj.(*wfv1.Workflow) @@ -189,27 +201,27 @@ func (s *sqliteStore) Replace(list []interface{}, resourceVersion string) error return err } -func (s *sqliteStore) Resync() error { +func (s *SQLiteStore) Resync() error { return nil } -func (s *sqliteStore) List() []interface{} { +func (s *SQLiteStore) List() []interface{} { panic("not implemented") } -func (s *sqliteStore) ListKeys() []string { +func (s *SQLiteStore) ListKeys() []string { panic("not implemented") } -func (s *sqliteStore) Get(obj interface{}) (item interface{}, exists bool, err error) { +func (s *SQLiteStore) Get(obj interface{}) (item interface{}, exists bool, err error) { panic("not implemented") } -func (s *sqliteStore) GetByKey(key string) (item interface{}, exists bool, err error) { +func (s *SQLiteStore) GetByKey(key string) (item interface{}, exists bool, err error) { panic("not implemented") } -func (s *sqliteStore) upsertWorkflow(wf *wfv1.Workflow) error { +func (s *SQLiteStore) upsertWorkflow(wf *wfv1.Workflow) error { err := sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}}) if err != nil { return err @@ -248,7 +260,7 @@ func (s *sqliteStore) upsertWorkflow(wf *wfv1.Workflow) error { return nil } -func (s *sqliteStore) replaceWorkflows(workflows []*wfv1.Workflow) error { +func (s *SQLiteStore) replaceWorkflows(workflows []*wfv1.Workflow) error { err := sqlitex.Execute(s.conn, `delete from argo_workflows`, nil) if err != nil { return err diff --git a/server/workflow/store/sqlite_store_test.go b/server/workflow/store/sqlite_store_test.go index 11e6d731152c..086c755014a7 100644 --- a/server/workflow/store/sqlite_store_test.go +++ b/server/workflow/store/sqlite_store_test.go @@ -1,6 +1,7 @@ package store import ( + "context" "encoding/json" "fmt" "testing" @@ -13,7 +14,6 @@ import ( "zombiezen.com/go/sqlite/sqlitex" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - sutils "github.com/argoproj/argo-workflows/v3/server/utils" "github.com/argoproj/argo-workflows/v3/util/instanceid" ) @@ -71,7 +71,7 @@ func TestStoreOperation(t *testing.T) { instanceIdSvc := instanceid.NewService("my-instanceid") conn, err := initDB() require.NoError(t, err) - store := sqliteStore{ + store := SQLiteStore{ conn: conn, instanceService: instanceIdSvc, } @@ -79,7 +79,7 @@ func TestStoreOperation(t *testing.T) { for i := 0; i < 10; i++ { require.NoError(t, store.Add(generateWorkflow(i))) } - num, err := store.CountWorkflows(sutils.ListOptions{Namespace: "argo"}) + num, err := store.CountWorkflows(context.Background(), "argo", "", metav1.ListOptions{}) require.NoError(t, err) assert.Equal(t, int64(10), num) // Labels are also added @@ -129,12 +129,12 @@ func TestStoreOperation(t *testing.T) { })) }) t.Run("TestListWorkflows", func(t *testing.T) { - wfList, err := store.ListWorkflows(sutils.ListOptions{Namespace: "argo", Limit: 5}) + wfList, err := store.ListWorkflows(context.Background(), "argo", "", metav1.ListOptions{Limit: 5}) require.NoError(t, err) - assert.Len(t, wfList, 5) + assert.Len(t, wfList.Items, 5) }) t.Run("TestCountWorkflows", func(t *testing.T) { - num, err := store.CountWorkflows(sutils.ListOptions{Namespace: "argo"}) + num, err := store.CountWorkflows(context.Background(), "argo", "", metav1.ListOptions{}) require.NoError(t, err) assert.Equal(t, int64(9), num) }) diff --git a/server/workflow/workflow_server.go b/server/workflow/workflow_server.go index e009a9371935..c491009d2419 100644 --- a/server/workflow/workflow_server.go +++ b/server/workflow/workflow_server.go @@ -52,29 +52,40 @@ type workflowServer struct { offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo hydrator hydrator.Interface wfArchive sqldb.WorkflowArchive + wfLister store.WorkflowLister wfReflector *cache.Reflector - wfStore store.WorkflowStore } var _ workflowpkg.WorkflowServiceServer = &workflowServer{} // NewWorkflowServer returns a new WorkflowServer -func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfStore store.WorkflowStore) *workflowServer { - ctx := context.Background() - lw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).List(ctx, options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).Watch(ctx, options) - }, - } - wfReflector := cache.NewReflector(lw, &wfv1.Workflow{}, wfStore, reSyncDuration) - return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo), wfArchive, wfReflector, wfStore} +func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfLister store.WorkflowLister, wfStore store.WorkflowStore) *workflowServer { + ws := &workflowServer{ + instanceIDService: instanceIDService, + offloadNodeStatusRepo: offloadNodeStatusRepo, + hydrator: hydrator.New(offloadNodeStatusRepo), + wfArchive: wfArchive, + wfLister: wfLister, + } + if wfStore != nil { + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).List(context.Background(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).Watch(context.Background(), options) + }, + } + wfReflector := cache.NewReflector(lw, &wfv1.Workflow{}, wfStore, reSyncDuration) + ws.wfReflector = wfReflector + } + return ws } func (s *workflowServer) Run(stopCh <-chan struct{}) { - s.wfReflector.Run(stopCh) + if s.wfReflector == nil { + s.wfReflector.Run(stopCh) + } } func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*wfv1.Workflow, error) { @@ -156,13 +167,13 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf } func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) { - listOption := &metav1.ListOptions{} + listOption := metav1.ListOptions{} if req.ListOptions != nil { - listOption = req.ListOptions + listOption = *req.ListOptions } - s.instanceIDService.With(listOption) + s.instanceIDService.With(&listOption) - options, err := sutils.BuildListOptions(req.ListOptions, req.Namespace, "") + options, err := sutils.BuildListOptions(listOption, req.Namespace, "") if err != nil { return nil, err } @@ -176,7 +187,7 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor } var wfs wfv1.Workflows - liveWfCount, err := s.wfStore.CountWorkflows(options) + liveWfCount, err := s.wfLister.CountWorkflows(ctx, req.Namespace, "", listOption) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } @@ -187,13 +198,13 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor totalCount := liveWfCount + archivedCount // first fetch live workflows - var liveWfList []wfv1.Workflow + liveWfList := &wfv1.WorkflowList{} if liveWfCount > 0 && (options.Limit == 0 || options.Offset < int(liveWfCount)) { - liveWfList, err = s.wfStore.ListWorkflows(options) + liveWfList, err = s.wfLister.ListWorkflows(ctx, req.Namespace, "", listOption) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } - wfs = append(wfs, liveWfList...) + wfs = append(wfs, liveWfList.Items...) } // then fetch archived workflows @@ -203,7 +214,7 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor archivedLimit := options.Limit if archivedOffset < 0 { archivedOffset = 0 - archivedLimit = options.Limit - len(liveWfList) + archivedLimit = options.Limit - len(liveWfList.Items) } archivedWfList, err := s.wfArchive.ListWorkflows(options.WithLimit(archivedLimit).WithOffset(archivedOffset)) if err != nil { @@ -211,7 +222,10 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor } wfs = append(wfs, archivedWfList...) } - meta := metav1.ListMeta{ResourceVersion: s.wfReflector.LastSyncResourceVersion()} + meta := metav1.ListMeta{ResourceVersion: liveWfList.ResourceVersion} + if s.wfReflector != nil { + meta.ResourceVersion = s.wfReflector.LastSyncResourceVersion() + } remainCount := totalCount - int64(options.Offset) - int64(len(wfs)) if remainCount < 0 { remainCount = 0 diff --git a/server/workflow/workflow_server_test.go b/server/workflow/workflow_server_test.go index 0150bdf70f43..b00afd269985 100644 --- a/server/workflow/workflow_server_test.go +++ b/server/workflow/workflow_server_test.go @@ -12,6 +12,7 @@ import ( authorizationv1 "k8s.io/api/authorization/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/client-go/kubernetes/fake" @@ -603,10 +604,14 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { archivedRepo.On("GetWorkflow", "", "test", "unlabelled").Return(nil, nil) archivedRepo.On("GetWorkflow", "", "workflows", "latest").Return(nil, nil) archivedRepo.On("GetWorkflow", "", "workflows", "hello-world-9tql2-not").Return(nil, nil) - archivedRepo.On("CountWorkflows", sutils.ListOptions{Namespace: "workflows"}).Return(int64(2), nil) - archivedRepo.On("ListWorkflows", sutils.ListOptions{Namespace: "workflows", Limit: -2}).Return(v1alpha1.Workflows{wfObj2, failedWfObj}, nil) - archivedRepo.On("CountWorkflows", sutils.ListOptions{Namespace: "test"}).Return(int64(1), nil) - archivedRepo.On("ListWorkflows", sutils.ListOptions{Namespace: "test", Limit: -1}).Return(v1alpha1.Workflows{wfObj4}, nil) + r, err := labels.ParseToRequirements("workflows.argoproj.io/controller-instanceid=my-instanceid") + if err != nil { + panic(err) + } + archivedRepo.On("CountWorkflows", sutils.ListOptions{Namespace: "workflows", LabelRequirements: r}).Return(int64(2), nil) + archivedRepo.On("ListWorkflows", sutils.ListOptions{Namespace: "workflows", Limit: -2, LabelRequirements: r}).Return(v1alpha1.Workflows{wfObj2, failedWfObj}, nil) + archivedRepo.On("CountWorkflows", sutils.ListOptions{Namespace: "test", LabelRequirements: r}).Return(int64(1), nil) + archivedRepo.On("ListWorkflows", sutils.ListOptions{Namespace: "test", Limit: -1, LabelRequirements: r}).Return(v1alpha1.Workflows{wfObj4}, nil) kubeClientSet := fake.NewSimpleClientset() kubeClientSet.PrependReactor("create", "selfsubjectaccessreviews", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { @@ -633,7 +638,7 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { if err = wfStore.Add(&wfObj5); err != nil { panic(err) } - server := NewWorkflowServer(instanceIdSvc, offloadNodeStatusRepo, archivedRepo, wfClientset, wfStore) + server := NewWorkflowServer(instanceIdSvc, offloadNodeStatusRepo, archivedRepo, wfClientset, wfStore, wfStore) return server, ctx } diff --git a/server/workflowarchive/archived_workflow_server.go b/server/workflowarchive/archived_workflow_server.go index 2103e7592594..2f6c69369dce 100644 --- a/server/workflowarchive/archived_workflow_server.go +++ b/server/workflowarchive/archived_workflow_server.go @@ -41,7 +41,7 @@ func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive, offloadNodeStatus func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *workflowarchivepkg.ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) { - options, err := sutils.BuildListOptions(req.ListOptions, req.Namespace, req.NamePrefix) + options, err := sutils.BuildListOptions(*req.ListOptions, req.Namespace, req.NamePrefix) if err != nil { return nil, err }