Skip to content

Commit

Permalink
Update trigger syntax to embed within launch plan (#83)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
  • Loading branch information
wild-endeavor authored Feb 28, 2024
1 parent 4076df1 commit a3bc79f
Show file tree
Hide file tree
Showing 36 changed files with 1,471 additions and 905 deletions.
32 changes: 32 additions & 0 deletions flyteadmin/pkg/artifacts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,38 @@ func (a *ArtifactRegistry) RegisterTrigger(ctx context.Context, plan *admin.Laun
return nil
}

func (a *ArtifactRegistry) ActivateTrigger(ctx context.Context, identifier *core.Identifier) error {
if a == nil || a.client == nil {
logger.Debugf(ctx, "Artifact client not configured, skipping activate [%+v]", identifier)
return fmt.Errorf("artifact client not configured")
}
_, err := a.client.ActivateTrigger(ctx, &artifacts.ActivateTriggerRequest{
TriggerId: identifier,
})
if err != nil {
logger.Errorf(ctx, "Failed to activate trigger [%+v] err: %v", identifier, err)
return err
}
logger.Debugf(ctx, "Activated trigger [%+v]", identifier)
return nil
}

func (a *ArtifactRegistry) DeactivateTrigger(ctx context.Context, identifier *core.Identifier) error {
if a == nil || a.client == nil {
logger.Debugf(ctx, "Artifact client not configured, skipping deactivate [%+v]", identifier)
return fmt.Errorf("artifact client not configured")
}
_, err := a.client.DeactivateTrigger(ctx, &artifacts.DeactivateTriggerRequest{
TriggerId: identifier,
})
if err != nil {
logger.Errorf(ctx, "Failed to deactivate trigger [%+v] err: %v", identifier, err)
return err
}
logger.Debugf(ctx, "Deactivated trigger [%+v]", identifier)
return nil
}

func (a *ArtifactRegistry) GetClient() artifacts.ArtifactRegistryClient {
if a == nil {
return nil
Expand Down
70 changes: 66 additions & 4 deletions flyteadmin/pkg/manager/impl/launch_plan_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
type launchPlanMetrics struct {
Scope promutils.Scope
FailedScheduleUpdates prometheus.Counter
FailedTriggerUpdates prometheus.Counter
SpecSizeBytes prometheus.Summary
ClosureSizeBytes prometheus.Summary
}
Expand Down Expand Up @@ -99,7 +100,7 @@ func (m *LaunchPlanManager) CreateLaunchPlan(
if err != nil {
return nil, err
}
return &admin.LaunchPlanCreateResponse{}, nil
logger.Debugf(ctx, "successfully registered trigger launch plan, continuing %v", launchPlan.Id)
}

existingLaunchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *request.Id)
Expand Down Expand Up @@ -285,6 +286,19 @@ func (m *LaunchPlanManager) disableLaunchPlan(ctx context.Context, request admin
return nil, err
}
}

// Send off activeness to trigger engine here, like we do for schedules above
if m.config.ApplicationConfiguration().GetTopLevelConfig().FeatureGates.EnableArtifacts {
if launchPlanModel.LaunchConditionType != nil && *launchPlanModel.LaunchConditionType == models.LaunchConditionTypeARTIFACT {
err = m.artifactRegistry.DeactivateTrigger(ctx, request.Id)
if err != nil {
logger.Debugf(ctx, "failed to deactivate trigger for launch plan [%+v] with err: %v", request.Id, err)
m.metrics.FailedTriggerUpdates.Inc()
return nil, err
}
}
}

err = m.db.LaunchPlanRepo().Update(ctx, launchPlanModel)
if err != nil {
logger.Debugf(ctx, "Failed to update launchPlanModel with ID [%+v] with err %v", request.Id, err)
Expand All @@ -294,6 +308,43 @@ func (m *LaunchPlanManager) disableLaunchPlan(ctx context.Context, request admin
return &admin.LaunchPlanUpdateResponse{}, nil
}

func (m *LaunchPlanManager) updateTriggers(ctx context.Context, newlyActiveLaunchPlan models.LaunchPlan, formerlyActiveLaunchPlan *models.LaunchPlan) error {
var err error

if newlyActiveLaunchPlan.LaunchConditionType != nil && *newlyActiveLaunchPlan.LaunchConditionType == models.LaunchConditionTypeARTIFACT {
newID := &core.Identifier{
Org: newlyActiveLaunchPlan.Org,
Project: newlyActiveLaunchPlan.Project,
Domain: newlyActiveLaunchPlan.Domain,
Name: newlyActiveLaunchPlan.Name,
Version: newlyActiveLaunchPlan.Version,
}
err = m.artifactRegistry.ActivateTrigger(ctx, newID)
if err != nil {
logger.Infof(ctx, "failed to activate launch plan trigger [%+v] err: %v", newID, err)
return err
}
}

// Also update the old one, similar to schedule
if formerlyActiveLaunchPlan != nil && formerlyActiveLaunchPlan.LaunchConditionType != nil && *formerlyActiveLaunchPlan.LaunchConditionType == models.LaunchConditionTypeARTIFACT {
formerID := core.Identifier{
Org: formerlyActiveLaunchPlan.Org,
Project: formerlyActiveLaunchPlan.Project,
Domain: formerlyActiveLaunchPlan.Domain,
Name: formerlyActiveLaunchPlan.Name,
Version: formerlyActiveLaunchPlan.Version,
}
err = m.artifactRegistry.DeactivateTrigger(ctx, &formerID)
if err != nil {
logger.Infof(ctx, "failed to deactivate launch plan trigger [%+v] err: %v", formerlyActiveLaunchPlan.LaunchPlanKey, err)
return err
}
}

return nil
}

func (m *LaunchPlanManager) enableLaunchPlan(ctx context.Context, request admin.LaunchPlanUpdateRequest) (
*admin.LaunchPlanUpdateResponse, error) {
newlyActiveLaunchPlanModel, err := m.db.LaunchPlanRepo().Get(ctx, repoInterfaces.Identifier{
Expand Down Expand Up @@ -346,6 +397,16 @@ func (m *LaunchPlanManager) enableLaunchPlan(ctx context.Context, request admin.
return nil, err
}

// Send off activeness to trigger engine here, like we do for schedules above, but only if the type is of trigger.
if m.config.ApplicationConfiguration().GetTopLevelConfig().FeatureGates.EnableArtifacts {
err = m.updateTriggers(ctx, newlyActiveLaunchPlanModel, formerlyActiveLaunchPlanModel)
if err != nil {
m.metrics.FailedTriggerUpdates.Inc()
logger.Debugf(ctx, "failed to update trigger for launch plan [%+v] with err: %v", request.Id, err)
return nil, err
}
}

// This operation is takes in the (formerly) active launch plan version as only one version can be active at a time.
// Setting the desired launch plan to active also requires disabling the existing active launch plan version.
err = m.db.LaunchPlanRepo().SetActive(ctx, newlyActiveLaunchPlanModel, formerlyActiveLaunchPlanModel)
Expand All @@ -354,8 +415,8 @@ func (m *LaunchPlanManager) enableLaunchPlan(ctx context.Context, request admin.
"Failed to set launchPlanModel with ID [%+v] to active with err %v", request.Id, err)
return nil, err
}
return &admin.LaunchPlanUpdateResponse{}, nil

return &admin.LaunchPlanUpdateResponse{}, nil
}

func (m *LaunchPlanManager) UpdateLaunchPlan(ctx context.Context, request admin.LaunchPlanUpdateRequest) (
Expand Down Expand Up @@ -586,8 +647,9 @@ func NewLaunchPlanManager(
Scope: scope,
FailedScheduleUpdates: scope.MustNewCounter("failed_schedule_updates",
"count of unsuccessful attempts to update the schedules when updating launch plan version"),
SpecSizeBytes: scope.MustNewSummary("spec_size_bytes", "size in bytes of serialized launch plan spec"),
ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes", "size in bytes of serialized launch plan closure"),
FailedTriggerUpdates: scope.MustNewCounter("failed_trigger_updates", "count of failed attempts to activate/deactivate triggers"),
SpecSizeBytes: scope.MustNewSummary("spec_size_bytes", "size in bytes of serialized launch plan spec"),
ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes", "size in bytes of serialized launch plan closure"),
}
return &LaunchPlanManager{
db: db,
Expand Down
13 changes: 13 additions & 0 deletions flyteadmin/pkg/manager/impl/validation/launch_plan_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package validation
import (
"context"


"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/common"
Expand All @@ -11,6 +13,7 @@ import (
repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifacts"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/validators"
)
Expand Down Expand Up @@ -54,12 +57,22 @@ func ValidateLaunchPlan(ctx context.Context,
if err := validateSchedule(request, expectedInputs); err != nil {
return err
}

// Augment default inputs with the unbound workflow inputs.
request.Spec.DefaultInputs = expectedInputs
if request.Spec.EntityMetadata != nil {
if err := validateNotifications(request.Spec.EntityMetadata.Notifications); err != nil {
return err
}
if request.GetSpec().GetEntityMetadata().GetLaunchConditions() != nil {
// Rather than returning an error if present like we do in oss, let's check to make sure it's the of
// the right type and continue.
idlTrigger := artifacts.Trigger{}
err = ptypes.UnmarshalAny(request.GetSpec().GetEntityMetadata().GetLaunchConditions(), &idlTrigger)
if err != nil {
return errors.NewFlyteAdminError(codes.InvalidArgument, "failed to unmarshal spec")
}
}
}
return nil
}
Expand Down
43 changes: 41 additions & 2 deletions flyteadmin/pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,6 @@ var NoopMigrations = []*gormigrate.Migration{
}

var UnionMigrations = []*gormigrate.Migration{

{
ID: "2024-01-01-org-primary-key",
Migrate: func(tx *gorm.DB) error {
Expand All @@ -1195,7 +1194,47 @@ var UnionMigrations = []*gormigrate.Migration{
},
}

var Migrations = append(LegacyMigrations, append(NoopMigrations, UnionMigrations...)...)
// ContinuedMigrations - Above are a series of migrations labeled as no-op migrations. These are migrations that we
// wrote to bring the then-existing migrations up to the Gorm standard, which is to write from scratch, each struct
// that we want auto-migrated, inside each function. Previously we had not been doing that. The idea is that we will
// one day delete the migrations prior to the no-op series. New migrations should continue in this array here, again
// using the proper Gorm methodology of including the struct definitions inside each migration function.
var ContinuedMigrations = []*gormigrate.Migration{
{
ID: "pg-continue-2024-02-launchplan",
Migrate: func(tx *gorm.DB) error {
type LaunchPlanScheduleType string
type LaunchConditionType string

type LaunchPlan struct {
ID uint `gorm:"index;autoIncrement;not null"`
CreatedAt time.Time `gorm:"type:time"`
UpdatedAt time.Time `gorm:"type:time"`
DeletedAt *time.Time `gorm:"index"`
Project string `gorm:"primary_key;index:lp_project_domain_name_idx,lp_project_domain_idx" valid:"length(0|255)"`
Domain string `gorm:"primary_key;index:lp_project_domain_name_idx,lp_project_domain_idx" valid:"length(0|255)"`
Name string `gorm:"primary_key;index:lp_project_domain_name_idx" valid:"length(0|255)"`
Version string `gorm:"primary_key" valid:"length(0|255)"`
Spec []byte `gorm:"not null"`
WorkflowID uint `gorm:"index"`
Closure []byte `gorm:"not null"`
// GORM doesn't save the zero value for ints, so we use a pointer for the State field
State *int32 `gorm:"default:0"`
// Hash of the launch plan
Digest []byte
ScheduleType LaunchPlanScheduleType
// store the type of event that this launch plan is triggered by, can be empty, or SCHED
LaunchConditionType *LaunchConditionType `gorm:"type:varchar(32);index:idx_launch_plans_launch_conditions,where:launch_condition_type is not null"`
}
return tx.AutoMigrate(&LaunchPlan{})
},
Rollback: func(tx *gorm.DB) error {
return tx.Table("launch_plans").Migrator().DropColumn(&models.LaunchPlan{}, "launch_condition_type")
},
},
}

var Migrations = append(LegacyMigrations, append(NoopMigrations, append(ContinuedMigrations, UnionMigrations...)...)...)

func alterTableColumnType(db *sql.DB, columnName, columnType string) error {
var err error
Expand Down
8 changes: 4 additions & 4 deletions flyteadmin/pkg/repositories/gormimpl/launch_plan_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func TestListLaunchPlans_Pagination(t *testing.T) {
GlobalMock.Logging = true

GlobalMock.NewMock().WithQuery(
`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."org","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.org = $4 LIMIT 2 OFFSET 1`).WithReply(launchPlans)
`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."org","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.org = $4 LIMIT 2 OFFSET 1`).WithReply(launchPlans)

collection, err := launchPlanRepo.List(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestListLaunchPlans_Filters(t *testing.T) {
GlobalMock := mocket.Catcher.Reset()
GlobalMock.Logging = true
// Only match on queries that append the name filter
GlobalMock.NewMock().WithQuery(`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."org","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.version = $4 AND launch_plans.org = $5 LIMIT 20`).WithReply(launchPlans[0:1])
GlobalMock.NewMock().WithQuery(`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."org","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.version = $4 AND launch_plans.org = $5 LIMIT 20`).WithReply(launchPlans[0:1])

collection, err := launchPlanRepo.List(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
Expand Down Expand Up @@ -413,8 +413,8 @@ func TestListLaunchPlansForWorkflow(t *testing.T) {
// HACK: gorm orders the filters on join clauses non-deterministically. Ordering of filters doesn't affect
// correctness, but because the mocket library only pattern matches on substrings, both variations of the (valid)
// SQL that gorm produces are checked below.
query := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."org","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.org = $4 AND workflows.deleted_at = $5 LIMIT 20`
alternateQuery := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."org","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.org = $4 AND workflows.deleted_at = $5 LIMIT 20`
query := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."org","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.org = $4 AND workflows.deleted_at = $5 LIMIT 20`
alternateQuery := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."org","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.org = $4 AND workflows.deleted_at = $5 LIMIT 20`
GlobalMock.NewMock().WithQuery(query).WithReply(launchPlans)
GlobalMock.NewMock().WithQuery(alternateQuery).WithReply(launchPlans)

Expand Down
17 changes: 14 additions & 3 deletions flyteadmin/pkg/repositories/models/launch_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,17 @@ const (
LaunchPlanScheduleTypeRATE LaunchPlanScheduleType = "RATE"
)

// Database model to encapsulate a launch plan.
type LaunchConditionType string

const (
// LaunchConditionTypeSCHED is the const representing the launch plan has a CRON type of schedule
LaunchConditionTypeSCHED LaunchConditionType = "SCHED"

// LaunchConditionTypeARTIFACT means this launch plan can be kicked off by the trigger engine in artifacts service
LaunchConditionTypeARTIFACT LaunchConditionType = "ARTIFACT"
)

// LaunchPlan Database model to encapsulate a launch plan.
type LaunchPlan struct {
BaseModel
LaunchPlanKey
Expand All @@ -30,8 +40,9 @@ type LaunchPlan struct {
// GORM doesn't save the zero value for ints, so we use a pointer for the State field
State *int32 `gorm:"default:0"`
// Hash of the launch plan
Digest []byte
ScheduleType LaunchPlanScheduleType
Digest []byte
ScheduleType LaunchPlanScheduleType
LaunchConditionType *LaunchConditionType `gorm:"type:varchar(32);index:idx_launch_plans_launch_conditions,where:launch_condition_type is not null"`
}

var LaunchPlanColumns = modelColumns(LaunchPlan{})
Loading

0 comments on commit a3bc79f

Please sign in to comment.