Skip to content

Commit

Permalink
Update launch condition column (#4903)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
  • Loading branch information
wild-endeavor committed Feb 27, 2024
1 parent 311de70 commit 2b709f1
Show file tree
Hide file tree
Showing 24 changed files with 396 additions and 366 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,18 @@ func ValidateLaunchPlan(ctx context.Context,
if err := validateSchedule(request, expectedInputs); err != nil {
return err
}

// Augment default inputs with the unbound workflow inputs.
request.Spec.DefaultInputs = expectedInputs
if request.Spec.EntityMetadata != nil {
if err := validateNotifications(request.Spec.EntityMetadata.Notifications); err != nil {
return err
}
if request.GetSpec().GetEntityMetadata().GetLaunchConditions() != nil {
return errors.NewFlyteAdminErrorf(
codes.InvalidArgument,
"Launch condition must be empty, found %v", request.GetSpec().GetEntityMetadata().GetLaunchConditions())
}
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestValidateSchedule_ArgNotFixed(t *testing.T) {
})
t.Run("with rate", func(t *testing.T) {
request := testutils.GetLaunchPlanRequestWithFixedRateSchedule(2, admin.FixedRateUnit_HOUR)

err := validateSchedule(request, inputMap)
assert.NotNil(t, err)
})
Expand Down
43 changes: 42 additions & 1 deletion flyteadmin/pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,48 @@ var NoopMigrations = []*gormigrate.Migration{
},
}

var Migrations = append(LegacyMigrations, NoopMigrations...)
// ContinuedMigrations - Above are a series of migrations labeled as no-op migrations. These are migrations that we
// wrote to bring the then-existing migrations up to the Gorm standard, which is to write from scratch, each struct
// that we want auto-migrated, inside each function. Previously we had not been doing that. The idea is that we will
// one day delete the migrations prior to the no-op series. New migrations should continue in this array here, again
// using the proper Gorm methodology of including the struct definitions inside each migration function.
var ContinuedMigrations = []*gormigrate.Migration{
{
ID: "pg-continue-2024-02-launchplan",
Migrate: func(tx *gorm.DB) error {
type LaunchPlanScheduleType string
type LaunchConditionType string

type LaunchPlan struct {
ID uint `gorm:"index;autoIncrement;not null"`
CreatedAt time.Time `gorm:"type:time"`
UpdatedAt time.Time `gorm:"type:time"`
DeletedAt *time.Time `gorm:"index"`
Project string `gorm:"primary_key;index:lp_project_domain_name_idx,lp_project_domain_idx" valid:"length(0|255)"`
Domain string `gorm:"primary_key;index:lp_project_domain_name_idx,lp_project_domain_idx" valid:"length(0|255)"`
Name string `gorm:"primary_key;index:lp_project_domain_name_idx" valid:"length(0|255)"`
Version string `gorm:"primary_key" valid:"length(0|255)"`
Spec []byte `gorm:"not null"`
WorkflowID uint `gorm:"index"`
Closure []byte `gorm:"not null"`
// GORM doesn't save the zero value for ints, so we use a pointer for the State field
State *int32 `gorm:"default:0"`
// Hash of the launch plan
Digest []byte
ScheduleType LaunchPlanScheduleType
// store the type of event that this launch plan is triggered by, can be empty, or SCHED
LaunchConditionType *LaunchConditionType `gorm:"type:varchar(32);index:idx_launch_plans_launch_conditions,where:launch_condition_type is not null"`
}
return tx.AutoMigrate(&LaunchPlan{})
},
Rollback: func(tx *gorm.DB) error {
return tx.Table("launch_plans").Migrator().DropColumn(&models.LaunchPlan{}, "launch_condition_type")
},
},
}

var m = append(LegacyMigrations, NoopMigrations...)
var Migrations = append(m, ContinuedMigrations...)

func alterTableColumnType(db *sql.DB, columnName, columnType string) error {
var err error
Expand Down
8 changes: 4 additions & 4 deletions flyteadmin/pkg/repositories/gormimpl/launch_plan_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func TestListLaunchPlans_Pagination(t *testing.T) {
GlobalMock := mocket.Catcher.Reset()

GlobalMock.NewMock().WithQuery(
`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 LIMIT 2 OFFSET 1`).WithReply(launchPlans)
`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 LIMIT 2 OFFSET 1`).WithReply(launchPlans)

collection, err := launchPlanRepo.List(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
Expand Down Expand Up @@ -311,7 +311,7 @@ func TestListLaunchPlans_Filters(t *testing.T) {
GlobalMock := mocket.Catcher.Reset()
GlobalMock.Logging = true
// Only match on queries that append the name filter
GlobalMock.NewMock().WithQuery(`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.version = $4 LIMIT 20`).WithReply(launchPlans[0:1])
GlobalMock.NewMock().WithQuery(`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.version = $4 LIMIT 20`).WithReply(launchPlans[0:1])

collection, err := launchPlanRepo.List(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
Expand Down Expand Up @@ -403,8 +403,8 @@ func TestListLaunchPlansForWorkflow(t *testing.T) {
// HACK: gorm orders the filters on join clauses non-deterministically. Ordering of filters doesn't affect
// correctness, but because the mocket library only pattern matches on substrings, both variations of the (valid)
// SQL that gorm produces are checked below.
query := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND workflows.deleted_at = $4 LIMIT 20`
alternateQuery := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND workflows.deleted_at = $4 LIMIT 20`
query := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND workflows.deleted_at = $4 LIMIT 20`
alternateQuery := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND workflows.deleted_at = $4 LIMIT 20`
GlobalMock.NewMock().WithQuery(query).WithReply(launchPlans)
GlobalMock.NewMock().WithQuery(alternateQuery).WithReply(launchPlans)

Expand Down
14 changes: 11 additions & 3 deletions flyteadmin/pkg/repositories/models/launch_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@ const (
LaunchPlanScheduleTypeRATE LaunchPlanScheduleType = "RATE"
)

// Database model to encapsulate a launch plan.
type LaunchConditionType string

const (
// LaunchConditionTypeSCHED is the const representing the launch plan has a trigger type of schedule
LaunchConditionTypeSCHED LaunchConditionType = "SCHED"
)

// LaunchPlan Database model to encapsulate a launch plan.
type LaunchPlan struct {
BaseModel
LaunchPlanKey
Expand All @@ -29,8 +36,9 @@ type LaunchPlan struct {
// GORM doesn't save the zero value for ints, so we use a pointer for the State field
State *int32 `gorm:"default:0"`
// Hash of the launch plan
Digest []byte
ScheduleType LaunchPlanScheduleType
Digest []byte
ScheduleType LaunchPlanScheduleType
LaunchConditionType *LaunchConditionType `gorm:"type:varchar(32);index:idx_launch_plans_launch_conditions,where:launch_condition_type is not null"`
}

var LaunchPlanColumns = modelColumns(LaunchPlan{})
11 changes: 9 additions & 2 deletions flyteadmin/pkg/repositories/transformers/launch_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,21 @@ func CreateLaunchPlanModel(
return models.LaunchPlan{}, errors.NewFlyteAdminError(codes.Internal, "Failed to serialize launch plan closure")
}

var launchConditionType models.LaunchConditionType
scheduleType := models.LaunchPlanScheduleTypeNONE
if launchPlan.Spec.EntityMetadata != nil && launchPlan.Spec.EntityMetadata.Schedule != nil {
if launchPlan.Spec.EntityMetadata.Schedule.GetCronExpression() != "" || launchPlan.Spec.EntityMetadata.Schedule.GetCronSchedule() != nil {
scheduleType = models.LaunchPlanScheduleTypeCRON
launchConditionType = models.LaunchConditionTypeSCHED
} else if launchPlan.Spec.EntityMetadata.Schedule.GetRate() != nil {
scheduleType = models.LaunchPlanScheduleTypeRATE
launchConditionType = models.LaunchConditionTypeSCHED
}
}

state := int32(initState)

return models.LaunchPlan{
lpModel := models.LaunchPlan{
LaunchPlanKey: models.LaunchPlanKey{
Project: launchPlan.Id.Project,
Domain: launchPlan.Id.Domain,
Expand All @@ -64,7 +67,11 @@ func CreateLaunchPlanModel(
WorkflowID: workflowRepoID,
Digest: digest,
ScheduleType: scheduleType,
}, nil
}
if launchConditionType != "" {
lpModel.LaunchConditionType = &launchConditionType
}
return lpModel, nil
}

// Transforms a LaunchPlanModel to a LaunchPlan
Expand Down
9 changes: 3 additions & 6 deletions flyteidl/clients/go/assets/admin.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4273,10 +4273,11 @@
"SYSTEM",
"RELAUNCH",
"CHILD_WORKFLOW",
"RECOVERED"
"RECOVERED",
"TRIGGER"
],
"default": "MANUAL",
"description": "The method by which this execution was launched.\n\n - MANUAL: The default execution mode, MANUAL implies that an execution was launched by an individual.\n - SCHEDULED: A schedule triggered this execution launch.\n - SYSTEM: A system process was responsible for launching this execution rather an individual.\n - RELAUNCH: This execution was launched with identical inputs as a previous execution.\n - CHILD_WORKFLOW: This execution was triggered by another execution.\n - RECOVERED: This execution was recovered from another execution."
"description": "The method by which this execution was launched.\n\n - MANUAL: The default execution mode, MANUAL implies that an execution was launched by an individual.\n - SCHEDULED: A schedule triggered this execution launch.\n - SYSTEM: A system process was responsible for launching this execution rather an individual.\n - RELAUNCH: This execution was launched with identical inputs as a previous execution.\n - CHILD_WORKFLOW: This execution was triggered by another execution.\n - RECOVERED: This execution was recovered from another execution.\n - TRIGGER: Execution was kicked off by the artifact trigger system"
},
"IOStrategyDownloadMode": {
"type": "string",
Expand Down Expand Up @@ -6474,10 +6475,6 @@
"coreArtifactBindingData": {
"type": "object",
"properties": {
"index": {
"type": "integer",
"format": "int64"
},
"partition_key": {
"type": "string"
},
Expand Down
8 changes: 8 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/admin/execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions flyteidl/gen/pb-es/flyteidl/core/artifact_id_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 2b709f1

Please sign in to comment.