Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(argo-server)!: Implement missing instanceID code. Fixes #2780 #2786

Merged
merged 26 commits into from May 4, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 1 addition & 4 deletions api/openapi-spec/swagger.json
Expand Up @@ -2895,10 +2895,6 @@
"type": "string",
"title": "GenerateName overrides metadata.generateName"
},
"instanceID": {
"type": "string",
"title": "InstanceID binds the Resource to the specified instance ID"
},
"labels": {
"type": "string",
"title": "Labels adds to metadata.labels"
Expand Down Expand Up @@ -6229,6 +6225,7 @@
"$ref": "#/definitions/io.k8s.api.core.v1.CreateOptions"
},
"instanceID": {
"description": "This field is no longer used.",
"type": "string"
},
"namespace": {
Expand Down
16 changes: 12 additions & 4 deletions cmd/argo/commands/client/conn.go
Expand Up @@ -13,6 +13,7 @@ import (
)

var argoServer string
var instanceId string

var overrides = clientcmd.ConfigOverrides{}

Expand All @@ -31,14 +32,21 @@ func GetConfig() clientcmd.ClientConfig {
return clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin)
}

func AddArgoServerFlagsToCmd(cmd *cobra.Command) {
func AddAPIClientFlagsToCmd(cmd *cobra.Command) {
cmd.PersistentFlags().StringVar(&instanceId, "instanceid", "", "submit with a specific controller's instance id label")
cmd.PersistentFlags().StringVar(&argoServer, "argo-server", os.Getenv("ARGO_SERVER"), "API server `host:port`. e.g. localhost:2746. Defaults to the ARGO_SERVER environment variable.")
}

func NewAPIClient() (context.Context, apiclient.Client) {
ctx, client, err := apiclient.NewClient(argoServer, func() string {
return GetAuthString()
}, GetConfig())
ctx, client, err := apiclient.NewClientFromOpts(
apiclient.Opts{
ArgoServer: argoServer,
InstanceID: instanceId,
AuthSupplier: func() string {
return GetAuthString()
},
ClientConfig: GetConfig(),
})
if err != nil {
log.Fatal(err)
}
Expand Down
22 changes: 2 additions & 20 deletions cmd/argo/commands/cron/create.go
Expand Up @@ -20,14 +20,9 @@ type cliCreateOpts struct {
strict bool // --strict
}

type cronWorkflowSubmitOpts struct {
instanceId string
}

func NewCreateCommand() *cobra.Command {
var (
cliCreateOpts cliCreateOpts
submitOpts cronWorkflowSubmitOpts
)
var command = &cobra.Command{
Use: "create FILE1 FILE2...",
Expand All @@ -38,16 +33,15 @@ func NewCreateCommand() *cobra.Command {
os.Exit(1)
}

CreateCronWorkflows(args, &cliCreateOpts, &submitOpts)
CreateCronWorkflows(args, &cliCreateOpts)
},
}
command.Flags().StringVarP(&cliCreateOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().StringVar(&submitOpts.instanceId, "instanceid", "", "submit with a specific controller's instance id label")
command.Flags().BoolVar(&cliCreateOpts.strict, "strict", true, "perform strict workflow validation")
return command
}

func CreateCronWorkflows(filePaths []string, cliOpts *cliCreateOpts, submitOpts *cronWorkflowSubmitOpts) {
func CreateCronWorkflows(filePaths []string, cliOpts *cliCreateOpts) {

ctx, apiClient := client.NewAPIClient()
serviceClient := apiClient.NewCronWorkflowServiceClient()
Expand All @@ -70,7 +64,6 @@ func CreateCronWorkflows(filePaths []string, cliOpts *cliCreateOpts, submitOpts
}

for _, cronWf := range cronWorkflows {
applySubmitOpts(&cronWf, submitOpts)
created, err := serviceClient.CreateCronWorkflow(ctx, &cronworkflowpkg.CreateCronWorkflowRequest{
Namespace: namespace,
CronWorkflow: &cronWf,
Expand Down Expand Up @@ -100,14 +93,3 @@ func unmarshalCronWorkflows(wfBytes []byte, strict bool) []wfv1.CronWorkflow {
log.Fatalf("Failed to parse workflow template: %v", err)
return nil
}

func applySubmitOpts(cwf *wfv1.CronWorkflow, submitOpts *cronWorkflowSubmitOpts) {
labels := cwf.GetLabels()
if labels == nil {
labels = make(map[string]string)
}
if submitOpts.instanceId != "" {
labels[common.LabelKeyControllerInstanceID] = submitOpts.instanceId
}
cwf.SetLabels(labels)
}
2 changes: 1 addition & 1 deletion cmd/argo/commands/root.go
Expand Up @@ -58,7 +58,7 @@ If you're using the Argo Server (e.g. because you need large workflow support or
command.AddCommand(clustertemplate.NewClusterTemplateCommand())

client.AddKubectlFlagsToCmd(command)
client.AddArgoServerFlagsToCmd(command)
client.AddAPIClientFlagsToCmd(command)

// global log level
var logLevel string
Expand Down
2 changes: 0 additions & 2 deletions cmd/argo/commands/submit.go
Expand Up @@ -66,7 +66,6 @@ func NewSubmitCommand() *cobra.Command {
command.Flags().StringVar(&submitOpts.Entrypoint, "entrypoint", "", "override entrypoint")
command.Flags().StringArrayVarP(&submitOpts.Parameters, "parameter", "p", []string{}, "pass an input parameter")
command.Flags().StringVar(&submitOpts.ServiceAccount, "serviceaccount", "", "run all pods in the workflow using specified serviceaccount")
command.Flags().StringVar(&submitOpts.InstanceID, "instanceid", "", "submit with a specific controller's instance id label")
command.Flags().BoolVar(&submitOpts.DryRun, "dry-run", false, "modify the workflow on the client-side without creating it")
command.Flags().BoolVar(&submitOpts.ServerDryRun, "server-dry-run", false, "send request to server with dry-run flag which will modify the workflow without creating it")
command.Flags().StringVarP(&cliSubmitOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
Expand Down Expand Up @@ -198,7 +197,6 @@ func submitWorkflows(workflows []wfv1.Workflow, submitOpts *wfv1.SubmitOpts, cli
created, err := serviceClient.CreateWorkflow(ctx, &workflowpkg.WorkflowCreateRequest{
Namespace: wf.Namespace,
Workflow: &wf,
InstanceID: submitOpts.InstanceID,
ServerDryRun: submitOpts.ServerDryRun,
CreateOptions: options,
})
Expand Down
26 changes: 23 additions & 3 deletions pkg/apiclient/apiclient.go
Expand Up @@ -2,6 +2,7 @@ package apiclient

import (
"context"
"fmt"

"k8s.io/client-go/tools/clientcmd"

Expand All @@ -22,10 +23,29 @@ type Client interface {
NewInfoServiceClient() (infopkg.InfoServiceClient, error)
}

type Opts struct {
ArgoServer string
InstanceID string
AuthSupplier func() string
ClientConfig clientcmd.ClientConfig
}

// DEPRECATED: use NewClientFromOpts
func NewClient(argoServer string, authSupplier func() string, clientConfig clientcmd.ClientConfig) (context.Context, Client, error) {
if argoServer != "" {
return newArgoServerClient(argoServer, authSupplier())
return NewClientFromOpts(Opts{
ArgoServer: argoServer,
AuthSupplier: authSupplier,
ClientConfig: clientConfig,
})
}

func NewClientFromOpts(opts Opts) (context.Context, Client, error) {
if opts.ArgoServer != "" && opts.InstanceID != "" {
return nil, nil, fmt.Errorf("cannot use instance ID with Argo Server")
}
if opts.ArgoServer != "" {
return newArgoServerClient(opts.ArgoServer, opts.AuthSupplier())
} else {
return newArgoKubeClient(clientConfig)
return newArgoKubeClient(opts.ClientConfig, opts.InstanceID)
}
}
14 changes: 8 additions & 6 deletions pkg/apiclient/argo-kube-client.go
Expand Up @@ -27,9 +27,10 @@ var argoKubeOffloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo
var NoArgoServerErr = fmt.Errorf("this is impossible if you are not using the Argo Server, see " + help.CLI)

type argoKubeClient struct {
instanceID string
}

func newArgoKubeClient(clientConfig clientcmd.ClientConfig) (context.Context, Client, error) {
func newArgoKubeClient(clientConfig clientcmd.ClientConfig, instanceID string) (context.Context, Client, error) {
restConfig, err := clientConfig.ClientConfig()
if err != nil {
return nil, nil, err
Expand All @@ -47,18 +48,19 @@ func newArgoKubeClient(clientConfig clientcmd.ClientConfig) (context.Context, Cl
if err != nil {
return nil, nil, err
}
return ctx, &argoKubeClient{}, nil
return ctx, &argoKubeClient{instanceID}, nil
}

func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
return &argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer("", argoKubeOffloadNodeStatusRepo)}
return &argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceID, argoKubeOffloadNodeStatusRepo)}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() cronworkflow.CronWorkflowServiceClient {
return &argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer("")}
return &argoKubeCronWorkflowServiceClient{cronworkflowserver.NewCronWorkflowServer(a.instanceID)}
}

func (a *argoKubeClient) NewWorkflowTemplateServiceClient() workflowtemplate.WorkflowTemplateServiceClient {
return &argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer()}
return &argoKubeWorkflowTemplateServiceClient{workflowtemplateserver.NewWorkflowTemplateServer(a.instanceID)}
}

func (a *argoKubeClient) NewArchivedWorkflowServiceClient() (workflowarchivepkg.ArchivedWorkflowServiceClient, error) {
Expand All @@ -70,5 +72,5 @@ func (a *argoKubeClient) NewInfoServiceClient() (infopkg.InfoServiceClient, erro
}

func (a *argoKubeClient) NewClusterWorkflowTemplateServiceClient() clusterworkflowtemplate.ClusterWorkflowTemplateServiceClient {
return &argoKubeWorkflowClusterTemplateServiceClient{clusterworkflowtmplserver.NewClusterWorkflowTemplateServer()}
return &argoKubeWorkflowClusterTemplateServiceClient{clusterworkflowtmplserver.NewClusterWorkflowTemplateServer(a.instanceID)}
}