Skip to content

Commit

Permalink
fix: DB sessions are recreated whenever controller configmap updates. F…
Browse files Browse the repository at this point in the history
…ixes #10498 (#10734)
  • Loading branch information
terrytangyuan committed Mar 29, 2023
1 parent e715488 commit 1819e30
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 26 deletions.
24 changes: 12 additions & 12 deletions persist/sqldb/sqldb.go
Expand Up @@ -65,12 +65,7 @@ func CreatePostGresDBSession(kubectlConfig kubernetes.Interface, namespace strin
if err != nil {
return nil, "", err
}

if persistPool != nil {
session.SetMaxOpenConns(persistPool.MaxOpenConns)
session.SetMaxIdleConns(persistPool.MaxIdleConns)
session.SetConnMaxLifetime(time.Duration(persistPool.ConnMaxLifetime))
}
session = ConfigureDBSession(session, persistPool)
return session, cfg.TableName, nil
}

Expand Down Expand Up @@ -100,12 +95,7 @@ func CreateMySQLDBSession(kubectlConfig kubernetes.Interface, namespace string,
if err != nil {
return nil, "", err
}

if persistPool != nil {
session.SetMaxOpenConns(persistPool.MaxOpenConns)
session.SetMaxIdleConns(persistPool.MaxIdleConns)
session.SetConnMaxLifetime(time.Duration(persistPool.ConnMaxLifetime))
}
session = ConfigureDBSession(session, persistPool)
// this is needed to make MySQL run in a Golang-compatible UTF-8 character set.
_, err = session.Exec("SET NAMES 'utf8mb4'")
if err != nil {
Expand All @@ -117,3 +107,13 @@ func CreateMySQLDBSession(kubectlConfig kubernetes.Interface, namespace string,
}
return session, cfg.TableName, nil
}

// ConfigureDBSession configures the DB session
func ConfigureDBSession(session sqlbuilder.Database, persistPool *config.ConnectionPool) sqlbuilder.Database {
if persistPool != nil {
session.SetMaxOpenConns(persistPool.MaxOpenConns)
session.SetMaxIdleConns(persistPool.MaxIdleConns)
session.SetConnMaxLifetime(time.Duration(persistPool.ConnMaxLifetime))
}
return session
}
30 changes: 16 additions & 14 deletions workflow/controller/config.go
Expand Up @@ -23,31 +23,33 @@ func (wfc *WorkflowController) updateConfig() error {
return err
}
log.Info("Configuration:\n" + string(bytes))
wfc.session = nil
wfc.artifactRepositories = artifactrepositories.New(wfc.kubeclientset, wfc.namespace, &wfc.Config.ArtifactRepository)
wfc.offloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo
wfc.wfArchive = sqldb.NullWorkflowArchive
wfc.archiveLabelSelector = labels.Everything()
persistence := wfc.Config.Persistence
if persistence != nil {
log.Info("Persistence configuration enabled")
session, tableName, err := sqldb.CreateDBSession(wfc.kubeclientset, wfc.namespace, persistence)
if err != nil {
return err
}
log.Info("Persistence Session created successfully")
if !persistence.SkipMigration {
err = sqldb.NewMigrate(session, persistence.GetClusterName(), tableName).Exec(context.Background())
var tableName string
if wfc.session == nil {
session, tableName, err := sqldb.CreateDBSession(wfc.kubeclientset, wfc.namespace, persistence)
if err != nil {
return err
}
} else {
log.Info("DB migration is disabled")
log.Info("Persistence Session created successfully")
if !persistence.SkipMigration {
err = sqldb.NewMigrate(session, persistence.GetClusterName(), tableName).Exec(context.Background())
if err != nil {
return err
}
} else {
log.Info("DB migration is disabled")
}
wfc.session = session
}

wfc.session = session
sqldb.ConfigureDBSession(wfc.session, persistence.ConnectionPool)
if persistence.NodeStatusOffload {
wfc.offloadNodeStatusRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
wfc.offloadNodeStatusRepo, err = sqldb.NewOffloadNodeStatusRepo(wfc.session, persistence.GetClusterName(), tableName)
if err != nil {
return err
}
Expand All @@ -62,7 +64,7 @@ func (wfc *WorkflowController) updateConfig() error {
if err != nil {
return err
}
wfc.wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), wfc.managedNamespace, instanceIDService)
wfc.wfArchive = sqldb.NewWorkflowArchive(wfc.session, persistence.GetClusterName(), wfc.managedNamespace, instanceIDService)
log.Info("Workflow archiving is enabled")
} else {
log.Info("Workflow archiving is disabled")
Expand Down

0 comments on commit 1819e30

Please sign in to comment.