Skip to content

Commit

Permalink
Create FlyteWorkflow CRD on FlytePropeller startup (flyteorg#414)
Browse files Browse the repository at this point in the history
* creating FlyteWorkflow CRD on FlytePropeller startup

Signed-off-by: Daniel Rammer <daniel@union.ai>

* checking for CRD existence before creating

Signed-off-by: Daniel Rammer <daniel@union.ai>

* added CreateFlyteWorkflowCRD configuration option

Signed-off-by: Daniel Rammer <daniel@union.ai>

* fixed CreateFlyteWorkflowCRD configuration description

Signed-off-by: Daniel Rammer <daniel@union.ai>
  • Loading branch information
hamersaw committed Mar 31, 2022
1 parent cf88449 commit 15d2eab
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 30 deletions.
1 change: 1 addition & 0 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
google.golang.org/grpc v1.36.0
google.golang.org/protobuf v1.25.0
k8s.io/api v0.20.2
k8s.io/apiextensions-apiserver v0.20.1
k8s.io/apimachinery v0.20.2
k8s.io/client-go v0.20.2
k8s.io/klog v1.0.0
Expand Down
41 changes: 41 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/crd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package flyteworkflow

import (
"fmt"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
preserveUnknownFields = true
CRD = apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("flyteworkflows.%s", GroupName),
},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: GroupName,
Names: apiextensionsv1.CustomResourceDefinitionNames{
Kind: "FlyteWorkflow",
Plural: "flyteworkflows",
Singular: "flyteworkflow",
ShortNames: []string{"fly"},
},
Scope: apiextensionsv1.NamespaceScoped,
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
apiextensionsv1.CustomResourceDefinitionVersion{
Name: "v1alpha1",
Served: true,
Storage: true,
Schema: &apiextensionsv1.CustomResourceValidation{
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
XPreserveUnknownFields: &preserveUnknownFields,
},
},
},
},
},
}
)
4 changes: 3 additions & 1 deletion flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ var (
EventConfig: EventConfig{
RawOutputPolicy: RawOutputPolicyReference,
},
ClusterID: "propeller",
ClusterID: "propeller",
CreateFlyteWorkflowCRD: false,
}
)

Expand Down Expand Up @@ -145,6 +146,7 @@ type Config struct {
IncludeDomainLabel []string `json:"include-domain-label" pflag:",Include the specified domain label in the k8s FlyteWorkflow CRD label selector"`
ExcludeDomainLabel []string `json:"exclude-domain-label" pflag:",Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector"`
ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"`
CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 50 additions & 29 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,58 @@ import (
"runtime/pprof"
"time"

"github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"

"google.golang.org/grpc"

"github.com/flyteorg/flyteidl/clients/go/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/flyteorg/flytestdlib/contextutils"
"k8s.io/apimachinery/pkg/labels"

stdErrs "github.com/flyteorg/flytestdlib/errors"

errors3 "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
flyteK8sConfig "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"

"github.com/flyteorg/flytepropeller/events"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
clientset "github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned"
informers "github.com/flyteorg/flytepropeller/pkg/client/informers/externalversions"
lister "github.com/flyteorg/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes"
errors3 "github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/catalog"

"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/workflow"
"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore"
leader "github.com/flyteorg/flytepropeller/pkg/leaderelection"
"github.com/flyteorg/flytepropeller/pkg/utils"

"github.com/flyteorg/flyteidl/clients/go/admin"
"github.com/flyteorg/flytestdlib/contextutils"
stdErrs "github.com/flyteorg/flytestdlib/errors"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
"github.com/flyteorg/flytestdlib/storage"

"github.com/pkg/errors"

"github.com/prometheus/client_golang/prometheus"

"google.golang.org/grpc"

apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"

apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/clock"

k8sInformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
flyteK8sConfig "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
clientset "github.com/flyteorg/flytepropeller/pkg/client/clientset/versioned"
informers "github.com/flyteorg/flytepropeller/pkg/client/informers/externalversions"
lister "github.com/flyteorg/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/flyteorg/flytepropeller/pkg/controller/workflow"
leader "github.com/flyteorg/flytepropeller/pkg/leaderelection"
"github.com/flyteorg/flytepropeller/pkg/utils"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

const (
Expand Down Expand Up @@ -506,6 +509,24 @@ func StartController(ctx context.Context, cfg *config.Config, defaultNamespace s
return errors.Wrapf(err, "error building FlyteWorkflow clientset")
}

// Create FlyteWorkflow CRD if it does not exist
if cfg.CreateFlyteWorkflowCRD {
logger.Infof(ctx, "creating FlyteWorkflow CRD")
apiextensionsClient, err := apiextensionsclientset.NewForConfig(kubecfg)
if err != nil {
return errors.Wrapf(err, "error building apiextensions clientset")
}

_, err = apiextensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &flyteworkflow.CRD, v1.CreateOptions{})
if err != nil {
if apierrors.IsAlreadyExists(err) {
logger.Warnf(ctx, "FlyteWorkflow CRD already exists")
} else {
return errors.Wrapf(err, "failed to create FlyteWorkflow CRD")
}
}
}

opts := SharedInformerOptions(cfg, defaultNamespace)
flyteworkflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(flyteworkflowClient, cfg.WorkflowReEval.Duration, opts...)

Expand Down

0 comments on commit 15d2eab

Please sign in to comment.