Skip to content

Commit

Permalink
fix: fix for cli
Browse files Browse the repository at this point in the history
Signed-off-by: Jiacheng Xu <xjcmaxwellcjx@gmail.com>
  • Loading branch information
jiachengxu committed May 8, 2024
1 parent 0e3a478 commit c0db0ac
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 73 deletions.
13 changes: 5 additions & 8 deletions pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/argoproj/argo-workflows/v3/server/workflow/store"

"github.com/argoproj/argo-workflows/v3"
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
"github.com/argoproj/argo-workflows/v3/pkg/apiclient/clusterworkflowtemplate"
Expand All @@ -25,7 +27,6 @@ import (
cronworkflowserver "github.com/argoproj/argo-workflows/v3/server/cronworkflow"
"github.com/argoproj/argo-workflows/v3/server/types"
workflowserver "github.com/argoproj/argo-workflows/v3/server/workflow"
"github.com/argoproj/argo-workflows/v3/server/workflow/store"
workflowtemplateserver "github.com/argoproj/argo-workflows/v3/server/workflowtemplate"
"github.com/argoproj/argo-workflows/v3/util/help"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
Expand All @@ -39,7 +40,6 @@ var (
type argoKubeClient struct {
instanceIDService instanceid.Service
wfClient workflow.Interface
wfStore store.WorkflowStore
}

var _ Client = &argoKubeClient{}
Expand Down Expand Up @@ -86,16 +86,13 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
if err != nil {
return nil, nil, err
}
wfStore, err := store.NewSQLiteStore(instanceIDService)
if err != nil {
return nil, nil, err
}
return ctx, &argoKubeClient{instanceIDService, wfClient, wfStore}, nil
return ctx, &argoKubeClient{instanceIDService, wfClient}, nil
}

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, a.wfStore)}}
wfLister := store.NewKubeLister(a.wfClient)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil)}}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) {
Expand Down
2 changes: 1 addition & 1 deletion server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.Fatal(err)
}
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore)
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore)
grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

Expand Down
5 changes: 1 addition & 4 deletions server/utils/list_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ func (l ListOptions) WithStartedAtAscending(ascending bool) ListOptions {
return l
}

