From 9a71ba608a5e1eb18f0da370e7644fcc6a461e53 Mon Sep 17 00:00:00 2001 From: Cheng Wang Date: Wed, 6 Sep 2023 07:41:37 +0800 Subject: [PATCH] fix: Argo DB init conflict when deploy workflow-controller with multiple replicas #11177 (#11569) Signed-off-by: astraw99 Signed-off-by: Dillen Padhiar --- workflow/controller/config.go | 32 +++++++++++++++++++++++-------- workflow/controller/controller.go | 5 +++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/workflow/controller/config.go b/workflow/controller/config.go index 483fe3f07182..37dc20ffc78b 100644 --- a/workflow/controller/config.go +++ b/workflow/controller/config.go @@ -27,6 +27,7 @@ func (wfc *WorkflowController) updateConfig() error { wfc.offloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo wfc.wfArchive = sqldb.NullWorkflowArchive wfc.archiveLabelSelector = labels.Everything() + persistence := wfc.Config.Persistence if persistence != nil { log.Info("Persistence configuration enabled") @@ -40,14 +41,7 @@ func (wfc *WorkflowController) updateConfig() error { return err } log.Info("Persistence Session created successfully") - if !persistence.SkipMigration { - err = sqldb.NewMigrate(session, persistence.GetClusterName(), tableName).Exec(context.Background()) - if err != nil { - return err - } - } else { - log.Info("DB migration is disabled") - } + wfc.session = session } sqldb.ConfigureDBSession(wfc.session, persistence.ConnectionPool) @@ -75,6 +69,7 @@ func (wfc *WorkflowController) updateConfig() error { } else { log.Info("Persistence configuration disabled") } + wfc.hydrator = hydrator.New(wfc.offloadNodeStatusRepo) wfc.updateEstimatorFactory() wfc.rateLimiter = wfc.newRateLimiter() @@ -86,6 +81,27 @@ func (wfc *WorkflowController) updateConfig() error { return nil } +// initDB runs the Argo DB migration on wfc.session when persistence is configured and SkipMigration is false; it returns any table-name or migration error, and nil when persistence is not configured. +func (wfc *WorkflowController) initDB() error { + persistence := wfc.Config.Persistence + if persistence != nil { + tableName, err := sqldb.GetTableName(persistence) + if err != nil { + 
return err + } + if !persistence.SkipMigration { + err = sqldb.NewMigrate(wfc.session, persistence.GetClusterName(), tableName).Exec(context.Background()) + if err != nil { + return err + } + } else { + log.Info("DB migration is disabled") + } + } + + return nil +} + func (wfc *WorkflowController) newRateLimiter() *rate.Limiter { return rate.NewLimiter(rate.Limit(wfc.Config.GetResourceRateLimit().Limit), wfc.Config.GetResourceRateLimit().Burst) } diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 096a51c9f8a7..0d1459e63caa 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -239,6 +239,11 @@ var indexers = cache.Indexers{ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers int) { defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) + // init DB after leader election (if enabled); a failure here terminates the process via log.Fatalf + if err := wfc.initDB(); err != nil { + log.Fatalf("Failed to init db: %v", err) + } + ctx, cancel := context.WithCancel(ctx) defer cancel()