Skip to content

Commit

Permalink
Merge branch 'master' into release-2.5
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jan 31, 2020
2 parents 26f9bca + a75ac1b commit 98702ca
Show file tree
Hide file tree
Showing 16 changed files with 340 additions and 215 deletions.
13 changes: 0 additions & 13 deletions Makefile
Expand Up @@ -49,19 +49,6 @@ DB ?= postgres
K3D := $(shell if [ "`kubectl config current-context`" = "k3s-default" ]; then echo true; else echo false; fi)
ARGO_TOKEN = $(shell kubectl -n argo get secret -o name | grep argo-server | xargs kubectl -n argo get -o jsonpath='{.data.token}' | base64 --decode)

# test flags
export SKIP_CRON_SUITE=true
# if we are on any branch with "master", "release", or "cron" in the name we should run cron tests
ifneq ($(findstring cron,$(GIT_BRANCH)),)
export SKIP_CRON_SUITE=false
endif
ifneq ($(findstring master,$(GIT_BRANCH)),)
export SKIP_CRON_SUITE=false
endif
ifneq ($(findstring release,$(GIT_BRANCH)),)
export SKIP_CRON_SUITE=false
endif

override LDFLAGS += \
-X ${PACKAGE}.version=$(VERSION) \
-X ${PACKAGE}.buildDate=${BUILD_DATE} \
Expand Down
1 change: 1 addition & 0 deletions cmd/argo/commands/common.go
Expand Up @@ -147,6 +147,7 @@ func (c LazyWorkflowTemplateGetter) Get(name string) (*wfv1.WorkflowTemplate, er
return templateresolution.WrapWorkflowTemplateInterface(wftmplClient).Get(name)
}

// DEPRECATED
var _ templateresolution.WorkflowTemplateNamespacedGetter = &LazyWorkflowTemplateGetter{}