func BuildListOptions(options *metav1.ListOptions, ns, namePrefix string) (ListOptions, error) {
if options == nil {
options = &metav1.ListOptions{}
}
func BuildListOptions(options metav1.ListOptions, ns, namePrefix string) (ListOptions, error) {
if options.Continue == "" {
options.Continue = "0"
}
Expand Down
41 changes: 41 additions & 0 deletions server/workflow/store/lister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package store

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
)

type WorkflowLister interface {
ListWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (*wfv1.WorkflowList, error)
CountWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (int64, error)
}

type kubeLister struct {
wfClient versioned.Interface
}

var _ WorkflowLister = &kubeLister{}

func NewKubeLister(wfClient versioned.Interface) WorkflowLister {
return &kubeLister{wfClient: wfClient}
}

func (k *kubeLister) ListWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (*wfv1.WorkflowList, error) {
wfList, err := k.wfClient.ArgoprojV1alpha1().Workflows(namespace).List(ctx, listOptions)
if err != nil {
return nil, err
}
return wfList, nil
}

func (k *kubeLister) CountWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (int64, error) {
wfList, err := k.wfClient.ArgoprojV1alpha1().Workflows(namespace).List(ctx, listOptions)
if err != nil {
return 0, err
}
return int64(len(wfList.Items)), nil
}
60 changes: 36 additions & 24 deletions server/workflow/store/sqlite_store.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package store

import (
"context"
"encoding/json"
"fmt"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"zombiezen.com/go/sqlite"
"zombiezen.com/go/sqlite/sqlitex"

sutils "github.com/argoproj/argo-workflows/v3/server/utils"

"github.com/argoproj/argo-workflows/v3/persist/sqldb"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
sutils "github.com/argoproj/argo-workflows/v3/server/utils"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)
Expand Down Expand Up @@ -64,33 +67,36 @@ func initDB() (*sqlite.Conn, error) {

type WorkflowStore interface {
cache.Store
ListWorkflows(options sutils.ListOptions) ([]wfv1.Workflow, error)
CountWorkflows(options sutils.ListOptions) (int64, error)
}

// sqliteStore is a sqlite-based store.
type sqliteStore struct {
// SQLiteStore is a sqlite-based store.
type SQLiteStore struct {
conn *sqlite.Conn
instanceService instanceid.Service
}

var _ WorkflowStore = &sqliteStore{}
var _ WorkflowStore = &SQLiteStore{}
var _ WorkflowLister = &SQLiteStore{}

func NewSQLiteStore(instanceService instanceid.Service) (WorkflowStore, error) {
func NewSQLiteStore(instanceService instanceid.Service) (*SQLiteStore, error) {
conn, err := initDB()
if err != nil {
return nil, err
}
return &sqliteStore{conn: conn, instanceService: instanceService}, nil
return &SQLiteStore{conn: conn, instanceService: instanceService}, nil
}

func (s *sqliteStore) ListWorkflows(options sutils.ListOptions) ([]wfv1.Workflow, error) {
func (s *SQLiteStore) ListWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (*wfv1.WorkflowList, error) {
options, err := sutils.BuildListOptions(listOptions, namespace, namePrefix)
if err != nil {
return nil, err
}
query := `select workflow from argo_workflows
where instanceid = ?
`
args := []any{s.instanceService.InstanceID()}

query, args, err := sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, false)
query, args, err = sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, false)
if err != nil {
return nil, err
}
Expand All @@ -114,18 +120,24 @@ where instanceid = ?
return nil, err
}

return workflows, nil
return &wfv1.WorkflowList{
Items: workflows,
}, nil
}

func (s *sqliteStore) CountWorkflows(options sutils.ListOptions) (int64, error) {
func (s *SQLiteStore) CountWorkflows(ctx context.Context, namespace, namePrefix string, listOptions metav1.ListOptions) (int64, error) {
options, err := sutils.BuildListOptions(listOptions, namespace, namePrefix)
if err != nil {
return 0, err
}
query := `select count(*) as total from argo_workflows
where instanceid = ?
`
args := []any{s.instanceService.InstanceID()}

options.Limit = 0
options.Offset = 0
query, args, err := sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, true)
query, args, err = sqldb.BuildWorkflowSelector(query, args, workflowTableName, workflowLabelsTableName, sqldb.SQLite, options, true)
if err != nil {
return 0, err
}
Expand All @@ -144,7 +156,7 @@ where instanceid = ?
return total, nil
}

func (s *sqliteStore) Add(obj interface{}) error {
func (s *SQLiteStore) Add(obj interface{}) error {
wf, ok := obj.(*wfv1.Workflow)
if !ok {
return fmt.Errorf("unable to convert object to Workflow. object: %v", obj)
Expand All @@ -155,7 +167,7 @@ func (s *sqliteStore) Add(obj interface{}) error {
return err
}

func (s *sqliteStore) Update(obj interface{}) error {
func (s *SQLiteStore) Update(obj interface{}) error {
wf, ok := obj.(*wfv1.Workflow)
if !ok {
return fmt.Errorf("unable to convert object to Workflow. object: %v", obj)
Expand All @@ -166,15 +178,15 @@ func (s *sqliteStore) Update(obj interface{}) error {
return err
}

func (s *sqliteStore) Delete(obj interface{}) error {
func (s *SQLiteStore) Delete(obj interface{}) error {
wf, ok := obj.(*wfv1.Workflow)
if !ok {
return fmt.Errorf("unable to convert object to Workflow. object: %v", obj)
}
return sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}})
}

func (s *sqliteStore) Replace(list []interface{}, resourceVersion string) error {
func (s *SQLiteStore) Replace(list []interface{}, resourceVersion string) error {
wfs := make([]*wfv1.Workflow, 0, len(list))
for _, obj := range list {
wf, ok := obj.(*wfv1.Workflow)
Expand All @@ -189,27 +201,27 @@ func (s *sqliteStore) Replace(list []interface{}, resourceVersion string) error
return err
}

func (s *sqliteStore) Resync() error {
func (s *SQLiteStore) Resync() error {
return nil
}

func (s *sqliteStore) List() []interface{} {
func (s *SQLiteStore) List() []interface{} {
panic("not implemented")
}

func (s *sqliteStore) ListKeys() []string {
func (s *SQLiteStore) ListKeys() []string {
panic("not implemented")
}

func (s *sqliteStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
func (s *SQLiteStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
panic("not implemented")
}

func (s *sqliteStore) GetByKey(key string) (item interface{}, exists bool, err error) {
func (s *SQLiteStore) GetByKey(key string) (item interface{}, exists bool, err error) {
panic("not implemented")
}

func (s *sqliteStore) upsertWorkflow(wf *wfv1.Workflow) error {
func (s *SQLiteStore) upsertWorkflow(wf *wfv1.Workflow) error {
err := sqlitex.Execute(s.conn, deleteWorkflowQuery, &sqlitex.ExecOptions{Args: []any{string(wf.UID)}})
if err != nil {
return err
Expand Down Expand Up @@ -248,7 +260,7 @@ func (s *sqliteStore) upsertWorkflow(wf *wfv1.Workflow) error {
return nil
}

func (s *sqliteStore) replaceWorkflows(workflows []*wfv1.Workflow) error {
func (s *SQLiteStore) replaceWorkflows(workflows []*wfv1.Workflow) error {
err := sqlitex.Execute(s.conn, `delete from argo_workflows`, nil)
if err != nil {
return err
Expand Down
12 changes: 6 additions & 6 deletions server/workflow/store/sqlite_store_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"context"
"encoding/json"
"fmt"
"testing"
Expand All @@ -13,7 +14,6 @@ import (
"zombiezen.com/go/sqlite/sqlitex"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
sutils "github.com/argoproj/argo-workflows/v3/server/utils"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
)

Expand Down Expand Up @@ -71,15 +71,15 @@ func TestStoreOperation(t *testing.T) {
instanceIdSvc := instanceid.NewService("my-instanceid")
conn, err := initDB()
require.NoError(t, err)
store := sqliteStore{
store := SQLiteStore{
conn: conn,
instanceService: instanceIdSvc,
}
t.Run("TestAddWorkflow", func(t *testing.T) {
for i := 0; i < 10; i++ {
require.NoError(t, store.Add(generateWorkflow(i)))
}
num, err := store.CountWorkflows(sutils.ListOptions{Namespace: "argo"})
num, err := store.CountWorkflows(context.Background(), "argo", "", metav1.ListOptions{})
require.NoError(t, err)
assert.Equal(t, int64(10), num)
// Labels are also added
Expand Down Expand Up @@ -129,12 +129,12 @@ func TestStoreOperation(t *testing.T) {
}))
})
t.Run("TestListWorkflows", func(t *testing.T) {
wfList, err := store.ListWorkflows(sutils.ListOptions{Namespace: "argo", Limit: 5})
wfList, err := store.ListWorkflows(context.Background(), "argo", "", metav1.ListOptions{Limit: 5})
require.NoError(t, err)
assert.Len(t, wfList, 5)
assert.Len(t, wfList.Items, 5)
})
t.Run("TestCountWorkflows", func(t *testing.T) {
num, err := store.CountWorkflows(sutils.ListOptions{Namespace: "argo"})
num, err := store.CountWorkflows(context.Background(), "argo", "", metav1.ListOptions{})
require.NoError(t, err)
assert.Equal(t, int64(9), num)
})
Expand Down

0 comments on commit c0db0ac

Please sign in to comment.