Skip to content

Commit

Permalink
Added validation for scheduled workflow during registration (flyteorg…
Browse files Browse the repository at this point in the history
…#215)

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>
  • Loading branch information
pmahindrakar-oss authored and austin362667 committed May 7, 2024
1 parent 9172f13 commit 744f0ce
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 6 deletions.
39 changes: 37 additions & 2 deletions flytectl/cmd/register/register_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,39 @@ func hydrateTaskSpec(task *admin.TaskSpec, sourceCode string, sourceUploadPath s
return nil
}

func hydrateLaunchPlanSpec(configAssumableIamRole string, configK8sServiceAccount string, configOutputLocationPrefix string, lpSpec *admin.LaunchPlanSpec) {
func validateLaunchSpec(lpSpec *admin.LaunchPlanSpec) error {
if lpSpec == nil {
return nil
}
if lpSpec.EntityMetadata != nil && lpSpec.EntityMetadata.Schedule != nil {
schedule := lpSpec.EntityMetadata.Schedule
var scheduleFixedParams []string
if lpSpec.DefaultInputs != nil {
for paramKey := range lpSpec.DefaultInputs.Parameters {
if paramKey != schedule.KickoffTimeInputArg {
scheduleFixedParams = append(scheduleFixedParams, paramKey)
}
}
}
if (lpSpec.FixedInputs == nil && len(scheduleFixedParams) > 0) ||
(len(scheduleFixedParams) > len(lpSpec.FixedInputs.Literals)) {
fixedInputLen := 0
if lpSpec.FixedInputs != nil {
fixedInputLen = len(lpSpec.FixedInputs.Literals)
}
return fmt.Errorf("param values are missing on scheduled workflow."+
"additional args other than %v on scheduled workflow are %v > %v fixed values", schedule.KickoffTimeInputArg,
len(scheduleFixedParams), fixedInputLen)
}
}
return nil
}

func hydrateLaunchPlanSpec(configAssumableIamRole string, configK8sServiceAccount string, configOutputLocationPrefix string, lpSpec *admin.LaunchPlanSpec) error {

if err := validateLaunchSpec(lpSpec); err != nil {
return err
}
assumableIamRole := len(configAssumableIamRole) > 0
k8sServiceAcct := len(configK8sServiceAccount) > 0
outputLocationPrefix := len(configOutputLocationPrefix) > 0
Expand All @@ -276,14 +308,17 @@ func hydrateLaunchPlanSpec(configAssumableIamRole string, configK8sServiceAccoun
OutputLocationPrefix: configOutputLocationPrefix,
}
}
return nil
}

func hydrateSpec(message proto.Message, sourceCode string, config rconfig.FilesConfig) error {
switch v := message.(type) {
case *admin.LaunchPlan:
launchPlan := message.(*admin.LaunchPlan)
hydrateIdentifier(launchPlan.Spec.WorkflowId, config.Version)
hydrateLaunchPlanSpec(config.AssumableIamRole, config.K8sServiceAccount, config.OutputLocationPrefix, launchPlan.Spec)
if err := hydrateLaunchPlanSpec(config.AssumableIamRole, config.K8sServiceAccount, config.OutputLocationPrefix, launchPlan.Spec); err != nil {
return err
}
case *admin.WorkflowSpec:
workflowSpec := message.(*admin.WorkflowSpec)
for _, Noderef := range workflowSpec.Template.Nodes {
Expand Down
107 changes: 103 additions & 4 deletions flytectl/cmd/register/register_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

v1 "k8s.io/api/core/v1"

"github.com/flyteorg/flyteidl/clients/go/coreutils"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
Expand Down Expand Up @@ -279,15 +280,17 @@ func TestHydrateLaunchPlanSpec(t *testing.T) {
registerFilesSetup()
rconfig.DefaultFilesConfig.AssumableIamRole = "iamRole"
lpSpec := &admin.LaunchPlanSpec{}
hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
assert.Nil(t, err)
assert.Equal(t, &admin.AuthRole{AssumableIamRole: "iamRole"}, lpSpec.AuthRole)
})
t.Run("k8sService account override", func(t *testing.T) {
setup()
registerFilesSetup()
rconfig.DefaultFilesConfig.K8sServiceAccount = "k8Account"
lpSpec := &admin.LaunchPlanSpec{}
hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
assert.Nil(t, err)
assert.Equal(t, &admin.AuthRole{KubernetesServiceAccount: "k8Account"}, lpSpec.AuthRole)
})
t.Run("Both k8sService and IamRole", func(t *testing.T) {
Expand All @@ -296,7 +299,8 @@ func TestHydrateLaunchPlanSpec(t *testing.T) {
rconfig.DefaultFilesConfig.AssumableIamRole = "iamRole"
rconfig.DefaultFilesConfig.K8sServiceAccount = "k8Account"
lpSpec := &admin.LaunchPlanSpec{}
hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
assert.Nil(t, err)
assert.Equal(t, &admin.AuthRole{AssumableIamRole: "iamRole",
KubernetesServiceAccount: "k8Account"}, lpSpec.AuthRole)
})
Expand All @@ -305,9 +309,104 @@ func TestHydrateLaunchPlanSpec(t *testing.T) {
registerFilesSetup()
rconfig.DefaultFilesConfig.OutputLocationPrefix = "prefix"
lpSpec := &admin.LaunchPlanSpec{}
hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
assert.Nil(t, err)
assert.Equal(t, &admin.RawOutputDataConfig{OutputLocationPrefix: "prefix"}, lpSpec.RawOutputDataConfig)
})
t.Run("Validation successful", func(t *testing.T) {
lpSpec := &admin.LaunchPlanSpec{
EntityMetadata: &admin.LaunchPlanMetadata{
Schedule: &admin.Schedule{
ScheduleExpression: &admin.Schedule_CronExpression{
CronExpression: "foo",
},
KickoffTimeInputArg: "kickoff_time_arg",
},
},
FixedInputs: &core.LiteralMap{
Literals: map[string]*core.Literal{},
},
}
err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
assert.Nil(t, err)
})
t.Run("Validation failure", func(t *testing.T) {
lpSpec := &admin.LaunchPlanSpec{
EntityMetadata: &admin.LaunchPlanMetadata{
Schedule: &admin.Schedule{
ScheduleExpression: &admin.Schedule_CronExpression{
CronExpression: "expr",
},
KickoffTimeInputArg: "kickoff_time_arg",
},
},
DefaultInputs: &core.ParameterMap{
Parameters: map[string]*core.Parameter{
"bar": {
Var: &core.Variable{
Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}},
},
},
},
},
FixedInputs: &core.LiteralMap{
Literals: map[string]*core.Literal{},
},
}
err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
assert.NotNil(t, err)
})
t.Run("Validation failed with fixed inputs empty", func(t *testing.T) {
lpSpec := &admin.LaunchPlanSpec{
EntityMetadata: &admin.LaunchPlanMetadata{
Schedule: &admin.Schedule{
ScheduleExpression: &admin.Schedule_CronExpression{
CronExpression: "expr",
},
KickoffTimeInputArg: "kickoff_time_arg",
},
},
DefaultInputs: &core.ParameterMap{
Parameters: map[string]*core.Parameter{
"bar": {
Var: &core.Variable{
Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}},
},
},
},
},
}
err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
assert.NotNil(t, err)
})
t.Run("Validation success with fixed", func(t *testing.T) {
lpSpec := &admin.LaunchPlanSpec{
EntityMetadata: &admin.LaunchPlanMetadata{
Schedule: &admin.Schedule{
ScheduleExpression: &admin.Schedule_CronExpression{
CronExpression: "expr",
},
KickoffTimeInputArg: "kickoff_time_arg",
},
},
DefaultInputs: &core.ParameterMap{
Parameters: map[string]*core.Parameter{
"bar": {
Var: &core.Variable{
Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}},
},
},
},
},
FixedInputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"bar": coreutils.MustMakeLiteral("bar-value"),
},
},
}
err := hydrateLaunchPlanSpec(rconfig.DefaultFilesConfig.AssumableIamRole, rconfig.DefaultFilesConfig.K8sServiceAccount, rconfig.DefaultFilesConfig.OutputLocationPrefix, lpSpec)
assert.Nil(t, err)
})
}

func TestUploadFastRegisterArtifact(t *testing.T) {
Expand Down

0 comments on commit 744f0ce

Please sign in to comment.