// DEPRECATED
Expand Down
12 changes: 12 additions & 0 deletions cmd/argo/commands/cron/common.go
Expand Up @@ -16,15 +16,23 @@ import (

// Global variables
var (
// DEPRECATED
restConfig *rest.Config
// DEPRECATED
clientConfig clientcmd.ClientConfig
// DEPRECATED
clientset *kubernetes.Clientset
// DEPRECATED
wfClientset *versioned.Clientset
// DEPRECATED
cronWfClient v1alpha1.CronWorkflowInterface
// DEPRECATED
wftmplClient v1alpha1.WorkflowTemplateInterface
// DEPRECATED
namespace string
)

// DEPRECATED
func initKubeClient() *kubernetes.Clientset {
if clientset != nil {
return clientset
Expand All @@ -44,6 +52,7 @@ func initKubeClient() *kubernetes.Clientset {
}

// InitCronWorkflowClient creates a new client for the Kubernetes WorkflowTemplate CRD.
// DEPRECATED
func InitCronWorkflowClient(ns ...string) v1alpha1.CronWorkflowInterface {
if cronWfClient != nil {
return cronWfClient
Expand All @@ -66,14 +75,17 @@ func InitCronWorkflowClient(ns ...string) v1alpha1.CronWorkflowInterface {

// LazyWorkflowTemplateGetter is a wrapper of v1alpha1.WorkflowTemplateInterface which
// supports lazy initialization.
// DEPRECATED
type LazyWorkflowTemplateGetter struct{}

// Get initializes it just before it's actually used and returns a retrieved workflow template.
// DEPRECATED
func (c LazyWorkflowTemplateGetter) Get(name string) (*wfv1.WorkflowTemplate, error) {
if wftmplClient == nil {
_ = InitCronWorkflowClient()
}
return templateresolution.WrapWorkflowTemplateInterface(wftmplClient).Get(name)
}

// DEPRECATED
var _ templateresolution.WorkflowTemplateNamespacedGetter = &LazyWorkflowTemplateGetter{}
1 change: 1 addition & 0 deletions cmd/argo/commands/server.go
Expand Up @@ -81,6 +81,7 @@ See %s`, help.ArgoSever),
RestConfig: config,
AuthMode: authMode,
ManagedNamespace: managedNamespace,
ConfigName: configMap,
}
err = opts.ValidateOpts()
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions cmd/argo/commands/template/common.go
Expand Up @@ -18,13 +18,19 @@ import (

// Global variables
var (
// DEPRECATED
restConfig *rest.Config
// DEPRECATED
clientset *kubernetes.Clientset
// DEPRECATED
wfClientset *versioned.Clientset
// DEPRECATED
wftmplClient v1alpha1.WorkflowTemplateInterface
// DEPRECATED
namespace string
)

// DEPRECATED
func initKubeClient() *kubernetes.Clientset {
if clientset != nil {
return clientset
Expand All @@ -44,6 +50,7 @@ func initKubeClient() *kubernetes.Clientset {
}

// InitWorkflowTemplateClient creates a new client for the Kubernetes WorkflowTemplate CRD.
// DEPRECATED
func InitWorkflowTemplateClient(ns ...string) v1alpha1.WorkflowTemplateInterface {
if wftmplClient != nil {
return wftmplClient
Expand All @@ -65,18 +72,22 @@ func InitWorkflowTemplateClient(ns ...string) v1alpha1.WorkflowTemplateInterface

// LazyWorkflowTemplateGetter is a wrapper of v1alpha1.WorkflowTemplateInterface which
// supports lazy initialization.
// DEPRECATED
type LazyWorkflowTemplateGetter struct{}

// Get initializes it just before it's actually used and returns a retrieved workflow template.
// DEPRECATED
func (c LazyWorkflowTemplateGetter) Get(name string) (*wfv1.WorkflowTemplate, error) {
if wftmplClient == nil {
_ = InitWorkflowTemplateClient()
}
return templateresolution.WrapWorkflowTemplateInterface(wftmplClient).Get(name)
}

// DEPRECATED
var _ templateresolution.WorkflowTemplateNamespacedGetter = &LazyWorkflowTemplateGetter{}

// DEPRECATED
func GetWFtmplApiServerGRPCClient(conn *grpc.ClientConn) (workflowtemplate.WorkflowTemplateServiceClient, context.Context) {
return workflowtemplate.NewWorkflowTemplateServiceClient(conn), client.GetContext()
}
109 changes: 70 additions & 39 deletions server/apiserver/argoserver.go
Expand Up @@ -6,19 +6,6 @@ import (
"net/http"
"time"

cronworkflowpkg "github.com/argoproj/argo/pkg/apiclient/cronworkflow"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
workflowarchivepkg "github.com/argoproj/argo/pkg/apiclient/workflowarchive"
workflowtemplatepkg "github.com/argoproj/argo/pkg/apiclient/workflowtemplate"
"github.com/argoproj/argo/server/artifacts"
"github.com/argoproj/argo/server/auth"
"github.com/argoproj/argo/server/cronworkflow"
"github.com/argoproj/argo/server/info"
"github.com/argoproj/argo/server/static"
"github.com/argoproj/argo/server/workflow"
"github.com/argoproj/argo/server/workflowarchive"
"github.com/argoproj/argo/server/workflowtemplate"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
Expand All @@ -29,13 +16,26 @@ import (
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/yaml"

"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/persist/sqldb"
cronworkflowpkg "github.com/argoproj/argo/pkg/apiclient/cronworkflow"
workflowpkg "github.com/argoproj/argo/pkg/apiclient/workflow"
workflowarchivepkg "github.com/argoproj/argo/pkg/apiclient/workflowarchive"
workflowtemplatepkg "github.com/argoproj/argo/pkg/apiclient/workflowtemplate"
"github.com/argoproj/argo/pkg/client/clientset/versioned"
"github.com/argoproj/argo/server/artifacts"
"github.com/argoproj/argo/server/auth"
"github.com/argoproj/argo/server/cronworkflow"
"github.com/argoproj/argo/server/info"
"github.com/argoproj/argo/server/static"
"github.com/argoproj/argo/server/workflow"
"github.com/argoproj/argo/server/workflowarchive"
"github.com/argoproj/argo/server/workflowtemplate"
grpcutil "github.com/argoproj/argo/util/grpc"
"github.com/argoproj/argo/util/json"
"github.com/argoproj/argo/workflow/common"
Expand All @@ -59,11 +59,12 @@ type argoServer struct {

type ArgoServerOpts struct {
BaseHRef string
Namespace string
KubeClientset *kubernetes.Clientset
WfClientSet *versioned.Clientset
RestConfig *rest.Config
AuthMode string
Namespace string
KubeClientset *kubernetes.Clientset
WfClientSet *versioned.Clientset
RestConfig *rest.Config
AuthMode string
// config map name
ConfigName string
ManagedNamespace string
}
Expand Down Expand Up @@ -106,26 +107,27 @@ func (ao ArgoServerOpts) ValidateOpts() error {

func (as *argoServer) Run(ctx context.Context, port int) {

configMap, err := as.RsyncConfig(as.namespace, as.kubeClientset)
configMap, err := as.rsyncConfig()
if err != nil {
log.Fatal(err)
}
err = as.restartOnConfigChange(ctx.Done())
if err != nil {
// TODO: this currently returns an error every time
log.Errorf("Error marshalling config map: %s", err)
log.Fatal(err)
}
var offloadRepo = sqldb.ExplosiveOffloadNodeStatusRepo
var wfArchive = sqldb.NullWorkflowArchive
if configMap != nil {
persistence := configMap.Persistence
if persistence != nil {
session, tableName, err := sqldb.CreateDBSession(as.kubeClientset, as.namespace, persistence)
if err != nil {
log.Fatal(err)
}
log.WithField("nodeStatusOffload", persistence.NodeStatusOffload).Info("Offload node status")
if persistence.NodeStatusOffload {
offloadRepo = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
}
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName())
persistence := configMap.Persistence
if persistence != nil {
session, tableName, err := sqldb.CreateDBSession(as.kubeClientset, as.namespace, persistence)
if err != nil {
log.Fatal(err)
}
log.WithField("nodeStatusOffload", persistence.NodeStatusOffload).Info("Offload node status")
if persistence.NodeStatusOffload {
offloadRepo = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
}
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName())
}
artifactServer := artifacts.NewArtifactServer(as.authenticator, offloadRepo, wfArchive)
grpcServer := as.newGRPCServer(offloadRepo, wfArchive)
Expand Down Expand Up @@ -244,20 +246,49 @@ func mustRegisterGWHandler(register registerFunc, ctx context.Context, mux *runt
}

// ResyncConfig reloads the controller config from the configmap
func (as *argoServer) RsyncConfig(namespace string, kubeClientSet *kubernetes.Clientset) (*config.WorkflowControllerConfig, error) {
cmClient := kubeClientSet.CoreV1().ConfigMaps(namespace)
cm, err := cmClient.Get("workflow-controller-configmap", metav1.GetOptions{})
func (as *argoServer) rsyncConfig() (*config.WorkflowControllerConfig, error) {
cm, err := as.kubeClientset.CoreV1().ConfigMaps(as.namespace).
Get(as.configName, metav1.GetOptions{})
if err != nil {
return nil, errors.InternalWrapError(err)
}
return as.UpdateConfig(cm)
return as.updateConfig(cm)
}

// Unlike the controller, the server creates object based on the config map at init time, and will not pick-up on
// changes unless we restart.
// Instead of opting to re-write the server, instead we'll just listen for any old change and restart.
func (as *argoServer) restartOnConfigChange(stopCh <-chan struct{}) error {
w, err := as.kubeClientset.CoreV1().ConfigMaps(as.namespace).
Watch(metav1.ListOptions{FieldSelector: "metadata.name=" + as.configName})
if err != nil {
return err
}
go func() {
defer w.Stop()
for {
select {
// normal exit, e.g. due to user interupt
case <-stopCh:
return
case e := <-w.ResultChan():
if e.Type != watch.Added {
log.WithField("eventType", e.Type).Info("config map event, exiting gracefully")
as.stopCh <- struct{}{}
return
}
}
}
}()
return nil
}

func (as *argoServer) UpdateConfig(cm *apiv1.ConfigMap) (*config.WorkflowControllerConfig, error) {
func (as *argoServer) updateConfig(cm *apiv1.ConfigMap) (*config.WorkflowControllerConfig, error) {

configStr, ok := cm.Data[common.WorkflowControllerConfigMapKey]
if !ok {
return nil, errors.InternalErrorf("ConfigMap '%s' does not have key '%s'", as.configName, common.WorkflowControllerConfigMapKey)
log.Warnf("ConfigMap '%s' does not have key '%s'", as.configName, common.WorkflowControllerConfigMapKey)
configStr = ""
}
var config config.WorkflowControllerConfig
log.Infof("Config Map: %s", configStr)
Expand Down
25 changes: 24 additions & 1 deletion test/e2e/argo_server_test.go
Expand Up @@ -568,7 +568,30 @@ func (s *ArgoServerSuite) TestCronWorkflowService() {
s.Run("List", func(t *testing.T) {
// make sure list options work correctly
s.Given(t).
CronWorkflow("@testdata/basic.yaml")
CronWorkflow(`apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: test-cron-wf-basic
labels:
argo-e2e: true
spec:
schedule: "* * * * *"
concurrencyPolicy: "Allow"
startingDeadlineSeconds: 0
successfulJobsHistoryLimit: 4
failedJobsHistoryLimit: 2
workflowSpec:
podGC:
strategy: OnPodCompletion
entrypoint: whalesay
templates:
- name: whalesay
container:
image: python:alpine3.6
imagePullPolicy: IfNotPresent
command: ["sh", -c]
args: ["echo hello"]
`)

s.e(t).GET("/api/v1/cron-workflows/argo").
WithQuery("listOptions.labelSelector", "argo-e2e=subject").
Expand Down

0 comments on commit 98702ca

Please sign in to comment.